## Course: Deep Learning
### Date: Nov-Dec 2023

Here, we were tasked to create a neural network for MNIST data. The network consists of two linear layers, a hidden layer with 300 nodes, a sigmoid activation, an output layer with 10 classes, and a softmax activation over the output layer.
<br><br>
The functions `load()` and `load_mnist()` were provided. Only plain python and the `numpy` package could be used.

In [None]:
import numpy as np
import pickle
import os

def load():
    with open("mnist.pkl",'rb') as f:
        mnist = pickle.load(f)
    return mnist["training_images"], mnist["training_labels"], mnist["test_images"], mnist["test_labels"]

def load_mnist(final=False, flatten=True):
    """
    Load the MNIST data.

    :param final: If true, return the canonical test/train split. If false, split some validation data from the training
       data and keep the test data hidden.
    :param flatten: If true, each instance is flattened into a vector, so that the data is returns as a matrix with 768
        columns. If false, the data is returned as a 3-tensor preserving each image as a matrix.

    :return: Two tuples and an integer: (xtrain, ytrain), (xval, yval), num_cls. The first contains a matrix of training
     data and the corresponding classification labels as a numpy integer array. The second contains the test/validation
     data in the same format. The last integer contains the number of classes (this is always 2 for this function).

     """

    if not os.path.isfile('mnist.pkl'):
        init()

    xtrain, ytrain, xtest, ytest = load()
    xtl, xsl = xtrain.shape[0], xtest.shape[0]

    if flatten:
        xtrain = xtrain.reshape(xtl, -1)
        xtest  = xtest.reshape(xsl, -1)

    if not final: # return the flattened images
        return (xtrain[:-5000], ytrain[:-5000]), (xtrain[-5000:], ytrain[-5000:]), 10

    return (xtrain, ytrain), (xtest, ytest), 10

# Sigmoid activation.
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

# Create empty network.
def create_empty_network(X):
    network = [np.zeros((X.shape[0], 300)), # first layer
               np.zeros((X.shape[0], 300)), # sigmoid-activated layer
               np.zeros((X.shape[0], 10))] # output layer
    return network

def forward_pass(X, action_per_layer, weights, biases):
    network = create_empty_network(X)
    
    # Forward pass.
    layer = X
    passed_network = []
    for i in range(len(network)):
        if (action_per_layer[i] == "dense"):
            layer = np.dot(layer, weights[i]) + biases[i]
        elif (action_per_layer[i] == "sigmoid"):
            layer = sigmoid(layer)
        passed_network.append(layer)

    # Check.
    assert len(network) == len(passed_network)
    for i in range(len(network)):
        assert network[i].shape == passed_network[i].shape
        
    return passed_network

# Get gradient of crossentropy+softmax.
# output_layer is the last activated layer.
# labels is the list with targets.
def get_first_gradient(output_layer, labels):
    one_or_zero = np.zeros_like(output_layer)
    one_or_zero[np.arange(len(output_layer)), labels] = 1
    softmax = do_softmax(output_layer)
    gradient = (- one_or_zero + softmax) / output_layer.shape[0]
    return gradient

# Do backward pass and update gradients.
def backward_pass(passed_network, X, labels, action_per_layer, weights, biases, learning_rate = 0.001):
    
    network = create_empty_network(X)
    
    # Backpropagation
    data_and_layers = [X] + passed_network
    gradient = get_first_gradient(passed_network[-1], labels)

    for i in range(len(network))[::-1]:
        if (action_per_layer[i] == "dense"):
            # Update weights.
            gradients_weights = np.dot(data_and_layers[i].T, gradient)
            weights[i] = weights[i] - learning_rate * gradients_weights
            # Update biases.
            gradients_biases = data_and_layers[i].shape[0] * gradient.mean(axis = 0)
            biases[i] = biases[i] - learning_rate * gradients_biases
            # Backpropagate gradient.
            gradient = np.dot(gradient, weights[i].T)
        if (action_per_layer[i] == "sigmoid"):
            gradient = gradient * data_and_layers[i+1] * (1-data_and_layers[i+1])

    return weights, biases

