# Deep Learning - Assignment 1

Sagiv Melamed - I.D. 315092239 \
Dan Peled - I.D. 211547013

In [24]:
# imports
import numpy as np
# from util import *
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
from typing import Callable
import tensorflow as tf
import time

# Part 1
This section contains the implementation of forward propagation.

In [2]:
def initialize_parameters(layers_dims: list) -> dict:
    """
    Build the initial weights and biases of a fully connected network.

    Every pair of consecutive entries in ``layers_dims`` describes one layer:
    the first entry is its input size and the second its output size.
    Weights are sampled from a standard normal distribution and scaled by
    ``sqrt(2 / input_size)``; biases start at zero.

    :param layers_dims: number of units in each layer, input layer first
    :type layers_dims: list
    :return: dictionary built as follows:
        W: list of weight matrices, each of shape (output_size, input_size),
        b: list of zero-filled bias columns, each of shape (output_size, 1)
    :rtype: dict
    """
    weights = []
    biases = []
    for fan_in, fan_out in zip(layers_dims[:-1], layers_dims[1:]):
        # Scaling by sqrt(2 / fan_in) keeps the variance of the layer outputs
        # independent of how many inputs feed the layer.
        weights.append(np.random.randn(fan_out, fan_in) * np.sqrt(2 / fan_in))
        biases.append(np.zeros((fan_out, 1)))

    return {"W": weights, "b": biases}


In [3]:
def linear_forward(A: np.ndarray, W: np.ndarray, B: np.ndarray) -> dict:
    """
    Compute the linear (pre-activation) part of a single layer.

    :param A: activations of the previous layer
    :type A: np.ndarray
    :param W: weight matrix of the current layer
    :type W: np.ndarray
    :param B: bias vector of the current layer (broadcast over the columns of ``W.dot(A)``)
    :type B: np.ndarray
    :return: dictionary built as follows:
        Z: ``W.dot(A) + B``, the input to the activation function
        linear_cache: the untouched inputs A, W and B, kept for the backward pass
    :rtype: dict
    """
    pre_activation = W.dot(A) + B
    inputs_for_backward = {"A": A, "W": W, "B": B}
    return {"Z": pre_activation, "linear_cache": inputs_for_backward}

Activation functions:

In [4]:
def softmax(Z: np.ndarray) -> dict:
    """
    Apply a column-wise softmax to Z.

    :param Z: the linear component of the activation function
    :type Z: np.ndarray
    :return: dictionary built as follows:
        A: softmax of Z taken along axis 0
        activation_cache: the original (unshifted) Z
    :rtype: dict
    """
    # Softmax does not change when a constant is subtracted from every entry of
    # a column, so shifting by the column maximum keeps all exponents <= 0 and
    # prevents overflow in exp(). The shift is done on a copy so the caller's Z
    # (which is also stored in the cache) is left untouched.
    shifted = np.copy(Z)
    shifted -= shifted.max(axis=0)
    exponentials = np.exp(shifted)
    return {
        "A": exponentials / exponentials.sum(axis=0),
        "activation_cache": {"Z": Z},
    }


def relu(Z: np.ndarray) -> dict:
    """
    Apply the rectified linear unit to Z element-wise.

    :param Z: the linear component of the activation function
    :type Z: np.ndarray
    :return: dictionary built as follows:
        A: ``max(0, Z)`` taken element-wise
        activation_cache: the original Z, kept for the backward pass
    :rtype: dict
    """
    rectified = np.maximum(0, Z)
    return {"A": rectified, "activation_cache": {"Z": Z}}


