## Load and Preprocess

In [1]:
import tqdm
import random
import pathlib
import itertools
import collections

import os
import einops
import cv2 as cv
import numpy as np
import pandas as pd
import remotezip as rz
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
import keras
from   keras import layers

In [2]:
train_full = pd.read_csv("./MNIST Dataset/sign_mnist_train.csv")
test       = pd.read_csv("./MNIST Dataset/sign_mnist_test.csv")

In [3]:
label_map = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z"]
np.save("./labels.npy", label_map)

In [4]:
X_train_full = train_full.values[:,1:].astype(np.float32())
y_train_full = train_full.values[:,0]
X_test = test.values[:,1:].astype(np.float32())
y_test = test.values[:,0]

In [5]:
# Splitting validation set from the training set using stratified splitting
from sklearn.model_selection import StratifiedShuffleSplit

split = StratifiedShuffleSplit(n_splits = 1, test_size = 2500)
for train_index, test_index in split.split(X_train_full, y_train_full):
    X_train, X_val = X_train_full[train_index], X_train_full[test_index]
    y_train, y_val = y_train_full[train_index], y_train_full[test_index]

In [7]:
# Writes the csv files as .avi files
letter_storage  = np.zeros(len(label_map), dtype = np.int16)
train_sequences = [0] * X_train.shape[0]
test_sequences  = [0] * X_test.shape[0]
val_sequences   = [0] * X_val.shape[0]
temp_sequence   = [0] * 30
size = (28, 28)

try:
    os.mkdir("./MNIST Images/")
    os.mkdir("./MNIST Images/train/")
    os.mkdir("./MNIST Images/test/")
    os.mkdir("./MNIST Images/val/")
except Exception as e:
    print(e)
    pass

# Creates the training sequences
for i in range(0, X_train.shape[0]):
    path = "./MNIST Images/train/" + label_map[y_train[i]] + "/"
    name = str(letter_storage[y_train[i]]) + ".avi"
    try:
        os.mkdir(path)
    except Exception as e:
        # print(e)
        pass
    result = cv.VideoWriter(path + name, cv.VideoWriter_fourcc(*"MJPG"), 30, size)
    letter_storage[y_train[i]] += 1
    for j in range(0, 30):
        temp_sequence[j] = X_train[i]
    result.write(np.array(temp_sequence))
    train_sequences[i] = temp_sequence

# Creates the test sequences
for i in range(0, X_test.shape[0]):
    path = "./MNIST Images/test/" + label_map[y_test[i]] + "/"
    name = str(letter_storage[y_test[i]]) + ".avi"
    try:
        os.mkdir(path)
    except Exception as e:
        # print(e)
        pass
    result = cv.VideoWriter(path + name, cv.VideoWriter_fourcc(*"MJPG"), 30, size)
    for j in range(0, 30):
        temp_sequence[j] = X_test[i]
    result.write(np.array(temp_sequence))
    test_sequences[i] = temp_sequence

# Creates the val sequences
for i in range(0, X_val.shape[0]):
    path = "./MNIST Images/val/" + label_map[y_val[i]] + "/"
    name = str(letter_storage[y_val[i]]) + ".avi"
    try:
        os.mkdir(path)
    except Exception as e:
        # print(e)
        pass
    result = cv.VideoWriter(path + name, cv.VideoWriter_fourcc(*"MJPG"), 30, size)
    for j in range(0, 30):
        temp_sequence[j] = X_val[i]
    result.write(np.array(temp_sequence))
    val_sequences[i] = temp_sequence

train_sequences = np.array(train_sequences)
test_sequences  = np.array(test_sequences)
val_sequences   = np.array(val_sequences)

In [20]:
# Generates frames
class FrameGenerator:
    def __init__(self, n_frames, training = False):
        """
        Returns a set of frames with their associated label.
        @param n_frames: Number of frames.
        @param training: Boolean to determine if training dataset is being created.
        """
        self.n_frames = n_frames
        self.training = training
        self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
        self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

    def __call__(self):
        video_paths, classes = self.get_files_and_class_names()

        pairs = list(zip(video_paths, classes))

        if self.training:
            random.shuffle(pairs)

        for path, name in pairs:
            video_frames = frames_from_video_file(path, self.n_frames) 
            label = self.class_ids_for_name[name] # Encode labels
            yield video_frames, label

In [None]:
n_frames = 30
batch_size = 16

output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
                    tf.TensorSpec(shape = (), dtype = tf.int16))

train_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths["train"], n_frames, training=True),
                                          output_signature = output_signature)

# Batch the data
train_ds = train_ds.batch(batch_size)

val_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths["val"], n_frames),
                                        output_signature = output_signature)
val_ds = val_ds.batch(batch_size)

test_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths["test"], n_frames),
                                         output_signature = output_signature)

test_ds = test_ds.batch(batch_size)

## Create the Model

