In [None]:
import numpy as np
import math
import struct as st
import matplotlib.pyplot as plt

In [None]:
def rescale(image):
    image = 255 - image.astype('float64')
    image = np.interp(image, (image.min(), image.max()), (-1, +1))
    return image

In [None]:
def create_kernel(size):
    
    """
    Width and height of the kernel where width = height
    """
    
    x = np.random.normal(scale = 1.0, size = size**2)
    # to ensure the initialization number is not too big
    f = lambda x: x if abs(x) < 1 else x/2
    try:
        while np.any(abs(y) > 1):
            y = np.array(tuple(map(f,y)))
    except:
        y = np.array(tuple(map(f,x)))
        
    return y.reshape(size,size)

In [None]:
def create_weights_bias(input_size, output_size):
    
    """
    Create weights and bias for fully connected layer. 
    """
    
    x = np.random.normal(scale = 1.0, size = input_size*output_size)
    b = np.random.normal(scale = 1.0, size = output_size)
    # to ensure the initialization number is not too big
    f = lambda x: x if abs(x) < 1 else x/2
    
    try:
        while np.any(abs(y) > 1):
            y = np.array(tuple(map(f,y)))
    except:
        y = np.array(tuple(map(f,x)))
        
    try:
        while np.any(abs(z) > 1):
            z = np.array(tuple(map(f,z)))
    except:
        z = np.array(tuple(map(f,b)))
        
    return {"weights": y.reshape(input_size,output_size),
           "bias": z}

In [None]:
def fcn_forward(inputs, weight_bias):
    
    """
    Feed forward for fully connected layers
    """
    
    weights = weight_bias['weights']
    bias = weight_bias['bias']
    output = np.dot(inputs, weights)+bias
    return output

def fcn_backward(backprop_prev, X, weight_bias, lr):
    
    """
    Backpropagation for fully connected layers, inputs required are:
    1. Gradient with respect to loss from previous layer
    2. The inputs in the feed forward layer.
    3. Weights and bias in this fully connected layer
    4. Learning Rate
    
    Return:
    1. Gradient with respect to loss of weights, inputs and bias.
    2. Updated weights and bias for the next batch of data.
    """
    
    X = np.array([X])
    weights = weight_bias['weights']
    bias = weight_bias['bias']
    backprop_prev_T = np.array([backprop_prev]).T
    dw = np.dot(backprop_prev_T, X).T
    weights_T = weights.T
    dx = np.dot(backprop_prev, weights_T)
    db = backprop_prev
    
    weights -= lr*dw
    bias -= lr*db
    
    return {"dw": dw,
           "dx":dx,
           "db": db,
           "fcn_weight_bias": {
               "weights": weights,
               "bias": bias
           }}

In [None]:
def conv_forward(X, kernel):
    
    """
    Feed forward for convolution layer.
    Current limitation: No striding and padding
    """
    
    # assume only square images and square kernels
    input_size = X.shape[0]
    kernel_size = kernel.shape[0]
    
    # assume stride 1 with no padding
    output_size = input_size - kernel_size + 1
    output = np.zeros((output_size, output_size))
    
    for height in range(output_size):
        for width in range(output_size):
            input_slice = X[height:height+kernel_size, width:width+kernel_size]
            output[height, width] = np.sum(np.multiply(input_slice, kernel))
    
    return output

def conv_backprop(backprop_prev, X, kernel, lr):
    
    """
    Backpropagation of convolution layer, inputs required are:
    1. Gradient with respect to loss from previous layer (in the conv_forward output shape)
    2. The inputs in the feed forward layer.
    3. Kernel in this convolution layer.
    4. Learning Rate
    
    Return:
    1. Gradient with respect to loss of weights and inputs.
    2. Updated kernel for the next batch of data.
    """
    
    # getting the size of the backprop from previous gradient
    # assuming square
    input_size = X.shape[0]
    backprop_prev_size = backprop_prev.shape[0]
    kernel_size = kernel.shape[0]
    dw = np.zeros((kernel_size, kernel_size))
    dx = np.zeros((input_size, input_size))
    
    for h in range(backprop_prev_size):
        for w in range(backprop_prev_size):
            dx[h:h+kernel_size, w:w+kernel_size] += kernel*backprop_prev[h,w]
            dw += X[h:h+kernel_size, w: w+kernel_size]*backprop_prev[h,w]
            
    kernel -= lr*dw
    
    return {"dw": dw, "dx": dx, "kernel": kernel} 

