# Deep 2D-CNN

# Preliminaries

Check for GPU

In [None]:
import tensorflow as tf

In [None]:
# Report whether TensorFlow exposes a GPU under the name '/device:GPU:0'
if tf.test.gpu_device_name() == '/device:GPU:0':
  print(f"GPU ready: {tf.test.gpu_device_name()}")
else:
  print("No GPU found")

Import preprocessing helper functions

In [None]:
import sys
sys.path.append("../../helper-modules")
from preprocessing_utils import read_in_data, preprocess

Read in the data to df_train, df_val and df_test

In [None]:
# Load the three dataframes through the helper imported from preprocessing_utils.
# Presumably the train / validation / test splits, judging by the names — the helper
# itself is not visible here.
df_train, df_val, df_test = read_in_data()

Create X_train, y_train, X_val, y_val, X_test, y_test

In [None]:
# Turn the dataframes into (features, labels) pairs for the "CNN" preprocessing mode.
# NOTE(review): the dataframes are passed in the order (train, test, val) while the
# result is unpacked in the order (train, val, test) — confirm this matches the
# signature and return order of preprocess() in preprocessing_utils.
(X_train, y_train), (X_val, y_val), (X_test, y_test) = preprocess("CNN", df_train, df_test, df_val)

# Model Building

Imports

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, MaxPool2D, AvgPool2D, Flatten, Dropout, Activation
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from itertools import product
import time
import matplotlib.pyplot as plt

Enable seed setting for improved reproducibility

In [None]:
import os
import random
import numpy as np

In [None]:
def set_seed(seed=42):
    """
    Seeds all sources of randomness used in this notebook with the same value.

    seed : an int that is applied to the `PYTHONHASHSEED` environment variable,
           Python's built-in `random` module, NumPy and TensorFlow
    """
    # Environment variables have to be strings, hence the conversion
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Python's built-in pseudo-random generator
    random.seed(seed)

    # NumPy's global pseudo-random generator
    np.random.seed(seed)

    # TensorFlow's global pseudo-random generator
    tf.random.set_seed(seed)

Extract number of classes (10 classes)

In [None]:
# Number of distinct values in the "label" column of the test dataframe.
# NOTE(review): the count is taken from df_test only — if a class were missing from
# the test split this would undercount; confirm df_train yields the same number.
NUM_CLASSES = df_test["label"].nunique() 

Model design

In [None]:
def compile_model(num_filters_tuple, filter_size = 3, padding_type="same", act_function="relu", pooling_type="max", dense_layer_size=16, dropout_rate=0.1, l_rate=0.001, reg='l2', opt_algo='adam', input_shape=(40, 37, 1), num_classes=None):
    """
    Builds and compiles a deep 2D-CNN model with the given hyperparameters.

    Architecture:

    Conv layer -> Conv layer -> Pooling layer -> Conv layer -> Conv layer -> Pooling layer
    -> Fully connected layer -> Dropout -> Softmax output layer

    Hyperparameter options:

    num_filters_tuple : A 4-tuple with the number of filters for each conv layer
    filter_size : an int with the size of the filters (3 -> 3 x 3 filter)
    padding_type : either "same" or "valid"
    act_function : options include "relu", "sigmoid" and "tanh"
    pooling_type : either "max" or "avg"
    dense_layer_size : an int with the number of neurons in the fully connected layer
    dropout_rate : a float with the rate of dropout applied after the fully connected layer
    l_rate : a float with the learning rate
    reg : kernel regularizer of the softmax output layer, e.g. "l2"
    opt_algo : the optimization algorithm, "adam" or "sgd"
    input_shape : shape of a single input sample, (40, 37, 1) by default
    num_classes : number of output classes; None falls back to the global NUM_CLASSES

    Returns the compiled Sequential model.

    Raises a ValueError if pooling_type or opt_algo is not one of the supported options.
    """
    # Validate the string options before any layer is created so that a typo fails
    # immediately and with a clear message; an unknown opt_algo would otherwise leave
    # the optimizer undefined.
    if pooling_type not in ("avg", "max"):
        raise ValueError(f"Invalid pooling option entered: {pooling_type!r} (expected 'avg' or 'max')")
    if opt_algo not in ("adam", "sgd"):
        raise ValueError(f"Invalid optimization algorithm entered: {opt_algo!r} (expected 'adam' or 'sgd')")

    if num_classes is None:
        num_classes = NUM_CLASSES

    kernel_size = (filter_size, filter_size)
    pooling_layer = AvgPool2D if pooling_type == "avg" else MaxPool2D

    model = Sequential()

    # Two identical stages: CONV -> CONV -> POOL
    for stage in range(2):
        for conv_idx in (2 * stage, 2 * stage + 1):
            # Only the very first layer of the network declares the input shape
            extra_kwargs = {"input_shape": input_shape} if conv_idx == 0 else {}
            model.add(Conv2D(filters=num_filters_tuple[conv_idx], kernel_size=kernel_size, padding=padding_type, **extra_kwargs))
            model.add(Activation(act_function))

        # strides defaults to pool_size
        model.add(pooling_layer(pool_size=(2, 2)))

    # FLATTEN OUTPUT
    model.add(Flatten())

    # FULLY CONNECTED LAYER
    model.add(Dense(dense_layer_size, activation=act_function))

    # DROPOUT to prevent overfitting
    model.add(Dropout(dropout_rate))

    # LAST LAYER IS THE CLASSIFIER, THUS ONE NEURON PER CLASS
    model.add(Dense(num_classes, activation='softmax', kernel_regularizer=reg))

    # Optimization algorithm (opt_algo was validated above, so this choice is exhaustive)
    optimizer_class = tf.keras.optimizers.Adam if opt_algo == 'adam' else tf.keras.optimizers.SGD
    opt = optimizer_class(learning_rate=l_rate)

    model.compile(loss='categorical_crossentropy',
                  optimizer=opt,
                  metrics=['accuracy'])

    return model