In [5]:
class Conv2Plus1D(keras.layers.Layer):
    def __init__(self, filters, kernel_size, padding):
        """
        A sequence of convolutional layers that first apply the convolution operation over the spatial dimensions, and then the temporal dimension. 
        """
        super().__init__()
        self.seq = keras.Sequential([  
            # Spatial decomposition
            layers.Conv3D(filters = filters,
                            kernel_size = (1, kernel_size[1], kernel_size[2]),
                            padding = padding),

            # Temporal decomposition
            layers.Conv3D(filters = filters, 
                            kernel_size = (kernel_size[0], 1, 1),
                            padding = padding)
            ])

    def call(self, x):
        return self.seq(x)

In [6]:
class ResidualMain(keras.layers.Layer):
    """
    Residual block of the model with convolution, layer normalization, and the activation function, ReLU.
    """
    def __init__(self, filters, kernel_size):
        super().__init__()
        self.seq = keras.Sequential([
            Conv2Plus1D(filters = filters,
                        kernel_size = kernel_size,
                        padding = "same"),
            layers.LayerNormalization(),
            layers.ReLU(),
            Conv2Plus1D(filters = filters, 
                        kernel_size = kernel_size,
                        padding = "same"),
            layers.LayerNormalization()
        ])

    def call(self, x):
        return self.seq(x)

In [7]:
class Project(keras.layers.Layer):
    """
    Project certain dimensions of the tensor as the data is passed through different sized filters and downsampled. 
    """
    def __init__(self, units):
        super().__init__()
        self.seq = keras.Sequential([
            layers.Dense(units),
            layers.LayerNormalization()
        ])

    def call(self, x):
        return self.seq(x)

In [8]:
def add_residual_block(input, filters, kernel_size):
    """
    Add residual blocks to the model. If the last dimensions of the input data and filter size does not match, project it such that last dimension matches.
    """
    out = ResidualMain(filters, kernel_size)(input)
    res = input

    # Using the Keras functional APIs, project the last dimension of the tensor to match the new filter size
    if out.shape[-1] != input.shape[-1]:
        res = Project(out.shape[-1])(res)

    return layers.add([res, out])

In [9]:
class ResizeVideo(keras.layers.Layer):
    def __init__(self, height, width):
        super().__init__()
        self.height = height
        self.width = width
        self.resizing_layer = layers.Resizing(self.height, self.width)

    def call(self, video):
        """
        Use the einops library to resize the tensor.
        @param video: Tensor representation of the video, in the form of a set of frames.
        @return A downsampled size of the video according to the new height and width it should be resized to.
        """
        # b stands for batch size, t stands for time, h stands for height, w stands for width, and c stands for the number of channels.
        old_shape = einops.parse_shape(video, "b t h w c")
        images = einops.rearrange(video, "b t h w c -> (b t) h w c")
        images = self.resizing_layer(images)
        videos = einops.rearrange(
        images, "(b t) h w c -> b t h w c",
        t = old_shape["t"])
        return videos

In [10]:
# Define the dimensions of one frame in the set of frames created
HEIGHT = 28
WIDTH  = 28

In [11]:
input_shape = (None, 10, HEIGHT, WIDTH, 3)
input = layers.Input(shape = (input_shape[1:]))
x = input

