# Lab2: Residual Networks for Image Classification
### INF581: Advanced Topics in Artificial Intelligence



Cifar10 is a classic dataset for deep learning, consisting of 32x32 images split in 50k train images and 10k test images, belonging to 10 different classes (plane, car, bird, cat, deer, dog, frog, horse, ship, truck). Cifar10 resembles MNIST — both have 10 classes and tiny images. However, while getting 90% accuracy on MNIST is trivial, getting 90% on Cifar10 requires serious work. 

The task is then to implement a Residual Network (ResNet) using the PyTorch library (see the figure at the end).

In [None]:
from __future__ import print_function

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
from torch.utils.data.sampler import SubsetRandomSampler

import torchvision
import torchvision.transforms as transforms

import numpy as np
import os
import argparse

from tqdm import tqdm


Task 1: 
The torchvision package allows you to introduce data augmentation layers to your data. Create two dataloaders for training and testing data using:
1. zero padding of 4 pixels and cropping randomly a 32x32 patch
2. Random Horizontal Flip 

as a data augmentation method. 


In [None]:
# Pick the compute device once; the models built in later cells are moved onto it.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

net_start_epoch = 0  # epoch index at which the training loops below start
net_best_acc = 0  # slot for the best test accuracy; starts at 0

# Data
print('==> Preparing data..')

# Training pipeline (Task 1): pad by 4 pixels and crop a random 32x32 patch,
# flip horizontally at random, then convert the image to a tensor.
transform_train = transforms.Compose(
    [
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ]
)

# Test pipeline: no augmentation, only the tensor conversion.
transform_test = transforms.Compose([transforms.ToTensor()])

# CIFAR-10 class names; the position in the tuple is the integer label.
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
# Labels to drop: every class except 'bird' and 'ship'.
# The list is derived from the names so it cannot drift from the stated intent:
# the previous hand-written list [0,1,3,4,5,6,7,8] removed 'ship' (label 8)
# and kept 'truck' (label 9) instead.
exclude_list = [label for label, name in enumerate(classes) if name not in ('bird', 'ship')]
# exclude_list=[] # uncomment this line to include all data

class CIFAR10_INF581(torchvision.datasets.CIFAR10):
    """
    Custom Dataset built on top of CIFAR10 that can drop whole classes.

    Args:
        *args, **kwargs: forwarded unchanged to ``torchvision.datasets.CIFAR10``.
        exclude_list: optional collection of integer labels to remove from the
            dataset. ``None`` or an empty collection keeps every sample.
    """
    def __init__(self, *args, exclude_list=None, **kwargs):
        super().__init__(*args, **kwargs)

        # `None` default instead of a shared mutable `[]`; the length check also
        # accepts tuples, sets and numpy arrays, which `== []` did not handle.
        if exclude_list is None or len(exclude_list) == 0:
            return

        targets = np.asarray(self.targets)
        # True for the samples whose label is NOT in the exclusion list.
        keep = ~np.isin(targets, list(exclude_list))
        self.data = self.data[keep]
        # Store the labels back as a plain list, the same type they are read from.
        self.targets = targets[keep].tolist()

# Filtered CIFAR-10 splits: the training split uses the augmenting transform,
# the test split uses the plain one.
trainset = CIFAR10_INF581(
    root='./data',
    train=True,
    download=True,
    transform=transform_train,
    exclude_list=exclude_list,
)
testset = CIFAR10_INF581(
    root='./data',
    train=False,
    download=True,
    transform=transform_test,
    exclude_list=exclude_list,
)

# Original dataset (all ten classes):
# trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
# testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

# Training batches are reshuffled on every pass; test batches keep a fixed order.
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=128,
    shuffle=True,
    num_workers=0,
)
testloader = torch.utils.data.DataLoader(
    testset,
    batch_size=100,
    shuffle=False,
    num_workers=0,
)


### Visualisation
Run the following script to visualise some of the data and familiarise yourself with the problem

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# functions to show an image


def imshow(img, mean=0.0, std=1.0):
    """Display a channel-first (C, H, W) image tensor with matplotlib.

    Args:
        img: image tensor laid out as (channels, height, width).
        mean, std: statistics of a normalisation to undo before plotting
            (``img * std + mean``). The defaults leave the image untouched,
            which matches the transforms in this notebook: they stop at
            ``ToTensor()`` and apply no normalisation. The previous
            unconditional ``img / 2 + 0.5`` therefore washed the images out.
            Pass ``mean=0.5, std=0.5`` to get that behaviour back.
    """
    # detach/cpu so tensors that live on the GPU or carry gradients can be plotted too.
    img = img.detach().cpu() * std + mean
    npimg = img.numpy()
    # matplotlib expects channel-last (H, W, C).
    plt.imshow(np.transpose(npimg, (1, 2, 0)))


