In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import os

In [2]:
def getImage(path, max_images=1):
    """Load images from a directory as inverted, normalised, channel-first arrays.

    Every file is read with OpenCV, converted from BGR to RGB, resized to
    128x128, transposed to channel-first layout, inverted (``255 - pixel``)
    and scaled by ``1 / 255``.

    Args:
        path: Directory that contains the image files.
        max_images: Upper bound on the number of images to load. The default
            of 1 reproduces the previously hard-coded "stop after the first
            image" behaviour; pass ``None`` to load every file.

    Returns:
        ``np.ndarray`` with one entry per loaded image, each of shape
        ``(channels, 128, 128)``. The array is empty when nothing was loaded.

    Raises:
        ValueError: If a file in ``path`` cannot be decoded as an image.
    """
    image_list = []
    # os.listdir returns entries in arbitrary order; sorting makes the result
    # reproducible across runs and machines.
    # NOTE(review): confirm that sorted filename order matches the row order of
    # the label CSV used alongside these images.
    for filename in sorted(os.listdir(path)):
        if max_images is not None and len(image_list) >= max_images:
            break
        full_path = os.path.join(path, filename)
        img = cv2.imread(full_path)
        if img is None:
            # cv2.imread signals an unreadable/non-image file by returning None.
            raise ValueError("could not read image file: " + full_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (128, 128))
        # HWC -> CHW, invert intensities, scale to [0, 1].
        img = (255 - img.transpose(2, 0, 1)) / 255
        image_list.append(img)
    return np.array(image_list)

In [None]:
# Quick check of the loader on the training-a image directory; as the last
# expression in the cell, the returned array is shown as the cell output.
getImage('data/training-a')

In [3]:
def getLabel(path):
    """Read the ``digit`` column of a label CSV.

    Args:
        path: Path of a CSV file that has a ``digit`` column.

    Returns:
        The column values as a NumPy array (also printed to stdout).
    """
    frame = pd.read_csv(path)
    digits = np.array(frame['digit'])
    print(digits)
    return digits

