## CV Project: ResNet-18
@author: Hantao Li

This notebook builds the ResNet-18 model and uses it to run the related experiments on the MNIST dataset.
The code is based on the official PyTorch ResNet implementation [1] and on the example code given by Prof. Boyang Li in the AI 6103 course.
Since the main part of this experiment is the construction of the MLP network, and the CNN is only used as an improvement where coding is not the main task, the overlap with the two reference implementations is relatively high.


[1] https://github.com/pytorch/vision/blob/main/torchvision/models/resnet.py


In [None]:
import argparse
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

In [None]:
def train(epoch, net, criterion, trainloader, scheduler):
    """Run one training epoch and then advance the learning-rate schedule.

    Args:
        epoch: Epoch number; only used for the progress banner.
        net: Model to train. Batches are moved to the device of its parameters.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        trainloader: Iterable yielding ``(inputs, targets)`` batches.
        scheduler: LR scheduler, stepped once after the last batch. The
            optimizer it wraps (``scheduler.optimizer``) applies the updates,
            so the function no longer depends on a global ``optimizer``.

    Returns:
        Tuple ``(mean loss per batch, accuracy in percent)`` for the epoch.

    Raises:
        ValueError: If ``trainloader`` yields no batches.
    """
    # Follow the model instead of assuming a GPU is present.
    device = next(net.parameters()).device
    optimizer = scheduler.optimizer
    print('\nEpoch: %d' % epoch)
    net.train()
    train_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    for num_batches, (inputs, targets) in enumerate(trainloader, start=1):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        # Running averages every 50 batches.
        if num_batches % 50 == 0:
            print("iteration : %3d, loss : %0.4f, accuracy : %2.2f" % (num_batches, train_loss/num_batches, 100.*correct/total))

    if num_batches == 0:
        raise ValueError("trainloader yielded no batches")

    scheduler.step()
    return train_loss/num_batches, 100.*correct/total

def test(epoch, net, criterion, testloader):
    """Evaluate ``net`` on ``testloader`` with gradient tracking disabled.

    Args:
        epoch: Epoch number; accepted for symmetry with ``train`` and unused.
        net: Model to evaluate. Batches are moved to the device of its
            parameters, and the model is switched to eval mode.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        testloader: Iterable yielding ``(inputs, targets)`` batches.

    Returns:
        Tuple ``(mean loss per batch, accuracy in percent)``.

    Raises:
        ValueError: If ``testloader`` yields no batches.
    """
    # Follow the model instead of assuming a GPU is present.
    device = next(net.parameters()).device
    net.eval()
    test_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0
    with torch.inference_mode():
        for num_batches, (inputs, targets) in enumerate(testloader, start=1):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    if num_batches == 0:
        raise ValueError("testloader yielded no batches")

    return test_loss/num_batches, 100.*correct/total


# The name matches pytest's default ``test*`` collection pattern; opt out so
# this evaluation helper is never mistaken for a test case when imported.
test.__test__ = False

