In [None]:
import torch
import torchvision
import torchvision.transforms as transforms

from torch import nn
from torch.nn import functional as F

import my_utils as mu
from IPython import display

import matplotlib.pyplot as plt
import numpy as np

#Importing the packages

In [None]:
# Mount Google Drive inside the Colab runtime. force_remount=True asks for a
# fresh mount even if the mount point is already in use.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
import sys
# Make Python modules stored in the Drive "Colab Notebooks" folder importable.
# NOTE(review): `import my_utils as mu` sits in the first cell, i.e. before this
# path is appended. Presumably my_utils lives in this folder, in which case this
# cell must run before the import cell on a fresh kernel -- TODO confirm.
sys.path.append('/content/gdrive/MyDrive/Colab Notebooks')

READING THE DATASET AND CREATING DATALOADERS

In [None]:
# Pre-processing applied to every image: convert to a float tensor, then
# normalise each of the three RGB channels with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

batch_size = 50  # number of samples in each mini-batch

# Training split. shuffle=True reshuffles the samples at every epoch so that
# the optimiser does not see the same mini-batches in the same order each pass.
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                          shuffle=True, num_workers=2)

# Held-out test split (train=False). Sample order does not affect the
# accuracy computed over the whole split, so it is left unshuffled.
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                         shuffle=False, num_workers=2)

# Human-readable names of the 10 CIFAR-10 classes, indexed by label id.
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

*MODEL*

