<font size="5">Mini Project on ResNet</font>

In this mini project, our task is to train a deep convolutional neural network to perform image classification.

We will train ResNet using the CIFAR10 dataset, which consists of 60000 32x32 colour images in 10 classes, with 6000 images per class. The classes are: {airplane, automobile, bird, cat, deer, dog, frog, horse, ship, truck}.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn

import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets

import os
import argparse
import torch.utils.data as data
from torchsummary import summary

import json
import copy

<font size="5">Loading and Preparing the Data</font>

Our dataset is made up of color images with three color channels (red, green and blue). To normalize our data we need to calculate the mean and standard deviation of each color channel independently, and then normalize each channel with its own statistics.

In [None]:
# Directory in which torchvision stores (and looks for) the CIFAR10 files.
ROOT = '.data'

# Training split without any transform; its raw pixel array is used below
# to compute the per-channel normalisation statistics.
train_data = datasets.CIFAR10(root=ROOT, train=True, download=True)

Compute the mean and standard deviation of each of the R, G, B channels.

In [None]:
# Reduce over every axis except the last one so that one mean / std remains
# per colour channel; dividing by 255 rescales the statistics from raw pixel
# units (presumably 0-255 — confirm against the dataset) to the unit range.
raw_pixels = train_data.data
means = raw_pixels.mean(axis=(0, 1, 2)) / 255
stds = raw_pixels.std(axis=(0, 1, 2)) / 255

Next, we will do data augmentation. For each training image we will pad it by 4 pixels, randomly crop a 32x32 region from the padded image, and randomly flip it horizontally. Finally we will normalize each color channel using the means/stds we calculated above.

In [None]:
# Channel-wise normalisation shared by the training and evaluation pipelines.
normalize = transforms.Normalize(mean=means, std=stds)

# Training pipeline: augmentation (random crop of the padded image, random
# horizontal flip) followed by tensor conversion and normalisation.
train_augmentation = [
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
]
train_transforms = transforms.Compose(
    train_augmentation + [transforms.ToTensor(), normalize]
)

# Evaluation pipeline: no augmentation, only tensor conversion + normalisation.
test_transforms = transforms.Compose([transforms.ToTensor(), normalize])

Next, we'll load the dataset along with the transforms defined above.

We will also create a validation set with 10% of the training samples. The validation set will be used to monitor loss along different epochs, and we will pick the model along the optimization path that performed the best, and report final test accuracy numbers using this model.

In [None]:
# Human-readable class names (presumably in CIFAR10 label-index order —
# verify before using them to decode predictions).
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

# Training split, now with the augmentation pipeline attached.
train_data = datasets.CIFAR10(
    root=ROOT, train=True, download=True, transform=train_transforms,
)

# Held-out test split, with the augmentation-free pipeline.
test_data = datasets.CIFAR10(
    root=ROOT, train=False, download=True, transform=test_transforms,
)

In [None]:
# NOTE: despite its name, VALID_RATIO is the fraction of samples KEPT for
# training; the remaining 10% become the validation set.
VALID_RATIO = 0.9

n_total_examples = len(train_data)
n_train_examples = int(n_total_examples * VALID_RATIO)
n_valid_examples = n_total_examples - n_train_examples

# Randomly partition the training set into the two subsets.
split_sizes = [n_train_examples, n_valid_examples]
train_data, valid_data = data.random_split(train_data, split_sizes)

In [None]:
# Copy the validation subset (including the dataset object it wraps) BEFORE
# changing the transform; presumably the subset otherwise shares its
# underlying dataset with train_data, so the swap would also disable the
# training augmentation — confirm against torch.utils.data.random_split.
valid_data = copy.deepcopy(valid_data)
# Validation images are evaluated with the augmentation-free test pipeline.
valid_data.dataset.transform = test_transforms

Create data loaders for train_data, valid_data and test_data.

In [None]:
# Mini-batch size used by all three loaders (128 and 512 were also tried).
BATCH_SIZE = 256

# One loader per split; only the training loader reshuffles its samples.
train_iterator, valid_iterator, test_iterator = (
    data.DataLoader(split, batch_size=BATCH_SIZE, shuffle=reshuffle)
    for split, reshuffle in (
        (train_data, True),
        (valid_data, False),
        (test_data, False),
    )
)


<font size="5">Defining the Model</font>

Next up is defining the model.
ResNet will have the following architecture:
ReLU(S(x) + F(x))
where S(x) refers to the skipped connection and F(x) is a block that implements conv -> BN -> relu -> conv -> BN

Also, following the paper https://arxiv.org/abs/1603.05027,
we consider a second design in which F(x) implements BN -> relu -> conv -> BN -> relu -> conv (the addition is then done without a final relu).

Our code references https://github.com/FrancescoSaverioZuppichini/ResNet

<font size="4">Basic Block</font>