In [None]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 conv + batch-norm layers.

    The input is added back to the second batch-norm output through
    ``shortcut`` before the final ReLU. ``shortcut`` is an empty
    ``nn.Sequential`` (identity) unless the stride or channel count changes,
    in which case it is a 1x1 conv + batch-norm projection.
    """

    expansion = 1  # output channels = expansion * planes

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # Only the first conv may downsample (via ``stride``).
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            # Match the residual branch's spatial size and channel count.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        residual = self.conv1(x)
        residual = F.relu(self.bn1(residual))
        residual = self.bn2(self.conv2(residual))
        residual += self.shortcut(x)
        return F.relu(residual)


class Bottleneck(nn.Module):
    """Residual block with a 1x1 -> 3x3 -> 1x1 convolution stack.

    The last 1x1 conv widens the output to ``expansion * planes`` channels.
    ``shortcut`` is an empty ``nn.Sequential`` (identity) unless the stride or
    channel count changes, in which case it is a 1x1 conv + batch-norm
    projection.
    """

    expansion = 4  # output channels = expansion * planes

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        # Only the 3x3 conv may downsample (via ``stride``).
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            # Match the residual branch's spatial size and channel count.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        residual = F.relu(self.bn1(self.conv1(x)))
        residual = F.relu(self.bn2(self.conv2(residual)))
        residual = self.bn3(self.conv3(residual))
        residual += self.shortcut(x)
        return F.relu(residual)


class ResNet(nn.Module):
    """ResNet classifier with a non-downsampling stem for small images.

    Args:
        block: Residual block class. It must expose an ``expansion`` class
            attribute and accept ``(in_planes, planes, stride)``.
        num_blocks: Sequence of four ints, the number of blocks per stage.
        num_classes: Number of output logits.
        in_channels: Channels of the input images. The default of 1 keeps
            the previous grayscale-only behavior.
    """

    def __init__(self, block, num_blocks, num_classes=10, in_channels=1):
        super().__init__()
        self.in_planes = 64  # running input width, updated by _make_layer

        # Stem: a stride-1 3x3 conv that keeps the input resolution, which
        # suits small images (28x28 or 32x32). An ImageNet-style stem would
        # downsample here instead.
        self.conv1 = nn.Conv2d(in_channels, 64, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        # Four stages; the last three each halve the resolution.
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage; only its first block uses ``stride``."""
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for block_stride in strides:
            layers.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Global average pooling. On the 4x4 maps produced by 28x28 and
        # 32x32 inputs this equals the former avg_pool2d(out, 4), but it also
        # yields one value per channel for other input sizes.
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.flatten(1)
        out = self.linear(out)
        return out

    
def ResNet18():
    """Return a ``ResNet`` built from ``BasicBlock`` with two blocks per stage."""
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet(BasicBlock, blocks_per_stage)

In [None]:
# Input pipelines for MNIST (downloaded to ./data when missing).

# Training images: tensor conversion, then random erasing as augmentation.
_random_erasing = transforms.RandomErasing(
    p=0.5, scale=(0.02, 0.2), ratio=(0.02, 3.3), value=0)
transform_train = transforms.Compose([transforms.ToTensor(), _random_erasing])

# Test images: tensor conversion only.
transform_test = transforms.Compose([transforms.ToTensor()])

# Training split, reshuffled each epoch, in batches of 128.
trainset = torchvision.datasets.MNIST(
    root='./data',
    train=True,
    download=True,
    transform=transform_train,
)
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=128,
    shuffle=True,
    num_workers=2,
)

# Test split, fixed order, in batches of 256.
testset = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transform_test,
)
testloader = torch.utils.data.DataLoader(
    testset,
    batch_size=256,
    shuffle=False,
    num_workers=2,
)

### Basic Procedure

In [None]:
# main body: train a fresh ResNet-18 `loopnumber` times and keep every run's
# per-epoch history for the plotting cells below.

loopnumber = 1
epochnumber = 300
task_name = 'lr001_wd00005_cos_random_noflip'

# One entry (a list of per-epoch values) per run.
train_loss_all, train_acc_all = [], []
test_loss_all, test_acc_all = [], []

for loop in range(1, loopnumber+1):

    # SGD hyper-parameters for this run.
    config = dict(lr=0.01, momentum=0.9, weight_decay=0.0005)

    # Fresh model, loss, optimizer and cosine schedule for every run.
    net = ResNet18().to('cuda')
    criterion = nn.CrossEntropyLoss().to('cuda')
    optimizer = optim.SGD(net.parameters(), **config)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochnumber)
    # Constant-LR alternative:
    #scheduler = torch.optim.lr_scheduler.StepLR(optimizer, epochnumber, gamma=1, last_epoch=-1)

    # Per-epoch history of the current run.
    train_loss, train_acc = [], []
    test_loss, test_acc = [], []

    print("###### The %d Loop Starting..." % loop)
    for epoch in range(1, epochnumber+1):
        train_loss_cur, train_acc_cur = train(epoch, net, criterion, trainloader, scheduler)
        test_loss_cur, test_acc_cur = test(epoch, net, criterion, testloader)

        epoch_summary = (epoch, train_loss_cur, train_acc_cur, test_loss_cur, test_acc_cur)
        print("Epoch : %3d, training loss : %0.4f, training accuracy : %2.2f, test loss : %0.4f, test accuracy : %2.2f" % epoch_summary)

        for history, value in ((train_loss, train_loss_cur),
                               (test_loss, test_loss_cur),
                               (train_acc, train_acc_cur),
                               (test_acc, test_acc_cur)):
            history.append(value)

    for all_runs, this_run in ((train_loss_all, train_loss),
                               (test_loss_all, test_loss),
                               (train_acc_all, train_acc),
                               (test_acc_all, test_acc)):
        all_runs.append(this_run)