# get some random training images
# Take one batch from the (shuffled) training loader.
dataiter = iter(trainloader)
# Use the built-in next(): DataLoader iterators in current PyTorch have no `.next()` method.
images, labels = next(dataiter)

# show images (only the first 4, so the picture matches the 4 labels printed below)
imshow(torchvision.utils.make_grid(images[:4]))
# print labels
print(' '.join('%5s' % classes[labels[j]] for j in range(4)))

Once the network is ready we need the following train and test functions, as well as the learning rate adjust function for the training. 

In [None]:
def adjust_learning_rate(optimizer, epoch, init_lr, rate=0.2, adjust_frequency=30):
    """Step-decay schedule: multiply the initial LR by <rate> once per <adjust_frequency> epochs.

    The resulting value is written into the 'lr' entry of every parameter
    group of <optimizer>; nothing is returned.
    """
    completed_steps = epoch // adjust_frequency
    new_lr = init_lr * (rate ** completed_steps)
    for group in optimizer.param_groups:
        group['lr'] = new_lr



def train(epoch, net=None):
    """Train `net` for one pass over `trainloader` and print the epoch loss and accuracy.

    Reads the module-level `optimizer`, `lr`, `criterion` and `trainloader`,
    so `optimizer` must have been built from `net`'s parameters.

    Args:
        epoch: current epoch index; drives the learning-rate schedule and the log lines.
        net: the model to train (required despite the `None` default).
    """
    print('\nEpoch: %d' % epoch)
    adjust_learning_rate(optimizer, epoch, lr, rate=0.2, adjust_frequency=30)

    net.train()
    # The loader yields CPU tensors, so each batch must follow the model to its
    # device; without this the forward pass fails once the model is on the GPU.
    net_device = next(net.parameters()).device

    seen = 0        # number of samples processed so far
    correct = 0     # number of correct top-1 predictions
    total_loss = 0  # sum of per-sample losses
    for inputs, targets in tqdm(trainloader):
        inputs = inputs.to(net_device)
        targets = targets.to(net_device)

        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        batch_size = targets.size(0)
        # Weight the batch loss by the batch size, since the last batch may be smaller.
        total_loss += loss.item() * batch_size
        seen += batch_size

        _, predicted = outputs.max(1)
        correct += predicted.eq(targets).sum().item()

    # Avoid a ZeroDivisionError when the loader yields no batches.
    denom = max(seen, 1)
    print('Epoch %d, Train, Loss: %.3f, Acc: %.3f' % (epoch, total_loss / denom, 100. * correct / denom))


def test(epoch, net=None):
    """Evaluate `net` on `testloader` and print the loss and accuracy.

    Reads the module-level `criterion` and `testloader`. Runs in eval mode
    with gradient tracking disabled.

    Args:
        epoch: epoch index, used only in the log line.
        net: the model to evaluate (required despite the `None` default).
    """
    net.eval()
    # The loader yields CPU tensors, so each batch must follow the model to its
    # device; without this the forward pass fails once the model is on the GPU.
    net_device = next(net.parameters()).device

    seen = 0        # number of samples processed so far
    correct = 0     # number of correct top-1 predictions
    total_loss = 0  # sum of per-sample losses
    with torch.no_grad():
        for inputs, targets in tqdm(testloader):
            inputs = inputs.to(net_device)
            targets = targets.to(net_device)

            outputs = net(inputs)
            loss = criterion(outputs, targets)

            batch_size = targets.size(0)
            # Weight the batch loss by the batch size, since the last batch may be smaller.
            total_loss += loss.item() * batch_size
            seen += batch_size

            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()

    # Avoid a ZeroDivisionError when the loader yields no batches.
    denom = max(seen, 1)
    print('Epoch %d, Test,  Loss: %.3f, Acc: %.3f' % (epoch, total_loss / denom, 100. * correct / denom))

### Simple Example
#### MLP
Pytorch already offers multiple predefined modules that we can use to build neural networks. In case the available modules do not satisfy our needs, we can inherit the base module class and define our own module. 

In the following example, we create a simple MLP from predefined modules. The images arrive as 3x32x32 tensors, whereas a linear layer expects one flat vector per sample, so we first reshape the input with our custom module `Flatten`.

In [None]:
print('==> Building MLP...')


class Flatten(torch.nn.Module):
    """Reshape a batch so each sample becomes a single flat vector: (B, ...) -> (B, N)."""

    def forward(self, x):
        # Keep the leading (batch) dimension and merge all remaining ones.
        n_samples = x.size(0)
        return x.view(n_samples, -1)

# Two-layer perceptron: flatten the 3x32x32 image, one hidden layer of 100 units, 10 output scores.
mlp = nn.Sequential(
    Flatten(),
    nn.Linear(3 * 32 * 32, 100),
    nn.ReLU(),
    nn.Linear(100, 10),
).to(device)

