# Creating a neural network from scratch

Big shoutout to the book which guides this project: "Neural Networks from Scratch in Python" by Harrison Kinsley & Daniel Kukieła

## Imports

In [18]:
import numpy as np
from nnfs.datasets import spiral_data

## Layer Classes

In [19]:
class Layer_Dense: 
    def __init__(self, n_input, n_neurons, weight_regularizer_l1=0, weight_regularizer_l2=0, bias_regularizer_l1=0, bias_regularizer_l2=0):
        self.weights = 0.01 * np.random.randn(n_input, n_neurons)
        self.biases = np.zeros((1, n_neurons))
        # adding l1 and l2 regularization 
        self.weight_regularizer_l1 = weight_regularizer_l1
        self.weight_regularizer_l2 = weight_regularizer_l2
        self.bias_regularizer_l1 = bias_regularizer_l1
        self.bias_regularizer_l2 = bias_regularizer_l2

    def forward(self, inputs): 
        self.inputs = inputs
        self.output = np.dot(inputs, self.weights) + self.biases

    def backward(self, dvalues): 
        # gradients on parameters for updating
        self.dweights = np.dot(self.inputs.T, dvalues)
        self.dbiases = np.sum(dvalues, axis=0, keepdims=True)
        
        # backpropagating the l1 and l2 regularization 
        #   on weights 
        if self.weight_regularizer_l1 > 0:
            dL1 = np.ones_like(self.weights)
            dL1[self.weights < 0] = -1
            self.dweights += self.weight_regularizer_l1 * dL1
        if self.weight_regularizer_l2 > 0:
            self.dweights += 2 * self.weight_regularizer_l2 * self.weights
        
        #   on biaeses 
        if self.bias_regularizer_l1 > 0:
            dL1 = np.ones_like(self.biases)
            dL1[self.biases < 0] = -1
            self.dbiases += self.bias_regularizer_l1 * dL1
        if self.bias_regularizer_l2 > 0:
            self.dbiases += 2 * self.bias_regularizer_l2 * self.biases

        # gradients on values for next steps                
        self.dinputs = np.dot(dvalues, self.weights.T)
        

In [None]:
class Layer_Dropout:
    # Init
    def __init__(self, rate):
        # invert to get success rate 
        self.rate = 1 - rate

    def forward(self, inputs):
        self.inputs = inputs
        # Dropout only helpful in training => not wanted when predicting 
        # To make that possible: scaling the data back by dividing self.rate to mimic the mean of the sum
        #   => now in prediction the values are not (on average) bigger than when training 
        self.binary_mask = np.random.binomial(1, self.rate, size=inputs.shape) / self.rate
        # Apply mask to output values
        self.output = inputs * self.binary_mask
        
    def backward(self, dvalues):
        self.dinputs = dvalues * self.binary_mask

## Activation Functions

In [20]:
class Activation_ReLU: 
    def forward(self, inputs): 
        self.inputs = inputs
        self.output = np.maximum(0, inputs)
    
    def backward(self, dvalues): 
        self.dinputs = dvalues.copy() # copy because of modifying 
        self.dinputs[self.inputs <= 0] = 0

In [21]:
class Activation_Softmax: 
    def forward(self, inputs):
        exp_value = np.exp(inputs - np.max(inputs, axis=1, keepdims=True))  # subtracting max value for numerical stability => result will not change!
        self.output = exp_value / np.sum(exp_value, axis=1, keepdims=True)

    def backward(self, dvalues):
        # dinputs contains the gradiants
        # in softmax the result is a jacobian matrix 
        # =>    calculating the partial derivatives of every ouput of the softmax output with respect to each input seperately 
        # =>    at the end, every batch will be summed with the np.dot function to not return a 3D array but 2D array
        self.dinputs = np.empty_like(dvalues) 
        for index, (single_output, single_dvalues) in enumerate(zip(self.output, dvalues)): 
            single_output = single_output.reshape(-1, 1)
            jacobian_matrix = np.diagflat(single_output) - np.dot(single_output, single_output.T)
            # Calculate sample-wise gradient and add it to the array of sample gradients
            self.dinputs[index] = np.dot(jacobian_matrix, single_dvalues)