# Apply softmax
def do_softmax(output_layer):
    softmax_layer = np.zeros_like(output_layer)
    for instance_index in range(output_layer.shape[0]):
        for class_index in range(output_layer.shape[1]):
            softmax = np.exp(output_layer[instance_index][class_index])/np.sum(np.exp(output_layer[instance_index]))
            softmax_layer[instance_index][class_index] = softmax
    return(softmax_layer)

# Accuracy.
def compute_accuracy(softmax_layer, labels):
    accuracy = np.mean(np.argmax(softmax_layer, axis=1) == labels)
    return accuracy

# Cross entropy loss, including softmax activation.
def compute_CE_loss(predictions, labels):
    nr_instances = labels.shape[0]
    loss = np.sum(-np.log(predictions[range(nr_instances),labels])) / nr_instances
    return loss

def create_batches(X, labels, batch_size):
    
    # Number of instances must be divisable by batch size.
    assert X.shape[0]%batch_size == 0

    list_with_batches = []
    list_with_batchlabels = []
    index = 0
    for batch_nr in range(int(X.shape[0]/batch_size)):
        start = index
        end = index+batch_size
        list_with_batches.append(X[start:end])
        list_with_batchlabels.append(labels[start:end])

        index += batch_size
        
    return list_with_batches, list_with_batchlabels

# Do one forward pass and one backward pass.
def learn(X, labels, action_per_layer, weights, biases, learning_rate = 0.1):
    # Go through network in forward pass.
    passed_network = forward_pass(X, action_per_layer, weights, biases)
    # Update parameters.
    weights, biases = backward_pass(passed_network, X, labels, action_per_layer,
                                weights, biases, learning_rate = learning_rate)
    predictions = do_softmax(passed_network[-1])
    return predictions, weights, biases

# Split data into batches and perform weight updates per batch.
def learn_per_batch(X, labels, batch_size, action_per_layer, weights, biases, learning_rate):
    
    # Empty logs.
    batch_acc_log = []
    batch_loss_log = []
    
    # Split dataset up into batches of desired size.
    batches, batchlabels = create_batches(X, labels, batch_size)

    # Train and update for each batch.
    for i in range(len(batches)):
        # Train and update.
        predictions, weights, biases = learn(batches[i], batchlabels[i], action_per_layer, weights, biases, learning_rate)
        # Add batch accuracy and loss to log.
        batch_acc_log.append(compute_accuracy(predictions, batchlabels[i]))
        batch_loss_log.append(compute_CE_loss(predictions, batchlabels[i]))

    # Take the average of all batches.
    accuracy = np.average(batch_acc_log)
    loss = np.average(batch_loss_log)
    
    return predictions, weights, biases, accuracy, loss

# Check number of correct predictions.
def validate(X, labels, action_per_layer, weights, biases):
    # Test validation set.
    passed_val_network = forward_pass(X, action_per_layer, weights, biases)
    val_predictions = do_softmax(passed_val_network[-1])

    # Calculate objectives.
    accuracy = compute_accuracy(val_predictions, labels)
    loss = compute_CE_loss(val_predictions, labels)

    return accuracy, loss

