# 6.2 Training a basic Convolutional Neural Network for classification of grayscale images of handwritten digits
This example illustrates a basic CNN in Tensorflow/Keras, trained to classify handwritten digits from their grayscale images. It uses the well-known MNIST dataset. Besides the convolutional layers, it supports **Max Pooling** layers and **Normalization Layers**, which are helpful for the stability of the learning process. 

In [None]:
# Mount Google Drive, change into the project folder and print the resulting working directory.

import os
from google.colab import drive
from google.colab import files

# Folder on the mounted drive that becomes the working directory of the notebook.
PROJECT_FOLDER = "/content/gdrive/My Drive/Colab Notebooks/CS345_SP22/6. CNN"

# The mount point must be a prefix of PROJECT_FOLDER for the chdir below to succeed.
drive.mount('/content/gdrive/')
# Relative paths used later in the notebook (saved model folder, CSV export) resolve against this folder.
os.chdir(PROJECT_FOLDER)
print("Current dir: ", os.getcwd())

# Settings and Basic Package Imports

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from mllib.utils import RandomSeed

# __________ | Settings | __________
# When True, the first validation images are plotted after the dataset is loaded.
IS_PLOTING_DATA         = True
# When True, the model is switched to eager execution (run_eagerly) before training.
IS_DEBUGABLE            = False
# When True, the model is trained again even if a saved model folder already exists.
IS_RETRAINING           = True
# Project helper from mllib.utils; presumably seeds the random number generators
# for reproducible runs — TODO confirm which generators it covers.
RandomSeed(2022)

# Hyperparameters
For each training experiment, we define all the model/training hyperparameters inside a Python dictionary.

In [None]:
# Hyperparameters of the fully connected (MLP) baseline experiment.
CONFIG_BASELINE = {
    "ModelName": "MNIST1",
    "DNN.InputFeatures": 28*28,       # length of the flattened image vector
    "DNN.LayerNeurons": [512, 10],
    "DNN.Classes": 10,
    "Training.MaxEpoch": 20,
    "Training.BatchSize": 500,
    "Training.LearningRate": 0.3,
}

# Hyperparameters of the convolutional experiment. The three per-module lists
# hold one entry for each of the "CNN.ModuleCount" convolutional modules.
CONFIG_CNN = {
    "ModelName": "MNIST_CNN1",
    "CNN.InputShape": [28, 28, 1],
    "CNN.Classes": 10,
    "CNN.ModuleCount": 6,
    "CNN.ConvOutputFeatures": [9, 16, 24, 32, 48, 48],
    # Per module: [window size, stride, use padding]
    "CNN.ConvWindows": [
        [3, 2, True],
        [3, 1, True],
        [3, 1, True],
        [3, 2, True],
        [3, 1, True],
        [3, 1, True],
    ],
    # Per module: [pool size, pool stride], or None for a module without max pooling
    "CNN.PoolWindows": [None, None, None, None, [3, 2], None],
    "CNN.HasBatchNormalization": True,
    "Training.MaxEpoch": 12,
    "Training.BatchSize": 500,
    "Training.LearningRate": 0.001,
}
                     

We choose the hyperparameter set for the current model training experiment

In [None]:
CONFIG = CONFIG_CNN
# The CNN path is recognized by the keys the configuration provides instead of comparing
# whole dictionaries: a copied or tweaked CNN configuration (e.g. with another epoch count)
# must still select the CNN data pipeline, which reads the "CNN.*" keys.
IS_CNN = "CNN.InputShape" in CONFIG

