In [1]:
import pickle
import numpy as np
import os

#from google.colab import drive
#drive.mount('/content/drive')

In [2]:
def unpickle(file):
    """Load and return the object stored in the pickle file at ``file``.

    The file is opened in binary mode and decoded with ``encoding='bytes'``,
    so string keys pickled under Python 2 come back as ``bytes`` objects.
    """
    # NOTE(review): pickle can execute arbitrary code while loading; only
    # use this on batch files from a trusted source.
    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')

def get_input(input_path):
    """Load every pickled batch in ``input_path`` and stack them together.

    Each file in the directory is read with ``unpickle`` and is expected to
    yield a mapping with ``b'data'`` and ``b'labels'`` entries.

    Parameters
    ----------
    input_path : str
        Directory that contains only batch files.

    Returns
    -------
    tuple
        ``(feature_vector, output_vector)``: the concatenated ``b'data'``
        arrays and the concatenated ``b'labels'`` sequences, in the same
        (file-name sorted) batch order so rows and labels stay aligned.

    Raises
    ------
    ValueError
        If the directory contains no files.
    """
    # os.listdir returns entries in arbitrary order; sorting makes the sample
    # order reproducible across machines and runs.
    batch_files = sorted(os.listdir(input_path))
    if not batch_files:
        raise ValueError("no batch files found in %r" % (input_path,))

    batches = [unpickle(os.path.join(input_path, name)) for name in batch_files]

    output_vector = np.concatenate([batch[b'labels'] for batch in batches])
    feature_vector = np.concatenate([batch[b'data'] for batch in batches])
    return feature_vector, output_vector

# Load the training and validation splits from directories next to the notebook.
# On Colab (after mounting Google Drive) the same data lives under
# "drive/MyDrive/COL341-A4/Train Set" and "drive/MyDrive/COL341-A4/Validation Set".
feature_vector, output_vector = get_input(input_path="Train Set")
feature_val, output_val = get_input(input_path="Validation Set")

In [3]:
# Quick sanity check of the loaded arrays, printed one item per line:
#   training features: container type and shape
#   training labels:   container type, contents, smallest and largest label
#                      (labels run from 0 to 9), shape
#   validation split:  feature shape and label shape
# Train + validation together give 6000 images per class.
for summary in (
    type(feature_vector),
    feature_vector.shape,
    type(output_vector),
    output_vector,
    output_vector.min(),
    output_vector.max(),
    output_vector.shape,
    feature_val.shape,
    output_val.shape,
):
    print(summary)

<class 'numpy.ndarray'>
(50000, 3072)
<class 'numpy.ndarray'>
[6 9 9 ... 9 1 1]
0
9
(50000,)
(10000, 3072)
(10000,)


In [4]:
def relu(x):
    """Rectified linear unit: keep positive entries of ``x`` and zero the rest.

    Implemented as a multiplication by the boolean mask ``x > 0``, so the
    result has the same shape as ``x``.
    """
    positive_mask = x > 0
    return x * positive_mask

def relu_deriv(x):
    """Element-wise derivative of ReLU: 1 where ``x > 0`` and 0 elsewhere.

    ``x`` must support ``.astype`` on the comparison result (e.g. a NumPy
    array); the returned array has an integer dtype.
    """
    is_positive = x > 0
    return is_positive.astype(int)

def CELoss(true, pred):
    """Return the absolute difference between ``true`` and ``pred``.

    NOTE(review): despite the name, this is not a cross-entropy; it is
    simply ``|true - pred|`` on whatever values the caller passes in.
    """
    difference = true - pred
    return abs(difference)

def CELoss_deriv(true, pred):
    """Return 1 when ``true`` and ``pred`` differ as integers, otherwise 0.

    Both arguments are truncated with ``int()`` before comparison.
    TODO: check whether this should follow the abs() used in ``CELoss``.
    """
    mismatch = int(true) != int(pred)
    return int(mismatch)

In [5]:
class Layer:
    """Base class for network layers.

    Holds the two attributes subclasses fill in during a forward pass:
    ``input`` (what the layer received) and ``output`` (what it produced).
    Both start out as ``None``.
    """

    def __init__(self):
        self.input = self.output = None

