<a href="https://colab.research.google.com/github/harvard-visionlab/psy1410/blob/master/psy1410_week02_anns_answers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Helpers

Here we'll define any helper functions that we need/want as we go. We'll probably add to this as we find a need for new helper functions.

In [None]:
%config InlineBackend.figure_format = 'retina'

In [None]:
import numpy as np 
from PIL import Image 
from IPython.core.debugger import set_trace 

def show_image(img):
  """Convert an image tensor to an 8-bit PIL image.

  Args:
    img: tensor-like object supporting `.squeeze()` and `.numpy()`;
      assumed to hold intensities in [0, 1] (e.g. the output of
      `transforms.ToTensor()`) -- TODO confirm for other callers.

  Returns:
    A `PIL.Image.Image` built from the squeezed array.
  """
  # Scale by 255, not 256: a pixel equal to 1.0 would become 256, which does
  # not fit in uint8 and wraps around (the brightest pixels would turn black).
  # Rounding recovers the original 0-255 values; clamp guards against inputs
  # slightly outside [0, 1].
  pixels = (img * 255).round().clamp(0, 255)
  return Image.fromarray(pixels.squeeze().numpy().astype(np.uint8))

## A Minimal ANN

Let's start by defining a very minimal artificial neural network, with a single fully-connected linear layer that directly maps the input (1x28x28 pixels) to the output categories (10 digit categories).

In [None]:
import torch
import torch.nn as nn

In [None]:
class MyNet(nn.Module):
  """Minimal network: one fully-connected layer from 784 pixels to 10 scores."""

  def __init__(self):
    super().__init__()
    # 784 inputs because each image is 1x28x28; 10 outputs, one per digit 0-9
    self.fc = nn.Linear(in_features=784, out_features=10)

  def forward(self, x):
    """Flatten each image in the batch and return the linear layer's output."""
    # keep the batch dimension, let pytorch infer the flattened size (-1)
    flat = x.view(x.shape[0], -1)

    # weighted sum of the pixels (plus bias) for each of the 10 categories
    return self.fc(flat)

In [None]:
# create an instance of MyNet
model = MyNet()
model

In [None]:
# test on random data (100 random images)
fake_imgs = torch.rand(100,1,28,28)
out = model(fake_imgs)
out.shape

In [None]:
# why is the output shape "100x10"?

## Inspect/visualize the weights of your randomly initialized network

Let's write a function that takes the weights of our model and visualizes them.

In [None]:
model.fc.weight.shape, model.fc.bias.shape

In [None]:
w = model.fc.weight[0].detach().reshape(28,28)
w.shape

In [None]:
import matplotlib.pyplot as plt

plt.imshow(w, extent=[0, 1, 0, 1], cmap='coolwarm');

In [None]:
def show_weights(model):
  """Plot the first ten rows of `model.fc.weight` as 28x28 heatmaps.

  The rows are drawn in a 2x5 grid, left-to-right then top-to-bottom, and
  each panel is titled with the row index (the output label it feeds).

  Args:
    model: a module with an `fc` layer whose weight rows each hold 784
      values (they are reshaped to 28x28).
  """
  fig, axs = plt.subplots(2, 5, figsize=(15, 6))
  for idx, ax in enumerate(axs.flat):
    # .cpu() is required once training has moved the model to the GPU:
    # matplotlib cannot convert a CUDA tensor to a numpy array.
    w = model.fc.weight[idx].detach().cpu().reshape(28,28)
    ax.imshow(w, extent=[0, 1, 0, 1], cmap='coolwarm')
    ax.set_title(f"label={idx}")
    ax.grid(True)
    ax.axes.get_xaxis().set_visible(False)
    ax.axes.get_yaxis().set_visible(False)
  plt.show()

In [None]:
show_weights(model)

## Let's Train this Model!

We'll need:
- [x] a model
- [ ] a dataset (MNIST)
- [ ] a loss function (Cross Entropy Loss)
- [ ] an optimizer (which uses the gradients computed by `back-propagation of errors` to update the weights)
- [ ] we need a training function
- [ ] useful to have a validation function too

## MNIST Dataset

- we'll start with the standard MNIST dataset

In [None]:
from torchvision import datasets
from torchvision import transforms 

transform = transforms.Compose([
  transforms.ToTensor(),
])

In [None]:
train_dataset = datasets.MNIST('./data/MNIST', train=True, download=True, transform=transform)
train_dataset

