In [None]:
import tarfile
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder

In [None]:
cinic_tar_path = './CINIC-10.tar.gz'

In [None]:
dataset_tar = tarfile.open(cinic_tar_path)
dataset_tar.extractall('./cinic10root')
dataset_tar.close()

In [None]:
cinic_root_dir = './cinic10root'
cinic_mean = [0.47889522, 0.47227842, 0.43047404]
cinic_std = [0.24205776, 0.23828046, 0.25874835]

In [None]:
tf = transforms.Compose([transforms.ToTensor(), transforms.Normalize(mean=cinic_mean,std=cinic_std)])

In [None]:
train_data = ImageFolder(cinic_root_dir + '/train', transform=tf)
valid_data = ImageFolder(cinic_root_dir + '/valid', transform=tf)
test_data = ImageFolder(cinic_root_dir + '/test', transform=tf)

In [None]:
train_loader = DataLoader(train_data, batch_size=256, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=256, shuffle=True)
test_loader = DataLoader(test_data, batch_size=256, shuffle=True)

In [None]:
from random import sample

examples = sample(test_data.imgs, 25)

fig = plt.figure(figsize=(10, 8))
n_rows = 5
n_cols = 5
for index in range(1, n_rows * n_cols + 1):
  plt.subplot(n_rows, n_cols, index)
  plt.axis('off')
  plt.imshow(test_data.loader(examples[index - 1][0]))
  title = f'{test_data.classes[examples[index - 1][1]]}'
  plt.title(title, fontsize=8)
plt.suptitle('CINIC-10 dataset')

In [None]:
def train_step(model, train_loader, loss_fn, optimizer, device):
  model.train()
  running_loss = 0
  num_correct_pred = 0

  for X, y_true in train_loader:
    optimizer.zero_grad()
    X = X.to(device)
    y_true = y_true.to(device)

    y_hat = model(X)

    loss = loss_fn(y_hat, y_true)

    pred_labels = torch.argmax(y_hat, 1)
    num_correct_pred += (pred_labels == y_true).sum()

    running_loss += loss.item() * X.size(0)

    loss.backward()
    optimizer.step()

  epoch_loss = running_loss / len(train_loader.dataset)
  epoch_accuracy = num_correct_pred / len(train_loader.dataset)
  return model, optimizer, epoch_loss, epoch_accuracy

In [None]:
def valid_step(model, valid_loader, loss_fn, device):
  model.eval()
  running_loss = 0
  num_correct_pred = 0

  for X, y_true in valid_loader:
    X = X.to(device)
    y_true = y_true.to(device)

    y_hat = model(X)

    loss = loss_fn(y_hat, y_true)

    pred_labels = torch.argmax(y_hat, 1)
    num_correct_pred += (pred_labels == y_true).sum()

    running_loss += loss.item() * X.size(0)
  
  epoch_loss = running_loss / len(valid_loader.dataset)
  epoch_accuracy = num_correct_pred / len(valid_loader.dataset)
  return model, epoch_loss, epoch_accuracy

In [None]:
def training_loop(model, train_loader, valid_loader, loss_fn, optimizer, n_epochs, scheduler, device):

  train_losses = []
  valid_losses = []

  best_accuracy = 0

  for epoch in tqdm(range(n_epochs), desc='Epoch'):
    model, optimizer, train_loss, train_accuracy = train_step(model, train_loader, loss_fn, optimizer, device)
    train_losses.append(train_loss)

    with torch.no_grad():
      model, valid_loss, valid_accuracy = valid_step(model, test_loader, loss_fn, device)
      valid_losses.append(valid_loss)

    print(f'Train loss:{train_loss:.4f} Validation loss:{valid_loss:.4f} Train accuracy:{train_accuracy * 100:.2f}% Validation accuracy:{valid_accuracy * 100:.2f}%')

    if valid_accuracy > best_accuracy:
      best_accuracy = valid_accuracy
      torch.save({
                  'epoch': epoch,
                  'model_state_dict': model.state_dict(),
                  'optimizer_state_dict': optimizer.state_dict(),
                  'loss': valid_loss,
                 }, './checkpt.tar')
    
    scheduler.step()

  return model, optimizer, train_losses, valid_losses