In [22]:
class Activation_Sigmoid: 
    def forward(self, input): 
        pass

## Loss Functions

In [23]:
# parent class
class Loss: 
    def calculate(self, output, y):
        sample_loss = self.forward(output, y)
        data_loss = np.mean(sample_loss)
        return data_loss
    # l1 and l2 regularization loss calculation on one layer 
    def regularization_loss(self, layer): 
        # 0 by default
        regularization_loss = 0
        # L1 regularization - weights
        # calculate only when factor greater than 0
        if layer.weight_regularizer_l1 > 0:
            regularization_loss += layer.weight_regularizer_l1 * np.sum(np.abs(layer.weights))
        # L2 regularization - weights
        if layer.weight_regularizer_l2 > 0:
            regularization_loss += layer.weight_regularizer_l2 * np.sum(layer.weights * layer.weights)
        # L1 regularization - biases
        # calculate only when factor greater than 0
        if layer.bias_regularizer_l1 > 0:
            regularization_loss += layer.bias_regularizer_l1 * np.sum(np.abs(layer.biases))
        # L2 regularization - biases
        if layer.bias_regularizer_l2 > 0:
            regularization_loss += layer.bias_regularizer_l2 *  np.sum(layer.biases * layer.biases)
        return regularization_loss

In [24]:
class Loss_CategoricalCrossEntropy(Loss): 
    def forward(self, y_pred, y_true): 
        samples = len(y_pred)
        y_pred_clipped = np.clip(y_pred, 1e-7, 1-1e-7) # to prevent 0 => log(0) is undefined

        """
        example: 
        
        y_pred_clipped = [[0.7, 0.1, 0.2],
                          [0.1, 0.5, 0.4],
                          [0.02, 0.9, 0.08]]
        """
        
        # single dimension => categorical labels
        if len(y_true.shape) == 1: # if is 1D-Array:
            # e.g. y_true = [1, 0, 0]
            correct_confidences = y_pred_clipped[range(samples), y_true]
    
        # 2 dimensions => one-hot encoded labels
        elif len(y_true.shape) == 2: 
            # e.g. y_true = [[0, 1, 0], [1, 0, 0], [1, 0, 0]]
            correct_confidences = np.sum(y_pred_clipped * y_true, axis=1)            
        else: 
            raise Exception("Please use a different shape for y_true: shape = {0}".format(y_true.shape))
        negative_log_likelihoods = -np.log(correct_confidences)
        return negative_log_likelihoods
    
    def backward(self, dvalues, y_true): 
        samples = len(dvalues) 
        labels = len(dvalues[0]) # number of labels in every sample

        # ensure one-hot encoding
        if len(y_true.shape) == 1: 
            y_true = np.eye(labels)[y_true]
        
        # calculate gradient 
        self.dinputs = -y_true / dvalues # derivative of loss function (-log(x)) => because of one-hot encoding the correct values get updated

        # normalize gradient => optimizers sum all the gradients before multiplying them with the learning rate 
        self.dinputs = self.dinputs / samples

In [25]:
# in case of using softmax and categorical corssentropy loss functions we can simplify 
# the backpropagation and therefore greatly enhance our performance 

class Activation_Softmax_Loss_CategoricalCrossentropy(Loss):
    def __init__(self):
        self.activation = Activation_Softmax()
        self.loss = Loss_CategoricalCrossEntropy()
    
    def forward(self, inputs, y_true): 
        self.activation.forward(inputs)
        self.output = self.activation.output
        return self.loss.calculate(self.output, y_true)

    def backward(self, dvalues, y_true): 
        samples = len(dvalues)
        # if one-hot encoded turn into discrete values
        if len(y_true.shape) == 2: 
            y_true = np.argmax(y_true, axis=1) # axis 1: sample-wise performing 
        self.dinputs = dvalues.copy() # copy to not modify existing array
        # calc gradients
        self.dinputs[range(samples), y_true] -= 1
        # normalize gradients 
        self.dinputs /= samples  

## Optimizers

