# Imports

In [27]:
import numpy as np
import pandas as pd
import pickle
import sys
import os
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from time import *

# Vectorized Convolution

[Github link](https://github.com/slvrfn/vectorized_convolution/blob/master/convolution.py)

In [121]:
def getWindows(input, output_size, kernel_size, padding=0, stride=1, dilate=0):
    working_input = input
    working_pad = padding
    # dilate the input if necessary
    if dilate != 0:
        working_input = np.insert(working_input, range(1, input.shape[2]), 0, axis=2)
        working_input = np.insert(working_input, range(1, input.shape[3]), 0, axis=3)

    # pad the input if necessary
    if working_pad != 0:
        working_input = np.pad(working_input, pad_width=((0,), (0,), (working_pad,), (working_pad,)), mode='constant', constant_values=(0.,))

    in_b, in_c, out_h, out_w = output_size
    out_b, out_c, _, _ = input.shape
    batch_str, channel_str, kern_h_str, kern_w_str = working_input.strides

    return np.lib.stride_tricks.as_strided(
        working_input,
        (out_b, out_c, out_h, out_w, kernel_size, kernel_size),
        (batch_str, channel_str, stride * kern_h_str, stride * kern_w_str, kern_h_str, kern_w_str)
    )


class Conv2D:
    """
    An implementation of the convolutional layer. We convolve the input with out_channels different filters
    and each filter spans all channels in the input.
    """
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=0, weight=None, bias=None):
        """
        :param in_channels: the number of channels of the input data
        :param out_channels: the number of channels of the output(aka the number of filters applied in the layer)
        :param kernel_size: the specified size of the kernel(both height and width)
        :param stride: the stride of convolution
        :param padding: the size of padding. Pad zeros to the input with padding size.
        """
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

        self.cache = None

        self._init_weights(weight, bias)

    def _init_weights(self, weight, bias):
        self.weight = 1e-3 * np.random.randn(self.out_channels, self.in_channels,  self.kernel_size, self.kernel_size)
        self.bias = np.zeros(self.out_channels)
        if weight is not None:
            self.weight = weight
        if bias is not None:
            self.bias = bias

    def forward(self, x):
        """
        The forward pass of convolution
        :param x: input data of shape (N, C, H, W)
        :return: output data of shape (N, self.out_channels, H', W') where H' and W' are determined by the convolution
                 parameters.
        """

        n, c, h, w = x.shape
        out_h = (h - self.kernel_size + 2 * self.padding) // self.stride + 1
        out_w = (w - self.kernel_size + 2 * self.padding) // self.stride + 1

        windows = getWindows(x, (n, c, out_h, out_w), self.kernel_size, self.padding, self.stride)

        out = np.einsum('bihwkl,oikl->bohw', windows, self.weight)

        # add bias to kernels
        out += self.bias[None, :, None, None]

        self.cache = x, windows
        return out

    def backward(self, dout):
        """
        The backward pass of convolution
        :param dout: upstream gradients
        :return: dx, dw, and db relative to this module
        """
        x, windows = self.cache

        padding = self.kernel_size - 1 if self.padding == 0 else self.padding

        dout_windows = getWindows(dout, x.shape, self.kernel_size, padding=padding, stride=1, dilate=self.stride - 1)
        rot_kern = np.rot90(self.weight, 2, axes=(2, 3))

        db = np.sum(dout, axis=(0, 2, 3))
        dw = np.einsum('bihwkl,bohw->oikl', windows, dout)
        dx = np.einsum('bohwkl,oikl->bihw', dout_windows, rot_kern)

        return db, dw, dx


in_channels = 3
out_channels = 128
kernel_size = 3
stride = 2
padding = 1
batch_size = (4, in_channels, 12, 10)
dout_size = (4, out_channels, 6, 5)

np.random.seed(42)

x = np.random.random(batch_size)  # create data for forward pass
dout = np.random.random(dout_size)  # create random data for backward
print('x: ', x.shape)
print('d_out: ', dout.shape)

conv = Conv2D(in_channels, out_channels, kernel_size, stride, padding)

conv_out = conv.forward(x)
print('conv_out: ', conv_out.shape)

db, dw, dx = conv.backward(dout)
print('db: ', db.shape)
print('dw: ', dw.shape)
print('dx: ', dx.shape)