In [15]:
class ConvolutionLayer:
    """2-D convolution layer for channel-first ``(batch, channels, height, width)`` input.

    Two equivalent implementations are provided:

    * ``forward`` / ``backward`` - vectorised, built on strided sliding
      windows and ``np.einsum``. State is kept in ``self.cache``.
    * ``_forward`` / ``_backward`` - explicit Python loops kept as a readable
      reference. State is kept in ``self.input`` / ``self.input_pad``.

    The two pairs use different caches, so a backward call must follow the
    forward call of the same pair. Filters and biases are created lazily on
    the first forward pass, once the number of input channels is known. Both
    backward passes apply a gradient-descent step using batch-averaged
    parameter gradients.
    """

    def __init__(self, num_filters, filter_size, stride, padding):
        """Store the hyper-parameters; parameters are allocated on first use.

        Args:
            num_filters: Number of output channels.
            filter_size: Side length of the square kernel.
            stride: Step between neighbouring windows (both axes).
            padding: Zero padding added on every side of height and width.
        """
        self.num_filters = num_filters
        self.filter_size = filter_size
        self.stride = stride
        self.padding = padding
        self.filters = None    # (num_filters, in_channels, k, k), created lazily
        self.biases = None     # (num_filters,), created lazily
        self.input_pad = None  # zero-padded input cached by _forward
        self.input = None      # raw input cached by _forward
        self.cache = None      # (input, windows) cached by forward

    def _init_parameters(self, num_channels):
        """Create filters (He-scaled) and biases if they do not exist yet."""
        if self.filters is None:
            fan_in = num_channels * self.filter_size * self.filter_size
            # He initialisation scales unit-variance noise BY sqrt(2 / fan_in);
            # dividing by it (as before) inflates the weights by fan_in / 2.
            self.filters = np.random.randn(
                self.num_filters, num_channels, self.filter_size, self.filter_size
            ) * np.sqrt(2 / fan_in)
        if self.biases is None:
            self.biases = np.random.randn(self.num_filters)

    def _output_size(self, height, width):
        """Return ``(out_height, out_width)`` for an input of the given size."""
        out_h = (height - self.filter_size + 2 * self.padding) // self.stride + 1
        out_w = (width - self.filter_size + 2 * self.padding) // self.stride + 1
        return out_h, out_w

    def getWindows(self, input, output_size, kernel_size, padding=0, stride=1, dilate=0):
        """Return a read-only strided view of all sliding windows over ``input``.

        Args:
            input: Array of shape ``(batch, channels, height, width)``.
            output_size: 4-tuple whose last two entries are the number of
                window positions along height and width.
            kernel_size: Side length of each square window.
            padding: Zero padding applied to height and width before windowing.
            stride: Step between neighbouring windows.
            dilate: Number of zeros inserted between neighbouring elements
                along height and width before padding (0 disables dilation).

        Returns:
            View of shape ``(batch, channels, out_h, out_w, kernel_size,
            kernel_size)``.

        Raises:
            ValueError: If the requested windows would extend past the padded
                input (which would make the strided view read foreign memory).
        """
        working_input = input
        working_pad = padding
        # dilate the input if necessary
        if dilate != 0:
            batch, channels, height, width = input.shape
            step = dilate + 1
            dilated = np.zeros(
                (batch, channels, (height - 1) * step + 1, (width - 1) * step + 1),
                dtype=input.dtype,
            )
            dilated[:, :, ::step, ::step] = input
            working_input = dilated

        # pad the input if necessary
        if working_pad != 0:
            working_input = np.pad(working_input, pad_width=((0,), (0,), (working_pad,), (working_pad,)), mode='constant', constant_values=(0.,))

        _, _, out_h, out_w = output_size
        out_b, out_c, _, _ = input.shape

        # as_strided performs no bounds checking, so verify the extent ourselves.
        needed_h = (out_h - 1) * stride + kernel_size
        needed_w = (out_w - 1) * stride + kernel_size
        if needed_h > working_input.shape[2] or needed_w > working_input.shape[3]:
            raise ValueError(
                "sliding windows exceed the padded input: need "
                + str((needed_h, needed_w)) + ", have " + str(working_input.shape[2:])
            )

        batch_str, channel_str, kern_h_str, kern_w_str = working_input.strides

        return np.lib.stride_tricks.as_strided(
            working_input,
            (out_b, out_c, out_h, out_w, kernel_size, kernel_size),
            (batch_str, channel_str, stride * kern_h_str, stride * kern_w_str, kern_h_str, kern_w_str),
            writeable=False,
        )

    def forward(self, input):
        """Vectorised forward pass.

        Args:
            input: Array of shape ``(batch, channels, height, width)``.

        Returns:
            Array of shape ``(batch, num_filters, out_height, out_width)``.
        """
        batch_size, num_channels, height, width = input.shape
        output_height, output_width = self._output_size(height, width)

        self._init_parameters(num_channels)

        windows = self.getWindows(input, (batch_size, num_channels, output_height, output_width), self.filter_size, self.padding, self.stride)

        output = np.einsum('bihwkl,oikl->bohw', windows, self.filters)

        # add bias to kernels
        output += self.biases[None, :, None, None]

        self.cache = input, windows
        print("conv forward done. output shape: ", output.shape)
        return output

    def backward(self, output_gradient, learning_rate):
        """Vectorised backward pass; also applies the parameter update.

        Args:
            output_gradient: Gradient w.r.t. the output of the last
                ``forward`` call, same shape as that output.
            learning_rate: Step size for the gradient-descent update.

        Returns:
            Gradient w.r.t. the input of the last ``forward`` call.

        Raises:
            RuntimeError: If ``forward`` has not been called yet.
        """
        if self.cache is None:
            raise RuntimeError("backward called before forward")
        input, windows = self.cache
        batch_size, num_channels, in_h, in_w = input.shape
        _, _, out_h, out_w = output_gradient.shape
        k, s, p = self.filter_size, self.stride, self.padding

        # Parameter gradients, averaged over the batch.
        bias_gradient = np.sum(output_gradient, axis=(0, 2, 3)) / batch_size
        filter_gradient = np.einsum('bihwkl,bohw->oikl', windows, output_gradient) / batch_size

        # Gradient w.r.t. the PADDED input: a full correlation of the dilated
        # output gradient with the 180-degree rotated filters. Only the region
        # actually touched by a window is covered; when the stride does not
        # divide evenly the trailing rows/columns legitimately get zero.
        covered_h = (out_h - 1) * s + k
        covered_w = (out_w - 1) * s + k
        dout_windows = self.getWindows(output_gradient, (batch_size, self.num_filters, covered_h, covered_w), k, padding=k - 1, stride=1, dilate=s - 1)
        rotated_filter = np.rot90(self.filters, 2, axes=(2, 3))
        covered = np.einsum('bohwkl,oikl->bihw', dout_windows, rotated_filter)

        padded_gradient = np.zeros((batch_size, num_channels, in_h + 2 * p, in_w + 2 * p), dtype=covered.dtype)
        padded_gradient[:, :, :covered_h, :covered_w] = covered
        # Drop the padding border to get the gradient w.r.t. the real input.
        input_gradient = padded_gradient[:, :, p:p + in_h, p:p + in_w]

        # rot90 returns a view of self.filters, so update only after the
        # input gradient has been computed from the pre-update weights.
        self.update_parameter(filter_gradient, bias_gradient, learning_rate)

        print("conv back korlam. ", input_gradient.shape)
        return input_gradient

    def update_parameter(self, filter_gradient, bias_gradient, learning_rate):
        """Apply one in-place gradient-descent step to filters and biases."""
        self.filters -= learning_rate * filter_gradient
        self.biases -= learning_rate * bias_gradient

    def _forward(self, input):
        """Reference forward pass using explicit loops (same result as ``forward``)."""
        self.input = input
        batch_size, num_channels, input_height, input_width = input.shape
        print("batch size: ", batch_size, "num_channels: ", num_channels, "input_height: ", input_height, "input_width: ", input_width)

        self._init_parameters(num_channels)

        print("conv e filter size: ", self.filters.shape)
        k, s, p = self.filter_size, self.stride, self.padding
        self.input_pad = np.pad(input, ((0,), (0,), (p,), (p,)), mode='constant')
        output_height, output_width = self._output_size(input_height, input_width)
        self.output = np.zeros((batch_size, self.num_filters, output_height, output_width))
        for i in range(batch_size):
            for j in range(self.num_filters):
                for row in range(output_height):
                    for col in range(output_width):
                        top, left = row * s, col * s
                        patch = self.input_pad[i, :, top:top + k, left:left + k]
                        self.output[i, j, row, col] = np.sum(self.filters[j] * patch) + self.biases[j]

        print("output shape: ", self.output.shape)
        return self.output

    def _backward(self, output_gradient, learning_rate):
        """Reference backward pass using explicit loops (same result as ``backward``).

        Must follow a ``_forward`` call. Applies the same batch-averaged
        parameter update as ``backward`` and returns the input gradient.
        """
        batch_size, _, output_height, output_width = output_gradient.shape
        k, s, p = self.filter_size, self.stride, self.padding
        in_channels = self.input.shape[1]

        # Size of the output gradient after inserting (stride - 1) zeros.
        output_height_pad = (output_height - 1) * s + 1
        output_width_pad = (output_width - 1) * s + 1

        input_height = self.input_pad.shape[2] - 2 * p
        input_width = self.input_pad.shape[3] - 2 * p

        print("input pad shape: ", self.input_pad.shape)
        print("input height: ", input_height, "input width: ", input_width)

        bias_gradient = np.sum(output_gradient, axis=(0, 2, 3)) / batch_size
        print("output gradient shape: ", output_gradient.shape)
        output_gradient_sparse = np.zeros((batch_size, self.num_filters, output_height_pad, output_width_pad))
        print("output gradient sparse shape: ", output_gradient_sparse.shape)
        output_gradient_sparse[:, :, ::s, ::s] = output_gradient

        filters_ = np.rot90(self.filters, 2, (2, 3))
        filter_gradient = np.zeros((self.num_filters, in_channels, k, k))

        for i in range(self.num_filters):
            for row in range(k):
                for col in range(k):
                    # All input channels, shifted by the kernel offset; the
                    # gradient of filter i is broadcast across those channels.
                    patch = self.input_pad[:, :, row:row + output_height_pad, col:col + output_width_pad]
                    filter_gradient[i, :, row, col] = np.sum(patch * output_gradient_sparse[:, i:i + 1, :, :], axis=(0, 2, 3))
        filter_gradient /= batch_size
        print("conv backprop. filter gradient shape: ", filter_gradient.shape)

        # Full correlation over the region of the padded input that windows
        # touched, then crop the padding border (see ``backward``).
        covered_height = output_height_pad + k - 1
        covered_width = output_width_pad + k - 1
        output_gradient_sparse_pad = np.pad(output_gradient_sparse, ((0,), (0,), (k - 1,), (k - 1,)), mode='constant')
        print("output gradient sparse pad shape: ", output_gradient_sparse_pad.shape)

        padded_input_gradient = np.zeros(self.input_pad.shape)
        for i in range(batch_size):
            for j in range(in_channels):
                for row in range(covered_height):
                    for col in range(covered_width):
                        padded_input_gradient[i, j, row, col] = np.sum(filters_[:, j, :, :] * output_gradient_sparse_pad[i, :, row:row + k, col:col + k])

        input_gradient = padded_input_gradient[:, :, p:p + input_height, p:p + input_width]
        print("input gradient shape: ", input_gradient.shape)

        # filters_ is a view of self.filters: update only after it was used.
        self.update_parameter(filter_gradient, bias_gradient, learning_rate)

        print("conv backprop. input gradient shape: ", input_gradient.shape)
        return input_gradient
        