In [None]:
class Block(nn.Module):
    """Stack of 3x3 convolutions, each followed by ReLU, closed by 2x2 average pooling.

    Args:
        num_convs: number of (convolution, ReLU) pairs in the block.
        input_channels: channel count of the tensor fed into the block.
        output_channels: channel count produced by every convolution.

    The sub-layers are registered under the names ``conv<i>`` / ``relu<i>``
    so that ``forward`` can look them up by index. With ``kernel_size=3`` and
    ``padding=1`` the convolutions keep the spatial size; the final
    ``AvgPool2d(kernel_size=2, stride=2)`` halves it.
    """

    def __init__(self, num_convs, input_channels, output_channels):
        super().__init__()
        # Kept on the instance because forward() needs to know how many
        # conv/relu pairs to walk through.
        self.num_convs = num_convs
        channels_in = input_channels
        for idx in range(num_convs):
            conv_layer = nn.Conv2d(channels_in, output_channels,
                                   kernel_size=3, padding=1)
            self.add_module(f'conv{idx}', conv_layer)
            self.add_module(f'relu{idx}', nn.ReLU())
            # The next convolution consumes what this one produced.
            channels_in = output_channels
        self.avg_pool = nn.AvgPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Run ``x`` through every conv/ReLU pair in order, then pool."""
        out = x
        for idx in range(self.num_convs):
            conv_layer = getattr(self, f'conv{idx}')
            activation = getattr(self, f'relu{idx}')
            out = activation(conv_layer(out))
        return self.avg_pool(out)

In [None]:
class Block_Manager(nn.Module):
    """CNN made of a sequence of ``Block`` modules followed by an MLP classifier head.

    Args:
        conv: sequence of ``(num_convs, out_channels)`` pairs, one per Block.
        in_channels: channel count of the input images (3 for RGB).
        input_size: height/width of the (square) input images, in pixels.
        num_classes: number of scores produced by the final linear layer.

    Raises:
        ValueError: if the architecture pools the feature map down to less
            than one pixel for the given ``input_size``.

    The input width of the first linear layer is derived from the
    architecture instead of being hard-coded. With the defaults and two
    3-channel blocks it is 3 * 8 * 8 = 192.
    """

    def __init__(self, conv, in_channels=3, input_size=32, num_classes=10):
        super(Block_Manager, self).__init__()
        self.conv_arch = conv
        # Blocks are registered as 'Block<i>' so forward() can fetch them by index.
        for i, (num_convs, out_channels) in enumerate(conv):
            self.add_module('Block{0}'.format(i), Block(num_convs, in_channels, out_channels))
            # The next block consumes the channels produced by this one.
            in_channels = out_channels

        # Each Block ends with a stride-2 average pooling, which halves
        # (rounding down) the spatial size; its convolutions preserve it.
        feature_size = input_size // (2 ** len(conv))
        if feature_size < 1:
            raise ValueError(
                'input_size={0} is too small for {1} pooling stages'.format(
                    input_size, len(conv)))
        flat_features = in_channels * feature_size * feature_size

        # MLP head: flatten the feature map and map it to num_classes raw
        # scores (no softmax is applied inside the model).
        self.last = nn.Sequential(nn.Flatten(), nn.Linear(flat_features, 1024),
                                  nn.ReLU(), nn.Linear(1024, 1024),
                                  nn.ReLU(), nn.Linear(1024, num_classes))

    def forward(self, x):
        """Apply every Block in order, then the classifier head."""
        out = x
        for i in range(len(self.conv_arch)):
            out = self._modules['Block{0}'.format(i)](out)
        out = self.last(out)
        return out

In [None]:
# Backbone layout, one (num_convs, out_channels) pair per Block: two Blocks,
# each with a single convolution that outputs 3 channels.
convs = ((1, 3) , (1,3))
net = Block_Manager(convs)

In [None]:
def init_weights(m):
    """Initialise a linear layer in place; leave every other module untouched.

    For ``nn.Linear`` modules the weights are drawn from a normal
    distribution with mean 0 and standard deviation 0.01 and the bias is set
    to zero. Intended to be passed to ``nn.Module.apply``.
    """
    if not isinstance(m, nn.Linear):
        return
    nn.init.normal_(m.weight, mean=0.0, std=0.01)
    nn.init.zeros_(m.bias)

# Run init_weights on net and every one of its sub-modules. The trailing ';'
# stops the notebook from echoing the value returned by apply().
net.apply(init_weights);
# Print the layer structure of the model.
print(net)

CREATING LOSS AND OPTIMIZER


In [None]:
import torch.optim as optim

# Cross-entropy loss for multi-class classification. It takes the raw class
# scores produced by the network together with integer class labels, and
# applies the softmax internally. Compared with an MSE loss it corresponds to
# maximising the likelihood of the correct class, so it also penalises
# predictions that are correct but made with low confidence.
loss = nn.CrossEntropyLoss()

wd = 0.0005   # weight-decay coefficient
lr=0.002      # learning rate

# Adam updates the model parameters from the gradients of the loss.
# Weight decay penalises large weights, which would otherwise dominate the
# output; keeping the weights small helps to limit over-fitting.
optimizer = optim.Adam(net.parameters(), weight_decay = wd , lr=lr)


EVALUATION FUNCTIONS

In [None]:
def accuracy(y_hat, y):
    """Count the correct predictions in one batch.

    Args:
        y_hat: either a 2-D tensor of per-class scores (one row per sample,
            more than one column) or a tensor that already holds predicted
            labels.
        y: tensor of ground-truth labels.

    Returns:
        The number of positions where prediction and label agree, as a
        Python float. This is a count, not a ratio; ``evaluate_accuracy``
        divides the accumulated counts by the number of samples.
    """
    has_class_scores = y_hat.dim() > 1 and y_hat.shape[1] > 1
    # Scores are reduced to the index of the best class; labels pass through.
    predicted = y_hat.argmax(dim=1) if has_class_scores else y_hat
    # Cast to the label dtype so the element-wise comparison is well defined.
    matches = predicted.type(y.dtype) == y
    return float(matches.sum())

In [None]:
class Accumulator:
    """Running sums for a fixed number of metrics.

    Used by the evaluation and training loops to accumulate per-batch
    quantities (e.g. correct predictions and total predictions) so that the
    final figures cover all mini-batches rather than a single one.
    """

    def __init__(self, n):
        # One running total per tracked metric, all starting at zero.
        self.data = [0.0 for _ in range(n)]

    def add(self, *args):
        """Add one value to each running total, position by position."""
        updated = []
        for running_total, increment in zip(self.data, args):
            updated.append(running_total + float(increment))
        self.data = updated

    def reset(self):
        """Set every running total back to zero."""
        self.data = [0.0 for _ in self.data]

    def __getitem__(self, idx):
        """Return the running total stored at position ``idx``."""
        return self.data[idx]

In [None]:
def evaluate_accuracy(net, data_iter):
    """Return the fraction of correctly classified samples over a whole data iterator.

    Args:
        net: the model; called as ``net(X)`` on every batch.
        data_iter: iterable yielding ``(X, y)`` batches.

    Returns:
        Number of correct predictions divided by the number of labels seen.

    Raises:
        ZeroDivisionError: if ``data_iter`` yields no samples.

    The model is switched to evaluation mode and gradient tracking is
    disabled while predicting; the previous train/eval mode is restored
    before returning.
    """
    is_module = isinstance(net, torch.nn.Module)
    was_training = net.training if is_module else False
    if is_module:
        net.eval()
    metric = Accumulator(2)  # (correct predictions, total predictions)
    try:
        # No gradients are needed for evaluation; skipping them avoids
        # building an autograd graph for every batch.
        with torch.no_grad():
            for X, y in data_iter:
                metric.add(accuracy(net(X), y), y.numel())
    finally:
        if is_module:
            net.train(was_training)
    return metric[0] / metric[1]


In [None]:
# Sanity check before training: test-set accuracy of the freshly initialised
# network (with 10 classes, chance level is 10%).
evaluate_accuracy(net, testloader)

TRAINING SECTION


In [None]:
def train_epoch_ch3(net, train_iter, loss, optimizer):
    """Train ``net`` for one epoch, i.e. one pass over ``train_iter``.

    Args:
        net: the model being trained.
        train_iter: iterable yielding ``(X, y)`` mini-batches.
        loss: callable returning the batch loss for ``(predictions, labels)``.
        optimizer: optimiser holding the parameters of ``net``.

    Returns:
        ``(average loss per sample, training accuracy)`` for the epoch.
    """
    if isinstance(net, nn.Module):
        net.train()
    # Running totals: summed loss, correct predictions, samples seen.
    totals = Accumulator(3)
    for features, targets in train_iter:
        predictions = net(features)
        batch_loss = loss(predictions, targets)
        optimizer.zero_grad()
        # Gradient of the loss with respect to the model parameters.
        batch_loss.backward()
        # Parameter update based on those gradients.
        optimizer.step()
        # batch_loss is scaled by the batch size so that dividing by the
        # sample count at the end gives a per-sample average.
        totals.add(float(batch_loss) * len(targets),
                   accuracy(predictions, targets),
                   targets.numel())
    sample_count = totals[2]
    return totals[0] / sample_count, totals[1] / sample_count

In [None]:
class Animator:  
    """Incrementally plot one or more curves and redraw the figure in the notebook.

    Used by the training loop to show the training loss, training accuracy
    and test accuracy after every epoch. Only the first axes of the figure
    is ever drawn on.
    """

    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):
        """Create the figure and remember how its axes must be configured.

        ``fmts`` holds one matplotlib format string per curve, in order.
        """
        if legend is None:
            legend = []
        # Helper from my_utils; presumably selects SVG figure output -- TODO confirm.
        mu.use_svg_display()
        self.fig, self.axes = mu.plt.subplots(nrows, ncols, figsize=figsize)
        # Wrap a single axes object in a list so self.axes[0] always works.
        if nrows * ncols == 1:
            self.axes = [self.axes, ]
        # Deferred call that re-applies the axis settings after each redraw.
        # mu.set_axes is defined in my_utils; presumably it sets labels,
        # limits, scales and legend -- TODO confirm.
        self.config_axes = lambda: mu.set_axes(
            self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
        # Per-curve x and y histories; created on the first call to add().
        self.X, self.Y, self.fmts = None, None, fmts

    def add(self, x, y):
        """Append one data point per curve and redraw the figure.

        ``y`` may be a scalar or a sequence with one value per curve; a scalar
        ``x`` is shared by all curves. Pairs in which either value is None
        are skipped.
        """
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for i, (a, b) in enumerate(zip(x, y)):
            if a is not None and b is not None:
                self.X[i].append(a)
                self.Y[i].append(b)
        # Redraw from scratch: clear the axes, plot every stored curve, then
        # re-apply the axis configuration.
        self.axes[0].cla()
        for x, y, fmt in zip(self.X, self.Y, self.fmts):
            self.axes[0].plot(x, y, fmt)
        self.config_axes()
        display.display(self.fig)
        # wait=True postpones the clearing until new output is available.
        display.clear_output(wait=True)

In [None]:
def train_ch3(net, train_iter, test_iter, loss, num_epochs, optimizer):
    """Train ``net`` for ``num_epochs`` epochs and plot the progress after each one.

    Args:
        net: the model to train.
        train_iter: iterable of training mini-batches.
        test_iter: iterable of test mini-batches, evaluated after every epoch.
        loss: loss function passed on to ``train_epoch_ch3``.
        num_epochs: number of passes over the training data; 0 trains nothing.
        optimizer: optimiser passed on to ``train_epoch_ch3``.

    After every epoch the average training loss, the training accuracy and
    the test accuracy are added to an ``Animator`` plot whose y-axis is
    fixed to [0, 2.5].
    """
    animator = Animator(xlabel='epoch', xlim=[0, num_epochs], ylim=[0, 2.5],
                        legend=['train loss', 'train acc', 'test acc'])
    for epoch in range(num_epochs):
        # (average loss, training accuracy) for this epoch.
        train_metrics = train_epoch_ch3(net, train_iter, loss, optimizer)
        test_acc = evaluate_accuracy(net, test_iter)
        animator.add(epoch + 1, train_metrics + (test_acc,))

GRAPH REPORT

In [None]:
num_epochs = 6  # number of full passes over the training set
train_ch3(net, trainloader, testloader, loss, num_epochs, optimizer)
# Accuracy: 55%
# NOTE(review): recorded by the author; not stated whether this is the
# training or the test accuracy -- TODO confirm.