# Conv Nets

In [5]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader


In [23]:
data_folder = './../Data/cifar/cifar-10/'
Xtr = np.load(data_folder + 'Xtr.npy')
ytr = np.load(data_folder + 'ytr.npy')
Xte = np.load(data_folder + 'Xte.npy')
yte = np.load(data_folder + 'yte.npy')
labels = read_dictionary = np.load(data_folder + 'labels_dict.npy',allow_pickle='TRUE').item()
true_shape = (32, 32, 3)
# transpose to (1,2,0)
n = int(Xtr.shape[0]*0.8)
Xtr , Xval = Xtr[:n] , Xtr[n:]
ytr , yval = ytr[:n] , ytr[n:]

class CIFARDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        x = self.X[idx].reshape(true_shape).permute(2, 0, 1)
        return x, self.y[idx]


train_dataset = CIFARDataset(Xtr, ytr)
val_dataset = CIFARDataset(Xval, yval)
test_dataset = CIFARDataset(Xte, yte)


train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Forward Pass

In [139]:
class CNN_Layer(nn.Module):
    def __init__(self, n_channels, input_size, filter_size, stride, padding, kernals):
        super(CNN_Layer, self).__init__()
        self.n_channels = n_channels # C ( = D1)
        self.input_size = input_size # N ( = W = H) 
        self.filter_size = filter_size # F
        self.stride = stride # S
        self.padding = padding # P 
        self.kernals = kernals # K ( = D2)

        # filter = torch.rand(self.filter_size, self.filter_size , self.n_channels)
        # print(filter.shape)
        self.filters = nn.Parameter(torch.rand(self.kernals, self.n_channels, self.filter_size, self.filter_size)) # (K, C, F, F)
        self.biases = nn.Parameter(torch.rand(self.kernals)) # (K)
        # print(self.filters.shape , self.biases.shape)
        self.parameters_count = self.filters.numel() + self.biases.numel()
        # print(self.parameters_count)
        
        
    def forward(self, x):
        # p = x[0, :, :, :].detach().numpy()
        # plt.imshow(p.astype(int).reshape(3, 32, 32).transpose(1,2, 0))
        # plt.show()
        
        # print(f'{x.shape = }') # (B, C, N, N)
        activation_size = int((self.input_size + 2 * self.padding - self.filter_size)/self.stride) + 1  # N2 ( = W2 = H2)
        # print(activation_size) # N2 
        activations = torch.zeros(x.shape[0], self.kernals, activation_size, activation_size) # (B, K, N2, N2) 
        # print(f'{activations.shape = }')
        
        if self.padding > 0:
            x2 = torch.zeros(x.shape[0], self.n_channels, self.input_size + 2 * self.padding, self.input_size + 2 * self.padding)
            # print(x2.shape) # (B, C, N + 2P, N + 2P) 
            x2[:, :, self.padding:self.input_size+self.padding, self.padding:self.input_size+self.padding] = x 
            # t = x2[0, :, :, :].detach().numpy()
            # plt.imshow(t.astype(int).reshape(3, 34, 34).transpose(1,2, 0))
            # plt.show()
            x = x2
        
        # moves = int((self.input_size - self.filter_size)/self.stride) + 1
        # print(moves) 
        
        for i in range(self.input_size):
            for j in range(self.input_size):
                x_slice = x[:, :, i*self.stride:i*self.stride + self.filter_size, j*self.stride:j*self.stride + self.filter_size]
                # print(f'{x_slice.shape = }') # (B, C, F, F)
                x_slice = x_slice.unsqueeze(1) 
                # print(f'{x_slice.shape = }')
                # print(f'{self.filters.shape = }')
                # print(f'{self.biases.shape = }')
                convolute_dot = torch.sum(x_slice * self.filters, dim=(2,3,4)) # (B, 1, C, F, F) * ( K, C, F, F) -> (B, K, C, F, F) sum-> (B, K)
                # print(f'{convolute_dot.shape = }')
                # print(f'{activations[:, :, i, j].shape = }')
                activations[:, :, i, j] = convolute_dot + self.biases  # (B, K) + (K) -> (B, K) 
                # print(f'{activations[:2, :2, i, j]= }')

        return activations
        
