In [None]:
from IPython.display import clear_output

In [None]:
# Download the required libraries (needed when running outside colab where the environment doesn't come pre-loaded with libraries)

%pip install torch
%pip install matplotlib
%pip install torchvision

clear_output()

In [None]:
import torch

import torch.nn as nn

from torchvision.datasets import MNIST
from torch.utils.data import DataLoader
from torch.optim import Adam

from torchvision.transforms.functional import to_tensor

import matplotlib.pyplot as plt

#Contents:

1. Implementation of 1-2 layer NN fin pytorch which classifies MNIST dataset

About MNIST:

the dataset consists of images of 28x28 size. The image each contains a handwritten digit from 0 to 9. Our model needs to take this image and classify it to the correct digit.


You need to know:

1. **pytorch** (for impelementation)
2. a little bit of **matplotlib** (for visualization)


Good to have knowledge of:

1. torch dataset and dataloader

In [None]:
# MNIST function fetches the MNIST dataset. Without any transform param, the returned object is a Pillow image but we want to convert it to numerical form
# that is to say, a numpy array/torch tensor

# to_tensor is used to avoid errors when creating data loader later. we'll convert them to numpy arrays when the time comes
train_data = MNIST(root='./datasets', train=True, download=True, transform=to_tensor)
test_data  = ?

clear_output()

In [None]:
batch_size = ?

# Dataloaders are used to easily create batches of data so we can perform batch gradient descent for faster learning
train_loader = ?
test_loader = ?

## Models

let's create the architecture of our models

Instead of sigmoid activation, we'll use softmax activation

The difference is:
- sigmoid brings each value in the range 0-1
- softmax takes a vector and changes the value into probabilities: i.e the sum of those values = 1. The highest value in the original vector retains the highest value in softmax output

Here's the formula for softmax:

$$
\text{Softmax}(x)_i = \frac{e^{x_i}}{\sum_{j} e^{x_j}}
$$

In [None]:
class NN1Layer(nn.Module):

  def __init__(self, num_inp, num_out):

    super(NN1Layer, self).__init__()

    self.layer_1 = nn.Linear(num_inp, num_out)
    self.softmax = nn.Softmax(dim=1)

  def forward(self, x):

    z = ?
    a = ?

    return a


class NN2Layer(nn.Module):

  def __init__(self, num_inp, num_hidden, num_out):

    super(NN2Layer, self).__init__()

    self.layer_1 = ?
    self.layer_2 = ?

    self.hidden_activation = nn.ReLU()  # We can change the hidden activation (activation in between layer 1 and 2) here
    self.softmax = ?  # dim 0 is normally batch size, we don't want to apply softmax across batch size

  def forward(self, x):

    z1 = ?
    a1 = ?
    z2 = ?
    a2 = ?

    return a2

## The main training loop, with batch gradient descent.



Declare model, it's params, optimizers and criterion etc

We'll also declare a device here. This will let us use GPU
you can see how much difference a GPU makes by changing the device param to cpu and cuda

In [None]:
num_epochs = ?
lr = ?

device = 'cuda' if torch.cuda.is_available() else 'cpu'  # checks if machine supports cuda and if it does, we use that, otherwise cpu

train_losses = []
val_losses = []

# model = NN1Layer(?, 10)
model = NN2Layer(?, ?, ?)  # The 2 layer one is equivalent to the one we implemented in numpy

optimizer = Adam(?, lr=?)
criterion = ?  # multi-class

model.to(device)  # we need to send all input tensors as well as our model to this device. by default they are on cpu

print(f'Using device {device}')

Pre-train performance

In [None]:
%%time
model.eval()
correctly_labelled = 0

with torch.no_grad():

  val_epoch_weighted_loss = 0

  for val_batch_X, val_batch_y in test_loader:

    val_batch_X = val_batch_X.view(-1, ?).to(device)
    val_batch_y = val_batch_y.to(device)

    val_batch_y_probs = model(?)

    loss = criterion(?, ?)
    val_epoch_weighted_loss += (len(val_batch_y)*loss.item())

    val_batch_y_pred = val_batch_y_probs.argmax(dim=1)  # convert probailities to labels by picking the label (index) with the highest prob

    correctly_labelled += (val_batch_y_pred == ?).sum().item()  # item converts tensor to float/int/list

val_epoch_loss = val_epoch_weighted_loss/len(test_loader.dataset)
val_losses.append(val_epoch_loss)

