# **NHẬP MÔN HỌC MÁY**
## THÔNG TIN NHÓM:
- 20120547: VÕ THÀNH PHONG
- 20120578: PHẠM QUỐC THÁI
- 20120125: BÙI ANH KIỆT
- 20120109: TRƯƠNG NGỌC HUY
- 20120166: NGUYỄN DƯƠNG TUẤN PHƯƠNG

# LIÊN HỆ TÁC GIẢ COLAB:
- Email: 20120547@student.hcmus.edu.vn

# **TRIỂN KHAI KIẾN TRÚC MẠNG RESNET CHO TÁC VỤ NHẬN DẠNG ẢNH TRÊN BỘ DỮ LIỆU CIFAR-10**

**LƯU Ý QUAN TRỌNG: Các thư mục và file được tạo ra trong quá trình chạy file Colab này chỉ được lưu tạm (local) trên máy ảo Colab. Do đó, nếu muốn giữ lại các thư mục và file này, hãy chọn biểu tượng thư mục (folder) ở thanh menu bên trái và tải chúng về máy trước khi tắt file Colab.**


In [None]:
# Import needed libraries

# Libraries for building the model
import torch
import torch.nn as nn
import torch.nn.functional as F

# Libraries for training the model
import torch
import ssl
import os
import argparse
import torchvision.transforms as T
import numpy as np
from torchvision.datasets.cifar import CIFAR10
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime
from tqdm import tqdm

# Libraries for plotting the results
import os
import numpy as np
from matplotlib import pyplot as plt

# 1. BUILDING MODEL

Xây dựng PlainNet và ResNet với kiến trúc tổng quát như sau:

!['image'](https://res.cloudinary.com/vtphong/image/upload/v1683601236/Intro2ML/Screenshot_2023-05-09_095922.png)

In [None]:
def init_weights(m):
    """Initialise the weights of ``m`` in two passes over its sub-modules.

    Pass 1 draws the weight of every ``nn.Conv2d`` from a Kaiming-normal
    distribution (ReLU non-linearity).
    Pass 2 visits every ``DoubleConvBlock`` that owns a ``conv_downsample``
    layer and overwrites that layer with the constant weight
    ``1 / in_channels`` and a zero bias.

    The order matters: ``conv_downsample`` is itself an ``nn.Conv2d``, so the
    constant fill has to come after the Kaiming pass.
    """
    def kaiming_for_conv(layer):
        if not isinstance(layer, nn.Conv2d):
            return
        torch.nn.init.kaiming_normal_(layer.weight.data, nonlinearity='relu')

    def constant_projection(layer):
        if not isinstance(layer, DoubleConvBlock):
            return
        projection = layer.conv_downsample
        if not projection:
            # Blocks that do not down-sample carry no projection layer
            return
        projection.weight.data.fill_(1 / layer.in_channels)
        projection.bias.data.fill_(0)

    for initialiser in (kaiming_for_conv, constant_projection):
        m.apply(initialiser)

In [None]:
class Footer(nn.Module):
    """Last layers of the network: average pooling followed by a fully connected layer.

    Args:
        in_channels: Number of channels of the incoming feature map; also the
            number of input features of the linear layer.
        in_size: Side length of the (square) incoming feature map. It is used
            as the pooling kernel size, so the pooled map is 1x1 per channel.
        out_labels: Number of output features of the linear layer.
    """

    def __init__(self, in_channels, in_size, out_labels):
        super().__init__()
        self.in_channels = in_channels
        self.in_size = in_size
        self.model = nn.Sequential(
            nn.AvgPool2d(in_size),
            nn.Flatten(start_dim=1),
            nn.Linear(in_channels, out_labels)
        )

    def forward(self, x):
        """Pool, flatten and project ``x``.

        Args:
            x: Tensor of shape ``(batch, in_channels, in_size, in_size)``.

        Returns:
            Tensor of shape ``(batch, out_labels)``.

        Raises:
            ValueError: If ``x`` is not 4-D or its channel / spatial
                dimensions differ from the ones given at construction.
        """
        expected = (self.in_channels, self.in_size, self.in_size)
        # Check the channel count as well: the message below promises it, and
        # a mismatch would otherwise only surface as an opaque matmul error.
        if x.dim() != 4 or tuple(x.shape[1:]) != expected:
            raise ValueError(f'Expected input shape (*, {self.in_channels}, {self.in_size}, {self.in_size}), '
                             f'got {tuple(x.shape)}')
        # Call the module (not .forward) so hooks registered on it still run
        return self.model(x)

Trong bài báo, khi đánh giá trên bộ dữ liệu CIFAR-10, tác giả chia khối 6n lớp tích chập (convolutional layers) thành ba nhóm, mỗi nhóm gồm 2n lớp, với số lượng bộ lọc khác nhau (lần lượt là 16, 32 và 64).

Do đó ta sẽ xây dựng một class DoubleConvBlock để thiết kế khối nhỏ gồm 2 lớp tích chập để tiện cho việc xây dựng 2n layers khác nhau chỉ ở số bộ lọc được sử dụng

In [None]:
class DoubleConvBlock(nn.Module):
    """Two 3x3 convolutions, each followed by batch normalisation.

    A ReLU sits between the two convolutions and a second ReLU is applied to
    the block output, after the shortcut has been added when there is one.

    Padding is calculated as follows:
        (IN_DIM - F + 2P) / S + 1 = OUT_DIM
    F = filter size
    P = padding
    S = stride

    Args:
        in_channels: Number of channels of the input feature map.
        in_size: Spatial size of the input. It is only stored on the
            instance; none of the layers built here depends on it.
        shortcut: True adds the block input to the output of the two
            convolutions (residual block); False gives a plain block.
        down_sample: True makes the first convolution use stride 2 and
            doubles the number of channels.
        option: How the shortcut is matched to a down-sampled output.
            'A' sub-samples the input and appends zero channels, 'B' uses a
            strided 1x1 convolution. Must be given when both ``shortcut`` and
            ``down_sample`` are True.

    Raises:
        AssertionError: If ``option`` is not one of None, 'A', 'B', or is
            None for a down-sampling residual block.
    """

    def __init__(self, in_channels, in_size, shortcut=True, down_sample=False, option=None):
        super().__init__()

        assert option in {None, 'A', 'B'}, f"'{option}' is an invalid option"
        self.in_channels = in_channels
        self.in_size = in_size
        self.down_sample = down_sample
        self.shortcut = shortcut
        self.option = option
        self.conv_downsample = None
        if self.down_sample:
            if shortcut:
                assert option is not None, 'Must specify either option A or B when ' \
                                           'downsampling with a residual shortcut'
            out_channels = in_channels * 2
            self.model = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 3, stride=2, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                nn.Conv2d(out_channels, out_channels, 3, padding=1),
                nn.BatchNorm2d(out_channels)
            )
            # Built for every down-sampling block, whatever the option, so the
            # parameter names stored in checkpoints stay the same; only the
            # option 'B' path of forward() uses it.
            self.conv_downsample = nn.Conv2d(in_channels, out_channels, 1, stride=2)
        else:
            self.model = nn.Sequential(
                nn.Conv2d(in_channels, in_channels, 3, padding=1),
                nn.BatchNorm2d(in_channels),
                nn.ReLU(),
                nn.Conv2d(in_channels, in_channels, 3, padding=1),
                nn.BatchNorm2d(in_channels)
            )

    def forward(self, x):
        """Run the two convolutions, add the shortcut (if any) and apply ReLU.

        Sub-modules are invoked by calling them rather than through
        ``.forward`` so that hooks registered on them are executed.
        """
        y = self.model(x)
        if not self.shortcut:               # No residual
            result = y
        elif not self.down_sample:          # Simple residual
            result = y + x
        elif self.option == 'A':            # Zero padding
            # Kernel 1 / stride 2 pooling keeps every second pixel
            x = F.max_pool2d(x, 1, 2)
            # Double the channel count by appending all-zero channels
            padded = torch.cat((x, torch.zeros_like(x)), dim=1)
            result = y + padded
        else:                               # Linear projection
            result = y + self.conv_downsample(x)
        return F.relu(result)

