## Task 5

#### Instructions

Run the program from top to bottom. Download the pkl_quickdraw.pkl from the Google Drive or run Repackaging Quickdraw MNIST.ipynb with the .npy files from numpy_quickdraw_indv.zip on the Google Drive.

#### Google Drive
https://drive.google.com/drive/folders/1s2Ni2n0RjN2gfEagrqMHpTigm511qxLO?usp=sharing

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pickle
import zipfile as zp
import os
from scipy.stats import truncnorm

#### SGD Momentum

SGD Momentum is performed after each call of the train_single function where the momentum value is applied to the weights.

With external help from:
https://github.com/ngailapdi/MNIST_numpy/blob/master/MNIST_SGD_Numpy.ipynb

In [3]:
@np.vectorize
# Activation Functions
def sigmoid(x):
    return 1 / (1 + np.e ** -x)

def dsigmoid(x):
    return x * (1 - x)
            
def softmax(x):
    e_x = np.exp(x)
    return e_x / e_x.sum()

def dsoftmax(x, y):
    si_sj = -x * x.reshape(y, 1)
    s_der = np.diag(x) + si_sj
    return s_der
    
def relu(x):
    return np.maximum(0, x)

def drelu(x):
    return np.where(x <= 0, 0, 1)

def leaky_relu(x):
    return np.where(x >= 0, x, x * 0.01)

def dleaky_relu(x):
    out = np.ones_like(x)
    out[x < 0] *= 0.01
    return out


def get_batch(imgs, labels, batch_size):
    img_batches = []
    lbl_batches = []
    for counter in range(0,len(imgs),batch_size):
        img_batches.append(imgs[counter:counter+batch_size])
        lbl_batches.append(labels[counter:counter+batch_size])
    return np.array(img_batches), np.array(lbl_batches)

from scipy.stats import truncnorm
def truncated_normal(mean=0, sd=1, low=0, upp=10):
    return truncnorm((low - mean) / sd, 
                     (upp - mean) / sd, 
                     loc=mean, 
                     scale=sd)
