## Reproduction of *Deep Residual Learning for Image Recognition*

Paper here: https://arxiv.org/pdf/1512.03385

In [288]:
import torch
import torch.nn as nn
from torchsummary import summary
import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline

In [175]:
def kaiming_init(m):
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
        if m.bias is not None:
            nn.init.zeros_(m.bias)

In [274]:
class ResidualBlockConv(nn.Module):
    def __init__(
            self, 
            in_channels=None,
            out_channels=None,
            num_layers=2, 
            kernel_size=3,
            projection=False,
            activation=nn.ReLU()
    ) -> None:
        super(ResidualBlockConv, self).__init__()
        assert in_channels is not None, "Input channels is set to None"
        assert out_channels is not None, "Output channels is set to None"

        self.projection = projection
        self.layers = nn.Sequential()
        ß
        if self.projection:
            self.downsample_conv = nn.Conv2d(
                kernel_size=1, 
                in_channels=in_channels,
                out_channels=out_channels,
                stride=2
            )

        layers = []
        for i in range(num_layers+1):
            if i == 0 and self.projection:
                layers.append(nn.Conv2d(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    kernel_size=kernel_size,
                    padding=1,
                    stride=2
                ))
            elif i == num_layers - 1:
                layers.append(activation)
            else:
                layers.append(nn.Conv2d(
                    in_channels=out_channels,
                    out_channels=out_channels,
                    kernel_size=kernel_size,
                    padding=1
                ))
        
        self.layers = nn.Sequential(*layers)
        self.last_relu = activation


    def downsample(self, x):
        if self.downsample_conv is not None:
            return self.downsample_conv(x)
        return x

        
    def forward(self, x):
        residual = x
        
        out = self.layers(x)
             
        # check if we need to downsample
        if self.projection:
             residual = self.downsample(residual)
        
        # perform the residual mapping
        out += residual
        out = self.last_relu(out)
        return out 

