# 0. Import Libraries

In [2]:
# Uncomment at first run
# !pip install tensorflow
# !pip install numpy
# !pip install matplotlib
# !pip install scipy
# !pip install tqdm
# !pip install ipywidgets

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
from scipy import ndimage
from tqdm.notebook import tqdm
import os

# 1. Set Hyper Parameters

In [3]:
learning_rate = 0.001
training_epochs = 15
batch_size = 100

# Checkpoint Configuration
cur_dir = os.getcwd()
ckpt_dir_name = 'checkpoints'
model_dir_name = 'mnist_cnn_seq'

checkpoint_dir = os.path.join(cur_dir, ckpt_dir_name, model_dir_name)
os.makedirs(checkpoint_dir, exist_ok=True)

checkpoint_prefix = os.path.join(checkpoint_dir, model_dir_name)

# Data Augmentation

In [4]:
def data_augmentation(images, labels):
    aug_images = []
    aug_labels = []
    for x, y in tqdm(zip(images, labels)):
        aug_images.append(x)
        aug_labels.append(y)
        bg_value = np.median(x)
        for _ in range(4):
            # Rotating
            angle = np.random.randint(-15, 15, 1)
            rot_img = ndimage.rotate(x, int(angle), reshape=False, cval=bg_value)
            # Shifting
            shift = np.random.randint(-2, 2, 2)
            shift_img = ndimage.shift(rot_img, shift, cval=bg_value)
            
            aug_images.append(shift_img)
            aug_labels.append(y)
    aug_images = np.array(aug_images)
    aug_labels = np.array(aug_labels)
    return aug_images, aug_labels            

# 2. Make a Data Pipelining

In [10]:
# Data Loading
mnist = keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

print(train_images.shape)

# Data Augmentation
# train_images, train_labels = data_augmentation(train_images, train_labels)

# Normalization
train_images = train_images.astype(np.float32) / 255.
test_images = test_images.astype(np.float32) / 255.
train_images = np.expand_dims(train_images, axis=-1)
test_images = np.expand_dims(test_images, axis=-1)

# One-Hot Encoding
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(buffer_size=500000).batch(batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(batch_size)

(60000, 28, 28)


# 3. Build a Neural Network Model

## Sequential API

In [5]:
def create_model():
    model = keras.Sequential()
    model.add(keras.layers.Conv2D(filters=32, kernel_size=3, activation=tf.nn.relu, padding='SAME', input_shape=(28, 28, 1)))
    model.add(keras.layers.MaxPool2D(padding='SAME'))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=3, activation=tf.nn.relu, padding='SAME'))
    model.add(keras.layers.MaxPool2D(padding='SAME'))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=3, activation=tf.nn.relu, padding='SAME'))
    model.add(keras.layers.MaxPool2D(padding='SAME'))
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(256, activation=tf.nn.relu))
    model.add(keras.layers.Dropout(0.4))
    model.add(keras.layers.Dense(10))
    return model

model = create_model()
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 28, 28, 32)        320       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 14, 14, 32)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 14, 14, 64)        18496     
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 7, 7, 64)          0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 4, 4, 128)         0         
_________________________________________________________________
flatten (Flatten)            (None, 2048)              0

## Functional API

In [6]:
def create_model_functional():
    inputs = keras.Input(shape=(28, 28, 1))
    conv1 = keras.layers.Conv2D(filters=32, kernel_size=3, activation=tf.nn.relu, padding='SAME')(inputs)
    pool1 = keras.layers.MaxPool2D(padding='SAME')(conv1)
    conv2 = keras.layers.Conv2D(filters=64, kernel_size=3, activation=tf.nn.relu, padding='SAME')(pool1)
    pool2 = keras.layers.MaxPool2D(padding='SAME')(conv2)
    conv3 = keras.layers.Conv2D(filters=128, kernel_size=3, activation=tf.nn.relu, padding='SAME')(pool2)
    pool3 = keras.layers.MaxPool2D(padding='SAME')(conv3)
    pool3_flat = keras.layers.Flatten()(pool3)
    dense4 = keras.layers.Dense(256, activation=tf.nn.relu)(pool3_flat)
    drop4 = keras.layers.Dropout(0.4)(dense4)
    logits = keras.layers.Dense(10)(drop4)
    return keras.Model(inputs=inputs, outputs=logits)

