## Import Modules

In [None]:
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.linalg import svd

from abc import ABC
from copy import deepcopy
import math
import random
import sys
import time

## Configuration

Change these variables to change how the notebook is run.

- `run_best_only`
    - Set to `True` to run the best classifier only, and save the test set predictions to the Output folder. This setting should be used for benchmarking the time taken to run the notebook, 
    - Set to `False` to run all classifiers, and output statistics. This is the setting that was used to collect statistics for our report. This setting will not run any classifiers on the provided test set.

In [None]:
run_best_only = True

## Running the Notebook

The notebook should be placed in the following directory structure:

- Algorithm
    - notebook (this file)
    - Input
        - images_training.h5
        - labels_training.h5
        - images_testing.h5
    - Output
        - predicted_labels.h5
        
`images_training.h5` is the set of 30000 28x28 images used for training the models.

`labels_training.h5` is the corresponding set of 30000 labels used for training the models.

`images_testing.h5` is the test set of 10000 labels used for testing the models.

`predicted_labels.h5` is a file generated by the best classifier in this notebook, when the `run_best_only` flag is set to `True`. It contains the predicted outputs of the 10000 test images.

## Data loading
Load training and testing data from the four provided files.

In [None]:
def load_data():
    with h5py.File('./Input/images_training.h5', 'r') as h:
        train_data = np.copy(h['datatrain'])

    with h5py.File('./Input/labels_training.h5', 'r') as h:
        train_labels = np.copy(h['labeltrain'])

    with h5py.File('./Input/images_testing.h5', 'r') as h:
        test_data = np.copy(h['datatest'])

    return train_data, train_labels, test_data

## Preprocessing
### Stratified folds

In [None]:
def get_stratified_folds(x: np.ndarray, y: np.ndarray, n: int=10):
    y = y[:, np.newaxis]
    data = np.concatenate((x, y), axis=1)
    classes = np.unique(y)
    class_split = []
    folds = [[] for i in range(n)]

    leftover = []

    for c in classes:
        # Split dataset by class and get n folds for each class
        c_data = data[data[:, -1] == c]
        split_size = int(c_data.shape[0] / n)
        split_idxs = list(range(split_size, c_data.shape[0], split_size))
        c_data_split = np.array_split(c_data, split_idxs)

        # Take out any samples that are leftover.
        # These will be added in at the end to ensure
        # proper stratification of folds.
        remainder  = c_data.shape[0] % n
        if remainder != 0:
            leftover_split = c_data_split.pop()
            leftover.append(leftover_split)

        # Merge n folds for each class into n folds containing all classes
        class_split.append(c_data_split)
        fold_idx = 0
        for split in c_data_split:
            folds[fold_idx].append(split)
            fold_idx += 1

    # Turn folds to numpy arrays
    folds_np = []
    for fold in folds:
        fold_np = np.concatenate(fold)
        folds_np.append(fold_np)

    # Distribute leftovers across folds.
    leftover = np.concatenate(leftover)
    for i in range(leftover.shape[0]):
        fold_idx = i % n
        leftover_split = leftover[i][np.newaxis, :]
        folds_np[fold_idx] = np.concatenate([folds_np[fold_idx], leftover_split])

    # Prove folds are stratified.
    # for fold in folds_np:
    #     print(fold.shape)
    #     unique, counts = np.unique(fold[:, -1], return_counts=True)
    #     print(tuple(zip(unique, counts)))
    #     print()

    folds_np = np.stack(folds_np)
    return folds_np[:, :, :-1], folds_np[:, :, -1]

### Data shuffling

In [None]:
def shuffle_data(x: np.ndarray, y:  np.ndarray):
    y = y[:, np.newaxis]
    data = np.concatenate((x, y), axis=1)
    np.random.shuffle(data)
    return data[:,:-1], data[:,data.shape[1]-1:].squeeze()

### Multiclass to binary class conversion

In [None]:
def binary_partition_by_class(x: np.ndarray, y: np.ndarray, partition_map: set):
    classes = np.unique(y)
    for c in partition_map:
        if c not in classes:
            raise ValueError('Value {} in parition_map was not found in y.'.format(c))

    partition = np.copy(y)
    for c in partition_map:
        partition = (partition==c).astype('uint8')

    return x, partition

### Principal Component Analysis

In [None]:
# Returns projected vector, explained variance for all components
# PCA based on SVD in order to save computational resources and have more accurate results#
# Reference (with links to papers) https://stats.stackexchange.com/questions/79043/why-pca-of-data-by-means-of-svd-of-the-data
def PCA(A: np.ndarray, n_components=6):
    E = np.mean(A, axis=0)
    C = A - E
    u, s, v = svd(C)

    explained_var = np.cumsum(s) / np.sum(s)
    proj_A = np.dot(C,v[:,range(0,n_components)])

    return proj_A, explained_var

## Helper Functions
Define helper functions and classes to be used by the different classifiers. These include:

- **Distance functions**
    - Manhattan distance
    - Euclidean distance
    - Squared Euclidean distance
    - Minkowski distance

- **Activation functions**
    - Sigmoid
    - tanh
    - Relu
    - Softplus
    
- **Error functions**
    - Squared error
    
- **Metrics**
    - Accuracy
    - Confusion matrix
    - Precision
    - Recall
    - F1

### Distance Functions

In [None]:
def manhattan(x: np.ndarray, y: np.ndarray, axis: int):
    return np.sum(np.abs(x - y), axis=axis)

def euclidean(x: np.ndarray, y: np.ndarray, axis: int):
    return minkowski(x, y, 2, axis=axis)

def euclidean_squared(x: np.ndarray, y: np.ndarray, axis: int):
    return np.sum(np.abs(x - y)**2, axis=axis)

def minkowski(x: np.ndarray, y: np.ndarray, pow: int, axis: int):
    return np.sum(np.abs(x - y)**pow, axis=axis)**(1/pow)

### Activation Functions

In [None]:
class ActivationFunction:

    def __init__(self, func: callable, d_func: callable):
        self.func = func
        self.d_func = d_func

def f_sigmoid(x: np.array):
    return 1 / (1 + np.exp(-x))

def df_sigmoid(x: np.array):
    sig_x = f_sigmoid(x)
    return sig_x * (1 - sig_x)

def f_tanh(x: np.array):
    e_nx = np.exp(-x)
    e_x = np.exp(x)
    return (e_x - e_nx)/(e_x + e_nx)

def df_tanh(x: np.array):
    tanh_x = f_tanh(x)
    return 1 - tanh_x**2

def f_relu(x: np.array):
    x_copy = np.copy(x)
    x_copy[x_copy<0] = 0
    return x_copy

def df_relu(x: np.array):
    ret = (x > 0).astype(int)
    return ret

def f_softplus(x: np.array):
    e_x = np.exp(x)
    return np.log(1 + e_x)

def df_softplus(x: np.array):
    return f_sigmoid(x)


sigmoid = ActivationFunction(f_sigmoid, df_sigmoid)
tanh = ActivationFunction(f_tanh, df_tanh)
relu = ActivationFunction(f_relu, df_relu)
softplus = ActivationFunction(f_softplus, df_softplus)

### Error Functions

In [None]:
class ErrorFunction:

    def __init__(self, func: callable, d_func: callable):
        self.func = func
        self.d_func = d_func

def f_sse(predicted: np.ndarray, actual: np.ndarray):
    return 0.5*(actual - predicted)**2

def df_sse(predicted: np.ndarray, actual: np.ndarray):
    return actual - predicted

sse = ErrorFunction(f_sse, df_sse)

### Metrics

In [None]:
def accuracy(y_hat, y):
    if (y_hat.shape == y.shape):
        pred = (y_hat == y)
        return len(np.where(pred == True)[0]) / len(y)

def confusion_matrix(y_hat, y):
    if (y_hat.shape == y.shape):
        classes = set(np.unique(y_hat)).intersection(np.unique(y))

        mat = [[None for c in classes] for c_hat in classes]
        c_hat_idx = 0

        for c_hat in classes:
            y_hat_equal_c_hat = y_hat == c_hat
            c_idx = 0
            for c in classes:
                y_equal_c = y == c
                mat_entry = np.sum(y_hat_equal_c_hat * y_equal_c)
                # Actual count of class c is sum of column
                # Num samples predicted as class c_hat is sum of row
                mat[c_hat_idx][c_idx] = mat_entry
                c_idx += 1
            c_hat_idx += 1

        mat = np.asarray(mat)
        return mat, classes

def recall(y_hat, y):
    if (y_hat.shape == y.shape):
        conf_mat, classes = confusion_matrix(y_hat, y)
        return np.diagonal(conf_mat) / np.sum(conf_mat, axis=0)

def precision(y_hat, y):
    if (y_hat.shape == y.shape):
        conf_mat, classes = confusion_matrix(y_hat, y)
        return np.diagonal(conf_mat) / np.sum(conf_mat, axis=1)

def f1_score(y_hat, y):
    if (y_hat.shape == y.shape):
        r = recall(y_hat, y)
        p = precision(y_hat, y)
        return (2 * p * r) / (p + r)

