In [1]:
import numpy as np
import nnfs
from nnfs.datasets import spiral_data

In [2]:
nnfs.init()

In [3]:
# Dense Layer
class Layer_Dense:

    # Layer initialisation
    def __init__(self, n_inputs, n_neurons, weight_regulariser_l1=0, weight_regulariser_l2=0, bias_regulariser_l1=0, bias_regulariser_l2=0):
        # Initialise the weights and the biases
        self.weights = 0.01 * np.random.randn(n_inputs, n_neurons)
        self.biases = np.zeros((1, n_neurons))
        # Set regularisation strength
        self.weight_regulariser_l1 = weight_regulariser_l1
        self.weight_regulariser_l2 = weight_regulariser_l2
        self.bias_regulariser_l1 = bias_regulariser_l1
        self.bias_regulariser_l2 = bias_regulariser_l2

    # Forward pass
    def forward(self, inputs):
        # Remember input values
        self.inputs = inputs
        # Calculate output values from inputs, weights, and biases
        self.output = np.dot(inputs, self.weights) + self.biases

    # Backpropagation
    def backward(self, dvalues):
        # Gradients on parameters
        self.dweights = np.dot(self.inputs.T, dvalues)
        self.dbiases = np.sum(dvalues, axis=0, keepdims=True)

        # Gradients on regularisation, L1 on weights
        if self.weight_regulariser_l1 > 0:
            dL1 = np.ones_like(self.weights)
            dL1[self.weights < 0] = -1
            self.dweights += self.weight_regulariser_l1 * dL1
        # L2 on weights
        if self.weight_regulariser_l2 > 0:
            self.dweights += 2 * self.weight_regulariser_l2 * self.weights

        # L1 on biases
        if self.bias_regulariser_l1 > 0:
            dL1 = np.ones_like(self.biases)
            dL1[self.biases < 0] = -1
            self.dbiases += self.bias_regulariser_l1 * dL1
        # L2 on biases
        if self.bias_regulariser_l2 > 0:
            self.dbiases += 2 * self.bias_regulariser_l2 * self.biases

        # Gradient on values
        self.dinputs = np.dot(dvalues, self.weights.T)

In [4]:
# Dropout
class Layer_Dropout:

    # Init
    def __init__(self, rate):
        # Store rate, we invert it as for example for dropout of 0.1 we need a success rate of 0.9
        self.rate = 1 - rate

    # Forward pass
    def forward(self, inputs):
        # Save input values
        self.inputs = inputs
        # Generate and save scaled mask
        self.binary_mask = np.random.binomial(1, self.rate, size=inputs.shape) / self.rate
        #Apply mask to output values
        self.output = inputs * self.binary_mask

    # Backwards pass
    def backward(self, dvalues):
        # Gradient on values
        self.dinputs = dvalues * self.binary_mask

In [5]:
# ReLU activation
class Activation_ReLU:

    # Forward pass
    def forward(self, inputs):
        # Remember inputs
        self.inputs = inputs
        # Calculate output values from inputs
        self.output = np.maximum(0, inputs)

    # Backpropagation
    def backward(self, dvalues):
        self.dinputs = dvalues.copy()

        # Zero gradient where input values were negative
        self.dinputs[self.inputs <= 0] = 0

In [6]:
# Softmax activation
class Activation_Softmax:

    # Forward pass
    def forward(self, inputs):
        # Remember input values
        self.inputs = inputs

        # Get unnormalised probabilities
        exp_values = np.exp(inputs - np.max(inputs, axis=1, keepdims=True))

        # Normalise them for each sample
        probabilities = exp_values / np.sum(exp_values, axis=1, keepdims=True)

        self.output = probabilities

    # Backpropagation
    def backward(self, dvalues):
        # Create an uninitialised array
        self.dinputs = np.empty_like(dvalues)

        # Enumerate outputs and gradients
        for index, (single_output, single_dvalues) in enumerate(zip(self.output, dvalues)):
            # Flatten output array
            single_output = single_output.reshape(-1, 1)
            # Calculate the Jacobian matrix of the output
            jacobian_matrix = np.diagflat(single_output) - np.dot(single_output, single_output.T)
            # Then calculate the sample-wise gradient and add it to the array of sample gradients
            self.dinputs[index] = np.dot(jacobian_matrix, single_dvalues)

