In [20]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader



**Define A Residual Block**


In [21]:
class ResidualBlock(nn.Module):
  def __init__(self,in_channels,out_channels,stride=1):
    super().__init__()
    #first convolution 3x3

    self.conv1=nn.Conv2d(in_channels,out_channels,kernel_size=3,stride=stride,padding=1, bias=False)
    self.bn1=nn.BatchNorm2d(out_channels)
    #second convolution  3x3
    self.conv2=nn.Conv2d(out_channels,out_channels,kernel_size=3,stride=1,padding=1, bias=False)
    self.bn2=nn.BatchNorm2d(out_channels)

    #shortcut connection
    self.shortcut=nn.Sequential() #gives x itself , it is empty container
    if stride !=1 or in_channels!=out_channels:
      self.shortcut=nn.Sequential(
          nn.Conv2d(in_channels,out_channels,kernel_size=1,stride=stride, bias=False),
          nn.BatchNorm2d(out_channels)    #if stride isnt 1 or channels differ we need to project input x to correct size, ie use 1x1 convolutions

      )
  def forward(self,x):
    out=F.relu(self.bn1(self.conv1(x)))
    out=self.bn2(self.conv2(out))
    out+=self.shortcut(x)
    out=F.relu(out)
    return out


In [22]:
# blocks is class - Residual Block
#num_blocks is a list with how many blocks to put in each layer
#num_classes is number of output classes


class ResNet(nn.Module):
  def __init__(self,block,num_blocks,num_classes=10):
    super(ResNet,self).__init__()
    self.in_channels = 16

    #initial conv
    # Adjusted stride and padding for 28x28 input (MNIST)
    self.conv1=nn.Conv2d(1,16,kernel_size=3,stride=1,padding=1,bias=False) #in_channels are 1 , out_channels are 16, stride=1 and padding =1 preserves Spatial Size ( 28x28 to 28x28)

    self.bn1=nn.BatchNorm2d(16) #we put bias=False because BatchNorm has its own affine shift


    #Residual Layers
    self.layer1 = self._make_layer(block, 16,  num_blocks[0], stride=1) #outputs 16 channels , use stride =1 (no downsampling)
    self.layer2 = self._make_layer(block, 32,  num_blocks[1], stride=2) # outputs 32 channels , first block stride =2 ( downsamples )
    self.layer3 = self._make_layer(block, 64,  num_blocks[2], stride=2) # outputs 64 channels , further downsampling

    #each nummblocks[i] says how many block instances to put in that layer
    #classifier
    self.linear=nn.Linear(64,num_classes)

  def _make_layer(self,block,out_channels,num_blocks,stride): # helps build on stage ( a sequence of num_blocks residual blocks)\
    strides= [stride]+[1]*(num_blocks-1) # use stride for first block and 1 for rest
    layers=[]
    for s in strides:
      layers.append(block(self.in_channels,out_channels,stride))
      self.in_channels=out_channels
    return nn.Sequential(*layers)
  def forward(self,x):
    out=F.relu(self.bn1(self.conv1(x)))
    out=self.layer1(out)    # 28x28
    out=self.layer2(out)    # 14 x 14
    out=self.layer3(out)   # 7x7  , 64 channels
    # Adjusted kernel size for average pooling for 28x28 input
    out=F.avg_pool2d(out,2)     # 7x7 to 1 feature map has B,64,1,1 # should be 4x4 to 1x1 for 28x28 input
    out=out.view(out.size(0),-1)     #flatten to B,64
    out=self.linear(out)
    return out
def ResNetMNIST():
  return ResNet(ResidualBlock,[2,2,2],num_classes=10)

*Load Data*

In [23]:
#Compose combines multiple transformations
# comvert 28x28 into tensor and normalize by x-0.5/ 0.5

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

#load mnist
train_dataset=torchvision.datasets.MNIST(root='./data',train=True,download=True,transform=transform)
test_dataset=torchvision.datasets.MNIST(root='./data',train=False,download=True,transform=transform)

#Data Loader
train_loader=DataLoader(train_dataset,batch_size=64,shuffle=True)
test_loader=DataLoader(test_dataset,batch_size=1000,shuffle=False)


**train the model**

In [24]:
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")

#initialize model,loss,optimizer

model=ResNetMNIST().to(device)

criterion=nn.CrossEntropyLoss()

optimizer=torch.optim.Adam(model.parameters(),lr=0.001)

for epoch in range(10):
  model.train()
  for batch_idx, (images,labels) in enumerate(train_loader):
    images,labels=images.to(device),labels.to(device)
    #forward pass
    outputs=model(images)
    loss=criterion(outputs,labels)
    #backward and optimize
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
  print(f"Epoch [{epoch+1}/5], Loss: {loss.item():.4f}")



Epoch [1/5], Loss: 0.0163
Epoch [2/5], Loss: 0.0254
Epoch [3/5], Loss: 0.0764
Epoch [4/5], Loss: 0.0026
Epoch [5/5], Loss: 0.0028
Epoch [6/5], Loss: 0.0150
Epoch [7/5], Loss: 0.0241
Epoch [8/5], Loss: 0.0002
Epoch [9/5], Loss: 0.0002
Epoch [10/5], Loss: 0.0037


In [27]:
# Evaluation
model.eval()
correct, total = 0, 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")


Test Accuracy: 99.05%
