<a href="https://colab.research.google.com/github/BhardwajArjit/Research-Paper-Replication/blob/main/ResNet_Replication.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## This notebook replicates the research paper titled "**Deep Residual Learning for Image Recognition**" with PyTorch.

The link to the paper: https://arxiv.org/abs/1512.03385

ResNet (Residual Network) is a deep neural network architecture that uses skip connections to facilitate training of very deep convolutional neural networks.

## 0. Get setup

In [1]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms, datasets
from torchsummary import summary
from torch.utils.data import Dataset, DataLoader, random_split

In [2]:
import os
import shutil
from collections import OrderedDict

In [17]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

In [4]:
!nvidia-smi

Thu Nov  9 14:31:39 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.105.17   Driver Version: 525.105.17   CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   61C    P8    10W /  70W |      3MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [6]:
class LambdaLayer(nn.Module):
    def __init__(self, lambd):
        super(LambdaLayer, self).__init__()
        self.lambd = lambd

    def forward(self, x):
        return self.lambd(x)

In [7]:
class BasicConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, option='A'):
        super(BasicConvBlock, self).__init__()

        self.features = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)),
            ('bn1', nn.BatchNorm2d(out_channels)),
            ('act1', nn.ReLU()),
            ('conv2', nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)),
            ('bn2', nn.BatchNorm2d(out_channels))
        ]))

        self.shortcut = nn.Sequential()

        if stride != 1 or in_channels != out_channels:
            if option == 'A':
                # Use identity shortcuts with zero padding to increase channel dimension.
                pad_to_add = out_channels//4
                self.shortcut = LambdaLayer(lambda x:
                            F.pad(x[:, :, ::2, ::2], (0,0, 0,0, pad_to_add, pad_to_add, 0,0)))
            if option == 'B':
                self.shortcut = nn.Sequential(OrderedDict([
                    ('s_conv1', nn.Conv2d(in_channels, 2*out_channels, kernel_size=1, stride=stride, padding=0, bias=False)),
                    ('s_bn1', nn.BatchNorm2d(2*out_channels))
                ]))

    def forward(self, x):
        out = self.features(x)
        # sum it up with shortcut layer
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [8]:
class ResNet(nn.Module):
    def __init__(self, block_type, num_blocks):
        super(ResNet, self).__init__()

        self.in_channels = 16

        self.conv0 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn0 = nn.BatchNorm2d(16)

        self.block1 = self.__build_layer(block_type, 16, num_blocks[0], starting_stride=1)

        self.block2 = self.__build_layer(block_type, 32, num_blocks[1], starting_stride=2)

        self.block3 = self.__build_layer(block_type, 64, num_blocks[2], starting_stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.linear = nn.Linear(64, 10)

    def __build_layer(self, block_type, out_channels, num_blocks, starting_stride):
        strides_list_for_current_block = [starting_stride] + [1]*(num_blocks-1)

        # print('strides_list_for_current_block ', strides_list_for_current_block)
        layers = []

        for stride in strides_list_for_current_block:
            layers.append(block_type(self.in_channels, out_channels, stride))
            self.in_channels = out_channels

        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn0(self.conv0(x)))
        out = self.block1(out)
        out = self.block2(out)
        out = self.block3(out)
        out = self.avgpool(out)
        out = torch.flatten(out, 1)
        out = self.linear(out)
        return out

In [9]:
def ResNet56():
    return ResNet(block_type=BasicConvBlock, num_blocks=[9,9,9])

In [18]:
model = ResNet56()
model.to(device)
summary(model, (3, 32, 32))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 16, 32, 32]             432
       BatchNorm2d-2           [-1, 16, 32, 32]              32
            Conv2d-3           [-1, 16, 32, 32]           2,304
       BatchNorm2d-4           [-1, 16, 32, 32]              32
              ReLU-5           [-1, 16, 32, 32]               0
            Conv2d-6           [-1, 16, 32, 32]           2,304
       BatchNorm2d-7           [-1, 16, 32, 32]              32
    BasicConvBlock-8           [-1, 16, 32, 32]               0
            Conv2d-9           [-1, 16, 32, 32]           2,304
      BatchNorm2d-10           [-1, 16, 32, 32]              32
             ReLU-11           [-1, 16, 32, 32]               0
           Conv2d-12           [-1, 16, 32, 32]           2,304
      BatchNorm2d-13           [-1, 16, 32, 32]              32
   BasicConvBlock-14           [-1, 16,

In [24]:
import torch
import torch.nn as nn

# Define the basic building block for ResNet
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes)
            )

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = nn.ReLU()(out)
        return out

# Define the ResNet-18 architecture
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = nn.AdaptiveAvgPool2d(1)(out)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

# Create an instance of ResNet-18
def ResNet18():
    return ResNet(BasicBlock, [2, 2, 2, 2])

# Initialize the ResNet-18 model
resnet18 = ResNet18()