In [None]:
def maxpool_forward(X, pool_size, stride):
    
    """
    Feed forward of maxpooling.
    Current limitation: No padding.
    Requires input, size of pooling and stride.
    
    Returns output after maxpooling and the input index for each of the output (to be used later during backprop)
    """
    
    # assume no padding
    input_size = X.shape[0]
    
    # assume no padding
    output_size = math.ceil((input_size - pool_size)/stride + 1)
    output = np.zeros((output_size, output_size))
    max_index = np.empty((output_size, output_size), dtype = object)
    grad_matrix = np.zeros((input_size, input_size))
    
    for height in range(output_size):
        for width in range(output_size):
            input_slice = X[height*stride:(height*stride+pool_size), 
                            width*stride:(width*stride+pool_size)]
            output[height, width] = np.max(input_slice)
            local_max_location = np.unravel_index(np.argmax(input_slice, axis = None), input_slice.shape)
            max_index[height, width] = np.asarray(local_max_location)+[height*stride,width*stride]
            
#     max_index = max_index.flatten()
    
#     for index in max_index:
#         grad_matrix[index[0], index[1]] += 1.00
            
    return {"maxpool_input": X, "maxpool_output": output, "max_index_location": max_index}


def maxpool_backprop(input_size, backprop_prev, max_index_location):
    
    """
    Backpropagation for the maxpool layer.
    Require:
    1. The size of the input when passed throught this layer.
    2. The gradient flowing from the previous layer.
    3. The index of the input that contributed to the output of the maxpool layer.
    """

    backprop_prev_size = backprop_prev.shape[0]
    maxpool_grad = np.zeros((input_size, input_size))
    
    for height in range(backprop_prev_size):
        for weight in range(backprop_prev_size):
            max_loc = max_index_location[height, weight]
            maxpool_grad[max_loc[0], max_loc[1]] += backprop_prev[height, weight]
            
    return maxpool_grad

In [None]:
def relu(input_array):
    
    """
    Turns any inputs that are negative to zero.
    Returns 1 for index of the inputs that are non-zero after RELU for backpropagation purpose.
    """
    
    shape = input_array.shape
    grad_matrix = np.zeros(shape)
    input_array[input_array < 0] = 0
    nonzero_location = np.asarray(input_array.nonzero()).T
    for index in nonzero_location:
        grad_matrix[index[0], index[1]] = 1.00
        
    return grad_matrix

In [None]:
def stable_softmax(input_vector):
    
    """
    Refer to https://deepnotes.io/softmax-crossentropy
    """
    
    exps = np.exp(input_vector - np.max(input_vector))
    return exps / np.sum(exps)

def cross_entropy_loss(pred_array, actual):
    
    pred_prob = pred_array[actual]
    loss_log_likelihood = -np.log(pred_prob)
    
    return loss_log_likelihood


def delta_cross_entropy(X,y):
    
    """
    Refer to https://deepnotes.io/softmax-crossentropy
    Returns loss and gradient up to the softmax layer.
    """
    
    grad = stable_softmax(X)
    pred = grad.argmax()
    loss = cross_entropy_loss(grad,y)
    grad[y] -= 1
    return {"loss": loss,
           "gradient": grad,
           "pred": pred}