# MNIST
The [MNIST dataset](http://yann.lecun.com/exdb/mnist/), which dates back to 1998, has become a standard toy dataset for understanding the image classification task. It contains 70000 grayscale images of 28x28 pixels of the handwritten digits 0,1,..9. 
It is already split into a training set of 60000 images, while the remaining 10000 are used to validate the model.

# Dataset loading and previewing
We are reusing an existing dataset in Tensorflow format. We load the data and extract them as numpy arrays to view some images, and later use the target class labels for evaluation.

In [None]:
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt

# Load the predefined MNIST splits as (image, label) pairs together with the dataset metadata.
# shuffle_files is disabled so that the read order is deterministic: the validation labels
# extracted below are compared later against predictions produced by a separate pass over
# oVSData, so both passes must see the samples in the same order. The training samples are
# still shuffled by the shuffle() stage of the training data feed pipeline.
(oTSData, oVSData), oDataSetInfo = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=False,
    as_supervised=True,
    with_info=True,
)

# Takes one minibatch out of the dataset. Here the size of the minibatch is the total count of samples
nValidationSampleCount = oDataSetInfo.splits['test'].num_examples
for tImages, tLabels in oVSData.batch(nValidationSampleCount).take(1):
    nImages            = tImages.numpy()
    nTargetClassLabels = tLabels.numpy()

print("VS image features tensor shape:" , nImages.shape)
print("VS image targets vector shape :", nTargetClassLabels.shape)

if IS_PLOTING_DATA:
    # Only the first samples are previewed, so only those are iterated.
    nPreviewCount = 21
    for nIndex, (nSample, nLabel) in enumerate(zip(nImages[:nPreviewCount], nTargetClassLabels[:nPreviewCount])):
        if nIndex == 0:
            print("Image sample shape            :", nSample.shape)
        nImage = nSample.astype(np.uint8)
        # The single channel is dropped to get a 2D array for the grayscale colormap.
        # Colormaps: https://matplotlib.org/stable/tutorials/colors/colormaps.html
        plt.imshow(nImage[:,:,0], cmap="gray")
        plt.title("Digit %d" % nLabel)
        plt.show()

# Tensorflow/Keras data feeding pipelines
We are going to create a pipeline that will feed our training process with data. A method passed to `map()` is called for each sample to normalize its values, reshape its features (for the MLP only) and create the one-hot encoding of its label. The pipeline uses
* `cache`: To cache the data in memory
* `shuffle`: To shuffle the samples at each epoch
* `batch`: To create mini-batches of samples
* `prefetch`: To prepare the next samples in the background while the current ones are used by the learning process.

In [None]:
# -----------------------------------------------------------------------------------
def NormalizeAndReshapeImage(p_tImage, p_tLabel):
    """
    Prepares one (image, label) sample for the MLP model.

    The image is cast to float32 and divided by 255, then reshaped into a vector of
    CONFIG["DNN.InputFeatures"] elements (28*28 = 784 in CONFIG_BASELINE). The label
    is converted into a one-hot vector of CONFIG["DNN.Classes"] elements.

    Returns the tuple (image vector, one-hot target).
    """
    # Scale the `uint8` color component values into `float32` values
    tScaledImage = tf.cast(p_tImage, tf.float32) / 255.
    # Flatten the 3D image tensor into a vector
    tImageVector = tf.reshape(tScaledImage, [CONFIG["DNN.InputFeatures"]])

    return tImageVector, tf.one_hot(p_tLabel, CONFIG["DNN.Classes"])
# -----------------------------------------------------------------------------------
def NormalizeImage(p_tImage, p_tLabel):
    """
    Prepares one (image, label) sample for the CNN model.

    The image keeps its shape; it is cast to float32 and divided by 255. The label
    is converted into a one-hot vector of CONFIG["CNN.Classes"] elements.

    Returns the tuple (normalized image, one-hot target).
    """
    tScaledImage = tf.cast(p_tImage, tf.float32) / 255.

    return tScaledImage, tf.one_hot(p_tLabel, CONFIG["CNN.Classes"])
# -----------------------------------------------------------------------------------


nBatchSize = CONFIG["Training.BatchSize"]

# A Convolutional Neural Network takes the 3D image tensor as input, while an
# MLP Neural Network takes the reshaped image vector. The same per-sample
# function is applied to the training and to the validation samples.
fnPrepareSample = NormalizeImage if IS_CNN else NormalizeAndReshapeImage