In [7]:
# Sigmoid Activation
class Activation_Sigmoid:

    # Forward pass
    def forward(self, inputs):
        # Save inputs and calculate outputs
        self.inputs = inputs
        self.output = 1 / (1 + np.exp(-inputs))

    # Backward pass
    def backward(self, dvalues):
        # Derivative - calculates from the output of the sigmoid function
        self.dinputs = dvalues * (1 - self.output) * self.output

In [8]:
# Stochastic Gradient Optimiser
class Optimiser_SGD:

    # Initialise parameters - set settings, learning_rate=1. is default for this optimiser
    def __init__(self, learning_rate=1.0, decay=0., momentum=0.):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.momentum = momentum

    # Call once before any parameter updates
    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate * (1. / (1. + self.decay * self.iterations))

    # Update parameters
    def update_params(self, layer):
        
        # If we use momentum
        if self.momentum:

            # If the layer does not contain momentum arrays, create them filled with zeros
            if not hasattr(layer, 'weight_momentums'):
                layer.weight_momentums = np.zeros_like(layer.weights)
                # If there is no momentum array for weights, then the array doesn't exist for biases either
                layer.bias_momentums = np.zeros_like(layer.biases)

            # Build weight updates with momentum - take previous updates multiplied by retain factor and update with current gradients
            weight_updates = self.momentum * layer.weight_momentums - self.current_learning_rate * layer.dweights
            layer.weight_momentums = weight_updates

            # Build bias updates
            bias_updates = self.momentum * layer.bias_momentums - self.current_learning_rate * layer.dbiases
            layer.bias_momentums = bias_updates

        # Original SGD without momentum
        else:
            weight_updates = -self.current_learning_rate * layer.dweights
            bias_updates = -self.current_learning_rate * layer.dbiases

        # Update weights and biases using either original or momentum updates
        layer.weights += weight_updates
        layer.biases += bias_updates

    # Call once after any parameter updates
    def post_update_params(self):
        self.iterations += 1

In [9]:
class Optimiser_Adagrad:

    # Initialise optimiser - set settings
    def __init__(self, learning_rate=1., decay=0.,  epsilon=1e-7):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.epsilon = epsilon

    # Call once before any parameter updates
    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate * (1. / (1. + self.decay * self.iterations))

    # Update parameters
    def update_params(self, layer):

        # If layer does not contain cache arrays, create them filled with zeros
        if not hasatrr(layer, 'weight_cache'):
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.bias_cache = np.zeros_like(layer.biases)

        # Update cache with squared current gradients
        layer.weight_cache += layer.dweights ** 2
        layer.bias_cache += layer.dbiases ** 2

        # Vanilla SGD parameter update + normalisation with square rooted cache
        layer.weights += -self.current_learning_rate * layer.dweights / (np.sqrt(layer.weight_cache) + self.epsilon)
        layer.biases += -self.current_learning_rate * layer.dbiases / (np.sqrt(layer.bias_cache) + self.epsilon)

    def post_update_params(self):
        self.iterations += 1

In [10]:
# RMSprop optimiser
class Optimiser_RMSprop:

    # Initialise optimiser - set settings
    def __init__(self, learning_rate=0.001, decay=0.,  epsilon=1e-7, rho=0.9):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.epsilon = epsilon
        self.rho = rho

    # Call once before any parameter updates
    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate * (1. / (1. + self.decay * self.iterations))

    # Update parameters
    def update_params(self, layer):

        # If layer does not contain cache arrays, create them filled with zeros
        if not hasatrr(layer, 'weight_cache'):
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.bias_cache = np.zeros_like(layer.biases)

        # Update cache with squared current gradients
        layer.weight_cache = self.rho * layer.weight_cache + (1 - self.rho) * layer.dweights**2
        layer.bias_cache = self.rho * layer.bias_cache + (1 - self.rho) * layer.dbiases**2

        # Vanilla SGD parameter update + normalisation with square rooted cache
        layer.weights += -self.current_learning_rate * layer.dweights / (np.sqrt(layer.weight_cache) + self.epsilon)
        layer.biases += -self.current_learning_rate * layer.dbiases / (np.sqrt(layer.bias_cache) + self.epsilon)

    def post_update_params(self):
        self.iterations += 1

