<a href="https://colab.research.google.com/github/anirbanmukherjee2709/tsai_end2.0_Session_3/blob/main/END_Session_3_PyTorch_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Import the required libraries
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
seed = 42

torch.manual_seed(seed)
torch.set_grad_enabled(True)

<torch.autograd.grad_mode.set_grad_enabled at 0x7f0aefc85390>

In [2]:
# Check if GPU is available
use_cuda = torch.cuda.is_available()

# Assign GPU if available else assign CPU
device = torch.device("cuda:0" if use_cuda else "cpu")

# enables benchmark mode in cudnn.
torch.backends.cudnn.benchmark = True

In [3]:
# Download train and test datasets for MNIST
training_data = torchvision.datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=transforms.ToTensor()
)
test_data = torchvision.datasets.MNIST(
    root="data",
    train=False,
    download=True,
    transform=transforms.ToTensor()
)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to data/MNIST/raw/train-images-idx3-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=9912422.0), HTML(value='')))


Extracting data/MNIST/raw/train-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to data/MNIST/raw/train-labels-idx1-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=28881.0), HTML(value='')))


Extracting data/MNIST/raw/train-labels-idx1-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to data/MNIST/raw/t10k-images-idx3-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=1648877.0), HTML(value='')))


Extracting data/MNIST/raw/t10k-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to data/MNIST/raw/t10k-labels-idx1-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=4542.0), HTML(value='')))


Extracting data/MNIST/raw/t10k-labels-idx1-ubyte.gz to data/MNIST/raw

Processing...
Done!


  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


In [4]:
# Create random number for train and test data set to add it with the prediction
random_num_train = torch.randint(low=0, high=10, size=(len(training_data), ), dtype=torch.float)
random_num_test = torch.randint(low=0, high=10, size=(len(test_data), ), dtype=torch.float)

In [5]:
# custom dataloder to stitch mnist ad random generated number
class CustomDataset(Dataset):
    def __init__(self, data_set, train=False):
        self.labels = data_set.targets
        self.img = data_set.data
        self.train = train # separation of train and test random number tensor

    def __len__(self):
        return len(self.img)

    def __getitem__(self, idx):
        X = self.img[idx]
        y = self.labels[idx]
        
        num = random_num_train[idx] if self.train else random_num_test[idx]

        return (X/255.).reshape([1, 28, 28]), num, y, y+num

In [6]:
# data generation parameters
params = {'batch_size': 128,
          'shuffle': True,
          'num_workers': 2}

In [7]:
# create object of custom dataloader to pass into the model.
training_set = CustomDataset(training_data, train = True)
training_generator = DataLoader(training_set, **params)

test_set = CustomDataset(test_data)
test_generator = DataLoader(test_set, **params)

## Model
For MNIST predection we have tried to implement LeNet as it is a great predictor for hand written digits.

Model initialised with He et al. initializer.

Activation function being ReLU for CNN layers and sigmoid for Dense/Linear layers.

In [8]:
class Model(nn.Module):
    def __init__(self, classes=10):
        super(Model, self).__init__()

        # LAYERS FOR IMAGE RECOGNITION
        self.conv_1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=(5,5), stride=1)
        nn.init.kaiming_uniform_(self.conv_1.weight, nonlinearity='relu')
        self.actv_1 = nn.ReLU() # 24x24

        self.pool_1 = nn.AvgPool2d(kernel_size=(2, 2)) # 12x12

        self.conv_2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=(5,5), stride=1)
        nn.init.kaiming_uniform_(self.conv_2.weight, nonlinearity='relu')
        self.actv_2 = nn.ReLU() # 8x8
        
        self.pool_2 = nn.AvgPool2d(kernel_size=(2, 2)) # 4x4
        self.flatten = nn.Flatten()

        self.dense_1 = nn.Linear(in_features=256, out_features=120)
        nn.init.kaiming_uniform_(self.dense_1.weight, nonlinearity='relu')
        self.actv_3 = nn.Sigmoid()

        self.dense_2 = nn.Linear(in_features=120, out_features=84)
        nn.init.kaiming_uniform_(self.dense_2.weight, nonlinearity='relu')
        self.actv_4 = nn.Sigmoid()

        self.dense_3 = nn.Linear(in_features=84, out_features=classes)
        nn.init.kaiming_uniform_(self.dense_3.weight, nonlinearity='relu')
        self.actv_5 = nn.Softmax(dim=1)


        # LAYERS FOR ADDITION
        self.dense_4 = nn.Linear(in_features=2, out_features=10)
        nn.init.kaiming_uniform_(self.dense_4.weight, nonlinearity='relu')
        self.actv_6 = nn.Sigmoid()

        self.dense_5 = nn.Linear(in_features=10, out_features=6)
        nn.init.kaiming_uniform_(self.dense_5.weight, nonlinearity='relu')
        self.actv_7 = nn.Sigmoid()

        self.dense_6 = nn.Linear(in_features=6, out_features=19)
        nn.init.kaiming_uniform_(self.dense_6.weight, nonlinearity='relu')

    def forward(self, img, num):
        # forward pass
        x = self.actv_1(self.conv_1(img))
        x = self.pool_1(x)
        x = self.actv_2(self.conv_2(x))
        x = self.pool_2(x)
        x = self.flatten(x)
        x = self.actv_3(self.dense_1(x))
        x = self.actv_4(self.dense_2(x))
        x = self.actv_5(self.dense_3(x))

        # x predicts the mnist images to its labels 
        
        # x1 stacks the predicted label with randomly generated number for input into the 2nd part of prediction.
        x1 = torch.stack((x.argmax(dim=1), num), dim=-1)

        x1 = self.actv_6(self.dense_4(x1))
        x1 = self.actv_7(self.dense_5(x1))
        x1 = self.dense_6(x1)

        # retuns both label and the sum prediction
        return x, x1

