Load dataset and preprocess

In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
from sklearn.preprocessing import OneHotEncoder

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
# Ensure data and dimensions are correct
assert x_train.shape == (50000, 32, 32, 3)
assert y_train.shape == (50000, 1)
assert x_test.shape == (10000, 32, 32, 3)
assert y_test.shape == (10000, 1)

# Preprocessing

# Min-max scale normalisation, returns values between 0-1.
# The statistics are taken from the training split only and then applied to BOTH
# splits: a model trained on [0, 1] inputs must also be evaluated on [0, 1] inputs,
# and the test data must not influence the scaling.
# Converting the bounds to Python floats keeps the subtraction out of uint8
# arithmetic, where a value below the minimum would silently wrap around.
train_min = float(np.min(x_train))
train_max = float(np.max(x_train))
x_train = (x_train - train_min) / (train_max - train_min)
x_test = (x_test - train_min) / (train_max - train_min)
# Change type to 4 byte float for performance if needed, currently 8 byte float
print(type(x_train[0][0][0][0]))

# flatten x_train and x_test arrays
# New layout (50000, 32x32x3)
x_train = x_train.reshape(x_train.shape[0], -1)
x_test = x_test.reshape(x_test.shape[0], -1)
print(x_train.shape)

# One-hot encode y_train and y_test
# The encoder is fitted on the training labels only and reused for the test labels
# so both label matrices share the same column order.
encoder = OneHotEncoder(sparse_output=False)
encoder.fit(y_train)
y_train = encoder.transform(y_train)
y_test = encoder.transform(y_test)
print(y_train.shape)

<class 'numpy.float64'>
(50000, 3072)
(50000, 10)


In [2]:
import numpy as np

def sigmoid(z):
    """Numerically stable logistic sigmoid, 1 / (1 + exp(-z)), applied element-wise.

    Args:
        z: Scalar or array-like of any shape.

    Returns:
        NumPy scalar (for scalar input) or array with the same shape as `z`,
        with every value in [0, 1].
    """
    z = np.asarray(z)
    # exp(-|z|) always lies in (0, 1], so it cannot overflow no matter how large |z| is.
    e = np.exp(-np.abs(z))
    # z >= 0: 1 / (1 + exp(-z));  z < 0: exp(z) / (1 + exp(z)) -- the same function,
    # rewritten so that only the non-overflowing exponential is needed.
    out = np.where(z >= 0, 1.0 / (1.0 + e), e / (1.0 + e))
    # Indexing with () turns a 0-d result back into a NumPy scalar and leaves arrays as-is.
    return out[()]

class Sigmoid:
    """Element-wise sigmoid activation layer that caches its output for the backward pass."""

    def __init__(self):
        # Result of the most recent forward() call; None until forward() has run.
        self.out = None

    def forward(self, x):
        """Apply the sigmoid to `x` (any shape), remember the result and return it."""
        activated = sigmoid(x)
        self.out = activated
        return activated

    def backward(self, dout):
        """Scale the incoming gradient `dout` by the sigmoid derivative s * (1 - s).

        `s` is the output cached by the last forward() call.
        """
        s = self.out
        return dout * s * (1.0 - s)


class ReLU:
    """Rectified linear activation layer that caches its output for the backward pass."""

    def __init__(self):
        # Result of the most recent forward() call; None until forward() has run.
        self.out = None

    def forward(self, x):
        """Clamp negative entries of `x` to zero, remember the result and return it."""
        rectified = np.maximum(0.0, x)
        self.out = rectified
        return rectified

    def backward(self, dout):
        """Return a copy of `dout` with the gradient zeroed wherever the cached output is <= 0.

        `dout` itself is left untouched.
        """
        inactive = self.out <= 0.0
        grad = dout.copy()
        grad[inactive] = 0.0
        return grad
    