In [11]:
class Optimiser_Adam:

    # Initialise optimiser - set settings
    def __init__(self, learning_rate=0.001, decay=0., epsilon=1e-7, beta_1=0.9, beta_2=0.999):
        self.learning_rate = learning_rate
        self.current_learning_rate = learning_rate
        self.decay = decay
        self.iterations = 0
        self.epsilon = epsilon
        self.beta_1 = beta_1
        self.beta_2 = beta_2

    # Call once before any paramter updates
    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate * (1. / (1. + self.decay * self.iterations))

    # Update params
    def update_params(self, layer):

        # If the layer does not contain cache arrays, create them filled with zeros
        if not hasattr(layer, 'weight_cache'):
            layer.weight_momentums = np.zeros_like(layer.weights)
            layer.weight_cache = np.zeros_like(layer.weights)
            layer.bias_momentums = np.zeros_like(layer.biases)
            layer.bias_cache = np.zeros_like(layer.biases)

        # Update momentum with current gradients
        layer.weight_momentums = self.beta_1 * layer.weight_momentums + (1 - self.beta_1) * layer.dweights
        layer.bias_momentums = self.beta_1 * layer.bias_momentums + (1 - self.beta_1) * layer.dbiases

        # Get corrected momentum, self.iteration is 0 at first pass and we need it to start with 1
        weight_momentums_corrected = layer.weight_momentums / (1 - self.beta_1 ** (self.iterations + 1))
        bias_momentums_corrected = layer.bias_momentums / (1 - self.beta_1 ** (self.iterations + 1))
        # Update cache with squared current gradients
        layer.weight_cache = self.beta_2 * layer.weight_cache + (1 - self.beta_2) * layer.dweights**2
        layer.bias_cache = self.beta_2 * layer.bias_cache + (1 - self.beta_2) * layer.dbiases**2

        # Get corrected cache
        weight_cache_corrected = layer.weight_cache / (1 - self.beta_2 ** (self.iterations + 1))
        bias_cache_corrected = layer.bias_cache / (1 - self.beta_2 ** (self.iterations + 1))

        # OG SGD parameter update + normalisation with square rooted cache
        layer.weights += -self.current_learning_rate * weight_momentums_corrected / (np.sqrt(weight_cache_corrected) + self.epsilon)
        layer.biases += -self.current_learning_rate * bias_momentums_corrected / (np.sqrt(bias_cache_corrected) + self.epsilon)

    # Call once after any parameter updates
    def post_update_params(self):
        self.iterations += 1

In [12]:
# Common loss class
class Loss:

    # Regularisation loss calculation
    def regularisation_loss(self, layer):

        # 0 by default
        regularisation_loss = 0

        # L1 regularisation - abs(weights)
        # calculate only when factor is greater than 0
        if layer.weight_regulariser_l1 > 0:
            regularisation_loss += layer.weight_regulariser_l1 * np.sum(np.abs(layer.weights))

        # L2 regularisaiton - weights ** 2
        if layer.weight_regulariser_l2 > 0:
            regularisation_loss += layer.weight_regulariser_l2 * np.sum(layer.weights * layer.weights)

        # L1 regularisation - abs(biases)
        # calculate only when factor is greater than 0
        if layer.bias_regulariser_l1 > 0:
            regularisation_loss += layer.bias_regulariser_l1 * np.sum(np.abs(layer.biases))

        # L2 regularisation - biases ** 2
        if layer.bias_regulariser_l2 > 0:
            regularisation_loss += layer.bias_regulariser_l2 * np.sum(layer.biases * layer.biases)

        return regularisation_loss

    # Calculates the data and regularisation losses given model output and ground truth values
    def calculate(self, output, y):

        # Calculate samples losses
        sample_losses = self.forward(output, y)

        # Calculate mean loss
        data_loss = np.mean(sample_losses)

        # Return loss
        return data_loss

In [13]:
# Cross-entropy loss
class Loss_CategoricalCrossentropy(Loss):

    # Forward pass
    def forward(self, y_pred, y_true):

        # Number of samples in a batch
        samples = len(y_pred)

        # Clip data to prevent division by 0
        # Clip both sides to not drag mean towards any value
        y_pred_clipped = np.clip(y_pred, 1e-7, 1 - 1e-7)

        # Probabilities for target values - only if categorical labels
        if len(y_true.shape) == 1:
            correct_confidences = y_pred_clipped[
            range(samples),
            y_true
        ]

        # Mask values - only for one-hot encoded labels
        elif len(y_true.shape) == 2:
            correct_confidences = np.sum(
                y_pred_clipped * y_true,
                axis=1
            )

        # Losses
        negative_log_likelihoods = -np.log(correct_confidences)
        return negative_log_likelihoods

    # Backpropagation
    def backward(self, dvalues, y_true):
        # Number of samples
        samples = len(dvalues)

        # Number of labels in every sample, we'll use the first sample to count them
        labels = len(dvalues[0])

        # If labels are sparse, turn them into one-hot vectors
        if len(y_true.shape) == 1:
            y_true = np.eye(labels)[y_true]

        # Calculate gradient
        self.dinputs = -y_true / dvalues
        # Normalise
        self.dinputs = self.dinputs / samples

