# Exercise 02: Multi-class Classification 
In this exercise, you will train a ResNet18 model on the CIFAR10-LT from the scratch using PyTorch.

### Basic Imports

In [1]:
import os
import time
import os.path as osp

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader

from torchvision import datasets
from torchvision import transforms
import torchvision

import matplotlib.pyplot as plt
from PIL import Image

from imbalance_data.cifar10Imbanlance import Cifar10Imbanlance # dataloader
from models.resnet import ResNet18 # model

### Hyperparameters

In [3]:
# random seed
SEED = 1 

# Dataset
IMBALANCE_RATIO = 0.1
DATASET_DIR = "/shareddata/"
# DATASET_DIR = "./data"
NUM_CLASS = 10

# Training
BATCH_SIZE = 128
NUM_EPOCHS = 40
EVAL_INTERVAL=1
SAVE_DIR = './log'

# Optimizer
LEARNING_RATE = 1e-1
MOMENTUM = 0.9
STEP=5
GAMMA=0.5


### Device

In [4]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


### Dataset


In [5]:
imbanlance_rate = IMBALANCE_RATIO
root = DATASET_DIR

# cifar10 transform
transform_cifar10_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_cifar10_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])


trainset = Cifar10Imbanlance(imbanlance_rate, transform=transform_cifar10_train, train=True, file_path=root)
testset = Cifar10Imbanlance(imbanlance_rate, transform=transform_cifar10_test, train=False, file_path=root)


train_dataloader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = torch.utils.data.DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False)


class_counts = {}
for train_data, train_label in train_dataloader:

    labels_list = train_label.tolist()
    counts = torch.bincount(torch.tensor(labels_list))
    
    for label, count in enumerate(counts):
        if label in class_counts:
            class_counts[label] += count
        else:
            class_counts[label] = count


class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
for i, count in enumerate(class_counts):
    print(f"{class_names[i]}: {int(count)} samples")

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to /shareddata/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:15<00:00, 10909650.03it/s]


Extracting /shareddata/cifar-10-python.tar.gz to /shareddata/
Files already downloaded and verified
airplane: 0 samples
automobile: 1 samples
bird: 2 samples
cat: 3 samples
deer: 4 samples
dog: 5 samples
frog: 6 samples
horse: 7 samples
ship: 8 samples
truck: 9 samples


### Model

In [6]:
model = ResNet18(num_classes=NUM_CLASS)
model = model.cuda()

### Loss Function

In [7]:
criterion = nn.CrossEntropyLoss()

### Optimizer

In [8]:
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM)

scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=STEP, gamma=GAMMA)

### Task 1: per batch training/testing
---

Please denfine two function named ``train_batch`` and ``test_batch``. These functions are essential for training and evaluating machine learning models using batched data from dataloaders.

**To do**: 
1. Take the image as the input and generate the output using the pre-defined ResNet18.
2. Calculate the loss between the output and the corresponding label using *nn.CrossEntropyLoss()*.

In [9]:
def train_batch(model, image, target):
    """
    Perform one training batch iteration.

    Args:
        model (torch.nn.Module): The machine learning model to train.
        image (torch.Tensor): Batch of input data (images).
        target (torch.Tensor): Batch of target labels.

    Returns:
        torch.Tensor: Model output (predictions) for the batch.
        torch.Tensor: Loss value calculated for the batch.
    """
    
    ##################### Write your answer here ##################
    
    
    ###############################################################

    return output, loss


def test_batch(model, image, target):
    """
    Perform one testing batch iteration.

    Args:
        model (torch.nn.Module): The machine learning model to evaluate.
        image (torch.Tensor): Batch of input data (images).
        target (torch.Tensor): Batch of target labels.

    Returns:
        torch.Tensor: Model output (predictions) for the batch.
        torch.Tensor: Loss value calculated for the batch.
    """

    ##################### Write your answer here ##################
    
    
    ###############################################################

    return output, loss

### Model Training

In [10]:
training_loss = []
training_acc = []
testing_loss = []
testing_acc = []

