In [None]:
# a website that explains the back-prop for CNN clearly: https://medium.com/@pavisj/convolutions-and-backpropagations-46026a8f5d2c
import numpy as np
import pandas as pd
import math
import os
import sys
# Walk the Kaggle input directory and print the full path of every file found.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(root, name))

from sklearn.model_selection import train_test_split
# np.set_printoptions(threshold=sys.maxsize)


In [None]:
# load data
current_dir = os.getcwd()  # plain-Python equivalent of the IPython `%pwd` magic
train_data = pd.read_csv('../input/digit-recognizer/train.csv',dtype = np.float32)
test_data = pd.read_csv('../input/digit-recognizer/test.csv',dtype = np.float32)

# split data into features/labels
y_train_data = train_data.label.values
x_train_data = train_data.loc[:,train_data.columns != "label"].values/255 # normalization
x_train_data = x_train_data.reshape(x_train_data.shape[0],28,28)
# The test pixels need the same /255 scaling as the training pixels, so reshape the
# already-normalized array instead of going back to the raw DataFrame values.
x_test = test_data.values/255
x_test = x_test.reshape(x_test.shape[0],28,28)
# append a trailing channel axis -> [no_samples, 28, 28, 1]
x_train_data = np.expand_dims(x_train_data, axis=3)
x_test = np.expand_dims(x_test, axis=3)

# split train/dev data
x_train, x_dev, y_train, y_dev = train_test_split(x_train_data, y_train_data, test_size=0.33, random_state=42)
print(len(x_train))
print(len(x_dev))
print(len(x_test))
print(x_train.shape)
print(x_test.shape)

In [None]:
# Utils
def relu(x):
    """Element-wise rectified linear unit: every negative entry of `x` becomes 0."""
    rectified = np.maximum(x, 0)
    return rectified

def soft_max(x): # x dim: [no_samples,output_classes]
    """Row-wise softmax of a 2-D array of scores.

    Args:
        x: array-like of shape [no_samples, output_classes].

    Returns:
        Array of the same shape whose rows are non-negative and sum to 1.
    """
    x = np.asarray(x)
    # Subtracting the per-row maximum leaves the softmax unchanged mathematically
    # but keeps np.exp from overflowing to inf (which would yield inf/inf = NaN).
    shifted = x - x.max(axis=1, keepdims=True)
    exponentials = np.exp(shifted)
    return exponentials / exponentials.sum(axis=1, keepdims=True)
def cross_entropy_loss(predicted,answer):
    """Per-sample cross-entropy between predicted probabilities and target weights.

    Args:
        predicted: array of shape [no_samples, output_classes] with predicted
            probabilities.
        answer: array broadcastable against `predicted`.
            NOTE(review): presumably one-hot labels of the same shape -- confirm
            against the caller; integer class indices are not handled here.

    Returns:
        1-D array with one loss value per sample.
    """
    # 1e-6 keeps log() finite when a predicted probability is exactly 0.
    # The product with `answer` must happen before summing over the class axis.
    return -(answer*np.log(predicted+1e-6)).sum(axis=1)
    