**Xây dựng class CifarResNet**

In [None]:
def common_str(obj):
    """Build a dash-separated identifier from ``obj.locals``.

    The identifier is made of the class name of ``obj``, the value stored
    under ``'n'``, the letter ``'R'`` (truthy ``'residual'``) or ``'P'``, and
    finally the value stored under ``'option'`` unless it is None.
    """
    settings = obj.locals
    depth = str(settings['n'])
    variant = 'R' if settings['residual'] else 'P'
    parts = [obj.__class__.__name__, depth, variant]

    shortcut_option = settings['option']
    if shortcut_option is None:
        return '-'.join(parts)
    return '-'.join(parts + [shortcut_option])


class CifarResNet(nn.Module):
    """Plain or residual network for 32x32 inputs built from ``DoubleConvBlock``.

    With ``k = (n - 2) // 6`` the layers are, in order:
        * one 3x3 convolution, 3 -> 16 channels;
        * ``k`` blocks with 16 channels (size 32);
        * one down-sampling block 16 -> 32 channels, then ``k - 1`` blocks
          with 32 channels (size 16);
        * one down-sampling block 32 -> 64 channels, then ``k - 1`` blocks
          with 64 channels (size 8);
        * a ``Footer`` mapping the 64x8x8 feature map to 10 outputs.

    Args:
        n: Total number of weighted layers; one of 20, 32, 44, 56, 110.
        residual: True builds blocks with shortcuts, False builds plain blocks.
        option: Shortcut option ('A' or 'B') handed to the down-sampling
            blocks. ``DoubleConvBlock`` requires it when ``residual`` is True.

    Raises:
        AssertionError: If ``n`` is not one of the supported depths.
    """

    def __init__(self, n, residual=True, option=None):
        super().__init__()
        # Keep only the constructor arguments. A raw locals() snapshot would
        # also hold `self`, i.e. a reference cycle that delays the release of
        # the model (and of its device memory) until the cycle collector runs.
        self.locals = {'n': n, 'residual': residual, 'option': option}

        num_layers = {20, 32, 44, 56, 110}
        assert n in num_layers, f'N must be in {list(sorted(num_layers))}'
        k = (n - 2) // 6

        modules = [nn.Conv2d(3, 16, 3, padding=1)]
        modules += [DoubleConvBlock(16, 32, shortcut=residual) for _ in range(k)]

        modules.append(DoubleConvBlock(16, 32, shortcut=residual, down_sample=True, option=option))
        modules += [DoubleConvBlock(32, 16, shortcut=residual) for _ in range(k - 1)]

        modules.append(DoubleConvBlock(32, 16, shortcut=residual, down_sample=True, option=option))
        modules += [DoubleConvBlock(64, 8, shortcut=residual) for _ in range(k - 1)]

        modules.append(Footer(64, 8, 10))
        self.model = nn.Sequential(*modules)
        init_weights(self)

    def forward(self, x):
        """Return the 10 output scores for the batch ``x``."""
        # Call the module (not .forward) so hooks registered on it still run
        return self.model(x)

    @staticmethod
    def transform(x):
        """Subtract from ``x`` its own mean taken over dimensions 1 and 2.

        NOTE(review): presumably ``x`` is a single (C, H, W) image tensor, in
        which case this removes the per-channel mean of that image - confirm
        against the dataset transform pipeline.
        """
        return x - torch.mean(x, (1, 2), keepdim=True)

    def __str__(self):
        """Return the short identifier built by ``common_str``."""
        return common_str(self)

# 2. TRAINING MODEL