In [5]:
def linear_activation_forward(A_prev: np.ndarray, W: np.ndarray, B: np.ndarray,
                              activation: Callable[[np.ndarray], dict]) -> dict:
    """
    Run one full layer forward: the linear step followed by the activation.

    :param A_prev: activations of the previous layer
    :type A_prev: np.ndarray
    :param W: weight matrix of the current layer
    :type W: np.ndarray
    :param B: bias vector of the current layer
    :type B: np.ndarray
    :param activation: forward activation function; it receives Z and must return
        a dictionary with the keys ``A`` and ``activation_cache``
    :type activation: Callable[[np.ndarray], dict]
    :return: dictionary built as follows:
        A: activations of the current layer
        cache: dictionary holding ``linear_cache`` and ``activation_cache``
    :rtype: dict
    """
    linear_step = linear_forward(A_prev, W, B)
    activation_step = activation(linear_step['Z'])

    return {
        "A": activation_step['A'],
        "cache": {
            'linear_cache': linear_step['linear_cache'],
            'activation_cache': activation_step['activation_cache'],
        },
    }


In [6]:
def L_model_forward(X: np.ndarray, parameters: dict, use_batchnorm: bool = False):
    """
    Run the whole network forward: ReLU on every layer but the last, softmax on the last.

    :param X: matrix of inputs
    :type X: np.ndarray
    :param parameters: a dict like object containing the lists W and b
    :type parameters: dict
    :param use_batchnorm: whether to pass every hidden activation through
        ``apply_batchnorm`` before feeding it to the next layer
    :type use_batchnorm: bool
    :return:
        tuple ``(AL, caches)``: the output of the softmax layer and a list with
        one cache per layer (hidden layers first, softmax layer last)
    :rtype:
        tuple
    """
    caches = []
    activation = X

    # Hidden layers: all weight/bias pairs except the final one.
    hidden_layers = zip(parameters["W"][:-1], parameters["b"][:-1])
    for weights, bias in hidden_layers:
        layer_output = linear_activation_forward(activation, weights, bias, relu)
        activation = layer_output['A']
        if use_batchnorm:
            # Only the value handed to the next layer is normalised; the cache
            # stored below was produced before this step.
            activation = apply_batchnorm(activation)
        caches.append(layer_output['cache'])

    # Output layer.
    output_layer = linear_activation_forward(
        activation, parameters["W"][-1], parameters["b"][-1], softmax
    )
    caches.append(output_layer['cache'])
    return output_layer['A'], caches



In [7]:
def compute_cost(Al: np.ndarray, Y: np.ndarray):
    """
    Compute the categorical cross-entropy cost, averaged over the examples.

    The cost is ``-sum(Y * log(Al)) / m`` where ``m = Y.shape[1]`` is the number
    of columns (examples).

    :param Al: predicted probabilities, same shape as Y
    :type Al: np.ndarray
    :param Y: true labels as a one-hot matrix with one column per example
    :type Y: np.ndarray
    :return: the average cross-entropy over the examples; always free of nan
        caused by zero probabilities
    :rtype: float
    """
    probabilities = np.asarray(Al)
    if not np.issubdtype(probabilities.dtype, np.floating):
        probabilities = probabilities.astype(float)

    # exp() underflow can make softmax emit exact zeros. log(0) is -inf and
    # 0 * -inf is nan, which would poison the whole sum even when the zero
    # belongs to a class the label does not select. Flooring at the smallest
    # positive normal float keeps the log finite and leaves every probability
    # above that floor unchanged.
    floor = np.finfo(probabilities.dtype).tiny
    log_probabilities = np.log(np.maximum(probabilities, floor))

    return np.sum(Y * log_probabilities) / -Y.shape[1]

In [8]:
def apply_batchnorm(A: np.ndarray, epsilon: float = 1e-4) -> np.ndarray:
    """
    Normalise the activations of a layer over the batch.

    Each unit is shifted to zero mean and scaled to (approximately) unit
    variance using statistics taken along the last axis, which is the example
    axis in this notebook's (units, examples) layout. Using one global mean and
    standard deviation for the whole matrix would leave the individual units on
    different scales, which is not what batch normalisation does.

    :param A: activation values of a layer, one column per example
    :type A: np.ndarray
    :param epsilon: small constant added to the variance to avoid division by zero
    :type epsilon: float
    :return: the normalised activations, same shape as A
    :rtype: np.ndarray
    """
    # keepdims=True keeps the statistics as columns so they broadcast back over
    # the examples of each unit.
    mean = A.mean(axis=-1, keepdims=True)
    variance = A.var(axis=-1, keepdims=True)

    return (A - mean) / np.sqrt(variance + epsilon)