model = create_model()
model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_3 (Conv2D)            (None, 28, 28, 32)        320       
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 14, 14, 32)        0         
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 14, 14, 64)        18496     
_________________________________________________________________
max_pooling2d_4 (MaxPooling2 (None, 7, 7, 64)          0         
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
max_pooling2d_5 (MaxPooling2 (None, 4, 4, 128)         0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 2048)             

## Model Subclassing

In [7]:
class MNISTModel(keras.Model):
    def __init__(self):
        super(MNISTModel, self).__init__()
        self.conv1 = keras.layers.Conv2D(filters=32, kernel_size=3, activation=tf.nn.relu, padding='SAME')
        self.pool1 = keras.layers.MaxPool2D(padding='SAME')
        self.conv2 = keras.layers.Conv2D(filters=64, kernel_size=3, activation=tf.nn.relu, padding='SAME')
        self.pool2 = keras.layers.MaxPool2D(padding='SAME')
        self.conv3 = keras.layers.Conv2D(filters=128, kernel_size=3, activation=tf.nn.relu, padding='SAME')
        self.pool3 = keras.layers.MaxPool2D(padding='SAME')
        self.pool3_flat = keras.layers.Flatten()
        self.dense4 = keras.layers.Dense(256, activation=tf.nn.relu)
        self.drop4 = keras.layers.Dropout(0.4)
        self.dense5 = keras.layers.Dense(10)
    
    def call(self, inputs, training=False):
        net = self.conv1(inputs)
        net = self.pool1(net)
        net = self.conv2(net)
        net = self.pool2(net)
        net = self.conv3(net)
        net = self.pool3(net)
        net = self.pool3_flat(net)
        net = self.dense4(net)
        net = self.drop4(net)
        net = self.dense5(net)
        return net

model = MNISTModel()

## Model Subclassing Ensemble(Best)

In [8]:
class MNISTModel(keras.Model):
    def __init__(self):
        super(MNISTModel, self).__init__()
        self.conv1 = keras.layers.Conv2D(filters=32, kernel_size=3, activation=tf.nn.relu, padding='SAME')
        self.pool1 = keras.layers.MaxPool2D(padding='SAME')
        self.conv2 = keras.layers.Conv2D(filters=64, kernel_size=3, activation=tf.nn.relu, padding='SAME')
        self.pool2 = keras.layers.MaxPool2D(padding='SAME')
        self.conv3 = keras.layers.Conv2D(filters=128, kernel_size=3, activation=tf.nn.relu, padding='SAME')
        self.pool3 = keras.layers.MaxPool2D(padding='SAME')
        self.pool3_flat = keras.layers.Flatten()
        self.dense4 = keras.layers.Dense(256, activation=tf.nn.relu)
        self.drop4 = keras.layers.Dropout(0.4)
        self.dense5 = keras.layers.Dense(10)
    
    def call(self, inputs, training=False):
        net = self.conv1(inputs)
        net = self.pool1(net)
        net = self.conv2(net)
        net = self.pool2(net)
        net = self.conv3(net)
        net = self.pool3(net)
        net = self.pool3_flat(net)
        net = self.dense4(net)
        net = self.drop4(net)
        net = self.dense5(net)
        return net

models = []
num_models = 3
for _ in range(num_models):
    models.append(MNISTModel())

# 4. Define a Loss Function

In [9]:
def loss_fn(model, images, labels):
    logits = model(images, training=True)
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels))
    return loss

# 5.Calculate a Gradient

In [10]:
def grad(model, images, labels):
    with tf.GradientTape() as tape:
        loss = loss_fn(model, images, labels)
    return tape.gradient(loss, model.variables)

# 6. Select an Optimizer

In [11]:
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)

# 7. Define a Metric for Model's Performance

In [12]:
def evaluate(models, images, labels):
    predictions = tf.zeros_like(labels)
    for model in models:
        logits = model(images, training=False)
        predictions += logits        
    correct_prediction = tf.equal(tf.argmax(predictions, 1), tf.argmax(labels, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    return accuracy

# 8. Make a Checkpoint for Saving

In [13]:
checkpoints = []
for m in range(num_models):
    checkpoints.append(tf.train.Checkpoint(cnn=models[m]))

# 9. Train and Validate a Neural Network Model

In [14]:
for epoch in range(training_epochs):
    avg_loss = 0.
    avg_train_acc = 0.
    avg_test_acc = 0.
    train_step = 0
    test_step = 0

    for images, labels in tqdm(train_dataset):
        for model in models:
            grads = grad(model, images, labels)
            optimizer.apply_gradients(zip(grads, model.variables))
            loss = loss_fn(model, images, labels)
            avg_loss = avg_loss + loss
        acc = evaluate(models, images, labels)
        avg_train_acc = avg_train_acc + acc
        train_step += 1
    avg_loss = avg_loss / train_step
    avg_train_acc = avg_train_acc / train_step

    for images, labels in test_dataset:
        acc = evaluate(models, images, labels)
        avg_test_acc = avg_test_acc + acc
        test_step += 1
    avg_test_acc = avg_test_acc / test_step

    print(f'Epoch: {epoch + 1}, loss = {avg_loss:.8f}, train accuracy = {avg_train_acc:.4f}, test accuracy = {avg_test_acc:.4f}')

    for idx, checkpoint in enumerate(checkpoints):
        checkpoint.save(file_prefix=checkpoint_prefix + f'-{idx}')

  0%|          | 0/3000 [00:00<?, ?it/s]

Epoch: 1, loss = 0.22926131, train accuracy = 0.9837, test accuracy = 0.9950


  0%|          | 0/3000 [00:00<?, ?it/s]

Epoch: 2, loss = 0.07009447, train accuracy = 0.9963, test accuracy = 0.9954


  0%|          | 0/3000 [00:00<?, ?it/s]

Epoch: 3, loss = 0.04766445, train accuracy = 0.9980, test accuracy = 0.9963


  0%|          | 0/3000 [00:00<?, ?it/s]

Epoch: 4, loss = 0.03521881, train accuracy = 0.9987, test accuracy = 0.9954


  0%|          | 0/3000 [00:00<?, ?it/s]

Epoch: 5, loss = 0.02818876, train accuracy = 0.9993, test accuracy = 0.9965


  0%|          | 0/3000 [00:00<?, ?it/s]

Epoch: 6, loss = 0.02231972, train accuracy = 0.9995, test accuracy = 0.9962


  0%|          | 0/3000 [00:00<?, ?it/s]

KeyboardInterrupt: 