In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn

import torchvision
import torchvision.transforms as transforms

import os
import argparse
import copy

!pip install torchinfo
from torchinfo import summary

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchinfo
  Downloading torchinfo-1.7.1-py3-none-any.whl (22 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.7.1


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
class BasicBlock(nn.Module):
    """
    Basic Block for resnet 18 and resnet 34
    REF: https://github.com/weiaicunzai/pytorch-cifar100/blob/master/models/resnet.py
    https://love2017.asia/2021/08/15/pytorchtrain8/
    https://zhuanlan.zhihu.com/p/54289848
    out_dim = (in_dim + 2p - k)/s + 1
    p: padding size
    s = stride
    k = kernel size
    """

    # BasicBlock and BottleNeck block
    # have different output size
    #we use class attribute expansion to distinct
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # residual function
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels * self.expansion)
        )

        # shortcut
        self.shortcut = nn.Sequential()

        # the shortcut output dimension is not the same with residual function
        # use 1*1 convolution to match the dimension
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * self.expansion)
            )

    def forward(self, x):
        return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x))

In [4]:
class BottleNeck(nn.Module):
    """
    Residual block for resnet over 50 layers
    BottleNeck apply 1x1 kernel, greatly reduce the calculation amount
    """
    expansion = 4
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False), # 1x1 kernel，reduce dimension
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, stride=stride, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, bias=False), # raise dim
            nn.BatchNorm2d(out_channels * self.expansion),
        )

        self.shortcut = nn.Sequential()

        if stride != 1 or in_channels != out_channels * self.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * self.expansion, stride=stride, kernel_size=1, bias=False),
                nn.BatchNorm2d(out_channels * self.expansion)
            )

    def forward(self, x):
        return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x))

In [5]:
class ResNet(nn.Module):

    def __init__(self, block, num_block, num_classes=100):
        super().__init__()

        self.in_channels = 64
        # set the first conv bias = FALSE
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True))
        # we use a different inputsize than the original paper
        # so conv2_x's stride is 1
        self.conv2_x = self._make_layer(block, 32, num_block[0], 1)
        # use stride = 2 to down-sample
        self.conv3_x = self._make_layer(block, 64, num_block[1], 2)
        self.conv4_x = self._make_layer(block, 128, num_block[2], 2)
        self.conv5_x = self._make_layer(block, 256, num_block[3], 2)

        # mod
        # self.dpout = nn.Dropout(0.5)

        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """
        make resnet layers(by layer i didnt mean this 'layer' was the
        same as a neuron netowork layer, ex. conv layer), one layer may
        contain more than one residual block
        Args:
            block: block type, basic block or bottle neck block
            out_channels: output depth channel number of this layer
            num_blocks: how many blocks per layer
            stride: the stride of the first block of this layer
        Return:
            return a resnet layer
        """

        # we have num_block blocks per layer, the first block
        # could be 1 or 2, other blocks would always be 1
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion

        return nn.Sequential(*layers)

    def forward(self, x):
        output = self.conv1(x)
        output = self.conv2_x(output)
        output = self.conv3_x(output)
        output = self.conv4_x(output)
        output = self.conv5_x(output)

        # mod
        # output = self.dpout(output)

        output = self.avg_pool(output)
        output = output.view(output.size(0), -1)
        output = self.fc(output)

        return output

In [22]:
import torch.nn.functional as F