In [26]:
# SGD is updating the weights and biases based on the momentum of the previous gradients 
#   and the new gradient
class Optimizer_SGD(): 
    def __init__(self, learning_rate=1.0, decay=0.0, momentum=0.0):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.momentum = momentum

    def pre_update_params(self): 
        if self.decay: 
            self.current_learning_rate = self.learning_rate * (1 / (1 + self.decay * self.iterations))

    def update_params(self, layer):
        if self.momentum: 
            # if layer does not contain momentum arrays => create it
            if not hasattr(layer, "weight_momentums"):
                layer.weight_momentums = np.zeros_like(layer.weights)
                layer.bias_momentums = np.zeros_like(layer.biases) # if weights do not have momentum array then also biases
            
            # build weight update with momentum
            weight_updates = self.momentum * layer.weight_momentums - self.current_learning_rate * layer.dweights
            layer.weight_momentums = weight_updates # update momentum in layer for next update
            # same for biases
            bias_updates = self.momentum * layer.bias_momentums - self.current_learning_rate * layer.dbiases
            layer.bias_momentums = bias_updates
            
        # optimizing gradients without momentum
        else: 
            weight_updates = -self.current_learning_rate * layer.dweights 
            bias_updates = -self.current_learning_rate * layer.dbiases

        # update weights and biases
        layer.weights += weight_updates
        layer.biases += bias_updates

    def post_update_params(self): 
        self.iterations += 1

In [27]:
# AdaGrad is very similar to SGD but it adds the idea of normalizing updates made to the featues
#   it is not commonly used
#   => in our example not as good as SGD!
class Optimizer_Adagrad:
    def __init__(self, learning_rate=1., decay=0., epsilon=1e-7):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.epsilon = epsilon
    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate * (1. / (1. + self.decay * self.iterations))

    def update_params(self, layer):
        if not hasattr(layer, 'weight_cache'):
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.bias_cache = np.zeros_like(layer.biases)
        layer.weight_cache += layer.dweights**2
        layer.bias_cache += layer.dbiases**2
        layer.weights += -self.current_learning_rate * layer.dweights / (np.sqrt(layer.weight_cache) + self.epsilon)
        layer.biases += -self.current_learning_rate * layer.dbiases / (np.sqrt(layer.bias_cache) + self.epsilon)

    def post_update_params(self):
        self.iterations += 1

In [28]:
# RMSProp is very similiar to AdaGrad cache
#   adds smoother learning rate changes 
#   uses moving average of cache instead of squared gradients like AdaGrad
#   => in our example not as good as SGD!
class Optimizer_RMSprop:
    def __init__(self, learning_rate=0.001, decay=0., epsilon=1e-7, rho=0.9):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.epsilon = epsilon
        self.rho = rho
    
    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate * (1. / (1. + self.decay * self.iterations))
    
    def update_params(self, layer):
        if not hasattr(layer, 'weight_cache'):
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.bias_cache = np.zeros_like(layer.biases)
        layer.weight_cache = self.rho * layer.weight_cache + (1 - self.rho) * layer.dweights**2
        layer.bias_cache = self.rho * layer.bias_cache + (1 - self.rho) * layer.dbiases**2
        layer.weights += -self.current_learning_rate * layer.dweights / (np.sqrt(layer.weight_cache) + self.epsilon)
        layer.biases += -self.current_learning_rate * layer.dbiases /  (np.sqrt(layer.bias_cache) + self.epsilon)
        
    def post_update_params(self):
        self.iterations += 1

