# Dev notebook

I use this notebook to develop my implementation of a simple and minimal neural network framework.

Inspiration for this network is drawn from the book [Neural Networks from Scratch (NNFS)](https://nnfs.io/) & [Pytorch's implementation](https://pytorch.org/)

Notes:
- The work on experimenting with backpropagation has been moved to another notebook to keep this one minimal
- I have chosen to use a similar naming convention to that used by pytorch (why reinvent the wheel), this has the benefit of ensuring that when we compare implementations the architecture of the networks is the same. 

In [1]:
# Import for dev and testing
import nnfs 
import numpy as np

from abc import ABC, abstractmethod
from nnfs.datasets import spiral_data
from typing import List

nnfs.init()

## Defining Base Module

This is a module that contains all the base attributes and function needed by every class in the framework

In [2]:
class Module(ABC):
    """Base class for all classes in frame work to ensure the same attributes and common function names."""

    def __init__(self) -> None:
        # Attributes to hold input and outputs
        self.input = None
        self.output = None
    
    @abstractmethod
    def forward(self):
        pass

    @abstractmethod
    def backward(self):
        pass

##  Defining layers 

In this section We define and test the layers

Notes:
- since we intend to use ReLU as one of our activation functions we will use the He weight initialization method as described in https://arxiv.org/abs/1502.01852

### Linear Layer

In [3]:
class LinearLayer(Module):
    """Linear transformation layer of the type o = ixW + b,
    
    where I is the incoming vector, W is the layers weight matrix, b is bias vector and o is the dot product of the 
    i and W plus the bias
    
    Args:
        in_features      (int):   The size of the input features 
        out_features     (int):   The size of the output features
        lambda_l1_weight (float): Hyperperamiter lambda for L1 regularization for the weights 
        lambda_l1_bias   (float): Hyperperamiter lambda for L1 regularization for the bias
        lambda_l2_weight (float): Hyperperamiter lambda for L2 regularization for the weights
        lambda_l2_bias   (float): Hyperperamiter lambda for L2 regularization for the bias
        
    Attributes:
        weights          (np_array): numpy array of in_features x n_neurons
        biases           (np_array): numpy array of 1 x n_neurons
        inputs           (np_array): numpy array of latest batch of inputs
        outputs          (np_array): numpy array of latest batch of outputs
        d_w              (np_array): The current gradients with respect to the weights 
        d_b              (np_array): The current gradients with respect to the biases
        grad             (np_array): The current gradients with respect to the inputs
        lambda_l1_weight (float):    Hyperperamiter lambda for L1 regularization for the weights 
        lambda_l1_bias   (float):    Hyperperamiter lambda for L1 regularization for the bias
        lambda_l2_weight (float):    Hyperperamiter lambda for L2 regularization for the weights
        lambda_l2_bias   (float):    Hyperperamiter lambda for L2 regularization for the bias
    """

    def __init__(self, in_features, out_features, 
                 lambda_l1_weight=0, lambda_l1_bias=0, 
                 lambda_l2_weight=0, lambda_l2_bias=0) -> None:
        super().__init__()
        # initializing weights and biases 
        self.weights = np.random.normal(0.0, np.sqrt(2/in_features), (in_features, out_features))
        # Using a simpler initialization  for testing 
        #self.weights = 0.01 * np.random.randn(in_features, out_features)
        self.bias = np.zeros((1, out_features))
        # initializing regularization lambdas
        if (lambda_l1_bias > 0) | (lambda_l1_weight > 0):
            self.lambda_l1_weight = lambda_l1_weight
            self.lambda_l1_bias = lambda_l1_bias
        else:
            self.lambda_l1_weight = None
            self.lambda_l1_bias = None
        if (lambda_l2_bias > 0) | (lambda_l2_weight > 0):  
            self.lambda_l2_weight = lambda_l2_weight
            self.lambda_l2_bias = lambda_l2_bias
        else: 
            self.lambda_l2_weight = None
            self.lambda_l2_bias = None

    def forward(self, inputs):
        """Forward pass through the layer.
        
        Args:
        inputs (np_array): Inputs to the layer must be the same size as the weights.
        """
        self.input = inputs
        self.output = np.dot(inputs, self.weights) + self.bias
        return self.output

    def l1_backward_w(self):
        """Backpropagation of L1 regularization function wrt weights."""
        if self.lambda_l1_weight:
            d_l1 = np.ones_like(self.weights) 
            d_l1[self.weights < 0] = -1
            return d_l1 * self.lambda_l1_weight
        else:
            return 0

    def l1_backward_b(self):
        """Backpropagation of L1 regularization function wrt bias."""
        if self.lambda_l1_bias:
            d_l1 = np.ones_like(self.bias) 
            d_l1[self.bias < 0] = -1
            return d_l1 * self.lambda_l1_bias
        else:
            return 0  

    def l2_backward_w(self):
        """Backpropagation of L2 regularization function wrt weights."""
        if self.lambda_l2_weight:
            return 2 * self.lambda_l2_weight  * self.weights
        else:
            return 0
        
    def l2_backward_b(self):
        """Backpropagation of L1 regularization function wrt bias."""
        if self.lambda_l2_bias:
            return 2 * self.lambda_l2_bias  * self.bias
        else:
            return 0

    def backward(self, d_vals):
        """Backpropagation  of the linear layer function

        Args:
            d_vals (np_array): derivatives from the previous layer/function.
        """
        self.d_w = np.dot(self.input.T, d_vals) + self.l1_backward_w() + self.l2_backward_w()
        self.d_b = np.sum(d_vals, axis=0, keepdims=True) + self.l1_backward_b() + self.l2_backward_b()

        self.grad = np.dot(d_vals, self.weights.T)

    def regularization_loss(self):
        """Calculates the regularization loss of the layer. 
        It will only do the calculation if the respective lambda for the loss type is > 0"""
        loss = 0
        # L1 weight 
        if self.lambda_l1_weight:
            loss += self.lambda_l1_weight * np.sum(np.abs(self.weights))
        # L1 bias
        if self.lambda_l1_bias:
            loss += self.lambda_l1_bias * np.sum(np.abs(self.bias))
        # L2 weight
        if self.lambda_l2_weight:
            loss += self.lambda_l2_weight * np.sum(self.weights * self.weights)
        # L2 bias
        if self.lambda_l2_bias:
            loss += self.lambda_l2_bias * np.sum(self.bias * self.bias)

        return loss


#### Testing Linear Layer

In [4]:
# The sample data is a list of coordinate, ie two points 
# The layer therefore will take 2 inputs 
# We have given it 3 out features (3 neurons) so we expect to see an output with the shape (n_samples*n_neurons, n_neurons)
# In out case that should be (300, 3) 
X, _ = spiral_data(samples=100, classes=3)
linear1 = LinearLayer(2, 3)
output = linear1.forward(X)
print(output[:10, :])

assert output.shape == (300,3)


[[ 0.          0.          0.        ]
 [-0.01047519  0.01139536 -0.00479835]
 [-0.02741484  0.03172915 -0.00869218]
 [-0.04218837  0.05266625 -0.00559127]
 [-0.05770768  0.0714014  -0.00894304]
 [-0.0354307   0.03502549 -0.02336348]
 [-0.089267    0.10767876 -0.01945324]
 [-0.09335078  0.10723802 -0.0312274 ]
 [-0.11243759  0.13112801 -0.03362967]
 [-0.13386956  0.16200906 -0.02810179]]


## Defining Dropout

The drop out layer will take one hyperparameter, rate,  which will represent the percentage of neurons that will be deactivated with each forward pass. 

To achieve this will will make a mask the size of the output of the previous layer (number of neurons) and apply a binomial distribution. The distribution will be used to generate an array of zero or ones that will be multiplied against the output of the previous layer. This will deactivate or zero out those outputs that correspond to the zeros generated to the binomial dist.


In [5]:
class Dropout(Module):
    """ Dropout Layer, intended to be used in traning to deactivate a random portion of the neurons from 
    a pervious layer to based on the work https://arxiv.org/abs/1207.0580

    Args:
        p (float): probability of an element to be set to zero

    Attributes:
        p            (float):   probability of an element to be set to zero
        mask         (ndarray): Latest scaled binary mask used to zero out input elements 
        traning_mode (binary):  Binary flag to control behaviour betwen traning and eval modes
    
    """

    def __init__(self, p:float) -> None:
        super().__init__()
        self.p = p
        self.mask = None
        self.training_mode = True

    def forward(self, input:np.ndarray) -> np.ndarray:
        """During training it will randomly zero out a number of inputs according to a binomial distribution
        and it will also scale the inputs by 1/1-p to account for the lack of dropout in evaluation mode.
        During evaluation it returns the input.
        
        Args:
            input (ndarray): Output from a previous layer
        """
        if not self.training_mode:
            # Eval operation mode -> NO DROPOUT
            self.output = input
            return self.output

        # Training operation mode -> Dropout 
        self.input = input
        self.mask = np.random.binomial(1, self.p, size=input.shape)/ (1-self.p)
        self.output = input * self.mask 
        return self.output
    
    def backward(self, grads):
        """Backpropagation of the dropout function.
        
        Args:
            grads (ndarray): gradients from the next layer.
        """
        self.grad = grads * self.mask
    

### Testing Dropout

In [6]:
# using the output of the previous test
input = output[:10, :]
p = 0.8 # deactivate 10% of the neurons

dropout = Dropout(p)
dropout.forward(input)

array([[ 0.        ,  0.        ,  0.        ],
       [-0.05237594,  0.05697681, -0.02399175],
       [-0.13707422,  0.        , -0.0434609 ],
       [-0.        ,  0.        , -0.02795634],
       [-0.28853839,  0.35700701, -0.04471522],
       [-0.17715348,  0.17512744, -0.1168174 ],
       [-0.446335  ,  0.53839382, -0.09726618],
       [-0.46675388,  0.53619012, -0.15613699],
       [-0.56218795,  0.65564007, -0.        ],
       [-0.        ,  0.8100453 , -0.14050897]])

## Defining Activation Functions

### ReLu

$$y = \begin{cases}
   x &x> 0 \\
   0 & otherwise
\end{cases} $$

In [7]:
class ReLU(Module):
    """Applies Rectified linear Unit function to vector.
    
    Attributes:
        inputs            (ndarray): numpy array of latest batch of inputs
        outputs           (ndarray): numpy array of latest batch of outputs
        grad              (ndarray): The current gradients with respect to the inputs
    """
    def __init__(self) -> None:
        # initializing attributes needed for backwards 
        super().__init__()
        self.grad = None
    
    def forward(self, x):
        # storing inputs needed for backwards 
        self.inputs = x
        self.output = np.maximum(x, 0)
        return self.output
    
    def backward(self, d_vals):
        self.grad = d_vals.copy()
        self.grad[self.inputs <= 0] = 0

#### Testing ReLU

In [8]:
i = [-2, 3, 4, 0, 0.1, -44]
test_relu = ReLU()

# Checking values are as expected 
assert np.all(np.array([0., 3, 4, 0, 0.1, 0.]) == test_relu.forward(i))

test_relu.output

array([0. , 3. , 4. , 0. , 0.1, 0. ])

### Softmax

$$\text{softmax}(x)_i = \frac{exp(x_i)}{\sum_{j}^{ }exp(x_j))}$$