In [25]:
model = ResNet18()
model.to(device)
summary(model, (3, 32, 32))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 32, 32]           1,728
       BatchNorm2d-2           [-1, 64, 32, 32]             128
            Conv2d-3           [-1, 64, 32, 32]          36,864
       BatchNorm2d-4           [-1, 64, 32, 32]             128
            Conv2d-5           [-1, 64, 32, 32]          36,864
       BatchNorm2d-6           [-1, 64, 32, 32]             128
        BasicBlock-7           [-1, 64, 32, 32]               0
            Conv2d-8           [-1, 64, 32, 32]          36,864
       BatchNorm2d-9           [-1, 64, 32, 32]             128
           Conv2d-10           [-1, 64, 32, 32]          36,864
      BatchNorm2d-11           [-1, 64, 32, 32]             128
       BasicBlock-12           [-1, 64, 32, 32]               0
           Conv2d-13          [-1, 128, 16, 16]          73,728
      BatchNorm2d-14          [-1, 128,

In [12]:
def dataloader_cifar():
    transform = transforms.Compose([transforms.ToTensor(),
                                    transforms.Normalize(mean=[0.5], std=[0.5])])

    train_dataset = datasets.CIFAR10(root="data", train=True, download=True, transform=transform)
    test_dataset = datasets.CIFAR10(root="data", train=False, download=True, transform=transform)

    # Split dataset into training set and validation set.
    train_dataset, val_dataset = random_split(train_dataset, (45000, 5000))

    print("Image shape of a random sample image : {}".format(train_dataset[0][0].numpy().shape), end = '\n\n')

    print("Training Set:   {} images".format(len(train_dataset)))
    print("Validation Set:   {} images".format(len(val_dataset)))
    print("Test Set:       {} images".format(len(test_dataset)))

    BATCH_SIZE = 32

    # Generate dataloader
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=10000, shuffle=True)

    return train_loader, val_loader, test_loader

In [13]:
train_loader, val_loader, test_loader = dataloader_cifar()

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:10<00:00, 15857640.55it/s]


Extracting data/cifar-10-python.tar.gz to data
Files already downloaded and verified
Image shape of a random sample image : (3, 32, 32)

Training Set:   45000 images
Validation Set:   5000 images
Test Set:       10000 images


In [14]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [26]:
def train_model():
    EPOCHS = 15
    train_samples_num = 45000
    val_samples_num = 5000
    train_costs, val_costs = [], []

    #Training phase.
    for epoch in range(EPOCHS):

        train_running_loss = 0
        correct_train = 0

        model.train().cuda()

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            # Start the forward pass
            prediction = model(inputs)

            loss = criterion(prediction, labels)

            # do backpropagation and update weights with step()
            loss.backward()
            optimizer.step()

            # print('outputs on which to apply torch.max ', prediction)
            # find the maximum along the rows, use dim=1 to torch.max()
            _, predicted_outputs = torch.max(prediction.data, 1)

            # Update the running corrects
            correct_train += (predicted_outputs == labels).float().sum().item()

            train_running_loss += (loss.data.item() * inputs.shape[0])


        train_epoch_loss = train_running_loss / train_samples_num

        train_costs.append(train_epoch_loss)

        train_acc =  correct_train / train_samples_num

        # Now check trained weights on the validation set
        val_running_loss = 0
        correct_val = 0

        model.eval().cuda()

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                # Forward pass.
                prediction = model(inputs)

                # Compute the loss.
                loss = criterion(prediction, labels)

                # Compute validation accuracy.
                _, predicted_outputs = torch.max(prediction.data, 1)
                correct_val += (predicted_outputs == labels).float().sum().item()

            # Compute batch loss.
            val_running_loss += (loss.data.item() * inputs.shape[0])

            val_epoch_loss = val_running_loss / val_samples_num
            val_costs.append(val_epoch_loss)
            val_acc =  correct_val / val_samples_num

        info = "[Epoch {}/{}]: train-loss = {:0.6f} | train-acc = {:0.3f} | val-loss = {:0.6f} | val-acc = {:0.3f}"

        print(info.format(epoch+1, EPOCHS, train_epoch_loss, train_acc, val_epoch_loss, val_acc))

        torch.save(model.state_dict(), '/content/checkpoint_gpu_{}'.format(epoch + 1))

    torch.save(model.state_dict(), '/content/resnet-56_weights_gpu')

    return train_costs, val_costs

In [27]:
train_costs, val_costs = train_model()

[Epoch 1/15]: train-loss = 2.497190 | train-acc = 0.101 | val-loss = 0.002810 | val-acc = 0.106
[Epoch 2/15]: train-loss = 2.497436 | train-acc = 0.100 | val-loss = 0.003963 | val-acc = 0.105
[Epoch 3/15]: train-loss = 2.497844 | train-acc = 0.100 | val-loss = 0.003992 | val-acc = 0.106
[Epoch 4/15]: train-loss = 2.496700 | train-acc = 0.100 | val-loss = 0.003867 | val-acc = 0.105
[Epoch 5/15]: train-loss = 2.496780 | train-acc = 0.100 | val-loss = 0.003594 | val-acc = 0.105
[Epoch 6/15]: train-loss = 2.497021 | train-acc = 0.100 | val-loss = 0.004182 | val-acc = 0.105
[Epoch 7/15]: train-loss = 2.497312 | train-acc = 0.100 | val-loss = 0.003598 | val-acc = 0.105
[Epoch 8/15]: train-loss = 2.497635 | train-acc = 0.100 | val-loss = 0.003799 | val-acc = 0.106
[Epoch 9/15]: train-loss = 2.497138 | train-acc = 0.100 | val-loss = 0.003741 | val-acc = 0.105
[Epoch 10/15]: train-loss = 2.497585 | train-acc = 0.101 | val-loss = 0.003136 | val-acc = 0.106
[Epoch 11/15]: train-loss = 2.497258 | 

In [21]:
model = ResNet56()
model.load_state_dict(torch.load('/content/resnet-56_weights_gpu'))

<All keys matched successfully>

In [22]:
test_samples_num = 10000
correct = 0

model.eval().cuda()

with  torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        # Make predictions.
        prediction = model(inputs)

        # Retrieve predictions indexes.
        _, predicted_class = torch.max(prediction.data, 1)

        # Compute number of correct predictions.
        correct += (predicted_class == labels).float().sum().item()

test_accuracy = correct / test_samples_num
print('Test accuracy: {}'.format(test_accuracy))

Test accuracy: 0.1087
