<a href="https://colab.research.google.com/github/azare242/neural-networks-CI-Spring-2023/blob/master/NN-Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
from abc import ABC, abstractmethod
import pickle
import tqdm
import matplotlib.pyplot as plt
from random import shuffle
import csv
#from google.colab import drive
from PIL import Image
import os
# drive.mount('/content/drive')
import pandas as pd
# Directories holding the MNIST images, keyed by digit (2 and 5).
# NOTE(review): these are Google Drive paths under /content/drive; the
# drive.mount call above is commented out, so presumably the drive must be
# mounted separately before these resolve -- confirm.
mnist_path = {2: '/content/drive/MyDrive/datasets/MNIST/2/', 5: '/content/drive/MyDrive/datasets/MNIST/5/'}
# CSV files of the California housing-price dataset, keyed by split name.
cali_path = {'train': '/content/drive/MyDrive/datasets/california_houses_price/california_housing_train.csv','test': '/content/drive/MyDrive/datasets/california_houses_price/california_housing_test.csv'}


In [2]:
class FC:
    """Fully connected (dense) layer computing ``Z = W @ A_prev + b``.

    Samples are stored column-wise: a 2-D input is ``(input_size, batch_size)``.
    Inputs with more than two dimensions (batch first, e.g. the output of a
    convolutional layer) are flattened to ``(features, batch_size)``.

    ``parameters`` is ``[W, b]`` with ``W`` shaped ``(output_size, input_size)``
    and ``b`` shaped ``(output_size, 1)``.
    """

    def __init__(self, input_size : int, output_size : int, name : str, initialize_method : str="random"):
        """
        args:
            input_size: number of input features
            output_size: number of output units
            name: key under which the optimizer looks this layer up
            initialize_method: "random", "xavier" or "he"
        raises:
            ValueError: if initialize_method is not one of the supported names
        """
        self.input_size = input_size
        self.output_size = output_size
        self.name = name
        self.initialize_method = initialize_method
        self.parameters = [self.initialize_weights(), self.initialize_bias()]
        self.input_shape = None
        self.reshaped_shape = None

    def initialize_weights(self):
        """
        Create the weight matrix.
            returns:
                array of shape (output_size, input_size)
            raises:
                ValueError: for an unknown initialization method
        """
        shape = (self.output_size, self.input_size)
        if self.initialize_method == "random":
            return np.random.randn(*shape) * 0.01
        if self.initialize_method == "xavier":
            # Glorot/Xavier normal: std = sqrt(2 / (fan_in + fan_out))
            return np.random.randn(*shape) * np.sqrt(2.0 / (self.input_size + self.output_size))
        if self.initialize_method == "he":
            # He normal: std = sqrt(2 / fan_in)
            return np.random.randn(*shape) * np.sqrt(2.0 / self.input_size)
        raise ValueError("Invalid initialization method")

    def initialize_bias(self):
        """Create the bias column vector, shape (output_size, 1), filled with zeros."""
        return np.zeros((self.output_size, 1))

    @staticmethod
    def _as_columns(A_prev):
        """Flatten a batch-first N-D array to (features, batch); return 2-D input unchanged."""
        if len(A_prev.shape) > 2:
            return A_prev.reshape(A_prev.shape[0], -1).T
        return A_prev

    def forward(self, A_prev):
        """
        Forward pass for fully connected layer.
            args:
                A_prev: activations from previous layer (or input data);
                    either (input_size, batch_size), or batch-first with more
                    than two dimensions (flattened internally)
            returns:
                Z: output of the layer, shape (output_size, batch_size)
        """
        self.input_shape = A_prev.shape
        columns = self._as_columns(A_prev)
        self.reshaped_shape = columns.shape

        W, b = self.parameters
        return W @ columns + b

    def backward(self, dZ, A_prev):
        """
        Backward pass for fully connected layer.
            args:
                dZ: derivative of the cost with respect to the output of the current layer,
                    shape (output_size, batch_size)
                A_prev: activations from previous layer (or input data), same array
                    that was given to forward
            returns:
                dA_prev: derivative of the cost with respect to the activation of the previous layer,
                    with the same shape as A_prev
                grads: list of gradients for the weights and bias
        """
        columns = self._as_columns(A_prev)
        batch_size = columns.shape[1]

        W, _ = self.parameters
        # gradients are averaged over the batch
        dW = dZ @ columns.T / batch_size
        db = np.sum(dZ, axis=1, keepdims=True) / batch_size
        dA_prev = W.T @ dZ
        # undo the flattening so the previous (e.g. convolutional) layer
        # receives a gradient shaped like its output
        if len(A_prev.shape) > 2:
            dA_prev = dA_prev.T.reshape(A_prev.shape)
        return dA_prev, [dW, db]

    def update_parameters(self, optimizer, grads):
        """
        Update the parameters of the layer.
            args:
                optimizer: optimizer object exposing update(grads, name)
                grads: list of gradients for the weights and bias
        """
        self.parameters = optimizer.update(grads, self.name)