The soft max represents the confidence score for each output class and adds up to 1.

In [9]:
class Softmax(Module):
    """Applies Softmax function to input.
    
    Attributes:
        inputs            (ndarray): numpy array of latest batch of inputs
        outputs           (ndarray): numpy array of latest batch of outputs
        grad              (ndarray): The current gradients with respect to the inputs
        confidence_scores (ndarray): Latest batch of classification probabilities
    """

    def __init__(self) -> None:
        super().__init__()
        self.confidence_scores = None

    def forward(self, x):
        """Forward pass
        Args:
            x (ndarray): Input from the pervious layer
        """
        # exponenets of each value
        exp_vals = np.exp(x - np.max(x, axis=1, keepdims=True))
        exp_sum = np.sum(exp_vals, axis=1, keepdims=True)
        # Normalization to get the proabilities 
        self.output = exp_vals/exp_sum
        return self.output

    def _backward(self, d_vals):
        """Backward pass which calculates the gradient wrt the inputs 

        Args:
            d_vals (ndarray): gradients from the loss calculation
        """
        # Initialize array for gradients wrt to inputs
        self.grad = np.zeros_like(d_vals)
        
        _iter = enumerate(zip(self.output, d_vals))
        for i, conf_score, d_val in _iter:
            # Flatten confidence scores
            cs = conf_score.reshape(-1, 1)
            # Find the Jacobian matrix of the output 
            j_matrix = np.diagflat(cs) - np.dot(cs, cs.T)
            # get the gradient 
            self.grad[i] = np.dot(j_matrix, d_val)
    
    def backward(self, y_pred, y_true):
        """Combined backward pass for CCE & Softmax as a single which is
           faster to compute.

        Args:
            y_pred (ndarray): predicted classes for the current batch
            y_true (ndarray): One hot encoded true values for y
        """
        # Number of examples in the batch
        n = len(y_pred)

        # Getting descrete vals from one hot encoding 
        y_true = np.argmax(y_true, axis=1)
        
        self.grad = y_pred.copy()
        self.grad[range(n), y_true] -= 1
        self.grad = self.grad / n
        return self.grad


