In [128]:
import torch
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import numpy as np

In [129]:
transform = transforms.Compose([transforms.ToTensor()])
    
train_data = datasets.MNIST('./data', download=True, transform=transform, train=True)
test_data = datasets.MNIST('./data', download=True, transform=transform, train=False)

### HyperParameters

In [130]:
in_channels = 1
input_size = 28
num_layers = 2
epochs = 2
batch_size = 64
num_classes = 10
learning_rate = 0.001

### Creating the loaders

In [131]:
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size, num_workers=0)
test_loader = DataLoader(test_data, shuffle=False, batch_size=batch_size)

### Model

In [132]:
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
        super(Block, self).__init__()
        self.expansion = 4
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(out_channels*self.expansion)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample
    def forward(self, x):
        identity = x
        
        # A whole block
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)
        # Time to add the identity
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)
        x += identity
        x = self.relu(x)
        
        return x        

In [133]:
# layers will be a list telling how many times to use the block
# In ResNet50 layers would be [3,4,6,3]
class ResNet(nn.Module): 
    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64
        
        #Not ResNet layers yet, just initialization
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2,padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        #ResNet layers
        self.layer1 = self._make_layer(block, layers[0], out_channels=64, stride=1)
        self.layer2 = self._make_layer(block, layers[1], out_channels=128, stride=2)
        self.layer3 = self._make_layer(block, layers[2], out_channels=256, stride=2)
        self.layer4 = self._make_layer(block, layers[3], out_channels=512, stride=2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512*4, num_classes)
    def forward(self, x):
        # First part
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        # Into the Resnet blocks
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # Putting it through the average pooling to make sure its in a correct shape (1,1)
        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)
        
        return x
    
    
    def _make_layer(self, block, num_residual_blocks, out_channels, stride):
        identity_downsample = None
        # Going to add the layers to a list
        layers = []
        
        # If the stride is not 1 or if the number of channels when the layer is over * 4 
        # is not equal to the number of channels inputted initially,
        # then the size is changed after going through the block
        # Which means we have to change the identity_downsample to match the output
        
        if stride != 1 or self.in_channels != out_channels * 4:
            
            identity_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels*4, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels*4)
            )
        #This first block is responsable for changing the number of channels
        layers.append(block(self.in_channels, out_channels, identity_downsample, stride))    
        
        self.in_channels = out_channels * 4 #256 channels
        
        for i in range(num_residual_blocks - 1): # -1 because already computted one block
            layers.append(block(self.in_channels, out_channels)) #256, 64 in the second block
            
        return nn.Sequential(*layers)    

### Function that creates ResNet50. Creating ResNet101, for example, would only require to change the layers list values

In [134]:
def ResNet50(img_channels, num_classes):
    return ResNet(Block, [3, 4, 6, 3], img_channels, num_classes)

#### Checking if the output size is correct

In [135]:
def test():
    net = ResNet50(1, 10)
    x = torch.rand(2, 1, 28, 28)
    y = net(x)
    print(y.shape)

In [136]:
test()

torch.Size([2, 10])


### Creating model

In [137]:
model = ResNet50(in_channels, num_classes)

### Loss function and Optimizer

In [138]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

#### Training function

In [139]:
import time
def train(model, loss_func, optimizer_func, train_loader, num_epochs):
    time_epoch = time.time()
    for epoch in range(num_epochs):
        time_100_batch = time.time()
        for i, (images, labels) in enumerate(train_loader):
            out = model(images)
            loss = loss_func(out, labels)
            optimizer_func.zero_grad()
            loss.backward()
            optimizer.step()
            
            if i%101==100:
                print(f'Epoch:{epoch}, Batch:{i}, Loss:{loss.item()}, Time Spent:{time.time()-time_100_batch}')
    print(f'Done Training. Total Time: {time.time()-time_epoch}')        

In [140]:
train(model, criterion, optimizer, train_loader, epochs)

KeyboardInterrupt: 

### Testing accuracy

In [164]:
final_model = nn.Sequential(model,
                           nn.Softmax(num_classes))

def check_accuracy(model, loader):
    correct = 0
    total = 0
    model.eval()
    
    with torch.no_grad():
        for x, y in loader:
            
            x = x.squeeze(1)
            
            scores = model(x)
            
            _, predictions = scores.max(1)
            correct += (predictions==y).sum()
            total += predictions.size(0)
        print(f'Got {correct} / {total} with accuracy {float(correct)/float(total) * 100:.2f}') 
    
    model.train()    

In [165]:
check_accuracy(model, test_loader)

RuntimeError: Expected 4-dimensional input for 4-dimensional weight [64, 1, 7, 7], but got 3-dimensional input of size [64, 28, 28] instead