In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
from skimage.util import view_as_windows

In [None]:
# Extract data and labels into numpy arrays

train_labels_and_data = pd.read_csv("/content/sample_data/mnist_train.csv").to_numpy()
test_labels_and_data = pd.read_csv("/content/sample_data/mnist_test.csv").to_numpy()

train_labels = train_labels_and_data[5000:, :1]
val_labels = train_labels_and_data[:5000, :1]
test_labels = test_labels_and_data[:, :1]

train_data = np.reshape(train_labels_and_data[5000:, 1:], (55000, 28, 28)) / 255.0
val_data = np.reshape(train_labels_and_data[:5000, 1:], (5000, 28, 28)) / 255.0
test_data = np.reshape(test_labels_and_data[:, 1:], (10000, 28, 28)) / 255.0

In [None]:
class ConvLayer:
    def __init__(self, num_filters, filter_size, learning_rate):
        # Originally contained stride_size and padding but were removed for simplicity
        self.num_filters = num_filters
        self.filter_size = filter_size # filter_size is passed in as an int. The true size is filter_size x filter_size
        self.learning_rate = learning_rate
        self.filter_weights = None
        self.biases = np.zeros(num_filters)
        self.weights_initialized = False
        self.last_input = None
        self.last_output = None

    def convolve(self, input_arr, kernel, fm_rows, fm_cols):
        """
        input_arr: The matrix which we're convolving over
        kernel: The filter that is being convolved over input_arr
        fm_rows: The number of rows the feature map output will contain
        fm_cols: The number of columns the feature map output will contain

        Returns an array representing the result of convolution
        """
        assert input_arr.ndim == 3, "input_arr must be a 3D array (batch_size, height, width)"
        assert kernel.ndim == 3, "kernel must be a 3D array (batch_size, kernel_height, kernel_width)"
        assert input_arr.shape[1] == input_arr.shape[2], "All images/feature maps must be square"
        assert input_arr.shape[0] == kernel.shape[0], "Input and kernel must have the same batch size or in the case of backwards pass, number of filters"

        im2col_matrix = self.im2col(input_arr, kernel, fm_rows, fm_cols)
        dot_result = np.dot(kernel.flatten(), im2col_matrix)
        conv_result = np.reshape(dot_result, (fm_rows, fm_cols))

        return conv_result

    def im2col(self, input_arr, kernel, fm_rows, fm_cols):
        output_shape_rows = kernel.flatten().shape[0]
        output_shape_cols = fm_rows * fm_cols
        windows = view_as_windows(input_arr, kernel.shape)
        im2row_matrix = np.reshape(windows, (output_shape_cols, output_shape_rows))

        return im2row_matrix.T

    # Assumes stride_size and filter_size are compatible with the input shape
    # Also assumes that input_arr is a three dimensional array. The first dimension being
    # the number of feature maps or images, the second and third representing the feature map or image
    def forward_pass(self, input_arr):
        self.last_input = input_arr

        # Initialize filter weights
        if not self.weights_initialized:
            fan_in = input_arr.shape[0] * input_arr.shape[1] * input_arr.shape[2]
            self.filter_weights = np.random.randn(self.num_filters, input_arr.shape[0], self.filter_size, self.filter_size) * np.sqrt(2.0 / fan_in)
            self.weights_initialized = True

        # Calculate output dimensions and create output array
        feature_map_num_rows = (input_arr.shape[1] - self.filter_size) + 1
        feature_map_num_cols = (input_arr.shape[2] - self.filter_size) + 1
        feature_maps = np.zeros((self.num_filters, feature_map_num_rows, feature_map_num_cols))

        for n in range(self.num_filters):
            feature_maps[n] = self.convolve(input_arr, self.filter_weights[n], feature_map_num_rows, feature_map_num_cols)

            # Add the filter bias to the feature map
            feature_maps[n] += self.biases[n]

        # Use ReLU activation function
        feature_maps = self.relu(feature_maps)

        self.last_output = feature_maps
        return feature_maps

    # Good article is https://pavisj.medium.com/convolutions-and-backpropagations-46026a8f5d2c
    # Another good article https://hideyukiinada.github.io/cnn_backprop_strides2.html
    # Another good article https://deeplearning.cs.cmu.edu/F21/document/recitation/Recitation5/CNN_Backprop_Recitation_5_F21.pdf
    # prev_layer_grad represents the gradient of the loss with respect to each element of the output feature map.
    # prev_layer_grad is a numpy array of shape of the last output of feature_maps
    def backward_pass(self, prev_layer_grad):
        # Apply ReLU derivative to get the pre-activation values
        prev_layer_grad *= self.relu_derivative(self.last_output)

        # Calculate gradients
        filter_gradients = self.calc_filter_grads(prev_layer_grad)
        input_gradients = self.calc_input_grads(prev_layer_grad)
        bias_gradients = np.sum(prev_layer_grad, axis=(1, 2)) # Summing over all elements of each feature map for biases

        self.update_params(filter_gradients, bias_gradients) # Update filter weights and biases
        return input_gradients

    def relu(self, x):
        return np.maximum(0, x)

    def relu_derivative(self, x):
        return np.where(x > 0, 1, 0)

    def calc_filter_grads(self, prev_layer_grad):
        filter_gradients = np.zeros((self.num_filters, self.last_input.shape[0], self.filter_size, self.filter_size))
        output_rows = (self.last_input.shape[1] - prev_layer_grad.shape[1]) + 1
        output_cols = (self.last_input.shape[2] - prev_layer_grad.shape[2]) + 1

        for n in range(self.num_filters):
            # Convolve prev_layer_grad over last_input to calculate dL/dF (loss with respect to filter)
            filter_gradients[n] = self.filter_grads_helper(self.last_input, prev_layer_grad[n], output_rows, output_cols)

        return filter_gradients

    def filter_grads_helper(self, input_arr, kernel, output_rows, output_cols):
        assert input_arr.ndim == 3, "input_arr must be a 3D array (batch_size, input height, input width)"
        assert kernel.ndim == 2, "kernel must be a 2d array (output height, output width)"

        conv_result = np.zeros((input_arr.shape[0], output_rows, output_cols))

        for i in range(input_arr.shape[0]):
            im2col_matrix = self.im2col(input_arr[i], kernel, output_rows, output_cols)
            dot_result = np.dot(kernel.flatten(), im2col_matrix)
            conv_result[i] = np.reshape(dot_result, (output_rows, output_cols))

        return conv_result

    def calc_input_grads(self, prev_layer_grad):
        prev_layer_grad = self.pad_input(prev_layer_grad, self.filter_size - 1) # Pad input for a full convolution
        input_gradients = np.zeros_like(self.last_input, dtype=np.float64) # Assumes last_input is a 3D numpy array
        rot_filter_weights = np.rot90(self.filter_weights, 2, axes=(2, 3)) # Flip the filter weights' heights and widths

        input_rows = (prev_layer_grad.shape[1] - rot_filter_weights.shape[2]) + 1
        input_cols = (prev_layer_grad.shape[2] - rot_filter_weights.shape[3]) + 1

        for i in range(rot_filter_weights.shape[1]): # Iterate over batch_size
            # Convolve flipped filter_weights over prev_layer_grad to calculate dL/dX (loss with respect to input)
            input_gradients[i] = self.convolve(prev_layer_grad, rot_filter_weights[:, i], input_rows, input_cols)

        return input_gradients

    def pad_input(self, input_arr, padding):
        # input_arr should be a 3d array
        if padding > 0:
            # Don't pad the first dimension but pad the second two
            input_arr = np.pad(input_arr, ((0, 0), (padding, padding), (padding, padding)))
        return input_arr

    def update_params(self, filter_grads, bias_grads):
        self.filter_weights -= filter_grads * self.learning_rate
        self.biases -= bias_grads * self.learning_rate

