In [None]:
import torch
from network import components as lay
from network import functional as F
import torchvision.datasets as datasets
import torchvision.transforms as transforms

In [None]:
import numpy as np
from IPython.display import clear_output
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import math

# Load Data

In [None]:
# Preprocessing applied to every MNIST sample: convert the image to a tensor.
# Flattening into a vector is left disabled here.
transform = transforms.Compose([transforms.ToTensor()])

In [None]:
def load_data(batch_size=128, root='./data', shuffle_train=False):
    """Download MNIST (if needed) and wrap both splits in DataLoaders.

    Args:
        batch_size: number of samples per batch for both loaders.
        root: directory the dataset is downloaded to / read from.
        shuffle_train: whether the training loader reshuffles every epoch.
            Defaults to False, which keeps the batch order fixed between runs.

    Returns:
        (trainloader, test_loader): DataLoaders over the train and test splits,
        both using the module-level `transform`. The test loader never shuffles.
    """
    mnist_trainset = datasets.MNIST(root=root, train=True, download=True, transform=transform)
    mnist_testset = datasets.MNIST(root=root, train=False, download=True, transform=transform)
    trainloader = DataLoader(mnist_trainset, batch_size=batch_size, shuffle=shuffle_train)
    test_loader = DataLoader(mnist_testset, batch_size=batch_size, shuffle=False)
    return trainloader, test_loader

In [None]:
# Build the MNIST loaders once; the training and evaluation cells iterate over `train` and `test`.
train, test = load_data()

# Model