In [None]:
test_dataset = datasets.MNIST('./data/MNIST', train=False, download=True, transform=transform)
test_dataset

In [None]:
train_dataset[0][0].shape

In [None]:
from torch.utils.data import DataLoader

DataLoader?

In [None]:
train_loader = DataLoader(train_dataset, batch_size=256, pin_memory=True, shuffle=True)
train_loader

In [None]:
test_loader = DataLoader(test_dataset, batch_size=256, pin_memory=True, shuffle=False)
test_loader

In [None]:
imgs, labels = next(iter(train_loader))

In [None]:
imgs.shape, labels.shape

In [None]:
output = model(imgs)
output.shape

In [None]:
idx = 10
actual = labels[idx].item()
print(actual)
show_image(imgs[idx])

In [None]:
softmax = output[idx].exp()/output[idx].exp().sum()
softmax

In [None]:
predicted = softmax.argmax().item() 
print(f"predicted={predicted}, actual={actual}")

## Loss Function

Let's use the standard cross-entropy loss function

In [None]:
import torch 
import torch.nn as nn

In [None]:
# create a fresh instance of your model 
model = MyNet()

In [None]:
# define loss function (criterion)
criterion = nn.CrossEntropyLoss()

In [None]:
# pass some images through your model, get the outputs
# why is the output 256 x 10?
imgs, labels = next(iter(train_loader))
output = model(imgs)
output.shape

In [None]:
loss = criterion(output, labels)
loss 

## Define the Optimizer

In [None]:
# define the optimizer
# this updates the weights for us using gradient descent
# (plain SGD; without this definition the training loop below fails
# with a NameError on `optimizer`)
optimizer = torch.optim.SGD(model.parameters(), lr=.03)


## The training loop

In [None]:
def train(model, train_loader, criterion, optimizer, mb=None):
  """Run one pass over `train_loader`, updating `model` after every batch.

  Args:
    model: the network to train; moved to the GPU when one is available.
    train_loader: iterable yielding `(imgs, labels)` batches.
    criterion: loss module called as `criterion(output, labels)`.
    optimizer: optimizer whose `zero_grad()` / `step()` are called per batch.
    mb: optional parent bar forwarded to `progress_bar`.

  Returns:
    The mean of the per-batch loss values, as a Python float.
  """
  # use gpu if available
  device = 'cuda' if torch.cuda.is_available() else 'cpu'
  criterion.to(device)
  model.to(device)
  model.train()

  batch_losses = []
  for batch_imgs, batch_labels in progress_bar(train_loader, parent=mb):
    batch_imgs, batch_labels = batch_imgs.to(device), batch_labels.to(device)

    # forward pass
    batch_loss = criterion(model(batch_imgs), batch_labels)

    # backward pass: clear stale gradients, backprop, then update the weights
    optimizer.zero_grad()
    batch_loss.backward()
    optimizer.step()

    batch_losses.append(batch_loss.item())

  return torch.tensor(batch_losses).mean().item()

## The "test" or "validation" loop

In [None]:
def validate(model, test_loader, criterion, optimizer, mb=None):
  """Evaluate `model` on `test_loader` without updating its weights.

  Args:
    model: the network to evaluate; moved to the GPU when one is available
      and left in eval mode on return (`train()` switches it back).
    test_loader: iterable yielding `(imgs, labels)` batches.
    criterion: loss module called as `criterion(output, labels)`.
    optimizer: unused; kept so the signature mirrors `train`.
    mb: optional parent bar forwarded to `progress_bar`.

  Returns:
    A tuple `(mean_loss, top1)`: the mean of the per-batch losses and the
    fraction (0-1) of samples whose top prediction matches the label.
  """
  # use gpu if available
  device = 'cuda' if torch.cuda.is_available() else 'cpu'
  model.to(device)
  criterion.to(device)

  # place the model in "eval" mode so layers such as dropout are disabled
  # and the reported loss/accuracy reflect the full network
  model.eval()

  # iterate over batches, compute loss and accuracy for each batch
  losses = []
  correct = []
  # no gradients are needed for evaluation; skipping them saves memory/time
  with torch.no_grad():
    for imgs,labels in progress_bar(test_loader, parent=mb):
      imgs = imgs.to(device)
      labels = labels.to(device)

      # forward pass
      output = model(imgs)

      # calculate loss and classification accuracy
      loss = criterion(output, labels)
      _, correct_k = accuracy(output, labels, topk=(1,))

      losses.append(loss.item())
      correct.append(correct_k)

  # per-sample 0/1 correctness flags -> overall fraction correct
  top1 = torch.cat(correct).mean()

  return torch.tensor(losses).mean().item(), top1.item()