In [None]:
# Good article: https://medium.com/@YasinShafiei/making-a-neural-network-fully-connected-layer-from-scratch-only-numpy-49bd7958b6f3

class FullyConnectedLayer:
    def __init__(self, num_nodes, activation, learning_rate, regularization_param):
        self.num_nodes = num_nodes # The number of nodes in the layer
        self.activation = activation # The activation function to use, either "relu" or "softmax", softmax will only be used for the final layer
        self.learning_rate = learning_rate
        self.weights = None # Will be initialized in the forward pass when the input size is known
        self.biases = np.zeros((1, num_nodes))
        self.regularization_param = regularization_param
        self.last_input = None
        self.last_output = None
        self.weights_initialized = False

    def forward_pass(self, input_arr): # Assumes input_arr is a flattened vector of shape (1, x)
        if not self.weights_initialized:
            self.weights = np.random.randn(input_arr.shape[1], self.num_nodes) * np.sqrt(2.0 / input_arr.shape[1])
            self.weights_initialized = True

        self.last_input = input_arr
        output = np.dot(input_arr, self.weights) + self.biases
        if self.activation == "relu":
            output = self.relu(output)
        if self.activation == "softmax":
            # print("output before softmax", output)
            output = self.softmax(output)

        self.last_output = output # Store output for backpropagation
        return output

    def backward_pass(self, prev_layer_grad):
        if self.activation == "relu":
            prev_layer_grad *= self.relu_derivative(self.last_output)

        # If the activation is softmax we don't need to do anything as we assume that prev_layer_grad
        # is already the difference between the softmax output and the true labels (s - y)

        input_gradients = np.dot(prev_layer_grad, self.weights.T)
        weight_gradients = np.dot(self.last_input.T, prev_layer_grad)
        bias_gradients = np.sum(prev_layer_grad, axis=0, keepdims=True)

        # Add in l2 regularization. Normally you add a term proportional to the sum of the squared weights:
        # (lambda/2) * sum_i(weights[i]^2). But here we add the regularization term to the gradients so we
        # need to take the gradient of that equation to get lambda * sum_i(weights[i]).
        weight_gradients += self.regularization_param * self.weights

        self.update_params(weight_gradients, bias_gradients)
        return input_gradients

    def relu(self, x):
        return np.maximum(0, x)

    def relu_derivative(self, x):
        return np.where(x > 0, 1, 0)

    def softmax(self, x):
        assert x.shape[0] == 1
        assert x.ndim == 2

        numerator = np.exp(x - np.max(x, axis=1))
        denominator = np.sum(numerator, axis=1)
        return numerator / denominator

    def update_params(self, weight_grads, bias_grads):
        weight_grads = np.clip(weight_grads, -1.0, 1.0)
        bias_grads = np.clip(bias_grads, -1.0, 1.0)

        self.weights -= weight_grads * self.learning_rate
        self.biases -= bias_grads * self.learning_rate

