# Convolutional networks - ResNet

A network with an architecure inspired by [ResNet](https://arxiv.org/pdf/1512.03385.pdf).

In [1]:
skip_training = False  # Set this flag to True before validation and submission

In [2]:
import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torchvision
import torchvision.transforms as transforms

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import tools
import tests

In [3]:
# When running on your own computer, you can specify the data directory by:
# data_dir = tools.select_data_dir('/your/local/data/directory')
data_dir = tools.select_data_dir()

The data directory is ../data


In [4]:
# Select the device for training (use GPU if you have one)
#device = torch.device('cuda:0')
device = torch.device('cpu')

In [5]:
if skip_training:
    # The models are always evaluated on CPU
    device = torch.device("cpu")

## FashionMNIST dataset

The dataset is used is FashionMNIST. It consists of 60,000 training images of 10 classes: 'T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot'.

In [6]:
transform = transforms.Compose([
    transforms.ToTensor(),  # Transform to tensor
    transforms.Normalize((0.5,), (0.5,))  # Min-max scaling to [-1, 1]
])

trainset = torchvision.datasets.FashionMNIST(root=data_dir, train=True, download=True, transform=transform)
testset = torchvision.datasets.FashionMNIST(root=data_dir, train=False, download=True, transform=transform)

classes = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal',
           'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

trainloader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=5, shuffle=False)

## ResNet

Here I created a network with an architecure inspired by [ResNet](https://arxiv.org/pdf/1512.03385.pdf).

### ResNet block
ResNet consists of blocks with two convolutional layers and a skip connection.

In the most general case, the implementation should have:

<img src="images/resnet_block_04.png" width=220 style="float: right;">

* Two convolutional layers with:
    * 3x3 kernel
    * no bias terms
    * padding with one pixel on both sides
    * 2d batch normalization after each convolutional layer.

* **The first convolutional layer also (optionally) has:**
    * different number of input channels and output channels
    * change of the resolution with stride.

* The skip connection:
    * simply copies the input if the resolution and the number of channels do not change.
    * if either the resolution or the number of channels change, the skip connection should have one convolutional layer with:
        * 1x1 convolution **without bias**
        * change of the resolution with stride (optional)
        * different number of input channels and output channels (optional)
    * if either the resolution or the number of channels change, the 1x1 convolutional layer is followed by 2d batch normalization.

* The ReLU nonlinearity is applied after the first convolutional layer and at the end of the block.

<div class="alert alert-block alert-warning">
<b>Note:</b> Batch normalization is expected to be right after a convolutional layer.
</div>

<img src="images/resnet_blocks_123.png" width=650 style="float: top;">

The implementation should also handle specific cases such as:

Left: The number of channels and the resolution do not change.
There are no computations in the skip connection.

Middle: The number of channels changes, the resolution does not change.

Right: The number of channels does not change, the resolution changes.

In [8]:
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        """
        Args:
          in_channels (int):  Number of input channels.
          out_channels (int): Number of output channels.
          stride (int):       Controls the stride.
        """
        super(Block, self).__init__()
        self.layer1 = nn.Sequential()
        self.layer1.add_module("Conv1", nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1,stride = stride,bias = False))
        self.layer1.add_module("BN1", nn.BatchNorm2d(num_features=out_channels))# eps=1e-05, momentum=0.1, affine=True, track_running_stats=True))
        self.layer1.add_module("Relu5", nn.ReLU(inplace=False))
        
        self.layer2 = nn.Sequential()
        self.layer2.add_module("Conv2", nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3,padding=1,bias = False))
        self.layer2.add_module("BN2", nn.BatchNorm2d(num_features=out_channels))# eps=1e-05, momentum=0.1, affine=True, track_running_stats=True))
        
        self.shortcut = False
        if in_channels != out_channels or stride != 1:
            self.shortcut = nn.Sequential()
            self.shortcut.add_module("Conv", nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, padding=0,stride = stride,bias = False))
            self.shortcut.add_module("BN", nn.BatchNorm2d(num_features=out_channels))#

        
    def forward(self, x):
        y = self.layer1(x)
        y = self.layer2(y)
        if self.shortcut:
            x = self.shortcut(x)
        y = F.relu(y+x)
        return y

### Group of blocks

ResNet consists of several groups of blocks. The first block in a group may change the number of channels (often multiples the number by 2) and subsample (using strides).

<img src="images/resnet_group.png" width=500 style="float: left;">

In [9]:
# implement a group of blocks
class GroupOfBlocks(nn.Module):
    def __init__(self, in_channels, out_channels, n_blocks, stride=1):
        super(GroupOfBlocks, self).__init__()

        first_block = Block(in_channels, out_channels, stride)
        other_blocks = [Block(out_channels, out_channels) for _ in range(1, n_blocks)]
        self.group = nn.Sequential(first_block, *other_blocks)

    def forward(self, x):
        return self.group(x)

In [10]:
# Let's print a block
group = GroupOfBlocks(in_channels=10, out_channels=20, n_blocks=3)
print(group)