class NeuralNetwork:
    
    def __init__(self, 
                 no_of_in_nodes, 
                 no_of_out_nodes, 
                 no_of_hidden_nodes,
                 activation_function,
                 learning_rate,
                 bias=None,
                ):  
        self.no_of_in_nodes = no_of_in_nodes
        self.no_of_out_nodes = no_of_out_nodes       
        self.no_of_hidden_nodes = no_of_hidden_nodes          
        self.learning_rate = learning_rate 
        self.bias = bias
        
        if activation_function == 'sigmoid':
            self.activation_ih = sigmoid
            self.activation_ho = sigmoid
            self.dactivation_oh = dsigmoid
            self.dactivation_hi = dsigmoid
            
        if activation_function == 'softmax':
            self.activation_ih = sigmoid
            self.activation_ho = softmax
            self.dactivation_oh = dsoftmax
            self.dactivation_hi = dsigmoid
        
        if activation_function == 'relu':
            self.activation_ih = relu
            self.activation_ho = relu
            self.dactivation_oh = drelu
            self.dactivation_hi = drelu
            
        if activation_function == 'leakyrelu':
            self.activation_ih = leaky_relu
            self.activation_ho = leaky_relu
            self.dactivation_oh = dleaky_relu
            self.dactivation_hi = dleaky_relu
        
        self.use_dropout = False
        self.active_input_percentage = 1.0
        self.active_hidden_percentage = 1.0
            
        self.create_weight_matrices()
        
        self.v0 = np.zeros(self.wih.shape)
        self.v1 = np.zeros(self.who.shape)
        
    def create_weight_matrices(self):
        X = truncated_normal(mean=2, sd=1, low=-0.5, upp=0.5)
        
        bias_node = 1 if self.bias else 0

        n = (self.no_of_in_nodes + bias_node) * self.no_of_hidden_nodes
        X = truncated_normal(mean=2, sd=1, low=-0.5, upp=0.5)
        self.wih = X.rvs(n).reshape((self.no_of_hidden_nodes, 
                                                   self.no_of_in_nodes + bias_node))

        n = (self.no_of_hidden_nodes + bias_node) * self.no_of_out_nodes
        X = truncated_normal(mean=2, sd=1, low=-0.5, upp=0.5)
        self.who = X.rvs(n).reshape((self.no_of_out_nodes, 
                                                    (self.no_of_hidden_nodes + bias_node)))

    def dropout_weight_matrices(self,
                                active_input_percentage=1.0,
                                active_hidden_percentage=1.0):
        # restore wih array, if it had been used for dropout
        self.wih_orig = self.wih.copy()
        self.no_of_in_nodes_orig = self.no_of_in_nodes
        self.no_of_hidden_nodes_orig = self.no_of_hidden_nodes
        self.who_orig = self.who.copy()
        

        active_input_nodes = int(self.no_of_in_nodes * active_input_percentage)
        active_input_indices = sorted(random.sample(range(0, self.no_of_in_nodes), 
                                      active_input_nodes))
        active_hidden_nodes = int(self.no_of_hidden_nodes * active_hidden_percentage)
        active_hidden_indices = sorted(random.sample(range(0, self.no_of_hidden_nodes), 
                                       active_hidden_nodes))
        
        self.wih = self.wih[:, active_input_indices][active_hidden_indices]       
        self.who = self.who[:, active_hidden_indices]
        
        self.no_of_hidden_nodes = active_hidden_nodes
        self.no_of_in_nodes = active_input_nodes
        return active_input_indices, active_hidden_indices
    
    def weight_matrices_reset(self, 
                              active_input_indices, 
                              active_hidden_indices):
        
        # resets weight matrices back to original
        # self.wih and self.who variables are updated from the active nodes
 
        temp = self.wih_orig.copy()[:,active_input_indices]
        temp[active_hidden_indices] = self.wih
        self.wih_orig[:, active_input_indices] = temp
        self.wih = self.wih_orig.copy()

        self.who_orig[:, active_hidden_indices] = self.who
        self.who = self.who_orig.copy()
        self.no_of_in_nodes = self.no_of_in_nodes_orig
        self.no_of_hidden_nodes = self.no_of_hidden_nodes_orig
        
    
    def train_single(self, input_vector, target_vector):
        # Forward Propagation
        # Input Layer to Hidden Layer
        if self.bias:
            # adding bias node to the end of the input_vector
            input_vector = np.concatenate( (input_vector, [self.bias]) )

        input_vector = np.array(input_vector, ndmin=2).T
        target_vector = np.array(target_vector, ndmin=2).T

        output_vector1 = np.dot(self.wih, input_vector) # linear
        output_vector_hidden = self.activation_ih(output_vector1) # non-linear
        
        # Hidden Layer to Output Layer
        if self.bias:
            output_vector_hidden = np.concatenate( (output_vector_hidden, [[self.bias]]) )
        
        output_vector2 = np.dot(self.who, output_vector_hidden) # linear
        output_vector_network = self.activation_ho(output_vector2) # non-linear
        
        # Backward Propagation
        # Output Layer to Hidden Layer
        output_errors = target_vector - output_vector_network
        
        # if using softmax
        if self.dactivation_oh == dsoftmax:
            ovn = output_vector_network.reshape(output_vector_network.size,)
            dsoftmax_output = dsoftmax(ovn, self.no_of_out_nodes)
            tmp = np.dot(dsoftmax_output, output_errors)
            
        else:
            tmp = output_errors * self.dactivation_oh(output_vector_network) # derivative
        
        if self.use_dropout:
            tmp = tmp / self.active_hidden_percentage # Inverted Dropout
            
        delta_w1 = np.dot(tmp, output_vector_hidden.T)

        # Hidden Layer to Input Layer
        # calculate hidden errors:
        hidden_errors = np.dot(self.who.T, output_errors)
        # update the weights:
        tmp = hidden_errors * self.dactivation_hi(output_vector_hidden) # derivative
        
        if self.use_dropout:
            tmp = tmp / self.active_input_percentage # Inverted Dropout
            
        if self.bias:
            delta_w0 = np.dot(tmp, input_vector.T)[:-1,:] 
        else:
            delta_w0 = np.dot(tmp, input_vector.T)
        
        # Momentum
        if not self.use_momentum:
            self.wih += self.learning_rate * delta_w0
            self.who += self.learning_rate * delta_w1
        else:
            self.v0 = self.momentum * self.v0 + (1 - self.momentum) * delta_w0
            self.v1 = self.momentum * self.v1 + (1 - self.momentum) * delta_w1
            self.wih += self.learning_rate * self.v0
            self.who += self.learning_rate * self.v1
        
    def train(self, data_array, 
              labels_one_hot_array,
              momentum=1.0,
              epochs=1,
              active_input_percentage=1.0,
              active_hidden_percentage=1.0,
              no_of_dropout_tests = 1, 
              use_dropout = False, 
              use_momentum = False):
        
        self.momentum = momentum
        self.use_momentum = use_momentum
        
        self.use_dropout = use_dropout
        self.active_input_percentage = active_input_percentage
        self.active_hidden_percentage = active_hidden_percentage

        partition_length = int(len(data_array) / no_of_dropout_tests)
        
        # if using dropout
        if self.use_dropout:
            for epoch in range(epochs):
                print("epoch: ", epoch)
                for start in range(0, len(data_array), partition_length):
                    active_in_indices, active_hidden_indices = \
                               self.dropout_weight_matrices(active_input_percentage,
                                                            active_hidden_percentage)
                    for i in range(start, start + partition_length):
                        self.train_single(data_array[i][active_in_indices], 
                                         labels_one_hot_array[i]) 

                    self.weight_matrices_reset(active_in_indices, active_hidden_indices)
        else:
            for epoch in range(epochs):
                print("epoch: ", epoch)
                
                for i in range(len(data_array)):
                        self.train_single(data_array[i], labels_one_hot_array[i])
                        
                
                    
        
    def confusion_matrix(self, data_array, labels):
        cm = np.zeros((10, 10), int)
        for i in range(len(data_array)):
            res = self.run(data_array[i])
            res_max = res.argmax()
            target = labels[i][0]
            cm[res_max, int(target)] += 1
        return cm
    
    def precision(self, label, confusion_matrix):
        col = confusion_matrix[:, label]
        return confusion_matrix[label, label] / col.sum()
    
    def recall(self, label, confusion_matrix):
        row = confusion_matrix[label, :]
        return confusion_matrix[label, label] / row.sum()
        
    
    def run(self, input_vector):
        """ input_vector can be tuple, list or ndarray """
        
        input_vector = np.array(input_vector, ndmin=2).T
        output_vector = np.dot(self.wih, 
                               input_vector)
        output_vector = self.activation_ih(output_vector)
        
        output_vector = np.dot(self.who, 
                               output_vector)
        output_vector = self.activation_ho(output_vector)
    
        return output_vector
    
    def evaluate(self, data, labels):
        corrects, wrongs = 0, 0
        for i in range(len(data)):
            res = self.run(data[i])
            res_max = res.argmax()
            if res_max == labels[i]:
                corrects += 1
            else:
                wrongs += 1
        return corrects, wrongs
    
    def mean_squared_error(self, data, labels):
        actual = labels
        predicted = [None] * len(actual)
        
        corrects, wrongs = 0, 0
        for i in range(len(data)):
            res = self.run(data[i])
            res_max = res.argmax()
            predicted[i] = res_max
        
        sum_square_error = 0.0
        for i in range(len(actual)):
            sum_square_error += (actual[i] - predicted[i])**2.0
        mean_square_error = 1.0 / len(actual) * sum_square_error
        return mean_square_error

