# MLP Implementation by Numpy
Use MLP to slove the MNIST problem.
- use dense layers. f(x) = wx + b
- use ReLu
- use crossentropy loss function
- Apply backprop algrithm.

In [1]:
import numpy as np
from keras import datasets
np.random.seed(42)

2024-09-12 15:21:42.325910: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-09-12 15:21:42.453847: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-09-12 15:21:44.786486: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-09-12 15:21:44.989663: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-12 15:21:45.172523: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been 

In [2]:
# Define layer template
class Layer():
    """Define layers building block. There is two main function.
    -  Process the input to get the output:    output = layer.forward(input)
    -  Propagate gradients through itself.     grad_input = layer.backward(input, grad_output)
    """
    def __init__(self):
        """Here initialize the layer parameters"""
        pass

    def forward(self, input):
        """Input date with shape [batch, input_units], returns the output data [batch, output_units]"""
        return input

    def backward(self, input, grad_output):
        """Perform the backpropogradtion step through the layer, with respect to the given input.
        Compute the loss gradients w.r.t input, and apply chain rule for backprop:
        d loss / dx = (d loss / d layer) * (d layer / dx)
        For this function, already receive (d loss / d layer),  so only need to multiply it by (d layer / dx)
        """
        num_units = input.shape[1]
        d_layer_d_input = np.eye(num_units)
        return np.dot(grad_output, d_layer_d_input)

# Define ReLu
class ReLu(Layer):
    def __init__(self):
        """ ReLu layer simply apply the elementwise recificed linear unit to all input."""
        """ y = max(0, x)"""
        pass

    def forward(self, input):
        """Here apply elementwise ReLu to [batch, input_units] matrix"""
        return np.maximum(0, input)

    def backward(self, input, grad_output):
        """ Compute the gradient of loss w.r.t ReLu input"""
        relu_grad = input > 0 
        return grad_output * relu_grad

# Define Dense layer
class Dense(Layer):
    def  __init__(self, input_units, output_units, learning_rate = 0.1):
        """Perform the function f(x) = Wx + B"""
        self.learning_rate = learning_rate
        # initialize the weights with normal initializeation.
        self.weights = np.random.randn(input_units, output_units) * 0.01
        self.biases = np.zeros(output_units)

    def forward(self, input):
        """Perform  f(x) = Wx + b
        input shape:  [batch, input_units]
        output shape: [batch, output_units]
        """
        return np.dot(input, self.weights) + self.biases

    def backward(self, input, grad_output):
        # Here is df/dx = df/d dense  * d dense/dx,  w.r.t d dense/dx = weights transposed
        grad_input = np.dot(grad_output, self.weights.T)

        # compute the gradient w.r.t. weights and biases
        grad_weights = np.dot(input.T, grad_output) / input.shape[0]
        grad_biases = grad_output.mean(axis = 0)

        assert grad_weights.shape == self.weights.shape and grad_biases.shape == self.biases.shape

        # define the stochastic gradient descent step.
        self.weights = self.weights - self.learning_rate * grad_weights
        self.biases = self.biases - self.learning_rate * grad_biases

        return grad_input

####  Define the loss function

As we use the softmax, the loass as:

$ loss = - log \space {e^{a_{correct}} \over {\underset i \sum e^{a_i} } } $

rewritten as:

$ loss = - a_{correct} + log {\underset i \sum e^{a_i} } $

In [8]:
def softmax_crossentropy_with_logits(logits, reference_answer):
    """Compute the crossentropy from logits  [batch, n_classes] and ids of correct answers"""
    logits_for_answers = logits[np.arange(len(logits)), reference_answer]

    xentropy = - logits_for_answers + np.log(np.sum(np.exp(logits), axis = -1))

    return xentropy

def grad_softmax_crossentropy_with_logits(logits, reference_answer):
    """compute the crossentropy gradient from logits [batch, n_classes] and ids of correct answers"""
    ones_for_answers = np.zeros_like(logits)

    ones_for_answers[np.arange(len(logits)), reference_answer] = 1

    softmax = np.exp(logits) / np.exp(logits).sum(axis = -1, keepdims = True)

    return - ones_for_answers + softmax


### Load MNIST test data

In [7]:
# load test data and split it as train set and test set.
(x_train, y_train), (x_test, y_test) = datasets.mnist.load_data()

#Scale image to [0, 1] for all input x data
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255

## make sure images have the same input dimension with (28, 28, 1)
#x_train = np.expand_dims(x_train, -1)
#x_test = np.expand_dims(x_train, -1)
print("x_train shape:", x_train.shape)
print("train samples:", x_train.shape[0])
print("test samples:", x_test.shape[0])

