<center>
    
# COL341 Spring 2023 <br> Assignment 4 : CNN  
## Part 1
### Amaiya Singhal
</center>

In [1]:
import numpy as np

## Defining some helper functions

In [2]:
def softmax(v):
    exp_array = np.exp(v - np.max(v))
    return exp_array / np.sum(exp_array)

In [3]:
def relu(map):    
    output_tensor = np.maximum(map, 0)
    return output_tensor

In [4]:
def pooling(map, dim):
    n_channels = map.shape[0]
    map_size = map.shape[1]
    output_size = map.shape[1]//dim
    return map.reshape(n_channels, output_size, dim, output_size, dim).max(axis=(2,4))

In [5]:
def convolution(sample, kernel):
    kernel_size = kernel.shape[1]
    pad = kernel_size//2
    n, h, w = sample.shape
    
    out_sample = np.zeros((n, h+2*pad, w+2*pad))
    for i in range(n):
        out_sample[i] = np.pad(sample[i], (pad,), 'constant', constant_values = 0)
    sample = out_sample
    
    size_feature_map = h
    n_out_channels = kernel.shape[0]

    output_tensor = np.zeros((n_out_channels, size_feature_map, size_feature_map))

    for i in range(n_out_channels):
        current_kernel = kernel[i]

        for r in range(size_feature_map):
            for c in range(size_feature_map):
                window = sample[:, r : r + kernel_size, c : c + kernel_size]
                value = np.sum(window*current_kernel, axis = None)
                output_tensor[i, r, c] = value

    return output_tensor

In [6]:
def other_convolution(sample, kernel, pad):
    kernel_size = kernel.shape[1]
    n, h, w = sample.shape
    
    out_sample = np.zeros((n, h+2*pad, w+2*pad))
    for i in range(n):
        out_sample[i] = np.pad(sample[i], (pad,), 'constant', constant_values = 0)
    sample = out_sample
    
    size_feature_map = h + 2*pad - kernel_size + 1
    n_out_channels = kernel.shape[0]

    output_tensor = np.zeros((n_out_channels, size_feature_map, size_feature_map))

    for i in range(n_out_channels):
        current_kernel = kernel[i]

        for r in range(size_feature_map):
            for c in range(size_feature_map):
                window = sample[:, r : r + kernel_size, c : c + kernel_size] 
                value = np.sum(window*current_kernel, axis = None)
                output_tensor[i, r, c] = value

    return output_tensor

In [7]:
def convolve2d(sample, kernel):
    size_feature_map = sample.shape[0]
    kernel_size = kernel.shape[0]
    pad = kernel_size//2
    
    sample = np.pad(sample, (pad,), 'constant', constant_values = 0)
    
    output_tensor = np.zeros((size_feature_map, size_feature_map))
    for r in range(size_feature_map):
        for c in range(size_feature_map):
            window = sample[r : r + kernel_size, c : c + kernel_size]
            value = np.sum(window*kernel, axis = None)
            output_tensor[r, c] = value

## CONV2D Class for the Convolution Layers
Forward and Backward Pass has been implemented

