In [None]:
# Import the libraries used throughout the notebook

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
!pip install torchsummary
from torchsummary import summary

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Select the compute device: the GPU when CUDA is available, otherwise the CPU.

use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")
device

device(type='cuda')

In [None]:
# Input pipeline: read MNIST through torchvision, convert every image to a tensor,
# standardize it, and hand mini-batches to the model for training / evaluation.

batch_size = 128  # number of images grouped into one mini-batch

# Preprocessing shared by both splits, defined once so that the train and test
# pipelines cannot drift apart.
mnist_transform = transforms.Compose([
    transforms.ToTensor(),                       # convert the image to a tensor
    transforms.Normalize((0.1307,), (0.3081,)),  # normalize with the given mean and std dev
])

# Data is divided into 2 sections -
# 1) train dataset - this is used in training the model
# 2) test dataset - this is used in testing the model

# Train split. download=True stores a local copy under '../data' when it is missing.
# shuffle=True re-orders the samples every epoch; shuffling happens on the sample
# order before batches are formed, not on already-formed batches.
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True, transform=mnist_transform),
    batch_size=batch_size, shuffle=True)

# Test split. Shuffling is disabled: it adds nothing to evaluation and makes the
# per-batch contents non-reproducible. download=True keeps this statement usable
# even if the train dataset above has not been created first.
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, download=True, transform=mnist_transform),
    batch_size=batch_size, shuffle=False)

# Some Notes on our naive model

We are going to write a network based on what we have learnt so far.

The size of the input image is 28x28x1. We are going to add as many layers as required to reach a receptive field (RF) of at least 32.

In [None]:
# Model structure

class FirstDNN(nn.Module):
  """Plain convolutional classifier for 1x28x28 images with 10 output classes.

  Layout: two (conv, conv, max-pool) stages with padded 3x3 convolutions
  (28 -> 14 -> 7), followed by three unpadded 3x3 convolutions that shrink the
  7x7 map to 5x5, 3x3 and finally 1x1 with 10 channels (one per class).

  The comments in ``__init__`` track, for every layer, the receptive field (r),
  the spatial size (n), the jump (j) and the stride (s).
  """

  def __init__(self):
    super().__init__()
    # r_in:1, n_in:28, j_in:1, s:1, r_out:3, n_out:28, j_out:1
    self.conv1 = nn.Conv2d(1, 32, 3, padding=1)
    # r_in:3 , n_in:28 , j_in:1 , s:1 , r_out:5 , n_out:28 , j_out:1
    self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
    # r_in:5 , n_in:28 , j_in:1 , s:2 , r_out:6 , n_out:14 , j_out:2
    self.pool1 = nn.MaxPool2d(2, 2)
    # r_in:6 , n_in:14 , j_in:2 , s:1 , r_out:10 , n_out:14 , j_out:2
    self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
    # r_in:10 , n_in:14 , j_in:2 , s:1 , r_out:14 , n_out:14 , j_out:2
    self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
    # r_in:14 , n_in:14 , j_in:2 , s:2 , r_out:16 , n_out:7 , j_out:4
    self.pool2 = nn.MaxPool2d(2, 2)
    # The remaining convolutions have no padding, so each one trims the map by 2.
    # r_in:16 , n_in:7 , j_in:4 , s:1 , r_out:24 , n_out:5 , j_out:4
    self.conv5 = nn.Conv2d(256, 512, 3)
    # r_in:24 , n_in:5 , j_in:4 , s:1 , r_out:32 , n_out:3 , j_out:4
    self.conv6 = nn.Conv2d(512, 1024, 3)
    # r_in:32 , n_in:3 , j_in:4 , s:1 , r_out:40 , n_out:1 , j_out:4
    self.conv7 = nn.Conv2d(1024, 10, 3)

  def forward(self, x):
    """Return per-class log-probabilities.

    Args:
      x: image batch of shape (batch, 1, 28, 28).

    Returns:
      Tensor of shape (batch, 10) holding log-softmax scores over the classes.
    """
    x = self.pool1(F.relu(self.conv2(F.relu(self.conv1(x)))))
    x = self.pool2(F.relu(self.conv4(F.relu(self.conv3(x)))))
    x = F.relu(self.conv6(F.relu(self.conv5(x))))
    # No ReLU on the last layer: clamping the class scores at zero discards every
    # negative logit before the softmax and cripples training.
    x = self.conv7(x)
    x = x.view(-1, 10)
    # Normalize over the class axis explicitly; omitting dim is deprecated and warns.
    return F.log_softmax(x, dim=1)


