In [113]:
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt

def Verify(expression, message):
    """Raise ``AssertionError(message)`` when ``expression`` is falsy.

    Args:
        expression: Value evaluated for truthiness.
        message: Explanation attached to the raised error.

    Raises:
        AssertionError: If ``expression`` is falsy.
    """
    # Raise explicitly (instead of a bare ``assert``) so the message is reported
    # and the check is not stripped when Python runs with -O.
    if not expression:
        raise AssertionError(message)

In [114]:
import pandas as pd
import torch

df_train = pd.read_csv('./sign_mnist_train.csv')
df_test = pd.read_csv("./sign_mnist_test.csv")

# Column 0 is used as the class label; every remaining column is treated as a
# pixel value and divided by 255.0 to normalise it.
X_train, y_train = df_train.iloc[:,1:].values/255.0, df_train.iloc[:,0].values
X_test, y_test = df_test.iloc[:,1:].values/255.0, df_test.iloc[:,0].values

from sklearn.preprocessing import LabelBinarizer
lb = LabelBinarizer() #One hot encoding of the labels
y_train = lb.fit_transform(y_train)
# Reuse the encoding learned on the training labels. Refitting on the test
# labels could produce a different column order/count if the test split does
# not contain exactly the same set of classes.
y_test = lb.transform(y_test)

print("X_train numpy shape: "+ str(X_train.shape))
print("X_test numpy shape: " +str(X_test.shape))
print("y_train numpy shape: " +str(y_train.shape))
print("y_test numpy shape: " + str(y_test.shape))


# Tensor copies reshaped to (N, 28, 28, 1) images for the CNN.
X_train_tensor, y_train_tensor = torch.tensor(X_train).reshape(-1, 28, 28, 1), torch.tensor(y_train)
X_test_tensor, y_test_tensor = torch.tensor(X_test).reshape(-1,28,28,1), torch.tensor(y_test)

print("X_train tensor shape: "+ str(X_train_tensor.shape))
print("X_test tensor shape: " +str(X_test_tensor.shape))
print("y_train tensor shape: " +str(y_train_tensor.shape))
print("y_test tensor shape: " + str(y_test_tensor.shape))


X_train numpy shape: (27455, 784)
X_test numpy shape: (7172, 784)
y_train numpy shape: (27455, 24)
y_test numpy shape: (7172, 24)
X_train tensor shape: torch.Size([27455, 28, 28, 1])
X_test tensor shape: torch.Size([7172, 28, 28, 1])
y_train tensor shape: torch.Size([27455, 24])
y_test tensor shape: torch.Size([7172, 24])


In [115]:
#Base class for neural net layers, from: https://colab.research.google.com/github/yueliyl/comp551-notebooks/blob/master/NumpyDeepMLP.ipynb
#Overall approach is very similar to the one taken in most machine learning APIs like TensorFlow and PyTorch
class NeuralNetLayer:
    """Abstract layer interface shared by every layer in the network.

    Attributes set by the constructor:
        parameters: ``None`` here; subclasses with trainable state replace it.
        gradient: ``None`` here; subclasses store their latest gradient in it.
    """

    def __init__(self):
        self.parameters, self.gradient = None, None

    def forward(self, X):
        """Compute the layer output for ``X``; subclasses must override."""
        raise NotImplementedError()

    def backward(self, gradient):
        """Propagate ``gradient`` backwards; subclasses must override."""
        raise NotImplementedError()
    