GroupOfBlocks(
  (group): Sequential(
    (0): Block(
      (layer1): Sequential(
        (Conv1): Conv2d(10, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (BN1): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (Relu5): ReLU()
      )
      (layer2): Sequential(
        (Conv2): Conv2d(20, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (BN2): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (shortcut): Sequential(
        (Conv): Conv2d(10, 20, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (BN): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (1): Block(
      (layer1): Sequential(
        (Conv1): Conv2d(20, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (BN1): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (Relu5): ReL

### ResNet

Next implement a ResNet with the following architecture. It contains three groups of blocks, each group having two basic blocks.

<img src="images/resnet.png" width=900 style="float: left;">

In [11]:
class ResNet(nn.Module):
    def __init__(self, n_blocks, n_channels=64, num_classes=10):
        """
        Args:
          n_blocks (list):   A list with three elements which contains the number of blocks in 
                             each of the three groups of blocks in ResNet.
                             For instance, n_blocks = [2, 4, 6] means that the first group has two blocks,
                             the second group has four blocks and the third one has six blocks.
          n_channels (int):  Number of channels in the first group of blocks.
          num_classes (int): Number of classes.
        """
        assert len(n_blocks) == 3, "The number of groups should be three."
        super(ResNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=n_channels, kernel_size=5, stride=1, padding=2, bias=False)
        self.bn1 = nn.BatchNorm2d(n_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.group1 = GroupOfBlocks(n_channels, n_channels, n_blocks[0])
        self.group2 = GroupOfBlocks(n_channels, 2*n_channels, n_blocks[1], stride=2)
        self.group3 = GroupOfBlocks(2*n_channels, 4*n_channels, n_blocks[2], stride=2)

        self.avgpool = nn.AvgPool2d(kernel_size=4, stride=1)
        self.fc = nn.Linear(4*n_channels, num_classes)

        # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, np.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def forward(self, x, verbose=False):
        """
        Args:
          x of shape (batch_size, 1, 28, 28): Input images.
          verbose: True if you want to print the shapes of the intermediate variables.
        
        Returns:
          y of shape (batch_size, 10): Outputs of the network.
        """
        if verbose: print(x.shape)
        x = self.conv1(x)
        if verbose: print('conv1:  ', x.shape)
        x = self.bn1(x)
        if verbose: print('bn1:    ', x.shape)
        x = self.relu(x)
        if verbose: print('relu:   ', x.shape)
        x = self.maxpool(x)
        if verbose: print('maxpool:', x.shape)

        x = self.group1(x)
        if verbose: print('group1: ', x.shape)
        x = self.group2(x)
        if verbose: print('group2: ', x.shape)
        x = self.group3(x)
        if verbose: print('group3: ', x.shape)

        x = self.avgpool(x)
        if verbose: print('avgpool:', x.shape)

        x = x.view(-1, self.fc.in_features)
        if verbose: print('x.view: ', x.shape)
        x = self.fc(x)
        if verbose: print('out:    ', x.shape)

        return x

## Training 

In [12]:
# This function computes the accuracy on the test dataset
def compute_accuracy(net, testloader):
    net.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total

In [13]:
# Create the network
n_blocks = [2, 2, 2]  # number of blocks in the three groups
net = ResNet(n_blocks, n_channels=16)
net.to(device)

ResNet(
  (conv1): Conv2d(1, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2), bias=False)
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (group1): GroupOfBlocks(
    (group): Sequential(
      (0): Block(
        (layer1): Sequential(
          (Conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (BN1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (Relu5): ReLU()
        )
        (layer2): Sequential(
          (Conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (BN2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
      (1): Block(
        (layer1): Sequential(
          (Conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(

In [14]:
if not skip_training:
    optimizer = torch.optim.Adam(net.parameters(),lr=0.01)
    n_epochs = 10
    loss_method = nn.CrossEntropyLoss()

    for epoch in range(n_epochs):
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            optimizer.zero_grad()
            net.zero_grad()
            outputs = net(inputs)
            loss = loss_method(outputs,labels)
            loss.backward()
            optimizer.step() 

        test_accuracy = compute_accuracy(net, testloader)
        print('Train Epoch {}: Loss: {:.6f} Test accuracy {:.2f}'.format(epoch, loss.item(), test_accuracy))  

Train Epoch 0: Loss: 0.164480 Test accuracy 0.86
Train Epoch 1: Loss: 0.160116 Test accuracy 0.88
Train Epoch 2: Loss: 0.198986 Test accuracy 0.89
Train Epoch 3: Loss: 0.407016 Test accuracy 0.89
Train Epoch 4: Loss: 0.329441 Test accuracy 0.89
Train Epoch 5: Loss: 0.117978 Test accuracy 0.91
Train Epoch 6: Loss: 0.188071 Test accuracy 0.90
Train Epoch 7: Loss: 0.508887 Test accuracy 0.91
Train Epoch 8: Loss: 0.272448 Test accuracy 0.90
Train Epoch 9: Loss: 0.089040 Test accuracy 0.91


## Save model to disk

In [15]:
if not skip_training:
    tools.save_model(net, '4_resnet.pth')
else:
    net = ResNet(n_blocks, n_channels=16)
    tools.load_model(net, '4_resnet.pth', device)

Do you want to save the model (type yes to confirm)? yes
Model saved to 4_resnet.pth.


## Evaluate model performance on test set

In [16]:
# Compute the accuracy on the test set
accuracy = compute_accuracy(net, testloader)
print('Accuracy of the network on the test images: %.3f' % accuracy)

Accuracy of the network on the test images: 0.914
