<div class="alert alert-block alert-info">
<b>Deadline:</b> March 15, 2023 (Wednesday) 23:00
</div>

# Exercise 3. Convolutional networks. ResNet.

In this third part, you will train a convolutional neural network with a ResNet architecture [(He et al., 2016)](https://arxiv.org/abs/1512.03385).

In [83]:
skip_training = True  # Set this flag to True before validation and submission

In [84]:
# During evaluation, this cell sets skip_training to True
# skip_training = True

import tools, warnings
warnings.showwarning = tools.customwarn

In [85]:
import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torchvision
import torchvision.transforms as transforms

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import tools
import tests

In [86]:
# When running on your own computer, you can specify the data directory by:
# data_dir = tools.select_data_dir('/your/local/data/directory')
data_dir = tools.select_data_dir()

The data directory is /coursedata


In [87]:
# Select the device for training (use GPU if you have one)
#device = torch.device('cuda:0')
device = torch.device('cpu')

In [88]:
if skip_training:
    # The models are always evaluated on CPU
    device = torch.device("cpu")

## FashionMNIST dataset

We use the FashionMNIST dataset. It consists of 28x28 grayscale images from 10 classes: 'T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot'. The training set contains 60,000 images and the test set 10,000 images.

In [89]:
transform = transforms.Compose([
    transforms.ToTensor(),  # Transform to tensor
    transforms.Normalize((0.5,), (0.5,))  # Scale images to [-1, 1]
])

trainset = torchvision.datasets.FashionMNIST(root=data_dir, train=True, download=True, transform=transform)
testset = torchvision.datasets.FashionMNIST(root=data_dir, train=False, download=True, transform=transform)

classes = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal',
           'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

trainloader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=5, shuffle=False)

## ResNet

We create a network with an architecture inspired by [ResNet](https://arxiv.org/pdf/1512.03385.pdf).

### ResNet block
Our ResNet consists of blocks with two convolutional layers and a skip connection.

In the most general case, our implementation should have:

<img src="resnet_block_04.png" width=220 style="float: right;">

* Two convolutional layers with:
    * 3x3 kernel
    * no bias terms
    * padding with one pixel on both sides
    * 2d batch normalization after each convolutional layer.

* **The first convolutional layer also (optionally) has:**
    * a different number of input and output channels
    * a change of the resolution controlled by its stride.

* The skip connection:
    * simply copies the input if the resolution and the number of channels do not change.
    * if either the resolution or the number of channels change, the skip connection should have one convolutional layer with:
        * 1x1 convolution **without bias**
        * change of the resolution with stride (optional)
        * different number of input channels and output channels (optional)
    * if either the resolution or the number of channels change, the 1x1 convolutional layer is followed by 2d batch normalization.

* The ReLU nonlinearity is applied after the first convolutional layer and at the end of the block.
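For reference, the block described above can be summarized in one formula (the notation is ours, not taken from the paper):

$$
\mathbf{y} = \mathrm{ReLU}\Big( \mathrm{BN}_2\big( W_2 * \mathrm{ReLU}( \mathrm{BN}_1( W_1 *_s \mathbf{x} ) ) \big) + S(\mathbf{x}) \Big),
\qquad
S(\mathbf{x}) =
\begin{cases}
\mathbf{x}, & s = 1 \text{ and } C_{\mathrm{in}} = C_{\mathrm{out}},\\
\mathrm{BN}_S( W_S *_s \mathbf{x} ), & \text{otherwise},
\end{cases}
$$

where $W_1$ is the 3x3 kernel mapping $C_{\mathrm{in}}$ to $C_{\mathrm{out}}$ channels, $W_2$ the 3x3 kernel mapping $C_{\mathrm{out}}$ to $C_{\mathrm{out}}$ channels, $W_S$ the 1x1 kernel of the skip connection, $*_s$ a convolution with stride $s$ (all without bias), and $\mathrm{BN}_1$, $\mathrm{BN}_2$, $\mathrm{BN}_S$ the batch normalization layers.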

<div class="alert alert-block alert-warning">
<b>Note:</b> Each batch normalization layer is expected to come immediately after a convolutional layer.
</div>
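Why can the output of the main path and of the 1x1 skip convolution always be added? A small worked check using the standard output-size formula for a convolution (dilation 1), `(size + 2 * padding - kernel_size) // stride + 1`. The helper name `conv_output_size` is our own. For the 3x3 convolution with padding 1 and the 1x1 convolution with padding 0, both expressions reduce to `(size - 1) // stride + 1`, so the two paths agree even for odd input sizes.

```python
def conv_output_size(size, kernel_size, stride=1, padding=0):
    """Output size of a 2d convolution along one spatial dimension (dilation 1)."""
    return (size + 2 * padding - kernel_size) // stride + 1

# 28x28 input
main_same = conv_output_size(28, kernel_size=3, stride=1, padding=1)  # 3x3, stride 1: 28
main_down = conv_output_size(28, kernel_size=3, stride=2, padding=1)  # 3x3, stride 2: 14
skip_down = conv_output_size(28, kernel_size=1, stride=2, padding=0)  # 1x1, stride 2: 14

# 7x7 input (odd size, as at the input of the third group of blocks below)
main_odd = conv_output_size(7, kernel_size=3, stride=2, padding=1)    # 4
skip_odd = conv_output_size(7, kernel_size=1, stride=2, padding=0)    # 4

print(main_same, main_down, skip_down, main_odd, skip_odd)  # 28 14 14 4 4
```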

<img src="resnet_blocks_123.png" width=650 style="float: top;">

The implementation should also handle specific cases such as:

Left: The number of channels and the resolution do not change.
There are no computations in the skip connection.

Middle: The number of channels changes, the resolution does not change.

Right: The number of channels does not change, the resolution changes.

Your task is to implement this block. Use the layer implementations `nn.Conv2d` and `nn.BatchNorm2d`, because the tests rely on them.
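As a sketch of the three special cases above, the skip connection alone can be built and applied to a dummy batch. `make_skip` is a hypothetical helper for illustration only (the graded `Block` must construct these layers itself); it returns `nn.Identity()` when nothing changes and a 1x1 convolution without bias followed by `nn.BatchNorm2d` otherwise.

```python
import torch
import torch.nn as nn

def make_skip(in_channels, out_channels, stride=1):
    """Hypothetical helper: builds the skip connection described above."""
    if stride != 1 or in_channels != out_channels:
        # Projection: 1x1 convolution without bias, followed by 2d batch normalization
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(out_channels),
        )
    # Neither the channels nor the resolution change: copy the input as is
    return nn.Identity()

x = torch.randn(2, 16, 28, 28)  # dummy batch: 2 images, 16 channels, 28x28
skips = {
    'left': make_skip(16, 16),             # same channels, same resolution
    'middle': make_skip(16, 32),           # more channels
    'right': make_skip(16, 16, stride=2),  # lower resolution
}
shapes = {name: tuple(skip(x).shape) for name, skip in skips.items()}
print(shapes)
```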

In [90]:
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        """
        Args:
          in_channels (int):  Number of input channels.
          out_channels (int): Number of output channels.
          stride (int):       Controls the stride.
        """
        super(Block, self).__init__()
        # YOUR CODE HERE

        # Main path: two 3x3 convolutions without bias terms and with one pixel of padding,
        # each followed by 2d batch normalization. The first convolution may change the
        # number of channels and, through its stride, the resolution.
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.batchnorm1 = nn.BatchNorm2d(num_features=out_channels)
        
        self.relu1 = nn.ReLU()
        
        self.conv2 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.batchnorm2 = nn.BatchNorm2d(num_features=out_channels)
        
        self.relu2 = nn.ReLU()
        
        # Skip connection: if the resolution or the number of channels changes, the input is
        # projected by a 1x1 convolution without bias (with the same stride as conv1),
        # followed by 2d batch normalization, so that its shape matches the main path.
        if stride != 1 or in_channels != out_channels:
            self.skip_model = nn.Sequential(
                nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=stride, padding=0, bias=False),
                nn.BatchNorm2d(num_features=out_channels)
            )
        else:
            # Neither the resolution nor the number of channels changes:
            # nn.Identity simply forwards its input unchanged (a no-op).
            self.skip_model = nn.Identity()

    def forward(self, x):
        # YOUR CODE HERE
        
        # The ReLU nonlinearity is applied after the first convolutional layer 
        # and at the end of the block.
        y = self.conv1(x)
        y = self.batchnorm1(y)
        y = self.relu1(y)
        
        y = self.conv2(y)
        y = self.batchnorm2(y)
        
        # Residual connection: the (possibly projected) input is always added element-wise
        # to the output of the main path. If the resolution and the number of channels are
        # unchanged, skip_model is the identity; otherwise it is a 1x1 convolution followed
        # by batch normalization that gives x the same shape as y.
        
        y = y + self.skip_model(x)
        y = self.relu2(y)
        
        return y

In [91]:
def test_Block_shapes():

    # The number of channels and resolution do not change
    batch_size = 20
    x = torch.zeros(batch_size, 16, 28, 28)
    block = Block(in_channels=16, out_channels=16)
    y = block(x)
    assert y.shape == torch.Size([batch_size, 16, 28, 28]), "Bad shape of y: y.shape={}".format(y.shape)

    # Increase the number of channels
    block = Block(in_channels=16, out_channels=32)
    y = block(x)
    assert y.shape == torch.Size([batch_size, 32, 28, 28]), "Bad shape of y: y.shape={}".format(y.shape)

    # Decrease the resolution
    block = Block(in_channels=16, out_channels=16, stride=2)
    y = block(x)
    assert y.shape == torch.Size([batch_size, 16, 14, 14]), "Bad shape of y: y.shape={}".format(y.shape)

    # Increase the number of channels and decrease the resolution
    block = Block(in_channels=16, out_channels=32, stride=2)
    y = block(x)
    assert y.shape == torch.Size([batch_size, 32, 14, 14]), "Bad shape of y: y.shape={}".format(y.shape)

    print('Success')

test_Block_shapes()

Success


In [92]:
tests.test_Block(Block)
tests.test_Block_relu(Block)
tests.test_Block_batch_norm(Block)

Success
Success
Success


### Group of blocks

ResNet consists of several groups of blocks. The first block in a group may change the number of channels (often multiplying it by 2) and subsample the feature maps (using a stride greater than 1).

<img src="resnet_group.png" width=500 style="float: left;">
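`GroupOfBlocks` in the next cell passes `in_channels` and `stride` only to its first block; all later blocks map `out_channels` to `out_channels` with stride 1. The hypothetical helper `group_block_specs` below lists the `(in_channels, out_channels, stride)` arguments each `Block` would receive, without building any layers.

```python
def group_block_specs(in_channels, out_channels, n_blocks, stride=1):
    """Hypothetical helper: (in_channels, out_channels, stride) of every Block in one group."""
    # Only the first block may change the number of channels and the resolution
    first = [(in_channels, out_channels, stride)]
    # The remaining blocks keep both unchanged
    others = [(out_channels, out_channels, 1) for _ in range(1, n_blocks)]
    return first + others

# Second group of the network trained below (n_channels=16): channels 16 -> 32, stride 2
print(group_block_specs(16, 32, n_blocks=2, stride=2))  # [(16, 32, 2), (32, 32, 1)]
```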

In [93]:
# We implement a group of blocks in this cell
class GroupOfBlocks(nn.Module):
    def __init__(self, in_channels, out_channels, n_blocks, stride=1):
        super(GroupOfBlocks, self).__init__()

        first_block = Block(in_channels, out_channels, stride)
        other_blocks = [Block(out_channels, out_channels) for _ in range(1, n_blocks)]
        self.group = nn.Sequential(first_block, *other_blocks)

    def forward(self, x):
        return self.group(x)

In [94]:
# Let's print a group of blocks
group = GroupOfBlocks(in_channels=10, out_channels=20, n_blocks=3)
print(group)

GroupOfBlocks(
  (group): Sequential(
    (0): Block(
      (conv1): Conv2d(10, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (batchnorm1): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu1): ReLU()
      (conv2): Conv2d(20, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (batchnorm2): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu2): ReLU()
      (skip_model): Sequential(
        (0): Conv2d(10, 20, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (1): Block(
      (conv1): Conv2d(20, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (batchnorm1): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu1): ReLU()
      (conv2): Conv2d(20, 20, kernel_size=(3, 3), stride=(1, 1), paddin

### ResNet

Next we implement a ResNet with the following architecture. It contains three groups of blocks, each group having two basic blocks.

<img src="resnet.png" width=900 style="float: left;">

The cell below contains the implementation of our ResNet.

In [95]:
class ResNet(nn.Module):
    def __init__(self, n_blocks, n_channels=64, num_classes=10):
        """
        Args:
          n_blocks (list):   A list with three elements which contains the number of blocks in 
                             each of the three groups of blocks in ResNet.
                             For instance, n_blocks = [2, 4, 6] means that the first group has two blocks,
                             the second group has four blocks and the third one has six blocks.
          n_channels (int):  Number of channels in the first group of blocks.
          num_classes (int): Number of classes.
        """
        assert len(n_blocks) == 3, "The number of groups should be three."
        super(ResNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=n_channels, kernel_size=5, stride=1, padding=2, bias=False)
        self.bn1 = nn.BatchNorm2d(n_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.group1 = GroupOfBlocks(n_channels, n_channels, n_blocks[0])
        self.group2 = GroupOfBlocks(n_channels, 2*n_channels, n_blocks[1], stride=2)
        self.group3 = GroupOfBlocks(2*n_channels, 4*n_channels, n_blocks[2], stride=2)

        self.avgpool = nn.AvgPool2d(kernel_size=4, stride=1)
        self.fc = nn.Linear(4*n_channels, num_classes)

        # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, np.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def forward(self, x, verbose=False):
        """
        Args:
          x of shape (batch_size, 1, 28, 28): Input images.
          verbose: True if you want to print the shapes of the intermediate variables.
        
        Returns:
          y of shape (batch_size, 10): Outputs of the network.
        """
        if verbose: print(x.shape)
        x = self.conv1(x)
        if verbose: print('conv1:  ', x.shape)
        x = self.bn1(x)
        if verbose: print('bn1:    ', x.shape)
        x = self.relu(x)
        if verbose: print('relu:   ', x.shape)
        x = self.maxpool(x)
        if verbose: print('maxpool:', x.shape)

        x = self.group1(x)
        if verbose: print('group1: ', x.shape)
        x = self.group2(x)
        if verbose: print('group2: ', x.shape)
        x = self.group3(x)
        if verbose: print('group3: ', x.shape)

        x = self.avgpool(x)
        if verbose: print('avgpool:', x.shape)

        x = x.view(-1, self.fc.in_features)
        if verbose: print('x.view: ', x.shape)
        x = self.fc(x)
        if verbose: print('out:    ', x.shape)

        return x
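The initialization loop in `ResNet.__init__` draws each convolution weight from a normal distribution with standard deviation sqrt(2 / n), where n = kernel height x kernel width x out_channels. This is He (Kaiming) initialization computed with the fan-out. The sketch below checks on an arbitrarily shaped weight tensor that PyTorch's built-in `nn.init.kaiming_normal_(..., mode='fan_out', nonlinearity='relu')` targets the same standard deviation; the sample standard deviations are compared with a loose tolerance because the weights are random.

```python
import math
import torch
import torch.nn as nn

torch.manual_seed(0)
w = torch.empty(256, 64, 3, 3)  # conv weight layout: (out_channels, in_channels, kH, kW)

# Rule used in ResNet.__init__: n = kH * kW * out_channels
n = w.shape[2] * w.shape[3] * w.shape[0]
std_manual = math.sqrt(2.0 / n)
w_manual = torch.empty_like(w).normal_(0, std_manual)

# PyTorch's He initialization with fan_out = out_channels * kH * kW and ReLU gain sqrt(2)
nn.init.kaiming_normal_(w, mode='fan_out', nonlinearity='relu')

print(std_manual, w.std().item(), w_manual.std().item())
```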

In [96]:
def test_ResNet_shapes():
    # Create a network with 2 blocks in each of the three groups
    n_blocks = [2, 2, 2]  # number of blocks in the three groups
    net = ResNet(n_blocks, n_channels=10)
    net.to(device)

    # Feed a batch of images from the training data to test the network
    with torch.no_grad():
        images, labels = next(iter(trainloader))
        images = images.to(device)
        print('Shape of the input tensor:', images.shape)

        y = net.forward(images, verbose=True)
        print(y.shape)
        assert y.shape == torch.Size([trainloader.batch_size, 10]), "Bad shape of y: y.shape={}".format(y.shape)

    print('Success')

test_ResNet_shapes()

Shape of the input tensor: torch.Size([32, 1, 28, 28])
torch.Size([32, 1, 28, 28])
conv1:   torch.Size([32, 10, 28, 28])
bn1:     torch.Size([32, 10, 28, 28])
relu:    torch.Size([32, 10, 28, 28])
maxpool: torch.Size([32, 10, 14, 14])
group1:  torch.Size([32, 10, 14, 14])
group2:  torch.Size([32, 20, 7, 7])
group3:  torch.Size([32, 40, 4, 4])
avgpool: torch.Size([32, 40, 1, 1])
x.view:  torch.Size([32, 40])
out:     torch.Size([32, 10])
torch.Size([32, 10])
Success
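The spatial sizes printed above can be reproduced by hand with the output-size formula `(size + 2 * padding - kernel_size) // stride + 1`, which holds for `Conv2d`, `MaxPool2d` and `AvgPool2d` with their default `ceil_mode=False`. The trace also shows why `AvgPool2d(kernel_size=4, stride=1)` reduces the final 4x4 maps of a 28x28 input to 1x1. Within each group only the first 3x3 convolution can have stride 2, so the sketch lists one entry per group.

```python
def out_size(size, kernel_size, stride=1, padding=0):
    """Output size along one spatial dimension (dilation 1, floor rounding)."""
    return (size + 2 * padding - kernel_size) // stride + 1

# (name, kernel_size, stride, padding) of the layers that can change the spatial size
layers = [
    ('conv1',   5, 1, 2),
    ('maxpool', 3, 2, 1),
    ('group1',  3, 1, 1),
    ('group2',  3, 2, 1),
    ('group3',  3, 2, 1),
    ('avgpool', 4, 1, 0),
]

size = 28
sizes = {}
for name, k, s, p in layers:
    size = out_size(size, k, s, p)
    sizes[name] = size
print(sizes)  # {'conv1': 28, 'maxpool': 14, 'group1': 14, 'group2': 7, 'group3': 4, 'avgpool': 1}
```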


# Train the network

In [97]:
# This function computes the accuracy on the test dataset
def compute_accuracy(net, testloader):
    net.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total
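The prediction in `compute_accuracy()` is the index of the largest network output (logit) in each row; since softmax is monotonic, this is also the class with the highest predicted probability. A toy example with made-up outputs for 4 images and 3 classes:

```python
import torch

outputs = torch.tensor([[2.0, 0.1, -1.0],
                        [0.3, 0.2, 1.5],
                        [0.0, 3.0, 0.5],
                        [1.0, 0.9, 0.8]])  # logits: one row per image, one column per class
labels = torch.tensor([0, 2, 2, 0])

_, predicted = torch.max(outputs, 1)  # index of the largest logit in each row
correct = (predicted == labels).sum().item()
accuracy = correct / labels.size(0)
print(predicted.tolist(), accuracy)  # [0, 2, 1, 0] 0.75
```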

### Training loop

In the cell below, implement the training loop. The recommended hyperparameters are:
* Adam optimizer with learning rate 0.01.
* Cross-entropy loss. Note that the final layer of our network does not apply a softmax nonlinearity: it outputs unnormalized scores (logits). We therefore need a loss function that applies log_softmax internally, such as [nn.CrossEntropyLoss](https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html#torch.nn.CrossEntropyLoss).
* Number of epochs: 10
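A quick check of the note on the loss: `nn.CrossEntropyLoss` applied to raw logits gives the same value as `log_softmax` followed by `nn.NLLLoss`. The logits and labels below are random or made up for illustration.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
logits = torch.randn(4, 10)           # raw network outputs: batch of 4, 10 classes
labels = torch.tensor([3, 0, 9, 1])   # integer class indices

ce = nn.CrossEntropyLoss()(logits, labels)
nll = nn.NLLLoss()(F.log_softmax(logits, dim=1), labels)

print(ce.item(), nll.item())  # the two values agree up to floating-point error
```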

We recommend using the function `compute_accuracy()` defined above to track the accuracy during training. The final test accuracy should be above 0.9.

**Note: function `compute_accuracy()` puts the network into evaluation mode, in which batch normalization normalizes with its running estimates of the mean and variance instead of the statistics of the current mini-batch. You need to put the network back into training mode (by calling `net.train()`) before you continue training.**
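The difference between the two modes can be seen on a single `nn.BatchNorm2d` layer with a random batch whose statistics are far from zero mean and unit variance (the data are synthetic). In training mode the output is normalized with the batch statistics and the running estimates are updated with `momentum=0.1`; in evaluation mode the same input is normalized with those running estimates, so the output is no longer centered.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
bn = nn.BatchNorm2d(3)               # running_mean = 0, running_var = 1 initially
x = torch.randn(8, 3, 4, 4) * 5 + 2  # synthetic batch with mean about 2 and std about 5

bn.train()
y_train = bn(x)  # normalized with this batch's mean/variance; running stats are updated

bn.eval()
y_eval = bn(x)   # normalized with the running estimates instead

print(y_train.mean(dim=(0, 2, 3)))   # close to 0 for every channel
print(y_eval.mean(dim=(0, 2, 3)))    # clearly not 0 after a single update
```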

In [98]:
# Create the network
n_blocks = [2, 2, 2]  # number of blocks in the three groups
net = ResNet(n_blocks, n_channels=16)
net.to(device)

ResNet(
  (conv1): Conv2d(1, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2), bias=False)
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (group1): GroupOfBlocks(
    (group): Sequential(
      (0): Block(
        (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (batchnorm1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu1): ReLU()
        (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (batchnorm2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu2): ReLU()
        (skip_model): Identity()
      )
      (1): Block(
        (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (batchnorm1): BatchNorm2

In [99]:
if not skip_training:
    # YOUR CODE HERE

    # The loss function (applies log_softmax to the network outputs internally)
    criterion = nn.CrossEntropyLoss()
    
    # The optimizer
    optimizer = optim.Adam(net.parameters(), lr=0.01)
    
    # Number of training epochs
    epochs = 10
    
    # Train over the whole dataset multiple times
    for epoch in range(epochs):
        # compute_accuracy() switches the network to evaluation mode,
        # so switch back to training mode at the start of every epoch
        net.train()
        for images, labels in trainloader:
            # Move the batch to the same device as the network
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        
        accuracy = compute_accuracy(net, testloader)
        print(f"Accuracy at epoch {epoch}: {accuracy:.3f}")

In [100]:
# Save the model to disk (the pth-files will be submitted automatically together with your notebook)
# Set confirm=False if you do not want to be asked for confirmation before saving.
if not skip_training:
    tools.save_model(net, '3_resnet.pth', confirm=False)

In [101]:
if skip_training:
    net = ResNet(n_blocks, n_channels=16)
    tools.load_model(net, '3_resnet.pth', device)

Model loaded from 3_resnet.pth.
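`tools.save_model` and `tools.load_model` are course helpers whose internals are not shown here. Assuming they store the model parameters, a plain-PyTorch sketch of the same round trip uses `state_dict()`, `torch.save`, `torch.load` with `map_location` (so that a model trained on a GPU can be evaluated on the CPU), and `load_state_dict`. The tiny model and the temporary file are hypothetical stand-ins; the same calls would work for the ResNet above.

```python
import os
import tempfile
import torch
import torch.nn as nn

def make_model():
    # Hypothetical stand-in for the ResNet: one convolution and one batch-norm layer
    return nn.Sequential(nn.Conv2d(1, 4, kernel_size=3, bias=False), nn.BatchNorm2d(4))

model = make_model()
restored = make_model()  # freshly initialized, so its conv weights differ before loading

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'model.pth')
    torch.save(model.state_dict(), path)  # parameters and batch-norm buffers
    state = torch.load(path, map_location=torch.device('cpu'))
    restored.load_state_dict(state)

restored.eval()  # evaluate with the running batch-norm statistics
```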


In [102]:
# Compute the accuracy on the test set
accuracy = compute_accuracy(net, testloader)
print('Accuracy of the network on the test images: %.3f' % accuracy)
n_blocks = sum(type(m) == Block for _, m in net.named_modules())
assert n_blocks == 6, f"Wrong number ({n_blocks}) of blocks used in the network."

assert accuracy > 0.9, "Poor accuracy ({:.3f})".format(accuracy)
print('Success')

Accuracy of the network on the test images: 0.922
Success