In [None]:
class Convolution(object):
    """Per-channel 2-D convolution layer with hand-written forward and backward passes.

    Output channel ``c`` is produced by sliding ``kernels[c]`` over input channel ``c``
    (or over the single input channel, broadcast, when the input has one channel) and
    adding ``bias[c]``. Channels are never mixed.

    Every input channel is standardised (zero mean, unit std) before the convolution.
    ``back`` returns the gradient with respect to that *standardised* input; the
    standardisation step itself is not differentiated.

    Attributes set by ``forward`` / ``back``:
        input: the standardised, zero-padded input of the last forward pass.
        output_feature: the last forward result, shape (channels, size_y, size_x).
        delta_k: the last kernel gradient.
        delta_x: the last input gradient.
    """

    # Lower bound for the per-channel std so a constant channel maps to 0 instead of NaN.
    _EPS = 1e-8

    def __init__(self, input_size, kernel_size : tuple = (2,2), stride : int = 1, padding : tuple = (0,0), channels : int = 1):
        """
        Args:
            input_size: (height, width) of the unpadded input.
            kernel_size: (height, width) of each kernel.
            stride: step of the sliding window, shared by both axes.
            padding: zero padding added on each side as (height_pad, width_pad).
            channels: number of kernels; replaced by the input's channel count the
                first time a multi-channel input is seen (see ``forward``).
        """
        super(Convolution, self).__init__()

        self.input_size = input_size
        self.kernel_size = kernel_size
        self.padding = padding
        self.stride = stride
        self.channels = channels
        # (sic) stays True until the kernels have been re-created for a multi-channel input.
        self.intial = True

        # Output size: floor((in - kernel + 2 * pad) / stride) + 1 on each axis.
        self.size_x = math.floor(((input_size[1] - kernel_size[1] + padding[1] * 2)/stride) + 1)
        self.size_y = math.floor(((input_size[0] - kernel_size[0] + padding[0] * 2)/stride) + 1)

        self.kernels = torch.rand((channels, kernel_size[0], kernel_size[1]), dtype=torch.float)
        self.bias = torch.rand((channels, self.size_y, self.size_x), dtype=torch.float)

        self.output_feature = torch.zeros((channels, self.size_y, self.size_x), dtype=torch.float)

    def forward(self, input_matrix):
        """Standardise, pad and convolve ``input_matrix`` of shape (C, H, W).

        The caller's tensor is left untouched.

        Returns:
            Tensor of shape (channels, size_y, size_x).
        """
        # Standardise every channel out of place; writing into `input_matrix` would
        # silently alter the caller's data (dataset batch or previous layer output).
        mean = torch.mean(input_matrix, dim=(-2, -1), keepdim=True)
        std = torch.std(input_matrix, dim=(-2, -1), keepdim=True).clamp_min(self._EPS)
        normalised = (input_matrix - mean) / std

        # First multi-channel input: adopt its channel count and re-create the parameters.
        if (normalised.shape[0] > 1) and self.intial:
            self.channels = normalised.shape[0]
            self.kernels = torch.rand((self.channels, self.kernel_size[0], self.kernel_size[1]), dtype=torch.float)
            self.bias = torch.rand((self.channels, self.size_y, self.size_x), dtype=torch.float)
            self.intial = False

        # Zero-pad height by padding[0] and width by padding[1] on both sides.
        pad_y = max(self.padding[0], 0)
        pad_x = max(self.padding[1], 0)
        in_c, in_h, in_w = normalised.shape
        padded = torch.zeros((in_c, in_h + 2 * pad_y, in_w + 2 * pad_x), dtype=torch.float)
        padded[:, pad_y:pad_y + in_h, pad_x:pad_x + in_w] = normalised
        self.input = padded

        # windows[c, j, i] is the kernel-sized patch whose top-left corner sits at
        # (j * stride, i * stride); shape (C, out_h, out_w, kernel_h, kernel_w).
        windows = padded.unfold(1, self.kernel_size[0], self.stride)
        windows = windows.unfold(2, self.kernel_size[1], self.stride)
        windows = windows[:, :self.size_y, :self.size_x]

        self.output_feature = torch.sum(windows * self.kernels[:, None, None, :, :], dim=(-2, -1)) + self.bias
        return self.output_feature

    #########################################

    def back(self, gradient, lr:float=0.01):
        """Apply one gradient-descent step and return the gradient for the layer input.

        Args:
            gradient: dLoss/dOutput, shape (channels, size_y, size_x).
            lr: learning rate for the kernel and bias update.

        Returns:
            Tensor of shape (channels, H, W): the gradient with respect to the
            standardised, unpadded input. When the input had a single channel that was
            broadcast over several kernels, the per-kernel contributions are returned
            unsummed (sum over dim 0 for the gradient of that single channel).
        """
        kernel_h, kernel_w = self.kernel_size[0], self.kernel_size[1]
        step = self.stride
        # Extent covered by the top-left corners of all windows along each axis.
        span_y = (gradient.shape[1] - 1) * step + 1
        span_x = (gradient.shape[2] - 1) * step + 1

        self.delta_k = torch.zeros((self.channels, kernel_h, kernel_w), dtype=torch.float)
        grad_padded = torch.zeros((self.channels, self.input.shape[1], self.input.shape[2]), dtype=torch.float)

        for a in range(kernel_h):
            for b in range(kernel_w):
                # Input positions that kernel element (a, b) touched, one per output pixel.
                rows = slice(a, a + span_y, step)
                cols = slice(b, b + span_x, step)
                self.delta_k[:, a, b] = torch.sum(self.input[:, rows, cols] * gradient, dim=(-2, -1))
                # Uses the kernels from the forward pass: the update happens below.
                grad_padded[:, rows, cols] += gradient * self.kernels[:, a, b].view(-1, 1, 1)

        self.kernels = self.kernels - self.delta_k * lr
        self.bias = self.bias - gradient * lr

        # Drop the gradient that fell on the zero padding.
        pad_y = max(self.padding[0], 0)
        pad_x = max(self.padding[1], 0)
        self.delta_x = grad_padded[:, pad_y:grad_padded.shape[1] - pad_y, pad_x:grad_padded.shape[2] - pad_x].contiguous()
        return self.delta_x