criterion = nn.CrossEntropyLoss()

# Initial learning rate and optimizer; train() reads both as module-level names.
lr = 0.1
optimizer = optim.SGD(
    mlp.parameters(),
    lr=lr,
    momentum=0.9,
    weight_decay=5e-4,
)

n_epochs_net = 10
print(f'Training mlp for {n_epochs_net} epochs')
# One training pass followed by one evaluation pass per epoch.
for epoch in range(net_start_epoch, n_epochs_net):
    train(epoch, net=mlp)
    test(epoch, net=mlp)

### Task:
We used the same basic principles as in the MLP to define a ResNet. As a task try to define the blocks of our ResNet by filling the missing code.
The ResNet implementation is composed of "layers" which maintain the same number of input and output planes.
Each layer is composed of a cascade of blocks. A ResNet block is the building block of a ResNet and is defined as:
$$
g\left(BN\left(W_2\star g\left(BN\left(W_1 \star x \right)\right)\right) + x\right)
$$
where
$
W_1\in\mathbb{R}^{in\_planes \times planes \times 3 \times 3}
$,
$
W_2\in\mathbb{R}^{planes \times planes \times 3 \times 3}
$,
$g\left(x\right)= ReLU\left(x\right)$  and 
$BN $ stands for Batch Normalisation.
##Commented out solution.

In [None]:

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


We put the blocks together to define a ResNet module:

In [None]:



class ResNet(nn.Module):
    """Small ResNet: a 3x3 stem convolution, three stages of residual blocks, a linear classifier.

    Args:
        block: block class (e.g. ``BasicBlock``); called as ``block(in_planes, planes, stride)``
            and expected to expose an integer ``expansion`` attribute.
        num_blocks: sequence giving the number of blocks in each of the three stages.
        num_classes: size of the classifier output.
        base_planes: channel width of the stem and of the first stage; the later
            stages use 2x and 4x this width. Increase it to increase performance.
    """

    def __init__(self, block, num_blocks, num_classes=10, base_planes=4):
        super().__init__()
        sp = base_planes
        # Running input width for the next block; updated by _make_layer.
        self.in_planes = sp

        self.conv1 = nn.Conv2d(3, sp, kernel_size=3, stride=1, padding=1, bias=False)
        # Must match conv1's output width. This used to be a hard-coded 4, which
        # broke the network as soon as the width was changed.
        self.bn1 = nn.BatchNorm2d(sp)
        self.layer1 = self._make_layer(block, sp, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 2*sp, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 4*sp, num_blocks[2], stride=2)
#         self.layer4 = self._make_layer(block, 4*sp, num_blocks[3], stride=2)
        self.linear = nn.Linear(4*sp*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Stack `num_blocks` blocks; only the first one applies `stride`."""
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for block_stride in strides:
            layers.append(block(self.in_planes, planes, block_stride))
            # The next block consumes what this one produced.
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x, return_last_layer=False, return_both = False,second_head=False):
        """Return class scores, or the last feature map when `return_last_layer` is set.

        `return_both` and `second_head` are accepted for interface compatibility
        but are not used by this implementation.
        """
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        # out = self.layer4(out) #Increase the number of layers to increase performance
        if return_last_layer:
            return out
        # Global average pooling to 1x1, then flatten to (batch, channels).
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.view(out.size(0), -1)
        return self.linear(out)


def ResNet14():
    """Build a ResNet made of three stages with two BasicBlocks each."""
    blocks_per_stage = [2, 2, 2]
    return ResNet(BasicBlock, blocks_per_stage)


In [None]:
# The log messages used to say "MLP"/"mlp" (copied from the MLP cell) although
# this cell builds and trains the ResNet.
print('==> Building ResNet14...')

net = ResNet14().to(device)

criterion = nn.CrossEntropyLoss()

# Initial learning rate and optimizer for the ResNet; train() reads both as module-level names.
lr = 0.1
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)

n_epochs_net = 10
print('Training ResNet14 for {} epochs'.format(n_epochs_net))
# One training pass followed by one evaluation pass per epoch.
for epoch in range(net_start_epoch, n_epochs_net):
    train(epoch, net=net)
    test(epoch, net=net)

#### Hint 
The network topology of the desired network is sketched in the following graph:

![title](visualisation.png)

You can visualise the graph of your network using the following:

In [None]:
from torchviz import make_dot

# Create the dummy batch on the same device as the network; a CPU tensor makes
# the forward pass fail once `net` has been moved to the GPU.
random_input = torch.randn(32, 3, 32, 32, device=device)

# Run the forward pass in eval mode so this noise batch does not update the
# BatchNorm running statistics of the trained network, then restore the mode.
was_training = net.training
net.eval()
out = net(random_input)
net.train(was_training)

# Reduce to a scalar so the rendered graph has a single root node.
dot = make_dot(out.mean())
dot.render("visualisation")