print(f'val_loss={val_epoch_loss}. labelled {correctly_labelled}/{len(test_loader.dataset)} correctly ({correctly_labelled/len(test_loader.dataset)*100}% accuracy)')

Training the model

In [None]:
%%time
for epoch_no in range(num_epochs):

  model.train()  # convert to train model. This turns out train-specific layers in the model (if you dont know about them, an example of them is dropout. more on this later)

  epoch_weighted_loss = 0

  for batch_X, batch_y in ?:

    batch_X = batch_X.view(-1, 28*28).to(device)  # convert to [N, 28*28] shape where N is batch_size
    batch_y = batch_y.to(device)

    batch_y_probs = model(?)  # outputs [N, 10] where each [:, 10] is probabilities for class (0-9)

    loss = ?

    optimizer.zero_grad()  # need to clear out gradients from previous batch
    loss.backward()  # calculate new gradients
    optimizer.step()  # update weights

    epoch_weighted_loss += (len(batch_y)*loss.item())

  epoch_loss = epoch_weighted_loss/len(train_loader.dataset)
  train_losses.append(epoch_loss)    # add loss for tracking. we'll visualize the loss trajectory later


  # validation time

  model.eval()  # take model to evaluation mode. turn off train-only layers
  correctly_labelled = 0

  with torch.no_grad():  # this makes our model to NOT track gradients

    val_epoch_weighted_loss = 0

    for val_batch_X, val_batch_y in test_loader:

      val_batch_X = val_batch_X.view(-1, ?).to(device)
      val_batch_y = val_batch_y.to(device)

      val_batch_y_probs = ?

      loss = criterion(?, val_batch_y)
      val_epoch_weighted_loss += (len(val_batch_y)*loss.item())

      val_batch_y_pred = val_batch_y_probs.argmax(dim=1)  # convert probailities to labels by picking the label (index) with the highest prob

      correctly_labelled += (? == val_batch_y).sum().item()  # item converts tensor to float/int/list

  val_epoch_loss = val_epoch_weighted_loss/len(test_loader.dataset)
  val_losses.append(val_epoch_loss)

  print(f'Epoch: {epoch_no}, train_loss={epoch_loss}, val_loss={val_epoch_loss}. labelled {correctly_labelled}/{len(test_loader.dataset)} correctly ({correctly_labelled/len(test_loader.dataset)*100}% accuracy)')

print(f'Training complete on device {device}. Change device variable and run again to see the difference.')

Loss trajectory

In [None]:
plt.plot(train_losses, label='train loss')
plt.plot(val_losses, label='val loss')
plt.xlabel('Epoch')
plt.ylabel('Loss (Cross Entropy)')
plt.legend()
plt.show()

visualizing shapes

In [None]:
print(val_batch_X.shape, val_batch_y.shape)  # 784 is 28*28
print(val_batch_y_probs.shape)  # inference from model
print(val_batch_y_pred.shape)  # probabilities converted

print("\n\nTo verify softmax converts inputs into probabilities (sum of which is 1), let's sum those probabilities and see if we get 1's: \n")
print(f'{val_batch_y_probs.sum(1)=}')

print(f'\n{val_batch_y_probs.sum(1).shape=}')

print('\n\nLets see argmax in action')
print("Here's one of the input to argmax\n")
test_idx = 15
print(val_batch_y_probs[test_idx].cpu())  # .cpu() brings a tensor back to cpu device from any other it might be on (like cuda)
print("\n\nHere's its output")
print(val_batch_y_pred[test_idx].cpu())

Saving and loading the model

In [None]:
# you can load save the model's state dict like this
torch.save(model.state_dict(), 'MNIST_classifier.pt')  # take a look at the storage section if you're on colab

In [None]:
loaded_model = NN2Layer(28*28, 32, 10)  # just an empty model.
loaded_model.eval().to(device)

print('Before loading model')

with torch.no_grad():
  probs = loaded_model(val_batch_X)
  preds = probs.argmax(dim=1)

  print(f'{(preds==val_batch_y).sum()}/{len(preds)} correct')


print('After loading model')
loaded_model.load_state_dict(torch.load('MNIST_classifier.pt'))

with torch.no_grad():
  probs = loaded_model(val_batch_X)
  preds = probs.argmax(dim=1)

  print(f'{(preds==val_batch_y).sum()}/{len(preds)} correct')

END.