In [None]:
def create_component(large, n, residual, option):
    """Create everything a training run needs.

    Args:
        large: When truthy the optimiser starts with learning rate 0.01
            instead of 0.1.
        n, residual, option: Forwarded unchanged to ``CifarResNet``.

    Returns:
        The tuple ``(device, writer, now, model, loss_function, optimizer,
        scheduler)``: the CUDA device when available (CPU otherwise), a
        TensorBoard ``SummaryWriter``, the creation time, the model moved to
        the device, a cross-entropy loss, an SGD optimiser (momentum 0.9,
        weight decay 1e-4) and a ``MultiStepLR`` scheduler that multiplies the
        learning rate by 0.1 after 32,000 and after 48,000 scheduler steps.
    """
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device_name)
    writer = SummaryWriter()
    now = datetime.now()

    model = CifarResNet(n, residual, option)
    model = model.to(device)
    loss_function = torch.nn.CrossEntropyLoss()

    initial_lr = 0.01 if large else 0.1
    optimizer = torch.optim.SGD(model.parameters(), lr=initial_lr, momentum=0.9, weight_decay=0.0001)

    decay_steps = (32_000, 48_000)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=decay_steps, gamma=0.1)

    return device, writer, now, model, loss_function, optimizer, scheduler

In [None]:
def create_data_loader(train_set, test_set, batch_size):
    """Wrap both datasets in ``DataLoader`` objects of the same batch size.

    Returns:
        ``(train_loader, test_loader)``; only the training loader shuffles
        its samples.
    """
    loaders = (
        DataLoader(train_set, batch_size=batch_size, shuffle=True),
        DataLoader(test_set, batch_size=batch_size),
    )
    return loaders

In [None]:
def _evaluate(model, loader, dataset_size, loss_function, device):
    """Return ``(loss, accuracy)`` of ``model`` over ``loader``.

    The model is switched to evaluation mode for the pass, so batch
    normalisation uses its running statistics and does not update them, and
    is put back into its previous mode afterwards. ``loss`` is the mean of
    the per-batch losses; ``accuracy`` is the number of correct predictions
    divided by ``dataset_size``.
    """
    was_training = model.training
    model.eval()
    total_loss = 0
    correct = 0
    try:
        with torch.no_grad():
            for data, labels in loader:
                data = data.to(device)
                labels = labels.to(device)

                predictions = model(data)
                total_loss += loss_function(predictions, labels).item() / len(loader)
                correct += labels.eq(torch.argmax(predictions, 1)).sum().item()
                del data, labels
    finally:
        model.train(was_training)
    return total_loss, correct / dataset_size