#### Testing Softmax

In [10]:
softmax = Softmax()
softmax.forward([[1,2,44]])

array([[2.11513104e-19, 5.74952226e-19, 1.00000000e+00]])

## Defining Loss - Categorical Cross-Entropy

$$ L_i = -\sum_j y_{i,j}\log(\hat{y}_{i,j}) $$

With taking one hot encoding into account we can simplify this down to:

$$ L_i = -y_{i,k}\log(\hat{y}_{i,k}) $$

where K is the index of the correct class

In [11]:
class CategoricalCrossEntropyLoss:
    """Calculates the CCE loss for a given set of predictions.
    This method expect a softmax output and one-hot encoded label mask
    
    y_pred (np_array): matrix of confidence scores of the prediction
    y_true (np_array): matrix of one-hot encoded true lables of the classes
    """
    def forward(y_pred, y_true):
        # Clipping and applying one hot encoded labels as mask 
        # to zero out scores corresponding to incorrect classes
        # We clip to make sure that none of the reaming classes are 0 or 
        # exactly 1 
        clipped = np.clip(y_pred, 1e-7, 1 - 1e-7)
        corrected = np.sum(clipped*y_true, axis=1)
        # Taking the -ve log of the remaining confidence scores 
        negative_log = -np.log(corrected)
        return np.mean(negative_log)

    def backward(y_pred, y_true):
        """Backpropagation  of the CCE Loss

        Args:
            y_pred (np_array) array of predictions.
            y_true (np_array) array of correct labels.
        """
        return (-y_true/y_pred)/len(y_pred)