### Plot Curves

In [None]:
# Draw loss and accuracy against the epoch index, aggregated over all runs.
# Each DataFrame holds one row per (run, epoch); seaborn's lineplot aggregates
# the repeated epoch values into a single curve per DataFrame.

os.makedirs("./Output", exist_ok=True)  # savefig does not create missing folders

# One row per run, one column per epoch. Stacking the whole list at once also
# covers loopnumber == 1, where indexing run [1] explicitly raised IndexError.
train_loss_x = np.vstack(train_loss_all)
test_loss_x = np.vstack(test_loss_all)

df1 = pd.DataFrame(train_loss_x).melt(var_name='Epochs',value_name='Loss')
df2 = pd.DataFrame(test_loss_x).melt(var_name='Epochs',value_name='Loss')

plt.figure(dpi=600)
sns.lineplot(x="Epochs", y="Loss", data=df1)
sns.lineplot(x="Epochs", y="Loss", data=df2)

plt.xlabel("Number of epochs")
plt.ylabel("Loss")
plt.title("Loss vs Number of epochs")
plt.legend(['train', 'test'])

plt.savefig("./Output/"+task_name+"_loss.png", dpi=600)
plt.show()


train_acc_x = np.vstack(train_acc_all)
test_acc_x = np.vstack(test_acc_all)

df3 = pd.DataFrame(train_acc_x).melt(var_name='Epochs',value_name='Accuracy')
df4 = pd.DataFrame(test_acc_x).melt(var_name='Epochs',value_name='Accuracy')

plt.figure(dpi=600)
sns.lineplot(x="Epochs", y="Accuracy", data=df3)
sns.lineplot(x="Epochs", y="Accuracy", data=df4)

plt.xlabel("Number of epochs")
plt.ylabel("Accuracy")
plt.title("Accuracy vs Number of epochs")
plt.legend(['train', 'test'])

plt.savefig("./Output/"+task_name+"_acc.png", dpi=600)
plt.show()

### Calculate Average Data

In [None]:
# Average the final-epoch metrics over all runs (rows = runs, columns = epochs).
# The last column is used so the summary follows `epochnumber`; the previous
# hard-coded column 14 always reported epoch 15 and raised IndexError for runs
# shorter than 15 epochs.
average_train_acc = train_acc_x[:, -1].mean()
average_test_acc = test_acc_x[:, -1].mean()
average_train_loss = train_loss_x[:, -1].mean()
average_test_loss = test_loss_x[:, -1].mean()

print(("training loss : %0.4f, training accuracy : %2.2f, test loss " + \
          ": %0.4f, test accuracy : %2.2f") % (average_train_loss, average_train_acc, average_test_loss, average_test_acc))

### Plot Curves for a single run

In [None]:
# Plot the per-epoch curves of the most recent run (train in blue, test in red).
# NOTE: the figures are written to the same file names as the multi-run plots,
# so whichever cell runs last overwrites the other's output.
os.makedirs("./Output", exist_ok=True)  # savefig does not create missing folders

for _metric, _train_curve, _test_curve, _suffix in (
        ("Loss", train_loss, test_loss, "_loss.png"),
        ("Accuracy", train_acc, test_acc, "_acc.png")):
    plt.figure(dpi=600)
    plt.plot(range(len(_train_curve)), _train_curve, 'b')
    plt.plot(range(len(_test_curve)), _test_curve, 'r')
    plt.xlabel("Number of epochs")
    plt.ylabel(_metric)
    plt.title(_metric + " vs Number of epochs")
    plt.legend(['train', 'test'])
    plt.savefig("./Output/"+task_name+_suffix, dpi=600)
    plt.show()

If we want to use torchvision.models.resnet18, we should reformat the images first,
which means that [128, 1, 28, 28] -> [64, 3, 7, 7]. We need to do a convolution first.

In [None]:
# Instantiate torchvision's own ResNet-18 for comparison with the custom
# ResNet above. It is called with no arguments, so the library defaults apply.
import torchvision
model = torchvision.models.resnet18() 