## Imports

In [None]:
import cv2
import os
import glob
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt


from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D, Dense, BatchNormalization, GlobalAveragePooling2D, ReLU, RandomFlip, Dropout
from tensorflow.keras.optimizers import Adam

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

from dotenv import load_dotenv

## Data Loading

In [None]:
class LoadData:
  """
  Collects the image paths of a data set stored as one directory per class.

  Images are looked up under '<base_path>/imgs/data/c<N>/*.jpg', where N is
  the class index (c0, c1, ...).
  """

  def __init__(self, base_path, num_classes=10):
    """
    Purpose:
      Stores the data set location and the number of classes.
    Args:
      self - class instance.
      base_path - directory that contains the 'imgs/data' folder.
      num_classes - number of class directories to read (c0 .. c<num_classes - 1>).
                    Also the length of each one hot encoding. Defaults to 10.
    """
    self.base_path = base_path
    self.num_classes = num_classes

  def load_data(self):
    """
    Purpose:
      Loads the paths of all of the images, and creates a one hot encoding for
      their classifications. x and y lists share an index relation.
    Args:
      self - class instance.
    Returns:
      x - list with one entry per class; each entry is a shuffled list of
          image paths belonging to that class.
      y - list with one entry per class; each entry is a list of one hot
          encodings (ndarray of length num_classes), one per image path.
    """
    x = []
    y = []
    # Loop through each class directory.
    for class_number in range(self.num_classes):
      # Glob pattern matching every jpg in the current class directory.
      pattern = os.path.join(self.base_path, 'imgs/data', f'c{class_number}', '*.jpg')
      sub_x = glob.glob(pattern)
      # Shuffle the paths in place. The labels need no matching shuffle
      # because every image of this class has the same encoding.
      self.shuffle_data(sub_x)
      # One hot encoding shared (by value) by every image of this class.
      one_hot = np.zeros(self.num_classes)
      one_hot[class_number] = 1
      # Each image gets its own copy so later in-place edits cannot leak
      # between samples.
      sub_y = [one_hot.copy() for _ in sub_x]
      x.append(sub_x)
      y.append(sub_y)
    print("Saved all image paths.")
    return x, y

  def shuffle_data(self, x):
    """
    Purpose:
      Shuffles the values of the given array in place.
    Args:
      self - class instance.
      x - the array to shuffle.
    Returns:
      None
    """
    np.random.shuffle(x)

In [None]:
# Target size (in pixels) every image is resized to during preprocessing.
HEIGHT = 256
WIDTH = 256

# Load variables from a local .env file (if any) into the environment, then
# read the data set location from PATH_TO_DATA.
load_dotenv()
PATH = os.getenv('PATH_TO_DATA')
if PATH is None:
  # Fail early with an actionable message; otherwise the missing variable
  # only surfaces later as a TypeError inside os.path.join.
  raise RuntimeError(
    "PATH_TO_DATA is not set; define it in the environment or in a .env file."
  )
# x: per-class lists of image paths, y: per-class lists of one hot encodings.
x, y = LoadData(PATH).load_data()

## Data Preprocessing