# Part 2

This part contains the functions related to backpropagation.

In [9]:
def linear_backward(dZ: np.ndarray, cache: dict):
    """
    Backward pass through the linear part of a single layer.

    :param dZ: the gradient of the cost with respect to the linear output of the current layer
    :type dZ: np.ndarray
    :param cache: the layer's linear cache; the keys ``A`` (input activations)
        and ``W`` (weights) are read
    :type cache: dict
    :return:
        tuple ``(dA, dW, dB)``: gradients with respect to the layer's input
        activations, its weights and its bias. dW and dB are divided by the
        number of columns of the cached A; dA is not.
    :rtype: tuple
    """
    prev_activations = cache["A"]
    example_count = prev_activations.shape[1]

    grad_prev_activations = cache["W"].T.dot(dZ)
    grad_weights = dZ.dot(prev_activations.T) / example_count
    grad_bias = np.sum(dZ, axis=1, keepdims=True) / example_count
    return grad_prev_activations, grad_weights, grad_bias

In [10]:
def linear_activation_backward(dA: np.ndarray, cache: dict, activation):
    """
    Backward pass through one LINEAR->ACTIVATION layer.

    The activation's backward function turns dA into dZ, and dZ is then pushed
    through ``linear_backward``.

    :param dA: post activation gradient of the current layer
    :type dA: np.ndarray
    :param cache: contains both the linear cache and the activation cache
        (keys ``linear_cache`` and ``activation_cache``)
    :type cache: dict
    :param activation: activation backward function, called as
        ``activation(dA, activation_cache)``
    :type activation: Callable
    :return:
        tuple of derivatives dA, dW, dB as returned by ``linear_backward``
    :rtype: tuple
    """
    activation_cache = cache['activation_cache']
    pre_activation_grad = activation(dA, activation_cache)

    linear_cache = cache['linear_cache']
    return linear_backward(pre_activation_grad, linear_cache)

In [11]:
def relu_backward(dA: np.ndarray, activation_catch: dict):
    """
    Backward pass through a ReLU unit.

    The incoming gradient passes through unchanged wherever the cached Z was
    positive and is zeroed everywhere else. dA itself is not modified.

    :param dA: the post-activation gradient
    :type dA: np.ndarray
    :param activation_catch: contains Z (stored during the forward propagation)
    :type activation_catch: dict
    :return:
        gradient with respect to Z
    :rtype:
        np.ndarray
    """
    grad = np.copy(dA)
    blocked = activation_catch['Z'] <= 0
    grad[blocked] = 0
    return grad


def softmax_backward(dA, activation_cache):
    """
    Backward pass through the softmax layer.

    This is an identity: ``l_model_backward`` already passes in ``Al - Y``, the
    gradient of the loss combined with the softmax, so there is nothing left to
    apply here.

    :param dA: gradient handed in by the caller
    :param activation_cache: unused; accepted so the signature matches ``relu_backward``
    :return: ``dA`` itself, unchanged (the same object, not a copy)
    """
    return dA