In [14]:
# Softmax classifier - combined softmax activation and cross-entropy loss for a faster backward step
class Activation_Softmax_Loss_CategoricalCrossentropy():

    # Create activation and loss function objects
    def __init__(self):
        self.activation = Activation_Softmax()
        self.loss = Loss_CategoricalCrossentropy()

    # Forward pass
    def forward(self, inputs, y_true):
        # Output layer's activation funciton
        self.activation.forward(inputs)
        # Set the output
        self.output = self.activation.output
        # Calculate and return the loss value
        return self.loss.calculate(self.output, y_true)

    # Backpropagation
    def backward(self, dvalues, y_true):
        # Number of samples
        samples = len(dvalues)

        # If labels are one-hot encoded, turn them into discrete values
        if len(y_true.shape) == 2:
            y_true = np.argmax(y_true, axis=1)

        # Copy
        self.dinputs = dvalues.copy()
        # Calculate gradient
        self.dinputs[range(samples), y_true] -= 1
        # Normalise gradient
        self.dinputs = self.dinputs / samples

In [15]:
# Binary Cross-Entropy Loss
class Loss_BinaryCrossentropy(Loss):

    # Forward pass
    def forward(self, y_pred, y_true):
        # Clip data
        y_pred_clipped = np.clip(y_pred, 1e-7, 1 - 1e-7)

        # Calculate sample-wise loss
        sample_losses = -(y_true * np.log(y_pred_clipped) + (1 - y_true) * np.log(1 - y_pred_clipped))
        sample_losses = np.mean(sample_losses, axis=-1)

        # Return losses
        return sample_losses

    # Backward pass
    def backward(self, dvalues, y_true):
        # Number of samples
        samples = len(dvalues)

        # Number of outputs, used first to count them
        outputs = len(dvalues[0])

        # Clip
        clipped_dvalues = np.clip(dvalues, 1e-7, 1 - 1e-7)

        # Calculate gradient
        self.dinputs = -(y_true / clipped_dvalues - (1 - y_true) / (1 - clipped_dvalues)) / outputs

        # Normalise
        self.dinputs = self.dinputs / samples

In [24]:
# Linear activation
class Activation_Linear:

    # Forward pass
    def forward(self, inputs):
        self.inputs = inputs
        self.output = inputs

    # Backward pass
    def backward(self, dvalues):
        # derivative is 1, 1 * dvalues = dvalues
        self.dinputs = dvalues.copy()

In [25]:
# Mean Squared Error Loss
class Loss_MeanSquaredError(Loss):

    # Forward pass
    def forward(self, y_pred, y_true):
        # Calculate loss
        sample_losses = np.mean((y_true - y_pred)**2, axis=-1)

        # Return losses
        return samples_losses

    # Backward pass
    def backward(self, dvalues, y_true):
        # Number of samples
        samples = len(dvalues)

        # Number of outputs
        outputs = len(dvalues[0])

        # Gradient on values
        self.dinputs = -2 * (y_true - dvalues) / outputs
        # Normalise
        self.dinputs = self.dinputs / sample

In [26]:
# Mean Absolute Error Loss
class Loss_MeanAbsoluteError(Loss): # L1 loss

    # Forward pass
    def forward(self, y_pred, y_true):
        # Calculate loss
        sample_losses = np.mean(np.abs(y_true - y_pred), axis=-1)

        # Return losses
        return sample_losses

    # Backward pass
    def backward(self, dvalues, y_true):
        # Number of samples
        samples = len(dvalues)
        # Number of outputs in every sample
        outputs = len(dvalues[0])

        # Calculate gradient
        self.dinputs = np.sign(y_true - dvalues) / outputs
        # Normalise
        self.dinputs = self.dinputs / samples

In [22]:
# Create dataset
X, y = spiral_data(samples=100, classes=2)

# Reshape labels to be a list of lists
# Inner list contains one output (either 0 or 1)
y = y.reshape(-1, 1)

# Create Dense Laer with 2 input features and 3 output values
dense1 = Layer_Dense(2, 64, weight_regulariser_l2=5e-4, bias_regulariser_l2=5e-4)

# Create ReLu activation (to be used with Dense layer):
activation1 = Activation_ReLU()