cnn_layer = CNN_Layer(
    n_channels = 3, 
    input_size = 32, 
    filter_size = 5,
    stride = 1,
    padding = 2,
    kernals = 10
    )
Xbtr , ybtr = next(iter(train_loader))
# print(Xbtr.shape, ybtr.shape)

activ = cnn_layer(Xbtr)
activ.shape

torch.Size([64, 10, 32, 32])

In [127]:
class ReLU_Layer(nn.Module):
    def __init__(self):
        super(ReLU_Layer, self).__init__()
    
    def forward(self, x):
        return torch.max(x, torch.zeros_like(x))

relu_layer = ReLU_Layer()
activ2 = relu_layer(activ)
activ2.shape

torch.Size([64, 10, 32, 32])

In [145]:
class MaxPool_Layer(nn.Module):
    def __init__(self, input_size, filter_size, stride):
        super(MaxPool_Layer, self).__init__()
        self.input_size = input_size # N ( = W = H)
        self.filter_size = filter_size # F
        self.stride = stride # S
        self.output_size = int((self.input_size - self.filter_size)/self.stride) + 1 # N2 ( = W2 = H2)
        self.parameters_count = 0 
        
    def forward(self, x):
        pool = torch.zeros(x.shape[0], x.shape[1], self.output_size, self.output_size)
        # print(f'{pool.shape = }')
        for i in range(self.output_size):
            for j in range(self.output_size):
                x_slice = x[:, :, i*self.stride:i*self.stride + self.filter_size, j*self.stride:j*self.stride + self.filter_size]
                # print(f'{x_slice.shape = }') # (B, K, F, F) 
                x_slice = x_slice.reshape(x.shape[0], x.shape[1], -1) # (B, K, F*F) 
                # print(f'{x_slice.shape = }')
                max_pool = torch.max(x_slice, dim=2).values # (B, K)
                # print(f'{max_pool.shape = }') 
                pool[:, :, i, j] = max_pool 
                # print(x_slice[0, :, :])
                # print(max_pool[0]) 
                # print(pool[0, :, i, j])

        return pool
    
maxpool_layer = MaxPool_Layer(
    input_size = 32,
    filter_size = 2,
    stride = 2
    )
print(activ2.shape)
pool = maxpool_layer(activ2)
pool.shape

torch.Size([64, 10, 32, 32])


torch.Size([64, 10, 16, 16])

In [153]:
class cifarSubNet(nn.Module):
    def __init__(self, C, N, F1, S1, P1, K1, F2, S2):
        super(cifarSubNet, self).__init__()
        self.cnn_layer = CNN_Layer(
            n_channels = C, 
            input_size = N, 
            filter_size = F1,
            stride = S1,
            padding = P1,
            kernals = K1
            )
        self.relu_layer = ReLU_Layer()
        self.cnn_layer2 = CNN_Layer(
            n_channels = K1,
            input_size = N, 
            filter_size = F1,
            stride = S1,
            padding = P1,
            kernals = K1
            )
        self.relu_layer2 = ReLU_Layer()
        self.maxpool_layer = MaxPool_Layer(
            input_size = N,
            filter_size = F2,
            stride = S2 
            )
        self.parameters_count = self.cnn_layer.parameters_count + self.maxpool_layer.parameters_count 
        
    def forward(self, x):
        print(f'-------SUBNET-------')
        print(f'INP  {x.shape = }')
        x = self.cnn_layer(x)
        print(f'CONV {x.shape = }')
        x = self.relu_layer(x)
        print(f'RELU {x.shape = }')
        x = self.cnn_layer2(x)
        print(f'CONV {x.shape = }')
        x = self.relu_layer2(x)
        print(f'RELU {x.shape = }')
        x = self.maxpool_layer(x)
        print(f'POOL {x.shape = }')
        return x

cifar_subnet = cifarSubNet(
    C = 3, 
    N = 32,
    F1 = 5,
    S1 = 1,
    P1 = 2,
    K1 = 10,
    F2 = 2,
    S2 = 2
    )

Xbtr , ybtr = next(iter(train_loader))
subnet_activ = cifar_subnet(Xbtr)
subnet_activ.shape