In [29]:
# Adam is a mix between SGD and RMSProp and combines the two principles 
#   meaning it applies momentum like SGD and then 
#   applies per-weight adaptive learning rate with the cache like RMSProp
# Is the most used optimizer!
class Optimizer_Adam:
    def __init__(self, learning_rate=0.001, decay=0., epsilon=1e-7, beta_1=0.9, beta_2=0.999):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.epsilon = epsilon
        self.beta_1 = beta_1
        self.beta_2 = beta_2
    
    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate * (1. / (1. + self.decay * self.iterations))
    
    def update_params(self, layer):
        if not hasattr(layer, 'weight_cache'):
            layer.weight_momentums = np.zeros_like(layer.weights)
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.bias_momentums = np.zeros_like(layer.biases)
            layer.bias_cache = np.zeros_like(layer.biases)

        layer.weight_momentums = self.beta_1 * layer.weight_momentums + (1 - self.beta_1) * layer.dweights
        layer.bias_momentums = self.beta_1 * layer.bias_momentums + (1 - self.beta_1) * layer.dbiases

        weight_momentums_corrected = layer.weight_momentums / (1 - self.beta_1 ** (self.iterations + 1))
        bias_momentums_corrected = layer.bias_momentums / (1 - self.beta_1 ** (self.iterations + 1))

        layer.weight_cache = self.beta_2 * layer.weight_cache + (1 - self.beta_2) * layer.dweights**2
        layer.bias_cache = self.beta_2 * layer.bias_cache + (1 - self.beta_2) * layer.dbiases**2

        # Get corrected cache
        weight_cache_corrected = layer.weight_cache / (1 - self.beta_2 ** (self.iterations + 1))
        bias_cache_corrected = layer.bias_cache / (1 - self.beta_2 ** (self.iterations + 1))
        
        layer.weights += -self.current_learning_rate * weight_momentums_corrected / \
            (np.sqrt(weight_cache_corrected) + self.epsilon)
        layer.biases += -self.current_learning_rate * bias_momentums_corrected / \
        (np.sqrt(bias_cache_corrected) + self.epsilon)

    def post_update_params(self):
        self.iterations += 1

## Testing 

In [30]:
import matplotlib.pyplot as plt
def plot_progress(name, epoch, X, y, predict): 
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.01),
                        np.arange(y_min, y_max, 0.01))

    # Flatten the meshgrid points and make predictions
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = np.array([predict(x) for x in grid_points])
    Z = Z.reshape(xx.shape)

    colors = ['red', 'blue', 'green']


    # Use a colormap for both the scatter plot and contourf plot
    cmap = plt.cm.viridis # Choose the desired colormap

    # Plot the decision boundaries using filled contours with the specified colormap
    plt.contourf(xx, yy, Z, levels=np.arange(len(np.unique(y)) + 1) - 0.5, cmap=cmap, alpha=0.8)

    # Scatter plot for the original data points with the same colormap
    scatter = plt.scatter(X[:, 0], X[:, 1], c=y, cmap=cmap, edgecolors='k', marker='.', s=40, alpha=0.8)

    # Create a legend
    legend_labels = np.unique(y)
    legend_handles = [plt.Line2D([0], [0], marker='o', color='w', label=label, markerfacecolor=cmap(label)) for label in legend_labels]
    plt.legend(handles=legend_handles, title='Classes')

    # Show the plot
    plt.savefig(f"img/{name}_{epoch}")


In [31]:
# calculate the time difference when using the optimized "activation_softmax_loss_categoricalcrossentropy" way
from timeit import timeit
import nnfs

nnfs.init()
softmax_outputs = np.array([[0.7, 0.1, 0.2],
                            [0.1, 0.5, 0.4],
                            [0.02, 0.9, 0.08]])
class_targets = np.array([0, 1, 1])

def f1():
    softmax_loss = Activation_Softmax_Loss_CategoricalCrossentropy()
    softmax_loss.backward(softmax_outputs, class_targets)
    dvalues1 = softmax_loss.dinputs
   
def f2():
    activation = Activation_Softmax()
    activation.output = softmax_outputs
    loss = Loss_CategoricalCrossEntropy()
    loss.backward(softmax_outputs, class_targets)
    activation.backward(loss.dinputs)
    dvalues2 = activation.dinputs
    


t1 = timeit(lambda: f1(), number=10000)
t2 = timeit(lambda: f2(), number=10000)

print("The optimized backward pass is {0} times faster!".format(t2/t1)) 

The optimized backward pass is 5.84487092210354 times faster!


In [33]:
# Create dataset    
X, y = spiral_data(samples=1000, classes=3)

# Dense layer with 2 input features and 3 output values
dense1 = Layer_Dense(2, 512, weight_regularizer_l2=5e-4, bias_regularizer_l2=5e-4)
activation1 = Activation_ReLU()
dropout1 = Layer_Dropout(0.1)
dense2 = Layer_Dense(512, 3)
loss_activation = Activation_Softmax_Loss_CategoricalCrossentropy()
optimizer = Optimizer_Adam(learning_rate=0.05, decay=5e-5)