In [6]:
class Linear(Layer):
    """Fully connected layer computing ``input @ weights + bias`` for one sample."""

    def __init__(self,input_size, output_size):
        """Draw weights ``(input_size, output_size)`` and bias ``(1, output_size)``
        uniformly from [-0.5, 0.5)."""
        super().__init__()
        # Weights are drawn before the bias.
        self.weights = np.random.rand(input_size, output_size) - 0.5
        self.bias = np.random.rand(1, output_size) - 0.5

    def forward(self,input_vector):
        """Flatten ``input_vector`` to a single row, cache it, and return the
        affine transform as a ``(1, output_size)`` array."""
        row = input_vector.reshape((1, input_vector.size))
        self.input = row
        self.output = row @ self.weights + self.bias
        return self.output

    def backward(self, output_error, lr):
        """Apply one gradient-descent step and return the error for the previous layer.

        ``output_error`` is the error with respect to this layer's output and
        ``lr`` is the step size. The returned input error is computed with the
        weights as they were before the update.
        """
        grad_input = output_error @ self.weights.T
        grad_weights = self.input.T @ output_error

        self.weights = self.weights - lr * grad_weights
        self.bias = self.bias - lr * output_error
        return grad_input

class ReLU(Layer):
    """Activation layer that applies ``relu`` element-wise; it has no parameters."""

    def __init__(self):
        super().__init__()

    def forward(self, input_data):
        """Cache ``input_data`` and return ``relu(input_data)``."""
        self.input = input_data
        activated = relu(input_data)
        self.output = activated
        return activated

    def backward(self, output_error, lr):
        """Return ``output_error`` masked by the ReLU derivative at the cached input.

        ``lr`` is accepted for interface uniformity and is not used.
        """
        gate = relu_deriv(self.input)
        return gate * output_error
    
class Flatten(Layer):   #TODO Verify
    """Reshape one sample into a single row and restore the shape on the way back."""

    def __init__(self):
        super().__init__()

    def forward(self, input_data):
        """Cache ``input_data`` and return a copy reshaped to ``(1, input_data.size)``."""
        self.input = input_data
        duplicate = np.copy(input_data)
        self.output = duplicate.reshape((1, input_data.size))
        return self.output

    def backward(self, output_error, lr):
        """Reshape ``output_error`` back to the shape of the cached input.

        ``lr`` is accepted for interface uniformity and is not used.
        """
        original_shape = self.input.shape
        return output_error.reshape(original_shape)

In [7]:
def convolve(input,weight,padding):
    """Stride-1 2-D cross-correlation of ``input`` with ``weight`` after zero padding.

    Parameters
    ----------
    input : 2-D array
        The image / feature map to slide over.
    weight : 2-D array
        The kernel; it is applied as-is (not flipped).
    padding : int
        Number of zero rows/columns added on every side of ``input``.
        ``0`` gives a "valid" correlation.

    Returns
    -------
    2-D float array of shape
    ``(input.shape[0] - weight.shape[0] + 2*padding + 1,
       input.shape[1] - weight.shape[1] + 2*padding + 1)``.
    """
    in_rows, in_cols = input.shape[0], input.shape[1]
    k_rows, k_cols = weight.shape[0], weight.shape[1]

    out_rows = int(in_rows - k_rows + 2 * padding + 1)
    out_cols = int(in_cols - k_cols + 2 * padding + 1)
    output = np.zeros((out_rows, out_cols))

    padded_input = np.zeros((in_rows + 2 * padding, in_cols + 2 * padding))
    # Use explicit end indices: a negative-end slice like [p:-p] is empty when
    # p == 0, which made unpadded calls fail.
    padded_input[padding:padding + in_rows, padding:padding + in_cols] = input

    for col in range(out_cols):
        for row in range(out_rows):
            window = padded_input[row:row + k_rows, col:col + k_cols]
            output[row, col] = np.sum(weight * window)
    return output