#Linear Layer implementation, with gaussian initialization
class LinearLayer(NeuralNetLayer):
    """Fully connected layer ``y = x @ w.T + b`` with standard-normal initialisation.

    ``w`` has shape ``(output_size, input_size)`` and ``b`` has shape
    ``(output_size,)``. ``parameters`` always lists the live ``[w, b]`` arrays,
    and after ``backward`` ``gradient`` holds the matching per-sample
    ``[dw, db]`` (leading axis = batch).
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        self.ni = input_size
        self.no = output_size
        self.w = np.random.randn(output_size, input_size)
        self.b = np.random.randn(output_size)
        self.cur_input = None
        self.parameters = [self.w, self.b]

    def forward(self, x):
        """Return the affine map of ``x`` (shape ``(batch, input_size)``).

        The input is cached for ``backward``. The result always keeps the batch
        axis, i.e. it has shape ``(batch, output_size)`` even when the batch
        holds a single sample or the layer has a single output (a bare
        ``squeeze()`` would collapse those axes and break the next layer).
        """
        self.cur_input = x
        return x @ self.w.T + self.b

    def backward(self, gradient):
        """Store per-sample parameter gradients and return the input gradient.

        Args:
            gradient: Gradient w.r.t. this layer's output, shape
                ``(batch, output_size)``.

        Returns:
            Gradient w.r.t. the layer input, shape ``(batch, input_size)``.

        Raises:
            AssertionError: If ``forward`` has not been called yet.
        """
        Verify(self.cur_input is not None, "Must call forward before backward")
        # d(out)/d(w) is the input, so each sample contributes the outer product
        # of its upstream gradient and its input: shape (batch, out, in).
        dw = gradient[:, :, None] @ self.cur_input[:, None, :]
        # d(out)/d(b) is 1, so the upstream gradient passes through unchanged.
        db = gradient
        self.gradient = [dw, db]
        return gradient.dot(self.w)

    def get_params(self):
        """Return the live ``(w, b)`` arrays (references, not copies)."""
        return self.w, self.b

    def set_params(self, w, b):
        """Replace the weights and bias with ``w`` and ``b`` (stored by reference)."""
        self.w = w
        self.b = b
        # Keep the list that optimisers update in place pointing at the new
        # arrays; otherwise training would keep modifying the discarded ones.
        self.parameters = [self.w, self.b]
    

#ReLU Layer implementation
class ReLULayer(NeuralNetLayer):
    """Element-wise rectifier ``max(0, x)``.

    The layer has no trainable parameters; ``gradient`` caches the 0/1 slope of
    the most recent forward input. The base-class constructor is inherited.
    """

    def forward(self, x):
        """Rectify ``x`` and remember where it was strictly positive."""
        # The slope is 1.0 where the input is > 0 and 0.0 everywhere else.
        slope_mask = np.where(x > 0, 1.0, 0.0)
        self.gradient = slope_mask
        rectified = np.maximum(0, x)
        return rectified

    def backward(self, gradient):
        """Scale the upstream ``gradient`` by the cached 0/1 slope."""
        Verify(self.gradient is not None, "Must call forward before backward")
        cached_slope = self.gradient
        return gradient * cached_slope

    def copy(self):
        """Return a brand-new ``ReLULayer`` with no cached state."""
        fresh_layer = ReLULayer()
        return fresh_layer
    
#Softmax layer - gradient only valid for use with cross entropy loss
class SoftmaxOutputLayer(NeuralNetLayer):
    """Softmax over the last axis, intended as the final layer.

    ``backward`` returns ``probs - target``, which is the gradient w.r.t. the
    logits only when the loss is cross-entropy.
    """

    def __init__(self):
        super().__init__()
        self.cur_probs = None

    def forward(self, x):
        """Return softmax probabilities of the logits ``x`` along the last axis."""
        # Subtracting the per-row maximum leaves the result mathematically
        # unchanged but keeps np.exp from overflowing to inf (and inf/inf = NaN)
        # on large logits.
        shifted = x - np.max(x, axis=-1, keepdims=True)
        exps = np.exp(shifted)
        probs = exps / np.sum(exps, axis=-1, keepdims=True)
        self.cur_probs = probs
        return probs

    def backward(self, target):
        """Return the cross-entropy gradient w.r.t. the logits.

        Args:
            target: One-hot labels with the same shape as the cached probabilities.

        Raises:
            AssertionError: If ``forward`` has not been called yet.
        """
        Verify(self.cur_probs is not None, "Must call forward before backward")
        # Softmax combined with cross-entropy collapses to this simple form.
        return self.cur_probs - target

In [116]:
def evaluate_accuracy(y_true_onehot, predicted_probabilities):
    """Return the fraction of rows whose arg-max class matches.

    Both arguments are reduced with ``argmax`` along axis 1 (one-hot labels
    become class indices, probabilities become predicted indices) and the mean
    of the element-wise equality is returned.
    """
    true_classes, predicted_classes = (
        np.argmax(scores, axis=1)
        for scores in (y_true_onehot, predicted_probabilities)
    )
    hits = true_classes == predicted_classes
    return np.mean(hits)

In [117]:
class MLP:
    """Multi-layer perceptron: Linear -> activation (hidden layers only) -> ... -> Linear -> Softmax."""

    def __init__(self, activation_layer_type, number_of_hidden_layers, number_of_hidden_units : list, input_dimensions = 784, output_dimensions = 25):
        """Build the layer stack.

        Args:
            activation_layer_type: Activation layer instance; a fresh layer is
                obtained from its ``copy()`` for every hidden layer.
            number_of_hidden_layers: Must equal ``len(number_of_hidden_units)``.
            number_of_hidden_units: Width of each hidden layer, in order.
            input_dimensions: Number of input features.
            output_dimensions: Number of output classes.
                NOTE(review): the default of 25 is kept for backward
                compatibility, but the one-hot labels printed in this notebook
                have 24 columns - pass this explicitly to match the labels.

        Raises:
            AssertionError: If the hidden-layer count and the units list disagree.
        """
        Verify(number_of_hidden_layers == len(number_of_hidden_units), "Hidden layers does not match size of hidden units list.")

        self.activator = activation_layer_type
        self.number_of_hidden_layers = number_of_hidden_layers
        self.number_of_hidden_units = number_of_hidden_units

        self.dimensions_list = [input_dimensions] + list(number_of_hidden_units) + [output_dimensions]

        self.layers = []
        index_of_output_linear = len(self.dimensions_list) - 2
        for i in range(len(self.dimensions_list)-1):
            self.layers.append(LinearLayer(self.dimensions_list[i], self.dimensions_list[i+1]))
            # Only hidden layers get the non-linearity: the output logits must
            # reach the softmax unmodified (an activation such as ReLU would
            # clamp them before normalisation).
            if i < index_of_output_linear:
                self.layers.append(activation_layer_type.copy())
        self.layers.append(SoftmaxOutputLayer()) #Final layer is a softmax

    def get_params(self):
        """Return a flat list of the parameters of every layer exposing ``get_params``.

        The list holds whatever the layers return (for ``LinearLayer`` the live
        ``w`` and ``b`` arrays, in layer order); nothing is copied.
        """
        params = []
        for layer in self.layers:
            if hasattr(layer, 'get_params'):
                params.extend(layer.get_params())
        return params

    def apply_params(self, parameters):
        """Hand ``parameters`` to the layers exposing ``set_params``, two per layer.

        Args:
            parameters: Flat sequence in the order produced by ``get_params``.

        Raises:
            ValueError: If the number of entries is not exactly two per
                parameterised layer.
        """
        parameters = list(parameters)
        settable_layers = [layer for layer in self.layers if hasattr(layer, 'set_params')]
        if len(parameters) != 2 * len(settable_layers):
            raise ValueError(
                f"Expected {2 * len(settable_layers)} parameter arrays, got {len(parameters)}."
            )
        for position, layer in enumerate(settable_layers):
            layer.set_params(parameters[2 * position], parameters[2 * position + 1])

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    #Just a wrapper for convenience with other code
    def predict(self, x):
        """Alias for ``forward``."""
        return self.forward(x)

    def backward(self, target):
        """Back-propagate from ``target`` through the layers in reverse order.

        The first (softmax) layer receives the labels; every earlier layer
        receives the gradient returned by the layer after it. Each layer stores
        its own gradient; nothing is returned.
        """
        for layer in self.layers[::-1]:
            target = layer.backward(target) #target points to the gradients now

In [118]:
def _mean_cross_entropy(one_hot_labels, probabilities, floor=1e-12):
    """Mean cross-entropy of a batch; probabilities are floored so log(0) cannot produce inf/NaN."""
    return -(one_hot_labels * np.log(np.clip(probabilities, floor, None))).sum(axis=-1).mean()


def optimize_MLP_fancypants(mlp_to_optimize:MLP, X, y, lr, max_iter, batch_size=128):
    """Train ``mlp_to_optimize`` in place with mini-batch gradient descent.

    The data is split 80/20 into training and validation parts using a fixed
    seed (this reseeds NumPy's global RNG). Every epoch reshuffles the training
    part and updates the parameters after each mini-batch.

    Args:
        mlp_to_optimize: Model to train; its parameter arrays are modified in place.
        X: Input array, indexed along axis 0 by sample.
        y: One-hot label array aligned with ``X``.
        lr: Learning rate.
        max_iter: Number of epochs.
        batch_size: Mini-batch size (default 128, the previously hard-coded value).

    Returns:
        A list with copies of the model parameters after the last epoch.

    Raises:
        ValueError: If ``batch_size`` is not positive or the label width does
            not match the model's output size.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    expected_outputs = mlp_to_optimize.dimensions_list[-1]
    if y.shape[-1] != expected_outputs:
        # Fail early with a readable message instead of a broadcasting error
        # deep inside the loss computation.
        raise ValueError(
            f"Labels have {y.shape[-1]} columns but the model outputs {expected_outputs} classes."
        )

    #First split into train/validation split
    np.random.seed(13) #So that its deterministic
    val_size = int(len(X) * 0.2)  # For an 80/20 split
    indices = np.arange(len(X))
    np.random.shuffle(indices)
    val_indices = indices[:val_size]
    train_indices = indices[val_size:]
    # Use the indices to create training and validation sets
    x_train, y_train = X[train_indices], y[train_indices]
    x_val, y_val = X[val_indices], y[val_indices]

    # Baseline validation loss of the untrained model, evaluated on the model
    # itself. It is only recorded for now; see the TODO below.
    val_losses = []
    if val_size > 0:
        val_losses.append(_mean_cross_entropy(y_val, mlp_to_optimize.predict(x_val)))

    train_losses = []
    n_train = len(x_train)

    for _ in tqdm(range(max_iter)):
        # Shuffle the data at the beginning of each epoch
        permutation = np.random.permutation(n_train)
        x_train_shuffled = x_train[permutation]
        y_train_shuffled = y_train[permutation]

        batch_losses = []

        # Mini-batch training
        for start in range(0, n_train, batch_size):
            x_batch = x_train_shuffled[start:start + batch_size]
            y_batch = y_train_shuffled[start:start + batch_size]

            # Forward pass and loss for the batch
            train_pred = mlp_to_optimize.forward(x_batch)
            batch_losses.append(_mean_cross_entropy(y_batch, train_pred))

            # Backward pass
            mlp_to_optimize.backward(y_batch)

            # Update weights
            for layer in mlp_to_optimize.layers:
                if layer.parameters is not None:
                    for (p, g) in zip(layer.parameters, layer.gradient):
                        # Layers store one gradient per sample (leading axis =
                        # batch), so average over the batch before stepping.
                        # The in-place update keeps the layer's arrays in sync.
                        p -= lr * g.mean(axis=0)

        # Average loss over the epoch's mini-batches
        train_losses.append(np.mean(batch_losses))

    # Snapshot (copies) so later training cannot silently change the returned values.
    best_params = [p.copy() for p in mlp_to_optimize.get_params()] ##TODO Change to include validation loop
    y_pred = mlp_to_optimize.predict(x_train)
    print(train_losses)
    print(f"Final Accuracy = {evaluate_accuracy(y_train, y_pred)*100}%")
    return best_params
    
    
    

In [122]:
relu = ReLULayer()
# Size the network from the data rather than relying on the MLP defaults: the
# one-hot labels have y_train.shape[1] columns, which differs from the default
# output size of 25.
mlp1 = MLP(relu, 2, [50, 30], input_dimensions=X_train.shape[1], output_dimensions=y_train.shape[1])

# The optimiser updates mlp1 in place. The returned parameter list is bound to a
# name so the cell does not echo every weight array as its output.
mlp1_params = optimize_MLP_fancypants(mlp1, X_train, y_train, 0.01, 100)



KeyboardInterrupt: 

In [None]:
# Accuracy of the trained model. The optimiser trains mlp1 in place, so mlp1 is
# the fitted model (the previous second evaluation used the undefined name
# `mlp1_fitted`, which fails on a fresh kernel).
predictions_test = mlp1.predict(X_test)
print(f"Test Accuracy: {evaluate_accuracy(y_test, predictions_test)*100}%")

# Untrained model with the same architecture, as a baseline for comparison.
relu = ReLULayer()
mlp2 = MLP(relu, 2, [50,30], input_dimensions=X_test.shape[1], output_dimensions=y_test.shape[1])
predictions_test = mlp2.predict(X_test)
print(f"Test Accuracy: {evaluate_accuracy(y_test, predictions_test)*100}%")