# Train network and return performance.
def train_and_report(X_train, labels_train, X_val, labels_val, batch_size, action_per_layer, weights,
                     biases, learning_rate, epochs):
    
    accuracy, loss = validate(X_train, labels_train, action_per_layer, weights, biases)
    accuracy_log = [accuracy]
    loss_log = [loss]
    
    accuracy, loss = validate(X_val, labels_val, action_per_layer, weights, biases)
    val_acc_log = [accuracy]
    val_loss_log = [loss]

    for epoch in range(epochs):

        # Train and update.
        predictions, weights, biases, accuracy, loss = learn_per_batch(X_train, labels_train,
                                                                       batch_size, action_per_layer,
                                                                       weights, biases, learning_rate)

        # Store accuracy and loss.
        accuracy_log.append(accuracy)
        loss_log.append(loss)
        
        # Validate and store objectives.
        val_acc, val_loss = validate(X_val, labels_val, action_per_layer, weights, biases)
        val_loss_log.append(val_loss)
        val_acc_log.append(val_acc)

        print("Epoch", epoch, "done")
    print("DONE")
    
    plot_learning_curve(accuracy_log, val_acc_log, loss_log, val_loss_log)
    
    return weights, biases, accuracy_log, val_acc_log, loss_log, val_loss_log

# Create matplotlib figure for learning curve with accuracy and loss of training and validation sets.
def plot_learning_curve(accuracy_log, val_acc_log, loss_log, val_loss_log):

    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(11, 5))
    axes[0].plot(accuracy_log, label='Training set', color = "#008b8e")
    axes[0].plot(val_acc_log, label='Validation set', color = "#fabf01")
    axes[0].set_xlabel("Epoch")
    axes[0].set_ylabel("Accuracy")
    axes[0].legend(loc='best') # place legend
    axes[0].set_title("Accuracy over epochs")
    axes[0].grid(True) # add raster lines
    axes[0].xaxis.set_major_locator(plt.MaxNLocator(integer=True)) # convert x axis labels to integer values
    axes[1].plot(loss_log, label='Training set', color = "#bd467e")
    axes[1].plot(val_loss_log, label='Validation set', color = "#919302")
    axes[1].legend(loc='best')
    axes[1].set_xlabel("Epoch")
    axes[1].set_ylabel("Loss")
    axes[1].set_title("Loss over epochs")
    axes[1].grid(True) # add raster lines
    axes[1].xaxis.set_major_locator(plt.MaxNLocator(integer=True))
    fig.tight_layout()

# Load MNIST data.
(xtrain, ytrain), (xval, yval), num_cls = load_mnist()

# Visualize some examples.
import matplotlib.pyplot as plt
%matplotlib inline
plt.figure(figsize=[6,6])
for i in range(4):
    plt.subplot(2,2,i+1)
    plt.title("Label: %i"%ytrain[i])
    plt.imshow(xtrain[i].reshape([28,28]),cmap='gray');
    
# Normalize
norm_factor = 255
xtrain = xtrain/norm_factor
xval = xval/norm_factor

print("--- Dimensions ---")
print("xtrain:", np.shape(xtrain))
print("ytrain:", np.shape(ytrain))
print("xval:", np.shape(xval))
print("yval", np.shape(yval))
print("\n")
print("--- Value range ---")
print("xtrain max:", xtrain.max(), "min: ", xtrain.min(), "mean:", xtrain.mean(), "variance:", xtrain.var())
print("xval max:", xval.max(), "min: ", xval.min(), "mean:", xval.mean(), "variance:", xval.var())

# Initialize weights with random normally distributed values.
weights = [np.random.normal(loc = 0.0, scale = 1.0, size = (xtrain.shape[1],300)), # from input to first layer
           [], # none for sigmoid-activated layer
          np.random.normal(loc = 0.0, scale = 1.0, size = (300,num_cls))] # from sigmoid-activated layer to output layer

biases = [300*[0], # for first layer
         [], # none for sigmoid-activated layer
         10*[0]] # for output layer

action_per_layer = ["dense", "sigmoid", "dense"]

weights, biases, accuracy_log, val_acc_log, loss_log, val_loss_log = train_and_report(xtrain, ytrain,
                                                                                      xval, yval,
                                                                                      batch_size = 100,
                 action_per_layer = action_per_layer, weights = weights, biases = biases,
                 learning_rate = 0.01, epochs = 5)