## Neural Network for MNIST Classification:
---
Author : Frissian Viales\
Date : 09-13-2020

### Setup:

In [None]:
# Import Libraries:
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense, Dropout, Conv2D, BatchNormalization, MaxPooling2D, Flatten
from tensorflow.keras import Sequential
from keras.utils import to_categorical
from keras.optimizers import SGD
from keras.models import load_model
from sklearn.datasets import fetch_openml
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split

In [None]:
def Fetch_Data():

  """ Download the MNIST dataset from OpenML through scikit-learn.

  The labels are cast to ``np.int8``. The first 60000 rows and the
  remaining rows are each reordered, independently, so that their samples
  are sorted by label (samples sharing a label keep their original
  relative order).

  Returns:
    A two-element list ``[X, y]`` with the feature array and the label array.
  """

  # Fetch data. as_frame=False forces NumPy arrays: recent scikit-learn
  # releases hand back a pandas DataFrame by default, on which the
  # positional fancy indexing used below is not valid.
  mnist = fetch_openml('mnist_784', version=1, cache=True, as_frame=False)

  # Preprocess data:
  data = mnist.data
  target = mnist.target.astype(np.int8)
  split = 60000

  # A stable argsort yields the same order as sorting (label, index) pairs,
  # without building tens of thousands of Python tuples.
  reorder_head = np.argsort(target[:split], kind='stable')
  reorder_tail = np.argsort(target[split:], kind='stable') + split
  order = np.concatenate([reorder_head, reorder_tail])

  # Assign features and target variables (fancy indexing returns new arrays,
  # so the fetched arrays themselves are never written to):
  X, y = data[order], target[order]
  print('Fetched MNIST dataset successfully.')
  return [X,y]

def Stratified_Split(X, y):

  """ Split the data into train, test and validation subsets, stratified by label.

  70% of the samples go to the training set. The remaining samples are
  split again: 33% of them become the test set and the rest the
  validation set. Both splits use the same fixed random seed.

  Returns:
    The list ``[X_train, X_test, X_val, y_train, y_test, y_val]``.
  """

  seed = 69

  # First cut: training set versus everything that is held out.
  X_train, X_holdout, y_train, y_holdout = train_test_split(
      X, y, train_size=0.7, random_state=seed, stratify=y)

  # Second cut: divide the held-out samples into test and validation sets.
  X_test, X_val, y_test, y_val = train_test_split(
      X_holdout, y_holdout, train_size=0.33, random_state=seed, stratify=y_holdout)

  print('Splited data into Train-Validation-Test sets.')
  return [X_train, X_test, X_val, y_train, y_test, y_val]

# Import from the public scipy.ndimage namespace; the old
# scipy.ndimage.interpolation module path is deprecated.
from scipy.ndimage import shift

def Shift_Image(image, dx, dy):

  """ Shift a flattened 28x28 image and return it flattened again.

  Args:
    image: array with 784 values, reshaped to 28x28 before shifting.
    dx: offset along the second axis (columns) of the 28x28 grid.
    dy: offset along the first axis (rows) of the 28x28 grid.

  Returns:
    A 1-D array with the shifted image; positions that receive no source
    pixel are filled with 0.
  """

  # Reshape to 2-D so the offsets apply per axis:
  grid = image.reshape((28, 28))
  shifted_image = shift(grid, [dy, dx], cval=0, mode="constant")
  return shifted_image.reshape([-1])