# ...... Training data feed pipeline ......
# map -> cache -> shuffle (buffer covers the whole training set) -> batch -> prefetch
oTSData = (
    oTSData.map(fnPrepareSample, num_parallel_calls=tf.data.AUTOTUNE)
           .cache()
           .shuffle(oDataSetInfo.splits['train'].num_examples)
           .batch(nBatchSize)
           .prefetch(tf.data.AUTOTUNE)
)
print("Training data feed object:", oTSData)

# ...... Validation data feed pipeline ......
# The whole validation set is served as a single batch.
oVSData = (
    oVSData.map(fnPrepareSample, num_parallel_calls=tf.data.AUTOTUNE)
           .batch(oDataSetInfo.splits['test'].num_examples)
)
print("Validation data feed object:", oVSData)

# Convolutional Neural Network Class
We declare the class for a **Convolutional Deep Neural Network (CNN)** that has a variable depth of convolutional layers and support for max pooling layers after each convolutional layer.

In [None]:
import numpy as np
from tensorflow import keras
from tensorflow.keras.layers import InputLayer, Flatten, Dense, BatchNormalization, Activation, Softmax
from tensorflow.keras.layers import Conv2D, MaxPooling2D  
from tensorflow.keras.regularizers import L2
# =========================================================================================================================
class CCNNCustom(keras.Model):
  """
  Convolutional classifier that is assembled from a configuration dictionary.

  The network is a stack of "CNN.ModuleCount" convolutional modules of the form
  CONVOLUTION - ACTIVATION - [BATCH NORMALIZATION] - [MAX POOLING], followed by a
  Flatten layer, a dense layer with one neuron per class and a softmax activation.

  Required configuration keys:
    "CNN.InputShape"          kept in self.InputShape (not used for building the layers)
    "CNN.Classes"             count of neurons in the dense output layer
    "CNN.ModuleCount"         count of convolutional modules
    "CNN.ConvOutputFeatures"  output features of each convolution, one entry per module
    "CNN.ConvWindows"         per module [window size, stride] or [window size, stride, is padded]
    "CNN.PoolWindows"         per module [pool size, pool stride], or None for no max pooling

  Optional configuration keys and the defaults that the constructor fills in:
    "CNN.HasBatchNormalization" (False), "CNN.ActivationFunction" ("relu"),
    "CNN.ConvHasBias" (False), "CNN.KernelInitializer" ("glorot_uniform"),
    "CNN.BiasInitializer" ("zeros"), "Training.RegularizeL2" (False),
    "Training.WeightDecay" (1e-5)
  """

  # Defaults for the optional configuration keys, in the order they are applied.
  _DEFAULT_SETTINGS = (
       ("CNN.HasBatchNormalization", False)
      ,("CNN.ActivationFunction"   , "relu")
      ,("CNN.ConvHasBias"          , False)
      ,("CNN.KernelInitializer"    , "glorot_uniform")
      ,("CNN.BiasInitializer"      , "zeros")
      ,("Training.RegularizeL2"    , False)
      ,("Training.WeightDecay"     , 1e-5)
  )

  # --------------------------------------------------------------------------------------
  # Constructor
  def __init__(self, p_oConfig):
    """
    Stores the configuration, fills in the missing optional keys and creates the layers.

    Parameters:
      p_oConfig: dictionary with the hyperparameters described in the class documentation.
    """
    super(CCNNCustom, self).__init__()

    # ..................... Object Attributes ...........................
    self.Config = p_oConfig

    self.InputShape         = self.Config["CNN.InputShape"]
    self.ClassCount         = self.Config["CNN.Classes"]
    self.ModuleCount        = self.Config["CNN.ModuleCount"]

    self.ConvLayerFeatures  = self.Config["CNN.ConvOutputFeatures"]
    self.ConvWindows        = self.Config["CNN.ConvWindows"]
    self.PoolWindows        = self.Config["CNN.PoolWindows"]

    # Layers of the convolutional modules, in feed forward order
    self.KerasLayers        = []

    self.OutputLayer        = None
    self.SoftmaxActivation  = None
    # Last input tensor that was given to call()
    self.Input              = None
    # Rows of [index, tensor name, tensor shape] that are recorded by the first call()
    self.Structure          = None
    # ...................................................................

    # Default values for extra customization. Each missing key is assigned through
    # item assignment on self.Config, one key at a time.
    for sKey, oDefaultValue in self._DEFAULT_SETTINGS:
        if sKey not in self.Config:
            self.Config[sKey] = oDefaultValue

    if self.Config["Training.RegularizeL2"]:
        print("Using L2 regularization of weights with weight decay %.6f" % self.Config["Training.WeightDecay"])

    self.Create()
  # --------------------------------------------------------------------------------------
  def _CreateWeightRegularizer(self):
    """Returns a new L2 kernel regularizer when L2 regularization is enabled, otherwise None."""
    if self.Config["Training.RegularizeL2"]:
        return L2(self.Config["Training.WeightDecay"])
    return None
  # --------------------------------------------------------------------------------------
  def _DescribeTensor(self, p_nIndex, p_tTensor):
    """
    Returns the row [index, tensor name, tensor shape] for the structure overview.

    Tensors that are produced under eager execution raise AttributeError when their
    name is read, so an empty name is recorded for them instead of failing the call.
    """
    sTensorName = getattr(p_tTensor, "name", "")
    return [p_nIndex, str(sTensorName), str(p_tTensor.shape)]
  # --------------------------------------------------------------------------------------
  def Create(self):
    """Instantiates all the Keras layers of the network. They are connected in call()."""
    # This loop creates stacked convolutional modules of the form   CONVOLUTION - ACTIVATION - NORMALIZATION - MAX POOLING
    for nModuleIndex in range(self.ModuleCount):
        nFeatures        = self.ConvLayerFeatures[nModuleIndex]
        oConvWindowSetup = self.ConvWindows[nModuleIndex]
        nWindowSize      = oConvWindowSetup[0]
        nStride          = oConvWindowSetup[1]

        # The optional third entry switches on "same" padding; without it there is no padding.
        bIsPadding   = (len(oConvWindowSetup) == 3) and oConvWindowSetup[2]
        sPaddingType = "same" if bIsPadding else "valid"

        oConvolution = Conv2D(nFeatures, kernel_size=nWindowSize, strides=nStride, padding=sPaddingType
                              , use_bias=self.Config["CNN.ConvHasBias"]
                              , kernel_regularizer=self._CreateWeightRegularizer()
                              , kernel_initializer=self.Config["CNN.KernelInitializer"]
                              , bias_initializer=self.Config["CNN.BiasInitializer"])
        self.KerasLayers.append(oConvolution)

        self.KerasLayers.append(Activation(self.Config["CNN.ActivationFunction"]))

        if self.Config["CNN.HasBatchNormalization"]:
            self.KerasLayers.append(BatchNormalization())

        # Set the pool window to None for a module that does not do Max Pooling.
        oPoolWindow = self.PoolWindows[nModuleIndex]
        if oPoolWindow is not None:
            nPoolSize   = oPoolWindow[0]
            nPoolStride = oPoolWindow[1]
            oMaxPooling = MaxPooling2D(pool_size=[nPoolSize, nPoolSize], strides=[nPoolStride, nPoolStride])
            self.KerasLayers.append(oMaxPooling)

    # After the stack of convolutional modules, the activation cube will be flattened to a vector using a Flatten keras layer
    self.FlatteningLayer = Flatten()

    # The output layer for the classifier is a fully connected (dense) that has one neuron for each class.
    # You might consider the stack of convolutional modules functioning as the "hidden" layer in the 2-layer NN architecture.
    self.OutputLayer = Dense(self.ClassCount, use_bias=True
                             ,kernel_regularizer=self._CreateWeightRegularizer())

    # Instead of using sigmoid for each neuron, we use the softmax activation function so that neurons "fire" together.
    self.SoftmaxActivation = Softmax()
  # --------------------------------------------------------------------------------------------------------
  def call(self, p_tInput):        # overrides a virtual in keras.Model class
    """
    Feeds the input forward through the convolutional modules and the classifier.

    During the first call the structure overview is recorded in self.Structure. Rows
    1..N hold the tensor that enters each of the N layers in self.KerasLayers (row 1 is
    the model input); they are followed by the outputs of the Flatten, dense and
    softmax layers. The output of the last layer in self.KerasLayers has no row of its own.

    Parameters:
      p_tInput: the input tensor of the model.
    Returns:
      The output tensor of the softmax activation.
    """
    bIsRecording = self.Structure is None
    if bIsRecording:
        self.Structure = []

    self.Input = p_tInput

    # ....... Convolutional Feature Extraction  .......
    # Feed forward to the next layer
    tA = p_tInput
    for nRowIndex, oKerasLayer in enumerate(self.KerasLayers, start=1):
        if bIsRecording:
            self.Structure.append(self._DescribeTensor(nRowIndex, tA))
        tA = oKerasLayer(tA)

    # ....... Flattening and Classifier  .......
    # Flattens the activation cube to a vector, then applies the fully connected (dense) layer
    # that has a count of neurons equal to the classes, with softmax activation function.
    # The row numbering continues from the count of layers, so it is also defined
    # when there are no convolutional layers at all.
    nRowIndex = len(self.KerasLayers)
    for oHeadLayer in (self.FlatteningLayer, self.OutputLayer, self.SoftmaxActivation):
        tA = oHeadLayer(tA)
        nRowIndex += 1
        if bIsRecording:
            self.Structure.append(self._DescribeTensor(nRowIndex, tA))

    return tA
  # --------------------------------------------------------------------------------------