x:  (4, 3, 12, 10)
d_out:  (4, 128, 6, 5)
conv_out:  (4, 128, 6, 5)
db:  (128,)
dw:  (128, 3, 3, 3)
dx:  (4, 3, 12, 10)


# Convolution Class

In [5]:
class Convolution:
    def __init__(self, n_kernels, kernel_dim, stride, padding, learning_rate=0.1, kernels=None, bias=None):
        self.n_kernels = n_kernels
        self.kernel_dim = kernel_dim
        self.stride = stride
        self.padding = padding
        self.kernels = kernels
        self.bias = bias
        self.input = None
        self.learning_rate = learning_rate

    def zeropad(self, input, padding):
        """
        :param input: array of 2D matrices
        :return: array of padded 2D matrices
        """
        n_channels, n_rows, n_cols = input.shape[0], input.shape[1], input.shape[2]
        n_rows_out = 2*padding + n_rows
        n_cols_out = 2*padding + n_cols
        output = np.zeros((n_channels, n_rows_out, n_cols_out))
        output[:, padding:padding+n_rows, padding:padding+n_cols] = np.copy(input)
        return output

    def dilate(self, input, len):
        """
        :param input: array of 2D matrices
        :param len: length of dilation
        :return: array of dilated 2D matrices
        """
        n_channels, n_rows, n_cols = input.shape
        n_rows_out = n_rows + len*(n_rows-1)
        n_cols_out = n_cols + len*(n_cols-1) 
        output = np.zeros((n_channels, n_rows_out, n_cols_out))
        output[:,::len+1,::len+1] = input
        return output

    def rotate180(self, input):
        """
        :param input: an array of 2D matrices (representing one kernel)
        :return: an array of 180-degree rotated 2D matrices
        """
        output = np.copy(input)
        for i in range(output.shape[0]):
            output[i] = np.rot90(output[i])
            output[i] = np.rot90(output[i])
        return output

    def forward(self, input):
        """
        :param input: shape(n_channels, n_rows, n_cols)
        :returns: shape(n_kernels, n_rows_out, n_cols_out)
        self.kernels.shape(n_kernels, n_channels, kernel_dim, kernel_dim)
        """
        self.input = self.zeropad(input, self.padding)
        n_channels, n_rows, n_cols = self.input.shape
        n_rows_out = (n_rows-self.kernel_dim) // self.stride + 1
        n_cols_out = (n_cols-self.kernel_dim) // self.stride + 1
        output = np.zeros((self.n_kernels, n_rows_out, n_cols_out))

        if self.kernels is None:
            self.kernels = np.random.randn(self.n_kernels, self.input.shape[0], self.kernel_dim, self.kernel_dim)
            # Xavier initialization
            for i in range(self.n_kernels):
                self.kernels[i,:,:,:] = np.random.normal(loc=0, scale=np.sqrt(1./(n_channels*self.kernel_dim*self.kernel_dim)), size=(n_channels, self.kernel_dim, self.kernel_dim))

        if self.bias is None:
            self.bias = np.zeros((self.n_kernels, 1))
        
        for k in range(self.n_kernels):
            for row, row_out in zip(range(0, n_rows, self.stride), range(n_rows_out)):
                for col, col_out in zip(range(0, n_cols, self.stride), range(n_cols_out)):
                    output[k,row_out,col_out] = np.sum(self.input[:,row:row+self.kernel_dim, col:col+self.kernel_dim] * self.kernels[k,:,:,:]) + self.bias[k]
        
        return output

    def backward(self, dz):
        """
        :param dz: output gradient, from next layer
        dx = input gradient, to be passed to previous layer
        dk = weights/kernel gradient, for learning the weights
        db = bias gradient, for learning the biases
        """
        n_channels, n_rows, n_cols = self.input.shape
        n_rows_out = (n_rows-self.kernel_dim) // self.stride + 1
        n_cols_out = (n_cols-self.kernel_dim) // self.stride + 1

        # Gradient with respect to input
        dx = np.zeros((n_channels, n_rows, n_cols))
        dz_sparsed = self.dilate(dz, self.stride-1)     
        dz_sparsed = self.zeropad(dz_sparsed, self.kernel_dim-1)   
        # dx = sum( conv(dz_sparsed,rotated_K) )
        for k in range(self.n_kernels):
            for row in range(n_rows):
                for col in range(n_cols):
                    kernel_rotated = self.rotate180(self.kernels[k])
                    cell_wise_mult = dz_sparsed[k, row:row+self.kernel_dim, col:col+self.kernel_dim] * kernel_rotated[:,:,:]
                    dx[:,row,col] += np.sum(cell_wise_mult, axis=(1,2))

        # calculate gradient_kernels
        dk = np.zeros(self.kernels.shape)
        dz_dilated = self.dilate(dz, self.stride-1)
        # dk = conv(input, dilated output gradient)
        for k in range(self.n_kernels):
            for row in range(self.kernel_dim):
                for col in range(self.kernel_dim):
                    bostu = self.input[:, row:row+dz_dilated.shape[1], col:col+dz_dilated.shape[2]] * dz_dilated[k]
                    bostu = np.sum(bostu, axis=(1,2))
                    dk[k,:,row,col] = bostu

        # calculate gradient_bias
        db = np.zeros((self.n_kernels, 1))
        for k in range(self.n_kernels):
            db[k,:] = np.sum(dz[k,:,:])

        # update parameters
        self.kernels -= self.learning_rate*dk
        for k in range(self.n_kernels):
            self.bias[k] -= self.learning_rate*db[k]
        return dx