softmax = Activation_Softmax()

def predict_class(x): 
    dense1.forward(x)
    activation1.forward(dense1.output)
    dense2.forward(activation1.output)
    softmax.forward(dense2.output)
    return np.argmax(softmax.output, axis=1)


# loop the training process
epochs = 10001
for epoch in range(epochs): 

    # forward pass
    dense1.forward(X)
    activation1.forward(dense1.output)
    dropout1.forward(activation1.output)
    dense2.forward(dropout1.output)

    data_loss = loss_activation.forward(dense2.output, y)
    # calculating and adding the regularization loss of each layer
    regularization_loss = loss_activation.regularization_loss(dense1) + loss_activation.regularization_loss(dense2)
    loss = data_loss + regularization_loss

    predictions = np.argmax(loss_activation.output, axis=1)
    if len(y.shape) == 2:
        y = np.argmax(y, axis=1)
    accuracy = np.mean(predictions==y)
    
    # backward pass
    loss_activation.backward(loss_activation.output, y)
    dense2.backward(loss_activation.dinputs)
    dropout1.backward(dense2.dinputs)
    activation1.backward(dropout1.dinputs)
    dense1.backward(activation1.dinputs)

    if not epoch % 200:
        print(
            f'epoch: {epoch}, ' +
            f'acc: {accuracy:.3f}, ' +
            f'loss: {loss:.3f} (' +
            f'data_loss: {data_loss:.3f}, ' +
            f'reg_loss: {regularization_loss:.3f}), ' +
            f'lr: {optimizer.current_learning_rate}')
        # if wanted: plotting the learning progress of the network 
        # plot_progress("spiral_data", epoch, X, y, predict_class)



    # update params based on calculated gradients
    optimizer.pre_update_params()
    optimizer.update_params(dense1)
    optimizer.update_params(dense2)
    optimizer.post_update_params()

# validation of our model: 
X_test, y_test = spiral_data(samples=100, classes=3)

dense1.forward(X_test)
activation1.forward(dense1.output)
dense2.forward(activation1.output)

loss = loss_activation.forward(dense2.output, y_test)
predictions = np.argmax(loss_activation.output, axis=1)
if len(y_test.shape) == 2:
    y_test = np.argmax(y_test, axis=1)
accuracy = np.mean(predictions==y_test)

print(f'validation, acc: {accuracy:.3f}, loss: {loss:.3f}')

epoch: 0, acc: 0.309, loss: 1.099 (data_loss: 1.099, reg_loss: 0.000), lr: 0.02
epoch: 200, acc: 0.766, loss: 0.728 (data_loss: 0.653, reg_loss: 0.074), lr: 0.019998010197985302
epoch: 400, acc: 0.835, loss: 0.606 (data_loss: 0.509, reg_loss: 0.096), lr: 0.01999601079584623
epoch: 600, acc: 0.847, loss: 0.554 (data_loss: 0.458, reg_loss: 0.096), lr: 0.01999401179346786
epoch: 800, acc: 0.858, loss: 0.522 (data_loss: 0.430, reg_loss: 0.092), lr: 0.0199920131907303
epoch: 1000, acc: 0.859, loss: 0.500 (data_loss: 0.411, reg_loss: 0.088), lr: 0.019990014987513734
epoch: 1200, acc: 0.868, loss: 0.479 (data_loss: 0.393, reg_loss: 0.086), lr: 0.019988017183698373
epoch: 1400, acc: 0.870, loss: 0.462 (data_loss: 0.377, reg_loss: 0.085), lr: 0.019986019779164473
epoch: 1600, acc: 0.877, loss: 0.444 (data_loss: 0.361, reg_loss: 0.083), lr: 0.01998402277379235
epoch: 1800, acc: 0.882, loss: 0.433 (data_loss: 0.351, reg_loss: 0.081), lr: 0.019982026167462367
epoch: 2000, acc: 0.879, loss: 0.425 (