In [8]:
class conv2d:
    def __init__(self, num, size):
        self.kernel = np.random.randn(num, size, size)/4096 # Xavier Initialisation
        self.bias = np.random.rand(num, size, size)/4096
        self.layer_input = None
        self.layer_activated = None
        self.kernel_grad = None
        self.size = size
    
    def forward_pass(self, sample):
        self.layer_input = sample
        output_tensor = convolution(sample, self.kernel)
        self.layer_activated = relu(output_tensor)
        return self.layer_activated
        
    def backward_pass(self, inp_grad):
        n, h, w = self.layer_input.shape
        pass_grad = np.zeros((n,h,w))
        relu_mat = self.layer_activated
        relu_mat[np.nonzero(relu_mat)] = 1
        inp_grad = inp_grad * relu_mat
        
        kernel_grad = other_convolution(self.layer_input, inp_grad, self.size//2)
        # print("here: ", inp_grad.shape, self.layer_input.shape, kernel_grad.shape)
        # check this kernel gradient once, need to perform full convolution
        self.kernel_grad = kernel_grad
        
        not_final = np.zeros((inp_grad.shape[0], h, w))
        for i in range(inp_grad.shape[0]):
            curr_grad = inp_grad[i]
            curr_kernel = np.flip(self.kernel[i], axis = (0,1))
            not_final[i] = convolve2d(curr_grad, curr_kernel)
        still_not_final = np.sum(pass_grad, axis = 0)
        
        for i in range(n):
            pass_grad[i] = still_not_final
        return pass_grad
        
        # flip_kernel = np.flip(kernel_grad, axis=(0,1))
        # not_final = convolution(inp_grad, flip_kernel)
        # still_not_final = np.sum(not_final, axis = 0)
        
    def update(self):
        self.kernel -= 0.001*self.kernel_grad
        return None

## MAXPOOL2D Class for the Pooling Layers
Forward and Backward Pass has been implemented

In [9]:
class maxpool2d:
    def __init__(self, dim):
        self.dim = dim
        self.layer_input = None
        self.layer_output = None
        
    def forward_pass(self , sample):
        self.layer_input = sample
        self.layer_output = pooling(sample, 2)
        return self.layer_output
    
    def backward_pass(self, inp_grad):
        n, h, w = self.layer_input.shape
        x = self.layer_input
        dim = self.dim
        
        pass_mat = np.zeros((n,h,w))

        for i in range(n):
            for r in range(0, h-1, dim):
                for c in range(0, w-1, dim):
                    window = x[i, r:r+dim, c:c+dim]
                    max_ind = np.unravel_index(window.argmax(), window.shape)
                    pass_mat[i, r:r+dim, c:c+dim][max_ind] = inp_grad[i, r//2, c//2]
            
        return pass_mat

## FC_1 Class for the First Fully Connected Layer
Forward and Backward Pass has been implemented

In [10]:
class fc_1:
    def __init__(self, size, next_size):
        self.weights = np.random.randn(next_size, size)/4096
        self.bias = np.random.randn(1, next_size)/4096
        self.weights_grad = None
        self.bias_grad = None
        self.layer_input = None
        self.layer_output = None
        self.layer_output_active = None
        
    def forward_pass(self, sample):
        self.layer_input = sample
        output = sample @ self.weights.T + self.bias
        self.layer_output = output
        self.layer_output_active = relu(output)
        return self.layer_output_active
    
    def backward_pass(self, inp_grad):
        relu_mat = self.layer_output_active
        relu_mat[np.nonzero(relu_mat)] = 1
        relued_grad = inp_grad * relu_mat
        pass_grad = relued_grad.reshape(1,-1) @ self.weights
        self.weights_grad = relued_grad.reshape(-1,1) @ self.layer_input.reshape(1,-1)
        self.bias_grad = relued_grad.reshape(1,-1)
        return pass_grad
    
    def update(self):
        self.weights -= 0.001*self.weights_grad
        self.bias -= 0.001*self.bias_grad

## FC_2 Class for the Second Fully Connected Layer
Forward and Backward Pass has been implemented

In [11]:
class fc_2:
    def __init__(self, size, next_size):
        self.weights = np.random.randn(next_size, size)/64
        self.bias = np.random.randn(1, next_size)/64
        self.weights_grad = None
        self.bias_grad = None
        self.layer_input = None
        self.layer_output = None
        
    def forward_pass(self, sample):
        self.layer_input = sample
        output = sample @ self.weights.T + self.bias
        self.layer_output = output
        return self.layer_output
    
    def backward_pass(self, inp_grad):
        pass_grad = inp_grad.reshape(1,-1) @ self.weights
        self.weights_grad = inp_grad.reshape(-1,1) @ self.layer_input.reshape(1,-1)
        # print(inp_grad.shape)
        self.bias_grad = inp_grad.reshape(1,-1)
        return pass_grad
    
    def update(self):
        self.weights -= 0.001*self.weights_grad
        self.bias -= 0.001*self.bias_grad

## Loading the CIFAR-10 Data

In [12]:
def unpickle(file):
    import pickle
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

In [13]:
set1 = unpickle("./content/data_batch_1")
set2 = unpickle("./content/data_batch_2")
set3 = unpickle("./content/data_batch_3")
set4 = unpickle("./content/data_batch_4")
set5 = unpickle("./content/data_batch_5")
x_train = np.vstack((set1[b'data'], set2[b'data'], set3[b'data'], set4[b'data'], set5[b'data']))
y_train = np.hstack((np.array(set1[b'labels']), np.array(set2[b'labels']), np.array(set3[b'labels']), np.array(set4[b'labels']) ,np.array(set5[b'labels']) ))
x_train.shape, y_train.shape

((50000, 3072), (50000,))

In [14]:
x_trial = []
y_trial = y_train[0:1000]
for i in range(1000):
    x_trial.append(x_train[i].reshape(3,32,32))

## Defining the Train Function

In [16]:
def train(x, y, epochs, batch_size):
    conv1 = conv2d(32, 3)
    pool1 = maxpool2d(2)
    conv2 = conv2d(64, 5)
    pool2 = maxpool2d(2)
    conv3 = conv2d(64, 3)
    fc1 = fc_1(4096, 64)
    fc2 = fc_2(64, 10)
    num_batch = 150
    parameters = []
    for i in range(epochs):
        perm = np.random.permutation(x.shape[0])
        x = x[perm]
        y = y[perm]
        print("EPOCH: ", i)
        total_loss = 0
        for j in range(num_batch):
            update_fc2_w = []
            update_fc1_w = []
            update_fc2_b = []
            update_fc1_b = []
            update_conv3 = []
            update_conv2 = []
            update_conv1 = []
            count = 0
            loss = 0
            for k in range(batch_size):
                a1 = conv1.forward_pass(x[j*32 + k])
                a2 = pool1.forward_pass(a1)
                a3 = conv2.forward_pass(a2)
                a4 = pool2.forward_pass(a3)
                a5 = conv3.forward_pass(a4).flatten()
                a6 = fc1.forward_pass(a5)
                a7 = fc2.forward_pass(a6)
                out = softmax(a7)
                onehot = np.zeros(10)
                onehot[y[j*32 + k]] = 1
                grad_fc2 = out - onehot
                grad_fc1 = fc2.backward_pass(grad_fc2.reshape(1,-1))
                grad_conv3 = fc1.backward_pass(grad_fc1).reshape(64, 8, 8)
                grad_pool2 = conv3.backward_pass(grad_conv3)
                grad_conv2 = pool2.backward_pass(grad_pool2)
                grad_pool1 = conv2.backward_pass(grad_conv2)
                grad_conv1 = pool1.backward_pass(grad_pool1)
                init_grad = conv1.backward_pass(grad_conv1)
                update_fc2_w += [fc2.weights]
                update_fc1_w += [fc1.weights]
                update_fc2_b += [fc2.bias] 
                update_fc1_b += [fc1.bias]
                update_conv3 += [conv3.kernel]
                update_conv2 += [conv2.kernel]
                update_conv1 += [conv1.kernel]
                if out.argmax() == y[j*32 + k]:
                    count += 1
                loss -= np.log(np.max(out))
            print("Batch: ", j, ", correct: ", count, ", loss: ", loss/32)
            total_loss += loss/32
            fc2.weights = np.sum(update_fc2_w, axis = 0)/32 
            fc1.weights = np.sum(update_fc1_w, axis = 0)/32
            fc2.bias = np.sum(update_fc2_b, axis = 0)/32
            fc1.bias = np.sum(update_fc1_b, axis = 0)/32
            conv3.kernel = np.sum(update_conv3, axis = 0)/32
            conv2.kernel = np.sum(update_conv2, axis = 0)/32
            conv1.kernel = np.sum(update_conv1, axis = 0)/32   
            fc2.update()
            fc1.update()
            conv3.update()
            conv2.update()
            conv1.update()
        parameters += [[fc2.weights, fc1.weights, fc2.bias, fc1.bias, conv3.kernel, conv2.kernel, conv1.kernel]]
        print("Total Loss = ", total_loss/150)
        print()
    return parameters

In [17]:
x_me = []
y_me = []
for i in range(10):
    for j in range(480):
        x_me.append(x_train[5000*i+j].reshape(3,32,32))
        y_me.append(y_train[5000*i+j])
x_me = np.array(x_me)
y_me = np.array(y_me)

## Training the Network

In [18]:
per_epoch_parameters = train(x_me, y_me, 10, 32)

EPOCH:  0
Batch:  0 , correct:  3 , loss:  2.2821732033037847
Batch:  1 , correct:  1 , loss:  2.282276138681037
Batch:  2 , correct:  4 , loss:  2.2823799926750468
Batch:  3 , correct:  2 , loss:  2.2824824413988916
Batch:  4 , correct:  5 , loss:  2.2825863539235938
Batch:  5 , correct:  4 , loss:  2.282690347989605
Batch:  6 , correct:  3 , loss:  2.282794854552202
Batch:  7 , correct:  3 , loss:  2.282894971897414
Batch:  8 , correct:  4 , loss:  2.2829989895167575
Batch:  9 , correct:  1 , loss:  2.2830991785681376
Batch:  10 , correct:  6 , loss:  2.283199437021148
Batch:  11 , correct:  4 , loss:  2.283301823903812
Batch:  12 , correct:  6 , loss:  2.2834018710430435
Batch:  13 , correct:  5 , loss:  2.283503698684125
Batch:  14 , correct:  2 , loss:  2.2836056337213777
Batch:  15 , correct:  2 , loss:  2.283708794722919
Batch:  16 , correct:  5 , loss:  2.283808857443812
Batch:  17 , correct:  4 , loss:  2.2839090179104624
Batch:  18 , correct:  2 , loss:  2.2840109746603603
Ba