def Data_Augmentation(X_train, y_train):

  """ Enlarge the training set with shifted copies of every image.

  Each image is kept as is and additionally shifted by one pixel in four
  directions via ``Shift_Image``; the shifted copies inherit the label of
  the source image. The combined set is shuffled before it is returned.

  Returns:
    The list ``[X_train_augmented, y_train_augmented]`` as NumPy arrays.
  """

  # Start from the untouched samples:
  images = list(X_train)
  labels = list(y_train)

  # Produce every (shifted image, label) pair, one direction after another:
  offsets = ((1, 0), (-1, 0), (0, 1), (0, -1))
  shifted_pairs = [
      (Shift_Image(picture, dx, dy), tag)
      for dx, dy in offsets
      for picture, tag in zip(X_train, y_train)
  ]
  images += [pair[0] for pair in shifted_pairs]
  labels += [pair[1] for pair in shifted_pairs]

  # Convert back to array:
  X_train_augmented = np.array(images)
  y_train_augmented = np.array(labels)

  # Reshuffle data with one shared permutation so images and labels stay aligned:
  order = np.random.permutation(len(X_train_augmented))
  X_train_augmented, y_train_augmented = X_train_augmented[order], y_train_augmented[order]
  print('Completed data augmentation successfully.')

  return [X_train_augmented, y_train_augmented]


def Data_Preprocessing(X, y, model_type='DNN'):

  """ Prepare the dataset for passing it to a neural network.

  Steps, in order: stratified train/validation/test split, min-max
  scaling, augmentation of the training set, optional reshape to
  (samples, 28, 28, 1) for CNN models, and one-hot encoding of the labels.

  Args:
    X: feature matrix with one flattened image per row.
    y: integer class labels.
    model_type: 'CNN' reshapes the images to 28x28x1; any other value
      leaves them flat.

  Returns:
    The list ``[X_train_augmented, y_train_augmented, X_val, y_val, X_test, y_test]``.
  """

  # Perform Train-Validation-Test split first, so the scaler below never
  # sees validation or test samples:
  X_train, X_test, X_val, y_train, y_test, y_val = Stratified_Split(X, y)

  # Perform data normalization: fit on the training set only, then apply
  # the same transformation to the held-out sets.
  Scaler = MinMaxScaler()
  X_train = Scaler.fit_transform(X_train)
  X_val = Scaler.transform(X_val)
  X_test = Scaler.transform(X_test)

  # Perform data augmentation:
  X_train_augmented, y_train_augmented = Data_Augmentation(X_train, y_train)

  # Reshape data to 3D for CNN models:
  if model_type == 'CNN':

    X_train_augmented = X_train_augmented.reshape((X_train_augmented.shape[0], 28, 28, 1))
    X_test = X_test.reshape((X_test.shape[0], 28, 28, 1))
    X_val = X_val.reshape((X_val.shape[0], 28, 28, 1))

  # Perform One-Hot encoding to the outputs. The width is pinned to 10 so it
  # always matches the 10-unit softmax layer of CNN_Model / DNN_Model, even
  # if a subset happened to miss the highest label.
  num_classes = 10
  y_train_augmented = to_categorical(y_train_augmented, num_classes=num_classes)
  y_test = to_categorical(y_test, num_classes=num_classes)
  y_val = to_categorical(y_val, num_classes=num_classes)

  print('Data preprocessing completed.\n')
  return [X_train_augmented, y_train_augmented, X_val, y_val, X_test, y_test]