In [7]:
class Pooling(object):
    """Average or max pooling over a (C, H, W) tensor with a hand-written backward pass.

    ``_type == 'AVG'`` selects average pooling; any other value selects max pooling.
    """

    def __init__(self, input_size, window_size : tuple = (2,2), stride : int = 1, _type = 'AVG'):
        """
        Args:
            input_size: (height, width) of the input feature maps.
            window_size: (height, width) of the pooling window.
            stride: step of the sliding window, shared by both axes.
            _type: 'AVG' for average pooling, anything else for max pooling.
        """
        super(Pooling, self).__init__()

        self.input_size = input_size
        self.window_size = window_size
        self.stride = stride
        self._type = _type

        # Filled in by forward(); back() needs them to route the gradient.
        self._input_shape = None
        self._argmax = None

    def forward(self, input_matrix):
        """Pool every channel of ``input_matrix`` independently.

        Returns:
            Float tensor of shape (C, size_y, size_x), where each size is
            floor((input_size - window_size) / stride) + 1.
        """
        size_x = math.floor(((self.input_size[1] - self.window_size[1])/self.stride) + 1)
        size_y = math.floor(((self.input_size[0] - self.window_size[0])/self.stride) + 1)

        # windows[c, j, i] is the patch starting at (j * stride, i * stride).
        windows = input_matrix.unfold(1, self.window_size[0], self.stride)
        windows = windows.unfold(2, self.window_size[1], self.stride)
        windows = windows[:, :size_y, :size_x]

        self._input_shape = tuple(input_matrix.shape)
        if self._type == 'AVG':
            self._argmax = None
            output_feature = torch.mean(windows, dim=(-2, -1))
        else:
            # Remember which (row-major) window position held the maximum.
            flat = windows.reshape(windows.shape[0], windows.shape[1], windows.shape[2], -1)
            output_feature, self._argmax = torch.max(flat, dim=-1)

        return output_feature.to(torch.float)

    def back(self, gradient):
        """Propagate ``gradient`` (same shape as the forward output) to the input.

        Average pooling spreads each output gradient evenly over its window; max
        pooling sends it to the position that held the maximum. Overlapping windows
        accumulate.

        Returns:
            Float tensor with the shape of the last forward input.

        Raises:
            RuntimeError: if called before ``forward``.
        """
        if self._input_shape is None:
            raise RuntimeError("Pooling.back() called before forward()")

        win_h, win_w = self.window_size[0], self.window_size[1]
        step = self.stride
        span_y = (gradient.shape[1] - 1) * step + 1
        span_x = (gradient.shape[2] - 1) * step + 1

        grad_input = torch.zeros(self._input_shape, dtype=torch.float)
        for a in range(win_h):
            for b in range(win_w):
                if self._type == 'AVG':
                    share = gradient / (win_h * win_w)
                else:
                    # Only windows whose maximum sat at offset (a, b) contribute here.
                    share = gradient * (self._argmax == a * win_w + b)
                grad_input[:, a:a + span_y:step, b:b + span_x:step] += share
        return grad_input