x_train shape: (60000, 28, 28)
train samples: 60000
test samples: 10000


### Define Network

In [5]:
network = []
network.append(Dense(x_train.shape[1], 100))
network.append(ReLu())
network.append(Dense(100, 200))
network.append(ReLu())
network.append(Dense(200, 10))

# Forward Path
def forward(network, X):
    activations = []
    input = X

    for layer in network:
        activations.append(layer.forward(input))
        input = activations[-1]

    assert len(activations) == len(network)
    return activations

def predict(network, X):
    logits = forward(network, X)[-1]
    return logits.argmax(axis = -1)

def train(network, X, y):
    """
    1. run forward to get all layer activations
    2. run layer.backward grom last to the first layer.
    """
    # calcuate the activations
    layer_activations = forward(network, X)
    layer_inputs = [X] + layer_activations
    logits = layer_activations[-1]

    # Compute the loss and initial gradient
    loss = softmax_crossentropy_with_logits(logits, y)
    loss_grad = grad_softmax_crossentropy_with_logits(logits, y)

    # propagate gradients through the network
    for layer_i in range(len(network))[::-1]:
        layer = network[layer_i]

        loss_grad = layer.backward(layer_inputs[layer_i], loss_grad) # gradient w.r.t input and weight

    return np.mean(loss)
    

## Train the network

In [9]:
from tqdm import trange
def iternate_minibatches(inputs, targets, batchsize, shuffle=False):
    assert len(inputs) == len(targets)
    if shuffle:
        indices = np.random.permutation(len(inputs))

    for start_idx in trange(0, len(inputs) - batchsize + 1, batchsize):
        if shuffle:
            excerpt = indices[start_idx: start_idx + batchsize]
        else:
            excerpt = slice(start_idx, start_idx + batchsize)
        yield inputs[excerpt], targets[excerpt]

from IPython.display import clear_output
train_log = []
val_log = []

for epoch in range(25):
    for x_batch, y_batch in iternate_minibatches(x_train, y_train, batchsize=8, shuffle= True):
        train(network, x_batch, y_batch)

    train_log.append(np.mean(predict(network, x_train) == y_train))
    val_log.append(np.mean(predict(network, x_test) == y_test))

    clear_output()
    print("Epoch:", epoch)
    print("Train accuracy: ", train_log[-1])
    print("Test accuracy: ", val_log[-1])
    plt.plot(train_log, label = "Train accuracy")
    plt.plot(val_log, label = "Test accuracy")
    plt.legend(loc = "best")
    plt.grid()
    plt.show()

  0%|          | 0/7500 [00:00<?, ?it/s]


ValueError: operands could not be broadcast together with shapes (8,10) (8,28) 

# Unit test

### Test code for loss function(not completed version)

In [None]:
logits = np.linspace(-1,1,500).reshape([50,10])
answers = np.arange(50)%10

softmax_crossentropy_with_logits(logits,answers)
grads = grad_softmax_crossentropy_with_logits(logits,answers)
#numeric_grads = eval_numerical_gradient(lambda l: softmax_crossentropy_with_logits(l,answers).sum(),logits)
#assert np.allclose(numeric_grads,grads,rtol=1e-3,atol=0), "omfg, reference implementation just failed"

### Test code for Dense layer
Target: perform a few test to make sure dense layer works properly. 

In [None]:
l = Dense(128,150)

assert -0.05 < l.weights.mean() < 0.05 and 1e-3 < l.weights.std() < 1e-1, "weights must be zero mean and have small variance."\
                                                                  "If you know what you're doing, remove this assertion."
assert -0.05 < l.biases.mean() < 0.05, "Biases must be zero mean. Ignore if you have a reason to do otherwise."

#to test outputs, we explicitly set weights with fixed values. DO NOT DO THAT IN ACTUAL NETWORK!
l = Dense(3,4)

x = np.linspace(-1,1,2*3).reshape([2,3])
l.weights = np.linspace(-1,1,3*4).reshape([3,4])
l.biases = np.linspace(-1,1,4)

assert np.allclose(l.forward(x),np.array([[ 0.07272727,  0.41212121,  0.75151515,  1.09090909],
                                          [-0.90909091,  0.08484848,  1.07878788,  2.07272727]]))
print("Well done!")

### Check training data

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure(figsize = [6, 6])
for i in range(4):
    plt.subplot(2, 2, i + 1)
    plt.title("Label: %i"%y_train[i])
    plt.imshow(x_train[i].reshape([28, 28]), cmap = "gray")