## Models

Six different models were implemented:
 
- Linear SVM (Applied to this dataset using the one-versus-rest approach)
- Binary logistic regression (Applied to this dataset using the one-versus-rest approach)
- Multinomial logistic regression
- Naive Bayes
- K-Nearest neighbour
- Neural Network

#### Classifier interface
Each model conforms to a `Classifier` interface, which has a `train` and a `predict` method.

In [None]:
class Classifier(ABC):

    def __init__(self):
        return

    def predict(self, x: np.ndarray) -> np.ndarray:
        """
        Predict the class of each row of data in x.
        """
        return

    def train(self, x: np.ndarray, y: np.ndarray):
        """
        Train the classifier on a dataset x and corresponding labels y.
        """
        return

#### One Versus Rest
Define a **one-versus-rest ensemble** that takes $n$ binary classifiers, where $n$ is the number of classes in the dataset (10 for the MNIST-Fashion dataset).

The $k$th binary classifier is trained on a modified dataset, where class $c_k$ is relabelled as 0 and all other classes are relabelled as 1.

Once all classifiers have been trained, new samples are classified by calling the `predict` method on each of the $n$ classifiers, where the $k$th classifier returns a probability that the data sample belongs to class $c_k$. The largest of these probabilities becomes the predicted class for the sample.

In [None]:
class OneVersusRest(Classifier):

    def __init__(self, binary_classifier: Classifier, output=True):
        self.classifier = binary_classifier
        self.classifiers = []
        self.output = output

    def predict(self, x: np.ndarray) -> np.ndarray:
        """
        Predict the class of each row of data in x.
        """
        probs = []
        for c in self.classifiers:
            c_classes, c_prob = c.predict(x)
            probs.append(c_prob)

        probs = np.asarray(probs)
        predictions = np.argmax(probs, axis=0)
        return predictions, probs

    def train(self, x: np.ndarray, y: np.ndarray):
        """
        Train the classifier on a dataset x and corresponding labels y.
        """
        self.classifiers = []

        classes = np.unique(y)
        c_idx = 0
        for c in classes:
            c_idx += 1
            if self.output:
                print('Training binary classifier {} of {}'.format(c_idx, len(classes)))
            classifier_copy = deepcopy(self.classifier)
            data, labels = binary_partition_by_class(x, y, {c,})
            classifier_copy.train(data, labels)
            self.classifiers.append(classifier_copy)


### Linear Support Vector Machine

In [None]:
class LinearSVM(Classifier):

    # 'alpha' - learning rate of the classifier
    # 'features' - features wanted from feature selection via PCA
    def __init__(self, alpha = 0.01, features = 180):
        self.alpha = alpha
        self.features = features
        return

    def predict(self, x: np.ndarray) -> np.ndarray:
        """
        Predict the class of each row of data in x.
        """

        # Select only a subset of features
        print('Performing PCA on test data. This may take a minute or two...')
        new_x, exp_var = PCA(x, self.features)
        w = self.weights[0:len(x),:]
        
        # Predict classes using trained weights
        predicted_y = 0
        for i in range(self.features):
                cur_w = w[:,i].reshape(len(x), 1)
                cur_x = new_x[:,i].reshape(len(x), 1)
                y_pred += cur_w + cur_x

        # Enter predicted classes
        predictions = []
        for i in predicted_y:
            if i > 1:
                predictions.append(1)
            else:
                predictions.append(-1)

        return predictions

    def train(self, x: np.ndarray, y: np.ndarray, epochs=1):
        """
        Train the classifier on a dataset x and corresponding labels y.
        """

        # Select only a subset of features
        print('Performing PCA on training data. This may take a minute or two...')
        new_x, exp_var = PCA(x, self.features)

        new_y = y.reshape(len(x), 1)
        w = np.zeros((len(x), self.features))

        for e in range(1, epochs+1):
            print('Performing epoch {}...'.format(e))

            # Calculate predicted y from current weight
            cur_y = 0
            for i in range(self.features):
                cur_w = w[:,i].reshape(len(x), 1)
                cur_x = new_x[:,i].reshape(len(x), 1)
                cur_y += cur_w + cur_x

            product = cur_y * new_y

            # Adjust weights
            for i, v in enumerate(product):
                
                # When correctly classified
                if v >= 1:
                    for j in range(self.features):
                        cur_w = w[:,j].reshape(len(x), 1)
                        cur_x = new_x[:,j].reshape(len(x), 1)
                        cur_w = cur_w - self.alpha * (2 * 1 / e * cur_w)
                        w[:,j] = np.squeeze(cur_w)

                # When incorrectly classified
                else:
                    for j in range(self.features):
                        cur_w = w[:,j].reshape(len(x), 1)
                        cur_x = new_x[:,j].reshape(len(x), 1)
                        cur_w = cur_w + self.alpha * (cur_w[i] * new_y[i] - 2 * 1 / e * cur_w)
                        w[:,j] = np.squeeze(cur_w)

        self.weights = w
        print('Successfully trained weights for Linear SVM.')
        return

### Binary Logistic Regression

In [None]:
class LogisticRegression(Classifier):

    def __init__(self, input_size: int, eta=0.01, activation=sigmoid, batch_size=64, epochs=25, output=True):
        # Validate input_size
        if type(input_size) != int:
            raise TypeError('Invalid type for param input_size, expected {}'.format(int))
        if input_size <= 0:
            raise ValueError('Parameter input_size must be positive')

        # Validate eta
        if type(eta) != float:
            raise TypeError('Invalid type for param eta, expected {}'.format(float))
        if eta <= 0:
            raise ValueError('Parameter eta must be positive')

        # Ensure batch_size is > 0
        if type(batch_size) != int:
            raise TypeError('Invalid type for param batch_size, expected {}'.format(int))
        if batch_size <= 0:
            raise ValueError('Parameter batch_size must be positive')

        # Ensure epochs is > 0
        if type(epochs) != int:
            raise TypeError('Invalid type for param epochs, expected {}'.format(int))
        if epochs <= 0:
            raise ValueError('Parameter epochs must be positive')

        self.input_size = input_size
        self.eta = eta
        self.activation = activation
        self.batch_size = batch_size
        self.epochs = epochs

        self.weights = None
        self.output = output
        self.randomise()

    def randomise(self):
        """
        Randomly initialise the layer's weights and biases.
        """
        self.weights = np.random.standard_normal((self.input_size + 1,))

    def adjust_weights(self, dw: np.ndarray):
        
        if dw.shape != self.weights.shape:
            raise ValueError('Expected param dw of shape {} but got {}'.format(self.weights.shape, dw.shape))

        self.weights += self.eta * dw

    def predict(self, x: np.ndarray) -> np.ndarray:
        """
        Predict the class of each row of data in x.
        """
        ones = np.ones((x.shape[0], 1))
        x_tilde = np.hstack((x, ones))
        probability = self.activation.func(x_tilde @ self.weights[:, np.newaxis]).squeeze()
        return np.round(probability).astype('uint8'), probability

    def train(self, x: np.ndarray, y: np.ndarray):
        """
        Train the classifier on a dataset x and corresponding labels y.
        """

        classes = np.unique(y)
        if len(classes) != 2:
            raise ValueError('Expected 2 unique classes in param y but got {}.'.format(len(classes)))

        y_idxs = y
        y_idxs[y_idxs==classes[0]] = 0
        y_idxs[y_idxs==classes[1]] = 1
        
        if self.output:
            print('Training started')
            
        epoch_accs = [0,]
        epoch_times = []
        
        epoch_idx = 0
        for e in range(self.epochs):

            data, lbl = shuffle_data(x, y_idxs)
            epoch_correct = 0
            epoch_start = time.perf_counter()

            data_split = np.split(data, range(self.batch_size, len(x), self.batch_size))
            lbl_split = np.split(lbl, range(self.batch_size, len(x), self.batch_size))

            batch_idx = 0
            while batch_idx < len(data_split):

                batch = data_split[batch_idx]
                batch_lbls = lbl_split[batch_idx]

                batch_pred_lbls, batch_pred_probs = self.predict(batch)

                batch_correct = (batch_pred_lbls == batch_lbls).sum()

                err = batch_pred_probs - batch_lbls

                ones = np.ones((batch.shape[0], 1))
                batch_tilde = np.hstack((batch, ones))
                dw = -np.sum(batch_tilde.T * err, axis=1)
                self.adjust_weights(dw)

                epoch_correct += batch_correct
                batch_idx += 1

            epoch_idx += 1
            epoch_end = time.perf_counter()

            epoch_acc = epoch_correct/x.shape[0]
            epoch_time = epoch_end - epoch_start
            
            epoch_accs.append(epoch_acc)
            epoch_times.append(epoch_time)

            if self.output:
                print()
                print('Epoch {} complete in {:.3f}s'.format(e+1, epoch_time))
                print('Accuracy: {:.2f}%'.format(epoch_acc * 100))
        
        # Return epoch accuracies and epoch time
        return epoch_accs, epoch_times

### Multinomial Logistic Regression