In [None]:
def test(model, test_loader, loss_fn, device):
  checkpoint = torch.load('./checkpt.tar')
  model.load_state_dict(checkpoint['model_state_dict'])
  model.eval()
  running_loss = 0
  num_correct_pred = 0

  for X, y_true in test_loader:
    X = X.to(device)
    y_true = y_true.to(device)

    y_hat = model(X)

    loss = loss_fn(y_hat, y_true)

    pred_labels = torch.argmax(y_hat, 1)
    num_correct_pred += (pred_labels == y_true).sum()

    running_loss += loss.item() * X.size(0)
  
  test_loss = running_loss / len(test_loader.dataset)
  test_accuracy = num_correct_pred / len(test_loader.dataset)
  return model, test_loss, test_accuracy

In [None]:
'''
  ResNet18 from a popular repository
  url:https://github.com/kuangliu/pytorch-cifar/blob/master/models/resnet.py
  Modification:
  * a dropout layer for randomization
'''

class BasicBlock(nn.Module):
  expansion = 1

  def __init__(self, in_planes, planes, stride=1):
    super(BasicBlock, self).__init__()
    self.conv1 = nn.Conv2d(
        in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
    self.bn1 = nn.BatchNorm2d(planes)
    self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                            stride=1, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(planes)

    self.shortcut = nn.Sequential()
    if stride != 1 or in_planes != self.expansion*planes:
        self.shortcut = nn.Sequential(
            nn.Conv2d(in_planes, self.expansion*planes,
                      kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(self.expansion*planes)
        )

  def forward(self, x):
    out = F.relu(self.bn1(self.conv1(x)))
    out = self.bn2(self.conv2(out))
    out += self.shortcut(x)
    out = F.relu(out)
    return out

'''
class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion *
                               planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out
'''

class ResNet(nn.Module):
  def __init__(self, block, num_blocks, num_classes=10):
    super(ResNet, self).__init__()
    self.in_planes = 64

    self.conv1 = nn.Conv2d(3, 64, kernel_size=3,
                           stride=1, padding=1, bias=False)
    self.bn1 = nn.BatchNorm2d(64)
    self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
    self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
    self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
    self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
    self.linear = nn.Linear(512*block.expansion, num_classes)

  def _make_layer(self, block, planes, num_blocks, stride):
    strides = [stride] + [1]*(num_blocks-1)
    layers = []
    for stride in strides:
        layers.append(block(self.in_planes, planes, stride))
        self.in_planes = planes * block.expansion
    return nn.Sequential(*layers)

  def forward(self, x):
    out = F.relu(self.bn1(self.conv1(x)))
    out = self.layer1(out)
    out = self.layer2(out)
    out = self.layer3(out)
    out = self.layer4(out)
    out = F.avg_pool2d(out, 4)
    out = out.view(out.size(0), -1)
    out = self.linear(out)
    return out


def ResNet18():
    return ResNet(BasicBlock, [2, 2, 2, 2])

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
model = ResNet18().to(device)

In [None]:
model

In [None]:
n_epochs = 300
initial_lr = 0.1

In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=initial_lr,
                      momentum=0.9, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=300)

In [None]:
model, train_losses, valid_losses = training_loop(model, train_loader, valid_loader, loss_fn, optimizer, n_epochs, scheduler, device)

In [None]:
torch.save(model.state_dict(), 'resnet18_cinic.pth')

In [None]:
model, test_loss, test_accuracy = test(model, test_loader, loss_fn, device)
print(f'Test loss:{test_loss:.4f} Test accuracy:{test_accuracy * 100:.2f}%')

In [None]:
torch.save(model.state_dict(), 'resnet18_cinic_fin.pth')