# MNIST convolutional neural networks

* Build a LeNet5-like network and train it on MNIST data
* input pipeline: `tf.data`
* `Eager execution`
* Model subclassing API (`tf.keras.Model`)

## Import modules

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import time

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from IPython.display import clear_output

import tensorflow as tf
from tensorflow.keras import layers
# Switch TensorFlow 1.x to eager execution; this has to happen before any
# other TensorFlow op is created.
tf.enable_eager_execution()

# Make only GPU 0 visible to this process.
# NOTE(review): this is set after TensorFlow has been imported; the usual
# recommendation is to set it before the import -- confirm it takes effect.
os.environ["CUDA_VISIBLE_DEVICES"]="0"

In [None]:
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession

# Ask TensorFlow to grow GPU memory on demand instead of reserving it all.
config = ConfigProto()
config.gpu_options.allow_growth = True
# NOTE(review): eager execution is already enabled in the first cell;
# presumably creating this session is what applies the GPU option
# process-wide -- confirm.
session = InteractiveSession(config=config)

### Import data

In [None]:
# Fetch the MNIST train/test split that ships with tf.keras.
(train_data, train_labels), (test_data, test_labels) = \
    tf.keras.datasets.mnist.load_data()

# Training split: rescale by 1/255, add a trailing channel axis
# (N, 28, 28, 1) and store as float32; labels become int32.
train_data = (train_data / 255.).reshape([-1, 28, 28, 1]).astype(np.float32)
train_labels = train_labels.astype(np.int32)

# Test split: exactly the same preprocessing.
test_data = (test_data / 255.).reshape([-1, 28, 28, 1]).astype(np.float32)
test_labels = test_labels.astype(np.int32)

### Show an MNIST sample

In [None]:
# Display one training image together with its label.
index = 219
print("label = {}".format(train_labels[index]))
# Drop the trailing channel axis so imshow receives a 2-D array.
plt.imshow(train_data[index, ..., 0])
plt.colorbar()
plt.show()

## Set up dataset with `tf.data`

### Input pipeline: `tf.data.Dataset` and transformations

In [None]:
def one_hot_label(image, label):
  """Return `image` unchanged, paired with `label` one-hot encoded (depth 10)."""
  return image, tf.one_hot(label, depth=10)

In [None]:
tf.set_random_seed(219)
batch_size = 32
max_epochs = 1

# Training pipeline: slice -> shuffle -> one-hot labels -> batch.
N = len(train_data)
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_data, train_labels))
    .shuffle(buffer_size=10000)
    .map(one_hot_label)
    .batch(batch_size=batch_size)
)
print(train_dataset)

# Test pipeline: same transformations, but without shuffling.
test_dataset = (
    tf.data.Dataset.from_tensor_slices((test_data, test_labels))
    .map(one_hot_label)
    .batch(batch_size=batch_size)
)
print(test_dataset)

## Create the model

* Use `tf.keras.layers`

* For reference, the same architecture written with the `tf.keras.Sequential()` API (01.mnist.LeNet5.ipynb):
```python
model = tf.keras.Sequential()
model.add(layers.Conv2D(filters=32, kernel_size=[5, 5], padding='same', activation='relu'))
model.add(layers.MaxPool2D())
model.add(layers.Conv2D(filters=64, kernel_size=[5, 5], padding='same', activation='relu'))
model.add(layers.MaxPool2D())
model.add(layers.Flatten())
model.add(layers.Dense(1024, activation='relu'))
model.add(layers.Dense(10, activation='softmax'))
```

In [None]:
class MNISTModel(tf.keras.Model):
  """LeNet5-style CNN: two conv + max-pool stages followed by two dense layers.

  The final dense layer applies softmax, so the model returns a distribution
  over 10 classes rather than raw logits.
  """

  def __init__(self):
    super(MNISTModel, self).__init__()
    # Both convolutions share everything except the number of filters.
    conv_kwargs = dict(kernel_size=[5, 5], padding='same', activation='relu')
    self.conv1 = layers.Conv2D(filters=32, **conv_kwargs)
    self.pool1 = layers.MaxPool2D()
    self.conv2 = layers.Conv2D(filters=64, **conv_kwargs)
    self.pool2 = layers.MaxPool2D()
    self.flatten = layers.Flatten()
    self.dense1 = layers.Dense(units=1024, activation='relu')
    self.dense2 = layers.Dense(units=10, activation='softmax')

  def call(self, inputs):
    """Run the forward pass and return the softmax output of the last layer."""
    x = self.conv1(inputs)
    x = self.pool1(x)
    x = self.conv2(x)
    x = self.pool2(x)
    x = self.flatten(x)
    x = self.dense1(x)
    probabilities = self.dense2(x)

    return probabilities

In [None]:
# Instantiate the network defined above.
model = MNISTModel()

In [None]:
# Sanity check: run the still-untrained model on a single image in eager mode.
for images, labels in train_dataset.take(1):
  predictions = model(images[:1])
  print("Predictions: ", predictions.numpy())