In [3]:
class ReLuActivation:
    """Element-wise rectified linear unit: negative entries become zero."""

    def __init__(self):
        pass

    def forward(self, input):
        """Remember ``input`` for the backward pass and return ``max(0, input)``."""
        self.input = input
        print("relu korlam")
        rectified = np.maximum(0, input)
        return rectified

    def backward(self, output_gradient, learning_rate):
        """Pass the gradient through only where the forward input was positive.

        ``learning_rate`` is accepted for interface uniformity and is unused.
        """
        positive_mask = self.input > 0
        input_gradient = output_gradient * positive_mask
        print("relu back korlam. shape: ", input_gradient.shape)
        return input_gradient
    

In [4]:
class MaxPooling:
    """Max pooling over square windows of channel-first 4-D input."""

    def __init__(self, pool_size, stride):
        """Store the window side length and the step between windows."""
        self.pool_size = pool_size
        self.stride = stride

    def forward(self, input):
        """Return the per-window maximum.

        Args:
            input: Array of shape ``(batch, channels, height, width)``.

        Returns:
            Array of shape ``(batch, channels, out_height, out_width)``.
        """
        self.input = input
        batch_size, num_channels, input_height, input_width = input.shape
        print("batch size: ", batch_size, "num_channels: ", num_channels, "input_height: ", input_height, "input_width: ", input_width)

        output_height = int((input_height - self.pool_size)/self.stride + 1)
        output_width = int((input_width - self.pool_size)/self.stride + 1)
        self.output = np.zeros((batch_size, num_channels, output_height, output_width))
        # Loop over window positions only; batch and channels are vectorised.
        for k in range(output_height):
            top = k * self.stride
            for l in range(output_width):
                left = l * self.stride
                window = input[:, :, top:top + self.pool_size, left:left + self.pool_size]
                self.output[:, :, k, l] = window.max(axis=(2, 3))
        print("max pooling done. shape: ", self.output.shape)
        return self.output

    def backward(self, output_gradient, learning_rate):
        """Route each output gradient to the arg-max position of its window.

        Contributions are accumulated, so an input element that is the maximum
        of several overlapping windows (``stride < pool_size``) receives the
        sum of their gradients. ``learning_rate`` is unused.

        Args:
            output_gradient: Gradient w.r.t. the output of ``forward``.
            learning_rate: Unused; accepted for interface uniformity.

        Returns:
            Gradient w.r.t. the input of ``forward`` (same shape as it).
        """
        input_gradient = np.zeros(self.input.shape)
        batch_size, num_channels, output_height, output_width = output_gradient.shape
        pool = self.pool_size
        batch_idx, channel_idx = np.indices((batch_size, num_channels))
        for k in range(output_height):
            top = k * self.stride
            for l in range(output_width):
                left = l * self.stride
                window = self.input[:, :, top:top + pool, left:left + pool]
                # argmax over the flattened window picks the first maximum in
                # row-major order, as np.argmax on the 2-D window would.
                flat_argmax = window.reshape(batch_size, num_channels, pool * pool).argmax(axis=2)
                rows, cols = np.unravel_index(flat_argmax, (pool, pool))
                # Each (batch, channel) pair appears once per window position,
                # so in-place fancy-indexed addition accumulates correctly.
                input_gradient[batch_idx, channel_idx, top + rows, left + cols] += output_gradient[:, :, k, l]

        print("max pooling back done. shape: ", input_gradient.shape)
        return input_gradient

