## Residual network

In [None]:

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, BatchNormalization, Conv2D, Dense, Flatten, Add
import numpy as np
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt


![Fashion-MNIST overview image](data/fashion_mnist.png)

#### The Fashion-MNIST dataset

In this assignment, you will use the [Fashion-MNIST dataset](https://github.com/zalandoresearch/fashion-mnist). It consists of a training set of 60,000 images of fashion items with corresponding labels, and a test set of 10,000 images. The images have been normalised and centred. The dataset is frequently used in machine learning research, especially as a drop-in replacement for the MNIST dataset. 

- H. Xiao, K. Rasul, and R. Vollgraf. "Fashion-MNIST: a Novel Image Dataset for Benchmarking Machine Learning Algorithms." arXiv:1708.07747, August 2017.



#### Load the dataset

For this programming assignment, we will take a smaller sample of the dataset to reduce the training time.

In [None]:
# Load and preprocess the Fashion-MNIST dataset

# Number of training examples kept; a reduced sample shortens training time.
NUM_TRAIN_SAMPLES = 5000

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Subsample *before* casting so that only the retained images are converted to
# float32, then divide by 255 to rescale the pixel values.
train_images = train_images[:NUM_TRAIN_SAMPLES].astype(np.float32) / 255.
train_labels = train_labels[:NUM_TRAIN_SAMPLES]

# The full test set is kept.
test_images = test_images.astype(np.float32) / 255.

# Append a trailing singleton axis; the Conv2D layers used later consume it as
# the channel dimension.
train_images = train_images[..., np.newaxis]
test_images = test_images[..., np.newaxis]

In [None]:
# Wrap the (images, labels) arrays of each split in a tf.data pipeline that
# yields batches of 32 examples.

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).batch(32)

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(32)

In [None]:
# Human-readable class names; the list index is the integer class label.

image_labels = [
    'T-shirt/top',
    'Trouser',
    'Pullover',
    'Dress',
    'Coat',
    'Sandal',
    'Shirt',
    'Sneaker',
    'Bag',
    'Ankle boot',
]

#### Create custom layers for the residual blocks

In [None]:


class ResidualBlock(Layer):
    """Residual block that keeps the number of channels unchanged.

    The block computes ``inputs + F(inputs)``, where ``F`` is the sequence
    BatchNorm -> ReLU -> Conv2D(3x3, 'same') -> BatchNorm -> ReLU ->
    Conv2D(3x3, 'same'). Both convolutions use as many filters as the input
    has channels, so the skip connection can be added without a projection.
    """

    def __init__(self, **kwargs):
        super(ResidualBlock, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the sub-layers once the input shape is known.

        Args:
            input_shape: Shape of the layer input; only its last entry (the
                channel count) is used, as the number of filters.
        """
        channels = input_shape[-1]

        # No `input_shape` kwarg is forwarded to the inner layer: the shape
        # received here includes the batch axis, and the sub-layers build
        # themselves from the tensor they are first called on.
        self.batch_norm_1 = BatchNormalization()
        self.conv_1 = Conv2D(channels, (3, 3), padding='same')
        self.batch_norm_2 = BatchNormalization()
        self.conv_2 = Conv2D(channels, (3, 3), padding='same')

    def call(self, inputs, training=False):
        """Apply the residual transformation.

        Args:
            inputs: Input tensor.
            training: Forwarded to both BatchNormalization layers.

        Returns:
            ``inputs`` plus the output of the second convolution.
        """
        x = self.batch_norm_1(inputs, training=training)
        x = tf.nn.relu(x)
        x = self.conv_1(x)
        # The second normalisation must consume the first convolution's output
        # (`x`), not `inputs`; otherwise the first half of the block is
        # discarded and `conv_1` never receives a gradient.
        x = self.batch_norm_2(x, training=training)
        x = tf.nn.relu(x)
        x = self.conv_2(x)

        return tf.add(inputs, x)
        

In [None]:
# Smoke test: wrap a single ResidualBlock in a Sequential model and print its summary

residual_block_layer = ResidualBlock(input_shape=(28, 28, 1), name="residual_block")
test_model = tf.keras.Sequential([residual_block_layer])
test_model.summary()

In [None]:


class FiltersChangeResidualBlock(Layer):
    """Residual block whose output has a different number of channels.

    The main path is BatchNorm -> ReLU -> Conv2D(3x3, 'same', input channels)
    -> BatchNorm -> ReLU -> Conv2D(3x3, 'same', ``out_filters``). Because the
    channel count changes, the skip path is a 1x1 convolution that projects
    the input to ``out_filters`` channels before the two paths are added.
    """

    def __init__(self, out_filters, **kwargs):
        """
        Args:
            out_filters: Number of filters of the second 3x3 convolution and
                of the 1x1 projection on the skip path.
            **kwargs: Forwarded to the base ``Layer`` constructor.
        """
        super(FiltersChangeResidualBlock, self).__init__(**kwargs)
        self.out_filters = out_filters

    def build(self, input_shape):
        """Create the sub-layers once the input shape is known.

        Args:
            input_shape: Shape of the layer input; only its last entry (the
                channel count) is used, as the filter count of ``conv_1``.
        """
        # No `input_shape` kwarg is forwarded to the inner layer: the shape
        # received here includes the batch axis, and the sub-layers build
        # themselves from the tensor they are first called on.
        self.batch_norm_1 = BatchNormalization()
        self.conv_1 = Conv2D(input_shape[-1], (3, 3), padding='same')
        self.batch_norm_2 = BatchNormalization()
        self.conv_2 = Conv2D(self.out_filters, (3, 3), padding='same')

        # 1x1 projection used on the skip connection.
        self.conv_3 = Conv2D(self.out_filters, (1, 1))

    def call(self, inputs, training=False):
        """Apply the residual transformation.

        Args:
            inputs: Input tensor.
            training: Forwarded to both BatchNormalization layers.

        Returns:
            Sum of the main-path output and the 1x1-projected input.
        """
        x = self.batch_norm_1(inputs, training=training)
        x = tf.nn.relu(x)
        x = self.conv_1(x)
        # The second normalisation must consume the first convolution's output
        # (`x`), not `inputs`; otherwise the first half of the block is
        # discarded and `conv_1` never receives a gradient.
        x = self.batch_norm_2(x, training=training)
        x = tf.nn.relu(x)
        x = self.conv_2(x)

        shortcut = self.conv_3(inputs)

        return tf.add(x, shortcut)
        

In [None]:
# Smoke test: wrap a single FiltersChangeResidualBlock (16 output filters) in a
# Sequential model and print its summary

fc_block_layer = FiltersChangeResidualBlock(16, input_shape=(32, 32, 3), name="fc_resnet_block")
test_model = tf.keras.Sequential([fc_block_layer])
test_model.summary()

#### Create a custom model that integrates the residual blocks



In [None]:


class ResNetModel(Model):
    """Small residual network producing a 10-way softmax output.

    Layer sequence: Conv2D(32, 7x7, stride 2) -> ResidualBlock ->
    Conv2D(32, 3x3, stride 2) -> FiltersChangeResidualBlock(64) -> Flatten ->
    Dense(10, softmax).
    """

    def __init__(self, **kwargs):
        super(ResNetModel, self).__init__(**kwargs)

        self.conv_1 = Conv2D(32, (7, 7), strides=2)
        self.resnet_1 = ResidualBlock()
        self.conv_2 = Conv2D(32, (3, 3), strides=2)
        self.resnet_2 = FiltersChangeResidualBlock(64)
        self.flatten = Flatten()
        self.dense = Dense(10, activation='softmax')

    def call(self, inputs, training=False):
        """Run the forward pass.

        Args:
            inputs: Batch of input images.
            training: Forwarded to the residual blocks (and through them to
                their BatchNormalization layers).

        Returns:
            Tensor of softmax outputs with 10 entries per example.
        """
        x = self.conv_1(inputs)
        x = self.resnet_1(x, training=training)
        # Continue from `x`: applying conv_2 to `inputs` would bypass conv_1
        # and the first residual block entirely.
        x = self.conv_2(x)
        x = self.resnet_2(x, training=training)
        x = self.flatten(x)

        return self.dense(x)
        

In [None]:
# Instantiate the custom ResNetModel (no constructor arguments are passed)

resnet_model = ResNetModel()

#### Define the optimizer and loss function

We will use the Adam optimizer with a learning rate of 0.001, and the sparse categorical cross-entropy loss function.

In [None]:
# Loss: sparse categorical cross-entropy. Optimizer: Adam with learning rate 0.001.

loss_obj = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer_obj = tf.keras.optimizers.Adam(learning_rate=1e-3)

#### Define the grad function

In [None]:

@tf.function
def grad(model, inputs, targets, loss):
    """Compute the loss and its gradients for one batch.

    Args:
        model: Callable model exposing ``trainable_variables``.
        inputs: Batch of model inputs.
        targets: Targets passed as the first argument to ``loss``.
        loss: Callable ``loss(targets, predictions)`` returning the loss value.

    Returns:
        Tuple ``(loss_value, grads)`` where ``grads`` lists the gradients of
        ``loss_value`` with respect to ``model.trainable_variables``, in the
        same order.
    """
    with tf.GradientTape() as tape:
        # This forward pass is the training step, so run the model in training
        # mode; otherwise BatchNormalization layers normalise with their
        # (never-updated) moving statistics instead of the batch statistics.
        predictions = model(inputs, training=True)
        loss_value = loss(targets, predictions)

    # Differentiate after leaving the tape context so the gradient computation
    # itself is not recorded on the tape.
    grads = tape.gradient(loss_value, model.trainable_variables)

    return loss_value, grads
    

#### Define the custom training loop

In [None]:

def train_resnet(model, num_epochs, dataset, optimizer, loss, grad_fn):
    """Train ``model`` with a custom loop and record per-epoch metrics.

    Args:
        model: Model to train; must be callable and expose
            ``trainable_variables``.
        num_epochs: Number of passes over ``dataset``.
        dataset: Iterable yielding ``(inputs, integer_labels)`` batches.
        optimizer: Optimizer providing ``apply_gradients``.
        loss: Loss callable forwarded to ``grad_fn``.
        grad_fn: Callable ``grad_fn(model, inputs, targets, loss)`` returning
            ``(loss_value, grads)``.

    Returns:
        Tuple ``(train_loss_results, train_accuracy_results)``: two lists with
        one entry per epoch holding the mean batch loss and the accuracy.
    """
    train_loss_results = []
    train_accuracy_results = []

    for epoch in range(num_epochs):

        # Fresh metric objects each epoch so results are per-epoch, not cumulative.
        epoch_loss_avg = tf.keras.metrics.Mean()
        epoch_accuracy = tf.keras.metrics.CategoricalAccuracy()

        # Iterate over the dataset supplied by the caller rather than a
        # notebook-level global, so the function honours its `dataset` argument.
        for x, y in dataset:

            loss_value, grads = grad_fn(model, x, y, loss)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

            epoch_loss_avg(loss_value)
            # Accuracy is measured with a separate forward pass made after the
            # weight update; labels are one-hot encoded to match the metric.
            epoch_accuracy(to_categorical(y), model(x))

        train_loss_results.append(epoch_loss_avg.result())
        train_accuracy_results.append(epoch_accuracy.result())

    return train_loss_results, train_accuracy_results
    

In [None]:
# Run the custom training loop for 8 epochs and keep the per-epoch loss/accuracy

train_loss_results, train_accuracy_results = train_resnet(
    model=resnet_model,
    num_epochs=8,
    dataset=train_dataset,
    optimizer=optimizer_obj,
    loss=loss_obj,
    grad_fn=grad,
)

#### Plot the learning curves

In [None]:
# Two side-by-side panels sharing the x-axis: training loss and training accuracy per epoch
fig, axes = plt.subplots(1, 2, sharex=True, figsize=(12, 5))

# (panel title, y-axis label, per-epoch values) for each panel, left to right
curves = (
    ('Loss vs epochs', "Loss", train_loss_results),
    ('Accuracy vs epochs', "Accuracy", train_accuracy_results),
)

for ax, (title, ylabel, values) in zip(axes, curves):
    ax.set_title(title)
    ax.set_xlabel("Epochs", fontsize=14)
    ax.set_ylabel(ylabel, fontsize=14)
    ax.plot(values)

plt.show()

#### Evaluate the model performance on the test dataset

In [None]:
# Accumulate the mean batch loss and the accuracy over the whole test dataset

epoch_loss_avg = tf.keras.metrics.Mean()
epoch_accuracy = tf.keras.metrics.CategoricalAccuracy()

for test_batch, test_targets in test_dataset:
    batch_output = resnet_model(test_batch)
    batch_loss = loss_obj(test_targets, batch_output)
    epoch_loss_avg.update_state(batch_loss)
    # Labels are one-hot encoded to match what CategoricalAccuracy is given.
    epoch_accuracy.update_state(to_categorical(test_targets), batch_output)

test_loss = epoch_loss_avg.result().numpy()
test_accuracy = epoch_accuracy.result().numpy()

print("Test loss: {:.3f}".format(test_loss))
print("Test accuracy: {:.3%}".format(test_accuracy))

#### Model predictions



In [None]:
# Pick four random test images and show each one next to the model's predicted
# class distribution

num_test_images = test_images.shape[0]

random_inx = np.random.choice(num_test_images, 4)
random_test_images = test_images[random_inx, ...]
random_test_labels = test_labels[random_inx, ...]

predictions = resnet_model(random_test_images)

fig, axes = plt.subplots(4, 2, figsize=(16, 12))
fig.subplots_adjust(hspace=0.5, wspace=-0.2)

# Each row of `axes` holds (image panel, bar-chart panel) for one sample.
samples = zip(axes, predictions, random_test_images, random_test_labels)
for (image_ax, dist_ax), prediction, image, label in samples:
    # Left panel: the image, with its true class written above it.
    image_ax.imshow(np.squeeze(image))
    image_ax.xaxis.set_visible(False)
    image_ax.yaxis.set_visible(False)
    image_ax.text(5., -2., f'Class {label} ({image_labels[label]})')

    # Right panel: predicted distribution over the classes.
    class_positions = np.arange(len(prediction))
    dist_ax.bar(class_positions, prediction)
    dist_ax.set_xticks(class_positions)
    dist_ax.set_xticklabels(image_labels, rotation=0)
    pred_inx = np.argmax(prediction)
    dist_ax.set_title(f"Categorical distribution. Model prediction: {image_labels[pred_inx]}")

plt.show()