In [None]:
# Print the layer-by-layer summary of the model.
model.summary()

## Train a model

### Define loss and accuracy functions

In [None]:
# Cross-entropy on one-hot labels; the default `from_logits=False` matches the
# softmax output of the model's last layer.
loss_object = tf.keras.losses.CategoricalCrossentropy()
# Stateful metric: it keeps accumulating over every call until
# `reset_states()` is invoked.
acc_object = tf.keras.metrics.CategoricalAccuracy()

### Define an optimizer

In [None]:
# Adam optimizer with a learning rate of 1e-4.
optimizer = tf.train.AdamOptimizer(1e-4)

# Running means of the loss and accuracy values fed in during training.
mean_loss = tf.keras.metrics.Mean("loss")
mean_accuracy = tf.keras.metrics.Mean("accuracy")

# (global_step, value) pairs kept for plotting; the accuracy history is
# seeded with a point at step 0.
loss_history = []
accuracy_history = [(0, 0.0)]

### Train a model

In [None]:
print("start training!")
global_step = 0
# `batch()` keeps the final partial batch, so round up instead of truncating.
num_batches_per_epoch = (N + batch_size - 1) // batch_size

for epoch in range(max_epochs):

  for step, (images, labels) in enumerate(train_dataset):
    start_time = time.time()

    # Only the forward pass and the loss need to be recorded for gradients.
    with tf.GradientTape() as tape:
      predictions = model(images)
      loss_value = loss_object(labels, predictions)

    gradients = tape.gradient(loss_value, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    global_step += 1

    # `acc_object` is a stateful metric: calling it returns the accuracy
    # accumulated since its last reset. Reset it first so `acc_value` is the
    # accuracy of this batch only; `mean_accuracy` does the averaging.
    acc_object.reset_states()
    acc_value = acc_object(labels, predictions)

    mean_loss(loss_value)
    mean_accuracy(acc_value)
    loss_history.append((global_step, mean_loss.result().numpy()))

    if global_step % 10 == 0:
      clear_output(wait=True)
      epochs = epoch + step / float(num_batches_per_epoch)
      duration = time.time() - start_time
      examples_per_sec = batch_size / float(duration)
      print("epochs: {:.2f}, step: {}, loss: {:.3g}, accuracy: {:.4g}% ({:.2f} examples/sec; {:.4f} sec/batch)".format(
          epochs, global_step, mean_loss.result().numpy(), mean_accuracy.result().numpy()*100, examples_per_sec, duration))

  # save mean accuracy of this epoch for plot
  accuracy_history.append((global_step, mean_accuracy.result().numpy()))

  # start the next epoch with an empty accuracy average
  mean_accuracy.reset_states()

# Leave the shared accuracy metric clean so that training batches cannot
# leak into whatever is measured with it next.
acc_object.reset_states()

print("training done!")

### Plot the loss function

In [None]:
# Transpose [(step, loss), ...] into [steps, losses] for plotting.
loss_series = list(zip(*loss_history))
plt.plot(*loss_series, label='loss')
plt.xlabel('Number of steps')
plt.ylabel('Loss value [cross entropy]')
plt.legend()
plt.show()

In [None]:
# Split the (step, accuracy) pairs into x and y series; the history always
# holds at least its initial (0, 0.0) point.
accuracy_steps, accuracy_values = zip(*accuracy_history)
plt.plot(accuracy_steps, accuracy_values, 'bo', label='accuracy')
plt.xlabel('Number of steps')
plt.ylabel('Accuracy value')
plt.legend()
plt.show()

## Evaluate a model

### Test trained model

* test accuracy: 0.9798 for 1 epoch

In [None]:
# `acc_object` was also updated while training. Reset it so the reported
# number covers the test set only.
acc_object.reset_states()

for images, labels in test_dataset:
  predictions = model(images)
  # Each call accumulates this batch into the metric's running totals.
  acc_object(labels, predictions)

# Convert the result tensor to a NumPy scalar before formatting, as the
# training loop does.
print("test accuracy: {:.4g}%".format(acc_object.result().numpy() * 100))

### Plot test set

In [None]:
# Seed NumPy's global RNG so the random test samples picked below are reproducible.
np.random.seed(219)

In [None]:
# Draw 16 distinct random test images and run them through the model.
test_batch_size = 16
batch_index = np.random.choice(len(test_data), size=test_batch_size, replace=False)

batch_xs = test_data[batch_index]
batch_ys = test_labels[batch_index]
y_pred_ = model(batch_xs)

# One subplot per image; the title is blue for a correct prediction and red
# for a wrong one.
fig = plt.figure(figsize=(16, 10))
for i, (px, py) in enumerate(zip(batch_xs, y_pred_)):
  predicted_label = np.argmax(py)
  title_color = 'blue' if predicted_label == batch_ys[i] else 'red'
  p = fig.add_subplot(4, 8, i+1)
  p.set_title("y_pred: {}".format(predicted_label), color=title_color)
  p.imshow(px.reshape(28, 28))
  p.axis('off')