In [5]:
class Flattening:
    """Collapse every axis after the first into one feature axis."""

    def __init__(self):
        self.input = None

    def forward(self, input):
        """Reshape ``input`` to ``(input.shape[0], -1)`` and remember the original."""
        print("dhuksi")
        self.input = input
        leading = input.shape[0]
        self.output = np.reshape(input, (leading, -1))
        print("flattening e dhuksi. shape: ", self.output.shape)
        return self.output

    def backward(self, output_gradient, learning_rate):
        """Reshape the gradient back to the shape seen by ``forward``.

        ``learning_rate`` is unused; it is accepted for interface uniformity.
        """
        original_shape = self.input.shape
        input_gradient = np.reshape(output_gradient, original_shape)
        print("flattening er backprop e dhukse. shape: ", input_gradient.shape)
        return input_gradient

In [6]:
class FullyConnectedLayer:
    """Dense layer computing ``input @ weights + bias``.

    Weights and bias are created lazily on the first forward pass, once the
    number of input features is known. ``backward`` computes batch-averaged
    parameter gradients and immediately applies the update.
    """

    def __init__(self, num_units):
        """Store the number of output units; parameters are allocated lazily."""
        self.num_units = num_units
        self.weights = None
        self.bias = None
        self.input = None
        # Pending parameter gradients; None until the first backward pass.
        self.weights_gradient = None
        self.bias_gradient = None

    def forward(self, input):
        """Apply the affine map.

        Args:
            input: Array of shape ``(batch, features)``.

        Returns:
            Array of shape ``(batch, num_units)``.
        """
        self.input = input
        if self.weights is None:
            self.weights = np.random.randn(input.shape[1], self.num_units)
        if self.bias is None:
            self.bias = np.zeros(self.num_units)

        print("input shape: ", input.shape)
        print("weights shape: ", self.weights.shape)
        print("bias shape: ", self.bias.shape)

        self.output = np.dot(input, self.weights) + self.bias
        print("fully connected e dhuksi. shape: ", self.output.shape)
        return self.output

    def backward(self, output_gradient, learning_rate):
        """Back-propagate through the layer and apply the parameter update.

        Args:
            output_gradient: Gradient w.r.t. the layer output,
                shape ``(batch, num_units)``.
            learning_rate: Step size for the gradient-descent update.

        Returns:
            Gradient w.r.t. the layer input, shape ``(batch, features)``.
        """
        # Average over the batch (axis 0), matching the bias gradient below;
        # dividing by shape[1] would normalise by the number of units instead.
        batch_size = output_gradient.shape[0]
        self.weights_gradient = np.dot(self.input.T, output_gradient) / batch_size
        self.bias_gradient = np.mean(output_gradient, axis=0)
        # Computed before the update so it uses the weights of the forward pass.
        input_gradient = np.dot(output_gradient, self.weights.T)
        self.update(learning_rate)
        print("fc backprop done. input gradient shape: ", input_gradient.shape)
        return input_gradient

    def update(self, learning_rate):
        """Apply pending gradients, then reset them to zero.

        Does nothing when no gradient has been computed yet, so calling it
        before the first ``backward`` is safe. After ``backward`` the pending
        gradients are zero, so a repeated call leaves the parameters unchanged.
        """
        if self.weights_gradient is None or self.bias_gradient is None:
            return
        self.weights -= learning_rate * self.weights_gradient
        print("fc te weight shape.", self.weights.shape)
        self.bias -= learning_rate * self.bias_gradient
        self.weights_gradient = np.zeros(self.weights.shape)
        self.bias_gradient = np.zeros(self.bias.shape)
    