class ST_ResNet(nn.Module):
    loc_size = 4 * 4 * 10

    def __init__(self, block, num_block, num_classes=100):
        super().__init__()


        # Spatial transformer localization-network
        self.localization = nn.Sequential(
            nn.Conv2d(3, 8, kernel_size=7),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True),
            nn.Conv2d(8, 10, kernel_size=5),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True)
        )

        # Regressor for the 3 * 2 affine matrix
        self.fc_loc = nn.Sequential(
            nn.Linear(self.loc_size, 32),
            nn.ReLU(True),
            nn.Linear(32, 3 * 2)
        )

        # Initialize the weights/bias with identity transformation
        self.fc_loc[2].weight.data.zero_()
        self.fc_loc[2].bias.data.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))
        self.ST_Norm = nn.BatchNorm2d(3)


        self.in_channels = 64
        # set the first conv bias = FALSE
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True))
        # we use a different inputsize than the original paper
        # so conv2_x's stride is 1
        self.conv2_x = self._make_layer(block, 32, num_block[0], 1)
        # use stride = 2 to down-sample
        self.conv3_x = self._make_layer(block, 64, num_block[1], 2)
        self.dpout1 = nn.Dropout(0.3)
        self.conv4_x = self._make_layer(block, 128, num_block[2], 2)
        self.dpout2 = nn.Dropout(0.5)
        self.conv5_x = self._make_layer(block, 256, num_block[3], 2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """
        make resnet layers(by layer i didnt mean this 'layer' was the
        same as a neuron netowork layer, ex. conv layer), one layer may
        contain more than one residual block
        Args:
            block: block type, basic block or bottle neck block
            out_channels: output depth channel number of this layer
            num_blocks: how many blocks per layer
            stride: the stride of the first block of this layer
        Return:
            return a resnet layer
        """

        # we have num_block blocks per layer, the first block
        # could be 1 or 2, other blocks would always be 1
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion

        return nn.Sequential(*layers)

    
    def stn(self, x):
        xs = self.localization(x)
        xs = xs.view(-1, self.loc_size)
        theta = self.fc_loc(xs)
        theta = theta.view(-1, 2, 3)

        grid = F.affine_grid(theta, x.size())
        x = F.grid_sample(x, grid)

        return x


    def forward(self, x):
        # transform the input
        #x = self.stn(x)
        #x = self.ST_Norm(x)

        output = self.conv1(x)
        output = self.conv2_x(output)
        output = self.conv3_x(output)
        output = self.dpout1(output)
        output = self.conv4_x(output)
        output = self.dpout2(output)
        output = self.conv5_x(output)
        output = self.avg_pool(output)
        output = output.view(output.size(0), -1)
        output = self.fc(output)

        return output

In [23]:
def ResNet18():
    return ResNet(BasicBlock, [2, 2, 2, 2])
def ResNet34():
    return ResNet(BasicBlock, [3, 4, 6, 3])
def ResNet50():
    return ResNet(BottleNeck, [3, 4, 6, 3])
def ResNet_Adjust():
    return ResNet(BottleNeck, [1, 2, 3, 3])
def ST_ResNetv1():
    return ST_ResNet(BottleNeck, [1, 2, 3, 3])

In [24]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
best_acc = 0  # best test accuracy

In [25]:
print('==> Preparing data, applying data augmentation strategy')
ROOT = '.data'
# need to calculate train mean and std first
train_data = torchvision.datasets.CIFAR10(root = ROOT, 
                              train = True, 
                              download = True)
means = train_data.data.mean(axis = (0,1,2)) / 255
stds = train_data.data.std(axis = (0,1,2)) / 255

train_transforms = transforms.Compose([
                           transforms.RandomRotation(5),
                           transforms.RandomHorizontalFlip(0.5),
                           transforms.RandomCrop(32, padding = 2),
                           transforms.ToTensor(),
                           transforms.Normalize(mean = means, 
                                                std = stds)
                       ])
test_transforms = transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize(mean = means, 
                                                std = stds)
                       ])

train_data = torchvision.datasets.CIFAR10(ROOT, 
                              train = True, 
                              download = True, 
                              transform = train_transforms)

test_data = torchvision.datasets.CIFAR10(ROOT, 
                             train = False, 
                             download = True, 
                             transform = test_transforms)

# split out validation data
VALID_RATIO = 0.85

n_train_examples = int(len(train_data) * VALID_RATIO)
n_valid_examples = len(train_data) - n_train_examples

train_data, valid_data = torch.utils.data.random_split(train_data, 
                                           [n_train_examples, n_valid_examples])