In [340]:
class ResNet34(nn.Module):
    def __init__(
            self,
            net_blocks,
            num_classes=10,
            activation=nn.ReLU(),
        ) -> None:
        super(ResNet34, self).__init__()

        self.layers = nn.ModuleList()
        self.layers.append(nn.Conv2d(
            in_channels=3,
            out_channels=64,
            kernel_size=7,
            stride=2,
            padding=3
        ))

        self.layers.append(nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
        self.layers.append(activation)
        self.layers.append(self._make_block(net_blocks[0], 64, 64))
        self.layers.append(self._make_block(net_blocks[1], 64, 128))
        self.layers.append(self._make_block(net_blocks[2], 128, 256))
        self.layers.append(self._make_block(net_blocks[3], 256, 512))
        self.layers.append(nn.AdaptiveAvgPool2d(output_size=(1, 1)))
        self.layers.append(nn.Flatten(start_dim=1))
        self.layers.append(nn.Linear(in_features=512, out_features=num_classes))

        self.apply(kaiming_init)
    
    def _make_block(self, num_blocks, in_channels, out_channels):
        new_block = nn.Sequential()
        for i in range(num_blocks):
            if in_channels != out_channels:
                if i == 0:
                    new_block.append(ResidualBlockConv(
                        in_channels=in_channels,
                        out_channels=out_channels,
                        projection=True
                    ))
                else:
                    new_block.append(ResidualBlockConv(
                        in_channels=out_channels,
                        out_channels=out_channels
                    ))
            else:
                new_block.append(ResidualBlockConv(
                    in_channels=out_channels,
                    out_channels=out_channels
                ))

            return new_block

    def forward(self, x):
        out = x
        for layer in self.layers:
            out = layer(out)
        return out 
        

In [276]:
test_resnet = ResNet34([3, 4, 6, 3])

In [292]:
summary(test_resnet, input_size=(3, 32, 32), batch_size=32)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [32, 64, 16, 16]           9,472
         MaxPool2d-2             [32, 64, 8, 8]               0
              ReLU-3             [32, 64, 8, 8]               0
            Conv2d-4             [32, 64, 8, 8]          36,928
              ReLU-5             [32, 64, 8, 8]               0
              ReLU-6             [32, 64, 8, 8]               0
              ReLU-7             [32, 64, 8, 8]               0
              ReLU-8             [32, 64, 8, 8]               0
              ReLU-9             [32, 64, 8, 8]               0
             ReLU-10             [32, 64, 8, 8]               0
             ReLU-11             [32, 64, 8, 8]               0
             ReLU-12             [32, 64, 8, 8]               0
           Conv2d-13             [32, 64, 8, 8]          36,928
             ReLU-14             [32, 6

Create a plain network to constrast the residual learning net with a plain deep one

In [341]:
class Plain34(nn.Module):
    def __init__(
            self,
            num_classes=10,
            activation=nn.ReLU()
        ) -> None:
        super(Plain34, self).__init__()
        
        self.layers = nn.ModuleList()
        self.layers.append(nn.Conv2d(
            in_channels=3,
            out_channels=64,
            kernel_size=7,
            stride=2,
            padding=3
        ))

        self.layers.append(nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
        self.layers.append(activation)

        for _ in range(6):
            self.layers.append(nn.Conv2d(
                in_channels=64,
                out_channels=64,
                kernel_size=3,
                padding=1
            ))
            self.layers.append(activation)
        
        self.layers.append(nn.Conv2d(
            in_channels=64, 
            out_channels=128,
            kernel_size=3, 
            stride=2,
            padding=1
        ))
        self.layers.append(activation)

        for _ in range(7):
            self.layers.append(nn.Conv2d(
                in_channels=128,
                out_channels=128,
                kernel_size=3,
                padding=1
            ))
            self.layers.append(activation)
        
        self.layers.append(nn.Conv2d(
            in_channels=128, 
            out_channels=256,
            kernel_size=3, 
            stride=2,
            padding=1
        ))
        self.layers.append(activation)

        for _ in range(11):
            self.layers.append(nn.Conv2d(
                in_channels=256,
                out_channels=256,
                kernel_size=3,
                padding=1
            ))
            self.layers.append(activation)

        self.layers.append(nn.Conv2d(
            in_channels=256, 
            out_channels=512,
            kernel_size=3, 
            stride=2,
            padding=1
        ))
        self.layers.append(activation)

        for _ in range(5):
            self.layers.append(nn.Conv2d(
                in_channels=512,
                out_channels=512,
                kernel_size=3,
                padding=1
            ))
            self.layers.append(activation)

        self.layers.append(nn.AdaptiveAvgPool2d(
            output_size=(1, 1)
        ))

        self.layers.append(nn.Flatten(start_dim=1))

        self.layers.append(nn.Linear(
            in_features=512,
            out_features=num_classes
        ))
        
        # self.layers.append(nn.LogSoftmax(dim=1))

        self.apply(kaiming_init)

    def forward(self, x):
        out = x 
        for layer in self.layers:
            out = layer(out)
        return out        

In [325]:
plain_net = Plain34()

In [326]:
summary(plain_net, input_size = (3, 32, 32), batch_size= 32)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [32, 64, 16, 16]           9,472
         MaxPool2d-2             [32, 64, 8, 8]               0
              ReLU-3             [32, 64, 8, 8]               0
            Conv2d-4             [32, 64, 8, 8]          36,928
              ReLU-5             [32, 64, 8, 8]               0
            Conv2d-6             [32, 64, 8, 8]          36,928
              ReLU-7             [32, 64, 8, 8]               0
            Conv2d-8             [32, 64, 8, 8]          36,928
              ReLU-9             [32, 64, 8, 8]               0
           Conv2d-10             [32, 64, 8, 8]          36,928
             ReLU-11             [32, 64, 8, 8]               0
           Conv2d-12             [32, 64, 8, 8]          36,928
             ReLU-13             [32, 64, 8, 8]               0
           Conv2d-14             [32, 6

In [327]:
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.optim as optim
from torchvision.transforms import v2
from yaml import load, Loader

In [329]:
with open('config.yaml', 'r') as f:
    config = load(f, Loader)

num_classs = config['num_classes']
starting_lr = config['starting_lr']
input_size = config['input_size']
epochs = config['epochs']
batch_size = config['batch_size']

In [333]:
transformation_train = v2.Compose([
    v2.ToImage(),
    v2.RandomResizedCrop(size=(224, 224), antialias=True),
    v2.RandomHorizontalFlip(p=0.5),
    v2.RandomRotation(degrees=90),
    v2.ColorJitter(brightness=.5, hue=.3),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [334]:
transformation_test = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [335]:
cifar10_train = CIFAR10('data/', train=True, transform=transformation_train, download=True)
cifar10_test = CIFAR10('data/', transform=transformation_test, download=True)

trainloader = DataLoader(cifar10_train, batch_size=batch_size, shuffle=True, num_workers=2)
testloader = DataLoader(cifar10_test, batch_size=batch_size, shuffle=False, num_workers=2)

Files already downloaded and verified
Files already downloaded and verified


In [336]:
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [337]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(plain_net.parameters(), lr=starting_lr)

In [339]:
for epoch in range(1):
    
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data

        optimizer.zero_grad()

        outputs = plain_net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # repurposed from https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html#define-a-loss-function-and-optimizer
        running_loss += loss.item()
        if i % 2000 == 1999:    
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0


print(running_loss)


KeyboardInterrupt: 