## 0. Data downloading

In [6]:
# import json
# # set api key
# api_token = {"username":"dd13969","key":""}
# with open('/root/.kaggle/kaggle.json', 'w') as file:
#     json.dump(api_token, file)

In [7]:
# !kaggle competitions download -c nzmsa-2024

In [8]:
# import os
# import zipfile

# # unzip dataset
# def unzipDataset(data_dir):
#     zip_path = data_dir + '.zip'
#     extract_path = os.getcwd()

#     with zipfile.ZipFile(zip_path, 'r') as zip_ref:
#         zip_ref.extractall(extract_path)
# file_name = 'nzmsa-2024'
# unzipDataset(file_name)

## 1. Data loading & preprocessing

In [8]:
import csv
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import train_test_split

# data preprocessing and augmentation
# Training pipeline: resize to 224x224, then apply a random resized crop (back to
# 224) and a random horizontal flip as augmentation before converting to a tensor.
# NOTE(review): none of the three pipelines applies a Normalize step — confirm
# that feeding raw ToTensor() output to the models is intended.
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])

# Validation pipeline: deterministic (no random ops) — resize to 256, take the
# central 224 crop, convert to a tensor.
val_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
])


# Test pipeline: same steps as the validation pipeline.
test_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
])


# define dataset
class CIFAR10Dataset(Dataset):
    """Labelled CIFAR-10 dataset backed by individual image files.

    Labels are read once, at construction time, from a CSV file with ``id`` and
    ``label`` columns; each row is keyed as ``image_<id>.png`` so it can be
    looked up by the image's file name.

    Args:
        data_list (list[str]): Paths of the image files in this split.
        label_path (str): Path of the CSV label file.
        transform (callable, optional): A function/transform that takes in a PIL
            image and returns a transformed version. Defaults to
            ``transforms.ToTensor()``.
    """
    def __init__(self, data_list, label_path, transform=None):
        self.data_list = data_list
        self.label_dict = self._csv2dict(label_path)
        self.transform = transform
        if self.transform is None:
            self.transform = transforms.ToTensor()

    def _csv2dict(self, label_path):
        """Load labels from the CSV file into a ``{file_name: int_label}`` dict."""
        label_dict = {}
        # newline='' is what the csv module expects for files it parses itself.
        with open(label_path, mode='r', encoding='utf-8', newline='') as csv_file:
            for row in csv.DictReader(csv_file):
                label_dict[f'image_{row["id"]}.png'] = int(row['label'])
        return label_dict

    def __len__(self):
        """Return the number of images in this split."""
        return len(self.data_list)

    def __getitem__(self, idx):
        """Return ``(transformed_image, label)`` for the image at ``idx``.

        Raises:
            KeyError: If the image's file name has no entry in the label file.
        """
        img_path = self.data_list[idx]
        # Image.open keeps the file handle open until the image is closed;
        # convert() loads the pixels and returns an independent image, so the
        # handle can be released right away instead of leaking one per sample.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")
        img_transformed = self.transform(img)
        # basename() also copes with OS-specific separators, unlike split('/').
        label = self.label_dict[os.path.basename(img_path)]
        return img_transformed, label

class TestDataset(CIFAR10Dataset):
    """Unlabelled CIFAR-10 test dataset.

    Reuses ``__len__`` from ``CIFAR10Dataset`` but deliberately does not call the
    parent constructor, because the test split has no label file. Each item is
    ``(transformed_image, image_id)`` where ``image_id`` is the text after the
    last underscore in the file name, without the extension
    (``image_123.png`` -> ``'123'``).

    Args:
        data_list (list[str]): Paths of the test image files.
        transform (callable, optional): A function/transform that takes in a PIL
            image and returns a transformed version. Defaults to
            ``transforms.ToTensor()``.
    """
    def __init__(self, data_list, transform=None):
        self.data_list = data_list
        self.transform = transform
        if self.transform is None:
            self.transform = transforms.ToTensor()

    def __getitem__(self, idx):
        """Return ``(transformed_image, image_id)`` for the image at ``idx``."""
        img_path = self.data_list[idx]
        # Release the file handle as soon as the pixels have been loaded.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")
        img_transformed = self.transform(img)
        # Work on the file name only, so underscores in directory names and
        # extensions of any length cannot corrupt the extracted id.
        stem = os.path.splitext(os.path.basename(img_path))[0]
        image_id = stem.split('_')[-1]
        return img_transformed, image_id