In [7]:
class SoftMax:
    """Row-wise softmax over a 2-D array (one row per sample)."""

    def __init__(self):
        pass

    def __str__(self):
        return "Softmax"

    def forward(self, input):
        """Return the row-wise softmax of ``input`` without modifying it.

        Args:
            input: Array of shape ``(batch, classes)``.

        Returns:
            Array of the same shape whose rows sum to 1.
        """
        self.input = input
        # Subtract the row maximum for numerical stability. This is done on a
        # new array so the caller's data is not changed in place.
        shifted = input - np.max(input, axis=1, keepdims=True)
        exponentials = np.exp(shifted)
        self.output = exponentials / np.sum(exponentials, axis=1, keepdims=True)
        print("forward sheshhhhh!!!!!!")
        return self.output

    def backward(self, output_gradient, learning_rate):
        """Return ``output_gradient`` unchanged.

        NOTE(review): this is only correct when the incoming gradient is
        already taken w.r.t. the softmax input (the callers in this notebook
        pass ``output - target``); confirm if another loss is introduced.
        ``learning_rate`` is unused.
        """
        print("softmax er backprop dhukse. shape: ", output_gradient.shape)
        return output_gradient

In [8]:
def model():
    """Build the layer stack used by this notebook.

    Two (convolution -> ReLU -> max-pool) stages followed by flattening, a
    10-unit fully connected layer and softmax.

    Returns:
        List of freshly constructed layer objects, in forward order.
    """
    def conv_stage():
        # 3 filters of size 3, stride 1, padding 1; then 2x2 pooling, stride 2.
        return [ConvolutionLayer(3, 3, 1, 1), ReLuActivation(), MaxPooling(2, 2)]

    layers = conv_stage() + conv_stage()
    layers += [Flattening(), FullyConnectedLayer(10), SoftMax()]
    print("model created")
    return layers