def accuracy(output, target, topk=(1,)):
    """Score top-k predictions against the targets.

    Args:
        output: 2-D tensor of scores, one row per sample (top-k is taken
            along dimension 1).
        target: 1-D tensor of true class indices, one per sample.
        topk: the values of k to evaluate.

    Returns:
        A tuple `(res, flags)`. `res` is a list with one 1-element tensor
        per k holding the percentage of hits. `flags` is the flattened
        float 0/1 hit tensor for the first k in `topk`.
    """
    with torch.no_grad():
        largest_k = max(topk)
        n_samples = target.size(0)

        # indices of the highest-scoring classes, transposed to (k, batch)
        top_idx = output.topk(largest_k, dim=1, largest=True, sorted=True)[1].t()
        hits = top_idx.eq(target.view(1, -1).expand_as(top_idx))

        per_k_flags = [hits[:k].reshape(-1).float() for k in topk]
        percentages = [
            flags.sum(0, keepdim=True) * (100.0 / n_samples)
            for flags in per_k_flags
        ]
        return percentages, per_k_flags[0]

In [None]:
# val_loss, top1 = validate(model, test_loader, criterion, optimizer)

In [None]:
from fastprogress.fastprogress import master_bar, progress_bar 
# number of full passes over the training set
num_epochs = 10

# outer bar iterates over epochs; train()/validate() attach their per-batch
# bars to it through the `mb` argument
mb = master_bar( range(num_epochs) )
mb.names = ['train_loss', 'val_loss']
# running history for the live plot: epoch index, train loss, validation loss
xs,y1,y2 = [], [], []
for epoch in mb:
  train_loss = train(model, train_loader, criterion, optimizer, mb=mb)
  val_loss, top1 = validate(model, test_loader, criterion, optimizer, mb=mb)
  # print(f"Epoch {epoch}: Train Loss {train_loss}, Val Loss {val_loss} Top1 {top1}")

  # graph results
  xs.append(epoch)
  y1.append(train_loss)
  y2.append(val_loss)
  graphs = [[xs,y1], [xs,y2]]
  x_bounds = [0, num_epochs]
  # y-axis is fixed to [0, 2] here; losses above 2 fall outside the plot
  y_bounds = [0,2]
  mb.update_graph(graphs, x_bounds, y_bounds)
print("All Done!")
# summary of the last epoch that ran
print(f"Epoch {epoch}: Train Loss {train_loss}, Val Loss {val_loss} Top1 {top1}")

In [None]:
show_weights(model)

## Exercise 1 - Improve your Model by training longer on the GPU (e.g., compare performance for 10 epochs vs. 30 epochs)

Go to "Runtime", select "Change runtime type", and choose "GPU".

In [None]:
torch.cuda.is_available()

In [None]:
from fastprogress.fastprogress import master_bar, progress_bar 

def train_model(num_epochs):
  """Train and validate the notebook-level `model` for `num_epochs` epochs.

  Uses the module-level `model`, `train_loader`, `test_loader`, `criterion`
  and `optimizer`, plotting train/validation loss after every epoch and
  printing a summary of the last epoch at the end.

  Args:
    num_epochs: number of passes over the training set. With a value of 0
      or less nothing is trained and no summary line is printed.
  """
  mb = master_bar( range(num_epochs) )
  mb.names = ['train_loss', 'val_loss']
  xs,y1,y2 = [], [], []
  for epoch in mb:
    train_loss = train(model, train_loader, criterion, optimizer, mb=mb)
    val_loss, top1 = validate(model, test_loader, criterion, optimizer, mb=mb)

    # graph results
    xs.append(epoch)
    y1.append(train_loss)
    y2.append(val_loss)
    graphs = [[xs,y1], [xs,y2]]
    x_bounds = [0, num_epochs]
    # scale the y-axis to the largest loss seen so far, with 10% headroom
    y_bounds = [0,max(max(y1),max(y2))*1.1]
    mb.update_graph(graphs, x_bounds, y_bounds)
  print("All Done!")
  # Only report when at least one epoch ran; otherwise the loop variables are
  # unbound here (or silently resolve to stale notebook globals).
  if xs:
    print(f"Epoch {epoch}: Train Loss {train_loss}, Val Loss {val_loss} Top1 {top1}")