# load dataset
data_root = 'cifar10_images/train'
test_data_root = 'cifar10_images/test'
label_path = 'train.csv'

# os.listdir() returns entries in arbitrary order, so the listings are sorted:
# otherwise the seeded split below would differ between machines and runs.
data_list = ['/'.join([data_root, i]) for i in sorted(os.listdir(data_root))]
test_list = ['/'.join([test_data_root, i]) for i in sorted(os.listdir(test_data_root))]
train_list, val_list = train_test_split(data_list, test_size=0.2,random_state=101)
print(f'train dataset size: {len(train_list)}, validation dataset size: {len(val_list)}, test datasetsize: {len(test_list)}')

train_dataset = CIFAR10Dataset(train_list, label_path, train_transforms)
val_dataset = CIFAR10Dataset(val_list, label_path, val_transforms)
test_dataset = TestDataset(test_list, test_transforms)

train dataset size: 40000, validation dataset size: 10000, test datasetsize: 5000


## 2. Define the model

### Resnet
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun.

    Deep Residual Learning for Image Recognition
    https://arxiv.org/abs/1512.03385v1

In [1]:
import torch.nn as nn

class BasicBlock(nn.Module):
    """Two-convolution residual block used by ResNet-18 and ResNet-34.

    Args:
        in_channels: Number of channels of the block input.
        out_channels: Base channel width of the block.
        stride: Stride of the first 3x3 convolution (and of the projection
            shortcut, when one is needed).
    """

    # Ratio between a block's output channels and ``out_channels``; ResNet reads
    # it to size the following layers. BasicBlock does not widen its output.
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        expanded_channels = out_channels * BasicBlock.expansion

        # Main path: conv3x3 -> BN -> ReLU -> conv3x3 -> BN.
        main_path = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, expanded_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(expanded_channels),
        ]
        self.residual_function = nn.Sequential(*main_path)

        # Identity shortcut unless the main path changes the spatial size or the
        # channel count; then a strided 1x1 convolution matches the shapes.
        needs_projection = stride != 1 or in_channels != expanded_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, expanded_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(expanded_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``ReLU(residual_function(x) + shortcut(x))``."""
        summed = self.residual_function(x) + self.shortcut(x)
        return nn.ReLU(inplace=True)(summed)

class BottleNeck(nn.Module):
    """Three-convolution bottleneck residual block for ResNets with 50+ layers.

    Args:
        in_channels: Number of channels of the block input.
        out_channels: Width of the inner 1x1 and 3x3 convolutions; the block
            outputs ``out_channels * expansion`` channels.
        stride: Stride of the 3x3 convolution (and of the projection shortcut,
            when one is needed).
    """

    # The final 1x1 convolution widens the output to 4x ``out_channels``.
    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        expanded_channels = out_channels * BottleNeck.expansion

        # Main path: 1x1 reduce -> 3x3 (strided) -> 1x1 expand, each followed by
        # batch norm, with ReLU after the first two.
        main_path = [
            nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, stride=stride, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, expanded_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(expanded_channels),
        ]
        self.residual_function = nn.Sequential(*main_path)

        # Project the input with a strided 1x1 convolution only when its shape
        # would not match the main path's output.
        needs_projection = stride != 1 or in_channels != expanded_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, expanded_channels, stride=stride, kernel_size=1, bias=False),
                nn.BatchNorm2d(expanded_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``ReLU(residual_function(x) + shortcut(x))``."""
        summed = self.residual_function(x) + self.shortcut(x)
        return nn.ReLU(inplace=True)(summed)

class ResNet(nn.Module):
    """ResNet backbone plus linear classifier, built from a given block type.

    Args:
        block: Residual block class (e.g. ``BasicBlock`` or ``BottleNeck``); it
            must expose an ``expansion`` class attribute and accept
            ``(in_channels, out_channels, stride)``.
        num_block: Number of blocks in each of the four stages.
        num_classes: Size of the classifier output.
    """

    def __init__(self, block, num_block, num_classes=10):
        super().__init__()

        # Running channel count; _make_layer updates it after every block.
        self.in_channels = 64

        # Stem: a single stride-1 3x3 convolution.
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True))
        # The input size differs from the original paper, so the first stage
        # keeps stride 1; the remaining three stages each use stride 2.
        self.conv2_x = self._make_layer(block, 64, num_block[0], 1)
        self.conv3_x = self._make_layer(block, 128, num_block[1], 2)
        self.conv4_x = self._make_layer(block, 256, num_block[2], 2)
        self.conv5_x = self._make_layer(block, 512, num_block[3], 2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """Build one ResNet stage (a sequence of residual blocks).

        Args:
            block: block type, basic block or bottleneck block
            out_channels: base channel width of the blocks in this stage
            num_blocks: how many blocks the stage contains
            stride: stride of the first block; every later block uses stride 1

        Return:
            an ``nn.Sequential`` holding the stage's blocks
        """
        stage_blocks = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            stage_blocks.append(block(self.in_channels, out_channels, block_stride))
            # The next block consumes what this one produced.
            self.in_channels = out_channels * block.expansion

        return nn.Sequential(*stage_blocks)

    def forward(self, x):
        """Run the stem, the four stages, global average pooling and the classifier."""
        feature_stages = (self.conv1, self.conv2_x, self.conv3_x, self.conv4_x, self.conv5_x)
        output = x
        for stage in feature_stages:
            output = stage(output)
        output = self.avg_pool(output)
        # Flatten everything except the batch dimension for the linear layer.
        output = output.view(output.size(0), -1)
        return self.fc(output)

def resnet18(num_classes=10):
    """Build a ResNet-18: ``BasicBlock`` with stage depths ``[2, 2, 2, 2]``."""
    return ResNet(BasicBlock, [2, 2, 2, 2], num_classes=num_classes)

def resnet34(num_classes=10):
    """Build a ResNet-34: ``BasicBlock`` with stage depths ``[3, 4, 6, 3]``."""
    return ResNet(BasicBlock, [3, 4, 6, 3], num_classes=num_classes)

def resnet50(num_classes=10):
    """Build a ResNet-50: ``BottleNeck`` with stage depths ``[3, 4, 6, 3]``."""
    return ResNet(BottleNeck, [3, 4, 6, 3], num_classes=num_classes)

def resnet101(num_classes=10):
    """Build a ResNet-101: ``BottleNeck`` with stage depths ``[3, 4, 23, 3]``."""
    return ResNet(BottleNeck, [3, 4, 23, 3], num_classes=num_classes)

def resnet152(num_classes=10):
    """Build a ResNet-152: ``BottleNeck`` with stage depths ``[3, 8, 36, 3]``."""
    return ResNet(BottleNeck, [3, 8, 36, 3], num_classes=num_classes)

### ViT (Vision Transformer)

In [3]:
import torch
from torch import nn

from einops import rearrange, repeat
from einops.layers.torch import Rearrange

# helpers

def pair(t):
    """Return ``t`` unchanged if it is a tuple, otherwise duplicate it as ``(t, t)``."""
    if isinstance(t, tuple):
        return t
    return (t, t)

# classes

class FeedForward(nn.Module):
    """Pre-norm MLP: LayerNorm -> Linear -> GELU -> Dropout -> Linear -> Dropout.

    Args:
        dim: Size of the input and output features.
        hidden_dim: Size of the hidden layer.
        dropout: Dropout probability applied after each linear layer.
    """
    def __init__(self, dim, hidden_dim, dropout = 0.):
        super().__init__()
        stages = [
            nn.LayerNorm(dim),
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Apply the MLP to ``x``; the residual connection is added by the caller."""
        return self.net(x)

class Attention(nn.Module):
    """Pre-norm multi-head self-attention.

    Args:
        dim: Size of the input and output features.
        heads: Number of attention heads.
        dim_head: Feature size of each head.
        dropout: Dropout probability applied to the attention weights and, when
            an output projection exists, to its result.
    """
    def __init__(self, dim, heads = 8, dim_head = 64, dropout = 0.):
        super().__init__()
        inner_dim = dim_head * heads
        # With a single head whose size already equals ``dim`` the output needs
        # no projection back, so an identity is used instead.
        skip_projection = heads == 1 and dim_head == dim

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.norm = nn.LayerNorm(dim)

        self.attend = nn.Softmax(dim = -1)
        self.dropout = nn.Dropout(dropout)

        # One linear layer produces queries, keys and values together.
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias = False)

        if skip_projection:
            self.to_out = nn.Identity()
        else:
            self.to_out = nn.Sequential(
                nn.Linear(inner_dim, dim),
                nn.Dropout(dropout)
            )

    def forward(self, x):
        """Return the attention output for ``x``; the residual is added by the caller."""
        normed = self.norm(x)

        # Split the fused projection into q, k, v and move heads to their own axis.
        fused = self.to_qkv(normed).chunk(3, dim = -1)
        q, k, v = (rearrange(part, 'b n (h d) -> b h n d', h = self.heads) for part in fused)

        scores = torch.matmul(q, k.transpose(-1, -2)) * self.scale
        weights = self.dropout(self.attend(scores))

        # Weighted sum of values, then merge the heads back into one feature axis.
        mixed = rearrange(torch.matmul(weights, v), 'b h n d -> b n (h d)')
        return self.to_out(mixed)

class Transformer(nn.Module):
    """Stack of ``depth`` attention + feed-forward layers with a final LayerNorm.

    Args:
        dim: Feature size of the token embeddings.
        depth: Number of (Attention, FeedForward) layer pairs.
        heads: Number of attention heads per layer.
        dim_head: Feature size of each attention head.
        mlp_dim: Hidden size of each feed-forward block.
        dropout: Dropout probability passed to every sub-layer.
    """
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout = 0.):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        layer_pairs = [
            nn.ModuleList([
                Attention(dim, heads = heads, dim_head = dim_head, dropout = dropout),
                FeedForward(dim, mlp_dim, dropout = dropout)
            ])
            for _ in range(depth)
        ]
        self.layers = nn.ModuleList(layer_pairs)

    def forward(self, x):
        """Apply every layer pair with residual connections, then the final norm."""
        hidden = x
        for attention, feed_forward in self.layers:
            hidden = attention(hidden) + hidden
            hidden = feed_forward(hidden) + hidden

        return self.norm(hidden)

class ViT(nn.Module):
    """Vision Transformer image classifier.

    Args:
        image_size: Input image size, an int or a ``(height, width)`` tuple.
        patch_size: Patch size, an int or a ``(height, width)`` tuple; it must
            divide the image size.
        num_classes: Size of the classifier output.
        dim: Feature size of the token embeddings.
        depth: Number of transformer layers.
        heads: Number of attention heads per layer.
        mlp_dim: Hidden size of the feed-forward blocks.
        pool: ``'cls'`` to classify from the class token, ``'mean'`` to average
            all tokens.
        channels: Number of image channels.
        dim_head: Feature size of each attention head.
        dropout: Dropout probability inside the transformer.
        emb_dropout: Dropout probability applied to the embedded tokens.
    """
    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, pool = 'cls', channels = 3, dim_head = 64, dropout = 0., emb_dropout = 0.):
        super().__init__()
        img_h, img_w = pair(image_size)
        p_h, p_w = pair(patch_size)

        assert img_h % p_h == 0 and img_w % p_w == 0, 'Image dimensions must be divisible by the patch size.'
        assert pool in {'cls', 'mean'}, 'pool type must be either cls (cls token) or mean (mean pooling)'

        grid_size = (img_h // p_h) * (img_w // p_w)
        flat_patch_dim = channels * p_h * p_w

        # Cut the image into patches, flatten each one and project it to ``dim``.
        self.to_patch_embedding = nn.Sequential(
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1 = p_h, p2 = p_w),
            nn.LayerNorm(flat_patch_dim),
            nn.Linear(flat_patch_dim, dim),
            nn.LayerNorm(dim),
        )

        # One learned position per patch, plus one for the class token.
        self.pos_embedding = nn.Parameter(torch.randn(1, grid_size + 1, dim))
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.dropout = nn.Dropout(emb_dropout)

        self.transformer = Transformer(dim, depth, heads, dim_head, mlp_dim, dropout)

        self.pool = pool
        self.to_latent = nn.Identity()

        self.mlp_head = nn.Linear(dim, num_classes)

    def forward(self, img):
        """Return the class logits for a batch of images."""
        tokens = self.to_patch_embedding(img)
        batch, n_patches, _ = tokens.shape

        # Prepend one copy of the class token to every sequence in the batch.
        cls_tokens = repeat(self.cls_token, '1 1 d -> b 1 d', b = batch)
        tokens = torch.cat((cls_tokens, tokens), dim=1)
        tokens += self.pos_embedding[:, :(n_patches + 1)]

        tokens = self.transformer(self.dropout(tokens))

        if self.pool == 'mean':
            pooled = tokens.mean(dim = 1)
        else:
            pooled = tokens[:, 0]

        return self.mlp_head(self.to_latent(pooled))

## 3. Metrics & Evaluation

In [3]:
from sklearn.metrics import confusion_matrix
import plotly.figure_factory as ff

def getConfusionMatrix(label_list, prediction_list, save_root=None):
    """Display an annotated confusion-matrix heatmap for the 10 CIFAR-10 classes.

    Args:
        label_list: Ground-truth class indices (0-9).
        prediction_list: Predicted class indices (0-9), in the same order.
        save_root: Accepted for API compatibility; currently unused.
    """
    # Class names in index order; used for both the predicted and the actual axis.
    class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

    # Raw (un-normalised) counts over all ten classes.
    counts = confusion_matrix(label_list, prediction_list, labels=range(10), normalize=None)

    # Heatmap with the count written inside every cell.
    fig = ff.create_annotated_heatmap(counts, class_names, class_names)
    fig.update_layout(title_text="<b>Confusion matrix</b>",
                      yaxis=dict(categoryorder="category descending"))

    # Axis captions are added as paper-relative annotations.
    axis_captions = (
        dict(x=0.5, y=-0.15, text="Predicted label"),
        dict(x=-0.15, y=0.5, text="Actual label", textangle=-90),
    )
    for caption in axis_captions:
        fig.add_annotation(dict(font=dict(color="black", size=14),
                                showarrow=False,
                                xref="paper",
                                yref="paper",
                                **caption))

    # Margins leave room for the title and the captions.
    fig.update_layout(margin=dict(t=80, r=20, l=100, b=50))
    fig['data'][0]['showscale'] = True
    fig.show()


import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, roc_auc_score, recall_score, f1_score
from sklearn.utils.multiclass import type_of_target
import numpy as np
from sklearn.preprocessing import label_binarize


def plot_loss(loss_list):
    """Plot the values in ``loss_list`` as a line in the 'PyTorch_CNN_Loss' figure."""
    loss_fig = plt.figure('PyTorch_CNN_Loss')
    loss_ax = loss_fig.gca()
    loss_ax.plot(loss_list, label='Loss')
    loss_ax.legend()
    plt.show()


def plot_acc(acc_list):
    """Plot the values in ``acc_list`` as a line in the 'PyTorch_CNN_Acc' figure."""
    acc_fig = plt.figure('PyTorch_CNN_Acc')
    acc_ax = acc_fig.gca()
    acc_ax.plot(acc_list, label='Acc')
    acc_ax.legend()
    plt.show()


def plot_roc(y_true, y_scores):
    """Plot a single ROC curve pooled over all 10 classes.

    The labels are one-hot encoded and flattened together with the scores, so
    every (sample, class) pair counts as one binary decision.

    Args:
        y_true: Sequence of integer class labels in ``0..9``.
        y_scores: Array-like of per-class scores with one row per sample and one
            column per class — assumes the same sample order as ``y_true``.
    """
    y_scores = np.asarray(y_scores)
    y_one_hot = label_binarize(y_true, classes=np.arange(10))

    fpr, tpr, _ = roc_curve(y_one_hot.ravel(), y_scores.ravel())
    roc_auc = auc(fpr, tpr)

    lw = 2
    # A single figure: the original opened an extra, empty one before this call.
    plt.figure(figsize=(10, 10))
    # False positive rate on the x axis, true positive rate on the y axis.
    plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve (area = %0.2f)' % roc_auc)
    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False positive rate')
    plt.ylabel('True positive rate')
    plt.legend(loc="lower right")
    plt.show()

## 4. Train & Test pipeline

### train pipeline

In [4]:
import os
import time
import torch
# from tqdm.notebook import tqdm
from sklearn.metrics import roc_curve
def train(model,
          train_dataset,
          val_dataset,
          batch_size,
          epoch,
          loss_function,
          optimizer,
          output_root = './outputs',
          save_epoch = 1,
          resume = None,
          start_epoch = 1,
          device = 'cuda:0',
    ):
    """Train ``model`` and validate it after every epoch.

    A time-stamped run directory is created under ``output_root``. It receives
    ``loss.log`` (one training loss per iteration), ``train.log`` (one summary
    line per epoch) and the saved checkpoints.

    Args:
        model: The network to train; it is moved to ``device``.
        train_dataset: Dataset yielding ``(input, label)`` pairs for training.
        val_dataset: Dataset yielding ``(input, label)`` pairs for validation.
        batch_size: Batch size for both data loaders.
        epoch: Number of the last epoch to run (inclusive).
        loss_function: Callable ``(outputs, labels) -> loss tensor``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        output_root: Directory under which the run directory is created.
        save_epoch: Save a regular checkpoint on every epoch whose number is a
            multiple of this value (unless a "best" checkpoint was just saved).
        resume: Optional path of a state-dict checkpoint to load before training.
        start_epoch: Number of the first epoch to run.
        device: Device on which training runs.
    """
    if resume:
        model.load_state_dict(torch.load(resume, map_location=device))
    # Always move the model, not only when resuming.
    model.to(device)

    best_acc = 0
    current_time = time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime())
    output_path = os.path.join(output_root, current_time)
    os.makedirs(output_path, exist_ok=True)

    train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
    # Validation metrics do not depend on sample order, so no shuffling is needed.
    val_dataloader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)
    iter_num = len(train_dataloader)
    val_iter_num = len(val_dataloader)

    # The two logs go to different files: opening the same path twice in 'w'
    # mode makes the handles overwrite each other. ``with`` closes both even if
    # training is interrupted.
    with open(os.path.join(output_path, 'train.log'), 'w', encoding='utf-8') as f_log, \
         open(os.path.join(output_path, 'loss.log'), 'w', encoding='utf-8') as f_loss:
        for e in range(start_epoch, epoch + 1):
            # Enable training behaviour of BatchNorm/Dropout layers.
            model.train()
            train_loss = 0.0
            train_accuracy = 0.0

            for idx, (data, label) in enumerate(train_dataloader):
                data, label = data.to(device), label.to(device)

                outputs = model(data)
                loss = loss_function(outputs, label)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # Accumulate plain floats so no autograd graph is kept alive.
                loss_value = loss.item()
                acc = (outputs.argmax(dim=1) == label).float().mean().item()
                train_accuracy += acc / iter_num
                train_loss += loss_value / iter_num

                if idx % 1000 == 0:
                    print(f'Epoch:{e}/{epoch}, iter:{idx}/{iter_num}, loss:{loss_value:.4f}')

                f_loss.write(f'{loss_value:.4f}\n')

            # Switch BatchNorm/Dropout to inference behaviour for validation.
            model.eval()
            val_accuracy = 0.0
            val_loss = 0.0
            with torch.no_grad():
                for data, label in val_dataloader:
                    data = data.to(device)
                    label = label.to(device)

                    outputs = model(data)
                    loss = loss_function(outputs, label)
                    acc = (outputs.argmax(dim=1) == label).float().mean().item()
                    val_accuracy += acc / val_iter_num
                    val_loss += loss.item() / val_iter_num

            summary = (f'Epoch:{e}/{epoch}, train_loss:{train_loss:.4f}, train_accuracy:{train_accuracy:.4f}, '
                       f'val_loss:{val_loss:.4f}, val_accuracy:{val_accuracy:.4f}')
            print(summary)
            f_log.write(summary + '\n')
            f_log.flush()

            # model saving: a new best model takes precedence over the regular
            # checkpoint for the same epoch.
            if best_acc < val_accuracy:
                model_name = f'best_epoch_{e}_{val_accuracy:.4f}.pth'
                save_path = os.path.join(output_path, model_name)
                print(f'saving best model to {save_path}')
                torch.save(model.state_dict(), save_path)
                best_acc = val_accuracy
                continue

            # Test the current epoch number, not the total epoch count.
            if e % save_epoch == 0:
                model_name = f'epoch_{e}_{val_accuracy:.4f}.pth'
                save_path = os.path.join(output_path, model_name)
                print(f'saving model to {save_path}')
                torch.save(model.state_dict(), save_path)


### test pipeline

In [None]:
def test(model,
         checkpoint_path,
         test_dataset,
         batch_size,
         result_path = 'submission.csv',
         device = 'cuda:0',
    ):

    model.load_state_dict(torch.load(checkpoint_path))
    model.to(device)
    test_dataloader = DataLoader(dataset = test_dataset, batch_size=batch_size)
    
    current_time = time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime())
    output_path = os.path.join('./submissions', current_time)
    os.makedirs(output_path, exist_ok=True)
    f = open(os.path.join(output_path, result_path), 'w', encoding='utf-8')
    f.write('id,label\n')

    with torch.no_grad():
        for data, ids in test_dataloader:
            data = data.to(device)

            outputs = model(data)
            labels = outputs.argmax(dim=1)
            for i in range(len(ids)):
                f.write(f'{ids[i]},{labels[i]}\n')
    f.close()

## 5. Experiments

In [7]:
from torch.optim.lr_scheduler import StepLR
import torch.optim as optim
# from linformer import Linformer

# model: it must exist before the optimizer is built from its parameters.
# Creating it afterwards either raises NameError on a fresh kernel or binds the
# optimizer to a stale model, so the model passed to train() would never update.
model = resnet50().to('cuda:0')

# loss function
loss_function = nn.CrossEntropyLoss()
# optimizer
lr = 3e-5
optimizer = optim.Adam(model.parameters(), lr=lr)
# scheduler
# NOTE(review): the scheduler is constructed but never passed to train(), so the
# learning rate stays at `lr` for the whole run — confirm whether stepping it
# was intended.
gamma = 0.7
scheduler = StepLR(optimizer, step_size=1, gamma=gamma)

train(model=model,
      train_dataset=train_dataset,
      val_dataset=val_dataset,
      epoch=100,
      batch_size=8,
      loss_function=loss_function,
      optimizer=optimizer,
      # resume='outputs/2024_07_27_17_59_17/epoch_10_0.6693.pth',
      # start_epoch=10,
)

In [5]:
# Rebuild the ResNet-50 architecture on the GPU; test() loads the checkpoint
# weights into it before predicting.
model = resnet50().to('cuda:0')
# Write the submission CSV for the test set from the saved "best" checkpoint.
test(model=model,
     checkpoint_path='./outputs/2024_07_31_02_53_18/best_epoch_17_0.7836.pth',
     test_dataset=test_dataset,
     batch_size=8
)