In [None]:
class BasicBlock(nn.Module):
    """Residual block with two 3x3 convolutions: ``ReLU(S(x) + F(x))``.

    ``F(x)`` is conv -> BN -> ReLU -> conv -> BN ("design 1"). ``S(x)`` is the
    shortcut branch: the identity when the input already has the output's
    shape, otherwise a strided 1x1 convolution followed by batch norm.

    Args:
        in_planes: number of channels of the input feature map.
        planes: number of channels produced by both 3x3 convolutions.
        stride: stride of the first convolution (and of the 1x1 projection
            on the shortcut, when one is needed).
    """

    # The block outputs ``expansion * planes`` channels.
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # --- design 1: conv -> BN -> relu -> conv -> BN -------------------
        self.conv1 = nn.Conv2d(
            in_planes, planes,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(planes)

        # --- design 2 (pre-activation), kept for reference ----------------
        # self.bn1 = nn.BatchNorm2d(in_planes)
        # self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3,
        #                        stride=stride, padding=1, bias=False)
        # self.bn2 = nn.BatchNorm2d(planes)
        # self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
        #                        stride=1, padding=1, bias=False)

        # A projection is only required when the residual branch changes the
        # spatial size (stride) or the channel count; otherwise an empty
        # Sequential acts as the identity.
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Return ``ReLU(F(x) + S(x))`` for the input feature map ``x``."""
        # design 1
        residual = F.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))
        return F.relu(residual + self.shortcut(x))

        # design 2 (pre-activation), kept for reference. NOTE: the first
        # convolution must consume the activated tensor ``out``, not ``x``.
        # out = F.relu(self.bn1(x))
        # out = F.relu(self.bn2(self.conv1(out)))
        # out = self.conv2(out)
        # out += self.shortcut(x)
        # return out

<font size="5">ResNet</font>

In [None]:
class ResNet(nn.Module):
    """ResNet-style classifier for 3-channel images.

    The network is a 3x3 stem convolution (3 -> 16 channels) followed by four
    stages of residual blocks with 16, 32, 64 and 128 base channels. Stages
    2-4 halve the spatial resolution in their first block. The final feature
    map is globally average-pooled and fed to a linear classifier.

    Args:
        block: residual block class. It is called as
            ``block(in_planes, planes, stride)`` and must expose an integer
            class attribute ``expansion`` (output channels = expansion * planes).
        num_blocks: sequence of four ints, the number of blocks per stage.
        num_classes: size of the output logit vector.
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super().__init__()
        # Running channel count; updated by _make_layer as stages are built.
        self.in_planes = 16

        self.conv1 = nn.Conv2d(3, 16, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(block, 16, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 32, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 64, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 128, num_blocks[3], stride=2)
        self.linear = nn.Linear(128*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage of ``num_blocks`` blocks.

        Only the first block uses ``stride``; the remaining ones use stride 1.
        ``self.in_planes`` is advanced so the next block/stage receives the
        right number of input channels.
        """
        block_strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for block_stride in block_strides:
            layers.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits, shape ``(batch, num_classes)``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Global average pooling. For 32x32 inputs the final map is 4x4, so
        # this matches the previous fixed avg_pool2d(out, 4); unlike the fixed
        # window it also works for other input resolutions.
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

We'll create an instance of our model with the desired amount of classes.

In [None]:
# Number of residual blocks in each of the four stages. The alternatives
# below reuse the per-stage block counts of the named ResNet variants.
model = ResNet(BasicBlock, [3, 4, 23, 3])
#[3, 4, 23, 3] resnet-101
#[3, 8, 36, 3] resnet-152
#[3, 24, 36, 3] resnet-200

# Print the layer-by-layer summary on whichever device is available instead
# of unconditionally calling .cuda(), which raises on CPU-only machines.
summary_device = 'cuda' if torch.cuda.is_available() else 'cpu'
summary(model.to(summary_device), (3, 32, 32), device=summary_device)

<font size="5">Train the Model</font>

We then define the device we'll use, place our model on it, and define the loss function (criterion), the optimizer and the learning-rate scheduler.

In [None]:
best_acc = 0  # best validation accuracy seen so far (updated by valid())

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = model.to(device)
# `device` is a torch.device, and comparing it with the string 'cuda' is
# always False, so the device *type* is compared instead.
if device.type == 'cuda':
    # Let cuDNN pick the fastest convolution algorithms for the fixed
    # input size used throughout this notebook.
    cudnn.benchmark = True
    # NOTE: the model is intentionally NOT wrapped in torch.nn.DataParallel
    # here. The original `device == 'cuda'` test never fired, so the model
    # was never wrapped, and wrapping it would prefix every checkpoint key
    # with 'module.', which the plain ResNet built in the evaluation cell
    # cannot load.


criterion = nn.CrossEntropyLoss()

##SGD
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)
##ADAM
#optimizer = optim.Adam(model.parameters(), lr = 1e-3)

# NOTE(review): T_max=200 is larger than the number of epochs run by the
# training cell, so the cosine schedule is only partially traversed —
# confirm this is intended.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)

Define functions to train, validate and evaluate the model, computing the accuracy along the way.

In [None]:
# Accuracy history: epoch index -> list of running accuracies (in percent),
# one entry appended per processed batch.
train_record, valid_record = {}, {}

def train(epoch):
    """Run one optimisation pass over ``train_iterator``.

    Uses the module-level ``model``, ``criterion``, ``optimizer`` and
    ``device``. After every batch the running accuracy (in percent) is
    appended to ``train_record[epoch]`` and a progress line is printed.

    Args:
        epoch: index of the current epoch; used as the key in ``train_record``.
    """
    print('\nEpoch: %d' % epoch)
    history = train_record[epoch] = []
    model.train()

    running_loss = 0
    n_correct = 0
    n_seen = 0
    for step, (images, labels) in enumerate(train_iterator, start=1):
        images = images.to(device)
        labels = labels.to(device)

        # Standard update: clear gradients, forward, backward, step.
        optimizer.zero_grad()
        logits = model(images)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        # Running statistics over the batches seen so far in this epoch.
        running_loss += batch_loss.item()
        predicted = logits.max(1)[1]
        n_seen += labels.size(0)
        n_correct += predicted.eq(labels).sum().item()
        acc = 100.*n_correct/n_seen
        history.append(acc)
        print('train b_id: ',step-1,' Loss: ', running_loss/step,', Acc:', acc,' ;', n_correct,'/', n_seen)

def valid(epoch):
    """Evaluate the model on ``valid_iterator`` and checkpoint the best one.

    Uses the module-level ``model``, ``criterion`` and ``device``. After every
    batch the running accuracy (in percent) is appended to
    ``valid_record[epoch]`` and a progress line is printed. When the final
    accuracy of the epoch exceeds ``best_acc``, the weights are written to
    'best-model.pt' and ``best_acc`` is updated.

    Args:
        epoch: index of the current epoch; used as the key in ``valid_record``.
    """
    global best_acc
    valid_record[epoch]=[]
    model.eval()
    valid_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(valid_iterator):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            valid_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
            acc = 100.*correct/total
            valid_record[epoch]+=[acc]
            print('valid b_id: ',batch_idx,' Loss: ', valid_loss/(batch_idx+1),', Acc:', acc,' ;', correct,'/', total)

    if total == 0:
        # Empty validation loader: there is no accuracy to compare, and the
        # division below would raise ZeroDivisionError.
        return

    # Save checkpoint.
    acc = 100.*correct/total
    if acc > best_acc:
        print('Saving..')
        # Always store the weights of the bare network: a DataParallel wrapper
        # would prefix every key with 'module.', which a plain ResNet cannot
        # load back.
        net = model.module if isinstance(model, nn.DataParallel) else model
        torch.save(net.state_dict(), 'best-model.pt')
        best_acc = acc
        
def evaluate(model, iterator, criterion, device):
    """Compute the loss and accuracy of ``model`` over ``iterator``.

    The model is switched to eval mode and gradients are disabled.

    Args:
        model: network mapping a batch of inputs to per-class scores.
        iterator: iterable yielding ``(inputs, targets)`` batches.
        criterion: callable ``criterion(outputs, targets)`` returning a
            scalar loss tensor.
        device: device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(mean_loss, accuracy)``: the average of the per-batch losses
        and the accuracy in percent over all samples.

    Raises:
        ValueError: if ``iterator`` yields no samples (previously this
            surfaced as an UnboundLocalError / ZeroDivisionError).
    """
    test_loss = 0.0
    correct = 0
    total = 0
    n_batches = 0
    model.eval()
    with torch.no_grad():
        for inputs, targets in iterator:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            test_loss += loss.item()
            # Index of the highest score per sample is the predicted class.
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
            n_batches += 1
    if n_batches == 0 or total == 0:
        raise ValueError('evaluate() received an iterator with no samples')
    return test_loss/n_batches, 100.*correct/total

Then, finally, we train our model.

In [None]:
#EPOCHS = 25
#EPOCHS = 35
#EPOCHS = 45
EPOCHS = 85

start_epoch = 0  # start from epoch 0 or last checkpoint epoch

# One epoch = a training pass, a validation pass (which may checkpoint the
# model), then one learning-rate scheduler step.
for epoch in range(start_epoch, start_epoch+EPOCHS):
    train(epoch)
    valid(epoch)
    scheduler.step()

# save data
# Context managers guarantee the files are closed even if json.dump raises.
with open("train_record.json", "w") as train_record_file:
    json.dump(train_record, train_record_file)

# NOTE: this file holds the VALIDATION history despite its name; the file
# name is kept unchanged so existing consumers keep working.
with open("test_record.json", "w") as test_record_file:
    json.dump(valid_record, test_record_file)

Evaluating the model

In [None]:
# Rebuild the architecture used for training; the block counts must match
# those of the model whose weights were saved to 'best-model.pt'.
best_model = ResNet(BasicBlock, [3, 4, 23, 3])
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU can also be loaded on a CPU-only machine.
best_model.load_state_dict(torch.load('best-model.pt', map_location=device))
best_model = best_model.to(device)
test_loss, test_acc = evaluate(best_model, test_iterator, criterion, device)
print('Loss: ', test_loss,', Acc:', test_acc)