# Convolution: Testing

In [222]:
input = np.reshape(np.arange(75), (3,5,5))
kernels = np.reshape(np.arange(54), (2,3,3,3))
bias = np.array([0]*2)

kernels = kernels.astype('float64')
bias = bias.astype('float64')

conv2d = Conv2D(input.shape[0], kernels.shape[0], kernels.shape[2], 2, 0, kernels, bias)
input_ = np.zeros((1,3,5,5))
input_[0] = np.copy(input)
f1 = conv2d.forward(input_)
db, dk, dx = conv2d.backward(f1) 

conv = Convolution(kernels.shape[0], kernels.shape[2], 2, 0, 0.1, kernels, bias)
f2 = conv.forward(input)
dx2= conv.backward(f2)

print('f1')
print(f1)
print('------')
print('f2')
print(f2)

print('------------')
print('dk1')
print(dk)
print('-------')
print('dk2')

print('------------')
print('dx1')
print(dx)
print('-------')
print('dx2')
print(dx2)

f1
[[[[15219. 15921.]
   [18729. 19431.]]

  [[37818. 39978.]
   [48618. 50778.]]]]
------
f2
[[[15219. 15921.]
  [18729. 19431.]]

 [[37818. 39978.]
  [48618. 50778.]]]
------------
dk1
[[[[  452304.   521604.   590904.]
   [  798804.   868104.   937404.]
   [ 1145304.  1214604.  1283904.]]

  [[ 2184804.  2254104.  2323404.]
   [ 2531304.  2600604.  2669904.]
   [ 2877804.  2947104.  3016404.]]

  [[ 3917304.  3986604.  4055904.]
   [ 4263804.  4333104.  4402404.]
   [ 4610304.  4679604.  4748904.]]]


 [[[ 1175472.  1352664.  1529856.]
   [ 2061432.  2238624.  2415816.]
   [ 2947392.  3124584.  3301776.]]

  [[ 5605272.  5782464.  5959656.]
   [ 6491232.  6668424.  6845616.]
   [ 7377192.  7554384.  7731576.]]

  [[10035072. 10212264. 10389456.]
   [10921032. 11098224. 11275416.]
   [11806992. 11984184. 12161376.]]]]