def train(n, residual, option):
    """Train one ``CifarResNet`` on CIFAR-10 for 160 epochs and store the results.

    Args:
        n: Network depth, forwarded to ``create_component``.
        residual: True for the residual variant, False for the plain one.
        option: Shortcut option forwarded to ``create_component``.

    Side effects:
        * downloads CIFAR-10 into ``data/`` when it is not there yet;
        * writes ``train_losses``, ``test_losses``, ``train_errors`` and
          ``test_errors`` as ``.npy`` files into ``models/<str(model)>/``.
          Each array has two rows: epoch number and value;
        * writes a checkpoint ``weights/cp_<epoch>`` every 20 epochs and
          ``weights/final`` at the end;
        * logs losses and errors to TensorBoard;
        * prints a progress line every 40 epochs.
    """
    # Create components
    large = n >= 56
    device, writer, _, model, loss_function, optimizer, scheduler = create_component(large, n, residual, option)
    try:
        # NOTE(security): this disables HTTPS certificate verification for the
        # whole process, not only for the dataset download.
        ssl._create_default_https_context = ssl._create_unverified_context      # Patch expired certificate error

        # Create dataset
        train_set = CIFAR10(
            root='data', train=True, download=True,
            transform=T.Compose([
                T.ToTensor(),
                model.transform,
                T.RandomCrop(32, padding=4)         # Pad each side by 4 pixels and randomly sample 32x32 image
            ])
        )
        test_set = CIFAR10(
            root='data', train=False,
            transform=T.Compose([
                T.ToTensor(),
                model.transform
            ])
        )
        train_loader, test_loader = create_data_loader(train_set, test_set, 128)

        # Save training results
        # Create folders
        root = os.path.join('models', str(model))
        weight_dir = os.path.join(root, 'weights')
        os.makedirs(weight_dir, exist_ok=True)

        train_losses = np.empty((2, 0))
        test_losses = np.empty((2, 0))
        train_errors = np.empty((2, 0))
        test_errors = np.empty((2, 0))

        def save_metrics():
            np.save(os.path.join(root, 'train_losses'), train_losses)
            np.save(os.path.join(root, 'test_losses'), test_losses)
            np.save(os.path.join(root, 'train_errors'), train_errors)
            np.save(os.path.join(root, 'test_errors'), test_errors)

        # Training model
        for epoch in range(1, 161):
            # Epochs are counted from 1, so the warm-up epoch is epoch 1 and
            # the learning rate goes back to 0.1 at the start of epoch 2.
            if large and epoch == 2:
                for g in optimizer.param_groups:
                    g['lr'] = 0.1

            model.train()
            train_loss = 0
            correct = 0
            for data, labels in train_loader:
                data = data.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()
                predictions = model(data)
                loss = loss_function(predictions, labels)
                loss.backward()
                optimizer.step()
                scheduler.step()        # Step scheduler every iteration, not epoch

                train_loss += loss.item() / len(train_loader)
                correct += labels.eq(torch.argmax(predictions, 1)).sum().item()
                del data, labels
            train_error = 1 - correct / len(train_set)
            train_losses = np.append(train_losses, [[epoch], [train_loss]], axis=1)
            train_errors = np.append(train_errors, [[epoch], [train_error]], axis=1)
            writer.add_scalar('Loss/train', train_loss, epoch)
            writer.add_scalar('Error/train', train_error, epoch)

            if epoch % 4 == 0:
                test_loss, test_accuracy = _evaluate(model, test_loader, len(test_set), loss_function, device)
                test_error = 1 - test_accuracy
                test_losses = np.append(test_losses, [[epoch], [test_loss]], axis=1)
                test_errors = np.append(test_errors, [[epoch], [test_error]], axis=1)
                writer.add_scalar('Loss/test', test_loss, epoch)
                writer.add_scalar('Error/test', test_error, epoch)

                save_metrics()
                if epoch % 20 == 0:
                    torch.save(model.state_dict(), os.path.join(weight_dir, f'cp_{epoch}'))
            if epoch % 40 == 0:
                print(f'Epoch {epoch} complete')

        save_metrics()
        torch.save(model.state_dict(), os.path.join(weight_dir, 'final'))
    finally:
        # Flush and release the TensorBoard event file even if training fails
        writer.close()

In [None]:
# Network depths to train, one run per depth and variant
ns = [20, 32, 44, 56, 110]
# Variants to train: plain networks first (False), then residual ones (True)
residuals = [False, True]
# Shortcut option used for the residual runs
option = 'A'

In [None]:
# Train every depth for each variant; plain runs get no shortcut option.
for residual in residuals:
    shortcut_option = option if residual else None
    for n in ns:
        train(n, residual, shortcut_option)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:03<00:00, 43391160.92it/s]


Extracting data/cifar-10-python.tar.gz to data
Epoch 40 complete
Epoch 80 complete
Epoch 120 complete
Files already downloaded and verified
Epoch 40 complete
Epoch 80 complete
Epoch 120 complete
Files already downloaded and verified
Epoch 40 complete
Epoch 80 complete
Epoch 120 complete
Files already downloaded and verified
Epoch 40 complete
Epoch 80 complete
Epoch 120 complete
Files already downloaded and verified


# 3. PLOTTING RESULTS

In [None]:
# Folder that receives the saved figures
ROOT = 'results'
# Resolution (dots per inch) of the saved figures
DPI = 300