In [None]:
class MultinomialLogisticRegression(Classifier):

    def __init__(self, input_size: int, n_classes: int, eta=0.01, batch_size=64, epochs=25, output=True):
        # Validate input_size
        if type(input_size) != int:
            raise TypeError('Invalid type for param input_size, expected {}'.format(int))
        if input_size <= 0:
            raise ValueError('Parameter input_size must be positive')

        # Validate n_classes
        if type(n_classes) != int:
            raise TypeError('Invalid type for param n_classes, expected {}'.format(int))
        if n_classes <= 0:
            raise ValueError('Parameter n_classes must be positive')

        # Validate eta
        if type(eta) != float:
            raise TypeError('Invalid type for param eta, expected {}'.format(float))
        if eta <= 0:
            raise ValueError('Parameter eta must be positive')

        # Ensure batch_size is > 0
        if type(batch_size) != int:
            raise TypeError('Invalid type for param batch_size, expected {}'.format(int))
        if batch_size <= 0:
            raise ValueError('Parameter batch_size must be positive')

        # Ensure epochs is > 0
        if type(epochs) != int:
            raise TypeError('Invalid type for param epochs, expected {}'.format(int))
        if epochs <= 0:
            raise ValueError('Parameter epochs must be positive')

        self.input_size = input_size
        self.n_classes = n_classes
        self.eta = eta
        self.batch_size = batch_size
        self.epochs = epochs
        self.output = output
        self.weights = None
        self.randomise()

    def randomise(self):
        """
        Randomly initialise the layer's weights and biases.
        """
        self.weights = np.random.standard_normal((self.input_size + 1, self.n_classes))

    def adjust_weights(self, dw: np.ndarray):
        
        if dw.shape != self.weights.shape:
            raise ValueError('Expected param dw of shape {} but got {}'.format(self.weights.shape, dw.shape))

        self.weights += self.eta * dw

    def predict(self, x: np.ndarray) -> np.ndarray:
        """
        Predict the class of each row of data in x.
        """
        ones = np.ones((x.shape[0], 1))
        x_tilde = np.hstack((x, ones))
        probabilities = x_tilde @ self.weights
        #print(probabilities)
        return np.argmax(probabilities, axis=1), probabilities

    def train(self, x: np.ndarray, y: np.ndarray):
        """
        Train the classifier on a dataset x and corresponding labels y.
        """

        classes = np.unique(y)
        if len(classes) != self.n_classes:
            raise ValueError('Expected {} unique classes in param y but got {}.'.format(self.n_classes, len(classes)))

        if self.output:
            print('Training started')
        epoch_accs = [0,]
        epoch_times = []
        
        epoch_idx = 0
        for e in range(self.epochs):

            data, lbl = shuffle_data(x, y)
            epoch_correct = 0
            epoch_start = time.perf_counter()

            data_split = np.split(data, range(self.batch_size, len(x), self.batch_size))
            lbl_split = np.split(lbl, range(self.batch_size, len(x), self.batch_size))

            batch_idx = 0
            while batch_idx < len(data_split):

                batch = data_split[batch_idx]
                batch_lbls = lbl_split[batch_idx]

                batch_pred_lbls, batch_pred_probs = self.predict(batch)

                batch_correct = (batch_pred_lbls == batch_lbls).sum()
                batch_actual = np.zeros(batch_pred_probs.shape)
                for i in range(batch_actual.shape[1]):
                    batch_actual[batch_lbls.T.squeeze() == i, i] = 1

                # Compute weight changes for weights for each class
                # https://math.stackexchange.com/questions/1428344/what-is-the-derivation-of-the-derivative-of-softmax-regression-or-multinomial-l/2081449
                e_pred = np.exp(batch_pred_probs)
                d_logs = batch_actual - e_pred/np.sum(e_pred, axis=1)[:, np.newaxis]
                ones = np.ones((batch.shape[0], 1))
                batch_tilde = np.hstack((batch, ones))
                dw = (batch_tilde.T @ d_logs)

                self.adjust_weights(dw)

                epoch_correct += batch_correct
                batch_idx += 1

            epoch_idx += 1
            epoch_end = time.perf_counter()
            
            epoch_acc = epoch_correct/x.shape[0]
            epoch_time = epoch_end - epoch_start
            
            epoch_accs.append(epoch_acc)
            epoch_times.append(epoch_time)

            if self.output:
                print()
                print('Epoch {} complete in {:.3f}s'.format(e+1, epoch_time))
                print('Accuracy: {:.2f}%'.format(epoch_acc * 100))
        
        # Return epoch accuracies and epoch time
        return epoch_accs, epoch_times

### Naive Bayes

In [None]:
class MultinomialNaiveBayes(Classifier):

    def __init__(self, alpha=1.0):
        # Additive (Laplace/Lidstone) smoothing parameter,
        # used to avoid zero theta_ck probabilities. Works
        # better for larger training sets.
        self.alpha = alpha

    def train(self, x: np.ndarray, y: np.ndarray):
        """
        Compute prior (pi_k) probability for the classes in y,
        and posterior (theta_ck) probabilities for attributes given classes.

        These are used to predict the class of new samples.
        """
        # Separate into classes
        self.classes = np.unique(y)        
        data_by_class = [[data for data, lbl in zip(x, y) if lbl == c] for c in self.classes]
        
        # Compute prior probabilities
        data_count = x.shape[0]
        self.pi_c = np.array([len(class_data) / data_count for class_data in data_by_class])
        
        # Compute posterior probabilities
        attrib_counts_by_class = np.array([np.sum(class_data, axis=0) for class_data in data_by_class]) + self.alpha
        total_counts_by_class = np.sum(attrib_counts_by_class, axis=1)
        self.theta_ck = attrib_counts_by_class / total_counts_by_class[:, np.newaxis]

    def get_log_likelihood(self, x):
        """
        Get the log-likelihood for a new set of data x.
        """

        log_pi_c = np.log(self.pi_c)
        log_theta_ck = np.log(self.theta_ck)

        # Sum up x_k theta_k for each class for each sample,
        # then add log_pi_c for each sample.
        return (log_theta_ck @ x.T + log_pi_c[:, np.newaxis]).T

    def predict(self, x: np.ndarray) -> np.ndarray:
        return np.argmax(self.get_log_likelihood(x), axis=1)

### K-Nearest Neighbour

In [None]:
class NearestNeighbour(Classifier):

    def __init__(self, k: int, dist: callable=euclidean_squared):
        if k <= 0:
            raise ValueError('Parameter k must be greater than 0.')

        self.k = k
        self.data = None
        self.classes = None
        self.dist = dist

    def clear(self):
        """
        Reset all classifier parameters to their defaults.
        """
        self.data = None
        self.classes = None

    def predict(self, x: np.ndarray) -> np.ndarray:
        """
        Predict the class of each row of data in x.
        """
        if x.shape[1] != self.data.shape[1]:
            raise ValueError('Invalid dimension of param x, expected {} but got {}'.format(self.data.shape[1], x.shape[1]))
        
        start = time.perf_counter()
        predictions = np.apply_along_axis(self.__predict_single, 1, x)
        end = time.perf_counter()
        print('Predicted {} samples in {}s'.format(x.shape[0], end - start))
        return predictions

    def __predict_single(self, x: np.ndarray) -> np.ndarray:
        dst = self.dist(self.data, x, axis=1)
        k_smallest = np.argpartition(dst, self.k)[:self.k]
        k_classes = self.classes[k_smallest]
        return np.argmax(np.bincount(k_classes))


    def train(self, x: np.ndarray, y: np.ndarray):
        """
        Train the classifier on a dataset x and corresponding labels y.
        K-Nearest-Neighbour is a lazy learning algorithm, so we simply store
        the data for prediction later.
        """
        self.data = x
        self.classes = y.astype('uint8')


### Neural Network

#### Neural Netowork Layer Interface

Define an interface for Neural Network layers. This was originally designed with the intention of implementing dropout, pooling and Relu layers for a convolutional neural network, but time did not allow for this.

In [None]:
class NeuralNetworkLayer(ABC):

    def __init__(self, output_shape: tuple, activation=sigmoid):
        # Ensure shapes are valid
        NeuralNetworkLayer.__validate_shape(output_shape)
        self.output_shape = output_shape
        self.input_shape = None
        self.activation = activation

    def set_input_shape(self, shape: tuple):
        NeuralNetworkLayer.__validate_shape(shape)
        self.input_shape = shape

    @staticmethod
    def __validate_shape(shape: tuple):
        for val in shape:
            if not isinstance(val, int):
                raise TypeError('Invalid shape, expected a tuple of ints')
            if val <= 0:
                raise ValueError('Invalid value in shape, expected > 0.')

#### Flat Dense Layer

A typical fully connected layer for use in neural networks.