x = Conv2Plus1D(filters = 16, kernel_size = (3, 7, 7), padding = "same")(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = ResizeVideo(HEIGHT // 2, WIDTH // 2)(x)

# Block 1
x = add_residual_block(x, 16, (3, 3, 3))
x = ResizeVideo(HEIGHT // 4, WIDTH // 4)(x)

# Block 2
x = add_residual_block(x, 32, (3, 3, 3))
x = ResizeVideo(HEIGHT // 8, WIDTH // 8)(x)

# Block 3
x = add_residual_block(x, 64, (3, 3, 3))
x = ResizeVideo(HEIGHT // 16, WIDTH // 16)(x)

# Block 4
x = add_residual_block(x, 128, (3, 3, 3))

x = layers.GlobalAveragePooling3D()(x)
x = layers.Flatten()(x)
x = layers.Dense(10)(x)

model = keras.Model(input, x)

model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 10, 28, 28,  0           []                               
                                 3)]                                                              
                                                                                                  
 conv2_plus1d (Conv2Plus1D)     (None, 10, 28, 28,   3152        ['input_1[0][0]']                
                                16)                                                               
                                                                                                  
 batch_normalization (BatchNorm  (None, 10, 28, 28,   64         ['conv2_plus1d[0][0]']           
 alization)                     16)                                                           

## Compile and Fit the Model

In [12]:
# Defines the optimizer
customOptimizer = keras.optimizers.Adam(5e-4)

# Defines the loss function
customLoss = keras.losses.SparseCategoricalCrossentropy()

# Defines the callbacks to include
my_callbacks = [
    #tf.keras.callbacks.ModelCheckpoint(filepath = "./MNIST Models 2D/Accuracy/mnist_detection.acc.{epoch:02d}-{val_accuracy:.2f}.h5", monitor = "val_accuracy", mode = "max", save_best_only = True),
    #tf.keras.callbacks.ModelCheckpoint(filepath = "./MNIST Models 2D/Loss/mnist_detection.loss.{epoch:02d}-{val_loss:.2f}.h5", monitor = "val_loss", mode = "min", save_best_only = True),
    tf.keras.callbacks.EarlyStopping  (patience = 200, monitor = "accuracy"),
    #tf.keras.callbacks.TensorBoard    (log_dir  = "./Logs")
]

# Compile the model
model.compile(loss = customLoss, optimizer = customOptimizer, metrics = ["accuracy"])

In [None]:
# Fits the model
history = model.fit(x = train_ds, epochs = 10, validation_data = val_ds, callbacks = my_callbacks, use_multiprocessing = True)

## Evaluate Model

In [13]:
def plot_history(history):
    """
    Plotting training and validation learning curves.
    @param history: model history with all the metric measures
    """
    fig, (ax1, ax2) = plt.subplots(2)

    fig.set_size_inches(18.5, 10.5)

    # Plot loss
    ax1.set_title("Loss")
    ax1.plot(history.history["loss"], label = "train")
    ax1.plot(history.history["val_loss"], label = "test")
    ax1.set_ylabel("Loss")

    # Determine upper bound of y-axis
    max_loss = max(history.history["loss"] + history.history["val_loss"])

    ax1.set_ylim([0, np.ceil(max_loss)])
    ax1.set_xlabel("Epoch")
    ax1.legend(["Train", "Validation"]) 

    # Plot accuracy
    ax2.set_title("Accuracy")
    ax2.plot(history.history["accuracy"],  label = "train")
    ax2.plot(history.history["val_accuracy"], label = "test")
    ax2.set_ylabel("Accuracy")
    ax2.set_ylim([0, 1])
    ax2.set_xlabel("Epoch")
    ax2.legend(["Train", "Validation"])

    plt.show()

    plot_history(history)

In [14]:
def get_actual_predicted_labels(dataset): 
    """
    Creates a list of actual ground truth values and the predictions from the model.
    @param dataset: An iterable data structure, such as a TensorFlow Dataset, with features and labels.
    @return Ground truth and predicted values for a particular dataset.
    """
    actual = [labels for _, labels in dataset.unbatch()]
    predicted = model.predict(dataset)

    actual = tf.stack(actual, axis=0)
    predicted = np.concatenate(predicted, axis = 0)
    predicted = np.argmax(predicted, axis = 1)

    return actual, predicted

In [15]:
def plot_confusion_matrix(actual, predicted, labels, ds_type):
    """
    Plots the confusion matrix.
    """
    cm = tf.math.confusion_matrix(actual, predicted)
    ax = sns.heatmap(cm, annot = True, fmt = "g")
    sns.set(rc={"figure.figsize":(12, 12)})
    sns.set(font_scale=1.4)
    ax.set_title("Confusion matrix of action recognition for " + ds_type)
    ax.set_xlabel("Predicted Action")
    ax.set_ylabel("Actual Action")
    plt.xticks(rotation=90)
    plt.yticks(rotation=0)
    ax.xaxis.set_ticklabels(labels)
    ax.yaxis.set_ticklabels(labels)

In [16]:
def calculate_classification_metrics(y_actual, y_pred, labels):
    """
    Calculate the precision and recall of a classification model using the ground truth and predicted values.
    @param y_actual: Ground truth labels.
    @param y_pred: Predicted labels.
    @param labels: List of classification labels.
    @return Precision and recall measures.
    """
    cm = tf.math.confusion_matrix(y_actual, y_pred)
    tp = np.diag(cm) # Diagonal represents true positives
    precision = dict()
    recall = dict()
    for i in range(len(labels)):
        col = cm[:, i]
        fp = np.sum(col) - tp[i] # Sum of column minus true positive is false negative

        row = cm[i, :]
        fn = np.sum(row) - tp[i] # Sum of row minus true positive, is false negative

        precision[labels[i]] = tp[i] / (tp[i] + fp) # Precision 

        recall[labels[i]] = tp[i] / (tp[i] + fn) # Recall

    return precision, recall

In [None]:
# Evaluate the model
model.evaluate(test_ds, return_dict=True)

In [None]:
# Prints the results of the training
actual, predicted = get_actual_predicted_labels(train_ds)
plot_confusion_matrix(actual, predicted, labels, "training")

In [None]:
# Prints the results of the testing
actual, predicted = get_actual_predicted_labels(test_ds)
plot_confusion_matrix(actual, predicted, labels, "test")

In [None]:
# Calculates precission and recall
precision, recall = calculate_classification_metrics(actual, predicted, labels) # Test dataset

In [None]:
precision

In [None]:
recall