In [3]:
class Conv2D:
    """2-D convolution layer over channels-last input ``(batch, H, W, C_in)``.

    ``parameters`` is ``[W, b]`` where the kernel ``W`` has shape
    ``(kernel_h, kernel_w, in_channels, out_channels)`` and the bias ``b`` has
    shape ``(1, 1, 1, out_channels)``.
    """

    def __init__(self, in_channels, out_channels, name, kernel_size=(1, 1), stride=(1, 1), padding=(1, 1), initialize_method="random"):
        """
        args:
            in_channels: number of channels of the input
            out_channels: number of filters (channels of the output)
            name: key under which the optimizer looks this layer up
            kernel_size, stride, padding: an int (used for both axes) or a
                (height, width) tuple
            initialize_method: "random", "xavier" or "he"
        raises:
            ValueError: if initialize_method is not one of the supported names
        """
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.name = name
        self.initialize_method = initialize_method

        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.stride = (stride, stride) if isinstance(stride, int) else stride
        self.padding = (padding, padding) if isinstance(padding, int) else padding
        self.parameters = [self.initialize_weights(), self.initialize_bias()]

    def initialize_weights(self):
        """
        Initialize weights.
        returns:
            weights: initialized kernel with shape: (kernel_size[0], kernel_size[1], in_channels, out_channels)
        raises:
            ValueError: for an unknown initialization method
        """
        kernel_h, kernel_w = self.kernel_size
        shape = (kernel_h, kernel_w, self.in_channels, self.out_channels)
        fan_in = kernel_h * kernel_w * self.in_channels
        fan_out = kernel_h * kernel_w * self.out_channels

        if self.initialize_method == "random":
            return np.random.randn(*shape) * 0.01
        if self.initialize_method == "xavier":
            # Glorot/Xavier normal: std = sqrt(2 / (fan_in + fan_out))
            return np.random.randn(*shape) * np.sqrt(2.0 / (fan_in + fan_out))
        if self.initialize_method == "he":
            # He normal: std = sqrt(2 / fan_in)
            return np.random.randn(*shape) * np.sqrt(2.0 / fan_in)
        raise ValueError("Invalid initialization method")

    def initialize_bias(self):
        """
        Initialize bias.
        returns:
            bias: zeros with shape: (1, 1, 1, out_channels)
        """
        return np.zeros((1, 1, 1, self.out_channels))

    def target_shape(self, input_shape):
        """
        Calculate the spatial shape of the output of the convolutional layer.
        args:
            input_shape: shape of the input, (batch_size, H_prev, W_prev, C_prev)
        returns:
            target_shape: (H, W) of the output
        """
        h, w = input_shape[1], input_shape[2]
        H = (h + 2 * self.padding[0] - self.kernel_size[0]) // self.stride[0] + 1
        W = (w + 2 * self.padding[1] - self.kernel_size[1]) // self.stride[1] + 1
        return (H, W)

    def pad(self, A, padding, pad_value=0):
        """
        Pad the height and width axes of a (batch, H, W, C) array.
        args:
            A: input to be padded
            padding: tuple of padding for height and width
            pad_value: value to pad with
        returns:
            A_padded: padded input
        """
        pad_h, pad_w = padding
        return np.pad(
            A,
            ((0, 0), (pad_h, pad_h), (pad_w, pad_w), (0, 0)),
            mode="constant",
            constant_values=(pad_value, pad_value),
        )

    def single_step_convolve(self, a_slic_prev, W, b):
        """
        Convolve a slice of the input with one filter.
        args:
            a_slic_prev: slice of the input data, same shape as W
            W: kernel of a single filter
            b: bias of that filter (a single value, any shape of size 1)
        returns:
            Z: convolved value as a Python float
        """
        # np.float was removed from NumPy; the builtin float is the replacement
        return float(np.sum(np.multiply(a_slic_prev, W))) + float(np.squeeze(b))

    def forward(self, A_prev):
        """
        Forward pass for convolutional layer.
            args:
                A_prev: activations from previous layer (or input data)
                A_prev.shape = (batch_size, H_prev, W_prev, C_prev)
            returns:
                Z: output of the convolutional layer, (batch_size, H, W, out_channels)
        """
        weights, bias = self.parameters
        batch_size = A_prev.shape[0]
        kernel_h, kernel_w, _, out_channels = weights.shape
        stride_h, stride_w = self.stride
        out_h, out_w = self.target_shape(A_prev.shape)

        Z = np.zeros((batch_size, out_h, out_w, out_channels))
        A_prev_pad = self.pad(A_prev, self.padding)
        flat_bias = bias.reshape(-1)
        for h in range(out_h):
            h_start = h * stride_h
            h_end = h_start + kernel_h
            for w in range(out_w):
                w_start = w * stride_w
                w_end = w_start + kernel_w
                window = A_prev_pad[:, h_start:h_end, w_start:w_end, :]
                # contract (kernel_h, kernel_w, C_in) for every sample and
                # every filter at once -> (batch_size, out_channels)
                Z[:, h, w, :] = np.tensordot(window, weights, axes=([1, 2, 3], [0, 1, 2])) + flat_bias
        return Z

    def backward(self, dZ, A_prev):
        """
        Backward pass for convolutional layer.
        args:
            dZ: gradient of the cost with respect to the output of the convolutional layer
            A_prev: activations from previous layer (or input data)
            A_prev.shape = (batch_size, H_prev, W_prev, C_prev)
        returns:
            dA_prev: gradient of the cost with respect to the input of the convolutional layer
            gradients: list of gradients with respect to the weights and bias
                (summed over the batch)
        """
        weights, bias = self.parameters
        _, H_prev, W_prev, _ = A_prev.shape
        kernel_h, kernel_w = weights.shape[0], weights.shape[1]
        stride_h, stride_w = self.stride
        padding_h, padding_w = self.padding
        out_h, out_w = dZ.shape[1:3]

        dW = np.zeros_like(weights)
        db = np.sum(dZ, axis=(0, 1, 2)).reshape(bias.shape)
        A_prev_pad = self.pad(A_prev, (padding_h, padding_w))
        dA_prev_pad = np.zeros(A_prev_pad.shape)
        for h in range(out_h):
            h_start = h * stride_h
            h_end = h_start + kernel_h
            for w in range(out_w):
                w_start = w * stride_w
                w_end = w_start + kernel_w
                window = A_prev_pad[:, h_start:h_end, w_start:w_end, :]
                dz = dZ[:, h, w, :]                      # (batch_size, out_channels)
                # each output value spreads its gradient over the window
                # through the kernel: sum_c W[..., c] * dZ[i, h, w, c]
                dA_prev_pad[:, h_start:h_end, w_start:w_end, :] += np.tensordot(dz, weights, axes=([1], [3]))
                # kernel gradient: sum_i window_i * dZ[i, h, w, c]
                dW += np.tensordot(window, dz, axes=([0], [0]))
        # crop with explicit end indices; a ``pad:-pad`` slice is empty when pad == 0
        dA_prev = dA_prev_pad[:, padding_h:padding_h + H_prev, padding_w:padding_w + W_prev, :]
        grads = [dW, db]
        return dA_prev, grads

    def update_parameters(self, optimizer, grads):
        """
        Update parameters of the convolutional layer.
        args:
            optimizer: optimizer to use for updating parameters
            grads: list of gradients with respect to the weights and bias
        """
        self.parameters = optimizer.update(grads, self.name)