valid_data = copy.deepcopy(valid_data)
valid_data.dataset.transform = test_transforms


==> Preparing data, applying data augmentation strategy
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


In [26]:
BATCH_SIZE = 128
trainloader = torch.utils.data.DataLoader(
        train_data, 
        shuffle=True,
        pin_memory=True,
        num_workers=2,
        drop_last=True,
        batch_size=BATCH_SIZE)

validloader = torch.utils.data.DataLoader(
        valid_data, 
        shuffle=True,
        pin_memory=True,
        num_workers=2,
        drop_last=True,
        batch_size=BATCH_SIZE)


testloader = torch.utils.data.DataLoader(
        test_data,
        shuffle=True,
        pin_memory=True,
        num_workers=2,
        drop_last=True,
        batch_size=BATCH_SIZE)

In [27]:
classes = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')

In [28]:
# Model
print('==> Building model..')
#net = ResNet_Adjust()
net = ST_ResNetv1()
net = net.to(device)
if device == 'cuda':
    net = torch.nn.DataParallel(net)
    cudnn.benchmark = True

==> Building model..


In [29]:
print(summary(net))

Layer (type:depth-idx)                        Param #
DataParallel                                  --
├─ST_ResNet: 1-1                              --
│    └─Sequential: 2-1                        --
│    │    └─Conv2d: 3-1                       1,184
│    │    └─MaxPool2d: 3-2                    --
│    │    └─ReLU: 3-3                         --
│    │    └─Conv2d: 3-4                       2,010
│    │    └─MaxPool2d: 3-5                    --
│    │    └─ReLU: 3-6                         --
│    └─Sequential: 2-2                        --
│    │    └─Linear: 3-7                       5,152
│    │    └─ReLU: 3-8                         --
│    │    └─Linear: 3-9                       198
│    └─BatchNorm2d: 2-3                       6
│    └─Sequential: 2-4                        --
│    │    └─Conv2d: 3-10                      1,728
│    │    └─BatchNorm2d: 3-11                 128
│    │    └─ReLU: 3-12                        --
│    └─Sequential: 2-5                        --
│ 

In [30]:
class Optimization:
  def __init__(self, model, loss_fn, optimizer, patience, EPOCH):
    self.model = model
    self.loss_fn = loss_fn
    self.optimizer = optimizer
    self.patience = patience
    self.epochs = EPOCH
    self.train_losses = []
    self.val_losses = []
    self.train_accuracy = []
    self.val_accuracy = []


  def train(self, train_loader, val_loader):
    # apply early stopping
    trigger_time = 0

    for epoch in range(1, self.epochs + 1):
      self.model.train()
      total = 0
      correct = 0
      train_loss = 0
      for batch_idx, (inputs, targets) in enumerate(trainloader):
          inputs, targets = inputs.to(device), targets.to(device)
          optimizer.zero_grad()
          outputs = net(inputs)
          loss = criterion(outputs, targets)
          loss.backward()
          optimizer.step()

          # # L1 regularization
          # l1_lambda = 0.001
          # l1_norm = sum(torch.linalg.norm(p, 1) for p in self.model.parameters())
          # l1 = l1_lambda * l1_norm
          # train_loss += (loss.item() + l1)

          train_loss += loss.item()
          _, predicted = outputs.max(1)
          total += targets.size(0)
          correct += predicted.eq(targets).sum().item()
      acc = 100.*correct/total
      epoch_train_loss = train_loss/total
      self.train_losses.append(epoch_train_loss)
      self.train_accuracy.append(acc)

      # validation
      self.model.eval()
      validation_loss = 0
      valid_correct = 0
      valid_total = 0
      with torch.no_grad():
          for batch_idx, (inputs, targets) in enumerate(validloader):
              inputs, targets = inputs.to(device), targets.to(device)
              outputs = net(inputs)
              loss = criterion(outputs, targets)

              # # L1 regularization
              # l1_lambda = 0.001
              # l1_norm = sum(torch.linalg.norm(p, 1) for p in self.model.parameters())
              # l1 = l1_lambda * l1_norm
              # validation_loss += (loss.item() + l1)

              validation_loss += loss.item()
              _, predicted = outputs.max(1)
              valid_total += targets.size(0)
              valid_correct += predicted.eq(targets).sum().item()

      val_acc = 100.*valid_correct/valid_total
      epoch_validation_loss = validation_loss/valid_total
    
      # early_stop
      if (epoch > 1 and epoch_validation_loss > self.val_losses[-1]):
            trigger_times += 1
            if (trigger_times >= self.patience):
              print('Early stopping!')
              break
      else:
        trigger_times = 0
      self.val_losses.append(epoch_validation_loss)
      self.val_accuracy.append(val_acc)


      print(f"[{epoch}/{self.epochs}] Training Accuracy: {acc:.2f}%\t Validation Accuracy: {val_acc:.2f}%")
      print(f"[{epoch}/{self.epochs}] Training loss: {epoch_train_loss:.8f}\t Validation loss: {epoch_validation_loss:.8f}")
      print('\n')
      #torch.save(self.model.state_dict(), model_path)
    
  def test(self, testloader):
    global best_acc
    self.model.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(testloader):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    # Save checkpoint.
    acc = 100.*correct/total

    print(f'test accuracy = {acc}')
    if acc > 80.:
        print('Saving..')
        checkpoint_path = f'/content/drive/MyDrive/DL_Models/ResNet_saved.pkl'
        torch.save(self.model.state_dict(), checkpoint_path)
        best_acc = acc
        