-------SUBNET-------
INP  x.shape = torch.Size([64, 3, 32, 32])
CONV x.shape = torch.Size([64, 10, 32, 32])
RELU x.shape = torch.Size([64, 10, 32, 32])
CONV x.shape = torch.Size([64, 10, 32, 32])
RELU x.shape = torch.Size([64, 10, 32, 32])
POOL x.shape = torch.Size([64, 10, 16, 16])


torch.Size([64, 10, 16, 16])

In [165]:
class FullyConnected_Layer(nn.Module):
    def __init__(self, input_size, output_size):
        super(FullyConnected_Layer, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.weights = nn.Parameter(torch.rand(self.input_size, self.output_size))
        self.biases = nn.Parameter(torch.rand(self.output_size))
        self.parameters_count = self.weights.numel() + self.biases.numel()
        
    def forward(self, x):
        # print(f'{x.shape = }')
        x = x.reshape(x.shape[0], -1)
        # print(f'{x.shape = }')
        return torch.mm(x, self.weights) + self.biases
    
fc_layer = FullyConnected_Layer(
    input_size = subnet_activ.shape[1]*subnet_activ.shape[2]*subnet_activ.shape[3],
    output_size = 40
    )

fc_activ = fc_layer(subnet_activ)
fc_activ.shape

fc_layer2 = FullyConnected_Layer(
    input_size = 10*4,
    output_size = 10
    )

fc_activ2 = fc_layer2(fc_activ)
fc_activ2.shape

torch.Size([64, 40])

torch.Size([64, 10])

In [166]:
fc_activ2[0]

tensor([1.4169e+10, 1.2616e+10, 1.4106e+10, 1.0178e+10, 1.3369e+10, 1.4128e+10,
        1.6455e+10, 1.3351e+10, 1.3239e+10, 1.3133e+10],
       grad_fn=<SelectBackward0>)

In [167]:
class cifarNet(nn.Module):
    def __init__(self, C, N, F1, S1, P1, K1, F2, S2, FC1, FC2, y):
        super(cifarNet, self).__init__()
        self.subnet1 = cifarSubNet(C, N, F1, S1, P1, K1, F2, S2)
        C2 = K1 
        N2 = int((N - F2)/S2) + 1 
        self.subnet2 = cifarSubNet(C2, N2, F1, S1, P1, K1, F2, S2)
        C3 = K1
        N3 = int((N2 - F2)/S2) + 1 
        self.subnet3 = cifarSubNet(C3, N3, F1, S1, P1, K1, F2, S2)
        C4 = K1
        N4 = int((N3 - F2)/S2) + 1
        self.fc_layer = FullyConnected_Layer(K1 * N4 * N4 , FC1)
        self.fc_relu1 = ReLU_Layer()
        self.fc_layer2 = FullyConnected_Layer(FC1, FC2)
        self.fc_relu2 = ReLU_Layer()
        self.fc_layer3 = FullyConnected_Layer(FC2, y)
        self.parameters_count = self.subnet1.parameters_count + self.subnet2.parameters_count + self.subnet3.parameters_count  \
            + self.fc_layer.parameters_count + self.fc_layer2.parameters_count + self.fc_layer3.parameters_count
        
    
    def forward(self, x):
        print(f'INP  {x.shape = }')
        x = self.subnet1(x)
        print(f'SUB1 {x.shape = }')
        x = self.subnet2(x)
        print(f'SUB2 {x.shape = }')
        x = self.subnet3(x)
        print(f'SUB3 {x.shape = }')        
        x = self.fc_layer(x)
        print(f'------FC Layers------')
        print(f'FC1  {x.shape = }')
        x = self.fc_relu1(x)
        print(f'RELU {x.shape = }')
        x = self.fc_layer2(x)
        print(f'FC2  {x.shape = }')
        x = self.fc_relu2(x)
        print(f'RELU {x.shape = }')
        x = self.fc_layer3(x)
        print(f'FC3  {x.shape = }')
        return x
    
cifar_net = cifarNet(
    C = 3,
    N = 32,
    F1 = 5,
    S1 = 1,
    P1 = 2,
    K1 = 10,
    F2 = 2,
    S2 = 2,
    FC1 = 80,
    FC2 = 40,
    y = 10
    )

Xbtr , ybtr = next(iter(train_loader))
net_activ = cifar_net(Xbtr)
net_activ.shape

INP  x.shape = torch.Size([64, 3, 32, 32])
-------SUBNET-------
INP  x.shape = torch.Size([64, 3, 32, 32])
CONV x.shape = torch.Size([64, 10, 32, 32])
RELU x.shape = torch.Size([64, 10, 32, 32])
CONV x.shape = torch.Size([64, 10, 32, 32])
RELU x.shape = torch.Size([64, 10, 32, 32])
POOL x.shape = torch.Size([64, 10, 16, 16])
SUB1 x.shape = torch.Size([64, 10, 16, 16])
-------SUBNET-------
INP  x.shape = torch.Size([64, 10, 16, 16])
CONV x.shape = torch.Size([64, 10, 16, 16])
RELU x.shape = torch.Size([64, 10, 16, 16])
CONV x.shape = torch.Size([64, 10, 16, 16])
RELU x.shape = torch.Size([64, 10, 16, 16])
POOL x.shape = torch.Size([64, 10, 8, 8])
SUB2 x.shape = torch.Size([64, 10, 8, 8])
-------SUBNET-------
INP  x.shape = torch.Size([64, 10, 8, 8])
CONV x.shape = torch.Size([64, 10, 8, 8])
RELU x.shape = torch.Size([64, 10, 8, 8])
CONV x.shape = torch.Size([64, 10, 8, 8])
RELU x.shape = torch.Size([64, 10, 8, 8])
POOL x.shape = torch.Size([64, 10, 4, 4])
SUB3 x.shape = torch.Size([64, 

torch.Size([64, 10])

In [168]:
cifar_net.parameters_count

22310

In [172]:
class CIFARClassifier(nn.Module):
    def __init__(self, C, N, F1, S1, P1, K1, F2, S2, FC1, FC2, y):
        super(CIFARClassifier, self).__init__()
        self.cifar_net = cifarNet(C, N, F1, S1, P1, K1, F2, S2, FC1, FC2, y)
        self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        x = self.cifar_net(x)
        return self.softmax(x) 
    
cifar_classifier = CIFARClassifier(
    C = 3,
    N = 32,
    F1 = 5,
    S1 = 1,
    P1 = 2,
    K1 = 10,
    F2 = 2,
    S2 = 2,
    FC1 = 80,
    FC2 = 40,
    y = len(labels)
)

Xbtr , ybtr = next(iter(train_loader))
pred = cifar_classifier(Xbtr)
pred.shape
pred[0]

INP  x.shape = torch.Size([64, 3, 32, 32])
-------SUBNET-------
INP  x.shape = torch.Size([64, 3, 32, 32])
CONV x.shape = torch.Size([64, 10, 32, 32])
RELU x.shape = torch.Size([64, 10, 32, 32])
CONV x.shape = torch.Size([64, 10, 32, 32])
RELU x.shape = torch.Size([64, 10, 32, 32])
POOL x.shape = torch.Size([64, 10, 16, 16])
SUB1 x.shape = torch.Size([64, 10, 16, 16])
-------SUBNET-------
INP  x.shape = torch.Size([64, 10, 16, 16])
CONV x.shape = torch.Size([64, 10, 16, 16])
RELU x.shape = torch.Size([64, 10, 16, 16])
CONV x.shape = torch.Size([64, 10, 16, 16])
RELU x.shape = torch.Size([64, 10, 16, 16])
POOL x.shape = torch.Size([64, 10, 8, 8])
SUB2 x.shape = torch.Size([64, 10, 8, 8])
-------SUBNET-------
INP  x.shape = torch.Size([64, 10, 8, 8])
CONV x.shape = torch.Size([64, 10, 8, 8])
RELU x.shape = torch.Size([64, 10, 8, 8])
CONV x.shape = torch.Size([64, 10, 8, 8])
RELU x.shape = torch.Size([64, 10, 8, 8])
POOL x.shape = torch.Size([64, 10, 4, 4])
SUB3 x.shape = torch.Size([64, 

torch.Size([64, 10])

tensor([0., 0., 0., 1., 0., 0., 0., 0., 0., 0.], grad_fn=<SelectBackward0>)

In [173]:
pred[0] , ybtr[0]

(tensor([0., 0., 0., 1., 0., 0., 0., 0., 0., 0.], grad_fn=<SelectBackward0>),
 tensor(4))