In [None]:
class PreProcessing:
  """
  Image preprocessing (thresholding, sharpening, resizing, random erasing)
  and per-class train/test splitting.
  """

  def __init__(self):
    """
    Purpose:
      Creates the 3x3 Laplacian kernel used by preprocess_image.
    Args:
      self - class instance.
    """
    # The coefficients sum to zero, so the filter responds to edges only.
    # Adding its output back onto the image is what sharpens it.
    self.kernel = np.array([[-1, -1, -1],
                            [-1, 8, -1],
                            [-1, -1, -1]])

  def get_colour_type(self, img_path):
    """
    Purpose:
      Gets the colour type of the given image.
    Args:
      self - class instance.
      img_path - the path of the image to check the colour type of.
    Returns:
      The number of channels the image has. 3 for colour and 1 for grayscale.
    Raises:
      ValueError - if the image cannot be read.
    """
    # IMREAD_UNCHANGED keeps the channel layout stored in the file. The
    # default flag always converts to a 3 channel image, which would make
    # every image look like a colour image.
    image = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
    if image is None:
      raise ValueError(f"Could not read image: {img_path}")
    return 3 if image.ndim == 3 else 1

  def preprocess_image(self, img_path, height, width, training):
    """
    Purpose:
      Takes image path, reads it and applies image processing to it.
      Otsu-truncates the grayscale image, runs the Laplacian kernel over a
      copy of the base image, adds the two results to get the final image,
      and resizes it to the specified height and width. If training is true,
      the image will have random erasing applied to it.
    Args:
      self - class instance.
      img_path - the path to the image.
      height - the height of the final image.
      width - the width of the final image.
      training - the training flag.
    Returns:
      Preprocessed 3 channel (BGR) image of type ndarray.
    Raises:
      ValueError - if the image cannot be read.
    """
    color_type = self.get_colour_type(img_path)
    if color_type == 1:
      # Image is stored in grayscale: read it as a single channel.
      img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
      img_gray = img
      # Edge response of the Laplacian kernel.
      image_sharp = cv2.filter2D(src=img, ddepth=-1, kernel=self.kernel)
    else:
      # Image is stored in colour: read it as BGR.
      img = cv2.imread(img_path)
      img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
      # Filter the colour image, then reduce the result to one channel.
      image_sharp = cv2.filter2D(src=img, ddepth=-1, kernel=self.kernel)
      image_sharp = cv2.cvtColor(image_sharp, cv2.COLOR_BGR2GRAY)

    # Otsu picks the threshold; THRESH_TRUNC clips values above it.
    # cv2.threshold returns (threshold value, image); only the image is used.
    _, truncated = cv2.threshold(img_gray, 0, 255, cv2.THRESH_TRUNC + cv2.THRESH_OTSU)
    # Combine thresholded image and edge response (saturating add).
    combined = cv2.add(image_sharp, truncated)
    # cv2.resize takes the target size as (width, height).
    resized = cv2.resize(combined, (width, height))
    # Convert to BGR so the result has 3 channels.
    img = cv2.cvtColor(resized, cv2.COLOR_GRAY2BGR)
    # Random erasing is only applied to images of the training set.
    if training:
      img = self.random_erasing(img)
    return img

  def random_erasing(self, image, probability=0.5, sl=0.02, sh=0.4, r1=0.3, mean=(0.4914, 0.4822, 0.4465)):
    """
    Function that performs Random Erasing in Random Erasing Data Augmentation by Zhong et al.
    The image is modified in place and also returned.
    -------------------------------------------------------------------------------------
    image: ndarray of shape (H, W) or (H, W, C).
    probability: The probability that the operation will be performed.
    sl: min erasing area (fraction of the image area)
    sh: max erasing area (fraction of the image area)
    r1: min aspect ratio (the max is 1/r1)
    mean: erasing value per channel. NOTE: these defaults are below 1, so on
          integer-typed images they are truncated to 0 when assigned.
    ------
    Source: https://github.com/zhunzhong07/Random-Erasing/blob/master/transforms.py
    """
    if np.random.uniform(0, 1) > probability:
      return image
    img_h, img_w = image.shape[0], image.shape[1]
    area = img_h * img_w
    # Try up to 100 random rectangles and use the first one that fits.
    for _ in range(100):
      target_area = np.random.uniform(sl, sh) * area
      aspect_ratio = np.random.uniform(r1, 1 / r1)

      h = int(round(np.sqrt(target_area * aspect_ratio)))
      w = int(round(np.sqrt(target_area / aspect_ratio)))

      if w < img_w and h < img_h:
        # x1 indexes rows and y1 indexes columns.
        x1 = np.random.randint(0, img_h - h)
        y1 = np.random.randint(0, img_w - w)
        if image.ndim == 2:
          # Single channel image without a channel axis.
          image[x1:x1 + h, y1:y1 + w] = mean[0]
        elif image.shape[2] == 3:
          for channel in range(3):
            image[x1:x1 + h, y1:y1 + w, channel] = mean[channel]
        else:
          image[x1:x1 + h, y1:y1 + w, 0] = mean[0]
        return image
    return image

  def split_data(self, x, y, height, width, test_fraction=0.2):
    """
    Purpose:
      Split the data into test and train, and process the images.
      The first test_fraction of each class goes to the test set and the
      rest goes to the training set.
    Args:
      self - class instance.
      x - the paths of all of the images, grouped per class.
      y - the classifications, related to x by index.
      height - the height of the image used in resizing.
      width - the width of the image used in resizing.
      test_fraction - fraction of each class used for testing. Defaults to 0.2.
    Returns:
      ndarrays x_train, y_train, x_test, y_test.
    """
    x_train = []
    y_train = []
    x_test = []
    y_test = []
    # Get the split points of each class.
    split_points = self.percent_indexes(x, test_fraction)
    # Loop through all classes.
    for class_num, (paths, labels) in enumerate(zip(x, y)):
      print(f"Preprocessing class: {class_num}.")
      split_point = split_points[class_num]
      # Loop through each image in the class.
      for image_number, (image_path, out) in enumerate(zip(paths, labels)):
        # Images at or past the split point are training samples. The
        # training flag turns on random erasing, which test images never get.
        is_training = image_number >= split_point
        image = self.preprocess_image(image_path, height, width, is_training)
        if is_training:
          x_train.append(image)
          y_train.append(out)
        else:
          x_test.append(image)
          y_test.append(out)
    return np.array(x_train), np.array(y_train), np.array(x_test), np.array(y_test)

  def percent_indexes(self, x, test_fraction=0.2):
    """
    Purpose:
      Used to get the points of where the x_test and x_train will be split.
      Split point is test_fraction of the number of images in each class,
      rounded down.
    Args:
      self - class instance.
      x - the paths of all of the images, grouped per class.
      test_fraction - fraction of each class used for testing. Defaults to 0.2.
    Returns:
      Returns a list with one split index per class.
    """
    return [int(len(xi) * test_fraction) for xi in x]

