# Building convolutional neural network in Numpy
---

***Author: Piotr Skalski***

### Imports

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import os
from mlxtend.data import loadlocal_mnist

%load_ext autoreload
%autoreload 2

### Auxiliary function for downloading the dataset

In [2]:
def download_mnist_dataset():
    """Fetch the MNIST files if needed and return all images and labels as arrays.

    When "train-images-idx3-ubyte" is not present in the current working
    directory, the four gzip archives are downloaded with curl and unpacked
    with gunzip. Both the training files and the t10k files are then read with
    ``loadlocal_mnist`` and concatenated (training part first).

    :return: tuple ``(X, y)`` where ``X`` holds one flattened image per row,
        divided by 255, and ``y`` holds the matching labels
    """
    # The MNIST data set is available at http://yann.lecun.com, let's use curl to download it
    # NOTE(review): only the training-image file is checked; if it exists but one of
    # the other three files is missing, the download is skipped and loading fails below.
    if not os.path.exists("train-images-idx3-ubyte"):
        !curl -O http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
        !curl -O http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
        !curl -O http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
        !curl -O http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
        # Unpack every downloaded archive whose name matches t*-ubyte.gz
        !gunzip t*-ubyte.gz
        
    # Let's use loadlocal_mnist available in mlxtend.data to get data in numpy array form.
    X1, y1 = loadlocal_mnist(
        images_path="train-images-idx3-ubyte", 
        labels_path="train-labels-idx1-ubyte")

    X2, y2 = loadlocal_mnist(
        images_path="t10k-images-idx3-ubyte", 
        labels_path="t10k-labels-idx1-ubyte")
    
    # We normalize the brightness values for pixels
    # (each image is flattened to one row; presumably raw values are 0-255 bytes — verify)
    X1 = X1.reshape(X1.shape[0], -1) / 255
    X2 = X2.reshape(X2.shape[0], -1) /255

    # Merge both parts into a single data set: training part first, then t10k
    X = np.concatenate([X1, X2])
    y = np.concatenate([y1, y2])
    
    return X, y

In [3]:
# Download the MNIST files (only when they are not on disk yet) and load images and labels
X, y = download_mnist_dataset()

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 9680k  100 9680k    0     0   354k      0  0:00:27  0:00:27 --:--:--  226k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 28881  100 28881    0     0   117k      0 --:--:-- --:--:-- --:--:--  117k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1610k  100 1610k    0     0   175k      0  0:00:09  0:00:09 --:--:--  163k     0   185k      0  0:00:08  0:00:03  0:00:05  185k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  4542  100  4542    0     0  36926      0 --:--:-- --:--:-- --:--:-- 36926

In [4]:
# Sanity check: one row per image, one column per pixel of the flattened image
X.shape

(70000, 784)

In [16]:
# Sanity check: one label per image, so the length must match the number of rows in X
y.shape

(70000,)

### Settings

In [19]:
# how many samples of the data set are used
N_SAMPLES = 1000
# ratio used to split the samples into training and test sets
TEST_SIZE = 0.1
# edge length of a (square) photo, in pixels
PHOTO_SIZE = 28
# total pixel count of one photo; also the width of the network input
PIXEL_NUMBER = PHOTO_SIZE ** 2
# neural network architecture: one entry per layer, listed from input to output;
# each layer's "input_dim" equals the previous layer's "output_dim"
NN_ARCHITECTURE = [
    dict(input_dim=PIXEL_NUMBER, output_dim=1000, activation="relu"),
    dict(input_dim=1000, output_dim=1000, activation="relu"),
    dict(input_dim=1000, output_dim=500, activation="relu"),
    dict(input_dim=500, output_dim=500, activation="relu"),
    dict(input_dim=500, output_dim=10, activation="sigmoid"),
]

### Adaptation of the existing implementation to support multiple classes

In [5]:
def sigmoid(Z):
    """Apply the logistic sigmoid 1 / (1 + e^(-Z)) element-wise.

    :param Z: scalar or numpy array of pre-activation values
    :return: sigmoid of every element, same shape as ``Z``
    """
    exp_neg = np.exp(-Z)
    return 1 / (1 + exp_neg)