In [None]:
class FlatDenseLayer(NeuralNetworkLayer):

    def __init__(self, output_shape: tuple, activation=sigmoid):
        super().__init__(output_shape, activation)
        self.raw_outputs = None
        self.outputs = None

    @classmethod
    def from_size(cls, size: int):
        return cls((size,))

    def randomise(self):
        """
        Randomly initialise the layer's weights and biases.
        """
        self.biases = None
        self.weights = None

        if self.input_shape is not None:
            self.biases = np.random.standard_normal((self.output_shape[0], 1))
            self.weights = np.random.standard_normal((self.output_shape[0], self.input_shape[0]))

    @staticmethod
    def __get_valid_batch(array: np.ndarray, shape: tuple):
        """
        Given a specified shape (s0, s1, ..., sk), verify that the array
        is of shape (s0, s1, ..., sk, n).
        """
        if len(array.shape) > len(shape) + 1:
            raise ValueError('Too many dimensions in param array.')

        if len(array.shape) < len(shape):
            raise ValueError('Not enough dimensions in param array.')
        
        # Check if array has shape (s0, s1, ..., sk)
        # If so, cast to shape (s0, s1, ..., sk, 1)
        if len(array.shape) == len(shape):
            array = array[:, np.newaxis]

        if array.shape[:-1] != shape:
            raise ValueError('Invalid shape of param array, expected {} but got {}'.format(shape, array.shape))
        
        return array


    def get_activations(self, ipt: np.ndarray):
        """
        Get the activations/outputs of the current layer, given the input array.
        """

        # No input_shape implies first layer, just return the input as the output
        if self.input_shape is None:
            ipt = FlatDenseLayer.__get_valid_batch(ipt, self.output_shape)
            
            self.outputs = ipt
            self.raw_outputs = ipt

            return ipt

        ipt = FlatDenseLayer.__get_valid_batch(ipt, self.input_shape)

        raw_out = self.weights @ ipt + self.biases

        self.raw_outputs = raw_out
        self.outputs = self.activation.func(raw_out)

        return self.outputs

    def adjust_weights(self, eta: float, dw: np.ndarray, db: np.ndarray):
        
        if dw.shape != self.weights.shape:
            raise ValueError('Expected param dw of shape {} but got {}'.format(self.weights.shape, dw.shape))
        
        if db.shape != self.biases.shape:
            raise ValueError('Expected param db of shape {} but got {}'.format(self.biases.shape, db.shape))

        self.weights += eta * dw
        self.biases += eta * db

#### Neural Network Class

Defines a neural network, with customisable layer structure, learning rate, activation functions, batch sizes and epoch count.

The network is trained using mini-batch gradient descent.

In [None]:
class NeuralNetwork(Classifier):

    def __init__(self, layers: list, eta=0.01, batch_size=64, epochs=25, output=True):
        # Ensure layers is a ist of NeuralNetworkLayer objects
        for l in layers:
            if not isinstance(l, NeuralNetworkLayer):
                raise TypeError('Invalid type in list {}, expected {}'.format(layers, NeuralNetworkLayer))
        
        # Validate eta
        if type(eta) != float:
            raise TypeError('Invalid type for param eta, expected {}'.format(float))
        if eta <= 0:
            raise ValueError('Parameter eta must be positive')
        
        # Ensure batch_size is > 0
        if type(batch_size) != int:
            raise TypeError('Invalid type for param batch_size, expected {}'.format(int))
        if batch_size <= 0:
            raise ValueError('Parameter batch_size must be positive')

        # Ensure epochs is > 0
        if type(epochs) != int:
            raise TypeError('Invalid type for param epochs, expected {}'.format(int))
        if epochs <= 0:
            raise ValueError('Parameter epochs must be positive')
        
        self.layers = layers
        self.eta = eta
        self.batch_size = batch_size
        self.epochs = epochs
        self.output = output

        # Link layers
        i = 1
        while i < len(self.layers):
            self.layers[i].set_input_shape(self.layers[i-1].output_shape)
            i += 1
        
        # Randomise weights
        self.__randomise()

    def __randomise(self):
        """
        Randomly initialise the network's weights and biases.
        """
        for layer in self.layers:
            layer.randomise()

    def clear(self):
        """
        Reset all classifier parameters to their defaults.
        """
        return

    def predict(self, x: np.ndarray) -> np.ndarray:
        """
        Predict the class of each row of data in x.
        """

        activation = x.T
        for layer in self.layers:
            activation = layer.get_activations(activation)
        return np.argmax(activation, axis=0), activation


    def train(self, x: np.ndarray, y: np.ndarray):
        """
        Train the classifier on a dataset x and corresponding labels y.
        """
        if self.output:
            print('Training started')
            
        epoch_accs = [0,]
        epoch_times = []
        
        epoch_idx = 0
        for e in range(self.epochs):

            data, lbl = shuffle_data(x, y)
            
            
            epoch_correct = 0
            epoch_start = time.perf_counter()

            data_split = np.split(data, range(self.batch_size, len(x), self.batch_size))
            lbl_split = np.split(lbl, range(self.batch_size, len(x), self.batch_size))

            batch_idx = 0
            while batch_idx < len(data_split):

                batch = data_split[batch_idx]
                batch_lbls = lbl_split[batch_idx]
                
                batch_pred_lbls, batch_pred = self.predict(batch)
                batch_correct = (batch_pred_lbls == batch_lbls.squeeze()).sum()

                batch_actual = np.zeros(batch_pred.shape)
                for i in range(batch_actual.shape[0]):
                    batch_actual[i, batch_lbls.T.squeeze() == i] = 1
                
                dC_da = sse.d_func(batch_pred, batch_actual)

                # Iterate through layers, propagating errors
                layer_idx = len(self.layers) - 1
                while layer_idx > 0:

                    layer = self.layers[layer_idx]
                    prev_layer = self.layers[layer_idx-1]

                    # Compute weight changes using chain rule
                    d_sigma = layer.activation.d_func(layer.raw_outputs)
                    dC_dz = dC_da * d_sigma  # da_dz = d_sigma(z) since a = sigma(z)

                    # Sum over batch to get dw for each weight.
                    # dz_dw = x = prev_outputs.
                    dw = dC_dz @ prev_layer.outputs.T

                    # Compute bias changes
                    # Sum over batch to get dw for each bias.
                    # dz_dw = 1. 
                    db = dC_dz.sum(axis=1)
                    db = db[:, np.newaxis]

                    layer.adjust_weights(self.eta, dw, db)
                    
                    # Compute error to propagate
                    dC_da = np.dot(layer.weights.T, dC_dz)

                    layer_idx -= 1

                epoch_correct += batch_correct
                batch_idx += 1

            epoch_idx += 1
            epoch_end = time.perf_counter()
            
            epoch_acc = epoch_correct/x.shape[0]
            epoch_time = epoch_end - epoch_start
            
            epoch_accs.append(epoch_acc)
            epoch_times.append(epoch_time)

            if self.output:
                print()
                print('Epoch {} complete in {:.3f}s'.format(e+1, epoch_time))
                print('Accuracy: {:.2f}%'.format(epoch_acc * 100))
        
        # Return epoch accuracies and epoch time
        return epoch_accs, epoch_times

## Training and Evaluation

### Load data and generate folds

In [None]:
# Load training and testing data
train_data, train_labels, test_data = load_data()
print('Training set data:')
print(train_data.shape)
print('Training set labels:')
print(train_labels.shape)
print('Training set labels:')
print(test_data.shape)
print()

# Generate stratified folds
n_folds = 10
fold_data, fold_labels = get_stratified_folds(train_data, train_labels, n=n_folds)

folds = []

for i in range(n_folds):
    fold_train_data = np.concatenate([fold_data[:i], fold_data[i + 1:]])
    fold_train_data = np.concatenate(fold_train_data)

    fold_train_labels = np.concatenate([fold_labels[:i], fold_labels[i + 1:]])
    fold_train_labels = np.concatenate(fold_train_labels)

    fold_validation_data = fold_data[i]
    fold_validation_labels = fold_labels[i]

    folds.append((fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels))

print('{} folds generated successfully.'.format(n_folds))

### Model evaluator
Define a function for evaluating a model and gathering relevant statistics.

In [None]:
def evaluate_model(model, train_data, train_labels, validation_data, validation_labels):
    print(f'{model.__class__.__name__}:')
    
    start_train = time.perf_counter()
    train_result = model.train(train_data, train_labels)
    end_train = time.perf_counter()
    train_time = end_train - start_train
    
    if train_result is not None:
        # Graph epoch accuracies and times
        epoch_accs, epoch_times = train_result
        plot_epoch(epoch_accs, epoch_times)
    
    start_predict = time.perf_counter()
    result = model.predict(validation_data)
    end_predict = time.perf_counter()
    predict_time = end_predict - start_predict

    # Some predict methods return probabilities as well as classes,
    # so just take the classes if that is the case.
    if isinstance(result, tuple):
        validation_pred = result[0]
    else:
        validation_pred = result
    
    # Print metrics
    acc = accuracy(validation_pred, validation_labels)
    prec = precision(validation_pred, validation_labels)
    rcl = recall(validation_pred, validation_labels)
    f1 = f1_score(validation_pred, validation_labels)
    
    print('\nTraining time: {:.04f}s'.format(train_time))
    print('\nPrediction time: {:.04f}s'.format(predict_time))
    print('\nValidation Set Accuracy: {:.02f}%'.format(100*acc))
    mat, classes = confusion_matrix(validation_pred, validation_labels)
    print('\nPrecision:\n{}\n'.format(np.round(prec, 4)))
    print('Recall:\n{}\n'.format(np.round(rcl, 4)))
    print('F1:\n{}\n'.format(np.round(f1, 4)))
    print('Confusion Matrix:')
    print(mat)
    print()
    
    return train_time, predict_time, acc, mat, prec, rcl, f1