#### Testing CCE Loss

In [12]:
y_pred = np.array([[0.7, 0.1, 0.2], [0.1,0.5,0.4],[0.02,0.9,0.08]])
y_true = np.array([[1,0,0], [0,1,0], [0,1,0]])

loss_function = CategoricalCrossEntropyLoss
loss_function.forward(y_pred, y_true)

0.38506088005216804

## Defining Optimizers

### Stochastic Gradient Decent 

$$ \text{Update} = -\text{Learning Rate} \cdot \text{Gradient}$$

In [13]:
class SDG:
    """Stochastic Gradient Decent class used to update layer paramers
    The update is the -ve learning rate multiplied by the gradient calculated 
    in the backward step. Optionally it will also apply momentum and decay.

    Args:
        lr       (float): Learning rate to scale the gradients by for the update
        decay    (float): Decay rate used to scale learning rate
        momentum (float): momentum factor used to scale updates to avoid local minima
    
    Attributes:
        lr         (float): Learning rate to scale the gradients by for the update
        clr        (float): Learning rate at the current step
        decay      (float): Decay rate used to scale learning rate
        momentum   (float): momentum factor used to scale updates to avoid local minima
        iterations (int):   Number of times optimizer has completed a step
    """
    IMPLEMENTED = [LinearLayer]

    def __init__(self, learning_rate=1, decay=0., momentum=0.) -> None:
        self.lr = learning_rate
        self.clr = learning_rate # current learning rate
        self.decay = decay
        self.momentum = momentum
        self.iterations = 0

    def init_momentum(self, layers):
        """Initializes momentum arttribute for layer objects.
        Args:
            Layers (list): A list of layers that need to be updated with momentum.
        """
        for layer in layers:
            if not hasattr(layer, 'momentum_w'):
                layer.momentum_w = np.zeros_like(layer.weights)
                layer.momentum_b = np.zeros_like(layer.bias)

    def pre_update_step(self):
        """Update the current learning rate according to the decay and iterations"""
        decay_rate = 1/(1 + self.decay * self.iterations)
        self.clr = self.lr * decay_rate

    def get_updates(self, layer):
        """Get the update values for a layer's weights and biases
        Args:
            Layers (list): A list of layers that need to be updated with momentum."""
        return (
            -self.clr*layer.d_w,
            -self.clr*layer.d_b
        )

    def get_momentum_updates(self, layer):
        """Updates a layers momentum."""
        wu = (self.momentum * layer.momentum_w) - (self.clr * layer.d_w) 
        bu = (self.momentum * layer.momentum_b) - (self.clr * layer.d_b) 
        layer.momentum_w = wu
        layer.momentum_b = bu
        return (wu, bu)

    def update(self, layers):
        """Update a layers parameters
        Args:
            Layers (list): A list of layers that need to be updated.
        """
        # Test to make sure all layers supported
        if any(l for l in layers if type(l) not in self.IMPLEMENTED):
            unsupported = next(l for l in layers if type(l) not in self.IMPLEMENTED)
            raise NotImplementedError(f'SDG does not support {unsupported.__class__}')

        # pre update step
        if self.decay:
            self.pre_update_step()

        # On the first iteration using momentum initialize the layer momentums
        if self.iterations == 0 and self.momentum:
            self.init_momentum(layers)

        # Update step
        for layer in layers:

            if self.momentum:
                weight_u, bias_u = self.get_momentum_updates(layer)
            else:
                weight_u, bias_u = self.get_updates(layer)
            
            layer.weights += weight_u
            layer.bias += bias_u

        # post update
        self.iterations += 1 

### Adam

Short for Adaptive Momentum.

An extension to the Root mean square propagation (RSMprop) technique that adds in a bias correction mechanism used to correct the momentum and momentum caches.

To find the update with Adam we need to take the following steps:

1. Find momentum for the current step
2. Get corrected the momentum 
3. Update the cache with the square of the gradient 
4. Get the corrected cache 
5. Update weights 


In the first step we calculate the layer weight and bias momentums by:

$$ \text{Layer Momentum} = (\beta_1 \cdot \text{Layer Momentum}) + ((1 - \beta_1) \cdot gradient)$$ 

where $\beta_1$ is a hyper-parameter that allows us to apply fractions of the momentum and gradient at each step. 

To correct this we then divide the momentum by bias correction mechanism: 

$$ \text{Corrected Momentum} = \frac{\text{Layer Momentum}}{1 - \beta_1^{n+1}} $$

where $n$ is the number of the iteration/epoch and we add 1 to it to account for initializing it from 0

Next we update the cache for the weights and biases:

$$ \text{Cache} = (\beta_2 \cdot \text{Cache}) + ((1 - \beta_2) * \text{gradients}^2)$$

We once again correct, this time the cache, with Adam's bias correction mechanism:

$$ \text{Corrected Cache} = \frac{\text{Cache}}{{1 - \beta_2^{n+1}}}$$

Finally to update the weights we do the following:

$$ \text{Update} = \frac{\text{Current Learning Rate} \cdot \text{Corrected Momentum}}{\sqrt{\text{Corrected Cache}} + \epsilon} $$ 


In [14]:
class Adam:
    """Adam Optimizer Short for Adaptive Momentum.
    An extension to the Root mean square propagation (RSMprop) technique that adds in a bias correction mechanism used to correct the momentum and momentum caches.
    To find the update with Adam we need to take the following steps:
        1. Find momentum for the current step
        2. Get corrected the momentum 
        3. Update the cache with the square of the gradient 
        4. Get the corrected cache 
        5. Update weights 
    
    Args:
        learning_rate (float): Learning rate to scale the gradients by for the update
        decay         (float): Decay rate used to scale learning rate
        epsilon       (float): Hyperparmeter for tuning update
        beta_1        (float): Hyperparameter for calculating momentum 
        beta_2        (float): Hyperparameter for calculating cache

    Attributes:
        lr          (float): Learning rate to scale the gradients by for the update
        clr         (float): L:earning rate at current step
        decay       (float): Decay rate used to scale learning rate
        epsilon     (float): Hyperparmeter for tuning update
        beta_1      (float): Hyperparameter for calculating momentum 
        beta_2      (float): Hyperparameter for calculating cache
        iterations  (int):   Number of times optimizer has completed a step
    """

    IMPLEMENTED = [LinearLayer]

    def __init__(self, learning_rate=0.001, decay=0., epsilon=1e-7, beta_1=0.9, beta_2=0.999) -> None:
        self.lr = learning_rate
        self.clr = learning_rate # current learning rate
        self.decay = decay
        self.epsilon = epsilon
        self.beta_1 = beta_1
        self.beta_2 = beta_2
        self.iterations = 0

    def pre_update_step(self):
        """Update the current learning rate according to the decay and iterations"""
        decay_rate = 1/(1 + self.decay * self.iterations)
        self.clr = self.lr * decay_rate

    def init_momentum(self, layers):
        """Initializes momentum arttribute for layer objects.
        Args:
            Layers (list): A list of layers that need to be updated with momentum.
        """
        for layer in layers:
            # Init momentum for weights
            layer.momentums_w = np.zeros_like(layer.weights)
            layer.cache_w = np.zeros_like(layer.weights)

            # Init momentums for biases
            layer.momentums_b = np.zeros_like(layer.bias)
            layer.cache_b = np.zeros_like(layer.bias)
            
    def update(self, layers):
        """Update a layers parameters
        Args:
            Layers (list): A list of layers that need to be updated.
        """
        # pre update step
        if self.decay:
           self.pre_update_step()
        
        if self.iterations == 0:
            self.init_momentum(layers)

        # Update step
        for layer in layers:     
            ## Updating momentum 
            layer.momentums_w = self.beta_1 * layer.momentums_w + (1 - self.beta_1) * layer.d_w
            layer.momentums_b = self.beta_1 * layer.momentums_b + (1 - self.beta_1) * layer.d_b

            ## Correcting momentum 
            correction_bias_momentums = 1 - self.beta_1**(self.iterations +1)

            corrected_weights = layer.momentums_w / correction_bias_momentums
            corrected_bias    = layer.momentums_b / correction_bias_momentums

            ## Updating cache
            layer.cache_w = self.beta_2 * layer.cache_w + (1 - self.beta_2) * layer.d_w**2
            layer.cache_b = self.beta_2 * layer.cache_b + (1 - self.beta_2) * layer.d_b**2

            ## Correcting cache
            correction_bias_cache = 1 - self.beta_2**(self.iterations +1)

            corrected_cache_w = layer.cache_w / correction_bias_cache
            corrected_cache_b = layer.cache_b / correction_bias_cache

            ## Updating weights 
            layer.weights += -self.clr * corrected_weights / (np.sqrt(corrected_cache_w) + self.epsilon)

            ## Updating bias
            layer.bias    += -self.clr * corrected_bias / (np.sqrt(corrected_cache_b) + self.epsilon)
        
        # Post update step
        self.iterations += 1



## Defining Utility functions

### One-hot encoding function 

In [15]:
def one_hot_encode_index(y, n):
    return np.eye(n)[y]

#### Testing one hot masker

In [16]:
n=3
y_test = np.array([0,1,2, 1, 2])

one_hot_encode_index(y_test, n)

array([[1., 0., 0.],
       [0., 1., 0.],
       [0., 0., 1.],
       [0., 1., 0.],
       [0., 0., 1.]], dtype=float32)

### Define Accuracy 

In [17]:
def accuracy(y_pred, y_true):
    """Calculates the accuracy of a batch of predictions"""
    return np.mean(np.argmax(y_pred, axis=1) == np.argmax(y_true, axis=1))

## Defining Regularization 