def relu(Z):
    """Apply the rectified linear unit element-wise: negative entries become 0.

    :param Z: scalar or numpy array of pre-activation values
    :return: ``max(0, z)`` for every element ``z`` of ``Z``
    """
    floor = 0
    return np.maximum(floor, Z)

def sigmoid_backward(dA, Z):
    """Propagate a gradient backwards through a sigmoid activation.

    The derivative of the sigmoid is ``s * (1 - s)`` with ``s = sigmoid(Z)``;
    the incoming gradient is scaled by it element-wise.

    :param dA: gradient with respect to the activation output
    :param Z: pre-activation values that were fed to the sigmoid
    :return: gradient with respect to ``Z``
    """
    activation = sigmoid(Z)
    complement = 1 - activation
    return dA * activation * complement

def relu_backward(dA, Z):
    """Propagate a gradient backwards through a ReLU activation.

    The gradient passes through unchanged where ``Z`` is positive and is
    zeroed where ``Z <= 0``. The input ``dA`` is left untouched.

    :param dA: gradient with respect to the activation output
    :param Z: pre-activation values that were fed to the ReLU
    :return: a new array holding the gradient with respect to ``Z``
    """
    grad = np.copy(dA)
    inactive = Z <= 0
    grad[inactive] = 0
    return grad

In [6]:
def init_layers(nn_architecture, seed = 99):
    """Create randomly initialised weights and biases for every layer.

    The global numpy random generator is re-seeded first, so the same seed
    always yields the same parameters. For layer ``k`` (1-based) the weights
    are stored under ``"W<k>"`` with shape ``(output_dim, input_dim)`` and the
    biases under ``"b<k>"`` with shape ``(output_dim, 1)``; both are standard
    normal draws scaled by 0.1.

    :param nn_architecture: list of layer dicts with "input_dim" and "output_dim"
    :param seed: seed passed to ``np.random.seed``
    :return: dict mapping parameter names to numpy arrays
    """
    np.random.seed(seed)
    params_values = {}

    for layer_no, layer_cfg in enumerate(nn_architecture, start=1):
        n_in = layer_cfg["input_dim"]
        n_out = layer_cfg["output_dim"]

        # Draw the weights before the biases: the order determines the values
        weights = np.random.randn(n_out, n_in) * 0.1
        biases = np.random.randn(n_out, 1) * 0.1

        params_values["W{}".format(layer_no)] = weights
        params_values["b{}".format(layer_no)] = biases

    return params_values

In [7]:
def single_layer_forward_propagation(A_prev, W_curr, b_curr, activation="relu"):
    """Run the forward pass of a single dense layer.

    Computes ``Z = W_curr . A_prev + b_curr`` and applies the requested
    activation function to it.

    :param A_prev: activations of the previous layer (input of this layer)
    :param W_curr: weight matrix of this layer
    :param b_curr: bias column vector of this layer
    :param activation: name of the activation, "relu" or "sigmoid"
    :return: tuple ``(A_curr, Z_curr)`` - the activated output and the
        pre-activation values
    :raises ValueError: if ``activation`` is not a supported name
    """
    Z_curr = np.dot(W_curr, A_prev) + b_curr

    # Compare by value: `is` checks object identity, which only happens to
    # work for interned literals and fails for names built or loaded at runtime.
    if activation == "relu":
        activation_func = relu
    elif activation == "sigmoid":
        activation_func = sigmoid
    else:
        raise ValueError('Non-supported activation function')

    return activation_func(Z_curr), Z_curr

In [8]:
def full_forward_propagation(X, params_values, nn_architecture):
    """Run the forward pass through every layer of the network.

    For layer ``k`` (1-based) the layer input is cached under ``"A<k-1>"`` and
    the pre-activation values under ``"Z<k>"``, so the backward pass can reuse
    them.

    :param X: network input, fed to the first layer
    :param params_values: dict with "W<k>" / "b<k>" entries for every layer
    :param nn_architecture: list of layer dicts providing "activation"
    :return: tuple ``(Y_hat, memory)`` - output of the last layer and the cache
    """
    memory = {}
    signal = X

    for layer_no, layer_cfg in enumerate(nn_architecture, start=1):
        layer_input = signal
        kind = layer_cfg["activation"]
        weights = params_values["W" + str(layer_no)]
        biases = params_values["b" + str(layer_no)]

        signal, pre_activation = single_layer_forward_propagation(
            layer_input, weights, biases, kind)

        # Inputs are indexed by the previous layer, pre-activations by this one
        memory["A" + str(layer_no - 1)] = layer_input
        memory["Z" + str(layer_no)] = pre_activation

    return signal, memory

