## Deep Neural Network from Scratch - One step at a time

In [97]:
import numpy as np
import matplotlib.pyplot as plt
import h5py
import collections
import os
import json
import pickle

np.random.seed(42)

### Load the dataset

In [2]:
def load_dataset(path):
    '''
    Load the feature and label arrays stored in an HDF5 file.

    Input: path to an HDF5 file. The second and third entries of the file's
           key listing are taken as the feature and label datasets
           respectively (NOTE(review): this relies on the key order of the
           file -- confirm it holds for every file passed in).
    Output: tuple (X_data, y_data) of in-memory numpy arrays
    Raises: IndexError if the file holds fewer than three top-level keys
    '''

    # The context manager guarantees the file handle is released, even if one
    # of the lookups below fails.
    with h5py.File(path, 'r') as f:
        keys = list(f.keys())
        x_key = keys[1]
        y_key = keys[2]

        # [:] copies the dataset into a numpy array so the data stays valid
        # after the file has been closed.
        X_data = f[x_key][:]
        y_data = f[y_key][:]

    return (X_data, y_data)

In [3]:
# Read the training and test splits from the HDF5 files in the working directory.
X_train, y_train = load_dataset('train_catvnoncat.h5')
X_test, y_test = load_dataset('test_catvnoncat.h5')

# Show the raw shapes of features and labels for both splits.
for loaded in (X_train, y_train, X_test, y_test):
    print(loaded.shape)

(209, 64, 64, 3)
(209,)
(50, 64, 64, 3)
(50,)


In [4]:
# Flatten each example into one column: (m, ...) -> (n_features, m).
X_train = np.asarray(X_train).reshape(X_train.shape[0], -1).T
X_test = np.asarray(X_test).reshape(X_test.shape[0], -1).T

for flattened in (X_train, X_test):
    print(flattened.shape)

(12288, 209)
(12288, 50)


In [5]:
# Turn the label vectors into row vectors of shape (1, m) to line up with
# the (n_features, m) feature matrices.
y_train = np.asarray(y_train).reshape(1, -1)
y_test = np.asarray(y_test).reshape(1, -1)

for labels in (y_train, y_test):
    print(labels.shape)

(1, 209)
(1, 50)


In [6]:
# Scale the features so the training data lies in [0, 1]. The scaling constant
# is taken from the training split only and reused for the test split, so both
# splits go through exactly the same transformation.
pixel_max = X_train.max()
X_train = X_train / pixel_max
X_test = X_test / pixel_max

## Neural Network Model