In [None]:
# Preprocess every image and split each class into train and test sets.
# This reads and processes every image path in x, resizing to HEIGHT x WIDTH.
p = PreProcessing()
x_train, y_train, x_test, y_test = p.split_data(x, y, HEIGHT, WIDTH)

## Model

In [None]:
class VGG16:
  """
  VGG-style convolutional classifier.

  Five convolutional blocks (Conv2D -> BatchNormalization -> ReLU -> MaxPool2D
  units) followed by global average pooling and two dense layers.
  """

  # (number of filters, number of convolution units) of each block.
  _BLOCKS = ((32, 2), (64, 2), (128, 3), (256, 3), (256, 3))

  def __init__(self, input_shape=(None, None, None, 3), num_classes=10):
    """
    Purpose:
      Creates and compiles the model, then prints its summary.
    Args:
      self - class instance.
      input_shape - the shape of the input into the model.
                    (BATCH SIZE, HEIGHT, WIDTH, NUMBER OF CHANNELs)
      num_classes - the number of output classes. Defaults to 10.
    """
    # Must be set before create_model, which reads it for the output layer.
    self.num_classes = num_classes
    self.model = self.create_model(input_shape)
    self.model.summary()

  @staticmethod
  def _conv_unit(filters, downsample):
    """
    Purpose:
      Builds one Conv2D -> BatchNormalization -> ReLU -> MaxPool2D unit.
    Args:
      filters - the number of convolution filters.
      downsample - True for the last unit of a block, which pools 2x2 with
                   stride 2. Other units use a 1x1 pool with stride 1, which
                   passes values through unchanged and is kept only to
                   preserve the model's layer layout.
    Returns:
      List of the four layers, in order.
    """
    if downsample:
      norm = BatchNormalization(axis=3)
      pool = MaxPool2D((2, 2), strides=(2, 2))
    else:
      norm = BatchNormalization()
      pool = MaxPool2D((1, 1), strides=(1, 1))
    return [Conv2D(filters, (3, 3), padding='same'), norm, ReLU(), pool]

  def create_model(self, input_shape):
    """
    Purpose:
      Creates the VGG16 model.
    Args:
      self - class instance.
      input_shape - the shape of the input into the model.
                    (BATCH SIZE, HEIGHT, WIDTH, NUMBER OF CHANNELs)
    Returns:
      The compiled tensorflow model.
    """
    # Data augmentation layer.
    data_aug = Sequential([RandomFlip("horizontal")])

    layers = [data_aug]
    # Convolutional blocks: only the last unit of each block downsamples.
    for filters, depth in self._BLOCKS:
      for unit_index in range(depth):
        layers.extend(self._conv_unit(filters, downsample=(unit_index == depth - 1)))

    # Fully Connected Layers.
    layers.extend([
      GlobalAveragePooling2D(),
      Dense(1024, activation='relu'),
      Dropout(0.5),
      Dense(self.num_classes, activation='softmax'),
    ])

    model = Sequential(layers)
    # Build the model with the provided input shape.
    model.build(input_shape=input_shape)
    # Compile the model with adam optimizer and categorical crossentropy.
    model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

  def fit_model(self, x_train, y_train, epochs, batch_size, verbose):
    """
    Purpose:
      Fits the model with the training data sets.
    Args:
      self - class instance.
      x_train - the input training data set.
      y_train - the output training data set.
      epochs - the number of epochs to train the model.
      batch_size - the batch size to use in training.
      verbose - the verbose flag to use to display the training progress.
    Returns:
      None
    """
    self.model.fit(x_train, y_train, epochs=epochs, verbose=verbose, batch_size=batch_size)

  def evaluate_model(self, x_test, y_test):
    """
    Purpose:
      Evaluates the model with the test data set and prints the loss and
      accuracy.
    Args:
      self - class instance.
      x_test - the input test data set.
      y_test - the output test data set.
    Returns:
      None
    """
    test_loss, test_acc = self.model.evaluate(x_test, y_test)
    print(f"Test loss: {test_loss} -- Test accuracy: {test_acc}")

  def predict_model(self, x_test, y_test, batch_size, verbose):
    """
    Purpose:
      Predict the output of the model with the input test set, then display
      the confusion matrix and classification report.
    Args:
      self - class instance.
      x_test - the input test data set.
      y_test - the output test data set.
      batch_size - the batch size to use in predicting.
      verbose - the verbose flag to use to display the predicting progress.
    Returns:
      None
    """
    y_pred = self.model.predict(x_test, batch_size=batch_size, verbose=verbose)
    # Display confusion matrix and accuracy of predicted values.
    self.display_prediction_results(y_pred, y_test)

  def display_prediction_results(self, y_pred, y_test):
    """
    Purpose:
      Displays the confusion matrix and the models accuracy, recall, precision and F1 score.
    Args:
      self - class instance.
      y_pred - the predicted outputs (one row of class scores per sample).
      y_test - the actual outputs (one hot encoded).
    Returns:
      None
    """
    # Get the index of the max value in each sub array.
    y_pred = np.argmax(y_pred, axis=1)
    y_test = np.argmax(y_test, axis=1)
    # Names of possible classes.
    class_names = [f'c{class_number}' for class_number in range(self.num_classes)]
    # Pass the full label set explicitly so the matrix and report always
    # cover every class, even one absent from both y_test and y_pred.
    # Otherwise their size would not match class_names.
    labels = list(range(self.num_classes))
    # Generate confusion matrix.
    conf_mat = confusion_matrix(y_test, y_pred, labels=labels)
    display = ConfusionMatrixDisplay(confusion_matrix=conf_mat, display_labels=class_names)
    # Increases the size of the displayed confusion matrix.
    _, ax = plt.subplots(figsize=(10, 10))
    # Plot the confusion matrix.
    display.plot(ax=ax, values_format='')
    # Output the models accuracy, recall, precision and F1 score.
    print(classification_report(y_test, y_pred, labels=labels, target_names=class_names))



In [None]:
# Training settings passed to fit_model / predict_model below.
EPOCHS = 15
VERBOSE = 1
BATCH_SIZE = 16


# Build and compile the network for HEIGHT x WIDTH 3 channel inputs; the
# leading None leaves the batch dimension unspecified.
model = VGG16((None, HEIGHT, WIDTH, 3))

In [None]:
# Train on the training split, then report loss/accuracy and the confusion
# matrix and classification report on the test split.
model.fit_model(x_train, y_train, EPOCHS, BATCH_SIZE, VERBOSE)
model.evaluate_model(x_test, y_test)
model.predict_model(x_test, y_test, BATCH_SIZE, VERBOSE)