In [9]:
def forward_propagation(layers, input):
    """Feed ``input`` through every layer in order and return the final output.

    Each layer must expose ``forward(x)``; the result of one layer is the
    argument of the next. With an empty ``layers`` the input is returned as is.
    """
    activation = input
    for stage in layers:
        activation = stage.forward(activation)
    return activation

def backward_propagation(layers, output_gradient, learning_rate):
    """Propagate a gradient backwards through ``layers``.

    Layers are visited last-to-first; each must expose
    ``backward(gradient, learning_rate)`` and return the gradient for the
    layer before it.

    Args:
        layers: Layers in forward order.
        output_gradient: Gradient w.r.t. the output of the last layer.
        learning_rate: Passed to every layer's ``backward``.

    Returns:
        The gradient w.r.t. the input of the first layer (previously this
        function returned ``None``, discarding the result).
    """
    for layer in reversed(layers):
        output_gradient = layer.backward(output_gradient, learning_rate)
    return output_gradient
        
def update_params(layers, learning_rate):
    """Call ``update(learning_rate)`` on every layer that provides it.

    Layers without an ``update`` method (those that hold no trainable
    parameters) are skipped instead of raising ``AttributeError``.
    """
    for layer in layers:
        update = getattr(layer, "update", None)
        if callable(update):
            update(learning_rate)

