In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim 
import torchvision
import torchvision.transforms as transforms

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import json
import pdb

from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter 
from IPython.display import display, clear_output

from collections import OrderedDict
from collections import namedtuple
from itertools import product

# Widen the line width used when tensors are printed.
torch.set_printoptions(linewidth = 120) 
# Explicitly enable autograd gradient tracking globally.
torch.set_grad_enabled(True) 

In [None]:
class bottleNeck(nn.Module):
    """Three-convolution residual block (1x1 -> 3x3 -> 1x1).

    The last 1x1 convolution widens the feature map to
    ``out_channels * expansion`` channels. Any spatial down-sampling is done
    by the 3x3 convolution through ``stride``.

    Args:
        in_channels: Number of channels of the incoming tensor.
        out_channels: Width of the two inner convolutions.
        stride: Stride of the 3x3 convolution.
        dim_change: Optional module applied to the input before the residual
            addition so that its shape matches the main branch. When ``None``
            the input is added unchanged.
    """

    expansion = 4

    def __init__(self, in_channels, out_channels, stride = 1, dim_change = None):
        super().__init__()
        widened = out_channels * self.expansion

        # 1x1 convolution: bring the input to the inner width.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 1, stride = 1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # 3x3 convolution: the only layer of the main branch that uses `stride`.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = stride, padding = 1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        # 1x1 convolution: expand to `expansion` times the inner width.
        self.conv3 = nn.Conv2d(out_channels, widened, kernel_size = 1)
        self.bn3 = nn.BatchNorm2d(widened)
        self.dim_change = dim_change

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        # Shortcut branch: the input itself, or its projection when one was given.
        shortcut = x if self.dim_change is None else self.dim_change(x)

        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        # No activation here: ReLU is applied after the residual sum.
        out = self.bn3(self.conv3(out))

        return F.relu(out + shortcut)

In [None]:
class ResNet(nn.Module):
    """Residual network: a 3x3 stem, four residual stages and a linear classifier.

    Args:
        block: Residual block class. It must expose a class attribute
            ``expansion`` and accept
            ``(in_channels, out_channels, stride=..., dim_change=...)``.
        num_layers: Sequence of four ints, the number of blocks in each stage.
        classes: Number of output logits.

    The stem and first stage use stride 1 and the remaining three stages use
    stride 2, so a 32x32 input reaches the pooling step as a 4x4 map. The
    classifier expects exactly ``512 * block.expansion`` features, i.e. a 1x1
    map after the 4x4 average pooling done in ``forward``.
    """

    def __init__(self, block, num_layers, classes=10):

        super().__init__()

        # Channel count feeding the next block; advanced by every _layer call.
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3,64,kernel_size=3,stride=1,padding=1)
        self.bn1 = nn.BatchNorm2d(64)

        self.layer1 = self._layer(block,64,num_layers[0],stride=1)
        self.layer2 = self._layer(block,128,num_layers[1],stride=2)
        self.layer3 = self._layer(block,256,num_layers[2],stride=2)
        self.layer4 = self._layer(block,512,num_layers[3],stride=2 )
        # Not used by forward(), which pools with F.avg_pool2d; the attribute
        # is kept so code that accesses it keeps working.
        self.averagePool = nn.AvgPool2d(kernel_size=4,stride=1)
        self.fc = nn.Linear(512*block.expansion,classes)

    def _layer(self,block,out_channels,num_layers,stride=1):
        """Build one stage of ``num_layers`` blocks and update ``self.in_channels``.

        Only the first block receives ``stride`` and, when needed, a
        projection shortcut; the remaining blocks keep the shape unchanged.

        Returns:
            An ``nn.Sequential`` holding the blocks of the stage.
        """
        stage_channels = out_channels * block.expansion

        # The first block maps `self.in_channels` -> `stage_channels`. A
        # projection is required when the identity cannot be added as-is:
        # the spatial size changes (stride != 1) or the channel counts differ.
        # (The comparison used to be `out_channels != in_channels * expansion`,
        # which has the two sides of the mapping swapped.)
        dim_change = None
        if stride != 1 or self.in_channels != stage_channels:
            dim_change = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(stage_channels),
            )

        netLayers = [block(self.in_channels, out_channels, stride=stride, dim_change=dim_change)]
        self.in_channels = stage_channels
        # Remaining blocks: input and output widths already agree.
        for _ in range(1, num_layers):
            netLayers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*netLayers)

    def forward(self,x):
        """Return the class logits for a batch of images ``x``."""
        x = F.relu(self.bn1(self.conv1(x)))

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # 4x4 average pooling, then flatten everything except the batch axis.
        x = F.avg_pool2d(x,4)
        x = x.view(x.size(0),-1)
        x = self.fc(x)

        return x