# =========================================================================================================================

# Create the Neural Network model and training algorithm objects


In [None]:
from models.CNN import CCNNBasic

# NOTE(review): the model is built from CCNNBasic (imported from models.CNN), not from the
# CCNNCustom class that is declared in this notebook — confirm which implementation is intended.
oNN = CCNNBasic(CONFIG)

# -----------------------------------------------------------------------------------
def LRSchedule(epoch, lr):
    """
    Learning rate schedule that halves the learning rate once.

    Parameters:
      epoch: index of the epoch that is about to run.
      lr   : the current learning rate.
    Returns:
      Half of `lr` (after printing it) when `epoch` is 10, otherwise `lr` unchanged.
    """
    if epoch != 10:
        return lr

    nHalvedLR = lr * 0.5
    print("Setting LR to %.5f" % nHalvedLR)
    return nHalvedLR
# -----------------------------------------------------------------------------------    

# Learning rate that the optimizer starts with.
nInitialLearningRate    = CONFIG["Training.LearningRate"]    

# from_logits=False: the loss receives probabilities, not raw scores. The targets are
# the one-hot vectors produced by the data feed pipeline.
oCostFunction   = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
oOptimizer = tf.keras.optimizers.Adam(learning_rate=nInitialLearningRate)
# No callbacks are given to fit(); in particular LRSchedule is defined but not registered.
oCallbacks = None   