In [None]:
# Instantiate the network, then place it on the selected device.
model = FirstDNN()
model = model.to(device)

In [None]:
# Print a layer-by-layer summary of the model: each layer, the shape of its output
# and its number of parameters.

input_shape = (1, 28, 28)  # size of a single input image passed to the summary
summary(model, input_size=input_shape)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 28, 28]             320
            Conv2d-2           [-1, 64, 28, 28]          18,496
         MaxPool2d-3           [-1, 64, 14, 14]               0
            Conv2d-4          [-1, 128, 14, 14]          73,856
            Conv2d-5          [-1, 256, 14, 14]         295,168
         MaxPool2d-6            [-1, 256, 7, 7]               0
            Conv2d-7            [-1, 512, 5, 5]       1,180,160
            Conv2d-8           [-1, 1024, 3, 3]       4,719,616
            Conv2d-9             [-1, 10, 1, 1]          92,170
Total params: 6,379,786
Trainable params: 6,379,786
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 1.51
Params size (MB): 24.34
Estimated Total Size (MB): 25.85
-------------------------------------

  return F.log_softmax(x)


In [None]:
from tqdm import tqdm

# trains the model
def train(model, device, train_loader, optimizer, epoch):
    """Run one pass over ``train_loader``, updating ``model`` after every batch.

    Args:
        model: network to optimize; it is switched to training mode.
        device: device every batch is moved to before the forward pass.
        train_loader: iterable yielding ``(data, target)`` batches.
        optimizer: optimizer holding the parameters of ``model``.
        epoch: epoch number supplied by the caller; not used inside the function.
    """
    model.train()  # put the model in training mode
    progress = tqdm(train_loader)

    # enumerate gives the batch index, which is shown in the progress bar
    for step, batch in enumerate(progress):
        inputs, labels = batch
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()                       # clear gradients left from the previous batch
        log_probs = model(inputs)                   # forward pass
        batch_loss = F.nll_loss(log_probs, labels)  # negative log-likelihood loss
        batch_loss.backward()                       # back-propagate
        optimizer.step()                            # update the weights

        progress.set_description(desc=f'loss={batch_loss.item()} batch_id={step}')

# performs validation/test on the model using validation/test data
def test(model, device, test_loader):
    model.eval() # Sets the model to evaluation mode, disables dropout and uses population statistics for batch normalization.
    test_loss = 0
    correct = 0
    with torch.no_grad():  # Disables gradient computation and reduces memory consumption.
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data) # predict output
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    # calculate average loss over the validation/test set
    test_loss /= len(test_loader.dataset)

    # report average loss over the validation/test set
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [None]:
# SGD optimizer from torch.optim. The learning rate sets the size of each update
# step; momentum carries part of the previous update direction into the next step.
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Train and evaluate once per epoch. 1 epoch = 1 run through the entire dataset.
num_epochs = 1
for epoch in range(1, num_epochs + 1):
    train(model, device, train_loader, optimizer, epoch)
    test(model, device, test_loader)

  return F.log_softmax(x)
loss=1.445028305053711 batch_id=468: 100%|██████████| 469/469 [00:33<00:00, 13.99it/s]



Test set: Average loss: 1.5995, Accuracy: 4114/10000 (41%)