In [31]:
para = {
    'learning_rate':5e-3,
    'weight_decay':2e-4,
    'max_epochs': 100
}

In [32]:
criterion = nn.CrossEntropyLoss()
optimizer_candidate = {
    'SGD':optim.SGD(net.parameters(), lr=para['learning_rate'],
                      momentum=0.9, weight_decay=2e-4),
    'AdamW':optim.AdamW(net.parameters(), lr=para['learning_rate'],weight_decay = para['weight_decay'])
}
optimizer = optimizer_candidate['AdamW']
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)

from torch.optim.lr_scheduler import ReduceLROnPlateau
scheduler = ReduceLROnPlateau(optimizer, 'min', patience=5)

In [33]:
opt = Optimization(model=net, loss_fn=criterion, optimizer=scheduler, patience = 10, EPOCH = para['max_epochs'] )

In [34]:
opt.train(trainloader, 
          validloader)


[1/100] Training Accuracy: 34.65%	 Validation Accuracy: 44.41%
[1/100] Training loss: 0.01380401	 Validation loss: 0.01172747


[2/100] Training Accuracy: 52.96%	 Validation Accuracy: 53.49%
[2/100] Training loss: 0.01007632	 Validation loss: 0.01073987


[3/100] Training Accuracy: 60.57%	 Validation Accuracy: 63.74%
[3/100] Training loss: 0.00860319	 Validation loss: 0.00784090


[4/100] Training Accuracy: 64.75%	 Validation Accuracy: 69.02%
[4/100] Training loss: 0.00768566	 Validation loss: 0.00676601


[5/100] Training Accuracy: 67.99%	 Validation Accuracy: 70.65%
[5/100] Training loss: 0.00703284	 Validation loss: 0.00650714


[6/100] Training Accuracy: 70.58%	 Validation Accuracy: 70.42%
[6/100] Training loss: 0.00646656	 Validation loss: 0.00653656


[7/100] Training Accuracy: 73.28%	 Validation Accuracy: 76.10%
[7/100] Training loss: 0.00588228	 Validation loss: 0.00533581


[8/100] Training Accuracy: 75.66%	 Validation Accuracy: 76.70%
[8/100] Training loss: 0.00542585	 Valida

In [35]:
opt.test(testloader)

test accuracy = 90.00400641025641
Saving..
