# Chapter 5 - Part 2: Building and Training a Neural Network From Sratch 



## Peak inside the MNIST data as delivered from the keras library

In [27]:

import numpy as np
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
from collections import Counter


def shape_data(data_images, data_labels):
    features = [np.reshape(x, (784, 1)) / 255.0 for x in data_images]
    labels = [to_categorical(y, num_classes=10) for y in data_labels]
    return list(zip(features, labels))

def load_data():
    (train_images, train_labels), (test_images, test_labels) = mnist.load_data()
    shaped_train_data = shape_data(train_images, train_labels)
    shaped_test_data = shape_data(test_images, test_labels)
    return shaped_train_data, shaped_test_data

shaped_train_data, shaped_test_data = load_data()


In [None]:

len(shaped_train_data)

In [5]:
# x is a two-dimensional array with 784 rows and 1 column. this is essentially a column vector with 784 elements
shaped_train_data[0][0].shape

(784, 1)

In [6]:
shaped_train_data[0][0][:3]

array([[0.],
       [0.],
       [0.]])

In [8]:
# ground truth y is a 1D array containing 10 elements

shaped_train_data[0][1].shape


(10,)

In [9]:
shaped_test_data[0][1]

array([0., 0., 0., 0., 0., 0., 0., 1., 0., 0.], dtype=float32)

## Neural Network From Scratch 
#### *from the book Deep Learning and the Game of Go, with some minor modifications*

### Changes
- Minor linting support added 
- Deltas are processed in column vectors in np arrays instead of lists
- Added printouts of progressive accuracy over the epochs
- Using xavior initialization instead of random initialization of weights to handle dissapearing gradients
- Reduced learning rate from 3 to .9 to handle exploding gradients (or what I supposed to be exploding gradients and which was resulting in a failing model)


In [28]:
import random
import numpy as np
from typing import Union


def sigmoid_double(x):
    return 1.0 / (1.0 + np.exp(-x))

def sigmoid(z):
    return np.vectorize(sigmoid_double)(z)


def sigmoid_prime_double(x):
    return sigmoid_double(x) * (1 - sigmoid_double(x))

def sigmoid_prime(z):
    return np.vectorize(sigmoid_prime_double)(z)


class MSE:

    def __init__(self):
        pass
    
    @staticmethod
    def loss_function(predictions, labels):
        diff = predictions - labels
        return 0.5 * sum(diff * diff)[0] 
    
    @staticmethod
    def loss_derivative(predictions, labels):
        predictions = predictions.flatten()
        labels = labels.flatten()
        return predictions - labels

    

class Layer:  


    def __init__(self):
        self.params = []
        self.previous = None  
        self.next = None  

        self.input_data = None  
        self.output_data = None

        self.input_delta = None  
        self.output_delta = None

    def connect(self, layer):

        self.previous = layer
        layer.next = self

    def forward(self):
        raise NotImplementedError
    
    def get_forward_input(self):
        if self.previous is not None:
            return self.previous.output_data
        else:
            return self.input_data

    def backward(self):
        raise NotImplementedError
    
    def get_backward_input(self) -> np.ndarray:
        """

        output shape is column vector (n, 1) due to processing needs in DenseLayer.backward()
        """
        if self.next is not None:
            backward_feed_delta = self.next.output_delta
        else:
            backward_feed_delta = self.input_delta
        
        backward_feed_delta = backward_feed_delta.reshape(-1, 1)
        return backward_feed_delta
        
    def clear_deltas(self):
        pass

    def update_params(self, learning_rate):
        pass
    
    def describe(self):
        raise NotImplementedError
    

class ActivationLayer(Layer):
    def __init__(self, input_dim):
        super(ActivationLayer, self).__init__()

        self.input_dim = input_dim
        self.output_dim = input_dim 
            
    def forward(self):
        data = self.get_forward_input()
        self.output_data = sigmoid(data)

    def backward(self):

        delta = self.get_backward_input()
        data = self.get_forward_input()

        self.output_delta = delta * sigmoid_prime(data)

    def describe(self):
        print("|-- " + self.__class__.__name__)
        print(" |-- dimensions: ({},{})".format(self.input_dim, self.output_dim))