In [12]:
def l_model_backward(Al: np.ndarray, Y: np.ndarray, caches: list):
    """
    Run backpropagation through the whole network.

    :param Al: the probabilities matrix, the output of the forward propagation
    :type Al: np.ndarray
    :param Y: the true labels (the "ground truth" - true classifications)
    :type Y: np.ndarray
    :param caches: one cache per layer as produced by ``L_model_forward``
        (hidden ReLU layers first, softmax layer last)
    :type caches: list
    :return:
        dictionary of gradients with the keys ``dA_<l>``, ``dW_<l>`` and
        ``dB_<l>`` for every layer index ``l`` (0-based)
    :rtype:
        dict
    """
    layers = len(caches)
    output_index = layers - 1
    grads = {}

    # Output layer: Al - Y is the gradient of the loss combined with the
    # softmax, which is why softmax_backward has nothing left to do.
    output_cache = caches[output_index]
    output_grad = Al - Y
    output_result = linear_activation_backward(output_grad, output_cache, softmax_backward)
    grads[f"dA_{output_index}"], grads[f"dW_{output_index}"], grads[f"dB_{output_index}"] = output_result

    # Hidden layers, last to first: each one consumes the dA produced by the
    # layer that follows it.
    for layer in range(output_index - 1, -1, -1):
        layer_cache = caches[layer]
        upstream = grads[f"dA_{layer + 1}"]
        layer_result = linear_activation_backward(upstream, layer_cache, relu_backward)
        grads[f"dA_{layer}"], grads[f"dW_{layer}"], grads[f"dB_{layer}"] = layer_result

    return grads


In [13]:
def update_parameters(parameters: dict, grads: dict, learning_rate: float):
    """
    Take one gradient descent step on every layer.

    The update is applied in place: the arrays stored in ``parameters`` are
    modified and the same dictionary is handed back.

    :param parameters: parameters of the ANN (lists under the keys ``W`` and ``b``)
    :type parameters: dict
    :param grads: a python dictionary containing the gradients (generated by
        l_model_backward), keyed ``dW_<index>`` and ``dB_<index>``
    :type grads: dict
    :param learning_rate: the learning rate used to update
    :type learning_rate: float
    :return:
        Updated parameters of the ANN (the same object that was passed in)
    :rtype:
        dict
    """
    layer_count = len(parameters["W"])
    for layer in range(layer_count):
        weight_step = learning_rate * grads[f'dW_{layer}']
        parameters['W'][layer] -= weight_step

        bias_step = learning_rate * grads[f'dB_{layer}']
        parameters['b'][layer] -= bias_step

    return parameters


# Part 3

This part contains the functions of training and testing a model

In [14]:
def _build_mini_batches(X, Y, batch_size):
    """
    Shuffle the examples and cut them into mini-batches of ``batch_size`` columns.

    X and Y are shuffled with the same random permutation, so every column of X
    stays paired with its label column. When the number of examples is not a
    multiple of ``batch_size``, one extra, smaller mini-batch holding the
    leftover examples is appended at the end.

    :param X: input matrix, one column per example
    :param Y: label matrix, one column per example
    :param batch_size: number of examples in each full mini-batch
    :return: list of mini-batches structured by (x,y)
    """
    example_count = Y.shape[1]

    # One shared permutation keeps inputs and labels aligned.
    order = np.random.permutation(example_count)
    shuffled_x = X[:, order]
    shuffled_y = Y[:, order]

    full_batches, leftover = divmod(example_count, batch_size)
    batch_starts = [index * batch_size for index in range(full_batches)]
    batches = [
        (shuffled_x[:, start:start + batch_size], shuffled_y[:, start:start + batch_size])
        for start in batch_starts
    ]

    if leftover:
        batches.append((shuffled_x[:, -leftover:], shuffled_y[:, -leftover:]))

    return batches