In [9]:
def get_cost_value(Y_hat, Y, eps = 0.001):
    """Compute the cross-entropy cost of the predictions as a single scalar.

    The binary cross-entropy of every output unit is summed over all units and
    samples and divided by the number of samples (``Y_hat.shape[1]``). The
    terms are combined element-wise, so a multi-row (multi-class) ``Y`` also
    yields one scalar; a matrix product would instead produce a
    (classes x classes) matrix that mixes terms of different classes. For a
    single output row the result equals the dot-product formulation.

    :param Y_hat: predicted probabilities, samples along axis 1
    :param Y: target values, broadcastable against ``Y_hat``
    :param eps: small constant added inside the logarithms to avoid ``log(0)``
    :return: scalar cost value
    """
    m = Y_hat.shape[1]
    log_likelihood = Y * np.log(Y_hat + eps) + (1 - Y) * np.log(1 - Y_hat + eps)
    return -np.sum(log_likelihood) / m

In [10]:
def convert_prob_into_class(probs):
    """Threshold probabilities at 0.5 into hard 0/1 decisions.

    Values strictly greater than 0.5 become 1, all values less than or equal
    to 0.5 become 0. The input array is not modified.

    :param probs: array of probabilities
    :return: new array of the same shape holding only 0s and 1s
    """
    classes = np.copy(probs)
    above = classes > 0.5
    not_above = classes <= 0.5
    classes[above] = 1
    classes[not_above] = 0
    return classes


def get_accuracy_value(Y_hat, Y):
    """Compute the fraction of samples that are predicted entirely correctly.

    Predictions are thresholded into 0/1 first. A sample (one column) counts
    as correct only when every row of that column matches the target.

    :param Y_hat: predicted probabilities, samples along axis 1
    :param Y: target values of the same shape
    :return: accuracy as a value between 0 and 1
    """
    predicted = convert_prob_into_class(Y_hat)
    matches = predicted == Y
    sample_is_correct = matches.all(axis=0)
    return sample_is_correct.mean()

In [11]:
def single_layer_backward_propagation(dA_curr, W_curr, b_curr, Z_curr, A_prev, activation="relu"):
    """Run the backward pass of a single dense layer.

    :param dA_curr: gradient of the cost with respect to this layer's output
    :param W_curr: weight matrix of this layer
    :param b_curr: bias vector of this layer (not used by the computation)
    :param Z_curr: pre-activation values cached during the forward pass
    :param A_prev: input of this layer cached during the forward pass,
        samples along axis 1
    :param activation: name of the activation, "relu" or "sigmoid"
    :return: tuple ``(dA_prev, dW_curr, db_curr)`` - gradients with respect to
        the layer input, the weights and the biases
    :raises ValueError: if ``activation`` is not a supported name
    """
    # Number of samples; weight and bias gradients are averaged over it
    m = A_prev.shape[1]

    # Compare by value: `is` checks object identity, which only happens to
    # work for interned literals and fails for names built or loaded at runtime.
    if activation == "relu":
        backward_activation_func = relu_backward
    elif activation == "sigmoid":
        backward_activation_func = sigmoid_backward
    else:
        raise ValueError('Non-supported activation function')

    dZ_curr = backward_activation_func(dA_curr, Z_curr)
    dW_curr = np.dot(dZ_curr, A_prev.T) / m
    db_curr = np.sum(dZ_curr, axis=1, keepdims=True) / m
    dA_prev = np.dot(W_curr.T, dZ_curr)

    return dA_prev, dW_curr, db_curr

