# RESNET

The Pytorch implementation for the ResNet paper: Deep Residual Learning for Image Recognition 

### Paper 
<a href="https://arxiv.org/abs/1512.03385">Deep Residual Learning for Image Recognition</a>

### Implementation
- The image is resized with its shorter side randomly sampled in [256, 480] for scale augmentation. 
- A 224×224 crop is randomly sampled from an image or its horizontal flip. 
- Adopt batch normalization (BN) right after each convolution and before activation. (Does not need bias term)
- Do not use dropout.

### ResNet-50 Schematic diagram

<a href="https://drive.google.com/file/d/1B3T3Yv5_f4MvXlu5nG-AeByEMCV2N9QY/view?usp=sharing">ResNet-50 Schematic Diagram</a>

<div align="center">
  <img src="img/ResNet50.png" width="100%"/>
</div>

### Structure

<div align="center">
  <img src="img/ResNet-structure.jpeg" width="100%"/>
</div>

<div align="center">
  <img src="img/ResNet-bottleneck.jpeg" width="50%"/>
</div>


In [2]:
import torch
import torch.nn as nn

# Residual Block

In [98]:
class Block(nn.Module):
    '''A "bottleneck" building block for ResNet-50/101/152
    '''
    def __init__(self, in_channels, intermediate_channels, identity_downsample=None, stride=1):
        super(Block, self).__init__()
        
        # ResNet use 4 times expansion in the entire design
        self.expansion = 4
        
        self.identity_downsample = identity_downsample
        self.stride = stride
        
        # 1x1 Conv 
        self.conv1 = nn.Conv2d(in_channels, intermediate_channels, 
                               kernel_size=1, stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(intermediate_channels)
        
        # 3x3 Conv (bottleneck)
        self.conv2 = nn.Conv2d(intermediate_channels, intermediate_channels, 
                               kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(intermediate_channels)
        
        # 1x1 Conv
        self.conv3 = nn.Conv2d(intermediate_channels, intermediate_channels * self.expansion,
                               kernel_size=1, stride=1, padding=0, bias=False)
        self.bn3 = nn.BatchNorm2d(intermediate_channels * self.expansion)
        
        self.relu = nn.ReLU()

    def forward(self, x):
        identity = x.clone()
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.conv3(x)
        x = self.bn3(x)

        # When the start point and end point dimension is different...
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x

# ResNet Structure

In [108]:
class ResNet(nn.Module):
    '''
        torch.nn.Conv2d(in_channels, out_channels, 
                    kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True, padding_mode='zeros')
    
        torch.nn.BatchNorm2d(num_features, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    
        torch.nn.MaxPool2d(kernel_size, stride=None, padding=0, dilation=1, return_indices=False, ceil_mode=False)
        torch.nn.AdaptiveAvgPool2d(output_size = (n,m))
        torch.nn.Linear(in_features, out_features, bias=True)
    '''
    def __init__(self, Block, layers, image_channels=3, num_classes=1000):
        super(ResNet, self).__init__()
        
        # Conv1 (7,7,64), s=2, p=3
        self.in_channels = 64
        self.Conv1 = nn.Sequential(
            nn.Conv2d(image_channels, self.in_channels, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU()
        ) 
        
        # 3x3 Max Pooling, s=2, p=1
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # ResNet layers (Residual block layers staking)
        self.layer1 = self._make_layer(Block, layers[0], intermediate_channels=64,  stride=1)
        self.layer2 = self._make_layer(Block, layers[1], intermediate_channels=128, stride=2)
        self.layer3 = self._make_layer(Block, layers[2], intermediate_channels=256, stride=2)
        self.layer4 = self._make_layer(Block, layers[3], intermediate_channels=512, stride=2)

        # Adaptive Average Pooling (1x1 outputsize)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # FC (2048 -> 1000)
        self.fc = nn.Linear(512 * 4, num_classes)

    def forward(self, x):
        
        x = self.Conv1(x)        
        x = self.maxpool(x)
        
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)


        # AVG Pool & FC
        # count how many batch and reshape it to one-dimension
        # i.g. (batch_size,2048,1,1) -> (batch_size,2048)
        x = self.avgpool(x) 
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)

        return x

    def _make_layer(self, Block, num_residual_blocks, intermediate_channels, stride):
        '''Creating the numbers of blocks stacked
        i.g.
            ResNet-50:  [3,4,6,3]
            ResNet-101: [3,4,23,3]
            ResNet-152: [3,8,36,3]
        '''
        identity_downsample = None
        layers = []

        
        # Adjust the channel size for the first block in each layer (Do identiry downsample)
        # we need to adapt the Identity (skip connection) 
        # so it can be able to be added to the layer that's ahead
        if stride != 1 or self.in_channels != intermediate_channels * 4:
            identity_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, intermediate_channels * 4, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(intermediate_channels * 4),
            )

        layers.append(
            Block(self.in_channels, intermediate_channels, identity_downsample, stride)
        )

        # Update in_channels size for the following block
        # The expansion size is always 4 for ResNet 50,101,152
        self.in_channels = intermediate_channels * 4 

        # For example for first ResNet layer, sencond ~ last block: 
        # 256 will be mapped to 64 as intermediate layer, then finally back to 256(64*4),
        # Hence no identity downsample is needed
        #                     (256,56,56)
        # -> conv(1x1,64)  -> (64,56,56)
        # -> conv(3x3,64)  -> (64,56,56)
        # -> conv(1x1,256) -> (256,56,56)

        for i in range(num_residual_blocks - 1):
            layers.append(Block(self.in_channels, intermediate_channels))

        return nn.Sequential(*layers)