In [9]:
# instantiating the model class and moving it to the GPU
model = Model()
print(model)
model = model.to(device)

Model(
  (conv_1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
  (actv_1): ReLU()
  (pool_1): AvgPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0)
  (conv_2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (actv_2): ReLU()
  (pool_2): AvgPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (dense_1): Linear(in_features=256, out_features=120, bias=True)
  (actv_3): Sigmoid()
  (dense_2): Linear(in_features=120, out_features=84, bias=True)
  (actv_4): Sigmoid()
  (dense_3): Linear(in_features=84, out_features=10, bias=True)
  (actv_5): Softmax(dim=1)
  (dense_4): Linear(in_features=2, out_features=10, bias=True)
  (actv_6): Sigmoid()
  (dense_5): Linear(in_features=10, out_features=6, bias=True)
  (actv_7): Sigmoid()
  (dense_6): Linear(in_features=6, out_features=19, bias=True)
)


In [10]:
# Check if model is on GPU ot CPU
display('Training on GPU' if model.cuda() else 'model on GPU')

'Training on GPU'

In [11]:
# initialize the optimizer
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [12]:
# function to calculate accuracy of predicted labels for both the outputs
def get_num_correct(preds, labels):
    return preds.argmax(dim=1).eq(labels).sum().item()

In [13]:
# declare number of epochs for which the model has to be trained
max_epochs = 50

# Model training
for epoch in range(max_epochs):
    # variables to calculate loss, correct mnist labels, and correct sum labels for each epoch
    total_loss = 0
    total_correct_labels = 0
    total_correct_sum = 0
    
    for img, num, label, label_num in training_generator:
        # move variables to GPU from CPU
        img, num, label, label_num = img.to(device), num.to(device), label.to(device), label_num.to(device)
        label_num = label_num.type(torch.long)

        pred_labels, pred_sum = model(img, num)

        loss1 = F.cross_entropy(pred_labels, label)
        loss2 = F.cross_entropy(pred_sum, label_num)

        optimizer.zero_grad()
        loss1.backward()
        loss2.backward()
        optimizer.step()

        total_loss += loss1.item() + loss2.item() 
        total_correct_labels += get_num_correct(pred_labels, label)
        total_correct_sum += get_num_correct(pred_sum, label_num)

    print(f"Epoch: {epoch + 1} | total_correct_labels: {total_correct_labels/60000*100 :.1f}% | total_correct_sum: {total_correct_sum/60000*100 :.1f}% | CELoss: {total_loss:.2f}")

Epoch: 1 | total_correct_labels: 85.8% | total_correct_sum: 31.4% | CELoss: 1782.70
Epoch: 2 | total_correct_labels: 97.4% | total_correct_sum: 69.7% | CELoss: 1313.23
Epoch: 3 | total_correct_labels: 98.0% | total_correct_sum: 82.3% | CELoss: 1136.17
Epoch: 4 | total_correct_labels: 98.1% | total_correct_sum: 84.8% | CELoss: 1059.88
Epoch: 5 | total_correct_labels: 98.2% | total_correct_sum: 85.8% | CELoss: 1017.86
Epoch: 6 | total_correct_labels: 98.3% | total_correct_sum: 87.0% | CELoss: 990.73
Epoch: 7 | total_correct_labels: 98.4% | total_correct_sum: 87.7% | CELoss: 970.51
Epoch: 8 | total_correct_labels: 98.3% | total_correct_sum: 88.3% | CELoss: 960.93
Epoch: 9 | total_correct_labels: 98.3% | total_correct_sum: 88.6% | CELoss: 951.96
Epoch: 10 | total_correct_labels: 98.6% | total_correct_sum: 89.8% | CELoss: 931.87
Epoch: 11 | total_correct_labels: 98.4% | total_correct_sum: 90.7% | CELoss: 929.08
Epoch: 12 | total_correct_labels: 98.4% | total_correct_sum: 91.8% | CELoss: 923

In [14]:
# Check if the model is working in the same way on the test dataset.
total_correct_labels = 0
total_correct_sum = 0
with open('a.txt', 'w') as f:
    for img, num, label, label_num in test_generator:
        img, num, label, label_num = img.to(device), num.to(device), label.to(device), label_num.to(device)
        label_num = label_num.type(torch.long)

        pred_labels, pred_sum = model(img, num)

        total_correct_labels += get_num_correct(pred_labels, label)
        total_correct_sum += get_num_correct(pred_sum, label_num)

        for a, b, c, d in zip(label, pred_labels.argmax(dim=1), label_num, pred_sum.argmax(dim=1)):
            # writing the outputs to a file to check later
            f.write(f'{a},{b},{c},{d}\n')
    
print(f'total_correct_labels: {total_correct_labels/10000*100:.1f}% | total_correct_sum: {total_correct_sum/10000*100:.1f}%')

total_correct_labels: 97.4% | total_correct_sum: 96.6%