In [4]:
class MaxPool2D:
    """Max or average pooling over channels-last input ``(batch, H, W, C)``."""

    _MODES = ("max", "average")

    def __init__(self, kernel_size=(3, 3), stride=(1, 1), mode="max"):
        """
        Pooling layer.
            args:
                kernel_size: size of the kernel, an int or (height, width)
                stride: stride of the kernel, an int or (height, width)
                mode: "max" or "average"
            The layer has no name because it has no trainable parameters:
            backward returns None as its gradients, so no optimizer ever needs
            to look the layer up.
        """
        self.stride = (stride, stride) if isinstance(stride, int) else stride
        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.mode = mode

    def _check_mode(self):
        """Raise ValueError when the configured mode is not supported."""
        if self.mode not in self._MODES:
            raise ValueError("Invalid mode")

    def target_shape(self, input_shape):
        """
        Calculate the spatial shape of the output of the layer.
            args:
                input_shape: shape of the input, (batch_size, H_prev, W_prev, C_prev)
            returns:
                output_shape: (H, W) of the output
        """
        H = (input_shape[1] - self.kernel_size[0]) // self.stride[0] + 1
        W = (input_shape[2] - self.kernel_size[1]) // self.stride[1] + 1
        return H, W

    def forward(self, A_prev):
        """
        Forward pass for the pooling layer.
            args:
                A_prev: activations from previous layer (or input data),
                    shape (batch_size, H_prev, W_prev, C_prev)
            returns:
                A: pooled output, shape (batch_size, H, W, C_prev)
            raises:
                ValueError: if mode is neither "max" nor "average"
        """
        self._check_mode()
        batch_size, _, _, channels = A_prev.shape
        f_h, f_w = self.kernel_size
        stride_h, stride_w = self.stride
        H, W = self.target_shape(A_prev.shape)
        # np.max is used directly: multiplying the window by its max-mask
        # first would yield 0 for windows whose values are all negative
        reduce = np.max if self.mode == "max" else np.mean

        A = np.zeros((batch_size, H, W, channels))
        for h in range(H):
            h_start = h * stride_h
            h_end = h_start + f_h
            for w in range(W):
                w_start = w * stride_w
                w_end = w_start + f_w
                window = A_prev[:, h_start:h_end, w_start:w_end, :]
                A[:, h, w, :] = reduce(window, axis=(1, 2))
        return A

    def create_mask_from_window(self, x):
        """
        Create a mask from an input matrix x, to identify the max entry of x.
            args:
                x: numpy array
            returns:
                mask: boolean array of the same shape as x, True wherever x
                    equals its maximum (several entries when the maximum is tied)
        """
        return x == np.max(x)

    def distribute_value(self, dz, shape):
        """
        Distribute the input value evenly over a matrix of dimension shape.
            args:
                dz: input scalar
                shape: the shape (n_H, n_W) of the output matrix for which we want to distribute the value of dz
            returns:
                a: array of that shape whose entries sum to dz
        """
        n_H, n_W = shape
        return np.ones(shape) * (dz / (n_H * n_W))

    def backward(self, dZ, A_prev):
        """
        Backward pass for the pooling layer.
            args:
                dZ: gradient of cost with respect to the output of the pooling layer
                A_prev: activations from previous layer (or input data)
            returns:
                dA_prev: gradient of cost with respect to the input of the pooling layer
                None: placeholder for parameter gradients (the layer has none)
            raises:
                ValueError: if mode is neither "max" nor "average"
        """
        self._check_mode()
        f_h, f_w = self.kernel_size
        stride_h, stride_w = self.stride
        H, W = dZ.shape[1], dZ.shape[2]

        dA_prev = np.zeros(A_prev.shape)
        for h in range(H):
            h_start = h * stride_h
            h_end = h_start + f_h
            for w in range(W):
                # the width offset must use the width stride
                w_start = w * stride_w
                w_end = w_start + f_w
                dz = dZ[:, h:h + 1, w:w + 1, :]          # (batch, 1, 1, C)
                if self.mode == "max":
                    window = A_prev[:, h_start:h_end, w_start:w_end, :]
                    # per-sample, per-channel mask of the maximal entries
                    mask = window == np.max(window, axis=(1, 2), keepdims=True)
                    dA_prev[:, h_start:h_end, w_start:w_end, :] += mask * dz
                else:
                    # average pooling: every input of the window gets an equal share
                    dA_prev[:, h_start:h_end, w_start:w_end, :] += dz / (f_h * f_w)
        # Don't change the return
        return dA_prev, None




In [5]:
class BinaryCrossEntropy:
    """Binary cross entropy loss for predictions in (0, 1)."""

    # predictions are clipped to [_EPS, 1 - _EPS] so that log() and the
    # divisions in backward() stay finite for saturated outputs
    _EPS = 1e-12

    def __init__(self) -> None:
        pass

    def compute(self, y_hat: np.ndarray, y: np.ndarray) -> float:
        """
        Computes the binary cross entropy loss.
            args:
                y_hat: predicted labels (n_classes, batch_size)
                y: true labels (n_classes, batch_size)
            returns:
                binary cross entropy loss, averaged over the batch
        """
        batch_size = y.shape[1]
        y_hat = np.clip(y_hat, self._EPS, 1 - self._EPS)
        cost = -np.sum(y * np.log(y_hat) + (1 - y) * np.log(1 - y_hat)) / batch_size
        return np.squeeze(cost)

    def backward(self, y_hat: np.ndarray, y: np.ndarray) -> np.ndarray:
        """
        Computes the derivative of the binary cross entropy loss.
            args:
                y_hat: predicted labels (n_classes, batch_size)
                y: true labels (n_classes, batch_size)
            returns:
                element-wise derivative of the loss with respect to y_hat
                (not divided by the batch size)
        """
        y_hat = np.clip(y_hat, self._EPS, 1 - self._EPS)
        return -np.divide(y, y_hat) + np.divide(1 - y, 1 - y_hat)



In [6]:

class MeanSquaredError:
    """Half mean squared error: sum of squared residuals / (2 * batch_size)."""

    def __init__(self):
        pass

    def compute(self, y_pred, y_true):
        """
        Evaluate the loss.
            args:
                y_pred: predicted labels (n_classes, batch_size)
                y_true: true labels (n_classes, batch_size)
            returns:
                sum of squared residuals divided by twice the batch size
        """
        residual = y_pred - y_true
        n_samples = y_pred.shape[1]
        total = np.sum(np.square(residual))
        return np.squeeze(total / (2 * n_samples))

    def backward(self, y_pred, y_true):
        """
        Evaluate the derivative of the loss with respect to the predictions.
            args:
                y_pred: predicted labels (n_classes, batch_size)
                y_true: true labels (n_classes, batch_size)
            returns:
                residuals divided by the batch size
        """
        n_samples = y_pred.shape[1]
        residual = y_pred - y_true
        return residual / n_samples