In [None]:
def _evaluate(net, loader, device):
    """Return the top-1 accuracy of ``net`` over ``loader`` as a percentage.

    The pass runs in eval mode with gradient tracking disabled, so
    normalisation layers use their running statistics and are not updated by
    the evaluation data. The previous train/eval mode is restored afterwards.
    Returns 0.0 when the loader yields no samples.
    """
    was_training = net.training
    net.eval()
    correctHits = 0
    total = 0
    with torch.no_grad():
        for data, output in loader:
            data, output = data.to(device), output.to(device)
            # Index of the largest logit is the predicted class.
            prediction = net(data).argmax(dim=1)
            total += output.size(0)
            correctHits += (prediction == output).sum().item()
    net.train(was_training)
    return (correctHits / total) * 100 if total else 0.0


def test(epochs=100, batch_size=128, lr=0.02, momentum=0.9, log_every=100):
    """Train a ResNet-50-style network on CIFAR-10 and print its test accuracy.

    Args:
        epochs: Number of passes over the training set.
        batch_size: Mini-batch size for both data loaders.
        lr: SGD learning rate.
        momentum: SGD momentum.
        log_every: Print the mean training loss every this many batches.

    Side effects: downloads CIFAR-10 into ``./data`` if it is missing and
    prints the device, the running loss, the accuracy after every epoch and
    the final accuracy.
    """
    # Convert PIL images to tensors and normalise each channel with
    # mean 0.5 and std 0.5.
    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    # Load train and test set.
    train_data = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
    trainset = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)

    test_data = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
    testset = torch.utils.data.DataLoader(test_data, batch_size=batch_size, shuffle=False)

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(device)

    # ResNet-50 layout. For a ResNet-18 layout use
    # ResNet(baseBlock, [2, 2, 2, 2], 10); baseBlock must already be defined
    # when test() is called.
    net = ResNet(bottleNeck, [3, 4, 6, 3])
    net.to(device)
    costFunc = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=momentum)

    for epoch in range(epochs):
        net.train()
        running_loss = 0.0
        for i, (data, output) in enumerate(trainset):
            data, output = data.to(device), output.to(device)

            optimizer.zero_grad()
            loss = costFunc(net(data), output)
            loss.backward()
            optimizer.step()

            # Accumulate, then report the mean over the last `log_every` batches.
            running_loss += loss.item()
            if (i + 1) % log_every == 0:
                print('[%d  %d] loss: %.4f' % (epoch + 1, i + 1, running_loss / log_every))
                running_loss = 0.0

        print('Accuracy on epoch ', epoch + 1, '= ', str(_evaluate(net, testset, device)))

    print('Accuracy = ' + str(_evaluate(net, testset, device)))

# Start the train/evaluate run when this code executes as the main module.
# NOTE(review): inside a notebook kernel `__name__` is normally '__main__',
# so running this cell launches the full training loop — confirm this is intended.
if __name__ == '__main__':
    test()

In [None]:
class baseBlock(nn.Module):
    """Two-convolution residual block (3x3 -> 3x3).

    Args:
        in_channels: Number of channels of the incoming tensor.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution; the second always uses 1.
        dim_change: Optional module applied to the input before the residual
            addition so that its shape matches the main branch. When ``None``
            the input is added unchanged.
    """

    expansion = 1

    def __init__(self, in_channels, out_channels, stride = 1, dim_change = None):
        super().__init__()
        # First 3x3 convolution carries the stride.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # Second 3x3 convolution keeps the spatial size.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.dim_change = dim_change

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        out = F.relu(self.bn1(self.conv1(x)))
        # No activation here: ReLU is applied after the residual sum.
        out = self.bn2(self.conv2(out))

        # With no `dim_change` the input is the shortcut as-is. Otherwise the
        # supplied module (e.g. a 1x1 convolution) reshapes the input so the
        # addition below is valid.
        if self.dim_change is None:
            shortcut = x
        else:
            shortcut = self.dim_change(x)

        return F.relu(out + shortcut)