In [None]:
def train(input, weights = None):
    
    if weights is None:
        # Weights, kernels and bias initialization
        conv1_kernel = create_kernel(3)
        conv2_kernel = create_kernel(2)
        fcn1_weight_bias = create_weights_bias(36, 28)
        fcn2_weight_bias = create_weights_bias(28,10)
    
    else:
        conv1_kernel = weights['conv1_kernel']
        conv2_kernel = weights['conv2_kernel']
        fcn1_weight_bias = weights['fcn1_weight_bias']
        fcn2_weight_bias = weights['fcn2_weight_bias']
    
    #training for loop
    for counter, image_label in enumerate(input):
        label = image_label['label']
        input_image = image_label['images']
        # first layer set
        x1 = conv_forward(input_image, conv1_kernel)
        maxpool1_size = x1.shape[0]
        relu_1 = relu(x1)
        y1 = maxpool_forward(x1,2,2)
        x1 = y1['maxpool_output']
        
        # second layer set
        x2 =  conv_forward(x1, conv2_kernel)
        maxpool2_size = x2.shape[0]
        relu_2 = relu(x2)
        y2 = maxpool_forward(x2,2,2)
        x2 = y2['maxpool_output']
        
        # third layer set
        x3 = x2.flatten() # flatten into one d array
        x4 = fcn_forward(x3, fcn1_weight_bias)
        x5 = fcn_forward(x4, fcn2_weight_bias)
        
        z = delta_cross_entropy(x5, label)
        print('Image ' + str(counter) + ' loss: ' + str(z['loss']) + '. predicted: ' + str(z['pred']) + 
              ', actual: ' + str(label))
        
        fcn2_backprop = fcn_backward(z['gradient'], x4, fcn2_weight_bias, 0.0001)
        x5_backprop, fcn2_weight_bias = fcn2_backprop['dx'], fcn2_backprop['fcn_weight_bias']
        
        fcn1_backprop = fcn_backward(x5_backprop, x3, fcn1_weight_bias, 0.0001)
        x4_backprop, fcn1_weight_bias = fcn1_backprop['dx'], fcn1_backprop['fcn_weight_bias']
        
        x3_backprop = x4_backprop.reshape(x2.shape)
        x2_maxpool_backprop = maxpool_backprop(maxpool2_size, x3_backprop, y2['max_index_location'])
        x2_relu_backprop = np.multiply(x2_maxpool_backprop, relu_2)
        x2_conv_backprop = conv_backprop(x2_relu_backprop, x1, conv2_kernel, 0.0001)
        x2_backprop, conv2_kernel = x2_conv_backprop['dx'], x2_conv_backprop['kernel']
        
        x1_maxpool_backprop = maxpool_backprop(maxpool1_size, x2_backprop, y1['max_index_location'])
        x1_relu_backprop = np.multiply(x1_maxpool_backprop, relu_1)
        x1_conv_backprop = conv_backprop(x1_relu_backprop, input_image, conv1_kernel, 0.0001)
        conv1_kernel = x1_conv_backprop['kernel']
    
    return {
        "conv1_kernel": conv1_kernel,
        "conv2_kernel": conv2_kernel,
        "fcn1_weight_bias": fcn1_weight_bias,
        "fcn2_weight_bias": fcn2_weight_bias
    }

In [None]:
filename = {'images' : './data/raw/train-images-idx3-ubyte' ,'labels' : './data/raw/train-labels-idx1-ubyte'}
train_imagesfile = open(filename['images'],'rb')

In [None]:
train_imagesfile.seek(0)
magic = st.unpack('>4B',train_imagesfile.read(4))

In [None]:
Img = st.unpack('>I',train_imagesfile.read(4))[0] #num of images
nR = st.unpack('>I',train_imagesfile.read(4))[0] #num of rows
nC = st.unpack('>I',train_imagesfile.read(4))[0] #num of column
images_array = np.zeros((Img,nR,nC))
nBytesTotal = Img*nR*nC*1 #since each pixel data is 1 byte
images_array = 255 - np.asarray(st.unpack('>'+'B'*nBytesTotal,train_imagesfile.read(nBytesTotal))).reshape((Img,nR,nC))
train_imagesfile.close()

In [None]:
train_label = open(filename['labels'], 'rb')
train_label.read(8)
train_label_list = []
for i in range(20):
    label = ord(train_label.read(1))
    train_label_list.append(label)

In [None]:
input_dict_list = []

In [None]:
for i in range(len(train_label_list)):
    image = rescale(images_array[i])
    input_dict = {"images": image,
                 "label": train_label_list[i]}
    input_dict_list.append(input_dict)

In [None]:
parameters = train(input_dict_list)

In [None]:
for i in range(100):
    parameters = train(input_dict_list, parameters)