<div align=center>

# Implementing a Deep Neural Network

By Hamed Araab

Supervisor: Dr. Marzieh Zarinbal

</div>


## Introduction

This notebook demonstrates a basic implementation of deep neural networks (DNNs)
based on the specifications of `../reference.pdf`.


## Required Libraries

First of all, let's import the required libraries:


In [12]:
import math
import numpy as np

## Deep Neural Network (DNN) Implementation

In this section, we implement two main classes:

- `FullyConnectedLayer`, which will be used to declare a layer in the network,
  and
- `NeuralNetwork`, which will be used to declare the network itself.


In [13]:
class FullyConnectedLayer:
    def __init__(self, units, activation):
        self.units = units
        self.activation = activation


class NeuralNetwork:
    def generate_initial_weights(self, l):
        prev_units = self.input_size if l == 1 else self.layers[l - 1].units
        current_units = self.layers[l].units

        return np.random.normal(
            0.0,
            math.sqrt(2 / current_units),
            size=(prev_units, current_units),
        )

    def __init__(self, input_size, layers, loss, learning_rate):
        self.input_size = input_size
        self.number_of_layers = len(layers)
        self.loss = loss
        self.learning_rate = learning_rate

        self.layers = {l + 1: layers[l] for l in range(self.number_of_layers)}

        self.weights = {
            l: self.generate_initial_weights(l)
            for l in range(1, self.number_of_layers + 1)
        }

        self.biases = {
            l: np.zeros((1, self.layers[l].units))
            for l in range(1, self.number_of_layers + 1)
        }

    @property
    def trainable_parameters(self):
        params = 0

        for l in range(1, self.number_of_layers + 1):
            prev_units = self.input_size if l == 1 else self.layers[l - 1].units
            current_units = self.layers[l].units

            # Weights
            # params += prev_units * current_units

            # Biases
            # params += current_units

            params += (prev_units + 1) * current_units

        return params

    def summary(self):
        print(f"Trainable parameters: {self.trainable_parameters}")

    # Forward Propagation
    def predict(self, X, return_objects=False):
        Z = {}
        A = {0: X}

        for l in range(1, self.number_of_layers + 1):
            Z[l] = A[l - 1] @ self.weights[l] + self.biases[l]
            A[l] = self.layers[l].activation.forward(Z[l])

        if return_objects:
            return Z, A
        else:
            Y_hat = A[self.number_of_layers]

            return Y_hat

    # The training phase (forward + backward propagation on training dataset).
    def train(self, X, Y, epochs=10, batch_size=32, test_data=None):
        n, _ = X.shape

        # The total number of mini-batches.
        batches = math.ceil(n / batch_size)

        for epoch in range(epochs):
            # Shuffle the training data for each epoch.
            permutation = np.random.permutation(n)

            X = X[permutation, :]
            Y = Y[permutation, :]

            loss_epoch = 0

            # Run the training phase for each mini-batch.
            for batch in range(batches):
                start_index = batch * batch_size
                end_index = min(start_index + batch_size, n)

                X_batch = X[start_index:end_index, :]
                Y_batch = Y[start_index:end_index, :]

                Z_batch, A_batch = self.predict(X_batch, return_objects=True)

                Y_hat_batch = A_batch[self.number_of_layers]

                loss_epoch += self.loss.forward(Y_batch, Y_hat_batch)

                self.update_parameters(Y_batch, Z_batch, A_batch)

            cost_epoch = loss_epoch / n

            details = (
                f"Epoch: {epoch + 1}/{epochs}, cost: {'{:.3f}'.format(cost_epoch)}"
            )

            # Calculate the loss for the test split.
            if test_data is not None:
                X_test, Y_test = test_data

                n_test, _ = X_test.shape

                Y_hat_test = self.predict(X_test)

                loss_test_epoch = self.loss.forward(Y_test, Y_hat_test)
                cost_test_epoch = loss_test_epoch / n_test

                details += f", cost_test: {'{:.3f}'.format(cost_test_epoch)}"

            # Print the details containing the prediction cost for this epoch.
            print(details)

    # Backward propagation.
    def update_parameters(self, Y, Z, A):
        n, _ = Y.shape

        Y_hat = A[self.number_of_layers]

        dJ_dY_hat = self.loss.backward(Y, Y_hat)

        dC_dA = {self.number_of_layers: dJ_dY_hat / n}

        for l in range(self.number_of_layers, 0, -1):
            dA_dZ_l = self.layers[l].activation.backward(Z[l])

            # Slow, only accepts dA_dZ_l with shape (n, m, n, m).
            # delta_l = np.tensordot(dC_dA[l], dA_dZ_l)

            # Fast, accepts dA_dZ_l with shapes (n, m, n, m), (n, m, m), and (n, m).
            match len(dA_dZ_l.shape):
                case 4:
                    subscripts = "ij,ijuv->uv"
                case 3:
                    subscripts = "ij,ijv->iv"
                case 2:
                    subscripts = "ij,ij->ij"

            delta_l = np.einsum(subscripts, dC_dA[l], dA_dZ_l)

            dC_dW_l = A[l - 1].T @ delta_l
            dC_db_l = np.ones((1, n)) @ delta_l

            self.weights[l] -= self.learning_rate * dC_dW_l
            self.biases[l] -= self.learning_rate * dC_db_l

            if l > 1:
                dC_dA[l - 1] = delta_l @ self.weights[l].T