In [None]:
model = MyNet()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=.03)
train_model(num_epochs=10)

In [None]:
model = MyNet()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=.03)
train_model(num_epochs=30)

## Exercise 2 - Improve your Model by using a better optimizer (e.g., Adam, Adadelta), or by varying the learning rate, or both.

Save a record of the results for each variant you try.

In [None]:
model = MyNet()
criterion = nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=.03)
optimizer = torch.optim.Adadelta(model.parameters(), lr=1.0)
train_model(num_epochs=10)

In [None]:
# show_weights(model)

## Exercise 3 - Improve your Model by adding one or more hidden layers, with or without ReLU activations.

In [None]:
class MyNetShallow(nn.Module):
  """Starting point for Exercise 3: a single linear layer, 784 -> 10.

  As written this has the same architecture as `MyNet`; hidden layers can be
  added here.
  """

  def __init__(self):
    # super() must refer to this class: the previous super(MyNet, self) raised
    # a TypeError because MyNetShallow does not derive from MyNet
    super(MyNetShallow, self).__init__()
    # in_features = 784, because the input image is 1x28x28 = 784
    # out_features = 10, because there are 10 output categories (digits 0-9)
    self.fc = nn.Linear(in_features=784, out_features=10)

  def forward(self, x):
    """Flatten each image in the batch and return the linear layer's output."""
    # in the "forward pass", we take an input (a batch of images, x)
    # then first we flatten it into batchSize x 784,
    batchSize = x.shape[0] # first dimension of x is "batchSize"
    x = x.view(batchSize, -1) # the -1 tells pytorch to flatten the tensor to be batchSize x "whatever size fits"

    # finally, we pass the flattened input into our fully-connected layer
    # which will compute the weighted sum of the input for each of the 10
    # categories
    x = self.fc(x)

    return x

## Exercise 4 - Improve your Model by using convolutional layers

Save a record of the results for each variant you try.

In [None]:
from collections import OrderedDict
# reference: https://github.com/pytorch/examples/blob/master/mnist/main.py
class CNN(nn.Module):
    """Small convolutional classifier for 1x28x28 images with 10 output classes.

    `cnn_backbone` applies two 3x3 convolutions with ReLU, a 2x2 max-pool and
    channel-wise dropout; `head` maps the flattened features to 10 raw class
    scores (logits) suitable for `nn.CrossEntropyLoss`.
    """

    def __init__(self):
        super(CNN, self).__init__()
        self.cnn_backbone = nn.Sequential(OrderedDict([
             ('conv1', nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1)),
             ('relu1', nn.ReLU()),
             ('conv2', nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1)),
             ('relu2', nn.ReLU()),
             ('pool2', nn.MaxPool2d(2)),
             ('dropout2', nn.Dropout2d(0.25))
        ]))
        self.head = nn.Sequential(OrderedDict([
            # 9216 = 64 channels x 12 x 12 (28 -> 26 -> 24 after the two
            # unpadded 3x3 convs, then halved by the 2x2 max-pool)
            ('fc3', nn.Linear(9216, 128)),
            ('relu3', nn.ReLU()),
            # the input here is a flat (batch, 128) tensor, so use element-wise
            # Dropout; Dropout2d expects feature maps and is deprecated for
            # 2-D inputs
            ('dropout3', nn.Dropout(0.50)),
            # no activation after the last layer: CrossEntropyLoss expects raw
            # logits, and a ReLU here would clamp every score to >= 0
            ('fc4', nn.Linear(128, 10)),
        ]))

    def forward(self, x):
        """Return the (batch, 10) logits for a batch of 1x28x28 images."""
        x = self.cnn_backbone(x)
        # keep the batch dimension, flatten channels x height x width
        x = torch.flatten(x, 1)
        x = self.head(x)
        return x

In [None]:
model = CNN()
print(model)
criterion = nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=.03)
optimizer = torch.optim.Adadelta(model.parameters(), lr=1.0)
train_model(num_epochs=10)

## Exercise 5 - Challenge your model by adding position and scale variation, and see how this affects learning and generalization.