Regularization methods are used to reduce generalization errors, With L1 and L2 regularization we calculate a penalty that we add to the loss if weight and biases are large. We want to see many neurons contribute to the evaluation rather than a few having a large impact.

L1 weight regularization: 

$$ L_{1w} = \lambda\sum_m|w_m| $$

L1 bias regularization: 

$$ L_{1b} = \lambda\sum_n|b_n| $$

L2 weight regularization: 

$$ L_{2w} = \lambda\sum w^2_m $$

L2 bias regularization: 

$$ L_{2b} = \lambda\sum_n b^2_n $$

Overall loss:

$$ \text{Loss} = \text{DataLoss} + L_{1w} + L_{1b} + L_{2w} + L_{2b} $$


To implement these changes we will need to modify the Linear Layer class.

(note: for backpropagation see backpropagation notebook)



## Integration Testing 

### Test with SDG

In [18]:
nnfs.init()
X, y = spiral_data(samples=100, classes=3)
y = one_hot_encode_index(y, 3)

# Initializing Network Components 
relu = ReLU()
softmax = Softmax()
cce_loss = CategoricalCrossEntropyLoss
optimizer = SDG(decay=1e-3, momentum=0.9)
linear1 = LinearLayer(2, 64, lambda_l2_weight=5e-4, lambda_l2_bias=5e-4)
linear2 = LinearLayer(64, 3)

update_layers = [linear1, linear2]

n_epochs = 10000

for epoch in range(n_epochs + 1):
    # Forward Pass
    linear1.forward(X) 
    relu.forward(linear1.output)
    linear2.forward(relu.output)
    y_pred = softmax.forward(linear2.output)

    # Calculating loss and regularized loss  
    loss = cce_loss.forward(y_pred, y)
    # Regularised loss
    rl = sum([l.regularization_loss() for l in update_layers if hasattr(l, 'regularization_loss')])
    regularized_loss = loss + rl

    #Calculating accuracy 
    acc = accuracy(y_pred, y) 

    # Printing results
    if not epoch % 100:
        print(f"Epoch:{epoch}, Loss:{loss:.3f}, Reg loss: {regularized_loss:.3f}, ({rl:.3f}), accuracy:{acc:.3f}")

    # Backward pass
    softmax.backward(y_pred, y)
    linear2.backward(softmax.grad)
    relu.backward(linear2.grad)
    linear1.backward(relu.grad)

    # Optimization Step
    optimizer.update(update_layers)
    

Epoch:0, Loss:1.113, Reg loss: 1.174, (0.061), accuracy:0.360
Epoch:100, Loss:1.042, Reg loss: 1.068, (0.026), accuracy:0.407
Epoch:200, Loss:0.777, Reg loss: 0.839, (0.062), accuracy:0.650
Epoch:300, Loss:0.703, Reg loss: 0.793, (0.089), accuracy:0.680
Epoch:400, Loss:0.607, Reg loss: 0.724, (0.117), accuracy:0.753
Epoch:500, Loss:0.557, Reg loss: 0.679, (0.121), accuracy:0.797
Epoch:600, Loss:0.522, Reg loss: 0.679, (0.157), accuracy:0.790
Epoch:700, Loss:1.524, Reg loss: 1.665, (0.141), accuracy:0.480
Epoch:800, Loss:0.523, Reg loss: 0.661, (0.138), accuracy:0.807
Epoch:900, Loss:0.523, Reg loss: 0.652, (0.129), accuracy:0.790
Epoch:1000, Loss:0.600, Reg loss: 0.738, (0.138), accuracy:0.757
Epoch:1100, Loss:0.441, Reg loss: 0.577, (0.136), accuracy:0.813
Epoch:1200, Loss:0.430, Reg loss: 0.566, (0.136), accuracy:0.827
Epoch:1300, Loss:1.594, Reg loss: 1.739, (0.146), accuracy:0.487
Epoch:1400, Loss:0.379, Reg loss: 0.527, (0.148), accuracy:0.870
Epoch:1500, Loss:0.380, Reg loss: 0.5

In [19]:
# Validating the model 
X_test, y_test = spiral_data(samples=100, classes=3)
y_test = one_hot_encode_index(y_test, 3)

linear1.forward(X_test) 
relu.forward(linear1.output)
linear2.forward(relu.output)
y_pred = softmax.forward(linear2.output)

# Calculating loss and accuracy 
loss = cce_loss.forward(y_pred, y_test)
acc = accuracy(y_pred, y_test) 

print(f'Vallidation: Accuracy: {acc:.3f}, loss: {loss:.3f}')

Vallidation: Accuracy: 0.817, loss: 0.450


### Test with Adam

In [20]:
X, y = spiral_data(samples=1000, classes=3)
y = one_hot_encode_index(y, 3)

# Initializing Network Components 
relu = ReLU()
softmax = Softmax()
cce_loss = CategoricalCrossEntropyLoss
optimizer = Adam(learning_rate=0.05, decay=5e-5)
dropout = Dropout(0.9)
linear1 = LinearLayer(2, 64, lambda_l2_weight=5e-4, lambda_l2_bias=5e-4)
linear2 = LinearLayer(64, 3)