class LeakyReLU:
    """Leaky ReLU activation layer with a tunable slope `a`."""

    def __init__(self, a):
        # Result of the most recent forward() call; None until forward() has run.
        self.out = None
        # Hyperparameter: the factor applied in the `a * x` branch of forward()
        # and returned by backward() for negative entries. Kept tunable for testing.
        self.a = a

    def forward(self, x):
        """Return the element-wise maximum of `x` and `a * x`, caching the result."""
        scaled = self.a * x
        self.out = np.maximum(x, scaled)
        return self.out

    def backward(self, dout):
        """Return the per-element slope: 1.0 where `dout >= 0`, otherwise `a`.

        The result is NOT multiplied by `dout` and the cached forward output is
        not consulted; the slope is chosen purely from the sign of the argument.
        NOTE(review): this differs from Sigmoid.backward / ReLU.backward, which
        scale the incoming gradient -- confirm which contract callers expect.
        """
        non_negative = dout >= 0
        return np.where(non_negative, 1.0, self.a)

In [3]:
import numpy as np
class Softmax:
    """Softmax output layer paired with a cross-entropy loss."""

    def __init__(self):
        # Probabilities produced by the most recent forward() call; None until then.
        self.y_pred = None

    def forward(self, x, y=None):
        """Turn logits into probabilities, independently for every sample.

        Args:
            x: Logits; the class scores lie along the last axis, e.g.
               (batch_size, num_classes) or a single (num_classes,) vector.
            y: Unused. Kept (now optional) so both `forward(x)` and
               `forward(x, y)` call styles work.

        Returns:
            Array with the same shape as `x` whose last axis sums to 1.
        """
        x = np.asarray(x, dtype=float)
        # Subtracting the per-sample maximum leaves the result unchanged
        # mathematically but keeps every exponent <= 0, so np.exp cannot overflow.
        shifted = x - np.max(x, axis=-1, keepdims=True)
        exps = np.exp(shifted)
        # Normalise per sample (last axis), not over the whole batch.
        self.y_pred = exps / np.sum(exps, axis=-1, keepdims=True)
        return self.y_pred

    # Using Cross-Entropy-Loss
    def backward(self, y_actual):
        """Gradient of the cross-entropy loss w.r.t. the logits: y_pred - y_actual.

        Uses the probabilities cached by the last forward() call; `y_actual` is
        the one-hot target with the same shape. The result is not averaged over
        the batch.
        """
        return self.y_pred - y_actual

In [1]:
import numpy as np

class Dropout:
    """Inverted dropout: randomly zeroes units during training and rescales the survivors."""

    def __init__ (self, drop_rate):
        """
        args:
            drop_rate (float) is the probability that a unit is set to 0 (e.g. 0.5 for 50%).
                Must satisfy 0 <= drop_rate < 1.

        raises:
            ValueError if drop_rate is outside [0, 1). A rate of 1 would drop every
            unit and make the 1/(1-drop_rate) rescaling divide by zero.
        """
        if not 0.0 <= drop_rate < 1.0:
            raise ValueError(f"drop_rate must be in the range [0, 1), got {drop_rate}")
        self.drop_rate = drop_rate
        # Boolean mask of the units kept (True) in the last training-mode forward pass.
        # None when no mask is active (before any forward pass, or after an inference pass).
        self.mask = None


    def forward(self, x, training=True):
        """
        Performs the forward pass for the dropout layer

        args:
            x (np.array) is the input data
            training (bool) if true, apply dropout, else return input as is.
        
        returns:
            The output data after applying dropout (if training is set to True), or data is just passing through.
        """
        if not training:
            # Inference: identity. Clear the mask so a later backward() is an identity too.
            self.mask = None
            return x

        # Mask is created, True to keep the unit and False to drop it.
        # (1-drop_rate) is the probability of keeping a unit.
        keep_prob = 1 - self.drop_rate
        self.mask = np.random.rand(*x.shape) > self.drop_rate
        # Divide by the keep probability to maintain the expected value of the activations.
        return x * self.mask / keep_prob
    
    def backward(self, d_out):
        """
        Performs the backwards pass for the dropout layer.

        args:
            d_out (np.array) is the gradient from the subsequent layer

        returns:
            np.ndarray is the gradient passed to the preceding layer
        """
        if self.mask is None:
            # No dropout was applied in the matching forward pass, so the gradient passes through.
            return d_out
        # The gradient only flows through the units that weren't dropped in the forward pass,
        # scaled by the same inverted factor 1/(1-drop_rate).
        return d_out * self.mask / (1 - self.drop_rate)