In [15]:
def L_layer_model(X, Y, layer_dims, learning_rate, num_iterations, batch_size, stop_eps=1e-5):
    """
    Train a network with mini-batch gradient descent and validation-based early stopping.

    The data is split once into a training and a validation part, and the
    training part is cut once into mini-batches that are reused in every epoch.
    After each epoch the validation cost is computed; from epoch 16 onwards
    training stops as soon as that cost changes by less than ``stop_eps``
    compared with the previous epoch.

    :param X: The training data
    :param Y: The training data's labels
    :param layer_dims: iterable of integers represents the number of neurons in every layer.
    :param learning_rate: the learning rate of the model
    :param num_iterations: maximum number of epochs, if the stopping by the validation won't occur.
        With a value of 0 or less no training is done and the freshly
        initialised parameters are evaluated and returned.
    :param batch_size: size of 1 individual batch to feed the model at once.
    :param stop_eps: optional, the bound of validation cost change for stop training. default 1e-5.
    :return: tuple of (parameters, costs, metadata).
        parameters are the trained model; costs holds the training cost of every
        100th parameter update (measured on that update's mini-batch, before the
        update is applied); metadata is a dictionary with the keys ``runtime``
        (seconds), ``epochs`` (number of epochs actually run), ``val_acc``,
        ``train_acc``, ``batch_size`` and ``mini-batches``.
    """

    (x_train, y_train), (x_val, y_val) = _split_train_val(X, Y)
    mini_batches = _build_mini_batches(x_train, y_train, batch_size)

    params = initialize_parameters(layer_dims)

    costs = []
    last_val_cost = -np.inf

    update_count = 0  # number of time called to update_parameters
    # Tracked separately from the loop variable: when num_iterations <= 0 the
    # loop body never runs, the loop variable is never bound, and reading it
    # for the metadata would raise UnboundLocalError.
    epochs_run = 0

    start = time.perf_counter()
    for epoch in range(1, num_iterations + 1):
        epochs_run = epoch
        print(f"Start epoch {epoch}")
        # Train
        for (x, y) in mini_batches:
            al, caches = L_model_forward(x, params)
            if (update_count + 1) % 100 == 0:
                costs.append(compute_cost(al, y))
            grads = l_model_backward(al, y, caches)
            params = update_parameters(params, grads, learning_rate)
            update_count += 1

        # Validation
        val_al, _ = L_model_forward(x_val, params)
        val_cost = compute_cost(val_al, y_val)
        print(f"End epoch {epoch} - {val_cost=}")
        if epoch > 15 and np.abs(val_cost - last_val_cost) < stop_eps:
            # The stopping checking is done only after 15 epochs to avoid fast stopping on high cost
            print(f"Stopping after the validation cost wasn't changed: {last_val_cost=}, {val_cost=}"
                  f" diff={np.abs(val_cost - last_val_cost)}")
            break
        last_val_cost = val_cost

    end = time.perf_counter()

    train_acc = predict(x_train, y_train, params)
    val_acc = predict(x_val, y_val, params)

    return params, costs, {"runtime": end-start, "epochs": epochs_run, "val_acc": val_acc, "train_acc": train_acc, "batch_size": batch_size, "mini-batches": len(mini_batches)}


In [16]:
def predict(X, Y, parameters, use_batchnorm: bool=False):
    """
    Compute the accuracy of the network on a labelled data set.

    The predicted class of every column is the row with the highest output of
    the network; it is compared with the row of the highest value in Y.

    :param X: input matrix, one column per example
    :param Y: true labels, one column per example (one-hot)
    :param parameters: the network's parameters (W and b)
    :param use_batchnorm: forwarded to ``L_model_forward``
    :return: fraction of examples classified correctly, between 0 and 1
    """
    probabilities, _ = L_model_forward(X, parameters, use_batchnorm)
    predicted_classes = np.argmax(probabilities, axis=0)
    true_classes = np.argmax(Y, axis=0)
    hits = predicted_classes == true_classes
    return hits.sum() / len(hits)


# Part 4
Training the model over MNIST dataset.

## Loading the dataset

In [17]:
def _to_matrix(y, num_classes: int = 10):
    """
    Turn a vector of integer class labels into a one-hot matrix.

    :param y: integer class labels, each in ``range(num_classes)``
    :param num_classes: number of classes, i.e. number of rows of the result;
        defaults to 10 (the MNIST digits)
    :return: matrix of shape (num_classes, len(y)) whose column ``j`` is all
        zeros except for a 1 in row ``y[j]``
    """
    # Row y[j] of the identity matrix is the one-hot encoding of label y[j];
    # the transpose puts one example per column.
    return np.eye(num_classes)[y].T