# ResNet Model

In [100]:
def ResNet_test(img_channel=3, num_classes=1000):
    '''fake structure just for test
    '''
    return ResNet(Block, [2, 1, 1, 1], img_channel, num_classes)

def ResNet50(img_channel=3, num_classes=1000):
    return ResNet(Block, [3, 4, 6, 3], img_channel, num_classes)


def ResNet101(img_channel=3, num_classes=1000):
    return ResNet(Block, [3, 4, 23, 3], img_channel, num_classes)


def ResNet152(img_channel=3, num_classes=1000):
    return ResNet(Block, [3, 8, 36, 3], img_channel, num_classes)

# Unit Test

In [101]:
# testing for block 
block_src = Block(in_channels=256, intermediate_channels=64)
block_src

Block(
  (conv1): Conv2d(256, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
)

In [102]:
net = ResNet_test(img_channel=3, num_classes=1000)
net

ResNet(
  (Conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Block(
      (identity_downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=

In [117]:
# testing for whole ResNet
def test():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    
    net = ResNet101(img_channel=3, num_classes=1000)
    y = net(torch.randn(4, 3, 224, 224)).to(device)
    print(y.size())
    
test()

torch.Size([4, 1000])


# Training

In [119]:
import torchvision 
import torchvision.transforms as transforms 

transform = transforms.Compose(
    [transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

#Load train and test set:
data = torchvision.datasets.CIFAR10(
    root='../CIFAR10',
    train=True,
    download=True,
    transform=transform
)

train_loader = torch.utils.data.DataLoader(data,batch_size=128,shuffle=True)
test_loader = torch.utils.data.DataLoader(data,batch_size=128,shuffle=False)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Files already downloaded and verified


In [116]:
# Optimizer and loss function
model = ResNet101(img_channel=3, num_classes=1000)
optimizer = torch.optim.SGD(model.parameters(), lr=0.02, momentum=0.9)
loss_function = nn.CrossEntropyLoss()

In [None]:
epochs = 2
for epoch in range(epochs):
    closs = 0
    
    for i,batch in enumerate(train_loader):
        
        inputs, output = batch
        inputs = inputs.to(device)
        output = output.to(device)
        
        # Forward
        prediction = model(inputs)

        # Backward
        optimizer.zero_grad()
        loss = loss_function(prediction, output)
        closs = loss.item()
        loss.backward()
        optimizer.step()
        
        # Show progress for every 100th times
        if i%100 == 0:
            print('[{}/{}] Loss: {}'.format(epoch+1,epochs,closs/100))
            closs = 0
            
            
    correctHits=0
    total=0     
    for i,batch in enumerate(test_loader):
        inputs, output = batch
        inputs = inputs.to(device)
        output = output.to(device)
        
        # Forward
        prediction = model(inputs)
        # returns max as well as its index
        _,prediction = torch.max(prediction.data,1)  
        total += output.size(0)
        correctHits += (prediction==output).sum().item()
    print('Accuracy on epoch ',epoch+1,'= ',str((correctHits/total)*100))

[1/2] Loss: 0.030661647319793702
[1/2] Loss: 0.024976458549499512