In [115]:
class NeuralNetwork(object):
    '''
    Fully-connected feed-forward network for binary classification, trained
    with full-batch gradient descent on the cross-entropy cost.

    Data layout used throughout: X has shape (n_features, m) and y has shape
    (1, m), where m is the number of examples.

    Usage: construct with the input size and the number of layers, register
    every layer with add_layer(), then call run().
    '''

    # Activation names accepted by add_layer(). Each name <a> needs a forward
    # method <a>(z) and a derivative method <a>_back(z) on this class.
    SUPPORTED_ACTIVATIONS = ('relu', 'leaky_relu', 'sigmoid', 'tanh')

    def __init__(self, n_features, n_layers):
        '''
        Input: n_features, the size of the input layer, and n_layers, the
               number of layers (hidden + output) that will be registered
               through add_layer()
        '''
        # Index 0 of every per-layer list describes the input layer, hence the
        # extra slot.
        self.n_layers = n_layers + 1
        self.layer_dims = [1] * (n_layers + 1)
        self.activations = [None] * (n_layers + 1)
        self.layer_dims[0] = n_features
        self.activations[0] = None
        # Index of the next layer slot that add_layer() will fill.
        self.add_layer_count = 1

    # ------------------------------------------------------------------
    # Helper functions for calculating the activations
    # ------------------------------------------------------------------

    def sigmoid(self, z):
        '''Element-wise logistic function 1 / (1 + exp(-z)).'''
        s = np.divide(1, (1 + np.exp(-z)))
        return s

    def relu(self, z):
        '''Element-wise max(0, z).'''
        return np.maximum(0, z)

    def leaky_relu(self, z):
        '''Element-wise max(0.01 * z, z).'''
        return np.maximum(0.01 * z, z)

    def tanh(self, z):
        '''Element-wise hyperbolic tangent.'''
        # np.tanh stays finite for large |z|; the explicit exp-based formula
        # overflows to inf / inf = nan there.
        return np.tanh(z)

    # ------------------------------------------------------------------
    # Helper functions to calculate the derivative of the activations
    # ------------------------------------------------------------------

    def sigmoid_back(self, z):
        '''Derivative of sigmoid at z: s * (1 - s).'''
        s = self.sigmoid(z)
        g_prime = np.multiply(s, (1 - s))
        return g_prime

    def relu_back(self, z):
        '''Derivative of relu at z: 0 where z <= 0, otherwise 1.'''
        g_prime = np.where(z <= 0, 0, 1)
        return g_prime

    def leaky_relu_back(self, z):
        '''Derivative of leaky_relu at z: 0.01 where z <= 0, otherwise 1.'''
        g_prime = np.where(z <= 0, 0.01, 1)
        return g_prime

    def tanh_back(self, z):
        '''Derivative of tanh at z: 1 - tanh(z)^2.'''
        g_prime = 1 - (self.tanh(z)) ** 2
        return g_prime

    def _resolve_activation(self, spec):
        '''
        Turn an activation spec stored in the layers dictionary into the
        bound method it names.
        Input: a string such as "self.relu" or "self.relu_back" (the "self."
               prefix is optional)
        Output: the corresponding bound method
        Raises: ValueError if no callable attribute with that name exists
        '''
        prefix = "self."
        name = spec[len(prefix):] if spec.startswith(prefix) else spec
        # Attribute lookup instead of eval(): the stored string can only ever
        # select a method of this object, never run arbitrary code.
        func = getattr(self, name, None)
        if not callable(func):
            raise ValueError("Unknown activation function: {}".format(spec))
        return func

    # ------------------------------------------------------------------
    # Main Functions
    # ------------------------------------------------------------------

    def forward_prop(self, X, n_layers, layers_dict, training=True):
        '''
        Performs one forward propagation pass through all layers and computes Z and A
        Input: Data X of shape (n_features, m), n_layers (input layer + hidden layers +
               output layer), layers_dict that contains the weights, bias and the
               activation function for each layer, and a boolean, training, that
               denotes training or prediction
        Output: layers_dict and the activation of the last layer. When training is True
                the cache (Z, A) of every layer is stored inside layers_dict; when it is
                False the dictionary is left untouched
        Raises: ValueError if n_layers describes a network without any layer
        '''

        if n_layers < 2:
            raise ValueError("forward_prop needs at least one layer besides the input layer.")

        m = X.shape[1]

        # Activation of the previous layer; the input data acts as A[0].
        A = X

        for curr_layer in range(1, n_layers):

            layer = layers_dict[str(curr_layer)]
            A_prev = A

            W = layer["params"]["W"]
            b = layer["params"]["b"]

            activation = self._resolve_activation(layer["activation"]["forward"])

            assert(W.shape[1] == A_prev.shape[0])
            assert(W.shape[0] == b.shape[0])

            Z = np.dot(W, A_prev) + b
            A = activation(Z)

            assert(A.shape[1] == m)

            if training:
                # Backprop needs Z and A of every layer; prediction does not,
                # so the dictionary is only written to while training.
                layer["cache"] = {
                    "A" : A,
                    "Z" : Z,
                }

        return layers_dict, A

    def compute_cost_and_dA(self, y, A):
        '''
        Method to compute the cross-entropy cost and the first derivative dA
        Input: The vector of labels of shape (1,m) and the activation of the output layer
               computed through forward propagation (also shape (1,m))
        Output: Cost J (0-d) and dA, the derivative of J wrt the output activation
        '''

        m = y.shape[1]

        # Keep the activations strictly inside (0, 1): a saturated sigmoid
        # returns exactly 0 or 1, which would put log(0) and a division by
        # zero into the formulas below. Values already inside the interval
        # are not modified.
        eps = 1e-15
        A_safe = np.clip(A, eps, 1 - eps)

        J = ( -1 / m ) * np.sum( np.multiply(y, np.log(A_safe)) + np.multiply((1 - y), np.log(1 - A_safe)) )
        J = np.squeeze(J)

        dA = -( np.divide(y, A_safe) ) + ( np.divide((1 - y), (1 - A_safe)) )

        assert(dA.shape == A.shape)
        assert(J.shape == ())

        return J, dA

    def backprop(self, X, dA, layers_dict, n_layers):
        '''
        Performs single iteration of backpropagation to compute dW and db for each layer
        Input: Training data X, dA (first derivative of J computed by compute_cost_and_dA),
               the dictionary containing the parameters, activation, and cache for each layer,
               and n_layers
        Output: Updated version of the input dictionary that contains the derivatives dW, db
                and dA for each layer; the dA stored for layer l is the derivative wrt the
                activation of layer l-1
        '''

        m = dA.shape[1]

        for curr_layer in range(n_layers - 1, 0, -1):

            layer = layers_dict[str(curr_layer)]

            if curr_layer != n_layers - 1:
                # dA wrt this layer's activation was stored by the layer above.
                dA = layers_dict[str(curr_layer + 1)]["derivative"]["dA"]

            if curr_layer == 1:
                A_l_minus_one = X
            else:
                A_l_minus_one = layers_dict[str(curr_layer - 1)]["cache"]["A"]

            Z = layer["cache"]["Z"]
            W = layer["params"]["W"]
            activation_back = self._resolve_activation(layer["activation"]["backward"])

            #compute dZ[l] = dA[l] * g[l]'(Z[l]), shape: (l,m) * (l,m) = (l,m)
            dZ = np.multiply(dA, activation_back(Z))

            #compute dW[l] = (1/m)(dZ[l]. A[l-1].T), shape: (l,m).(m,l-1) = (l,l-1)
            dW = np.multiply((1 / m), np.dot(dZ, A_l_minus_one.T))

            #compute db[l] = (1/m)(sum(dZ) across the rows), shape: sum(l,m) = (l,1)
            db = np.multiply((1 / m), np.sum(dZ, axis=1, keepdims=True))

            #compute dA[l-1] = (W[l].T . dZ[l]), shape: (l-1,l).(l,m) = (l-1,m)
            dA = np.dot(W.T, dZ)

            layer["derivative"] = {
                "dW" : dW,
                "db" : db,
                "dA" : dA
            }

        return layers_dict

    def initialize_dictionary(self, n_layers, layer_dims, activations):
        '''
        Method to create the per-layer data structure and to initialize weights and bias
        Input: n_layers, a list containing the number of units for each layer (index 0 is
               the input size) and another list with the corresponding activation names
        Output: A dictionary keyed by the layer number as a string ("1", "2", ...). Every
                entry holds "activation", "cache", "derivative" and "params". Params
                contains W of shape (units[l], units[l-1]) drawn from a normal distribution
                scaled by 0.05, and b of shape (units[l], 1) filled with zeros
        '''

        d = {}

        for curr_layer in range(1, n_layers):

            if activations:
                # Stored as "self.<name>" strings; resolved to bound methods
                # at call time by _resolve_activation.
                activation = {
                    'forward' : "self." + activations[curr_layer],
                    'backward' : "self." + activations[curr_layer] + "_back"
                }
            else:
                activation = None

            l_minus_one = layer_dims[curr_layer - 1]
            l = layer_dims[curr_layer]

            W = np.random.randn(l, l_minus_one) * 0.05
            b = np.zeros((l, 1))

            d[str(curr_layer)] = {
                "activation" : activation,
                "cache" : None,
                "derivative" : None,
                "params" : {
                    "W": W,
                    "b": b
                }
            }

        return d

    def update_parameters(self, alpha, n_layers, layers_dict):
        '''
        Method to update the weights and bias for all the layers with one gradient
        descent step
        Input: Learning rate alpha, n_layers, the main dictionary (must already contain
               the derivatives written by backprop)
        Output: The input dictionary with updated weights and bias
        '''

        for curr_layer in range(1, n_layers):

            layer = layers_dict[str(curr_layer)]

            W = layer["params"]["W"]
            b = layer["params"]["b"]

            dW = layer["derivative"]["dW"]
            db = layer["derivative"]["db"]

            assert(W.shape == dW.shape)
            assert(b.shape == db.shape)

            layer["params"]["W"] = W - np.multiply(alpha, dW)
            layer["params"]["b"] = b - np.multiply(alpha, db)

        return layers_dict

    def train(self, X_train, y_train, n_layers, layer_dims, activations, epochs, alpha, verbose):
        '''
        Method to run the training loop: initializes the dictionary, then for every epoch
        performs forward propagation, computes the cost, back propagates and updates the
        parameters
        Input: Training data and labels, the network description (n_layers, layer_dims,
               activations), number of epochs, learning rate alpha, and verbose, which
               enables a progress line on the first and on every 100th epoch
        Output: List with the cost of every epoch and the dictionary that contains the
                trained parameters
        '''

        costs = []

        layers_dict = self.initialize_dictionary(n_layers=n_layers, layer_dims=layer_dims, activations=activations)

        for epoch in range(1, epochs + 1):

            layers_dict, A = self.forward_prop(X=X_train, n_layers=n_layers, layers_dict=layers_dict)

            J, dA = self.compute_cost_and_dA(y=y_train, A=A)
            costs.append(J)

            layers_dict = self.backprop(X=X_train, dA=dA, n_layers=n_layers, layers_dict=layers_dict)

            layers_dict = self.update_parameters(alpha=alpha, n_layers=n_layers, layers_dict=layers_dict)

            if verbose and (epoch == 1 or epoch % 100 == 0):
                print("Epoch: {}, Loss: {}".format(epoch, J))

        return costs, layers_dict

    def get_params(self, n_layers, layers_dict):
        '''
        Method to cherrypick the parameters from the main dictionary
        Input: n_layers and the main dictionary
        Output: params dictionary that maps every layer key to its {"W", "b"} dictionary
        '''

        return {
            str(curr_layer): layers_dict[str(curr_layer)]["params"]
            for curr_layer in range(1, n_layers)
        }

    def predict(self, X, n_layers, layers_dict, threshold):
        '''
        Method to make predictions
        Input: The data for which predictions have to be made, n_layers, the main
               dictionary, and the threshold applied to the output activation
        Output: A numpy array of shape (1, -1) holding 0 where the output activation is
                below the threshold and 1 otherwise
        '''

        layers_dict, A = self.forward_prop(X=X, n_layers=n_layers, layers_dict=layers_dict, training=False)

        # Vectorised thresholding; unlike iterating over np.squeeze(A) this
        # also works when X holds a single example.
        predictions = np.where(A < threshold, 0, 1).reshape(1, -1)

        return predictions

    def add_layer(self, hidden_units=2, activation='sigmoid'):
        '''
        Method to add a new layer to the NN model
        Input: number of hidden units and the activation function for the layer
        Output: None
        Raises: IndexError if all layers announced in the constructor were already added,
                ValueError if the activation name is not supported. In both cases the
                model is left unchanged
        '''

        # Valid slots are 1 .. n_layers - 1 (slot 0 is the input layer).
        if self.add_layer_count >= self.n_layers:
            raise IndexError("Max layer count reached.")
        # Validate before writing anything so a rejected call cannot leave a
        # half-configured layer behind.
        if activation not in self.SUPPORTED_ACTIVATIONS:
            raise ValueError("Unknown activation function: {}".format(activation))

        self.layer_dims[self.add_layer_count] = hidden_units
        self.activations[self.add_layer_count] = activation

        print("Layer " + str(self.add_layer_count) + " added.")
        self.add_layer_count += 1
        print("Add " + str(self.n_layers - self.add_layer_count) + " more layer")

    def run(self, X_train, y_train, X_test, y_test, epochs=1000, alpha=0.01, verbose=True, threshold=0.5, save_path=None):
        '''
        Method that the user can invoke to run training and check the accuracy
        Input: Training and test data, epochs, learning rate, verbosity flag, threshold,
               and save_path, an existing directory in which the trained parameters are
               written as text to "<epochs>ep.txt"
        Output: None. Train and test accuracy are printed
        Raises: AssertionError if not all layers have been added yet
        '''
        import sys

        if self.add_layer_count != self.n_layers:
            diff = self.n_layers - self.add_layer_count
            raise AssertionError("Add {} more layer to continue..".format(diff))

        n_layers = self.n_layers

        costs, layers_dict = self.train(X_train, y_train, n_layers, self.layer_dims, self.activations, epochs, alpha, verbose)

        yhat_train = self.predict(X_train, n_layers, layers_dict, threshold)
        yhat_test = self.predict(X_test, n_layers, layers_dict, threshold)

        # Predictions and labels are 0/1, so the mean absolute difference is
        # the error rate.
        train_acc = 100 - np.mean(np.abs(yhat_train - y_train)) * 100
        test_acc = 100 - np.mean(np.abs(yhat_test - y_test)) * 100

        print("Train Accuracy: {:.4f}".format(train_acc))
        print("Test Accuracy: {:.4f}".format(test_acc))

        if save_path:
            if os.path.exists(save_path):
                params = self.get_params(n_layers, layers_dict)
                file_name = os.path.join(save_path, str(epochs) + 'ep.txt')
                # numpy abbreviates arrays with more than 1000 elements to
                # "..." by default, which would drop most of the weights.
                with np.printoptions(threshold=sys.maxsize):
                    serialized = str(params)
                with open(file_name, 'w') as txt_write:
                    txt_write.write(serialized)
                print("Weights saved successfully!")
            else:
                print("Weights not saved, path does not exist: {}".format(save_path))

    