In [8]:
class TestModel_old(object):
    """conv(3x3, pad 1) -> ReLU -> conv(5x5) -> ReLU -> FC -> sigmoid -> FC -> softmax.

    All layer sizes are derived from ``input_size``, so the model is not tied to
    28x28 inputs (for 28x28 the flattened feature size is 5 * 24 * 24, as before).
    """

    def __init__(self, input_size, hidden_size, output_size):
        """
        Args:
            input_size: (height, width) of one input image.
            hidden_size: width of the hidden fully connected layer.
            output_size: number of outputs of the final layer.
        """
        super(TestModel_old, self).__init__()

        # Output size per axis: ((size + pad*2 - kernel) / stride) + 1
        self.conv1 = Convolution(input_size, kernel_size=(3,3), channels = 5, padding = (1,1))
        self.relu1 = lay.ReLu()
        # conv2 consumes conv1's output, so its input size is conv1's output size.
        self.conv2 = Convolution((self.conv1.size_y, self.conv1.size_x), kernel_size=(5,5))
        self.relu2 = lay.ReLu()
        # Shape of the feature maps that get flattened for fc1 and restored in backward().
        self._feature_shape = (self.conv1.channels, self.conv2.size_y, self.conv2.size_x)
        self.fc1 = lay.FullyConnected(self._feature_shape[0] * self._feature_shape[1] * self._feature_shape[2], hidden_size)
        self.sigmoid = lay.Sigmoid()
        self.fc2 = lay.FullyConnected(hidden_size,output_size)
        self.softmax = lay.Softmax()

    @staticmethod
    def _show_channels(label, out):
        """Print ``label`` with the tensor shape, then plot every channel in grayscale."""
        print(label, out.shape)
        for channel in range(out.shape[0]):
            plt.imshow(out[channel],cmap='gray')
            plt.show()

    def forward(self, inp):
        """Run one image through the network and return the softmax output."""
        out = self.conv1.forward(inp)
        out = self.relu1.forward(out)
        out = self.conv2.forward(out)
        out = self.relu2.forward(out)
        # NOTE(review): TestModel.forward feeds F.stack(out).unsqueeze(1) to its first
        # FullyConnected layer, while this model feeds F.stack(out) directly. The
        # RuntimeError recorded in this notebook's training output (a 2-D tensor
        # permuted with 3 dims) may come from that difference - confirm against
        # network.components.FullyConnected before changing it.
        out = F.stack(out)
        out = self.fc1.forward(out)
        out = self.sigmoid.forward(out)
        out = self.fc2.forward(out)
        out = self.softmax.forward(out)

        return out

    def forward_show(self, inp):
        """Same computation as ``forward``, plotting every intermediate feature map."""
        out = self.conv1.forward(inp)
        self._show_channels("CONV1:", out)

        out = self.relu1.forward(out)
        self._show_channels("RELU1:", out)

        out = self.conv2.forward(out)
        self._show_channels("CONV2:", out)

        out = self.relu2.forward(out)
        self._show_channels("RELU2:", out)

        out = F.stack(out)
        print(out)
        out = self.fc1.forward(out)
        out = self.sigmoid.forward(out)
        out = self.fc2.forward(out)
        out = self.softmax.forward(out)

        return out

    def backward(self, loss_grad, lr : float=0.01):
        """Back-propagate ``loss_grad`` through every layer, updating weights with ``lr``."""
        gradient = loss_grad
        gradient = self.softmax.back(gradient)
        gradient = self.fc2.back(gradient, lr=lr)
        gradient = self.sigmoid.back(gradient)
        gradient = self.fc1.back(gradient, lr=lr)
        # Undo the flattening done before fc1.
        gradient = gradient.view(*self._feature_shape)
        gradient = self.relu2.back(gradient)
        gradient = self.conv2.back(gradient,lr=lr)
        gradient = self.relu1.back(gradient)
        gradient = self.conv1.back(gradient,lr=lr)

        return

In [12]:
class TestModel(object):
    """conv(3x3, pad 1) -> sigmoid -> FC -> sigmoid -> FC -> sigmoid.

    Layer sizes are derived from ``input_size`` (for 28x28 the flattened feature
    size is 5 * 28 * 28, as before).
    """

    def __init__(self, input_size, hidden_size, output_size):
        """
        Args:
            input_size: (height, width) of one input image.
            hidden_size: width of the hidden fully connected layer.
            output_size: number of outputs of the final layer.
        """
        super(TestModel, self).__init__()

        # Output size per axis: ((size + pad*2 - kernel) / stride) + 1
        self.conv1 = Convolution(input_size, kernel_size=(3,3), channels = 5, padding = (1,1))
        self.sigmoid1 = lay.Sigmoid()
        # Shape of the feature maps that get flattened for fc1 and restored in backward().
        self._feature_shape = (self.conv1.channels, self.conv1.size_y, self.conv1.size_x)
        self.fc1 = lay.FullyConnected(self._feature_shape[0] * self._feature_shape[1] * self._feature_shape[2], hidden_size)
        self.sigmoid2 = lay.Sigmoid()
        self.fc2 = lay.FullyConnected(hidden_size,output_size)
        self.sigmoid3 = lay.Sigmoid()

    @staticmethod
    def _show_channels(label, out):
        """Print ``label`` with the tensor shape, then plot every channel in grayscale."""
        print(label, out.shape)
        for channel in range(out.shape[0]):
            plt.imshow(out[channel],cmap='gray')
            plt.show()

    def forward(self, inp):
        """Run one image through the network and return the final sigmoid output."""
        out = self.conv1.forward(inp)
        out = self.sigmoid1.forward(out)
        out = F.stack(out).unsqueeze(1)
        out = self.fc1.forward(out)
        out = self.sigmoid2.forward(out)
        out = self.fc2.forward(out)
        out = self.sigmoid3.forward(out)

        return out

    def forward_show(self, inp):
        """Same computation as ``forward``, plotting every intermediate feature map."""
        out = self.conv1.forward(inp)
        self._show_channels("CONV1:", out)

        out = self.sigmoid1.forward(out)
        self._show_channels("SIGM1:", out)

        # Stack exactly once, as forward() does, so both paths feed fc1 the same tensor.
        out = F.stack(out)
        print(out)
        out = out.unsqueeze(1)
        out = self.fc1.forward(out)
        out = self.sigmoid2.forward(out)
        out = self.fc2.forward(out)
        out = self.sigmoid3.forward(out)

        return out

    def backward(self, loss_grad, lr : float=0.01):
        """Back-propagate ``loss_grad`` through every layer, updating weights with ``lr``."""
        gradient = loss_grad
        gradient = self.sigmoid3.back(gradient)
        gradient = self.fc2.back(gradient, lr=lr)
        gradient = self.sigmoid2.back(gradient)
        gradient = self.fc1.back(gradient, lr=lr)
        # Undo the flattening done before fc1.
        gradient = gradient.view(*self._feature_shape)
        gradient = self.sigmoid1.back(gradient)
        gradient = self.conv1.back(gradient,lr=lr)

        return