In [7]:
# TODO: Implement the gradient descent optimizer
class GD:
    """Plain gradient descent: ``param <- param - learning_rate * grad``."""

    def __init__(self, layers_list: dict, learning_rate: float):
        """
        Gradient Descent optimizer.
            args:
                layers_list: dictionary of layers name and layer object
                learning_rate: learning rate
        """
        self.learning_rate = learning_rate
        self.layers = layers_list

    def update(self, grads, name, epoch=None):
        """
        Compute the updated parameters of one layer.
            args:
                grads: list of gradients for the weights and bias, in the same
                    order as the layer's ``parameters``
                name: name of the layer (key in the layers dictionary)
                epoch: unused; accepted so the signature matches Adam.update
            returns:
                params: list of updated parameters (the layer's own arrays are
                    not modified in place)
            raises:
                KeyError: if no layer is registered under name
        """
        layer = self.layers[name]
        return [
            param - self.learning_rate * grad
            for param, grad in zip(layer.parameters, grads)
        ]

In [8]:

# TODO: Implement Adam optimizer
class Adam:
    """Adam optimizer keeping per-layer first (V) and second (S) moment estimates."""

    def __init__(self, layers_list, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        """
        args:
            layers_list: dictionary of layer name -> layer object (a plain
                sequence is also accepted, keyed by position); entries without
                a ``parameters`` attribute, such as activations, are skipped
            learning_rate: step size
            beta1: decay rate of the first moment estimate
            beta2: decay rate of the second moment estimate
            epsilon: small constant added to the denominator
        """
        self.layers = layers_list
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.V = {}
        self.S = {}
        # number of updates already applied per layer; used for bias
        # correction when the caller does not pass an epoch
        self.t = {}
        entries = layers_list.items() if isinstance(layers_list, dict) else enumerate(layers_list)
        for key, layer in entries:
            params = getattr(layer, "parameters", None)
            if params is None:
                continue
            self._init_state(key, params)

    def _init_state(self, key, params):
        """Create zeroed moment estimates shaped like params for layer key."""
        self.V[key] = [np.zeros_like(p) for p in params]
        self.S[key] = [np.zeros_like(p) for p in params]
        self.t[key] = 0

    def update(self, grads, name, epoch=None):
        """
        Compute the updated parameters of one layer.
            args:
                grads: list of gradients, in the same order as the layer's ``parameters``
                name: name of the layer (key in the layers container)
                epoch: zero-based step index used for bias correction; when
                    omitted, an internal per-layer update counter is used
            returns:
                params: list of updated parameters
        """
        layer = self.layers[name]
        if name not in self.V:
            self._init_state(name, layer.parameters)
        if epoch is None:
            self.t[name] += 1
            step = self.t[name]
        else:
            step = epoch + 1

        params = []
        for i, grad in enumerate(grads):
            self.V[name][i] = self.beta1 * self.V[name][i] + (1 - self.beta1) * grad
            self.S[name][i] = self.beta2 * self.S[name][i] + (1 - self.beta2) * np.square(grad)
            # bias-corrected copies; the stored running averages must stay
            # uncorrected or the correction compounds on every step
            v_hat = self.V[name][i] / (1 - np.power(self.beta1, step))
            s_hat = self.S[name][i] / (1 - np.power(self.beta2, step))
            params.append(layer.parameters[i] - self.learning_rate * v_hat / (np.sqrt(s_hat) + self.epsilon))
        return params

In [9]:


class Activation:
    """Common interface of the activation functions.

    The two methods are marked with ``abstractmethod``, but because this class
    does not derive from ``ABC`` the marker is not enforced: the class can be
    instantiated and both methods simply return ``None``.
    """

    def __init__(self) -> None:
        """Nothing to set up; activations hold no state."""

    @abstractmethod
    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        Apply the activation function.
            args:
                Z: input to the activation function
            returns:
                A: output of the activation function
        """
        return None

    @abstractmethod
    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Propagate a gradient back through the activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                derivative of the cost with respect to Z
        """
        return None

class Sigmoid(Activation):
    """Logistic sigmoid activation, ``1 / (1 + exp(-Z))``."""

    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        Sigmoid activation function.
            args:
                Z: input to the activation function
            returns:
                sigmoid(Z), element-wise, with values in (0, 1)
        """
        # the parentheses matter: ``1. / 1. + np.exp(-Z)`` parses as
        # ``(1 / 1) + exp(-Z)``, which is not a sigmoid
        A = 1.0 / (1.0 + np.exp(-Z))
        return A

    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Backward pass for sigmoid activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                derivative of the cost with respect to Z
        """
        A = self.forward(Z)
        # d sigmoid / dZ = sigmoid(Z) * (1 - sigmoid(Z))
        dZ = dA * A * (1 - A)
        return dZ


class ReLU(Activation):
    """Rectified linear unit, ``max(0, Z)`` element-wise."""

    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        ReLU activation function.
            args:
                Z: input to the activation function
            returns:
                Z with every negative entry replaced by 0
        """
        return np.maximum(0, Z)

    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Backward pass for ReLU activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                copy of dA with the gradient zeroed wherever Z <= 0
        """
        grad = np.array(dA, copy=True)
        blocked = Z <= 0
        grad[blocked] = 0
        return grad



class Tanh(Activation):
    """Hyperbolic tangent activation."""

    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        Tanh activation function.
            args:
                Z: input to the activation function
            returns:
                tanh(Z), element-wise
        """
        return np.tanh(Z)

    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Backward pass for tanh activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                derivative of the cost with respect to Z
        """
        activated = self.forward(Z)
        # d tanh / dZ = 1 - tanh(Z)^2
        local_slope = 1 - np.square(activated)
        return dA * local_slope

class LinearActivation(Activation):
    """Identity activation: the output equals the input."""

    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        Linear activation function.
            args:
                Z: input to the activation function
            returns:
                Z, unchanged
        """
        return Z

    @staticmethod
    def linear(Z: np.ndarray) -> np.ndarray:
        """
        Identity function, kept so existing ``LinearActivation.linear(Z)``
        calls keep working; ``forward`` is the method the model calls.
            args:
                Z: input to the activation function
            returns:
                Z, unchanged
        """
        return Z

    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Backward pass for linear activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                derivative of the cost with respect to Z (equal to dA, since
                the slope of the identity is 1)
        """
        dZ = dA * np.ones_like(Z)
        return dZ

def get_activation(activation: str) -> tuple:
    """
    Look up an activation class by name.
        args:
            activation: one of 'sigmoid', 'relu', 'tanh', 'linear'
        returns:
            the matching activation class (not an instance); call it to
            create the activation object
        raises:
            ValueError: if the name is not supported
    """
    supported = ('sigmoid', 'relu', 'tanh', 'linear')
    if activation not in supported:
        raise ValueError('Activation function not supported')

    registry = {
        'sigmoid': Sigmoid,
        'relu': ReLU,
        'tanh': Tanh,
        'linear': LinearActivation,
    }
    return registry[activation]

In [10]:
class Model:
    """Sequential model made of alternating (layer, activation) pairs.

    ``arch`` is an ordered dictionary ``name -> object`` in which entries at
    even positions are layers and entries at odd positions are the activations
    applied to their output. forward and backward rely on that pairing.
    """

    def __init__(self, arch, criterion, optimizer, name=None):
        """
        Initialize the model.
        args:
            arch: dictionary containing the architecture of the model
            criterion: loss
            optimizer: optimizer
            name: path of a saved model; when given, the model is loaded from
                that file and the other arguments are ignored
        """
        if name is None:
            self.model = arch
            self.criterion = criterion
            self.optimizer = optimizer
            self.layers_names = list(arch.keys())
        else:
            self.model, self.criterion, self.optimizer, self.layers_names = self.load_model(name)

    def is_layer(self, layer):
        """
        Check if the object is a layer.
        args:
            layer: object to be checked
        returns:
            True if it is an FC, Conv2D or MaxPool2D instance, False otherwise
        """
        return isinstance(layer, (FC, Conv2D, MaxPool2D))

    def is_activation(self, layer):
        """
        Check if the object is an activation function.
        args:
            layer: object to be checked
        returns:
            True if it is an Activation instance, False otherwise
        """
        return isinstance(layer, Activation)

    def forward(self, x):
        """
        Forward pass through the model.
        args:
            x: input to the model
        returns:
            list of intermediate values [Z1, A1, Z2, A2, ...]; the last entry
            is the output of the model
        """
        tmp = []
        A = x
        names = self.layers_names
        # step over (layer, activation) pairs
        for l in range(0, len(names), 2):
            Z = self.model[names[l]].forward(A)
            tmp.append(np.copy(Z))
            A = self.model[names[l + 1]].forward(Z)
            tmp.append(np.copy(A))
        return tmp

    def backward(self, dAL, tmp, x):
        """
        Backward pass through the model.
        args:
            dAL: derivative of the cost with respect to the output of the model
            tmp: list containing the intermediate values of Z and A, as
                returned by forward
            x: input to the model
        returns:
            dictionary layer name -> gradients returned by that layer
        """
        dA = dAL
        grads = {}
        names = self.layers_names
        # l is the index of an activation; l - 1 is the layer feeding it.
        # Walk the pairs from the last one to the first.
        for l in range(len(tmp) - 1, 0, -2):
            Z = tmp[l - 1]
            # input of the layer: previous pair's activation, or the model
            # input for the first pair
            A_prev = tmp[l - 2] if l > 2 else x
            dZ = self.model[names[l]].backward(dA, Z)
            dA, grad = self.model[names[l - 1]].backward(dZ, A_prev)
            grads[names[l - 1]] = grad
        return grads

    def update(self, grads):
        """
        Update the parameters of every trainable layer.
        args:
            grads: gradients of the model, keyed by layer name
        """
        for n in self.layers_names:
            layer = self.model[n]
            # pooling layers have no parameters to update
            if self.is_layer(layer) and not isinstance(layer, MaxPool2D):
                layer.update_parameters(self.optimizer, grads[n])

    def one_epoch(self, x, y):
        """
        One training step: forward, loss, backward and parameter update.
        args:
            x: input to the model
            y: labels
        returns:
            loss computed before the update
        """
        tmp = self.forward(x)
        AL = tmp[-1]
        loss = self.criterion.compute(AL, y)
        dAL = self.criterion.backward(AL, y)
        grads = self.backward(dAL, tmp, x)
        self.update(grads)
        return loss

    def save(self, name):
        """
        Save the model.
        args:
            name: path of the file to write
        """
        with open(name, 'wb') as f:
            pickle.dump((self.model, self.criterion, self.optimizer, self.layers_names), f)

    def load_model(self, name):
        """
        Load the model.
        args:
            name: path of the file to read
        returns:
            model, criterion, optimizer, layers_names
        """
        # SECURITY: pickle.load executes arbitrary code embedded in the file;
        # only load model files from trusted sources.
        with open(name, 'rb') as f:
            return pickle.load(f)

    def shuffle(self, m, shuffling):
        """
        Build the order in which samples are visited.
        args:
            m: number of samples
            shuffling: if True the order is randomly permuted
        returns:
            list with the indices 0 .. m - 1
        """
        order = list(range(m))
        if shuffling:
            # np.random.shuffle works in place and returns None
            np.random.shuffle(order)
        return order

    def batch(self, X, y, batch_size, index, order):
        """
        Get a batch of data.
        args:
            X: input to the model
            y: labels
            batch_size: batch size
            index: index of the batch
                e.g: if batch_size = 3 and index = 1 then the batch will be from index [3, 4, 5]
            order: order of the data
        returns:
            bx, by: batch of data
        """
        first_index = index * batch_size
        # the last batch may be shorter than batch_size
        last_index = min(first_index + batch_size, len(order))
        batch = order[first_index:last_index]
        # 2-dimensional inputs keep samples in columns, other inputs in the first axis
        if len(X.shape) == 2:
            return X[:, batch], y[:, batch]
        return X[batch], y[batch]

    def compute_loss(self, X, y, batch_size):
        """
        Compute the loss without updating the model.
        args:
            X: input to the model
            y: labels
            batch_size: batch size
        returns:
            loss averaged over the batches
        """
        m = X.shape[0] if len(X.shape) == 4 else X.shape[1]
        order = self.shuffle(m, False)
        # at least one batch, so data smaller than batch_size is still used
        n_batches = max(1, m // batch_size)
        cost = 0
        for b in range(n_batches):
            bx, by = self.batch(X, y, batch_size, b, order)
            AL = self.forward(bx)[-1]
            cost += self.criterion.compute(AL, by) / n_batches
        return cost

    def train(self, X, y, epochs, val=None, batch_size=3, shuffling=False, verbose=1, save_after=None):
        """
        Train the model.
        args:
            X: input to the model
            y: labels
            epochs: number of epochs
            val: validation data, either a (X_val, y_val) pair or only the
                validation inputs (then y is used as their labels)
            batch_size: batch size
            shuffling: if True shuffle the data
            verbose: print the training loss every ``verbose`` epochs; a falsy
                value disables printing
            save_after: path to save the model to after training
        returns:
            train_cost, val_cost: lists with one loss value per epoch
        """
        train_cost = []
        val_cost = []
        # 4-dimensional inputs keep samples in the first axis, others in the second
        m = X.shape[0] if len(X.shape) == 4 else X.shape[1]
        n_batches = max(1, m // batch_size)
        if val is not None and isinstance(val, (tuple, list)) and len(val) == 2:
            val_X, val_y = val
        else:
            val_X, val_y = val, y
        # works whether the file did ``import tqdm`` or ``from tqdm import tqdm``
        progress = getattr(tqdm, "tqdm", tqdm)
        for e in progress(range(1, epochs + 1)):
            order = self.shuffle(m, shuffling)
            cost = 0
            for b in range(n_batches):
                bx, by = self.batch(X, y, batch_size, b, order)
                cost += self.one_epoch(bx, by) / n_batches
            train_cost.append(cost)
            if val is not None:
                val_cost.append(self.compute_loss(val_X, val_y, batch_size))
            if verbose:
                if e % verbose == 0:
                    print("Epoch {}: train cost = {}".format(e, cost))
                if val is not None:
                    print("Epoch {}: val cost = {}".format(e, val_cost[-1]))
        if save_after is not None:
            self.save(save_after)
        return train_cost, val_cost

    def predict(self, X):
        """
        Predict the output of the model.
        args:
            X: input to the model
        returns:
            predictions (output of the last activation)
        """
        return self.forward(X)[-1]
        

In [11]:
# Architecture for the MNIST 2-vs-5 classifier: two convolution + ReLU pairs
# followed by two fully connected + sigmoid pairs. Insertion order matters:
# entries alternate layer, activation, and the dict keys double as the layer
# names handed to the optimizer.
arch_model_MNIST = {
    "CONV1": Conv2D(1, 2, name="CONV1", kernel_size=(10, 10), stride=(1, 1), padding=(1, 1)),
    "RELU1": get_activation("relu")(),
    "CONV2": Conv2D(2, 4, name="CONV2", kernel_size=(5, 5), stride=(1, 1), padding=(0, 0)),
    "RELU2": get_activation("relu")(),
    # NOTE(review): assuming 28x28 single-channel inputs, CONV1 yields 21x21
    # and CONV2 yields 17x17 with 4 channels (1156 features), which does not
    # match the 16*16 input size below -- confirm the image size and FC1 width.
    "FC1": FC(16*16, 16, "FC1"),
    "SIGMOMID1": get_activation("sigmoid")(),
    "FC2": FC(16, 1, "FC2"),
    "SIGMOMID2": get_activation("sigmoid")(),
}

# Loss and optimizer; the optimizer receives the same dict so it can look
# layers up by name.
criterion = BinaryCrossEntropy()
optimizer = GD(arch_model_MNIST, learning_rate=0.01)

myModel = Model(arch_model_MNIST, criterion, optimizer)


In [None]:
def construct_dataset_mnist():
  """
  Build the two-class MNIST dataset from the folders listed in mnist_path.
      returns:
          list of (image, label) tuples; each image is scaled by 1/255 and
          given a trailing axis of size 1, files under mnist_path[2] get
          label 0 and files under mnist_path[5] get label 1
  """
  # Collect the file paths of each class first, paired with the class label.
  labelled_paths = []
  for digit, label in ((2, 0), (5, 1)):
    folder = mnist_path[digit]
    paths = [folder + entry for entry in os.listdir(folder)]
    labelled_paths.append((paths, label))

  # Echo both path lists before any image is opened.
  for paths, _ in labelled_paths:
    print(paths)

  samples = []
  for paths, label in labelled_paths:
    for path in paths:
      pixels = np.array(Image.open(path)) / 255.
      samples.append((np.expand_dims(pixels, axis=-1), label))

  return samples


In [None]:
# Build the list of (image, label) pairs for the two MNIST classes.
data = construct_dataset_mnist()

In [12]:
# Notebook echo: display the architecture dictionary.
arch_model_MNIST

{'CONV1': <__main__.Conv2D at 0x23f9bbe3c40>,
 'RELU1': <__main__.ReLU at 0x23f9bbe17e0>,
 'CONV2': <__main__.Conv2D at 0x23f9bbe0940>,
 'RELU2': <__main__.ReLU at 0x23f8b4b1db0>,
 'FC1': <__main__.FC at 0x23f8b4b09d0>,
 'SIGMOMID1': <__main__.Sigmoid at 0x23f9bbe0d90>,
 'FC2': <__main__.FC at 0x23f9bbe3f70>,
 'SIGMOMID2': <__main__.Sigmoid at 0x23f9bbe3340>}

In [28]:


class Activation:
    """
    Common interface for activation functions: a forward pass and the
    matching backward pass.

    NOTE: the class does not derive from ABC, so the @abstractmethod
    markers below are informational only and instantiation is not blocked.
    """

    def __init__(self) -> None:
        """The base class keeps no state."""
        return None

    @abstractmethod
    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        Apply the activation function.
            args:
                Z: input to the activation function
            returns:
                A: output of the activation function
        """
        ...

    @abstractmethod
    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Propagate a gradient back through the activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                derivative of the cost with respect to Z
        """
        ...

class Sigmoid(Activation):
    """Logistic sigmoid activation: A = 1 / (1 + exp(-Z))."""

    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        Sigmoid activation function.
            args:
                Z: input to the activation function
            returns:
                sigmoid(Z), computed element-wise
        """
        # The parentheses matter: without them `1. / 1. + np.exp(-Z)` is
        # evaluated as (1 / 1) + exp(-Z), which is not the sigmoid.
        A = 1. / (1. + np.exp(-Z))
        return A

    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Backward pass for sigmoid activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                derivative of the cost with respect to Z
        """
        A = self.forward(Z)
        # d(sigmoid)/dZ = A * (1 - A); chain it with the incoming gradient.
        dZ = dA * A * (1 - A)
        return dZ


class ReLU(Activation):
    """Rectified linear unit: element-wise max(0, Z)."""

    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        ReLU activation function.
            args:
                Z: input to the activation function
            returns:
                relu(Z)
        """
        return np.maximum(0, Z)

    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Backward pass for ReLU activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                derivative of the cost with respect to Z
        """
        # Work on a copy so the caller's gradient array is left untouched.
        grad = np.copy(dA)
        # The gradient is blocked wherever the input was not positive.
        blocked = Z <= 0
        grad[blocked] = 0
        return grad



class Tanh(Activation):
    """Hyperbolic tangent activation."""

    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        Tanh activation function.
            args:
                Z: input to the activation function
            returns:
                tanh(Z), computed element-wise
        """
        A = np.tanh(Z)
        return A

    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Backward pass for tanh activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                derivative of the cost with respect to Z
        """
        A = self.forward(Z)
        # d(tanh)/dZ = 1 - tanh(Z)^2; reuse A instead of recomputing tanh.
        dZ = dA * (1 - np.square(A))
        return dZ

class LinearActivation(Activation):
    """Identity activation: values and gradients pass through unchanged."""

    @staticmethod
    def linear(Z: np.ndarray) -> np.ndarray:
        """
        Linear activation function.
            args:
                Z: input to the activation function
            returns:
                Z, unchanged
        """
        # Kept as a static method so LinearActivation.linear(Z) still works.
        A = Z
        return A

    def forward(self, Z: np.ndarray) -> np.ndarray:
        """
        Forward pass for linear activation function.
            args:
                Z: input to the activation function
            returns:
                Z, unchanged
        """
        # Implements the forward() entry point of the Activation interface;
        # previously only `linear` existed, so forward() on an instance fell
        # through to the base-class stub and returned None.
        return self.linear(Z)

    def backward(self, dA: np.ndarray, Z: np.ndarray) -> np.ndarray:
        """
        Backward pass for linear activation function.
            args:
                dA: derivative of the cost with respect to the activation
                Z: input to the activation function
            returns:
                derivative of the cost with respect to Z
        """
        # The derivative of the identity is 1 everywhere.
        dZ = dA * np.ones_like(Z)
        return dZ

def get_activation(activation: str) -> type:
    """
    Look up an activation class by name.
        args:
            activation: one of 'sigmoid', 'relu', 'tanh' or 'linear'
        returns:
            the matching activation class (the class itself, not an
            instance; callers instantiate it, e.g. get_activation('relu')())
        raises:
            ValueError: if the name is not one of the supported activations
    """
    registry = {
        'sigmoid': Sigmoid,
        'relu': ReLU,
        'tanh': Tanh,
        'linear': LinearActivation,
    }
    try:
        return registry[activation]
    except (KeyError, TypeError):
        # TypeError covers unhashable arguments, which the original if/elif
        # chain also reported as an unsupported activation.
        raise ValueError('Activation function not supported') from None

In [29]:
class Model:
    """
    Sequential model built from an ordered dictionary of layers.

    forward() and backward() assume the architecture alternates
    (layer, activation): even positions hold layers, odd positions hold the
    activation applied to that layer's output.
    """

    def __init__(self, arch, criterion, optimizer, name=None):
        """
        Initialize the model.
        args:
            arch: dictionary containing the architecture of the model
            criterion: loss
            optimizer: optimizer
            name: path of a previously saved model; when given, the model,
                criterion, optimizer and layer names are loaded from that
                file and the other arguments are ignored
        """
        if name is None:
            self.model = arch
            self.criterion = criterion
            self.optimizer = optimizer
            self.layers_names = list(arch.keys())
        else:
            self.model, self.criterion, self.optimizer, self.layers_names = self.load_model(name)

    def is_layer(self, layer):
        """
        Check if the layer is a layer.
        args:
            layer: layer to be checked
        returns:
            True if the layer is an FC, Conv2D or MaxPool2D instance,
            False otherwise
        """
        return isinstance(layer, (FC, Conv2D, MaxPool2D))

    def is_activation(self, layer):
        """
        Check if the layer is an activation function.
        args:
            layer: layer to be checked
        returns:
            True if the layer is an activation function, False otherwise
        """
        return isinstance(layer, Activation)

    def forward(self, x):
        """
        Forward pass through the model.
        args:
            x: input to the model
        returns:
            list [Z1, A1, Z2, A2, ...] holding a copy of every layer output
            Z and activation output A; the last entry is the model output
        """
        tmp = []
        A = x
        # Walk the architecture two entries at a time: layer, then activation.
        for l in range(0, len(self.layers_names), 2):
            Z = self.model[self.layers_names[l]].forward(A)
            tmp.append(np.copy(Z))
            A = self.model[self.layers_names[l + 1]].forward(Z)
            tmp.append(np.copy(A))
        return tmp

    def backward(self, dAL, tmp, x):
        """
        Backward pass through the model.
        args:
            dAL: derivative of the cost with respect to the output of the model
            tmp: list containing the intermediate values of Z and A
            x: input to the model
        returns:
            dictionary mapping each layer name to the gradient returned by
            that layer's backward()
        """
        dA = dAL
        grads = {}
        # l is the index of an activation (odd positions), visited from the
        # last pair to the first; the matching layer sits at l - 1.
        for l in range(len(tmp) - 1, 0, -2):
            if l > 2:
                Z, A = tmp[l - 1], tmp[l - 2]
            else:
                # The first layer received the raw model input.
                Z, A = tmp[l - 1], x
            dZ = self.model[self.layers_names[l]].backward(dA, Z)
            dA, grad = self.model[self.layers_names[l - 1]].backward(dZ, A)
            grads[self.layers_names[l - 1]] = grad
        return grads

    def update(self, grads):
        """
        Update the model.
        args:
            grads: gradients of the model, keyed by layer name
        """
        for n in self.layers_names:
            layer = self.model[n]
            # Activations and max-pooling layers are skipped.
            if self.is_layer(layer) and not isinstance(layer, MaxPool2D):
                layer.update(self.optimizer, grads[n])

    def one_epoch(self, x, y):
        """
        One training step: forward pass, loss, backward pass and update.
        args:
            x: input to the model (typically one batch)
            y: labels matching x
        returns:
            loss computed before the update
        """
        tmp = self.forward(x)
        AL = tmp[-1]
        loss = self.criterion.compute(AL, y)
        dAL = self.criterion.backward(AL, y)
        grads = self.backward(dAL, tmp, x)
        self.update(grads)
        return loss

    def save(self, name):
        """
        Save the model.
        args:
            name: path of the file to write
        """
        with open(name, 'wb') as f:
            pickle.dump((self.model, self.criterion, self.optimizer, self.layers_names), f)

    def load_model(self, name):
        """
        Load the model.
        args:
            name: path of the file to read
        returns:
            model, criterion, optimizer, layers_names
        """
        # NOTE: pickle can execute arbitrary code while loading; only open
        # files that come from a trusted source.
        with open(name, 'rb') as f:
            return pickle.load(f)

    def shuffle(self, m, shuffling):
        """
        Build the order in which samples are visited.
        args:
            m: number of samples
            shuffling: if True the order is randomly permuted
        returns:
            list containing the indices 0..m-1
        """
        order = list(range(m))
        if shuffling:
            # np.random.shuffle works in place and returns None, so the
            # list itself must be returned.
            np.random.shuffle(order)
        return order

    def batch(self, X, y, batch_size, index, order):
        """
        Get a batch of data.
        args:
            X: input to the model
            y: labels
            batch_size: batch size
            index: index of the batch
                e.g: if batch_size = 3 and index = 1 then the batch will be from index [3, 4, 5]
            order: order of the data
        returns:
            bx, by: batch of data
        """
        # Clamp the end of the slice so a short final batch is still valid.
        last_index = min((index + 1) * batch_size, len(order))
        batch = order[index * batch_size: last_index]
        # 2-D inputs are indexed along axis 1, everything else along axis 0.
        if len(X.shape) == 2:
            bx = X[:, batch]
            by = y[:, batch]
        else:
            bx = X[batch]
            by = y[batch]
        return bx, by

    def compute_loss(self, X, y, batch_size):
        """
        Compute the loss without updating the model.
        args:
            X: input to the model
            y: labels
            batch_size: batch size
        returns:
            loss averaged over the m // batch_size full batches
        """
        m = X.shape[0] if len(X.shape) == 4 else X.shape[1]
        order = self.shuffle(m, False)
        n_batches = m // batch_size
        cost = 0
        for b in range(n_batches):
            bx, by = self.batch(X, y, batch_size, b, order)
            AL = self.forward(bx)[-1]
            # Compare against the labels of this batch, not the full set.
            cost += self.criterion.compute(AL, by) / n_batches
        return cost

    def train(self, X, y, epochs, val=None, batch_size=3, shuffling=False, verbose=1, save_after=None):
        """
        Train the model.
        args:
            X: input to the model
            y: labels
            epochs: number of epochs
            val: validation data; either a (X_val, y_val) pair or, as
                before, validation inputs alone (then scored against y)
            batch_size: batch size
            shuffling: if True shuffle the data
            verbose: print the costs every `verbose` epochs; 0/False/None
                disables printing
            save_after: if not None, path to save the model to after training
        returns:
            train_cost, val_cost: lists with one entry per epoch (val_cost
            stays empty when val is None)
        """
        # Local import so the progress bar works regardless of whether the
        # file-level name `tqdm` is bound to the module or to the function.
        from tqdm import tqdm as progress_bar

        train_cost = []
        val_cost = []
        # NOTICE: if your inputs are 4 dimensional m = X.shape[0] else m = X.shape[1]
        m = X.shape[0] if len(X.shape) == 4 else X.shape[1]
        n_batches = m // batch_size

        if val is not None:
            if isinstance(val, (tuple, list)) and len(val) == 2:
                val_x, val_y = val
            else:
                # Backward-compatible path: only inputs were supplied.
                val_x, val_y = val, y

        for e in progress_bar(range(1, epochs + 1)):
            order = self.shuffle(m, shuffling)
            cost = 0
            for b in range(n_batches):
                bx, by = self.batch(X, y, batch_size, b, order)
                # Train on the current batch only.
                cost += self.one_epoch(bx, by) / n_batches
            train_cost.append(cost)
            if val is not None:
                val_cost.append(self.compute_loss(val_x, val_y, batch_size))
            if verbose and e % verbose == 0:
                print("Epoch {}: train cost = {}".format(e, cost))
                if val is not None:
                    print("Epoch {}: val cost = {}".format(e, val_cost[-1]))
        if save_after is not None:
            self.save(save_after)
        return train_cost, val_cost

    def predict(self, X):
        """
        Predict the output of the model.
        args:
            X: input to the model
        returns:
            predictions (the last entry produced by the forward pass)
        """
        return self.forward(X)[-1]
        

In [36]:
# Architecture for the MNIST model: entries alternate between a layer and the
# activation that follows it (conv+ReLU twice, then FC+sigmoid twice).
# NOTE(review): FC1 is sized for 16*16 inputs -- confirm this matches the
# flattened output of CONV2.
arch_model_MNIST = {
    "CONV1": Conv2D(
        1,
        2,
        name="CONV1",
        kernel_size=(10, 10),
        stride=(1, 1),
        padding=(1, 1),
    ),
    "RELU1": get_activation("relu")(),
    "CONV2": Conv2D(
        2,
        4,
        name="CONV2",
        kernel_size=(5, 5),
        stride=(1, 1),
        padding=(0, 0),
    ),
    "RELU2": get_activation("relu")(),
    "FC1": FC(input_size=16 * 16, output_size=16, name="FC1"),
    "SIGMOMID1": get_activation("sigmoid")(),
    "FC2": FC(input_size=16, output_size=1, name="FC2"),
    "SIGMOMID2": get_activation("sigmoid")(),
}

# Loss and optimizer used to train the architecture above.
criterion = BinaryCrossEntropy()
optimizer = GD(arch_model_MNIST, learning_rate=0.01)

myModel = Model(arch_model_MNIST, criterion, optimizer)


In [64]:
def construct_dataset_mnist():
  """
  Build the two-class MNIST dataset from the folders listed in mnist_path.
      returns:
          list of (image, label) tuples; each image is scaled by 1/255 and
          given a trailing axis of size 1, files under mnist_path[2] get
          label 0 and files under mnist_path[5] get label 1
  """
  # Collect the file paths of each class first, paired with the class label.
  labelled_paths = []
  for digit, label in ((2, 0), (5, 1)):
    folder = mnist_path[digit]
    paths = [folder + entry for entry in os.listdir(folder)]
    labelled_paths.append((paths, label))

  # Echo both path lists before any image is opened.
  for paths, _ in labelled_paths:
    print(paths)

  samples = []
  for paths, label in labelled_paths:
    for path in paths:
      pixels = np.array(Image.open(path)) / 255.
      samples.append((np.expand_dims(pixels, axis=-1), label))

  return samples


In [65]:
# Build the list of (image, label) pairs for the two MNIST classes.
data = construct_dataset_mnist()

['/content/drive/MyDrive/datasets/MNIST/2/img_3176.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_3188.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_507.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_4330.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_2812.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_5840.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_5170.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_2468.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_3292.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_3328.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_3617.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_579.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_239.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_3555.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_2426.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_2800.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_2406.jpg', '/content/drive/MyDrive/datasets/MNIST/2/img_5684.

(array([[[0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.01176471],
         [0.00392157],
         [0.00784314],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.00784314],
         [0.        ],
         [0.01176471],
         [0.        ],
         [0.05882353],
         [0.        ],
         [0.00784314],
         [0.00784314],
         [0.00392157],
         [0.00784314],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ]],
 
        [[0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.        ],
         [0.05882353],
         [0.02352941],
         [0.01960784],
         [0.05098039],
         [0.03529412],
        