update_layers = [linear1, linear2]

n_epochs = 10000

for epoch in range(n_epochs + 1):
    # Forward Pass
    linear1.forward(X) 
    relu.forward(linear1.output)
    dropout.forward(relu.output)
    linear2.forward(dropout.output)
    y_pred = softmax.forward(linear2.output)

    # Calculating loss and regularized loss  
    loss = cce_loss.forward(y_pred, y)
    # Regularised loss
    rl = sum([l.regularization_loss() for l in update_layers if hasattr(l, 'regularization_loss')])
    regularized_loss = loss + rl

    #Calculating accuracy 
    acc = accuracy(y_pred, y) 

    # Printing results
    if not epoch % 100:
        print(f"Epoch:{epoch}, Loss:{loss:.3f}, Reg loss: {regularized_loss:.3f}, ({rl:.3f}), accuracy:{acc:.3f}")

    # Backward pass
    softmax.backward(y_pred, y)
    linear2.backward(softmax.grad)
    dropout.backward(linear2.grad)
    relu.backward(dropout.grad)
    linear1.backward(relu.grad)

    # Optimization Step
    optimizer.update(update_layers)


Epoch:0, Loss:3.212, Reg loss: 3.276, (0.064), accuracy:0.373
Epoch:100, Loss:0.847, Reg loss: 0.883, (0.036), accuracy:0.622
Epoch:200, Loss:0.712, Reg loss: 0.757, (0.045), accuracy:0.683
Epoch:300, Loss:0.655, Reg loss: 0.705, (0.050), accuracy:0.696
Epoch:400, Loss:0.633, Reg loss: 0.684, (0.051), accuracy:0.707
Epoch:500, Loss:0.602, Reg loss: 0.653, (0.051), accuracy:0.728
Epoch:600, Loss:0.610, Reg loss: 0.660, (0.050), accuracy:0.723
Epoch:700, Loss:0.614, Reg loss: 0.664, (0.049), accuracy:0.722
Epoch:800, Loss:0.591, Reg loss: 0.640, (0.049), accuracy:0.733
Epoch:900, Loss:0.591, Reg loss: 0.639, (0.048), accuracy:0.730
Epoch:1000, Loss:0.584, Reg loss: 0.632, (0.048), accuracy:0.744
Epoch:1100, Loss:0.567, Reg loss: 0.615, (0.047), accuracy:0.741
Epoch:1200, Loss:0.559, Reg loss: 0.605, (0.046), accuracy:0.737
Epoch:1300, Loss:0.558, Reg loss: 0.603, (0.045), accuracy:0.767
Epoch:1400, Loss:0.556, Reg loss: 0.600, (0.044), accuracy:0.755
Epoch:1500, Loss:0.566, Reg loss: 0.6

In [None]:
# Validating the model 
X_test, y_test = spiral_data(samples=100, classes=3)
y_test = one_hot_encode_index(y_test, 3)

linear1.forward(X_test) 
relu.forward(linear1.output)
linear2.forward(relu.output)
y_pred = softmax.forward(linear2.output)

# Calculating loss and accuracy 
loss = cce_loss.forward(y_pred, y_test)
acc = accuracy(y_pred, y_test) 

print(f'Vallidation: Accuracy: {acc:.3f}, loss: {loss:.3f}')

Vallidation: Accuracy: 0.740, loss: 0.585


## Defining Model Object  