In [14]:
epsilon = 1e-7

## Activation Functions

Now, we are going to write down the activation functions based on the following
class:


In [15]:
class ActivationFunction:
    def forward(self, Z_l):
        raise NotImplementedError()

    def backward(self, Z_l):
        raise NotImplementedError()

Further on, you are going to see that two versions of `backward` are implemented
for each activation function. The first one uses a for-loop and is commented out
due to being extremely slow. The second one is quite fast since it uses a
vectorized approach.


### Rectified Linear Unit (ReLU)


In [16]:
class ReLU(ActivationFunction):
    def forward(self, Z_l):
        return np.maximum(0, Z_l)

    def backward(self, Z_l):
        # Slow, returns an (n, m, n, m) tensor.
        # n, m = Z_l.shape
        # i, j = np.indices((n, m))

        # jacobian = np.zeros((n, m, n, m))

        # jacobian[i, j, i, j] = (Z_l[i, j] > 0).astype(int)

        # return jacobian
        
        # Fast, returns an (n, m) matrix.
        return (Z_l > 0).astype(int)

### Sigmoid


In [17]:
class Sigmoid(ActivationFunction):
    def forward(self, Z_l):
        return 1 / (1 + np.exp(-Z_l))

    def backward(self, Z_l):
        # Slow, returns an (n, m, n, m) tensor.
        # n, m = Z_l.shape
        # i, j = np.indices((n, m))

        # jacobian = np.zeros((n, m, n, m))

        # A_l = self.forward(Z_l)

        # jacobian[i, j, i, j] = A_l[i, j] * (1 - A_l[i, j])

        # return jacobian

        # Fast, returns an (n, m) matrix.
        A_l = self.forward(Z_l)

        return A_l * (1 - A_l)

### Softmax


In [18]:
class Softmax(ActivationFunction):
    def forward(self, Z_l):
        exp_Z_l = np.exp(Z_l)

        return exp_Z_l / np.sum(exp_Z_l, axis=1, keepdims=True)

    def backward(self, Z_l):
        # Slow, returns an (n, m, n, m) tensor.
        # n, m = Z_l.shape
        # i, j, v = np.indices((n, m, m))

        # jacobian = np.zeros((n, m, n, m))

        # A_l = self.forward(Z_l)

        # jacobian[i, j, i, v] = A_l[i, j] * ((j == v).astype(int) - A_l[i, v])

        # return jacobian

        # Fast, returns and (n, m, m) tensor.
        n, m = Z_l.shape
        i, j, v = np.indices((n, m, m))

        derivative = np.zeros((n, m, m))

        A_l = self.forward(Z_l)

        derivative[i, j, v] = A_l[i, j] * ((j == v).astype(int) - A_l[i, v])

        return derivative

## Loss Functions

Similarly, we will be implementing the loss functions based on this class:


In [19]:
class LossFunction:
    def forward(self, Y, Y_hat):
        raise NotImplementedError()

    def backward(self, Y, Y_hat):
        raise NotImplementedError()

### Sum of Squared Errors (SSE)


In [20]:
class SSELoss(LossFunction):
    def forward(self, Y, Y_hat):
        return 1 / 2 * np.sum((Y_hat - Y) ** 2)

    def backward(self, Y, Y_hat):
        return Y_hat - Y

### Binary Cross Entropy (BCE)


In [21]:
class BCELoss(LossFunction):
    def forward(self, Y, Y_hat):
        # We clip `Y_hat` to avoid overflow since `log(0)` is undefined.
        Y_hat = np.clip(Y_hat, epsilon, 1 - epsilon)

        return -np.sum((Y * np.log(Y_hat) + (1 - Y) * np.log(1 - Y_hat)))

    def backward(self, Y, Y_hat):
        # We clip `Y_hat` to avoid division by zero.
        Y_hat = np.clip(Y_hat, epsilon, 1 - epsilon)

        return (Y_hat - Y) / (Y_hat * (1 - Y_hat))

### Categorical Cross Entropy (CCE)


In [22]:
class CCELoss(LossFunction):
    def forward(self, Y, Y_hat):
        # We clip `Y_hat` to avoid overflow since `log(0)` is undefined.
        Y_hat = np.clip(Y_hat, epsilon, None)

        return -np.sum((Y * np.log(Y_hat)))

    def backward(self, Y, Y_hat):
        # We clip `Y_hat` to avoid division by zero.
        Y_hat = np.clip(Y_hat, epsilon, None)

        return -Y / Y_hat