Input layer: 3072 nodes<br>
Output classification layer: 10 nodes<br><br>To do:<br>Simplify the code where possible and look for bugs (there may be some)<br>Implement L1/L2 regularisers<br>Activation functions should be chosen per layer; currently there is one for the entire network<br>See the training and testing cells at the bottom: add metrics for analysis (for graphs)

In [None]:
import numpy as np

class LayerCache:
    """Per-pass record of the network input and each layer's intermediate results."""

    def __init__(self, input_data):
        # Batch that was fed into the first layer.
        self.input = input_data
        # One dict per layer, in forward order, with keys 'z', 'activation', 'pre_dropout'.
        self.layers = []

    def add_layer(self, z, activation, pre_dropout=None):
        """Append one layer's results.

        When `pre_dropout` is omitted (None), the stored 'pre_dropout' entry is
        the `activation` object itself.
        """
        if pre_dropout is None:
            pre_dropout = activation
        record = dict(z=z, activation=activation, pre_dropout=pre_dropout)
        self.layers.append(record)

    def get_layer(self, idx):
        """Return the stored dict for layer `idx` (plain list indexing)."""
        return self.layers[idx]

    def get_previous_activation(self, layer_idx):
        """Return what was fed into layer `layer_idx`.

        That is the network input for layer 0, otherwise the stored
        'activation' of the layer before it.
        """
        if layer_idx != 0:
            return self.layers[layer_idx - 1]['activation']
        return self.input

    def __len__(self):
        """Number of layers recorded so far."""
        return len(self.layers)