In [None]:
class Conv2d: # w's in the transition between layers
    """2-D convolution (cross-correlation) layer for channels-last batches.

    Input is [batch_size, height, width, channels_in]; a single 3-D sample
    [height, width, channels_in] is accepted and treated as a batch of one.
    Output is [batch_size, out_h, out_w, channels_out]. There is no bias term.

    Attributes:
        weights: kernel array of shape [channels_out, kernel_size, kernel_size,
            channels_in]; created lazily on the first call.
        in_data: the (padded) input of the most recent call.
        out_data: the output of the most recent call.
    """
    def __init__(self, channels_in, channels_out, kernel_size, stride=1, padding=0):
        self.channels_in=channels_in
        self.channels_out=channels_out
        self.kernel_size=kernel_size
        self.stride=stride
        self.padding=padding
        self.weights = None
        self.in_data = None
        self.out_data = None

    def __call__(self,x):
        """Apply the convolution to `x` and cache the input/output on the layer.

        Raises:
            ValueError: if the channel axis of `x` does not match the kernels.
        """
        # if it has a batch_size of 1 for x_test or just plain sgd
        if x.ndim == 3:
            x = np.expand_dims(x, axis=0)  # expand_dims returns a new array; keep it

        # if weights haven't been initialized, then initialize them using xavier's method
        if self.weights is None: # weights dim: [co, kernel_size, kernel_size, ci]
            self.weights = self.xavier_initialization(x)
        if x.shape[3] != self.weights.shape[3]:
            raise ValueError(
                f"expected {self.weights.shape[3]} input channels, got {x.shape[3]}")
        # padding (zeros on both sides of the two spatial axes)
        if self.padding:
            x = np.pad(x, ((0, 0), (self.padding, self.padding), (self.padding, self.padding), (0, 0)),'constant')

        # Convolving
        k, s = self.kernel_size, self.stride
        # Number of kernel positions that fit entirely inside the padded input.
        out_h = max((x.shape[1] - k)//s + 1, 0)
        out_w = max((x.shape[2] - k)//s + 1, 0)
        y = np.zeros((x.shape[0], out_h, out_w, self.channels_out))
        for i in range(out_h): # height
            top = i*s
            for j in range(out_w):
                left = j*s
                patch = x[:, top:top+k, left:left+k, :] # [batch, k, k, ci]
                # Contract the (k, k, ci) axes of every patch with every kernel at
                # once -> [batch, co]; works for any batch size.
                y[:, i, j, :] = np.tensordot(patch, self.weights, axes=([1, 2, 3], [1, 2, 3]))
        self.in_data = x
        self.out_data = y
        return self.out_data # [batch_size][start_h][start_w][channels_out]

    def xavier_initialization(self, x):
        """Draw Xavier/Glorot-normal kernels of shape [co, kernel_size, kernel_size, ci].

        `x` is accepted for call compatibility but is not needed: the fan-in and
        fan-out of a convolution depend on the kernel's receptive field and the
        channel counts, not on the image size or batch size.
        """
        receptive_field = self.kernel_size*self.kernel_size
        fan_in = receptive_field*self.channels_in
        fan_out = receptive_field*self.channels_out

        return np.random.randn(self.channels_out, self.kernel_size, self.kernel_size, self.channels_in)*np.sqrt(2/(fan_in+fan_out))
#         return np.arange(out1*ks*ks*in1).reshape(out1,ks,ks,in1) #random
    


In [None]:
# testing Conv2d
# a batch of two random single-channel 8x8 inputs
xr = np.random.randn(2, 8, 8, 1)
c = Conv2d(channels_in=1, channels_out=10, kernel_size=3)
yr = c(xr)
# print(yr)
# feature map of the second sample, first output channel
print(yr[1, ..., 0])
yr.shape

In [None]:
class MaxPool2d:
    """2-D max-pooling layer for channels-last batches.

    Input is [batch_size, height, width, channels]; a single 3-D sample
    [height, width, channels] is accepted and treated as a batch of one.
    Output is [batch_size, out_h, out_w, channels], where only windows that fit
    entirely inside the (padded) input are pooled:
    out = floor((size - kernel_size) / stride) + 1.

    Attributes:
        in_data: the (padded) input of the most recent call.
        out_data: the output of the most recent call.
    """
    def __init__(self, kernel_size, stride, padding=0):
        self.kernel_size=kernel_size
        self.stride=stride
        self.padding=padding
        self.in_data = None
        self.out_data = None

    def __call__(self,x):
        """Pool `x` and cache the input/output on the layer."""
        # if it has a batch_size of 1 for x_test or just plain sgd
        if x.ndim == 3:
            x = np.expand_dims(x, axis=0)  # expand_dims returns a new array; keep it

        # padding
        # NOTE: the border is padded with zeros, so a padded window can never
        # report a maximum below 0 (harmless right after a ReLU).
        if self.padding:
            x = np.pad(x, ((0, 0), (self.padding, self.padding), (self.padding, self.padding), (0, 0)),'constant')

        # MaxPooling
        k, s = self.kernel_size, self.stride
        out_h = max((x.shape[1] - k)//s + 1, 0)
        out_w = max((x.shape[2] - k)//s + 1, 0)
        y = np.zeros((x.shape[0], out_h, out_w, x.shape[3]))
        for i in range(out_h): # height
            top = i*s
            for j in range(out_w):
                left = j*s
                # max over the two spatial axes of the window -> [batch, channels]
                y[:, i, j, :] = x[:, top:top+k, left:left+k, :].max(axis=(1, 2))

        self.in_data = x
        self.out_data = y
        return self.out_data # [batch_size][start_h][start_w][channels_out]
def relu(x):
    """Element-wise rectified linear unit: every negative entry of `x` becomes 0.

    NOTE: this repeats the `relu` already defined in the Utils cell; the later
    definition is the one in effect once this cell has run.
    """
    rectified = np.maximum(x, 0)
    return rectified
def soft_max(x): # x dim: [no_samples,output_classes]
    """Row-wise softmax of a 2-D array of scores.

    NOTE: this redefines the `soft_max` from the Utils cell; the later
    definition is the one in effect once this cell has run.

    Args:
        x: array-like of shape [no_samples, output_classes].

    Returns:
        Array of the same shape whose rows are non-negative and sum to 1.
    """
    x = np.asarray(x)
    # Subtracting the per-row maximum leaves the softmax unchanged mathematically
    # but keeps np.exp from overflowing to inf (which would yield inf/inf = NaN).
    shifted = x - x.max(axis=1, keepdims=True)
    exponentials = np.exp(shifted)
    return exponentials / exponentials.sum(axis=1, keepdims=True)

In [None]:
# testing MaxPool & relu
re_yr = relu(yr)
# The nine activations of sample 1, channel 0 lying in rows 0-2 / columns 3-5,
# i.e. the window that a 3x3 pool with stride 3 reduces to output cell [0, 1].
print(*[re_yr[1, r_idx, c_idx, 0]
        for r_idx, c_idx in ((1, 4), (1, 5), (2, 4), (2, 5), (0, 3),
                             (1, 3), (0, 4), (0, 5), (2, 3))],
      sep="\n")
# print(re_yr.shape)
m = MaxPool2d(3,3)
zr = m(re_yr)
print(f"max of those 9 elements is: {zr[1,0,1,0]}")
print(zr.shape)


In [None]:
class Linear: # w's in the transition between layers
    """Fully connected layer computing ``y = x @ weights`` (no bias term).

    Attributes:
        weights: array of shape [in_size, out_size]; created lazily on the
            first forward call.
        in_data: input of the most recent forward call.
        out_data: output of the most recent forward call.
        grad_weights: gradient w.r.t. `weights` from the most recent
            `backward` call (None until then).
    """
    def __init__(self, in_size, out_size):
        self.in_size=in_size
        self.out_size=out_size
        self.weights = None
        self.in_data = None
        self.out_data = None
        self.grad_weights = None

    def __call__(self,x):
        """Forward pass: multiply `x` ([batch, in_size]) by the weight matrix."""
        if self.weights is None: # weights dim: [in_size, out_size]
            # small random values (standard normal scaled by 1/100)
            self.weights = np.random.randn(self.in_size,self.out_size)/100
        y = x@self.weights
        self.in_data = x
        self.out_data = y
        return y

    def backward(self, grad_output=None):
        """Back-propagate through ``y = x @ weights`` using the cached forward input.

        Args:
            grad_output: gradient of the loss w.r.t. this layer's output, shape
                [batch, out_size]. When omitted, a gradient of ones is used,
                i.e. the result is the gradient of ``out_data.sum()``; pass the
                real upstream gradient when training.

        Returns:
            Gradient w.r.t. the layer input, shape [batch, in_size]. The
            gradient w.r.t. the weights is stored in `self.grad_weights`.

        Raises:
            RuntimeError: if no forward pass has been run yet.
        """
        if self.in_data is None:
            raise RuntimeError("Linear.backward() called before a forward pass")
        if grad_output is None:
            grad_output = np.ones_like(self.out_data)
        grad_output = np.asarray(grad_output)
        # dL/dW = x^T @ dL/dy   and   dL/dx = dL/dy @ W^T
        self.grad_weights = np.asarray(self.in_data).T @ grad_output
        return grad_output @ self.weights.T
        
        
        

In [None]:
# testing Linear
# 300 random values flattened into rows of 100 features
x = np.random.rand(3, 2, 5, 10).reshape(-1, 100)
print(x.shape)
l = Linear(in_size=100, out_size=10)
# 20 deterministic samples of 100 features each
pr = np.arange(2000).reshape(20, 100)
qr = l(pr)
print(qr.shape)


In [None]:
class CNN:
    """Small conv -> ReLU -> max-pool -> linear -> softmax classifier for 28x28x1 images."""
    def __init__(self):
        self.conv1 = Conv2d(1, 64, 5) # 28*28 ->  24*24
        self.pool1 = MaxPool2d(3,stride=3) # 24*24 ->8*8
        # Use the Linear layer defined in this notebook (no `nn` module is imported).
        self.fc1 = Linear(64*8*8, 10)

    def Forward(self, x):
        """Run the network on a batch and return class probabilities.

        Args:
            x: batch of images, [batch_size, 28, 28, 1].

        Returns:
            Array [batch_size, 10] of softmax probabilities.
        """
        x = self.pool1(relu(self.conv1(x)))
        x = x.reshape(-1, 64 * 8 * 8) # flatten every sample to one feature row
        x = self.fc1(x)
        return soft_max(x)

    def Backward(self, grad_output=None):
        """Back-propagate through the final linear layer.

        Args:
            grad_output: optional gradient w.r.t. the linear layer's output;
                when omitted, `fc1.backward()` is called without arguments.

        Returns:
            Whatever `fc1.backward` returns.
        """
        if grad_output is None:
            return self.fc1.backward()
        return self.fc1.backward(grad_output)
        
	

In [None]:
cnn = CNN()
# Forward requires an input batch; smoke-test the network on two training images.
cnn.Forward(x_train[:2])
cnn.Backward()