## Testing the model

In [132]:
# The input size is the number of rows of the (n_features, m) training matrix.
n_features = X_train.shape[0]
# Five layers will have to be registered with add_layer() before training.
model = NeuralNetwork(n_features=n_features, n_layers=5)
# NOTE(review): machine-specific absolute path -- confirm it exists on the
# machine running the notebook, or point it at a project-relative directory.
save_path = "F://Conda_Scripts//deeplearning.ai//01_Neural_Networks_and_Deep_Learning//03"

In [133]:
# Architecture: four leaky-ReLU layers of shrinking width, then a single
# sigmoid unit as the output layer.
layer_specs = [
    (64, 'leaky_relu'),
    (20, 'leaky_relu'),
    (10, 'leaky_relu'),
    (4, 'leaky_relu'),
    (1, 'sigmoid'),
]
for units, activation_name in layer_specs:
    model.add_layer(hidden_units=units, activation=activation_name)

Layer 1 added.
Add 4 more layer
Layer 2 added.
Add 3 more layer
Layer 3 added.
Add 2 more layer
Layer 4 added.
Add 1 more layer
Layer 5 added.
Add 0 more layer


In [134]:
# Train on the training split, report train / test accuracy, and hand over the
# directory in which the weights should be stored.
model.run(
    X_train, y_train, X_test, y_test,
    epochs=7500,
    alpha=0.1,
    save_path=save_path,
)