for epoch in range(NUM_EPOCHS):
    model.train()
    torch.cuda.empty_cache()

    ##########################
    ### Training
    ##########################

    running_cls_loss = 0.0
    running_cls_corrects = 0

    for batch_idx, (image, target) in enumerate(train_dataloader):

        image = image.to(device)
        target = target.to(device)

        # train model
        outputs, loss = train_batch(model, image, target)
        _, preds = torch.max(outputs, 1)

        
        loss_data = loss.data.item()
        if np.isnan(loss_data):
            raise ValueError('loss is nan while training')
        running_cls_loss += loss.item()
        running_cls_corrects += torch.sum(preds == target.data)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    epoch_loss = running_cls_loss / len(trainset)
    epoch_acc = running_cls_corrects.double() / len(trainset)

    print(f'Epoch: {epoch+1}/{NUM_EPOCHS} Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

    training_loss.append(epoch_loss)
    training_acc.append(epoch_acc.cpu().detach().numpy())

    # change learning rate
    scheduler.step()


    ##########################
    ### Testing
    ##########################
    # # eval model during training or in the last epoch
    if (epoch + 1) % EVAL_INTERVAL == 0 or (epoch +1) == NUM_EPOCHS:
        print('Begin test......')
        model.eval()

        val_loss = 0.0
        val_corrects = 0

        for batch_idx, (image, target) in enumerate(test_dataloader):

            image = image.to(device)
            target = target.to(device)

            # test model
            outputs, loss = test_batch(model, image, target)
            _, preds = torch.max(outputs, 1)
            
            val_loss += loss.item()
            val_corrects += torch.sum(preds == target.data)

        val_loss = val_loss / len(testset)
        val_acc = val_corrects.double() / len(testset)
        print(f'Test Loss: {val_loss:.4f} Acc: {val_acc:.4f}')
        testing_loss.append(val_loss)
        testing_acc.append(val_acc.cpu().detach().numpy())

        # save the model in last epoch
        if (epoch +1) == NUM_EPOCHS:
            
            state = {
            'state_dict': model.state_dict(),
            'acc': epoch_acc,
            'epoch': (epoch+1),
            }

            # check the dir
            if not os.path.exists(SAVE_DIR):
                os.makedirs(SAVE_DIR)

            # save the state
            torch.save(state, osp.join(SAVE_DIR, 'checkpoint_%s.pth' % (str(epoch+1))))



Epoch: 1/40 Train Loss: 0.0171 Acc: 0.3299
Begin test......
Test Loss: 0.0169 Acc: 0.2465
Epoch: 2/40 Train Loss: 0.0125 Acc: 0.4288
Begin test......
Test Loss: 0.0157 Acc: 0.2824
Epoch: 3/40 Train Loss: 0.0116 Acc: 0.4711
Begin test......
Test Loss: 0.0157 Acc: 0.2983
Epoch: 4/40 Train Loss: 0.0110 Acc: 0.4909
Begin test......
Test Loss: 0.0140 Acc: 0.3412
Epoch: 5/40 Train Loss: 0.0105 Acc: 0.5178
Begin test......
Test Loss: 0.0139 Acc: 0.3849
Epoch: 6/40 Train Loss: 0.0097 Acc: 0.5537
Begin test......
Test Loss: 0.0127 Acc: 0.4297
Epoch: 7/40 Train Loss: 0.0094 Acc: 0.5706
Begin test......
Test Loss: 0.0127 Acc: 0.4244
Epoch: 8/40 Train Loss: 0.0091 Acc: 0.5855
Begin test......
Test Loss: 0.0114 Acc: 0.4794
Epoch: 9/40 Train Loss: 0.0088 Acc: 0.6032
Begin test......
Test Loss: 0.0129 Acc: 0.4329
Epoch: 10/40 Train Loss: 0.0085 Acc: 0.6157
Begin test......
Test Loss: 0.0120 Acc: 0.4715
Epoch: 11/40 Train Loss: 0.0079 Acc: 0.6426
Begin test......
Test Loss: 0.0107 Acc: 0.5184
Epoch: 1

### Task 2: Plotting Loss and Accuracy
---
The task is to create a function named ``plot_loss_and_accuracy`` that generates a visualization showing the training and testing loss as well as training and testing accuracy over different epochs during the training of a machine learning model.

**To do**: 
1. Plot the training and testing loss curves on the left subplot.
2. Plot the training and testing accuracy curves on the right subplot.

In [12]:


def plot_loss_and_accuracy(training_loss, training_acc, testing_loss, testing_acc):
    """
    Plot training and testing loss, as well as training and testing accuracy.

    Args:
        training_loss (list or array): Training loss values over epochs.
        training_acc (list or array): Training accuracy values over epochs.
        testing_loss (list or array): Testing loss values over epochs.
        testing_acc (list or array): Testing accuracy values over epochs.
    """

    plt.figure(figsize=(10, 4))

     # Plot the training and testing loss curves in the left subplot
    plt.subplot(1, 2, 1)  # loss subplot

    ##################### Write your answer here ##################
    
    
    ###############################################################
    
    plt.title("Loss over Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    # Plot the training and testing accuracy curves in the right subplot
    plt.subplot(1, 2, 2)  # accuracy subplot

    ##################### Write your answer here ##################
    
    
    ###############################################################

    plt.title("Accuracy over Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()

    plt.tight_layout()
    plt.show()



In [None]:
plot_loss_and_accuracy(training_loss, training_acc, testing_loss, testing_acc)
# print(training_acc)

### Task 3: Instance inference and visualization
---
The task is to create a function named ``instance_inference`` that visualizes an image along with model predictions and class probabilities. Note that this function assumes that you have a pre-trained model and you are passing the model's output tensor as outputs. The function then extracts predictions and probabilities from this output tensor.
**To do**: 
1. Reverse the normalization process to restore the image to its original scale and range for further plot.
2. Calculate the prediction and the probabilities for each class.
         

In [15]:
def instance_inference(inputs, outputs, class_names, title=None):
    """
    Display image for Tensor with model outputs and class probabilities.
    Args:
        inputs (CxHxW Tensor): Input image tensor.
        outputs (Tensor): Model's output tensor.
        class_names (list): List of class names.
        title (str, optional): Title for the image. Default is None.
    """
    images = inputs.cpu().numpy().transpose((1, 2, 0))

    ##################### Write your answer here ##################
    
    
    
    ###############################################################
    
    images = np.clip(images, 0, 1)
    plt.imshow(images)

    if title is not None:
        plt.title(title)

    ##################### Write your answer here ##################
   
   
    ###############################################################


    predicted_class = class_names[predicted.item()]
    predicted_probability = probabilities[predicted].item()

    plt.text(17, 30, f'Predicted Class: {predicted_class}\nProbability: {predicted_probability:.2f}', 
             color='white', backgroundcolor='black', fontsize=8)
    plt.show()

    # Print probabilities for each class
    print('Print probabilities for each class:')
    for i in range(len(class_names)):
        print(f'{class_names[i]}: {probabilities[i].item():.4f}')




In [None]:
inputs, classes = next(iter(test_dataloader))
inputs = inputs.to(device)
inputs = inputs[3:4]

outputs = model(inputs)

instance_inference(inputs[0].cpu(), outputs, class_names)