class NeuralNetwork:
    """Fully connected classifier trained with mini-batch gradient descent.

    Architecture: input -> [dense -> activation -> dropout] * hidden_layers
    -> dense -> softmax. Every hidden layer uses the same activation object.
    Weights are stored as (fan_out, fan_in) matrices, so a layer computes
    `x @ W.T + b`.
    """

    def __init__(self, x_train, y_train, hidden_layers, hidden_layer_sizes, activation_func, optimiser, learning_rate=0.03, dropout_rate=0.5):
        """
        Args:
            x_train: Training inputs, shape (num_samples, num_features).
            y_train: One-hot training targets, shape (num_samples, num_classes).
            hidden_layers: Number of hidden layers.
            hidden_layer_sizes: Sequence with one unit count per hidden layer.
            activation_func: A Sigmoid, ReLU or LeakyReLU instance, used by all hidden layers.
            optimiser: "None" (or None) for plain SGD, "RMS_Prop" or "Adam".
                Unknown values are rejected on the first parameter update.
            learning_rate: Step size used by every optimiser.
            dropout_rate: Drop probability of the dropout layer after each hidden layer.

        Raises:
            ValueError: If len(hidden_layer_sizes) != hidden_layers.
            TypeError: If activation_func is not one of the supported activation types.
        """
        self.inputs = x_train
        self.outputs = y_train
        self.hidden_layers = hidden_layers

        if len(hidden_layer_sizes) != hidden_layers:
            raise ValueError("Neurons array length mismatch with hidden layers amount")
        # Stored as a list so tuples (or other sequences) can be concatenated with the
        # input/output sizes in _initialize_parameters.
        self.hidden_layers_sizes = list(hidden_layer_sizes)

        if not isinstance(activation_func, (Sigmoid, ReLU, LeakyReLU)):
            raise TypeError("Activation function must be of type Sigmoid, ReLU or LeakyReLU")
        self.activation_func = activation_func

        self.learning_rate = learning_rate

        # Initialize network architecture
        self.num_layers = hidden_layers + 1  # hidden layers + output layer
        self.weights, self.biases = self._initialize_parameters()

        # Layer components: one dropout layer per hidden layer, softmax on the output
        self.dropout_layers = [Dropout(drop_rate=dropout_rate) for _ in range(hidden_layers)]
        self.softmax = Softmax()

        # Optimizer state
        self.optimiser = optimiser
        if optimiser in ["RMS_Prop", "Adam"]:
            self.r_weights, self.r_biases = self._create_zeroed_matrices()
        if optimiser == "Adam":
            self.adam_m1_weights, self.adam_m1_biases = self._create_zeroed_matrices()
            self.adam_m2_weights, self.adam_m2_biases = self._create_zeroed_matrices()
            self.adam_timestep = 0

    def _initialize_parameters(self):
        """Create the weight matrices and zero biases for every layer.

        Weights are drawn from U(-limit, limit) with the limit chosen so the
        variance is 1/fan_in for Sigmoid (Xavier-style) and 2/fan_in for
        ReLU / LeakyReLU (He).

        Returns:
            tuple: (weights, biases) lists with one entry per layer; weights[i]
            has shape (fan_out, fan_in) and biases[i] has shape (fan_out,).
        """
        layer_sizes = [self.inputs.shape[1]] + self.hidden_layers_sizes + [self.outputs.shape[1]]
        weights, biases = [], []

        for fan_in, fan_out in zip(layer_sizes[:-1], layer_sizes[1:]):
            if isinstance(self.activation_func, Sigmoid):
                variance = 1.0 / fan_in  # Xavier
            else:  # ReLU, LeakyReLU
                variance = 2.0 / fan_in  # He initialization

            # U(-l, l) has variance l^2 / 3, so l = sqrt(3 * variance) yields the
            # target variance (scaling U(-1, 1) by sqrt(variance) would give only a third of it).
            limit = np.sqrt(3.0 * variance)
            W = np.random.uniform(-limit, limit, (fan_out, fan_in))
            b = np.zeros(fan_out)

            weights.append(W)
            biases.append(b)

        return weights, biases

    def _create_zeroed_matrices(self):
        """Create zero-initialized arrays matching the shapes of all weights and biases."""
        weights_zeros = [np.zeros_like(W) for W in self.weights]
        biases_zeros = [np.zeros_like(b) for b in self.biases]
        return weights_zeros, biases_zeros

    def train(self, epochs, batch_size=64):
        """Train the network on the data given at construction time.

        Args:
            epochs: Number of full passes over the training data.
            batch_size: Samples per mini-batch (the last batch of an epoch may be smaller).

        Returns:
            list: Mean mini-batch cross-entropy loss for each epoch.

        Raises:
            ValueError: If batch_size < 1 or the training set is empty.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")
        num_samples = len(self.inputs)
        if num_samples == 0:
            raise ValueError("Cannot train on an empty dataset")

        loss_history = []

        for epoch in range(epochs):
            x_shuffled, y_shuffled = self._shuffle_dataset()
            epoch_loss = 0.0
            num_batches = 0

            for batch_start in range(0, num_samples, batch_size):
                batch_x = x_shuffled[batch_start:batch_start + batch_size]
                batch_y = y_shuffled[batch_start:batch_start + batch_size]

                # Forward pass; the last cached activation is the softmax output
                cache = self.forwardpass(batch_x, training=True)
                y_pred = cache.get_layer(-1)['activation']
                loss = self._cross_entropy_loss(y_pred, batch_y)

                # Backward pass
                weight_grads, bias_grads = self._backprop(cache, batch_y, len(batch_x))

                # Update parameters
                self._update_parameters(weight_grads, bias_grads)

                epoch_loss += loss
                num_batches += 1

            loss_history.append(epoch_loss / num_batches)

        return loss_history

    def forwardpass(self, batch_x, training=True):
        """
        Forward propagation through all layers.

        Args:
            batch_x: Input batch of shape (batch_size, input_dim)
            training: Whether to apply dropout

        Returns:
            LayerCache: Structured cache containing all layer computations. The
            class probabilities are the 'activation' of its last layer.
        """
        cache = LayerCache(batch_x)
        current_input = batch_x

        # Process hidden layers
        for i in range(self.num_layers - 1):
            z = current_input @ self.weights[i].T + self.biases[i]
            activation = self.activation_func.forward(z)

            # Apply dropout if training; keep the undropped activation for backprop
            pre_dropout = activation
            if training:
                activation = self.dropout_layers[i].forward(activation, training)

            cache.add_layer(z, activation, pre_dropout)
            current_input = activation

        # Output layer with softmax
        z_out = current_input @ self.weights[-1].T + self.biases[-1]
        self.softmax.forward(z_out)
        output = self.softmax.y_pred
        cache.add_layer(z_out, output)

        return cache

    def _activation_derivative(self, layer):
        """Element-wise derivative of the hidden activation for one cached layer.

        The derivative is computed from the values cached for that specific
        layer ('z' and 'pre_dropout'), not from the shared activation object,
        whose own cache only reflects the most recent forward() call.

        Args:
            layer: Layer dict from a LayerCache.

        Returns:
            Array with the same shape as layer['z'].
        """
        act = self.activation_func
        if isinstance(act, Sigmoid):
            s = layer['pre_dropout']  # sigmoid output before dropout
            return s * (1.0 - s)
        z = layer['z']
        if isinstance(act, LeakyReLU):
            return np.where(z >= 0, 1.0, act.a)
        # ReLU: the unit is active only for strictly positive pre-activations
        return (z > 0).astype(float)

    def _backprop(self, cache, y_true, batch_size):
        """
        Backpropagation through all layers.

        Args:
            cache: LayerCache from forward pass
            y_true: True labels
            batch_size: Size of current batch

        Returns:
            tuple: (weight_gradients, bias_gradients), one entry per layer in
            forward order. weight_gradients[i] has shape (fan_in, fan_out),
            i.e. the transpose of self.weights[i].
        """
        weight_grads = [None] * self.num_layers
        bias_grads = [None] * self.num_layers

        # Start with output layer gradient from softmax + cross-entropy
        grad = self.softmax.backward(y_true)

        # Backprop through all layers in reverse
        for layer_idx in range(self.num_layers - 1, -1, -1):
            prev_activation = cache.get_previous_activation(layer_idx)

            # Parameter gradients for this layer, averaged over the batch
            weight_grads[layer_idx] = prev_activation.T @ grad / batch_size
            bias_grads[layer_idx] = np.sum(grad, axis=0) / batch_size

            # Propagate gradient to previous layer (if not at input)
            if layer_idx > 0:
                # Gradient w.r.t. this layer's input (= previous layer's output)
                grad = grad @ self.weights[layer_idx]

                prev_layer = cache.get_layer(layer_idx - 1)

                # Dropout was the last step of the previous layer, so it is undone first.
                # When the forward pass ran without dropout, 'activation' and
                # 'pre_dropout' are the same object and there is nothing to undo.
                if prev_layer['activation'] is not prev_layer['pre_dropout']:
                    grad = self.dropout_layers[layer_idx - 1].backward(grad)

                # Chain rule through the previous layer's activation function
                grad = grad * self._activation_derivative(prev_layer)

        return weight_grads, bias_grads

    def _update_parameters(self, weight_grads, bias_grads):
        """Update parameters using the selected optimizer.

        Raises:
            ValueError: If self.optimiser is not a supported optimizer name.
        """
        if self.optimiser is None or self.optimiser == "None":
            self._sgd_update(weight_grads, bias_grads)
        elif self.optimiser == "RMS_Prop":
            self._rmsprop_update(weight_grads, bias_grads)
        elif self.optimiser == "Adam":
            self._adam_update(weight_grads, bias_grads)
        else:
            raise ValueError(f"Unknown optimizer: {self.optimiser}")

    def _sgd_update(self, weight_grads, bias_grads):
        """Standard SGD parameter update."""
        for i in range(len(self.weights)):
            # Gradients arrive as (fan_in, fan_out); weights are (fan_out, fan_in)
            self.weights[i] -= self.learning_rate * weight_grads[i].T
            self.biases[i] -= self.learning_rate * bias_grads[i]

    def _rmsprop_update(self, weight_grads, bias_grads):
        """RMSProp optimizer update: scale each step by a running RMS of the gradient."""
        beta = 0.999
        epsilon = 1e-8

        for i in range(len(self.weights)):
            w_grad = weight_grads[i].T
            self.r_weights[i] = beta * self.r_weights[i] + (1 - beta) * np.square(w_grad)
            self.weights[i] -= self.learning_rate * w_grad / (np.sqrt(self.r_weights[i]) + epsilon)

            b_grad = bias_grads[i]
            self.r_biases[i] = beta * self.r_biases[i] + (1 - beta) * np.square(b_grad)
            self.biases[i] -= self.learning_rate * b_grad / (np.sqrt(self.r_biases[i]) + epsilon)

    def _adam_update(self, weight_grads, bias_grads):
        """Adam optimizer update with bias-corrected first and second moments."""
        beta1, beta2 = 0.9, 0.999
        epsilon = 1e-8
        self.adam_timestep += 1
        # Correction factors compensate for the zero-initialized moment estimates
        m1_correction = 1 - beta1 ** self.adam_timestep
        m2_correction = 1 - beta2 ** self.adam_timestep

        for i in range(len(self.weights)):
            # Weight updates
            w_grad = weight_grads[i].T
            self.adam_m1_weights[i] = beta1 * self.adam_m1_weights[i] + (1 - beta1) * w_grad
            self.adam_m2_weights[i] = beta2 * self.adam_m2_weights[i] + (1 - beta2) * np.square(w_grad)

            m1_corrected = self.adam_m1_weights[i] / m1_correction
            m2_corrected = self.adam_m2_weights[i] / m2_correction

            self.weights[i] -= self.learning_rate * m1_corrected / (np.sqrt(m2_corrected) + epsilon)

            # Bias updates
            b_grad = bias_grads[i]
            self.adam_m1_biases[i] = beta1 * self.adam_m1_biases[i] + (1 - beta1) * b_grad
            self.adam_m2_biases[i] = beta2 * self.adam_m2_biases[i] + (1 - beta2) * np.square(b_grad)

            m1_corrected = self.adam_m1_biases[i] / m1_correction
            m2_corrected = self.adam_m2_biases[i] / m2_correction

            self.biases[i] -= self.learning_rate * m1_corrected / (np.sqrt(m2_corrected) + epsilon)

    def _cross_entropy_loss(self, y_pred, y_true):
        """Mean cross-entropy over the batch for one-hot `y_true`.

        Predictions are clipped away from 0 and 1 so the logarithm stays finite.
        """
        epsilon = 1e-15
        y_pred_clipped = np.clip(y_pred, epsilon, 1 - epsilon)
        return -np.mean(np.sum(y_true * np.log(y_pred_clipped), axis=1))

    def _shuffle_dataset(self):
        """Return the training inputs and targets reordered by one shared random permutation."""
        indices = np.random.permutation(len(self.inputs))
        return self.inputs[indices], self.outputs[indices]

Basic training. To do: add more advanced metrics for analysis.

In [None]:
# Train the Neural Network
# Seed NumPy's global generator first so the random draws made while building and
# training the network (weight initialisation, shuffling, dropout masks) repeat across runs.
np.random.seed(20)
relu = ReLU()
nn = NeuralNetwork(
    x_train,
    y_train,
    hidden_layers=3,
    hidden_layer_sizes=[5, 5, 5],
    activation_func=relu,
    optimiser="Adam",
)
loss_history = nn.train(epochs=1000)

Basic testing. To do: add more advanced metrics for analysis.

In [None]:
# Test the Neural Network
# forwardpass returns a LayerCache, not predictions: the class probabilities are the
# activation stored for the last (softmax) layer. x_test must be scaled the same way
# as the inputs the network was trained on.
cache = nn.forwardpass(x_test, training=False)
probabilities = cache.get_layer(-1)['activation']
# Collapse probabilities and one-hot targets to class indices
y_pred = np.argmax(probabilities, axis=1)
y_true = np.argmax(y_test, axis=1)
# Convert into boolean array and then sum over all true elements
correct_predictions = np.sum(y_pred == y_true)
accuracy = correct_predictions / len(y_true)
print(f"Test accuracy: {accuracy:.4f}")