In [8]:
class Conv2D(Layer):
    """2-D convolution layer (stride 1, "same" zero padding) for a single sample.

    ``weight_list[i][j]`` is the ``kernel_size x kernel_size`` kernel that
    connects input channel ``j`` to output channel ``i``. The padding
    ``(kernel_size - 1) // 2`` keeps the spatial size only for odd kernels.

    NOTE(review): ``bias`` is initialised but is neither added in ``forward``
    nor updated in ``backward``.
    """

    def __init__(self,in_channels,out_channels,kernel_size):
        """Draw every kernel and the bias uniformly from [-0.5, 0.5)."""
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size

        # Kernels are drawn output-channel by output-channel, then the bias.
        self.weight_list = [
            [np.random.rand(kernel_size, kernel_size) - 0.5 for _ in range(in_channels)]
            for _ in range(out_channels)
        ]
        self.bias = np.random.rand(out_channels) - 0.5

    def forward(self, input_data):
        """Correlate a ``(in_channels, H, W)`` array with every kernel.

        Caches ``input_data`` and returns an ``(out_channels, H, W)`` array in
        which output channel ``i`` is the sum over input channels ``j`` of
        ``convolve(input_data[j], weight_list[i][j])``.
        """
        self.input = input_data
        first_channel = input_data[0]
        self.output = np.zeros((self.out_channels, first_channel.shape[0], first_channel.shape[1]))

        for i in range(self.out_channels):
            for j in range(self.in_channels):
                kernel = self.weight_list[i][j]
                pad = (kernel.shape[0] - 1) // 2
                self.output[i, :, :] += convolve(input_data[j], kernel, padding=pad)
        return self.output

    def backward(self,output_error,lr):
        """Update the kernels by gradient descent and return the input error.

        Parameters
        ----------
        output_error : array of shape ``(out_channels, H, W)``
            Error with respect to this layer's output.
        lr : float
            Step size for the kernel update.

        Returns
        -------
        Float array shaped like the cached input, computed with the kernels
        as they were before the update.
        """
        pad = (self.kernel_size - 1) // 2

        # Kernel gradient: correlate each padded input channel with the error
        # of each output channel, giving one kernel_size x kernel_size map.
        weights_error = np.zeros(
            (self.out_channels, self.in_channels, self.kernel_size, self.kernel_size)
        )
        for i in range(self.out_channels):
            for j in range(self.in_channels):
                weights_error[i, j] = convolve(self.input[j], output_error[i], pad)

        # Input gradient: correlate the output error with each kernel rotated
        # by 180 degrees. The rotation reverses BOTH axes; reversing only the
        # rows (np.flipud) gives a mirrored, incorrect gradient.
        input_error = np.zeros_like(self.input, dtype=float)
        for j in range(self.in_channels):
            for i in range(self.out_channels):
                rotated = self.weight_list[i][j][::-1, ::-1]
                input_error[j] += convolve(output_error[i], rotated, pad)

        # Apply the update only after the input gradient has been computed.
        for i in range(self.out_channels):
            for j in range(self.in_channels):
                self.weight_list[i][j] -= lr * weights_error[i, j]
        return input_error

In [9]:
class MaxPool2D(Layer):
    """Max pooling over non-overlapping ``kernel_size x kernel_size`` windows.

    The stride equals the kernel size. Rows/columns that do not fill a
    complete window are ignored.
    """

    def __init__(self,kernel_size):
        super().__init__()
        self.kernel_size = kernel_size #Assuming stride is kernel_size

    def forward(self, input_data):
        """Pool a ``(channels, H, W)`` array down to
        ``(channels, H // kernel_size, W // kernel_size)`` and cache the input."""
        k = self.kernel_size
        d1 = input_data.shape[1] // k
        d2 = input_data.shape[2] // k

        self.output = np.zeros((input_data.shape[0], d1, d2))
        for i in range(input_data.shape[0]):
            channel = input_data[i]
            for a in range(d1):
                for b in range(d2):
                    window = channel[a * k:(a + 1) * k, b * k:(b + 1) * k]
                    self.output[i, a, b] = np.amax(window)
        self.input = input_data
        return self.output

    def backward(self,output_error,lr):
        """Route each output error to the position of its window's maximum.

        Every other position receives zero. The layer has no parameters, so
        ``lr`` is accepted only for interface uniformity and must not scale
        the gradient that is passed on.

        Returns a float array shaped like the cached input.
        """
        input_data = self.input
        k = self.kernel_size
        d1 = input_data.shape[1] // k
        d2 = input_data.shape[2] // k

        # Float dtype so errors are not truncated if the input was integer.
        input_error = np.zeros_like(input_data, dtype=float)

        for i in range(input_data.shape[0]):
            channel = input_data[i]
            for a in range(d1):
                for b in range(d2):
                    window = channel[a * k:(a + 1) * k, b * k:(b + 1) * k]
                    # First maximum wins on ties (np.argmax behaviour).
                    rel_row, rel_col = np.unravel_index(np.argmax(window, axis=None), window.shape)
                    input_error[i, a * k + rel_row, b * k + rel_col] = output_error[i, a, b]
        return input_error