-------
dk2
------------
dx1
[[[[ 1021086.  1074123.  2206566.  1135305.  1191204.]
   [ 1180197.  1233234.  2533374.  1303002.  1358901.]
   [ 2651994.  2772378.  5678

# Max Pooling

In [6]:
class MaxPooling:
    def __init__(self, pool_dim, stride):
        self.pool_dim = pool_dim
        self.stride = stride

    def forward(self, input):
        """
        :param input: array of 2D matrices
        :return: array of 2D matrices, each matrix maxpool filter applied
        """
        self.input = input
        n_channels, n_rows, n_cols = self.input.shape
        n_rows_out = (n_rows-self.pool_dim) // self.stride + 1
        n_cols_out = (n_cols-self.pool_dim) // self.stride + 1
        output = np.zeros((n_channels, n_rows_out, n_cols_out))
        for c in range(n_channels):
            for row, row_out in zip(range(0, n_rows, self.stride), range(n_rows_out)):
                for col, col_out in zip(range(0, n_cols, self.stride), range(n_cols_out)):
                    output[c,row_out,col_out] = np.max(self.input[c, row:row+self.pool_dim, col:col+self.pool_dim])

        return output

    def backward(self, dz):
        """
        :param dz: 
        """
        n_channels, n_rows, n_cols = dz.shape
        dx = np.zeros_like(self.input)
        for i in range(n_channels):
            for j in range(n_rows):
                for k in range(n_cols):
                    patch = self.input[i, j*self.stride:j*self.stride+self.pool_dim, k*self.stride:k*self.stride+self.pool_dim]
                    max_index = np.unravel_index(np.argmax(patch), patch.shape)
                    dx[i, j*self.stride:j*self.stride+self.pool_dim, k*self.stride:k*self.stride+self.pool_dim][max_index] = dz[i, j, k]
        return dx
        # n_channels, n_rows, n_cols = self.input.shape
        # n_rows_out = (n_rows-self.pool_dim) // self.stride + 1
        # n_cols_out = (n_cols-self.pool_dim) // self.stride + 1
        # dx = np.zeros(self.input.shape)
        # for c in range(n_channels):
        #     for row, row_out in zip(range(0, n_rows, self.stride), range(n_rows_out)):
        #         for col, col_out in zip(range(0, n_cols, self.stride), range(n_cols_out)):
        #             st = np.argmax(self.input[c, row:row+self.pool_dim, col:col+self.pool_dim])
        #             idx, idy = np.unravel_index(st, (self.pool_dim, self.pool_dim))
        #             dx[c, row+idx, col+idy] = dz[c, row_out, col_out]
        # return dx
        # assert not isinstance(dz, str)


# Max Pool: Testing

In [208]:
input = np.reshape(np.arange(75), (3,5,5)).astype('float64')
maxpooling = MaxPooling(3, 2)
output = maxpooling.forward(input)
print('input ->\n' + str(input))
print('output ->\n' + str(output))
dx = maxpooling.backward(output)
print('dx ->\n' + str(dx))

input ->
[[[ 0.  1.  2.  3.  4.]
  [ 5.  6.  7.  8.  9.]
  [10. 11. 12. 13. 14.]
  [15. 16. 17. 18. 19.]
  [20. 21. 22. 23. 24.]]

 [[25. 26. 27. 28. 29.]
  [30. 31. 32. 33. 34.]
  [35. 36. 37. 38. 39.]
  [40. 41. 42. 43. 44.]
  [45. 46. 47. 48. 49.]]

 [[50. 51. 52. 53. 54.]
  [55. 56. 57. 58. 59.]
  [60. 61. 62. 63. 64.]
  [65. 66. 67. 68. 69.]
  [70. 71. 72. 73. 74.]]]
output ->
[[[12. 14.]
  [22. 24.]]

 [[37. 39.]
  [47. 49.]]

 [[62. 64.]
  [72. 74.]]]
dx ->
[[[ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0. 12.  0. 14.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0. 22.  0. 24.]]

 [[ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0. 37.  0. 39.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0. 47.  0. 49.]]

 [[ 0.  0.  0.  0.  0.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0. 62.  0. 64.]
  [ 0.  0.  0.  0.  0.]
  [ 0.  0. 72.  0. 74.]]]


# ReLU Activation

In [7]:
class Relu:
    def __init__(self):
        pass

    def forward(self, input):
        """
        :param input: input to ReLU activation
        :returns: max(0,input)
        """
        self.input = input
        ret = np.copy(input)
        ret[ret<0] = 0
        return ret

    def backward(self, dz):
        dx = np.copy(dz)
        dx[self.input<0] = 0
        return dx

# Softmax

When working with exponents, there's a danger of overflow errors if the base gets too large. This can easily happen when working with the output of a linear layer. To protect ourselves from this, we can use a trick described by Paul Panzer on StackOverflow. Because softmax(x) = softmax(x - c) for any constant c, we can calculate $\sigma(x)_i = \frac{exp(x_i-max(x))}{\sum_{j=1}^{N} exp(x_j - max(x))}$ instead

In [8]:
class Softmax:
    def __init__(self):
        pass

    def forward(self, input):   
        exp = np.exp(input-np.max(input), dtype=np.float64)
        self.output = exp/np.sum(exp)
        return self.output
    
    def backward(self, dz):
        return dz 


# Flattening layer

In [9]:
class Flatten:
    def __init__(self):
        pass

    def forward(self, input):
        """
        :param input: array of 2D matrices
        :returns: row vector of shape (1,)
        """
        self.input_shape = input.shape
        return np.reshape(input, (1, input.shape[0]*input.shape[1]*input.shape[2]))

    def backward(self, dz):
        """
        :returs: array of 2D matrices
        """
        return np.reshape(dz, self.input_shape)


# Flattening Layer: Testing

In [129]:
input = np.reshape(np.arange(75), (3,5,5)).astype('float64')
flatten = Flatten()
output = flatten.forward(input)
print('input ->\n' + str(input))
print('forward ->\n' + str(output))
print('backward ->\n' + str(flatten.backward(output)))

input ->
[[[ 0.  1.  2.  3.  4.]
  [ 5.  6.  7.  8.  9.]
  [10. 11. 12. 13. 14.]
  [15. 16. 17. 18. 19.]
  [20. 21. 22. 23. 24.]]

 [[25. 26. 27. 28. 29.]
  [30. 31. 32. 33. 34.]
  [35. 36. 37. 38. 39.]
  [40. 41. 42. 43. 44.]
  [45. 46. 47. 48. 49.]]

 [[50. 51. 52. 53. 54.]
  [55. 56. 57. 58. 59.]
  [60. 61. 62. 63. 64.]
  [65. 66. 67. 68. 69.]
  [70. 71. 72. 73. 74.]]]
forward ->
[[ 0.  1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 12. 13. 14. 15. 16. 17.
  18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35.
  36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53.
  54. 55. 56. 57. 58. 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71.
  72. 73. 74.]]
backward ->
[[[ 0.  1.  2.  3.  4.]
  [ 5.  6.  7.  8.  9.]
  [10. 11. 12. 13. 14.]
  [15. 16. 17. 18. 19.]
  [20. 21. 22. 23. 24.]]

 [[25. 26. 27. 28. 29.]
  [30. 31. 32. 33. 34.]
  [35. 36. 37. 38. 39.]
  [40. 41. 42. 43. 44.]
  [45. 46. 47. 48. 49.]]

 [[50. 51. 52. 53. 54.]
  [55. 56. 57. 58. 59.

# Fully Connected Layer

In [10]:
class FullyConnected:
    def __init__(self, n_output, learning_rate=0.01):
        """
        :n_output: no of outputs
        :input: shape(1,n_input); dynamically assigned during forward
        :weights: shape(n_inputs, n_outputs)
        :bias: shape(1,n_outputs)
        """
        self.n_output = n_output
        self.learning_rate = learning_rate
        
        self.input = None
        self.weights = None
        self.bias = None

    def setParams(self, weights, bias):
        self.weights = weights
        self.bias = bias

    def forward(self, input):
        """
        :input: shape(1, n_input)
        :weight: shape(n_input, n_output)
        :bias: shape(1, n_output)
        :return: shape(1, n_output)
        """
        self.input = input
        n_input = input.shape[1]

        if self.weights is None:
            # Xavier initialization
            self.weights = np.random.normal(loc=0, scale=np.sqrt(1./(n_input*self.n_output)), size=(n_input, self.n_output))

        if self.bias is None:
            self.bias = np.zeros((1, self.n_output))

        return np.dot(self.input, self.weights) + self.bias

    def backward(self, dz):
        """
        :dz: shape(1, n_output)
        :dw: shape(n_input, n_output)
        :dx: shape(1, n_input)
        """
        dw = np.dot(self.input.T, dz)           # (n_input,n_output) = (n_input,1) DOT (1,n_output)
        dx = np.dot(dz, self.weights.T)         # (1,n_input) = (1,n_output) DOT (n_output,n_input)
        db = np.sum(dz, axis=1, keepdims=True)  # (1,n_output) = (1,n_output)

        self.weights -= self.learning_rate * dw
        self.bias -= self.learning_rate * db

        return dx


# Fully Connected Testing

In [131]:
input = np.reshape(np.arange(4), (1,4)).astype('float64')

weights = np.reshape(np.arange(12), (4,3)).astype('float64')
# bias = np.reshape(np.arange(3), (1,3)).astype('float64')


fc = FullyConnected(3)
fc.setParams(weights, None)
output = fc.forward(input)

dx = fc.backward(output)

print('input ->\n' + str(input))
print('weights ->\n ' + str(weights))
print('bias -> \n' + str(bias))
print('forward ->\n' + str(output))
print('backward -> \n' + str(dx))

input ->
[[0. 1. 2. 3.]]
weights ->
 [[0.   1.   2.  ]
 [2.58 3.52 4.46]
 [5.16 6.04 6.92]
 [7.74 8.56 9.38]]
bias -> 
[ -6930.  -17719.2]
forward ->
[[42. 48. 54.]]
backward -> 
[[ 156.  588. 1020. 1452.]]


# Cross Entropty Loss

In [11]:
def crossEntropyLoss(y_pred, y_true):
    """
    Computes the cross-entropy loss between the true labels and predicted probabilities.

    Parameters:
        y_true (np.array): A row matrix of shape (1, n_classes) representing the true labels.
        y_pred (np.array): A row matrix of shape (1, n_classes) representing the predicted probabilities.

    Returns:
        float: The cross-entropy loss.
    """
    true_class = np.argmax(y_true, axis=1)
    loss = -np.log(y_pred[0, true_class])
    return np.sum(loss)


# Loading Data

In [12]:
def loadImage(image_name):
    """
    :param image_name: filename of the image
    :returns: array of 2D matrices (1 channels) shape(1,h,w)
    Returns the grayscaled pixel values
    """
    img = Image.open(image_name).convert('L')
    img = img.resize((32,32))
    data = np.asarray(img)
    data = np.reshape(data, (1,32,32))
    return data

def oneHotEncode(digit):
    output = np.zeros((1, 10))
    output[0,digit] = 1.0
    return output

def oneHotDecode(onehot):
    return np.argmax(onehot)

def readcsv(csv_file_name):
    df = pd.read_csv(csv_file_name)
    X = []
    Y = []
    for index, row in df.iterrows():
        image_file_name = './Dataset/' + row['database name'] + '/' + row['filename']
        digit = row['digit']
        data = loadImage(image_file_name)
        label = oneHotEncode(digit)
        X.append(data)
        Y.append(label)
    return np.asarray(X), np.asarray(Y)

# Load Data Testing

In [189]:
oneHotDecode(np.reshape(np.array([0,.8,0,.9,0,0,0,0,0,0]), (1,10)))

X, Y = readcsv('./Dataset/training-d.csv')

print(X.shape)
print(Y.shape)

for x in X:
    assert x.shape == X[0].shape, str(x.shape) + '!=' + str(X[0].shape)

# print(X)

3

# Network

In [30]:
class Network:
    def __init__(self):
        self.layers = []
    
    def addLayer(self, layer):
        self.layers.append(layer)

    def batchNormalize(self, X):
        return (X-np.mean(X)) / np.std(X)

    def train(self, X_train, Y_train, batch_size, epoch):
        """
        :param X_train: training data. 
            Each input is a grayscaled image of 32x32 pixels 
            shape(n_inputs, 1,32,32)
        :param Y_train: label of each training data
            Each label is one-hot encoded
            shape(n_inputs, 1,10)
        :patam batch_size: the number of inputs to be taken in a single batch
        :param epoch: epoch
        :returns:
        """
        # Number of input samples
        n_inputs = X_train.shape[0]

        for e in tqdm(range(epoch), desc='Epoch', position=0):
            loss_training = 0
            
            # Shuffle the training data
            perm = np.random.permutation(n_inputs)
            X_train = X_train[perm]
            Y_train = Y_train[perm]
            
            # Split training data into mini-batches
            for i in tqdm(range(0, n_inputs, batch_size), desc='Batch', position=1, leave=False):
                X_batch = X_train[i : np.minimum(i+batch_size, n_inputs)]
                Y_batch = Y_train[i : np.minimum(i+batch_size, n_inputs)]
                
                start_time_batch = time()
                # Take a single input X and respective label Y (one-hot-encoded)
                for X, Y in zip(X_batch, Y_batch):
                    # Forward through the layers
                    output = self.batchNormalize(X)
                    for layer in self.layers:
                        output = layer.forward(output)
            
                    loss_training += crossEntropyLoss(output, Y)

                    # Backward through the layers
                    output_gradient = (output - np.copy(Y)) / batch_size
                    for layer in reversed(self.layers):
                        output_gradient = layer.backward(output_gradient)
                
                end_time_batch = time()
                time_batch = end_time_batch-start_time_batch
                remaining_time = (n_inputs*epoch - n_inputs*e - i)/batch_size * time_batch
                hours, remainder = divmod(remaining_time, 3600)
                minutes, seconds = divmod(remainder, 60)

            loss_training /= n_inputs

            # Perform validation on the training dataset to get different metrics
            loss_validation, accuracy, f1, c_matrix = self.validate(X_train, Y_train)

            print("Epoch:{0:2d}/{1:2d} | Loss Training:{2:.5f} | Loss Validation:{3:.5f} | Accuracy:{4:.5f} | F1:{5:5f} | ETA:{6:2d}:{7:2d}:{8:2d}"
            .format(e, epoch, loss_training, loss_validation, accuracy, f1, int(hours), int(minutes), int(seconds)))
            print('---Confusion Matrix---\n' + str(c_matrix))


    def validate(self, X_test, Y_test):
        """
        Parameters:
            X_test:
            Y_test: array of row matrices
        """
        loss = 0
        Y_pred_list = []
        for X, Y in zip(X_test, Y_test):
            output = self.batchNormalize(X)
            # forward pass
            for layer in self.layers:
                output = layer.forward(output)
            loss += crossEntropyLoss(output, Y)            
            Y_pred_list.append(oneHotDecode(output))

        Y_test_list = list(map(oneHotDecode, Y_test[:,0,:]))
        assert len(Y_test_list) == len(Y_pred_list), str(len(Y_test_list)) + '!=' + str(len(Y_pred_list))
        
        loss /= X_test.shape[0]
        accuracy = accuracy_score(Y_test_list, Y_pred_list)
        f1 = f1_score(Y_test_list, Y_pred_list, average='macro')
        c_matrix = confusion_matrix(Y_test_list, Y_pred_list)

        return loss, accuracy, f1, c_matrix

    def predict(self, X):
        """
        Parameters:
            X: a single input

        Returns:
            d: a digit representing the prediction
        """
        output = self.batchNormalize(X)
        for layer in self.layers:
            output = layer.forward(output)
        return oneHotDecode(output)


# Network Testing

In [36]:
network = Network()
network.addLayer(Convolution(n_kernels=6, kernel_dim=5, stride=1, padding=0))
network.addLayer(Relu())
network.addLayer(MaxPooling(pool_dim=2, stride=2))

network.addLayer(Convolution(n_kernels=16, kernel_dim=5, stride=1, padding=0))
network.addLayer(Relu())
network.addLayer(MaxPooling(pool_dim=2, stride=2))

network.addLayer(Convolution(n_kernels=120, kernel_dim=5, stride=1, padding=0, learning_rate=0.0001))
network.addLayer(Relu())

network.addLayer(Flatten())

network.addLayer(FullyConnected(n_output=84))
network.addLayer(Relu())

network.addLayer(FullyConnected(n_output=10))
network.addLayer(Relu())

network.addLayer(Softmax())

X_train, Y_train = readcsv('./Dataset/training-b.csv')
network.train(X_train, Y_train, 128, 5)

Epoch:   0%|          | 0/5 [18:43<?, ?it/s]


KeyboardInterrupt: 

# Store the model in pickle file

In [32]:
pickle_file = open('1705085_model.pickle', 'wb')
pickle.dump(network, pickle_file)
pickle_file.close()

# Load the model from pickle file

In [33]:
pickle_file = open('1705085_model.pickle', 'rb')
network = pickle.load(pickle_file)
pickle_file.close()

# Prediction of all images in a folder

In [34]:
pred_file_content = 'FileName,Digit\n'

folder_name = './Dataset/training-b/'
for entry in os.scandir(folder_name):
    if entry.is_file():
        image_path = folder_name + entry.name
        X = loadImage(image_path)
        prediction = network.predict(X)
        pred_file_content += entry.name + ',' + str(prediction) + '\n'

pred_file_name = '170585_prediction.csv'
pred_file = open(pred_file_name, 'w')
pred_file.write(pred_file_content)
pred_file.close()