In [None]:
class Model:
    """Model class designed as a container to simplify the building and training of networks.
    
    Args:
    optimizer:  The optimizer that should be used
    Loss:       The loss class that should be used

    Attributes:
        layers              (list):  List of all layers in the network in their activation sequence 
        trainable_layers    (list):  List of trainable layers in the network
        loss                ():      The loss class that should be used
        optim               ():      The optimizer that should be used
        current_loss        (float): Latest loss recorded 
        current_accuracy    (float): Latest accuracy recorded
        training_mode       (bool):  Boolean flag if the network is in training mode
    """

    def __init__(self, optimizer, loss) -> None:
        self.layers = []
        self.trainable_layers = []
        self.loss = loss
        self.optim = optimizer
        self.current_loss = 0
        self.current_accuracy = 0
        self.training_mode = True

    def __repr__(self) -> str:
        """Custom dunder representer method to print out all the layers of the network."""
        layer_str = "".join([f"\t ({i}): {type(l).__name__} (Trainable: {l in self.trainable_layers})\n" 
                            for i, l in enumerate(self.layers)])

        return "Model Architecture: \n" + layer_str
    
    def add(self, layer):
        """Appends a single layer to the end of the network"""
        self.layers.append(layer)
        if hasattr(layer, 'weights'):
            self.trainable_layers.append(layer)

    def set_sequence(self, layers):
        """Defines an entire sequence of layers. NOTE: will overwrite current network"""
        self.layers = layers
        self.trainable_layers = [l for l in layers if hasattr(l, 'weights')]

    def get_loss(self, y_pred, y_true):
        """Calculates the current loss of the network"""
        # Calculating loss and regularized loss  
        loss = self.loss.forward(y_pred, y_true)
        # Regularised loss
        rl = sum([l.regularization_loss() for l in self.trainable_layers if hasattr(l, 'regularization_loss')])
        return loss + rl

    def forward(self, X):
        """Handles forward pass through all layers"""
        # First layer
        self.layers[0].forward(X)

        # Rest of layers 
        for i in range(1, len(self.layers)):
            self.layers[i].forward(self.layers[i-1].output)

    def backward(self, y_pred, y_true):
        """Handles backward pass through all layers"""
        # Last layer
        self.layers[-1].backward(y_pred, y_true)

        # Rest of the layers
        for i, l in reversed(list(enumerate(self.layers[:-1]))):
            l.backward(self.layers[i+1].grad)
        
    def logger(self, epoch):
        """Prints current state of model"""
        print(
            f"Epoch: {epoch}, accuracy{self.current_accuracy:.3f}, loss{self.current_loss:.3f}, learning rate {self.optim.clr:.3f} "
        )
    
    def validate(self, X_val, y_val):
        """Handles the validation pass of the network"""
        self.forward(X_val)

        loss = self.loss.forward(self.layers[-1].output, y_val)
        acc  = accuracy(self.layers[-1].output, y_val)

        print(
            f"Validation : Loss: {loss:.3f}, Accuracy: {acc:.3f}"
        )

    def mode_train(self):
        """Sets the model and all dropout layers to training mode."""
        self.training_mode = True
        for l in self.layers:
            if type(l) == Dropout:
                l.training_mode = True
    
    def mode_eval(self):
        """Sets the model and all dropout layers to evaluation mode."""
        self.training_mode = False
        for l in self.layers:
            if type(l) == Dropout:
                l.training_mode = False
    
    def train(self, X, y, epochs=1, log=True, log_freq=100):
        """Handles the trining loop."""
        for epoch in range(epochs + 1):

            # Forward Pass
            self.forward(X)

            # Loss 
            self.current_loss = self.get_loss(self.layers[-1].output, y)

            # accuracy 
            self.current_accuracy = accuracy(self.layers[-1].output, y) 
            
            # Backward Pass
            self.backward(self.layers[-1].output, y)

            # Optimization 
            self.optim.update(self.trainable_layers)

            # Logging 
            if log and not (epoch % log_freq):
                self.logger(epoch)

    

#### Testing Model Object

In [None]:
cce_loss = CategoricalCrossEntropyLoss
optimizer = Adam(learning_rate=0.05, decay=5e-5)

model = Model(optimizer, cce_loss)
model.set_sequence([
    LinearLayer(2, 512, lambda_l2_weight=5e-4, lambda_l2_bias=5e-4),
    ReLU(),
    Dropout(0.9),
    LinearLayer(512, 3)
])
model.add(Softmax())
model

Model Architecture: 
	 (0): LinearLayer (Trainable: True)
	 (1): ReLU (Trainable: False)
	 (2): Dropout (Trainable: False)
	 (3): LinearLayer (Trainable: True)
	 (4): Softmax (Trainable: False)

In [None]:
X, y = spiral_data(samples=1000, classes=3)
y = one_hot_encode_index(y, 3)
model.mode_train()
model.train(X, y, epochs=10000)

Epoch: 0, accuracy0.380, loss1.645, learning rate 0.050 
Epoch: 100, accuracy0.487, loss0.986, learning rate 0.050 
Epoch: 200, accuracy0.688, loss0.839, learning rate 0.050 
Epoch: 300, accuracy0.737, loss0.739, learning rate 0.049 
Epoch: 400, accuracy0.767, loss0.690, learning rate 0.049 
Epoch: 500, accuracy0.776, loss0.686, learning rate 0.049 
Epoch: 600, accuracy0.787, loss0.641, learning rate 0.049 
Epoch: 700, accuracy0.792, loss0.637, learning rate 0.048 
Epoch: 800, accuracy0.821, loss0.584, learning rate 0.048 
Epoch: 900, accuracy0.787, loss0.631, learning rate 0.048 
Epoch: 1000, accuracy0.825, loss0.576, learning rate 0.048 
Epoch: 1100, accuracy0.769, loss0.689, learning rate 0.047 
Epoch: 1200, accuracy0.822, loss0.579, learning rate 0.047 
Epoch: 1300, accuracy0.820, loss0.547, learning rate 0.047 
Epoch: 1400, accuracy0.805, loss0.605, learning rate 0.047 
Epoch: 1500, accuracy0.844, loss0.513, learning rate 0.047 
Epoch: 1600, accuracy0.831, loss0.529, learning rate

In [None]:
X_test, y_test = spiral_data(samples=100, classes=3)
y_test = one_hot_encode_index(y_test, 3)
model.mode_eval()
model.validate(X_test, y_test)

Validation : Loss: 0.379, Accuracy: 0.900