### Plotting Tools

In [None]:
def plot_epoch(epoch_accs, epoch_times):
    epoch_idxs = np.arange(0, len(epoch_accs), 1)
    plt.figure(figsize = (20,20))
    plt.rc('font', size=24)
    
    plt.subplot(2, 1, 1)
    plt.plot(epoch_idxs, epoch_accs, 'o-')
    plt.ylim(bottom=0)
    plt.ylim(top=1)
    plt.title('Epoch accuracy on training data')
    plt.ylabel('Epoch accuracy')
    plt.xlabel('Epoch')
    plt.grid()

    plt.subplot(2, 1, 2)
    epoch_times.insert(0, 0)
    plt.plot(epoch_idxs, np.cumsum(epoch_times), 'o-')
    plt.ylim(bottom=0)
    plt.title('Cumulative training time across epochs')
    plt.ylabel('Cumulative training time (s)')
    plt.xlabel('Epoch')
    plt.grid()
    
    plt.show()

### Best classifier

In [None]:
def save_predictions(predictions):
    with h5py.File('./Output/predicted_labels.h5') as h:
        if h['output'] is None:
            h.create_dataset('output', data=predictions)
        else:
            data = h['output']
            data[...] = predictions 
    print('Saved {} predictions.\n'.format(predictions.shape[0]))

In [None]:
def validate_best_classifier(train_data, train_labels, validation_data, validation_labels):
    # Create, train and validate model
    model = NeuralNetwork([
        FlatDenseLayer((784,), activation=sigmoid),
        FlatDenseLayer((100,), activation=sigmoid),
        FlatDenseLayer((20,), activation=sigmoid),
        FlatDenseLayer((10,), activation=sigmoid),
    ], eta=0.05, batch_size=64, epochs=250, output=False)
    
    return evaluate_model(model, train_data, train_labels, validation_data, validation_labels)

def test_best_classifier(train_data, train_labels, test_data):
    # Create and train model
    model = NeuralNetwork([
        FlatDenseLayer((784,), activation=sigmoid),
        FlatDenseLayer((100,), activation=sigmoid),
        FlatDenseLayer((20,), activation=sigmoid),
        FlatDenseLayer((10,), activation=sigmoid),
    ], eta=0.05, batch_size=64, epochs=250, output=False)
    model.train(train_data, train_labels)
    
    # Predict test set samples and save predictions
    print('Predicting {} test samples...'.format(test_data.shape[0]))
    result = model.predict(test_data)
    # Some predict methods return probabilities as well as classes,
    # so just take the classes if that is the case.
    if isinstance(result, tuple):
        test_pred = result[0]
    else:
        test_pred = result

    save_predictions(test_pred)

In [None]:
if not run_best_only:
    print('EVALUATING BEST CLASSIFIER:')
    train_time_lst = []
    pred_time_lst = []
    acc_lst = []
    mat_lst = []
    prec_lst = []
    rcl_lst = []
    f1_lst = []

    fold_idx = 0
    for fold in folds:
        fold_idx += 1
        print('FOLD {} of {}'.format(fold_idx, len(folds)))
        fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
        train_time, pred_time, acc, mat, prec, rcl, f1 = validate_best_classifier(fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)
        train_time_lst.append(train_time)
        pred_time_lst.append(pred_time)
        acc_lst.append(acc)
        mat_lst.append(mat)
        prec_lst.append(prec)
        rcl_lst.append(rcl)
        f1_lst.append(f1)

    print()
    print('Mean training time: {:.4f}s'.format(sum(train_time_lst)/len(train_time_lst)))
    print()
    print('Mean prediction time: {:.4f}s'.format(sum(pred_time_lst)/len(pred_time_lst)))
    print()
    print('Mean Accuracy: {:.4f}'.format(sum(acc_lst)/len(acc_lst)))
    print()
    print('All folds Confusion Matrix:')
    print(np.sum(np.stack(mat_lst), axis=0))
    print()
    print('Mean Precision:')
    print(np.mean(np.stack(prec_lst), axis=0))
    print()
    print('Mean Recall:')
    print(np.mean(np.stack(rcl_lst), axis=0))
    print()
    print('Mean F1:')
    print(np.mean(np.stack(f1_lst), axis=0))
    print()

In [None]:
if run_best_only:
    print('TESTING BEST CLASSIFIER:')
    test_best_classifier(train_data, train_labels, test_data)
    
    #Test loading of predicted labels:
    with h5py.File('./Output/predicted_labels.h5', 'r') as h:
        test_labels = np.copy(h['output'])
    print(test_labels.shape)
    print(test_labels)

### All classifiers


### Logistic Regression
##### Investigate Hyperparameter Effects
Investigate the best learning rate for the Logistic regression classifier.

In [None]:
def optimise_and_plot_learning_rate(etas, model, train_data, train_labels, validation_data, validation_labels):
    train_time_lst = []
    pred_time_lst = []
    acc_lst = []
    mat_lst = []
    prec_lst = []
    rcl_lst = []
    f1_lst = []

    for eta in etas:
        print('ETA:', eta)

        # Create and train model
        model_copy = deepcopy(model)
        model.eta = eta
        train_time, pred_time, acc, mat, prec, rcl, f1 = evaluate_model(model, train_data, train_labels, validation_data, validation_labels)

        train_time_lst.append(train_time)
        pred_time_lst.append(pred_time)
        acc_lst.append(acc)
        mat_lst.append(mat)
        prec_lst.append(np.mean(prec))
        rcl_lst.append(np.mean(rcl))
        f1_lst.append(np.mean(f1))
        
    plt.figure(figsize = (20,20))
    plt.rc('font', size=16)
    plt.subplots_adjust(hspace=0.3)

    plt.subplot(3, 2, 1)
    plt.plot(etas, train_time_lst, 'o-')
    plt.ylim(bottom=0)
    plt.title('Learning Rate vs Training time')
    plt.ylabel('Training time (s)')
    plt.xlabel('Learning rate')
    plt.grid()

    plt.subplot(3, 2, 2)
    plt.plot(etas, pred_time_lst, 'o-')
    plt.ylim(bottom=0)
    plt.title('Learning Rate vs Prediction time')
    plt.ylabel('Prediction time (s)')
    plt.xlabel('Learning rate')
    plt.grid()

    plt.subplot(3, 2, 3)
    plt.ylim(bottom=0)
    plt.ylim(top=1)
    plt.plot(etas, acc_lst, 'o-')
    plt.title('Learning Rate vs Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Learning rate')
    plt.grid()

    plt.subplot(3, 2, 4)
    plt.ylim(bottom=0)
    plt.ylim(top=1)
    plt.plot(etas, prec_lst, 'o-')
    plt.title('Learning Rate vs Class average precision')
    plt.ylabel('Class average precision')
    plt.xlabel('Learning rate')
    plt.grid()

    plt.subplot(3, 2, 5)
    plt.ylim(bottom=0)
    plt.ylim(top=1)
    plt.plot(etas, rcl_lst, 'o-')
    plt.title('Learning Rate vs Class average recall')
    plt.ylabel('Class average recall')
    plt.xlabel('Learning rate')
    plt.grid()

    plt.subplot(3, 2, 6)
    plt.ylim(bottom=0)
    plt.ylim(top=1)
    plt.plot(etas, f1_lst, 'o-')
    plt.title('Learning Rate vs Class average F1')
    plt.ylabel('Class average F1')
    plt.xlabel('Learning rate')
    plt.grid()

    plt.show()

In [None]:
if not run_best_only:
    etas = [0.1, 0.05, 0.025, 0.01, 0.005, 0.0025, 0.001, 0.0005, 0.00025, 0.0001]
    epochs = 50
    batch_size = 64
    activation = sigmoid

    # Only run on the first fold to save time, as we only want an indication of which hyperparam value is best
    fold = folds[0]
    fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
    
    lr_model = LogisticRegression(train_data.shape[1], eta=eta, activation=activation, batch_size=batch_size, epochs=epochs, output=False)
    model = OneVersusRest(lr_model)
    
    optimise_and_plot_learning_rate(etas, model, fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)

Investigate how many epochs to run, per classifier. This is determined by working out when the underlying binary classifiers are not improving their accuracy on the training set, and ensuring that their accuracy on the validation set is reasonable.