In [13]:
class Network:
    """Sequential container that runs samples through ``layers`` one at a time.

    ``loss(true, pred)`` and ``loss_deriv(true, pred)`` are called with the
    target label and the arg-max class index of the final layer's output.
    """

    def __init__(self,layers,loss,loss_deriv):
        self.layers = layers
        self.loss = loss
        self.loss_deriv = loss_deriv

    def _forward(self, sample):
        """Run a single sample through every layer and return the final output."""
        output = sample
        for layer in self.layers:
            output = layer.forward(output)
        return output

    def predict(self, input_data):
        """Return a list with the network output for every sample.

        Samples are taken along the first axis of ``input_data``, which may
        have any number of trailing dimensions (e.g. ``(N, 3, 32, 32)``).
        """
        return [self._forward(input_data[i]) for i in range(len(input_data))]

    def fit(self, x_train, y_train, n_epochs, lr, batch_size):
        """Train for ``n_epochs`` passes over the data, printing the mean loss per epoch.

        For each sample the error signal is
        ``loss_deriv(label, argmax(output)) * output / ||output||``. Signals
        are averaged over ``batch_size`` consecutive samples and then
        back-propagated once; a shorter final batch is averaged over its
        actual size and back-propagated as well.

        NOTE(review): layers cache only their most recent input, so the
        backward pass uses the activations of the last sample of each batch.

        Raises
        ------
        ValueError
            If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        n = len(y_train)

        for i in range(n_epochs):
            epoch_loss = 0
            error_dev = None      # running sum of error signals in the current batch
            in_batch = 0          # number of samples accumulated so far
            for j in range(n):
                output = self._forward(np.copy(x_train[j]))

                op = np.argmax(output)
                epoch_loss += self.loss(y_train[j],op)
                dev = self.loss_deriv(y_train[j],op)

                norm = np.linalg.norm(output)
                if norm == 0:
                    # An all-zero output has no direction; avoid dividing by zero.
                    sample_error = np.zeros_like(output, dtype=float)
                else:
                    sample_error = dev*(output)/norm

                error_dev = sample_error if error_dev is None else error_dev + sample_error
                in_batch += 1

                # Flush on a full batch, and also on the last sample so a
                # trailing partial batch is not silently dropped.
                if in_batch == batch_size or j == n - 1:
                    error_dev = error_dev/in_batch
                    for l in reversed(self.layers):
                        error_dev = l.backward(error_dev, lr)
                    error_dev = None
                    in_batch = 0
            mean_loss = epoch_loss/n
            print("Epoch :",i+1," and Error :",mean_loss)

In [14]:
# Architecture for 3x32x32 inputs: the convolutions keep the spatial size and
# each 2x2 max-pool halves it (32 -> 16 -> 8), so the flattened feature vector
# entering the first Linear layer has 64*8*8 entries.
layers = [
    Conv2D(in_channels=3, out_channels=32, kernel_size=3),
    ReLU(),
    MaxPool2D(kernel_size=2),
    Conv2D(in_channels=32, out_channels=64, kernel_size=5),
    ReLU(),
    MaxPool2D(kernel_size=2),
    Conv2D(in_channels=64, out_channels=64, kernel_size=3),
    ReLU(),
    Flatten(),
    Linear(input_size=64*8*8, output_size=64),
    ReLU(),
    Linear(input_size=64, output_size=10),
]
net = Network(layers, CELoss, CELoss_deriv)

In [None]:
# Hyper-parameters for the training run.
N_EPOCHS = 20
LEARNING_RATE = 0.001
BATCH_SIZE = 32

# Each 3072-value row becomes a 3x32x32 array; -1 lets NumPy infer the number
# of samples instead of hard-coding it.
feature_reshaped = feature_vector.reshape((-1, 3, 32, 32))
net.fit(feature_reshaped, output_vector, N_EPOCHS, LEARNING_RATE, BATCH_SIZE)

predicted_vector = net.predict(feature_reshaped)
# Printing every prediction floods the notebook; show the count and a sample.
print(len(predicted_vector))
print(predicted_vector[:5])