def save(fig, path):
    """Save ``fig`` as ``ROOT/path`` with a resolution of ``DPI``.

    The destination folder is created first when it does not exist yet;
    without it ``savefig`` fails because nothing else creates ``ROOT``.

    Args:
        fig: Object exposing ``savefig(fname, dpi=...)``.
        path: File name (or relative path) below ``ROOT``.
    """
    target = os.path.join(ROOT, path)
    folder = os.path.dirname(target)
    if folder:
        os.makedirs(folder, exist_ok=True)
    fig.savefig(target, dpi=DPI)


def plot(graph, info):
    """Draw the test and train error curves of several runs on ``graph``.

    Args:
        graph: Object exposing ``plot`` and ``legend``.
        info: Iterable of ``(path, label, color)``. ``path`` is a folder that
            holds ``test_errors.npy`` and ``train_errors.npy``, each with the
            x values in row 0 and the error fractions in row 1.

    For every entry the test curve is drawn with the given label and colour
    and the train curve in the same colour at 20 % opacity, without a label.
    Errors are shown as percentages. The legend is added once at the end.
    """
    for folder, legend_label, line_color in info:
        test_curve = np.load(os.path.join(folder, 'test_errors.npy'))
        train_curve = np.load(os.path.join(folder, 'train_errors.npy'))

        test_x, test_percent = test_curve[0], test_curve[1] * 100
        train_x, train_percent = train_curve[0], train_curve[1] * 100

        graph.plot(test_x, test_percent, label=legend_label, color=line_color)
        graph.plot(train_x, train_percent, color=line_color, alpha=0.2)

    graph.legend()


def format_plot(graph):
    """Apply the common axis styling to ``graph``.

    The x axis is labelled 'Epoch' and limited to 0..160, the y axis is
    labelled 'Error (%)' and limited to 0..30, and the top and right spines
    are hidden.
    """
    graph.set_xlabel('Epoch')
    graph.set_xlim(0, 160)

    graph.set_ylabel('Error (%)')
    graph.set_ylim(0, 30)

    for side in ('top', 'right'):
        graph.spines[side].set_visible(False)

In [None]:
def plain_res_error_plotting(sizes=(20, 32, 44, 56)):
    """Plot the error curves of the plain and residual networks side by side.

    The left panel shows the plain networks, the right panel the residual
    (option A) networks. Results are read from
    ``models/CifarResNet-<size>-P`` and ``models/CifarResNet-<size>-R-A``.
    The figure is saved as ``plain_res_error`` through ``save`` and then shown.

    Args:
        sizes: Network depths to include. Folders, legend labels and colours
            are all derived from this one sequence so they cannot get out of
            step. The default plots depths 20 to 56; pass
            ``(20, 32, 44, 56, 110)`` to add the 110-layer runs once their
            results exist.

    Raises:
        ValueError: If more sizes are requested than colours are available.
    """
    colors = ('orange', 'blue', 'red', 'green', 'black')
    if len(sizes) > len(colors):
        raise ValueError(f'At most {len(colors)} sizes can be plotted, got {len(sizes)}')

    fig, axs = plt.subplots(1, 2, figsize=(12, 5))
    for p in axs:
        format_plot(p)
    plain, residual = axs

    plot(plain, [
        (os.path.join('models', f'CifarResNet-{size}-P'), f'Plain-{size}', color)
        for size, color in zip(sizes, colors)
    ])
    plot(residual, [
        (os.path.join('models', f'CifarResNet-{size}-R-A'), f'Residual-{size}', color)
        for size, color in zip(sizes, colors)
    ])

    fig.tight_layout()
    # Nothing else creates the output folder, so make sure it exists
    os.makedirs(ROOT, exist_ok=True)
    save(fig, 'plain_res_error')
    plt.show()


# Draw the comparison figure only when this code runs as the main module
# (script or notebook cell), not when it is imported.
if __name__ == '__main__':
    plain_res_error_plotting()