In [None]:
if not run_best_only:
    fold = folds[0]
    fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
    classifier = LogisticRegression(fold_train_data.shape[1], eta=0.001, activation=sigmoid, batch_size=64, epochs=100, output=False)

    classes = np.unique(fold_train_labels)
    c_idx = 0
    for c in classes:
        c_idx += 1
        print('Training classifier {} of {}'.format(c_idx, len(classes)))
        classifier_copy = deepcopy(classifier)
        c_train_data, c_train_labels = binary_partition_by_class(fold_train_data, fold_train_labels, {c,})
        c_validation_data, c_validation_labels = binary_partition_by_class(fold_validation_data, fold_validation_labels, {c,})
        evaluate_model(classifier_copy, c_train_data, c_train_labels, c_validation_data, c_validation_labels)

##### Best Logistic Regression

In [None]:
def best_logistic_regression(train_data, train_labels, validation_data, validation_labels):
    # Create and train model
    lr_model = LogisticRegression(train_data.shape[1], eta=0.001, activation=sigmoid, batch_size=64, epochs=50, output=False)
    model = OneVersusRest(lr_model)
    return evaluate_model(model, train_data, train_labels, validation_data, validation_labels)

In [None]:
if not run_best_only:
    train_time_lst = []
    pred_time_lst = []
    acc_lst = []
    mat_lst = []
    prec_lst = []
    rcl_lst = []
    f1_lst = []

    fold_idx = 0
    for fold in folds:
        fold_idx += 1
        print('FOLD {} of {}'.format(fold_idx, len(folds)))
        fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
        train_time, pred_time, acc, mat, prec, rcl, f1 = best_logistic_regression(fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)
        train_time_lst.append(train_time)
        pred_time_lst.append(pred_time)
        acc_lst.append(acc)
        mat_lst.append(mat)
        prec_lst.append(prec)
        rcl_lst.append(rcl)
        f1_lst.append(f1)

    print()
    print('Mean training time: {:.4f}s'.format(sum(train_time_lst)/len(train_time_lst)))
    print()
    print('Mean prediction time: {:.4f}s'.format(sum(pred_time_lst)/len(pred_time_lst)))
    print()
    print('Mean Accuracy: {:.4f}'.format(sum(acc_lst)/len(acc_lst)))
    print()
    print('All folds Confusion Matrix:')
    print(np.sum(np.stack(mat_lst), axis=0))
    print()
    print('Mean Precision:')
    print(np.mean(np.stack(prec_lst), axis=0))
    print()
    print('Mean Recall:')
    print(np.mean(np.stack(rcl_lst), axis=0))
    print()
    print('Mean F1:')
    print(np.mean(np.stack(f1_lst), axis=0))
    print()

### Multinomial Logistic Regression
##### Investigate Hyperparameter Effects

In [None]:
if not run_best_only:
    etas = [0.1, 0.05, 0.025, 0.01, 0.005, 0.0025, 0.001, 0.0005, 0.00025, 0.0001]
    epochs = 250
    batch_size = 64

    model = MultinomialLogisticRegression(
        train_data.shape[1],
        len(np.unique(train_labels)),
        eta=etas[0],
        epochs=epochs,
        batch_size=batch_size,
        output=False
    )
    
    # Only run on the first fold to save time, as we only want an indication of which hyperparam value is best
    fold = folds[0]
    fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
    
    optimise_and_plot_learning_rate(etas, model, fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)

##### Best Multinomial Logistic Regression

In [None]:
def best_multinomial_logistic_regression(train_data, train_labels, validation_data, validation_labels):
    # Create and train model
    model = MultinomialLogisticRegression(
        train_data.shape[1],
        len(np.unique(train_labels)),
        eta=0.05,
        epochs=250,
        output=False
    )

    return evaluate_model(model, train_data, train_labels, validation_data, validation_labels)

In [None]:
if not run_best_only:
    train_time_lst = []
    pred_time_lst = []
    acc_lst = []
    mat_lst = []
    prec_lst = []
    rcl_lst = []
    f1_lst = []

    fold_idx = 0
    for fold in folds:
        fold_idx += 1
        print('FOLD {} of {}'.format(fold_idx, len(folds)))
        fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
        train_time, pred_time, acc, mat, prec, rcl, f1 = best_multinomial_logistic_regression(fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)
        train_time_lst.append(train_time)
        pred_time_lst.append(pred_time)
        acc_lst.append(acc)
        mat_lst.append(mat)
        prec_lst.append(prec)
        rcl_lst.append(rcl)
        f1_lst.append(f1)

    print()
    print('Mean training time: {:.4f}s'.format(sum(train_time_lst)/len(train_time_lst)))
    print()
    print('Mean prediction time: {:.4f}s'.format(sum(pred_time_lst)/len(pred_time_lst)))
    print()
    print('Mean Accuracy: {:.4f}'.format(sum(acc_lst)/len(acc_lst)))
    print()
    print('All folds Confusion Matrix:')
    print(np.sum(np.stack(mat_lst), axis=0))
    print()
    print('Mean Precision:')
    print(np.mean(np.stack(prec_lst), axis=0))
    print()
    print('Mean Recall:')
    print(np.mean(np.stack(rcl_lst), axis=0))
    print()
    print('Mean F1:')
    print(np.mean(np.stack(f1_lst), axis=0))
    print()

### Naive Bayes
##### Investigate Hyperparameter Effects

There is no need to tune hyperparameters for Naive Bayes, since it is deterministic. The only parameter for Naive  Bayes is the Laplacian smoothing parameter, used to avoid zero posterior probabilities. This should be as small as possible but still nonzero, so as not to skew the posterior probabilities.

##### Best Naive Bayes

In [None]:
def best_naive_bayes(train_data, train_labels, validation_data, validation_labels):
    # Create and train model
    model = MultinomialNaiveBayes()
    return evaluate_model(model, train_data, train_labels, validation_data, validation_labels)

In [None]:
def evaluate_n_fold(model_func, folds):
    train_time_lst = []
    pred_time_lst = []
    acc_lst = []
    mat_lst = []
    prec_lst = []
    rcl_lst = []
    f1_lst = []

    fold_idx = 0
    for fold in folds:
        fold_idx += 1
        print('FOLD {} of {}'.format(fold_idx, len(folds)))
        fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
        train_time, pred_time, acc, mat, prec, rcl, f1 = model_func(fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)
        train_time_lst.append(train_time)
        pred_time_lst.append(pred_time)
        acc_lst.append(acc)
        mat_lst.append(mat)
        prec_lst.append(prec)
        rcl_lst.append(rcl)
        f1_lst.append(f1)

    print()
    print('Mean training time: {:.4f}s'.format(sum(train_time_lst)/len(train_time_lst)))
    print()
    print('Mean prediction time: {:.4f}s'.format(sum(pred_time_lst)/len(pred_time_lst)))
    print()
    print('Mean Accuracy: {:.4f}'.format(sum(acc_lst)/len(acc_lst)))
    print()
    print('All folds Confusion Matrix:')
    print(np.sum(np.stack(mat_lst), axis=0))
    print()
    print('Mean Precision:')
    print(np.mean(np.stack(prec_lst), axis=0))
    print()
    print('Mean Recall:')
    print(np.mean(np.stack(rcl_lst), axis=0))
    print()
    print('Mean F1:')
    print(np.mean(np.stack(f1_lst), axis=0))
    print()

In [None]:
if not run_best_only:
    evaluate_n_fold(best_naive_bayes, folds)

### K-Nearest Neighbour
##### Investigate Hyperparameter Effects

Investigate various distance functions. These were investigated first, as they have less of an impact on accuracy than the value of $k$. It is also important to reduce training time as much as possible.

In [None]:
if not run_best_only:
    dist_funcs = [manhattan, euclidean_squared, euclidean]
    # Only run on the first fold to save time, as we only want an indication of which hyperparam value is best
    fold = folds[0]
    fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
    
    for dist in dist_funcs:
        model = NearestNeighbour(k=5, dist=dist)
        evaluate_model(model, fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)

Investigate different values for $k$. It is also important to reduce training time as much as possible, whilst also obtaining reasonable accuracy on the validation set.

In [None]:
if not run_best_only:
    ks = [1, 2, 3, 5, 8, 13, 21, 34, 55]
    
    # Only run on the first fold to save time, as we only want an indication of which hyperparam value is best
    fold = folds[0]
    fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
    
    train_time_lst = []
    pred_time_lst = []
    acc_lst = []
    mat_lst = []
    prec_lst = []
    rcl_lst = []
    f1_lst = []
    
    for k in ks:
        model = NearestNeighbour(k=k, dist=manhattan)
        train_time, pred_time, acc, mat, prec, rcl, f1 = evaluate_model(model, fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)
        
        train_time_lst.append(train_time)
        pred_time_lst.append(pred_time)
        acc_lst.append(acc)
        mat_lst.append(mat)
        prec_lst.append(prec)
        rcl_lst.append(rcl)
        f1_lst.append(f1)
    
 

