# Ex. 3. Experimental architecture comparison

### This notebook uses the template provided by Christian Igel, for the German Traffic Sign Recognition Benchmark. 
The notebook defines 4 different architectures (original, dropout1, dropout2, dropout3) and builds 6 models for each architecture, for a total of 24 models (described in the report). It trains the models and generates plots visualizing the training progress of each model architecture. It then evaluates each model, builds a dataframe with the performance of each model architecture, and computes the mean and median of the loss and the accuracy on the test data. Finally, it generates two boxplots, one for the accuracy and the other for the loss, visualizing the difference in classification performance between the model architectures.  
 
Since the notebook trains and evaluates 24 models, it takes around 7 hours to run on my local machine (and much longer on Google Colab). I created this notebook only to provide the code for this exercise as a single file. Initially, I simply generated the different models, then trained and saved them on my local machine, each one in a different notebook. Then I loaded them into a single notebook, where I computed the predictions and the evaluation and generated the plots. But since providing the code for the exercises was requested, I decided to provide compact code in a single notebook that is able to generate the data I used in the report.

In [0]:
%matplotlib inline 
%tensorflow_version 2.x
import tensorflow as tf
import matplotlib.pyplot as plt 
import numpy as np
import pathlib
import os

# Report the TensorFlow version this notebook is running on
print("TensorFlow version:", tf.__version__)
# Passed below as num_parallel_calls / prefetch buffer_size so tf.data tunes them itself
AUTOTUNE = tf.data.experimental.AUTOTUNE

TensorFlow version: 2.2.0


Download the data:

In [0]:
# Download and extract the GTSRB archive, using data_root as the cache directory
data_root = "."
tf.keras.utils.get_file(
    fname="GTSRB.zip",
    origin="https://sid.erda.dk/share_redirect/EB0rrpZwuI",
    cache_dir=data_root,
    extract=True,
)


The images are stored in subdirectories. The names of the subdirectories encode the class labels. 

In [0]:
# Count the files found below the per-class sub-directories of train/ and test/
n_train = sum(1 for _ in pathlib.Path(data_root).glob("datasets/GTSRB/train/*/*"))
n_test = sum(1 for _ in pathlib.Path(data_root).glob("datasets/GTSRB/test/*/*"))

print("Number of training images:", n_train)
print("Number of test images:", n_test)

Some basic constants: 

In [0]:
no_classes = 43  # Number of classes; also the width of the final softmax Dense layer
no_channels = 3  # Number of channels requested when decoding the image files

# All images are initially resized to img_height x img_width
img_height  = 32  
img_width   = 32
# During training and testing the images are cropped to img_height_crop x img_width_crop
img_height_crop = 28  
img_width_crop  = 28

batch_size = 128  # Number of examples per batch in the training and evaluation pipelines
steps_per_epoch = n_train // batch_size  # Number of full batches in one pass over the training images

Helper function for extracting the label information from the paths to the images and loading and preprocessing the images:

In [0]:
# Extract the label from the file path
def get_label(file_path):
  """Return the class label encoded in the image's parent directory name.

  The path is split on the OS path separator; the second-to-last component
  (the class directory) is parsed as an int32 number.
  """
  class_dir = tf.strings.split(file_path, os.path.sep)[-2]
  return tf.strings.to_number(class_dir, tf.int32)

# Load image, convert it to floats, and resize it
def decode_img(img):
  """Decode a PNG-compressed image string into a resized float32 image.

  Args:
    img: string tensor holding the compressed image bytes.

  Returns:
    The image resized to img_height x img_width with no_channels channels.
  """
  # convert the compressed string to a 3D uint8 tensor
  img = tf.image.decode_png(img, channels=no_channels)
  # Use `convert_image_dtype` to convert to floats in the [0,1] range.
  img = tf.image.convert_image_dtype(img, tf.float32)
  # `tf.image.resize` takes the target size as [new_height, new_width];
  # the previous [width, height] order only worked because both are equal.
  return tf.image.resize(img, [img_height, img_width])

# Given the path and filename of an image, create the label and the input image
def process_path(file_path):
  """Map an image file path to an (image, label) pair.

  The label comes from get_label(file_path); the image is the file content
  read as a raw string and passed through decode_img.
  """
  label = get_label(file_path)
  raw_bytes = tf.io.read_file(file_path)
  return decode_img(raw_bytes), label

Create data sets based on the files:

In [0]:
# Build datasets of file names for the training and test images, respectively
list_ds_train = tf.data.Dataset.list_files(data_root + "/datasets/GTSRB/train/*/*")
list_ds_test  = tf.data.Dataset.list_files(data_root + "/datasets/GTSRB/test/*/*", shuffle=False)  # Fixed order for test-time augmentation

# Map every file name to an (image, label) pair
labeled_ds_train = list_ds_train.map(process_path, num_parallel_calls=AUTOTUNE) # AUTOTUNE lets tf.data choose the number of parallel calls
labeled_ds_test = list_ds_test.map(process_path, num_parallel_calls=AUTOTUNE)

Functions for data preprocessing/augmentation:

In [0]:
# Used for training data augmentation
def augment(image, label):
    """Randomly crop, optionally mirror, and brightness-jitter one training image.

    Returns the augmented image (values clipped to [0, 1]) and the unchanged label.
    """
    # Random img_height_crop x img_width_crop sub-image of the input
    cropped = tf.image.random_crop(image, [img_height_crop, img_width_crop, no_channels])

    def flipped():
        return tf.image.random_flip_left_right(cropped)

    # Only images of these classes are randomly mirrored left/right
    flip_classes = (11, 12, 13, 17, 18, 26, 30, 35)
    branches = [(tf.equal(label, class_id), flipped) for class_id in flip_classes]
    image = tf.case(branches, default=lambda: cropped)
    # Random brightness shift (sometimes brighter, sometimes darker)
    image = tf.image.random_brightness(image, max_delta=0.1)
    # Clip back to [0, 1] so the result is still a proper image
    return tf.clip_by_value(image, 0., 1.), label

# Used for testing/evaluation
def crop_center(image, label):
  """Return the central img_height_crop x img_width_crop region and the unchanged label."""
  centered = tf.image.resize_with_crop_or_pad(
      image, target_height=img_height_crop, target_width=img_width_crop)
  return centered, label

# Could be used for test-time augmentation
def crop_random(image, label):
  """Return a random img_height_crop x img_width_crop crop and the unchanged label."""
  crop_size = [img_height_crop, img_width_crop, no_channels]
  return tf.image.random_crop(image, size=crop_size), label


We prepare the data for training and testing differently. For example, for training we use data augmentation.

In [0]:
def prepare_for_training(ds, cache=True, shuffle_buffer_size=1000):
  """Turn a dataset of (image, label) pairs into the training input pipeline.

  Args:
    ds: dataset of (image, label) pairs.
    cache: falsy to disable caching; a non-empty string is passed to
      `ds.cache(...)` as file name (for data that does not fit in memory);
      any other truthy value calls `ds.cache()` without a file name.
    shuffle_buffer_size: buffer size handed to `ds.shuffle`.

  Returns:
    The cached, shuffled, endlessly repeated, augmented, batched and
    prefetched dataset.
  """
  if cache:
    ds = ds.cache(cache) if isinstance(cache, str) else ds.cache()
  return (
      # Shuffle after caching, with a new order on every iteration
      ds.shuffle(buffer_size=shuffle_buffer_size, reshuffle_each_iteration=True)
      # Repeat forever
      .repeat()
      # Data augmentation
      .map(augment, num_parallel_calls=AUTOTUNE)
      # Partition into batches
      .batch(batch_size)
      # Fetch batches in the background while the model is training
      .prefetch(buffer_size=AUTOTUNE)
  )

def prepare_for_evaluation(ds, shuffle_buffer_size=1000):
  """Build the evaluation pipeline: center crop, batch, prefetch.

  `shuffle_buffer_size` is accepted but not used by this function.
  """
  return (
      # For normal evaluation, only the center of each image is used
      ds.map(crop_center, num_parallel_calls=AUTOTUNE)
      # Partition into batches
      .batch(batch_size)
      # Fetch batches in the background
      .prefetch(buffer_size=AUTOTUNE)
  )

def prepare_for_augmented_evaluation(ds):
  """Build an evaluation pipeline that uses random crops instead of the center crop."""
  return (
      ds.map(crop_random, num_parallel_calls=AUTOTUNE)
      .batch(batch_size)
      .prefetch(buffer_size=AUTOTUNE)
  )

Let's have a look at a random batch of images:



In [0]:
# Helper function for displaying images
def show_batch(image_batch, label_batch, nrows=6, ncols=6):
  """Show up to nrows*ncols images of a batch in a grid, titled with their labels.

  Args:
    image_batch: indexable batch of images (e.g. a NumPy array).
    label_batch: indexable batch of labels aligned with image_batch.
    nrows: number of grid rows.
    ncols: number of grid columns.
  """
  plt.figure(figsize=(10,10))
  # Never index past the end of a batch that is smaller than the grid
  n_images = min(nrows * ncols, len(image_batch))
  for n in range(n_images):
    plt.subplot(nrows, ncols, n + 1)
    if no_channels == 3:
      plt.imshow(image_batch[n])
    else:
      # Drop the channel axis so imshow receives a 2D image
      plt.imshow(image_batch[n].reshape([img_height_crop, img_width_crop]))
    plt.title('class:' + str(label_batch[n]))
    plt.axis('off')
      
# Make training dataset (cached, shuffled, repeated, augmented, batched)
train_ds = prepare_for_training(labeled_ds_train)

# Make test dataset (center-cropped and batched)
test_ds = prepare_for_evaluation(labeled_ds_test)

# Get a batch of images and labels
image_batch, label_batch = next(iter(train_ds))

# Visualize images and labels
show_batch(image_batch.numpy(), label_batch.numpy())

Functions to generate models

In [0]:
sd_init = 0.01  # Mean and stddev of the TruncatedNormal bias initializer of the conv layers

# Original-model
def original_generator():
  """Build the baseline CNN without dropout.

  Layers: Conv2D(32, 5x5) -> ELU -> MaxPool(2x2) -> Conv2D(64, 5x5) -> ELU
  -> MaxPool(2x2) -> Flatten -> Dense(no_classes, softmax).

  Returns:
    A new, uncompiled tf.keras.Sequential model.
  """
  return tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, (5, 5), activation=None,
                                  # Keras channels_last inputs are (height, width, channels)
                                  input_shape=(img_height_crop, img_width_crop, no_channels),
                                  bias_initializer=tf.initializers.TruncatedNormal(mean=sd_init, stddev=sd_init)),
            tf.keras.layers.ELU(),
            tf.keras.layers.MaxPooling2D(pool_size=(2,2)),
            tf.keras.layers.Conv2D(64, (5, 5), activation=None,
                                  bias_initializer=tf.initializers.TruncatedNormal(mean=sd_init, stddev=sd_init)),
            tf.keras.layers.ELU(),
            tf.keras.layers.MaxPooling2D(pool_size=(2,2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(no_classes, activation='softmax')])
  
# Dropout1-model
def dropout1_generator():
  """Build the CNN variant with Dropout(0.5) after each max-pooling layer.

  Layers: Conv2D(32, 5x5) -> ELU -> MaxPool(2x2) -> Dropout(0.5)
  -> Conv2D(64, 5x5) -> ELU -> MaxPool(2x2) -> Dropout(0.5) -> Flatten
  -> Dense(no_classes, softmax).

  Returns:
    A new, uncompiled tf.keras.Sequential model.
  """
  return tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, (5, 5), activation=None,
                                  # Keras channels_last inputs are (height, width, channels)
                                  input_shape=(img_height_crop, img_width_crop, no_channels),
                                  bias_initializer=tf.initializers.TruncatedNormal(mean=sd_init, stddev=sd_init)),
            tf.keras.layers.ELU(),
            tf.keras.layers.MaxPooling2D(pool_size=(2,2)),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Conv2D(64, (5, 5), activation=None,
                                  bias_initializer=tf.initializers.TruncatedNormal(mean=sd_init, stddev=sd_init)),
            tf.keras.layers.ELU(),
            tf.keras.layers.MaxPooling2D(pool_size=(2,2)),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(no_classes, activation='softmax')])

# Dropout2-model
def dropout2_generator():
  """Build the CNN variant with a single Dropout(0.5) before the output layer.

  Layers: Conv2D(32, 5x5) -> ELU -> MaxPool(2x2) -> Conv2D(64, 5x5) -> ELU
  -> MaxPool(2x2) -> Flatten -> Dropout(0.5) -> Dense(no_classes, softmax).

  Returns:
    A new, uncompiled tf.keras.Sequential model.
  """
  return tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, (5, 5), activation=None,
                                  # Keras channels_last inputs are (height, width, channels)
                                  input_shape=(img_height_crop, img_width_crop, no_channels),
                                  bias_initializer=tf.initializers.TruncatedNormal(mean=sd_init, stddev=sd_init)),
            tf.keras.layers.ELU(),
            tf.keras.layers.MaxPooling2D(pool_size=(2,2)),
            tf.keras.layers.Conv2D(64, (5, 5), activation=None,
                                  bias_initializer=tf.initializers.TruncatedNormal(mean=sd_init, stddev=sd_init)),
            tf.keras.layers.ELU(),
            tf.keras.layers.MaxPooling2D(pool_size=(2,2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(no_classes, activation='softmax')])