In [None]:
def CNN_Model():

  """ Build and compile the convolutional classifier.

  Architecture: Conv2D(32) -> MaxPooling2D -> Conv2D(64) -> Conv2D(64) ->
  MaxPooling2D -> Flatten -> Dense(100) -> Dense(10, softmax), taking
  inputs of shape (28, 28, 1).

  Returns:
    The model compiled with SGD (learning rate 0.01, momentum 0.9),
    categorical cross-entropy loss and the accuracy metric.
  """

  # Initialize sequential model:
  model = Sequential()

  # Add hidden layers:
  model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', input_shape=(28, 28, 1)))
  model.add(MaxPooling2D((2, 2)))
  model.add(Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_uniform'))
  model.add(Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_uniform'))
  model.add(MaxPooling2D((2, 2)))
  model.add(Flatten())
  model.add(Dense(100, activation='relu', kernel_initializer='he_uniform'))
  model.add(Dense(10, activation='softmax'))

  # Compile the model. `learning_rate` replaces the deprecated `lr` keyword,
  # which current Keras releases no longer accept.
  opt = SGD(learning_rate=0.01, momentum=0.9)
  model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
  return model

def DNN_Model():

  """ Build and compile the fully connected classifier.

  Architecture: Dense(784) on a 784-value input, followed by five
  Dense(100) layers each paired with Dropout(0.2), and a final
  Dense(10, softmax) layer.

  Returns:
    The model compiled with the adam optimizer, categorical cross-entropy
    loss and the accuracy metric.
  """

  hidden_blocks = 5
  hidden_units = 100
  drop_rate = 0.2

  # Input layer:
  network = Sequential()
  network.add(Dense(784, activation='relu', input_shape=(784,)))

  # Hidden blocks, each one a dense layer followed by dropout:
  for _ in range(hidden_blocks):
    network.add(Dense(hidden_units, activation='relu'))
    network.add(Dropout(drop_rate))

  # Output layer:
  network.add(Dense(10, activation='softmax'))

  # Compile the model:
  network.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
  return network

In [None]:
# evaluate a model using k-fold cross-validation
def Evaluate_Model(model, X_train, y_train, X_val, y_val, n_folds=5, epochs=10):

  """ Perform k-fold cross validation.

  For every fold the model is trained on the fold's training rows (with the
  fold's held-out rows passed as ``validation_data``) and then scored on
  ``X_val`` / ``y_val``.

  Args:
    model: compiled Keras model; its weights at call time are the starting
      point of every fold.
    X_train, y_train: samples and one-hot labels that are split into folds.
    X_val, y_val: fixed set each fold's model is evaluated on.
    n_folds: number of folds.
    epochs: training epochs per fold.

  Returns:
    ``(scores, histories)``: the per-fold accuracy values and the per-fold
    objects returned by ``model.fit``.
  """

  # Initialize lists to store results:
  print('Initializing model training.\n')
  scores, histories = [], []
  # Prepare cross validation:
  kfold = KFold(n_folds, shuffle=True, random_state=69)

  # Remember the starting weights. Without restoring them, every fold would
  # continue training the previous fold's model, which has already seen the
  # rows that the current fold holds out.
  # NOTE(review): set_weights does not reset optimizer state (momentum etc.);
  # confirm whether that matters for this comparison.
  initial_weights = model.get_weights()

  # Enumerate splits:
  for fold, (train_ix, test_ix) in enumerate(kfold.split(X_train), start=1):

    # Start this fold from the same weights as every other fold:
    model.set_weights(initial_weights)
    # Select rows for train and test:
    trainX, trainY = X_train[train_ix], y_train[train_ix]
    testX, testY = X_train[test_ix], y_train[test_ix]
    # Train the model:
    history = model.fit(trainX, trainY, epochs=epochs, validation_data=(testX, testY), verbose=1)
    # Evaluate the model:
    _, acc = model.evaluate(X_val, y_val, verbose=0)
    print('> %.3f' % (acc * 100.0))
    # Save results:
    scores.append(acc)
    histories.append(history)
    print(f'Completed training round {fold} out of {n_folds}.')

  return scores, histories

In [None]:
# plot diagnostic learning curves
def Plot_Learning_Curves(histories):

  """ Plot the learning curves of every training round.

  The upper panel shows the loss and the lower panel the accuracy; in both,
  the training curve is drawn in cyan and the validation curve in orange,
  one pair per entry of ``histories``.
  """

  # Set matplotlib style and plot size:
  plt.style.use('dark_background')
  plt.figure(figsize=(21,13))

  # (subplot position, title, training key, validation key) per panel:
  panels = (
      (1, 'Cross Entropy Loss', 'loss', 'val_loss'),
      (2, 'Classification Accuracy', 'accuracy', 'val_accuracy'),
  )

  # Plot data:
  for fold, record in enumerate(histories, start=1):

    metrics = record.history

    for position, title, train_key, val_key in panels:

      plt.subplot(2, 1, position)
      plt.title(title, fontsize=16)
      # One tick per epoch, starting at 1:
      n_epochs = len(metrics['loss'])
      ticks = np.arange(1, n_epochs+1, step=1.0)
      plt.plot(ticks, metrics[train_key], color='cyan', label=f'Training_{fold}')
      plt.plot(ticks, metrics[val_key], color='#ff9900', label=f'Testing_{fold}')
      plt.xticks(ticks)

      # Only the lower panel carries the x-axis label:
      if position == 2:
        plt.xlabel('epochs', fontsize=12)

  plt.legend(['Training','Test'], fontsize=12)

# summarize model performance
def Scores_BoxPlot(scores):

  """ Print summary statistics of the evaluation scores and draw them as a box plot.

  Mean, median and standard deviation are reported as percentages rounded
  to two decimals.
  """

  def as_percent(value):
    # Convert a fraction to a percentage with two decimals.
    return round(value*100, 2)

  # Configure style and colors:
  plt.style.use('dark_background')
  outline = 'crimson'
  filling = 'lightsalmon'

  # Summary stats:
  mean = as_percent(np.mean(scores))
  median = as_percent(np.median(scores))
  std = as_percent(np.std(scores))
  print(f'\nAccuracy Summary: Mean={mean}% Median={median}% Std={std}%\n')

  # Plot boxplot:
  plt.figure(figsize=(13,8))
  plt.title('Evaluation Scores Distirbution', fontsize=16)
  parts = plt.boxplot(scores, patch_artist=True)

  # Format colors: outline every component, then fill the boxes.
  for key in ('boxes', 'whiskers', 'fliers', 'means', 'medians', 'caps'):
    plt.setp(parts[key], color=outline, linewidth=2)

  for rectangle in parts['boxes']:
    rectangle.set(facecolor=filling)

  plt.show()

In [None]:
# run the test harness for evaluating a model
def Run_Test_Harness(model_type='CNN', epochs=10, n_folds=5, save_model=False):

  """ Run the full pipeline: fetch data, preprocess, then train or cross-validate.

  Args:
    model_type: 'CNN' or 'DNN'; any other value prints a message and returns
      without downloading or processing data.
    epochs: training epochs (per fold when cross-validating).
    n_folds: number of cross-validation folds; unused when ``save_model`` is set.
    save_model: when true, fit once on the training data and save the model
      to ``MNIST_Keras_<model_type>.h5``; otherwise run k-fold evaluation and
      plot the learning curves and the score distribution.

  Returns:
    None.
  """

  # Reject unknown model types before the expensive download and preprocessing:
  if model_type not in ('CNN', 'DNN'):

    print('Model type does not exist.')
    print('\nEnd of script.')
    return None

  # Load dataset:
  X, y = Fetch_Data()

  # Process data (the test split is returned but not used by this harness):
  X_train, y_train, X_val, y_val, X_test, y_test = Data_Preprocessing(X, y, model_type=model_type)

  # Initialize model:
  model = CNN_Model() if model_type == 'CNN' else DNN_Model()

  # Just fit the model and save it:
  if save_model:

    model.fit(X_train, y_train, epochs=epochs, verbose=1)
    model.save(f'MNIST_Keras_{model_type}.h5')
    print(f'Saved MNIST_Keras_{model_type} successfully.')

  # Just perform cross evaluation:
  else:

    # Evaluation:
    scores, histories = Evaluate_Model(
        model,
        X_train,
        y_train,
        X_val,
        y_val,
        epochs=epochs,
        n_folds=n_folds
        )

    # Plot learning curves:
    Plot_Learning_Curves(histories)

    # Report estimated performance:
    Scores_BoxPlot(scores)

  print('\nEnd of script.')
  return None

### Run Test Harness:

In [None]:
# Cross-validate the CNN model: 5 folds, 10 epochs per fold (save_model keeps its default of False).
Run_Test_Harness(model_type='CNN', epochs=10, n_folds=5)

In [None]:
# Cross-validate the DNN model: 5 folds, 10 epochs per fold (save_model keeps its default of False).
Run_Test_Harness(model_type='DNN', epochs=10, n_folds=5)