# Create second Dense layer with 3 input features (as we take output of previous layer here) and 1 output values
dense2 = Layer_Dense(64, 1)

# Create Sigmoid activation
activation2 = Activation_Sigmoid()

# Create loss function
loss_function = Loss_BinaryCrossentropy()

# Create optimiser
optimiser = Optimiser_Adam(learning_rate=1e-2, decay=5e-7)

# Start the epoch loop for training
for epoch in range(10001):
    
    # Perform a forward pass of our training data through this layer
    dense1.forward(X)
    
    # Perform a forward pass through activation funciton it takes the output of the first dense layer here
    activation1.forward(dense1.output)
    
    # Perform a forward pass through the second Dense layer, it takes output of activation function of first layer as inputs
    dense2.forward(activation1.output)

    # Perform a forward pass through the activation function, taking the outputs of the second dense layer
    activation2.forward(dense2.output)
    
    # Perform a forward pass through activation/loss function, it takes the output of the second dense layer here
    data_loss = loss_function.calculate(activation2.output, y)

    # Calculate regularisation penalty
    regularisation_loss = loss_function.regularisation_loss(dense1) + loss_function.regularisation_loss(dense2)

    # Calculate overall loss
    loss = data_loss + regularisation_loss
    
    # Calculate accuracy from output of activation2 and targets, calculate values along the first axis
    predictions = (activation2.output > 0.5) * 1
    accuracy = np.mean(predictions==y)
    
    if not epoch % 100:
        print(f'epoch: {epoch}, accuaracy: {accuracy:.3f}, loss: {loss:.3f} (data_loss: {data_loss:.3f}, reg_loss: {regularisation_loss:.3f}), lr: {optimiser.current_learning_rate}')
    
    # Backward pass
    loss_function.backward(activation2.output, y)
    activation2.backward(loss_function.dinputs)
    dense2.backward(activation2.dinputs)
    activation1.backward(dense2.dinputs)
    dense1.backward(activation1.dinputs)

    optimiser.pre_update_params()
    optimiser.update_params(dense1)
    optimiser.update_params(dense2)
    optimiser.post_update_params()

epoch: 0, accuaracy: 0.525, loss: 0.693 (data_loss: 0.693, reg_loss: 0.000), lr: 0.01
epoch: 100, accuaracy: 0.705, loss: 0.587 (data_loss: 0.574, reg_loss: 0.013), lr: 0.009999505024501287
epoch: 200, accuaracy: 0.840, loss: 0.478 (data_loss: 0.432, reg_loss: 0.045), lr: 0.009999005098992651
epoch: 300, accuaracy: 0.905, loss: 0.399 (data_loss: 0.333, reg_loss: 0.066), lr: 0.009998505223469092
epoch: 400, accuaracy: 0.920, loss: 0.359 (data_loss: 0.282, reg_loss: 0.077), lr: 0.009998005397923115
epoch: 500, accuaracy: 0.930, loss: 0.337 (data_loss: 0.257, reg_loss: 0.079), lr: 0.009997505622347224
epoch: 600, accuaracy: 0.915, loss: 0.321 (data_loss: 0.242, reg_loss: 0.079), lr: 0.00999700589673393
epoch: 700, accuaracy: 0.930, loss: 0.307 (data_loss: 0.229, reg_loss: 0.078), lr: 0.009996506221075735
epoch: 800, accuaracy: 0.935, loss: 0.296 (data_loss: 0.220, reg_loss: 0.076), lr: 0.00999600659536515
epoch: 900, accuaracy: 0.935, loss: 0.287 (data_loss: 0.213, reg_loss: 0.074), lr: 0

In [23]:
# Validate the model

# Create test dataset
X_test, y_test = spiral_data(samples=100, classes=2)

# Reshape
y_test = y_test.reshape(-1, 1)

# Perform a forward pass of our testing data through this layer
dense1.forward(X_test)

# Perform a forward pass through activation funciton that takes the output of the first dense layer
activation1.forward(dense1.output)

# Perform a forward pass through the second dense layer, that takes the output of the activaiton function
dense2.forward(activation1.output)

# Perform a forward pass through the activation/loss funciton, takes the output of the second dense layer here and returns loss
loss = loss_function.calculate(activation2.output, y_test)

# Caclulate the accuracy from output of activation2 and targets
predictions = (activation2.output > 0.5) * 1
accuracy = np.mean(predictions==y_test)

print(f'validation, acc: {accuracy:.3f}, loss: {loss:.3f}')

validation, acc: 0.950, loss: 0.108