# Training Process

In [None]:
# The trained model is saved into (and loaded from) a folder named after the model.
sModelFolderName = CONFIG["ModelName"]

# Train when retraining is requested or when there is no saved model yet.
if IS_RETRAINING or (not os.path.isdir(sModelFolderName)):
    oNN.compile(loss=oCostFunction, optimizer=oOptimizer, metrics=["accuracy"])

    if IS_DEBUGABLE:
        # Eager execution, so that the Python code of the model can be debugged step by step.
        oNN.run_eagerly = True

    # oTSData is an already batched dataset, so the batch size is defined by the data feed
    # pipeline. Keras documents that batch_size must not be given for dataset inputs.
    oProcessLog = oNN.fit(  oTSData
                            ,epochs=CONFIG["Training.MaxEpoch"]
                            ,validation_data=oVSData
                            ,callbacks=oCallbacks
                          )
    oNN.summary()
    oNN.save(sModelFolderName)
else:
    # The model is trained and its state is saved (all the trainable parameters are saved). We load the model to recall the samples
    oNN = keras.models.load_model(sModelFolderName)
    # There is no training history for a model that was loaded instead of trained.
    oProcessLog = None
    oNN.summary()

# Model Architecture Overview

In [None]:
import csv

# The structure overview is a list of rows that the model records during its first forward pass.
# It can be missing (e.g. on a model object that was loaded from disk) or still None, in which
# case the export is skipped instead of failing.
oModelStructure = getattr(oNN, "Structure", None)
if oModelStructure is None:
    print("The model does not provide a structure overview; nothing is exported.")