In [16]:
# Shape smoke test: push random integers through the whole network and back.
# The 4-tuple is unpacked by the layers as (batch, channels, height, width).
input_shape = (2, 10, 20, 20)
input = np.random.randint(-10,10,size = input_shape)
# print("input matrix: ", input)
# NOTE(review): y is a one-element list, so the subtraction below broadcasts
# the value 1 against every class probability; confirm whether a one-hot
# target per sample was intended.
y = [1]
layers = model()
conv_forward_output = forward_propagation(layers, input)
gradient = conv_forward_output - y
conv_back_output = backward_propagation(layers, gradient, 0.01)

model created
conv forward done. output shape:  (2, 3, 20, 20)
relu korlam
batch size:  2 num_channels:  3 input_height:  20 input_width:  20
max pooling done. shape:  (2, 3, 10, 10)
conv forward done. output shape:  (2, 3, 10, 10)
relu korlam
batch size:  2 num_channels:  3 input_height:  10 input_width:  10
max pooling done. shape:  (2, 3, 5, 5)
dhuksi
flattening e dhuksi. shape:  (2, 75)
input shape:  (2, 75)
weights shape:  (75, 10)
bias shape:  (10,)
fully connected e dhuksi. shape:  (2, 10)
forward sheshhhhh!!!!!!
softmax er backprop dhukse. shape:  (2, 10)
fc te weight shape. (75, 10)
fc backprop done. input gradient shape:  (2, 75)
flattening er backprop e dhukse. shape:  (2, 3, 5, 5)
max pooling back done. shape:  (2, 3, 10, 10)
relu back korlam. shape:  (2, 3, 10, 10)
max pooling back done. shape:  (2, 3, 20, 20)
relu back korlam. shape:  (2, 3, 20, 20)


In [29]:
def test_model(layers, test_input, test_output):
    test_output_pred = forward_propagation(layers, test_input)
    test_output_pred = np.argmax(test_output_pred, axis=1)
    test_output = np.argmax(test_output, axis=1)
    accuracy = np.sum(test_output_pred == test_output) / test_output.shape[0]
    print("accuracy: ", accuracy)

In [30]:
def train_model(layers, X_train, Y_train, X_test, Y_test, num_epochs, learning_rate):
    """Train ``layers`` with full-batch gradient descent on cross-entropy loss.

    Args:
        layers: Layers in forward order; the last one is expected to output
            per-class probabilities of shape ``(n, classes)``.
        X_train: Training inputs.
        Y_train: Training targets, one-hot rows ``(n, classes)`` or integer
            class labels ``(n,)``.
        X_test: Evaluation inputs.
        Y_test: Evaluation targets, in either of the two forms above.
        num_epochs: Number of passes over the training data.
        learning_rate: Step size handed to every layer's ``backward``.
    """
    def as_one_hot(labels, num_classes):
        # Integer class labels -> one-hot rows; 2-D targets pass through.
        labels = np.asarray(labels)
        if labels.ndim == 1:
            return np.eye(num_classes)[labels]
        return labels

    for epoch in range(num_epochs):
        print("epoch: ", epoch)
        output = forward_propagation(layers, X_train)
        num_classes = output.shape[1]
        targets = as_one_hot(Y_train, num_classes)
        # Clip so that a probability of exactly 0 does not produce inf/nan.
        loss = np.sum(-targets * np.log(np.clip(output, 1e-12, None)))
        print("loss: ", loss)
        output_gradient = output - targets
        # Layers apply their own parameter update inside backward (it is given
        # the learning rate), so no separate update pass is run here.
        backward_propagation(layers, output_gradient, learning_rate)
        test_model(layers, X_test, as_one_hot(Y_test, num_classes))
    print("training complete")

In [None]:
# Build a fresh model, load the training-a / training-b images and labels,
# and run one training epoch with learning rate 0.01 (training-b is used as
# the evaluation set).
# NOTE(review): the number and order of images returned by getImage must match
# the labels returned by getLabel for the same split - confirm before relying
# on the reported loss/accuracy.
layers = model()
X_train = getImage('data/training-a/')
Y_train = getLabel('data/training-a.csv')
X_test = getImage('data/training-b/')
Y_test = getLabel('data/training-b.csv')
train_model(layers, X_train, Y_train, X_test, Y_test, 1, 0.01)