In [None]:
class MaxPoolingLayer:
    def __init__(self, filter_size, stride_size):
        assert filter_size == stride_size, "filter size and stride size must be the same"
        self.filter_size = filter_size
        self.stride_size = stride_size
        self.last_input = None
        self.indices = [] # An array that lists the indices of the maxes

    def convolve(self, input_arr, kernel_size, step_size, n):
        assert input_arr.ndim == 2, "input_arr must be a 2d array"

        output_shape = (input_arr.shape[0] - kernel_size) // step_size + 1
        windows = view_as_windows(input_arr, kernel_size, step_size)
        maxes = np.max(windows, axis=(2, 3))
        im2row_matrix = windows.reshape(output_shape ** 2, kernel_size ** 2)

        col_indices = np.argmax(im2row_matrix, axis=1)
        row_indices = np.arange(im2row_matrix.shape[0])
        n_indices = np.full(col_indices.size, n)
        old_indices = list(zip(n_indices, row_indices, col_indices))
        new_indices = []

        for n, row_idx, col_idx in old_indices:
            row_offset = (row_idx // output_shape) * kernel_size
            col_offset = (row_idx % output_shape) * kernel_size
            row = row_offset + (col_idx // kernel_size)
            col = col_offset + (col_idx % kernel_size)
            new_indices.append((n, row, col))

        return maxes, new_indices


    def forward_pass(self, input_arr):
        assert input_arr.ndim == 3, "input_arr must be a three dimensional array (output of convolutional layer)"
        assert input_arr.shape[1] == input_arr.shape[2], "input_arr must only contain square feature maps/images"

        self.indices = [] # Reset the indices list
        self.last_input = input_arr
        pooling_output = np.zeros((input_arr.shape[0], input_arr.shape[1] // self.filter_size, input_arr.shape[2] // self.filter_size))

        for n in range(pooling_output.shape[0]):
            maxes, new_indices = self.convolve(input_arr[n], self.filter_size, self.stride_size, n)
            pooling_output[n] = maxes
            self.indices.extend(new_indices)

        return pooling_output


    def backward_pass(self, prev_layer_grad):
        input_indices = np.zeros_like(self.last_input, dtype=np.float64)

        for index in self.indices:
            input_indices[index[0], index[1], index[2]] = 1

        grad_repeat_cols = np.repeat(prev_layer_grad, self.filter_size, axis=2)
        grad_repeat_rows = np.repeat(grad_repeat_cols, self.filter_size, axis=1)

        # Pad gradients in case stride size and input size weren't compatible
        if grad_repeat_rows.shape != input_indices.shape:
            pad_margin = input_indices.shape[1] % self.stride_size
            grad_repeat_rows = np.pad(grad_repeat_rows, ((0, 0), (0, pad_margin), (0, pad_margin)))

        input_gradients = input_indices * grad_repeat_rows
        return input_gradients

In [None]:
class CreateModel:
    def __init__(self):
        self.conv_layer1 = ConvLayer(num_filters=64, filter_size=3, learning_rate=0.01)
        self.mp_layer1 = MaxPoolingLayer(filter_size=2, stride_size=2)
        self.conv_layer2 = ConvLayer(num_filters=64, filter_size=3, learning_rate=0.01)
        self.mp_layer2 = MaxPoolingLayer(filter_size=2, stride_size=2)
        self.fc_layer1 = FullyConnectedLayer(num_nodes=400, activation="relu", learning_rate=0.001, regularization_param=0.1)
        self.fc_layer2 = FullyConnectedLayer(num_nodes=100, activation="relu", learning_rate=0.001, regularization_param=0.1)
        self.fc_layer3 = FullyConnectedLayer(num_nodes=10, activation="softmax", learning_rate=0.001, regularization_param=0.1)
        self.mp_layer2_shape = None # Will be initialized in forward pass and used in backward pass

    def forward_pass(self, input_image):
        conv_layer1_output = self.conv_layer1.forward_pass(input_image)
        mp_layer1_output = self.mp_layer1.forward_pass(conv_layer1_output)
        conv_layer2_output = self.conv_layer2.forward_pass(mp_layer1_output)
        mp_layer2_output = self.mp_layer2.forward_pass(conv_layer2_output)
        self.mp_layer2_shape = mp_layer2_output.shape
        flattened_output = self.flatten(mp_layer2_output)
        fc_layer1_output = self.fc_layer1.forward_pass(flattened_output)
        fc_layer2_output = self.fc_layer2.forward_pass(fc_layer1_output)
        fc_layer3_output = self.fc_layer3.forward_pass(fc_layer2_output)

        return fc_layer3_output

    def flatten(self, array):
        # Assumes array is the output of a max pooling layer and is therefore a 3d array
        flattened_array = np.reshape(array, (1, array.shape[0] * array.shape[1] * array.shape[2]))
        return flattened_array

    def backward_pass(self, output_grad):
        fc_layer3_backward_output = self.fc_layer3.backward_pass(output_grad)
        fc_layer2_backward_output = self.fc_layer2.backward_pass(fc_layer3_backward_output)
        fc_layer1_backward_output = self.fc_layer1.backward_pass(fc_layer2_backward_output)
        resized_output = np.reshape(fc_layer1_backward_output, self.mp_layer2_shape)
        mp_layer2_backward_output = self.mp_layer2.backward_pass(resized_output)
        conv_layer2_backward_output = self.conv_layer2.backward_pass(mp_layer2_backward_output)
        mp_layer1_backward_output = self.mp_layer1.backward_pass(conv_layer2_backward_output)
        conv_layer1_backward_output = self.conv_layer1.backward_pass(mp_layer1_backward_output)

        return conv_layer1_backward_output

    def train(self, train_images, train_labels, epochs, val_images=None, val_labels=None):
        for i in range(epochs):
            correct, total = 0, 0
            counter = 0
            total_loss = 0
            for train_image, train_label in zip(train_images, train_labels): # Use SGD instead of mini-batch gradient descent because it's simpler
                # Perform forward pass
                train_image = np.reshape(train_image, (1, train_image.shape[0], train_image.shape[1]))
                forward_output = self.forward_pass(train_image)

                # Calculate categorical cross-entropy loss
                epsilon = 1e-10 # To avoid getting infinity when taking the log of zero
                loss = -np.sum(train_label * np.log(forward_output + epsilon))
                total_loss += loss

                # Update correct and total counters
                predicted_label = np.argmax(forward_output, axis=1) # forward_output is shape (1, 10)
                true_label = np.argmax(train_label) # label is shape (10, )
                correct += 1 if true_label == predicted_label else 0
                total += 1

                # Perform backward pass
                output_grad = forward_output - train_label # softmax output - label (s - y)
                input_image_grad = self.backward_pass(output_grad)

                if counter % 1000 == 0:
                    print(f"Epoch {i}, iteration {counter}")

                counter += 1

            # Calculate validation and train accuracy
            if val_images is not None and val_labels is not None:
                val_accuracy = self.predict(val_images, val_labels)
            train_accuracy = correct / total

            # Log statistics
            print(f"Epoch {i+1}/{epochs}, Loss: {total_loss}, Accuracy: {train_accuracy}, Elapsed time in seconds: {time.time() - start_time}")
            print(f"Validation Accuracy: {val_accuracy}")

    def predict(self, val_images, val_labels):
        correct, total = 0, 0
        for val_image, val_label in zip(val_images, val_labels):
            val_image = np.reshape(val_image, (1, val_image.shape[0], val_image.shape[1]))
            forward_output = self.forward_pass(val_image)

            predicted_label = np.argmax(forward_output, axis=1) # forward_output is shape (1, 10)
            true_label = np.argmax(val_label) # label is shape (10, )
            correct += 1 if true_label == predicted_label else 0
            total += 1

        accuracy = correct / total
        return accuracy

In [None]:
model = CreateModel()
subset_data = train_data
subset_labels = np.zeros((subset_data.shape[0], 10)) # Make subset labels one hot vectors
for i in range(subset_labels.shape[0]):
    label = train_labels[i][0]
    subset_labels[i][label] = 1

subset_val_data = val_data
subset_val_labels = np.zeros((subset_val_data.shape[0], 10)) # Make validation labels one hot
for i in range(subset_val_data.shape[0]):
    label = val_labels[i][0]
    subset_val_labels[i][label] = 1

model.train(subset_data, subset_labels, 3, subset_val_data, subset_val_labels)