# Experiment: Grid Search

Defining grid search hyperparameter options

In [None]:
# Every conv layer of a model gets the same number of filters: 4, 8, ..., 128
num_filters_tuples = [(n_filters,) * 4 for n_filters in (4, 8, 16, 32, 64, 128)]
l_rates = [0.01, 0.005, 0.001, 0.0005, 0.0001]
dropout_rates = [0.05, 0.1, 0.2, 0.5]

# The options below hold a single value each, so they do not enlarge the grid
pooling_types = ["max"]
filter_sizes = [3]
padding_types = ["same"]
act_functions = ["relu"]
regs = ['l2']
opt_algos = ['adam']

# One list of options per hyperparameter; the grid is the cartesian product of these lists
all_hparams = [
    num_filters_tuples,
    l_rates,
    dropout_rates,
    pooling_types,
    filter_sizes,
    padding_types,
    act_functions,
    regs,
    opt_algos,
]

Conduct the grid search

In [None]:
def grid_search(epochs=50, batch_size=32, stopping_patience=5, starting_point=1):
  """
  Conduct a grid search over every combination of the options in `all_hparams`, writing the
  validation results and model details for each model to a CSV log file.

  For every model, the checkpoint with the best validation accuracy is saved as an h5 file.

  epochs : maximum number of training epochs per model
  batch_size : batch size passed to `model.fit`
  stopping_patience : number of epochs without val accuracy improvement before training stops
  starting_point : 1-based number of the first grid entry to train. With 1 the log file is
      started from scratch; with a larger value (resuming an interrupted search) existing
      log rows are kept and new rows are appended.

  Returns a tuple (best_model, best_history) for the model with the highest validation
  accuracy among the models trained in this call, or (None, None) if none was trained.

  Raises a ValueError if starting_point is smaller than 1.
  """
  # A value below 1 would turn the slice below into a negative index and silently
  # train only the tail of the grid with wrong model numbers.
  if starting_point < 1:
    raise ValueError(f"starting_point must be >= 1, got {starting_point}")

  log_path = '../training-results/ExperimentLogs_Deep-CNN.csv'
  log_header = "ModelNumber;Timestamp;NumEpochs;ValAccuracy;NumParams;NumFiltersPerLayer;DenseLayerSize;LearningRate;DropoutRate;PoolingType;FilterSize;PaddingType;ActivationFunction;Regularization;OptimizationAlgorithm\n"

  best_model = None
  best_accuracy = float("-inf")  # any trained model beats this, even one with 0 accuracy
  best_history = None

  # Fresh run: truncate the log and write the header ('w').
  # Resumed run: keep the existing rows; 'x' creates the file (with header) only if it
  # does not exist yet and raises FileExistsError otherwise.
  try:
    with open(log_path, 'w' if starting_point == 1 else 'x') as log_file:
      log_file.write(log_header)
  except FileExistsError:
    pass

  for model_num, hparam_set in enumerate(list(product(*all_hparams))[(starting_point-1):], start=starting_point):
    # Extract hyperparams for current model from grid search grid
    num_filters_tuple, l_rate, dropout_rate, pooling_type, filter_size, padding_type, act_function, reg, opt_algo = hparam_set
    dense_layer_size = num_filters_tuple[0]  # set number of dense layer neurons to number of filters of the first conv layer

    # Define hyperparameters for the current model
    hparams = {
        "num_filters_tuple" : num_filters_tuple,
        "filter_size" : filter_size,
        "padding_type" : padding_type,
        "act_function" : act_function,
        "pooling_type" : pooling_type,
        "dense_layer_size" : dense_layer_size,
        "dropout_rate" : dropout_rate,
        "l_rate": l_rate,
        "reg" : reg,
        "opt_algo" : opt_algo
    }

    # Set seed for reproducibility. This has to happen before the model is built,
    # otherwise the initial weights are drawn from an unseeded generator.
    set_seed(42)

    # Compile model and count number of parameters
    model = compile_model(**hparams)
    num_params = model.count_params()

    model_str = f"{num_filters_tuple}; {dense_layer_size}; {l_rate}; {dropout_rate}; {pooling_type}; {filter_size}; {padding_type}; {act_function}; {reg}; {opt_algo}"
    print(f"Training model {model_num} with {num_params} params - {model_str}")

    # Prevent overfitting with early stopping. restore_best_weights puts the weights of
    # the best epoch back into the model, so the returned model matches the logged accuracy.
    # NOTE(review): some Keras versions restore only when training actually stops early — confirm.
    early_stop = EarlyStopping(monitor='val_accuracy', mode='max', patience=stopping_patience, restore_best_weights=True)
    # For saving best val accuracy model as an h5 (for test predictions later)
    model_check = ModelCheckpoint(f"../trained-models/Deep-CNN_{model_num}.h5", monitor='val_accuracy', mode='max', verbose=0, save_best_only=True)

    # Train model
    # Note: one reason val accuracy might be higher than train accuracy during training is because dropout affects training but not validation
    history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(X_val, y_val), callbacks=[early_stop, model_check])
    n_epochs = len(history.history['loss'])

    # Time stamp when model finished training
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    # Best validation accuracy reached in any epoch of this training run
    val_accuracy = max(history.history['val_accuracy'])
    print(f"Best val accuracy: {val_accuracy}")

    # Store results in log file (re-opened per model so finished rows survive an interruption)
    with open(log_path, 'a') as log_file:
      log_file.write(f"{model_num};{timestamp};{n_epochs};{val_accuracy};{num_params};{model_str}\n")

    if val_accuracy > best_accuracy:
      best_accuracy = val_accuracy
      best_model = model
      best_history = history

  # Return the model with the highest validation accuracy
  return best_model, best_history


In [None]:
# Run the grid search; it returns the model with the highest validation accuracy
# together with the training history of that model.
best_model, best_history = grid_search()

# Evaluation of the best model

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np

Training and validation performance over epochs

In [None]:
# Loss of the best model per epoch: training curve first, validation curve second
plt.plot(best_history.history['loss'], label='train')
plt.plot(best_history.history['val_loss'], label='val')
plt.title('Convergence of the loss function')
plt.xlabel('epochs')
plt.ylabel('loss')
plt.legend()

In [None]:
# Accuracy of the best model per epoch: training curve first, validation curve second
plt.plot(best_history.history['accuracy'], label='train')
plt.plot(best_history.history['val_accuracy'], label='val')
plt.title('Convergence of the accuracy')
plt.xlabel('epochs')
plt.ylabel('accuracy')
plt.legend()

Confusion matrix

In [None]:
# Plot the confusion matrix of the best model on the validation data.
# NOTE(review): plot_conf_mtx is neither defined nor imported in the visible cells
# (only read_in_data and preprocess are imported from preprocessing_utils) — confirm
# where it comes from, otherwise this cell raises a NameError.
plot_conf_mtx(best_model, X_val, y_val, df_val)