Epoch: 1, Loss: 0.6931329354598631
Epoch: 100, Loss: 0.6443976930084547
Epoch: 200, Loss: 0.6439657169446765
Epoch: 300, Loss: 0.6439557043542569
Epoch: 400, Loss: 0.6439492932115576
Epoch: 500, Loss: 0.6439418323842193
Epoch: 600, Loss: 0.643933050684222
Epoch: 700, Loss: 0.6439219784555213
Epoch: 800, Loss: 0.6439076791967533
Epoch: 900, Loss: 0.6438893999495753
Epoch: 1000, Loss: 0.6438638930521184
Epoch: 1100, Loss: 0.6438260050644836
Epoch: 1200, Loss: 0.6437668851508482
Epoch: 1300, Loss: 0.6436641022412896
Epoch: 1400, Loss: 0.6434758977725299
Epoch: 1500, Loss: 0.6430216197878172
Epoch: 1600, Loss: 0.641561342534048
Epoch: 1700, Loss: 0.6319023466503358
Epoch: 1800, Loss: 0.5482072060250862
Epoch: 1900, Loss: 0.6402542721726597
Epoch: 2000, Loss: 0.5206987187428366
Epoch: 2100, Loss: 0.4666431592350448
Epoch: 2200, Loss: 0.382174180174454
Epoch: 2300, Loss: 0.32589551087674623
Epoch: 2400, Loss: 0.2509222705455521
Epoch: 2500, Loss: 0.41140749289783807
Epoch: 2600, Loss: 0.2864