In [None]:
if not run_best_only:
    cls_map = ['T-shirt/Top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
    plt.figure(figsize = (20,20))
    plt.rc('font', size=16)
    plt.subplots_adjust(hspace=0.3)

    plt.subplot(3, 2, 1)
    plt.plot(ks, train_time_lst, 'o-')
    plt.ylim(bottom=0)
    plt.title('k vs Training time')
    plt.ylabel('Training time (s)')
    plt.xlabel('k')
    plt.grid()

    plt.subplot(3, 2, 2)
    plt.plot(ks, pred_time_lst, 'o-')
    plt.ylim(bottom=0)
    plt.title('k vs Prediction time')
    plt.ylabel('Prediction time (s)')
    plt.xlabel('k')
    plt.grid()

    plt.subplot(3, 2, 3)
    plt.ylim(bottom=0)
    plt.ylim(top=1)
    plt.plot(ks, acc_lst, 'o-')
    plt.title('k vs Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('k')
    plt.grid()

    plt.subplot(3, 2, 4)
    plt.ylim(bottom=0)
    plt.ylim(top=1)
    plt.plot(ks, np.mean(np.stack(prec_lst), axis=1), 'o-')
    plt.title('k vs Mean class precision')
    plt.ylabel('Mean class precision')
    plt.xlabel('k')
    # plt.legend(cls_map)
    plt.grid()

    plt.subplot(3, 2, 5)
    plt.ylim(bottom=0)
    plt.ylim(top=1)
    plt.plot(ks, np.mean(np.stack(rcl_lst), axis=1), 'o-')
    plt.title('k vs Mean class recall')
    plt.ylabel('Mean class recall')
    plt.xlabel('k')
    # plt.legend(cls_map)
    plt.grid()

    plt.subplot(3, 2, 6)
    plt.ylim(bottom=0)
    plt.ylim(top=1)
    plt.plot(ks, np.mean(np.stack(f1_lst), axis=1), 'o-')
    plt.title('k vs Mean class F1')
    plt.ylabel('Mean class F1')
    plt.xlabel('k')
    # plt.legend(cls_map)
    plt.grid()

    plt.show()

##### Best K-Nearest Neighbour

In [None]:
def best_knn(train_data, train_labels, validation_data, validation_labels):
    # Create and train model
    model = NearestNeighbour(k=3, dist=manhattan)
    return evaluate_model(model, train_data, train_labels, validation_data, validation_labels)

In [None]:
if not run_best_only:
    evaluate_n_fold(best_knn, folds)

### Neural Network
##### Investigate Hyperparameter Effects

1. Determine best network structure
2. Determine best learning rate
3. Determine appropriate number of epochs

Only sigmoid activations were used as the model was either unstable for other activations or obviously much too slow.

In [None]:
if not run_best_only:
    epochs = 500
    eta = 0.01
    batch_size = 64
    activation = sigmoid
    
    models = [
        NeuralNetwork([
            FlatDenseLayer((784,), activation=activation),
            FlatDenseLayer((10,), activation=activation),
            FlatDenseLayer((10,), activation=sigmoid),
        ], eta=eta, batch_size=batch_size, epochs=epochs, output=False),
        NeuralNetwork([
            FlatDenseLayer((784,), activation=activation),
            FlatDenseLayer((10,), activation=activation),
            FlatDenseLayer((10,), activation=activation),
            FlatDenseLayer((10,), activation=sigmoid),
        ], eta=eta, batch_size=batch_size, epochs=epochs, output=False),
        NeuralNetwork([
            FlatDenseLayer((784,), activation=activation),
            FlatDenseLayer((100,), activation=activation),
            FlatDenseLayer((10,), activation=activation),
            FlatDenseLayer((10,), activation=sigmoid),
        ], eta=eta, batch_size=batch_size, epochs=epochs, output=False),
        NeuralNetwork([
            FlatDenseLayer((784,), activation=activation),
            FlatDenseLayer((100,), activation=activation),
            FlatDenseLayer((20,), activation=activation),
            FlatDenseLayer((10,), activation=sigmoid),
        ], eta=eta, batch_size=batch_size, epochs=epochs, output=False),
    ]

    train_time_lst = []
    pred_time_lst = []
    acc_lst = []
    mat_lst = []
    prec_lst = []
    rcl_lst = []
    f1_lst = []
    
    # Only run on the first fold to save time, as we only want an indication of which hyperparam value is best
    fold = folds[0]
    fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
    
    for model in models:
        train_time, pred_time, acc, mat, prec, rcl, f1 = evaluate_model(model, fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)
        
        train_time_lst.append(train_time)
        pred_time_lst.append(pred_time)
        acc_lst.append(acc)
        mat_lst.append(mat)
        prec_lst.append(prec)
        rcl_lst.append(rcl)
        f1_lst.append(f1)

NOTE: The graphs for this section did not generate correctly, but this section would take too long to rerun.

In [None]:
if not run_best_only:
    epochs = 500
    batch_size = 64
    activation = sigmoid
    etas = [0.1, 0.05, 0.025, 0.01, 0.005, 0.0025, 0.001, 0.0005, 0.00025, 0.0001]
    
    model = NeuralNetwork([
        FlatDenseLayer((784,), activation=activation),
        FlatDenseLayer((100,), activation=activation),
        FlatDenseLayer((20,), activation=activation),
        FlatDenseLayer((10,), activation=sigmoid),
    ], eta=etas[0], batch_size=batch_size, epochs=epochs, output=False)

    # Only run on the first fold to save time, as we only want an indication of which hyperparam value is best
    fold = folds[0]
    fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels = fold
    
    optimise_and_plot_learning_rate(etas, model, fold_train_data, fold_train_labels, fold_validation_data, fold_validation_labels)

The graphs for the section below are correct.

In [None]:
def best_nn(train_data, train_labels, validation_data, validation_labels):
    # Create and train model
    model = NeuralNetwork([
        FlatDenseLayer((784,), activation=sigmoid),
        FlatDenseLayer((100,), activation=sigmoid),
        FlatDenseLayer((20,), activation=sigmoid),
        FlatDenseLayer((10,), activation=sigmoid),
    ], eta=0.05, batch_size=64, epochs=250, output=False)
    
    return evaluate_model(model, train_data, train_labels, validation_data, validation_labels)

In [None]:
if not run_best_only:
    evaluate_n_fold(best_nn, folds)

## Compare Results of Best Classifiers of each type

In [None]:
model_names = ['Linear Regression', 'Multinomial Linear Regression', 'Naive Bayes', 'K-Nearest Neighbour', 'Neural Network']
mean_train_times = [86.3228, 54.3756, 0.3876, 0.0003, 171.4545]
mean_predict_times = [0.0648, 0.0256, 0.0198, 148.2729, 0.0589]
mean_accuracies = [0.8456, 0.7982, 0.6664, 0.8490, 0.8550]
conf_mats = [
    np.asarray([
        [2392,    7,   39,   95,    9,    1,  443,    0,   10,    2],
        [  16, 2901,    8,   34,    5,    0,    5,    0,    0,    1],
        [  57,   16, 2287,   34,  282,    2,  379,    0,   12,    1],
        [ 166,   74,   35, 2618,  117,    2,  103,    0,   33,    0],
        [  14,    7,  350,   94, 2236,    0,  265,    0,    9,    0],
        [   3,    0,    5,    2,    3, 2758,    7,  126,   24,   54],
        [ 271,   15,  259,   96,  272,    1, 1764,    0,   57,    1],
        [   2,    0,    0,    1,    0,  140,    3, 2781,   19,  119],
        [  38,    1,   28,   10,   22,   34,   66,    9, 2800,    4],
        [   1,    1,    0,    0,    1,   70,    0,  137,    2, 2832],
    ]),
    np.asarray([
        [2120,    8,   57,  111,   14,    2,  389,    0,   30,    0],
        [  27, 2897,   13,   79,   15,    3,   23,    0,    4,    1],
        [  91,   12, 1814,   40,  344,    2,  317,    0,   34,    0],
        [ 214,   74,   71, 2500,  126,    8,  182,    0,   31,    1],
        [  23,   13,  521,  102, 2053,    2,  403,    0,   26,    0],
        [   3,    1,    2,    0,    1, 2682,    0,  131,   22,   56],
        [ 443,   11,  494,  124,  372,    3, 1657,    0,   69,    1],
        [   2,    2,    0,    1,    0,  171,    0, 2745,   17,  201],
        [  35,    3,   39,   26,   21,   46,   64,   16, 2729,    6],
        [   2,    1,    0,    1,    1,   89,    0,  161,    4, 2748]
    ]),
    np.asarray([
        [2343,   31,   15,  106,    3,    1,  648,    0,   15,    0],
        [   4, 2700,    1,   33,    4,    0,    5,    0,    2,    0],
        [  21,   37, 1783,    3,  542,    8,  477,    0,    6,    6],
        [ 348,  207,   35, 2637,  404,    1,  223,    0,  144,    3],
        [  17,    9,  837,   49, 1863,    3, 1098,    0,  211,    1],
        [   0,    0,    0,    0,    0,  467,    0,   59,    6,  103],
        [ 178,   36,  268,  152,  101,   23,  465,    0,  131,   40],
        [   0,    0,    0,    0,    0, 1577,    0, 2795,   30,  339],
        [  49,    2,   72,    4,   30,   23,  119,    3, 2420,    4],
        [   0,    0,    0,    0,    0,  905,    0,  196,    1, 2518]
    ]),
    np.asarray([
        [2534,   18,   67,  155,   25,    0,  565,    0,   23,    1],
        [   5, 2914,    4,   26,    3,    0,    8,    0,    1,    1],
        [  45,   16, 2387,   31,  458,    0,  422,    0,   46,    0],
        [  64,   59,   18, 2587,   92,    3,   56,    0,   12,    0],
        [  11,    3,  248,  116, 2089,    0,  188,    0,   23,    0],
        [   0,    0,    0,    0,    0, 2651,    1,   31,   11,   10],
        [ 285,   11,  282,   66,  273,    0, 1773,    0,   21,    1],
        [   1,    0,    0,    0,    0,  217,    1, 2860,   25,  120],
        [  15,    0,    5,    3,    7,    3,   21,    1, 2795,    0],
        [   0,    1,    0,    0,    0,  134,    0,  161,    9, 2881]
    ]),
    np.asarray([
        [2365,    9,   52,  102,   10,    0,  398,    0,   15,    1],
        [  12, 2926,    6,   40,    6,    2,    6,    0,    3,    0],
        [  56,    8, 2320,   27,  282,    2,  296,    0,   15,    0],
        [ 123,   60,   33, 2610,  111,    1,   96,    0,   21,    1],
        [  15,    9,  317,   99, 2275,    0,  233,    0,   16,    3],
        [   5,    1,    1,    2,    1, 2799,    4,  121,   25,   48],
        [ 353,    7,  265,   92,  238,    4, 1947,    0,   58,    1],
        [   0,    1,    0,    3,    2,  116,    1, 2778,   11,  126],
        [  28,    1,   16,    7,   22,   14,   51,    8, 2797,    2],
        [   3,    0,    1,    2,    0,   70,    3,  146,    5, 2832]
    ])
]

mean_precisions_by_class = [
    np.asarray([0.79985467, 0.97691669, 0.74694781, 0.8330888,  0.75214043, 0.92503362, 0.64921367, 0.90755323, 0.93001218, 0.93055809]),
    np.asarray([0.79839245, 0.94701683, 0.73793274, 0.79150497, 0.70485267, 0.92682741, 0.60545925, 0.87912836, 0.91506139, 0.91745585]),
    np.asarray([0.74157424, 0.98229906, 0.61906961, 0.66011596, 0.45564595, 0.7376524, 0.33441587, 0.58971677, 0.88814489, 0.69588295]),
    np.asarray([0.74838203, 0.98388233, 0.70179857, 0.89539027, 0.7804547,  0.98052861, 0.65385576, 0.88746578, 0.98078678, 0.90433979]),
    np.asarray([0.8017319, 0.97504806, 0.77303743, 0.85552346, 0.76848373, 0.93091832, 0.65996621, 0.91445806, 0.94983404, 0.92505939])
]

mean_recalls_by_class = [
    np.asarray([0.80810811, 0.95996328, 0.75952454, 0.87734058, 0.75871094, 0.91689701, 0.58118812, 0.91092146, 0.94403381, 0.93962509]),
    np.asarray([0.71621622, 0.95863113, 0.60239599, 0.83782631, 0.69672086, 0.89161794, 0.54597555, 0.89908175, 0.92010647, 0.91178302]),
    np.asarray([0.79155405, 0.89345398, 0.59216849, 0.88371417, 0.63213536, 0.15526024, 0.15321022, 0.91549127, 0.81593184, 0.83545687]),
    np.asarray([0.85608108, 0.96426573, 0.79275373, 0.86695248, 0.70884469, 0.88131672, 0.58417904, 0.93678024, 0.94234917, 0.95588766]),
    np.asarray([0.79898649, 0.96823924, 0.770499,   0.87465938, 0.77196011, 0.93052049, 0.64151902, 0.90992393, 0.94302257, 0.93962289])
]

mean_f1s_by_class = [
    np.asarray([0.80288307, 0.96830781, 0.75212007, 0.8539681,  0.75462026, 0.92085224, 0.60989431, 0.90911391, 0.93677903, 0.93496731]),
    np.asarray([0.74256119, 0.952461,   0.61258572, 0.80815645, 0.67354938, 0.90837514, 0.50565056, 0.88522393, 0.91709617, 0.91290083]),
    np.asarray([0.76548335, 0.93564555, 0.60474426, 0.75538131, 0.5293982, 0.25633562, 0.20990478, 0.71726538, 0.85029551, 0.75918138]),
    np.asarray([0.79846003, 0.97393321, 0.74402335, 0.88074636, 0.74275956, 0.92818834, 0.61672485, 0.91133918, 0.96112016, 0.9293543]),
    np.asarray([0.79982213, 0.97161431, 0.77121738, 0.86451959, 0.76796481, 0.93068221, 0.64932603, 0.91215338, 0.94615582, 0.9322106])
]

### Plot Results
#### Confusion Matrices

In [None]:
if not run_best_only:
    # A lot of the code to generate heatmaps is taken from this website:
    # https://matplotlib.org/3.1.1/gallery/images_contours_and_fields/image_annotated_heatmap.html
    cls_map = ['T-shirt/Top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

    model_idx = 0
    for mat in conf_mats:
        fig, ax = plt.subplots(figsize=(20, 16))
        im = ax.imshow(mat)
        ax.set_xticks(np.arange(mat.shape[1]))
        ax.set_yticks(np.arange(mat.shape[0]))
        ax.set_xticks(np.arange(mat.shape[1]+1)-.5, minor=True)
        ax.set_yticks(np.arange(mat.shape[0]+1)-.5, minor=True)
        
        ax.set_xticklabels(cls_map)
        ax.set_yticklabels(cls_map)
        
        cbar = ax.figure.colorbar(im, ax=ax)
        
        # Let the horizontal axes labeling appear on top.
        ax.tick_params(top=True, bottom=False,
                   labeltop=True, labelbottom=False)
    
        # Rotate the tick labels and set their alignment.
        plt.setp(ax.get_xticklabels(), rotation=-30, ha="right",
                 rotation_mode="anchor")
        
        # Loop over data dimensions and create text annotations.
        for i in range(len(cls_map)):
            for j in range(len(cls_map)):
                text = ax.text(j, i, mat[i, j], ha="center", va="center", color="w", size=24)

        ax.set_title("Cumulative Confusion Matrix\nfor Best {} Model".format(model_names[model_idx]))
        fig.tight_layout()
        plt.show()
        model_idx += 1

In [None]:
if not run_best_only:
    # A lot of the code to generate heatmaps is taken from this website:
    # https://matplotlib.org/3.1.1/gallery/lines_bars_and_markers/barchart.html#sphx-glr-gallery-lines-bars-and-markers-barchart-py
    cls_map = ['T-shirt/Top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
    
    x = np.arange(len(cls_map))
    width = 0.1  # the width of the bars

    fig, ax = plt.subplots(figsize=(20,16))
    for i in range(len(model_names)):
        rects1 = ax.bar(x - width/2 + width*i, mean_precisions_by_class[i], width, label=model_names[i])

    # Add some text for labels, title and custom x-axis tick labels, etc.
    ax.set_ylabel('Precision')
    ax.set_title('Mean Precision by Class across 10 folds')
    ax.set_xticks(x)
    ax.set_xticklabels(cls_map)
    ax.legend()
    fig.tight_layout()

    plt.show()

In [None]:
if not run_best_only:
    # A lot of the code to generate heatmaps is taken from this website:
    # https://matplotlib.org/3.1.1/gallery/lines_bars_and_markers/barchart.html#sphx-glr-gallery-lines-bars-and-markers-barchart-py
    cls_map = ['T-shirt/Top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
    
    x = np.arange(len(cls_map))
    width = 0.1  # the width of the bars

    fig, ax = plt.subplots(figsize=(20,16))
    for i in range(len(model_names)):
        rects1 = ax.bar(x - width/2 + width*i, mean_recalls_by_class[i], width, label=model_names[i])

    # Add some text for labels, title and custom x-axis tick labels, etc.
    ax.set_ylabel('Recall')
    ax.set_title('Mean Recall by Class across 10 folds')
    ax.set_xticks(x)
    ax.set_xticklabels(cls_map)
    ax.legend()
    fig.tight_layout()

    plt.show()

In [None]:
if not run_best_only:
    # A lot of the code to generate heatmaps is taken from this website:
    # https://matplotlib.org/3.1.1/gallery/lines_bars_and_markers/barchart.html#sphx-glr-gallery-lines-bars-and-markers-barchart-py
    cls_map = ['T-shirt/Top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
    
    x = np.arange(len(cls_map))
    width = 0.1  # the width of the bars

    fig, ax = plt.subplots(figsize=(20,16))
    for i in range(len(model_names)):
        rects1 = ax.bar(x - width/2 + width*i, mean_f1s_by_class[i], width, label=model_names[i])

    # Add some text for labels, title and custom x-axis tick labels, etc.
    ax.set_ylabel('F1')
    ax.set_title('Mean F1 by Class across 10 folds')
    ax.set_xticks(x)
    ax.set_xticklabels(cls_map)
    ax.legend()
    fig.tight_layout()

    plt.show()