In [4]:
# Load data from Pickle file
with open(os.path.join(".","pkl_quickdraw.pkl"), "br") as fh:
    data = pickle.load(fh)
train_imgs = data[0]
test_imgs = data[1]
train_labels = data[2]
test_labels = data[3]
train_labels_one_hot = data[4]
test_labels_one_hot = data[5]

img_size = 28 # dimensions
num_of_labels = 10 # 0, 1, 2, ... 9
image_pixels = img_size * img_size



In [5]:
# Run 100 times
epochs = 10

NN = NeuralNetwork(no_of_in_nodes = image_pixels, 
                    no_of_out_nodes = 10, 
                    no_of_hidden_nodes = 100,
                    learning_rate = 0.1,
                    activation_function = 'softmax', 
                    bias=None)

NN.train(train_imgs,
         train_labels_one_hot,
         epochs = epochs,  
         momentum = 0.1, 
         active_input_percentage = 1.0, 
         active_hidden_percentage = 1.0, 
         no_of_dropout_tests = 100,
         use_dropout = False, 
         use_momentum = True)
                

epoch:  0
epoch:  1
epoch:  2
epoch:  3
epoch:  4
epoch:  5
epoch:  6
epoch:  7
epoch:  8
epoch:  9


In [6]:
corrects, wrongs = NN.evaluate(train_imgs, train_labels)
print("accuracy train: ", corrects / ( corrects + wrongs))
corrects, wrongs = NN.evaluate(test_imgs, test_labels)
print("accuracy test: ", corrects / ( corrects + wrongs))
print()