class DenseLayer(Layer):
    def __init__(self, input_dim, output_dim, initialization="random"):
        super(DenseLayer, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim

        if initialization == "xavier":
            a = np.sqrt(6.0 / (input_dim + output_dim))
            self.weight = np.random.uniform(-a, a, (output_dim, input_dim))
        else:
            self.weight = np.random.randn(output_dim, input_dim)

        self.bias = np.random.randn(output_dim, 1)

        self.params = [self.weight, self.bias]

        self.delta_w = np.zeros(self.weight.shape)
        self.delta_b = np.zeros(self.bias.shape)

    def forward(self):
        data = self.get_forward_input()
        self.output_data = np.dot(self.weight, data) + self.bias

    def backward(self):

        data = self.get_forward_input()
        delta = self.get_backward_input()

        self.delta_b += delta
        self.delta_w += np.dot(delta, data.transpose())
        self.output_delta = np.dot(self.weight.transpose(), delta)
  
    def update_params(self, learning_rate):

        self.weight -= learning_rate * self.delta_w
        self.bias -= learning_rate * self.delta_b

    def clear_deltas(self):
        self.delta_w = np.zeros(self.weight.shape)
        self.delta_b = np.zeros(self.bias.shape)

    def describe(self):
        print("|--" + self.__class__.__name__)
        print("    |-- dimensions: ({}, {})".format(self.input_dim, self.output_dim))


class SequentialNetwork:
    def __init__(self,loss=None):
        print("Initializing network...")
        self.layers = []
        if loss is None:
            self.loss = MSE()

    def add(self, layer):
        self.layers.append(layer)
        layer.describe()
        if len(self.layers) > 1:
            self.layers[-1].connect(self.layers[-2])

    def train(self, training_data, epochs, mini_batch_size, learning_rate, test_data=None):
        """
        This function differs from the one in the book because it reporst on progress for each epoch and tests on the test data after each epoch
        """
        n = len(training_data)

        initial_weights_sum = np.sum(self.layers[-2].weight)
        print(f" initial sum of weights of the last layer: {initial_weights_sum}")
        
        for epoch in range(epochs):
            random.shuffle(training_data)
            mini_batches = [
                training_data[k:k + mini_batch_size] for k in range(0, n, mini_batch_size)
            ]

            epoch_loss = 0 

            for mini_batch in mini_batches:
                batch_loss = self.train_batch(mini_batch, learning_rate)  
                epoch_loss += batch_loss  

            epoch_loss /= len(mini_batches)   

            print(f"Epoch {epoch} complete. Average Training loss: {epoch_loss}")

            epoch_weights_sum = np.sum(self.layers[-2].weight)
            print(f"Sum of weights of the last layer after epoch {epoch}: {epoch_weights_sum}")

            if test_data:
                n_test = len(test_data)
                print("Epoch {0}: {1} / {2}".format(epoch, self.evaluate(test_data), n_test))
            else:
                print("Epoch {0} complete".format(epoch))

    def train_batch(self, mini_batch, learning_rate):
        
        self.forward_backward(mini_batch)

        batch_loss = 0  

        for x, y in mini_batch:
            batch_loss += self.loss.loss_function(self.single_forward(x), y)

        batch_loss /= len(mini_batch)  


        self.update(mini_batch, learning_rate)

        return batch_loss 
    
    def update(self, mini_batch, learning_rate):
        learning_rate = learning_rate / len(mini_batch)

        for layer in self.layers:
            layer.update_params(learning_rate)

        for layer in self.layers:
            layer.clear_deltas()

    def forward_backward(self, mini_batch):
        for x, y in mini_batch:
            self.layers[0].input_data = x  # seting here the input data for the first layer directly
            
            # start up forward propagation
            for layer in self.layers:
                layer.forward()

            self.layers[-1].input_delta = self.loss.loss_derivative(self.layers[-1].output_data, y)

            # ...backward propagation
            for layer in reversed(self.layers):
                layer.backward()

    def single_forward(self, x):
        """
        used for testing 
        """
        self.layers[0].input_data = x  # set input data for the first layer same as in forward_backward(). 

        for layer in self.layers:
            layer.forward()
            
        return self.layers[-1].output_data
    
    def evaluate(self, test_data):
        test_results = [(
            np.argmax(self.single_forward(x)),
            np.argmax(y)
        ) for (x, y) in test_data]
        return sum(int(x == y) for (x, y) in test_results)
    
    
net2 = SequentialNetwork()
net2.add(DenseLayer(784, 392, initialization="xavier"))
net2.add(ActivationLayer(392))
net2.add(DenseLayer(392, 196, initialization="xavier"))
net2.add(ActivationLayer(196))
net2.add(DenseLayer(196, 10, initialization="xavier"))
net2.add(ActivationLayer(10))

shaped_train_data, shaped_test_data = load_data()


# shaped_train_data = shaped_train_data[:10000]
# shaped_test_data = shaped_test_data[:10000]

# shaped_train_data, shaped_test_data 
net2.train(shaped_train_data, epochs=10, mini_batch_size=10,
          learning_rate=.9, test_data=shaped_test_data)  

Initializing network...
|--DenseLayer
    |-- dimensions: (784, 392)
|-- ActivationLayer
 |-- dimensions: (392,392)
|--DenseLayer
    |-- dimensions: (392, 196)
|-- ActivationLayer
 |-- dimensions: (196,196)
|--DenseLayer
    |-- dimensions: (196, 10)
|-- ActivationLayer
 |-- dimensions: (10,10)
 initial sum of weights of the last layer: -2.6337755720653
Epoch 0 complete. Average Training loss: 0.7469072502517705
Sum of weights of the last layer after epoch 0: -258.349462177276
Epoch 0: 9244 / 10000
Epoch 1 complete. Average Training loss: 0.8331697449158939
Sum of weights of the last layer after epoch 1: -296.1944352784897
Epoch 1: 9438 / 10000
Epoch 2 complete. Average Training loss: 0.8461794080545909
Sum of weights of the last layer after epoch 2: -309.34416223099527
Epoch 2: 9575 / 10000
Epoch 3 complete. Average Training loss: 0.8547710953048605
Sum of weights of the last layer after epoch 3: -326.8422937277082
Epoch 3: 9625 / 10000
Epoch 4 complete. Average Training loss: 0.8606

In [25]:
# a bit more info on the data 

train_labels_int = [np.argmax(label) for _, label in shaped_train_data]
test_labels_int = [np.argmax(label) for _, label in shaped_test_data]
from collections import Counter

train_label_counts = Counter(train_labels_int)
test_label_counts = Counter(test_labels_int)

print("Train label counts:", train_label_counts)
print("Test label counts:", test_label_counts)

Train label counts: Counter({1: 6742, 7: 6265, 3: 6131, 2: 5958, 9: 5949, 0: 5923, 6: 5918, 8: 5851, 4: 5842, 5: 5421})
Test label counts: Counter({1: 1135, 2: 1032, 7: 1028, 3: 1010, 9: 1009, 4: 982, 0: 980, 8: 974, 6: 958, 5: 892})