In [18]:
def _split_train_val(X, Y, ratio=0.2):
    """
    Randomly split a data set into a training part and a validation part.

    ``int(ratio * number_of_examples)`` randomly chosen columns go to the
    validation set and the rest to the training set. Within each part the
    columns keep their original relative order.

    :param X: input matrix, one column per example
    :param Y: label matrix, one column per example
    :param ratio: fraction of the examples to hold out for validation
    :return: (x_train, y_train), (x_val, y_val)
    """
    example_count = Y.shape[1]
    held_out = int(ratio * example_count)

    # Mark the first `held_out` slots, then shuffle the marks so that the
    # validation columns end up at random positions.
    in_validation = np.zeros(example_count, dtype=bool)
    in_validation[:held_out] = True
    np.random.shuffle(in_validation)
    in_training = ~in_validation

    validation_part = (X[:, in_validation], Y[:, in_validation])
    training_part = (X[:, in_training], Y[:, in_training])
    return training_part, validation_part

In [19]:
# Load the MNIST train/test split through the Keras dataset helper.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Move the leading axis of each image array to the end and flatten the rest
# into 784 rows, so that every column is one image (assumes the arrays arrive
# as (samples, 28, 28) - TODO confirm). Dividing by 255.0 rescales the pixel
# values (presumably 0..255) into the range 0..1.
x_train, x_test = (
    np.moveaxis(images, 0, -1).reshape((784, -1)) / 255.0
    for images in (x_train, x_test)
)

# One-hot encode the labels, one column per example.
y_train, y_test = (_to_matrix(labels) for labels in (y_train, y_test))

## Training the model

In [25]:
# Hyper-parameters of the run.
random_seed = 42
learning_rate = .009
layers = [784, 20, 7, 5, 10]  # input size, three hidden layers, output size
batch_size = 8
max_epochs = 100  # upper bound on epochs; early stopping may end the run sooner
stop_eps = 0.001  # minimal change of the validation cost that keeps training going

# The train/validation split, the mini-batch shuffle and the weight
# initialisation all draw from NumPy's global random generator; seeding it
# right before training makes the run repeatable.
np.random.seed(random_seed)

params, c, metadata = L_layer_model(x_train, y_train, layers, learning_rate, max_epochs, batch_size, stop_eps=stop_eps)

Start epoch 1
End epoch 1 - val_cost=0.46255522330357857
Start epoch 2
End epoch 2 - val_cost=0.3411523715674415
Start epoch 3
End epoch 3 - val_cost=0.2966929967762058
Start epoch 4
End epoch 4 - val_cost=0.28171416594345267
Start epoch 5
End epoch 5 - val_cost=0.2574458456179455
Start epoch 6
End epoch 6 - val_cost=0.2490123689772437
Start epoch 7
End epoch 7 - val_cost=0.23942916159337554
Start epoch 8
End epoch 8 - val_cost=0.2322655953490141
Start epoch 9
End epoch 9 - val_cost=0.22443946040601703
Start epoch 10
End epoch 10 - val_cost=0.22294040319678746
Start epoch 11
End epoch 11 - val_cost=0.21881220560272333
Start epoch 12
End epoch 12 - val_cost=0.21654011975198198
Start epoch 13
End epoch 13 - val_cost=0.21312414726422252
Start epoch 14
End epoch 14 - val_cost=0.21201312329944538
Start epoch 15
End epoch 15 - val_cost=0.2137730146843351
Start epoch 16
End epoch 16 - val_cost=0.21404595887986133
Start epoch 17
End epoch 17 - val_cost=0.2148128118636058
Start epoch 18
End epo

KeyboardInterrupt: 

## Final accuracy on the test set

In [23]:
# predict returns the fraction of correct classifications; scale it to a
# percentage for the report.
acc = 100 * predict(x_test, y_test, params)
print(f"Test accuracy {acc:.2f}%")

Test accuracy 93.27%