# Dropout3-model
def dropout3_generator():
  """Build the CNN variant with Dropout(0.5) between each ELU and max-pooling layer.

  Layers: Conv2D(32, 5x5) -> ELU -> Dropout(0.5) -> MaxPool(2x2)
  -> Conv2D(64, 5x5) -> ELU -> Dropout(0.5) -> MaxPool(2x2) -> Flatten
  -> Dense(no_classes, softmax).

  Returns:
    A new, uncompiled tf.keras.Sequential model.
  """
  return tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, (5, 5), activation=None,
                                  # Keras channels_last inputs are (height, width, channels)
                                  input_shape=(img_height_crop, img_width_crop, no_channels),
                                  bias_initializer=tf.initializers.TruncatedNormal(mean=sd_init, stddev=sd_init)),
            tf.keras.layers.ELU(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.MaxPooling2D(pool_size=(2,2)),
            tf.keras.layers.Conv2D(64, (5, 5), activation=None,
                                  bias_initializer=tf.initializers.TruncatedNormal(mean=sd_init, stddev=sd_init)),
            tf.keras.layers.ELU(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.MaxPooling2D(pool_size=(2,2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(no_classes, activation='softmax')])

Create the models for each architecture

In [0]:
# Create 6 models for the original architecture
original_models = [original_generator() for _ in range(6)]
(original1, original2, original3,
 original4, original5, original6) = original_models
original1.summary()

# Create 6 models for the dropout1 architecture.
# The list holds six distinct models (dropout1_1 ... dropout1_6); previously
# dropout1_2 was listed twice and dropout1_1 was never trained or evaluated.
dropout1_models = [dropout1_generator() for _ in range(6)]
(dropout1_1, dropout1_2, dropout1_3,
 dropout1_4, dropout1_5, dropout1_6) = dropout1_models
dropout1_1.summary()

# Create 6 models for the dropout2 architecture
dropout2_models = [dropout2_generator() for _ in range(6)]
(dropout2_1, dropout2_2, dropout2_3,
 dropout2_4, dropout2_5, dropout2_6) = dropout2_models
dropout2_1.summary()

# Create 6 models for the dropout3 architecture
dropout3_models = [dropout3_generator() for _ in range(6)]
(dropout3_1, dropout3_2, dropout3_3,
 dropout3_4, dropout3_5, dropout3_6) = dropout3_models
dropout3_1.summary()

# Store the lists of models in a single list
model_architectures = [original_models, dropout1_models,
                       dropout2_models, dropout3_models]

Define optimizer and compile the models:

In [0]:
# Template optimizer holding the hyperparameters used for every model
opt = tf.keras.optimizers.Adam(learning_rate=0.001, epsilon=0.1)

for architecture in model_architectures:
  for model in architecture:
    # Each model gets its own optimizer instance, cloned from the template,
    # so that the iteration counter and optimizer state are not shared
    # between independently trained models.
    model_opt = type(opt).from_config(opt.get_config())
    model.compile(loss='sparse_categorical_crossentropy', optimizer=model_opt,
                  metrics=['accuracy'])

Do the learning:

In [0]:
# Train the models
history_list = []

# Only the first model of each architecture is trained with validation data;
# its History object is kept for the training-progress plots.
for architecture in model_architectures:
  for position, model in enumerate(architecture):
    if position == 0:
      history = model.fit(train_ds, epochs=800, steps_per_epoch=steps_per_epoch,
                          validation_freq=1, validation_data=test_ds)
      history_list.append(history)
      continue
    model.fit(train_ds, epochs=800, steps_per_epoch=steps_per_epoch)

Model evaluation

In [0]:
# Summarize history for accuracy
def accuracy_plot(i_arch, model_name):
  """
  Plot training and validation accuracy per epoch, save the figure as
  "<model_name>_val_acc.png" and show it.

  i_arch : integer index into history_list (0 = original, 1 = dropout1, etc)
  model_name : string ("Original", "Dropout1", etc)
  """
  curves = history_list[i_arch].history
  plt.plot(curves['accuracy'], zorder=2)
  plt.plot(curves['val_accuracy'])
  plt.title(model_name + '-model accuracy')
  # Fixed axis so the plots of different architectures are comparable
  plt.yticks(np.arange(0.1, 1.1, 0.1))
  plt.ylim(0, 1.05)
  plt.grid(zorder=0, color="lightgray")
  plt.ylabel('Accuracy')
  plt.xlabel('Epoch')
  plt.legend(['train', 'test'], loc='lower right')
  plt.savefig(model_name + "_val_acc.png", dpi=300)
  plt.show()

# Summarize history for loss
def loss_plot(i_arch, model_name):
  """
  Plot training and validation loss per epoch, save the figure as
  "<model_name>_val_loss.png" and show it.

  i_arch : integer index into history_list (0 = original, 1 = dropout1, etc)
  model_name : string ("Original", "Dropout1", etc)
  """
  curves = history_list[i_arch].history
  plt.plot(curves['loss'], zorder=2)
  plt.plot(curves['val_loss'])
  plt.title(model_name + '-model loss')
  # Fixed axis so the plots of different architectures are comparable
  plt.yticks(np.arange(0, 4, 0.5))
  plt.ylim(-0.15, 3.8)
  plt.grid(zorder=0, color="lightgray")
  plt.ylabel('Loss')
  plt.xlabel('Epoch')
  plt.legend(['train', 'test'], loc='upper right')
  plt.savefig(model_name + "_val_loss.png", dpi=300)
  plt.show()

In [0]:
# Save and show the training-progress plots of the first model of each architecture
architectures_names = ["Original", "Dropout1", "Dropout2", "Dropout3"]
for arch_index, arch_name in enumerate(architectures_names):
  # Accuracy first, then loss, for every architecture
  for make_plot in (accuracy_plot, loss_plot):
    make_plot(arch_index, arch_name)

In [0]:
# Evaluation pipelines (center crop, no augmentation) over the training and test images

eval_train_ds = prepare_for_evaluation(labeled_ds_train)
eval_test_ds = prepare_for_evaluation(labeled_ds_test)

def evaluate(model):
  """Evaluate `model` on the test set, then on the training set.

  Returns:
    (test_result, train_result), each as returned by `model.evaluate`.
  """
  return model.evaluate(eval_test_ds), model.evaluate(eval_train_ds)

# Function to generate a dataframe with the evaluation of the model
def get_evaluation_df(models_list):
  """Evaluate every model in `models_list` and stack the results row-wise.

  Args:
    models_list: sequence of compiled models of one architecture; any length
      is accepted (it is no longer assumed to contain exactly 6 models).

  Returns:
    NumPy array of shape (len(models_list), 4) with one row per model and
    the columns test loss, test accuracy, train loss, train accuracy.
  """
  print("Evaluating model architecture..")
  # evaluate() returns (test_result, train_result); flatten each pair to 4 values
  rows = [np.array(evaluate(model)).reshape(4) for model in models_list]
  # reshape keeps the (n, 4) layout even for an empty list
  return np.array(rows).reshape(-1, 4)

In [0]:
# Evaluate all models of each architecture; every array has one row per model
# with the columns test loss, test accuracy, train loss, train accuracy
original_df = get_evaluation_df(original_models)
dropout1_df = get_evaluation_df(dropout1_models)
dropout2_df = get_evaluation_df(dropout2_models)
dropout3_df = get_evaluation_df(dropout3_models)

In [0]:
# Print the evaluation
def print_eval(model_name, evaluation_df):
  """Print the column-wise mean and median and all per-model rows of `evaluation_df`.

  model_name : string used in the heading ("Original", "Dropout1", etc)
  evaluation_df : 2D array with one row per trained model
  """
  column_header = "  test loss,  test acc, train loss, train acc\n"
  column_mean = np.mean(evaluation_df, axis=0)
  column_median = np.median(evaluation_df, axis=0)

  print(">> " + model_name + "-model <<\n")
  print("Mean:\n" + column_header, column_mean, "\n")
  print("Median:\n" + column_header, column_median)
  print("\nAll trials:")
  print("  Test loss, Test acc,  Train loss, Train acc")
  print(evaluation_df, "\n")

# Print the summary of every architecture, in the order used throughout the notebook
for arch_label, arch_df in (("Original", original_df),
                            ("Dropout1", dropout1_df),
                            ("Dropout2", dropout2_df),
                            ("Dropout3", dropout3_df)):
  print_eval(arch_label, arch_df)

In [0]:
# Plot accuracy and loss performance on test set

# Boxplot test accuracy
def boxplot_acc(is_train_complete = True):
  """Draw, save and show a boxplot of the test accuracy of each architecture.

  Reads column 1 (test accuracy) of original_df, dropout1_df, dropout2_df and
  dropout3_df, and saves the figure as "test_acc_boxplot.png".

  Args:
    is_train_complete: when truthy, use the fixed y-range and tick spacing
      chosen for fully trained models; otherwise let matplotlib pick the
      range and only draw a horizontal grid.
  """
  fig = plt.figure()
  ax = fig.add_subplot(111)
  medianprops = dict(color="red")
  labels = ["Original", "Dropout1", "Dropout2", "Dropout3"]
  box = ax.boxplot([original_df[:,1], dropout1_df[:,1],
                    dropout2_df[:,1], dropout3_df[:,1]],
                  labels = labels,
                  patch_artist = True,
                  sym = 'x',
                  medianprops = medianprops)
  # One fill colour per architecture
  colors = ['#1f77b4', 'lightsteelblue', "mediumseagreen", "plum"]
  for patch, color in zip(box['boxes'], colors):
      patch.set_facecolor(color)
  ax.set_title("Original model vs dropouts models test accuracy")
  if is_train_complete:
    ax.set_ylim(0.935, 0.985)
    major_ticks = np.arange(0.94, 0.98, 0.01)
    minor_ticks = np.arange(0.94, 0.98, 0.005)
    ax.set_yticks(major_ticks)
    ax.set_yticks(minor_ticks, minor=True)
    plt.grid(which = "major", axis = "y", alpha = 0.5)
    plt.grid(which = "minor", axis = "y", alpha = 0.5)
  else:
    plt.grid(axis = "y", alpha = 0.5)
  ax.set_ylabel("Test accuracy", fontsize = 12)
  plt.savefig("test_acc_boxplot.png", dpi = 300)
  plt.show()

# Boxplot test loss
def boxplot_loss(is_train_complete = True):
  """Draw, save and show a boxplot of the test loss of each architecture.

  Reads column 0 (test loss) of original_df, dropout1_df, dropout2_df and
  dropout3_df, and saves the figure as "test_loss_boxplot.png".

  Args:
    is_train_complete: when truthy, use the fixed y-range and tick spacing
      chosen for fully trained models; otherwise let matplotlib pick the
      range and only draw a horizontal grid.
  """
  fig = plt.figure()
  ax = fig.add_subplot(111)
  medianprops = dict(color="red")
  labels = architectures_names
  box = ax.boxplot([original_df[:,0], dropout1_df[:,0],
                    dropout2_df[:,0], dropout3_df[:,0]],
                  labels = labels,
                  patch_artist = True,
                  sym = 'x',
                  medianprops = medianprops)
  # One fill colour per architecture
  colors = ['#1f77b4', 'lightsteelblue', "mediumseagreen", "plum"]
  for patch, color in zip(box['boxes'], colors):
      patch.set_facecolor(color)
  ax.set_title("Original model vs dropouts models test loss")
  if is_train_complete:
    ax.set_ylim(0.5, 0.75)
    # NOTE(review): the ticks start below the lower y-limit set above;
    # matplotlib may widen the view to include them - confirm this is intended.
    major_ticks = np.arange(0.1, 0.75, 0.1)
    minor_ticks = np.arange(0.05, 0.75, 0.05)
    ax.set_yticks(major_ticks)
    ax.set_yticks(minor_ticks, minor=True)
    plt.grid(which = "major", axis = "y", alpha = 0.5)
    plt.grid(which = "minor", axis = "y", alpha = 0.5)
  else:
    plt.grid(axis = "y", alpha = 0.5)
  ax.set_ylabel("Test loss", fontsize = 12)

  # Explicit extension, consistent with the accuracy boxplot's file name
  plt.savefig("test_loss_boxplot.png", dpi = 300)
  plt.show()

In [0]:
# Training ran for all 800 epochs: use the fixed axis ranges tuned for the final results
boxplot_acc(is_train_complete=True)
boxplot_loss(is_train_complete=True)

# # Training was cut short (e.g. a quick test of the code): let matplotlib choose the axis ranges
# boxplot_acc(is_train_complete=False)
# boxplot_loss(is_train_complete=False)