train_cm = NN.confusion_matrix(train_imgs, train_labels)
print("Training confusion matrix: \n", train_cm)
print()

test_cm = NN.confusion_matrix(test_imgs, test_labels)
print("Testing confusion matrix: \n", test_cm)
print()

print("Training Label Accuracy")
for i in range(10):
    print("digit: ", i, "precision: ", NN.precision(i, train_cm), "recall: ", NN.recall(i, train_cm))

print()
print("Testing Label Accuracy")
for i in range(10):
    print("digit: ", i, "precision: ", NN.precision(i, test_cm), "recall: ", NN.recall(i, test_cm))

print()
train_mse = NN.mean_squared_error(train_imgs, train_labels)
print("Training mean square error: ", train_mse)

test_mse = NN.mean_squared_error(test_imgs, test_labels)
print("Testing mean square error: ", test_mse)

accuracy train:  0.7083166666666667
accuracy test:  0.7014

Training confusion matrix: 
 [[5234   37  153  588   25    5 1730    0   25   11]
 [  40 5737   21  185   42    0   23    0    6    0]
 [   0    0    0    0    0    0    0    0    0    0]
 [ 346  127   58 4794  265    3  237    0   28    3]
 [  24    7  835  288 3562    0  899    0   15    0]
 [   6    0    2    0    0 5222    0  136    8   67]
 [ 313   90 4906  133 2092    0 3048    0  247    0]
 [   3    0    1    1    0  355    0 5535   22  238]
 [  34    2   23   11   14  176   62  267 5648 1962]
 [   0    0    1    0    0  239    1   62    1 3719]]

Testing confusion matrix: 
 [[864  11  30  81   1   2 309   0   5   2]
 [ 11 960   4  43   8   0   4   0   0   0]
 [  0   0   0   0   0   0   0   0   0   0]
 [ 58  11   9 805  43   0  40   0   2   0]
 [  3   1 140  41 602   0 152   0   4   0]
 [  0   0   0   0   0 844   0  26   3  11]
 [ 53  17 811  27 341   0 481   0  46   0]
 [  0   0   0   1   0  68   0 905   3  42]
 [ 10  



Training mean square error:  [3.4482]
Testing mean square error:  [3.5814]