In [12]:
def full_backward_propagation(Y_hat, Y, memory, params_values, nn_architecture, eps = 0.000000000001):
    """Run the backward pass through every layer and collect the gradients.

    Starts from the derivative of the cross-entropy cost with respect to the
    network output and walks the layers from last to first.

    :param Y_hat: network output produced by the forward pass
    :param Y: target values; reshaped to the shape of ``Y_hat``, so any array
        with the same number of elements (for example a 1-D label vector for a
        single-output network) is accepted
    :param memory: cache from the forward pass with "A<k-1>" / "Z<k>" entries
    :param params_values: dict with "W<k>" / "b<k>" entries for every layer
    :param nn_architecture: list of layer dicts providing "activation"
    :param eps: small constant added to the denominators to avoid division by zero
    :return: dict with gradients under "dW<k>" and "db<k>" for every layer
    """
    grads_values = {}

    # Align the targets with the predictions before any element-wise arithmetic.
    # (No sample count is needed here: averaging happens inside each layer step.)
    Y = Y.reshape(Y_hat.shape)

    # Derivative of the cross-entropy cost with respect to the network output
    dA_prev = - (np.divide(Y, Y_hat + eps) - np.divide(1 - Y, 1 - Y_hat + eps))

    for layer_idx_prev, layer in reversed(list(enumerate(nn_architecture))):
        # Layers are numbered from 1; the cached input uses the previous index
        layer_idx_curr = layer_idx_prev + 1
        activ_function_curr = layer["activation"]

        dA_curr = dA_prev

        A_prev = memory["A" + str(layer_idx_prev)]
        Z_curr = memory["Z" + str(layer_idx_curr)]

        W_curr = params_values["W" + str(layer_idx_curr)]
        b_curr = params_values["b" + str(layer_idx_curr)]

        dA_prev, dW_curr, db_curr = single_layer_backward_propagation(
            dA_curr, W_curr, b_curr, Z_curr, A_prev, activ_function_curr)

        grads_values["dW" + str(layer_idx_curr)] = dW_curr
        grads_values["db" + str(layer_idx_curr)] = db_curr

    return grads_values

In [13]:
def update(params_values, grads_values, nn_architecture, learning_rate):
    """Apply one gradient-descent step to all weights and biases, in place.

    Every parameter is decreased by ``learning_rate`` times its gradient. The
    arrays stored in ``params_values`` are modified in place.

    :param params_values: dict with "W<k>" / "b<k>" entries for every layer
    :param grads_values: dict with "dW<k>" / "db<k>" entries for every layer
    :param nn_architecture: list of layer dicts; only its length is relevant
    :param learning_rate: step size of the update
    :return: the same ``params_values`` dict that was passed in
    """
    for layer_no, _ in enumerate(nn_architecture, start=1):
        suffix = str(layer_no)
        for kind in ("W", "b"):
            step = learning_rate * grads_values["d" + kind + suffix]
            params_values[kind + suffix] -= step

    return params_values

In [14]:
def train(X, Y, nn_architecture, epochs, learning_rate, verbose=False, callback=None):
    """Train the network with gradient descent on the whole data set.

    Parameters are initialised with the fixed seed 2. Every epoch runs one
    forward pass over all of ``X``, records cost and accuracy, runs the
    backward pass and updates the parameters. On every 50th epoch (starting
    with epoch 0) progress is printed when ``verbose`` is set and ``callback``
    is invoked when given.

    :param X: network input
    :param Y: target values
    :param nn_architecture: list of layer dicts describing the network
    :param epochs: number of training iterations
    :param learning_rate: step size of the parameter update
    :param verbose: print cost and accuracy on every 50th epoch
    :param callback: optional callable invoked as ``callback(epoch, params_values)``
    :return: tuple ``(params_values, cost_history, accuracy_history)``
    """
    params_values = init_layers(nn_architecture, 2)
    cost_history, accuracy_history = [], []

    for epoch in range(epochs):
        Y_hat, cache = full_forward_propagation(X, params_values, nn_architecture)

        # Metrics describe the parameters *before* this epoch's update
        cost = get_cost_value(Y_hat, Y)
        cost_history.append(cost)
        accuracy = get_accuracy_value(Y_hat, Y)
        accuracy_history.append(accuracy)

        grads_values = full_backward_propagation(Y_hat, Y, cache, params_values, nn_architecture)
        params_values = update(params_values, grads_values, nn_architecture, learning_rate)

        # Reporting happens only on every 50th epoch
        if epoch % 50 != 0:
            continue
        if verbose:
            print("Iteration: {:05} - cost: {:.5f} - accuracy: {:.5f}".format(epoch, cost, accuracy))
        if callback is not None:
            callback(epoch, params_values)

    return params_values, cost_history, accuracy_history