In [13]:
# Model under training: the two-convolution network for 28x28 inputs with
# hidden_size=100 and output_size=10, paired with the project's MSE loss object.
model = TestModel_old((28,28), 100, 10)
loss_function = F.MSELoss()

# Train

In [14]:
from tqdm.notebook import tqdm

num = 1       # batches with index 0..num are trained on in every epoch
SHOW = True   # visualise one sample after training (also read by the evaluation cell)
for epoch in range(5):
    batch_num = 0
    for batch_count, train_batch in enumerate(train):
        images, labels = train_batch[0], train_batch[1]
        batch_size = np.shape(labels)[0]
        AVGloss = 0

        # Stop the epoch once more than `num` batches have been processed.
        if batch_count > num:
            break
        batch_num = batch_count + 1
        print("batch:", batch_num)

        # Per-sample updates: forward, loss, then an immediate backward step.
        for i in tqdm(range(batch_size)):
            _input = images[i]
            # One-hot target for the 10 digit classes.
            _label = torch.zeros(10, dtype=torch.float)
            _label[labels[i]] = 1.0

            result = model.forward(_input)
            _loss = loss_function.calculate(result, _label)
            model.backward(loss_function.back(), lr=0.05)
            AVGloss += torch.sum(_loss) / batch_size

        print("LOSS:", AVGloss)

if SHOW:
    # Uses the first sample of the last batch fetched by the loop above.
    result = model.forward_show(train_batch[0][0])
    plt.bar(list(range(10)), result)
    plt.show()
    print("LABEL:", train_batch[1][0])
batch: 1


  0%|          | 0/128 [00:00<?, ?it/s]

RuntimeError: permute(sparse_coo): number of dimensions in the tensor input does not match the length of the desired ordering of dimensions i.e. input.dim() = 2 is not equal to len(dims) = 3

In [None]:
limit = 10    # maximum number of test samples to visualise
count = 0     # samples visualised so far
total = 0
correct = 0
for test_batch in tqdm(test):
    images, labels = test_batch[0], test_batch[1]
    # Iterate over the samples of the batch; the batch itself is an
    # [images, labels] pair, so its own length is not the sample count.
    for i in range(labels.shape[0]):
        total += 1
        if SHOW and count < limit:
            result = model.forward_show(images[i])
            plt.bar([0,1,2,3,4,5,6,7,8,9], result)
            plt.show()
            print("LABEL:", labels[i])
            count += 1
        else:
            result = model.forward(images[i])
        if torch.argmax(result) == labels[i]:
            correct += 1

# Accuracy is computed whether or not SHOW is set; guard against an empty loader.
if total:
    print("ACCURACY:", (correct/total) * 100 )
else:
    print("ACCURACY:", "n/a (no test samples)")