else:
    # newline='' leaves the line endings to the csv module (no blank rows on Windows)
    with open("Model-Structure-%s.csv" % CONFIG["ModelName"], 'w', newline='') as f:
        oCSVWriter = csv.writer(f)
        for oItem in oModelStructure:
            print(oItem)
            oCSVWriter.writerow(oItem)

# Learning Process Overview

In [None]:
# The training history exists only when the model was trained in this session.
if oProcessLog is not None: # [PYTHON] Checks that object reference is not Null
    # list all data in history
    print("Keys of Keras training process log:", oProcessLog.history.keys())

    # Plot the accuracy during the training epochs
    plt.plot(oProcessLog.history['accuracy'])
    plt.plot(oProcessLog.history['val_accuracy'])
    plt.title('CNN Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

    # Build a readable name for the cost function out of its underscore separated name.
    # The words are joined by exactly one space, so the plot title has no doubled spaces.
    sCostFunctionNameParts = oCostFunction.name.split("_")                           # [PYTHON]: Splitting string into an array of strings
    sCostFunctionName = " ".join(x.capitalize() for x in sCostFunctionNameParts)     # [PYTHON]: Generator expression joined with a space between the words

    # Plot the error during the training epochs
    plt.plot(oProcessLog.history['loss'])
    plt.plot(oProcessLog.history['val_loss'])
    plt.title('CNN ' + sCostFunctionName + " Error")
    plt.ylabel('Error')
    plt.xlabel('Epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

# Inference
Recalling samples to predict their class (i.e. classify).


In [None]:
# One row of class probabilities per validation sample; the predicted class is the
# index of the largest probability in each row.
nPredictedProbabilities = oNN.predict(oVSData)
nPredictedClassLabels  = np.argmax(nPredictedProbabilities, axis=1)

# Only the first five predictions are printed, next to the actual class labels.
for nIndex, nProbs in enumerate(nPredictedProbabilities[:5]):
    print("#%.2d Predicted:%d (Probabilities:%s) Actual:%d" % (nIndex+1, nPredictedClassLabels[nIndex], nProbs, nTargetClassLabels[nIndex])) # [PYTHON] Format string example
    if nIndex == 0:
        # Printed once, to inspect the sum of the output activations of a sample
        print("Sum of all output neuron activations:%.3f" % np.sum(nProbs))


# Evaluation

In [None]:
from mllib.evaluation import CEvaluator
from mllib.visualization import CPlotConfusionMatrix

# We create an evaluator object that will produce several metrics.
# It is given the actual class labels first and the predicted class labels second;
# both are vectors of class indices, not one-hot encodings.
oEvaluator = CEvaluator(nTargetClassLabels, nPredictedClassLabels)

oEvaluator.PrintConfusionMatrix()
print("Per Class Recall (Accuracy)  :", oEvaluator.Recall)
print("Per Class Precision          :", oEvaluator.Precision)
# NOTE(review): the value printed as "Average Accuracy" is the evaluator's AverageRecall —
# confirm that this labeling is intended.
print("Average Accuracy: %.4f" % oEvaluator.AverageRecall)
print("Average F1 Score: %.4f" % oEvaluator.AverageF1Score)

# Plot of the confusion matrix that the evaluator has calculated
oConfusionMatrixPlot = CPlotConfusionMatrix(oEvaluator.ConfusionMatrix)
oConfusionMatrixPlot.Show()
