## Import libraries

In [None]:
!pip install netcal
!pip install torchsummary

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import time
import os
import numpy as np
from torchsummary import summary
from torch.nn.functional import softmax
import torch.nn.functional as F
import netcal.metrics as metrics
from netcal.metrics import ECE
from sklearn.metrics import precision_recall_curve, auc, roc_auc_score, roc_curve
from torch.utils.data import random_split
import multiprocessing
multiprocessing.set_start_method('spawn', force=True)
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader

In [3]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(device)

cuda


## Prepare the data and the model

### Load and Augment CIFAR-10 and CIFAR-100

As in the paper, the only data augmentation I use is a random horizontal flip.

In [4]:
# Training transform: random horizontal flip (the only augmentation), then
# tensor conversion and per-channel normalization.
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

# Evaluation transform: same normalization, no random augmentation, so
# validation and test metrics are deterministic.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

# Two views of the same 50k training images. random_split() would hand both
# subsets the *same* dataset object, so the validation images would also be
# randomly flipped; a second view with the evaluation transform avoids that.
full_trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                             download=True, transform=transform_train)
full_validset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                             download=False, transform=transform_test)

# Split the indices 80% / 20% with a seeded generator so the split is
# reproducible across kernel restarts.
train_size = int(0.8 * len(full_trainset))  # 80% for training
valid_size = len(full_trainset) - train_size  # 20% for validation
split_generator = torch.Generator().manual_seed(42)
shuffled_indices = torch.randperm(len(full_trainset), generator=split_generator).tolist()
train_subset = torch.utils.data.Subset(full_trainset, shuffled_indices[:train_size])
valid_subset = torch.utils.data.Subset(full_validset, shuffled_indices[train_size:])

# Create DataLoaders (only the training loader is shuffled)
trainloader = torch.utils.data.DataLoader(train_subset, batch_size=128,
                                          shuffle=True, num_workers=2)
validloader = torch.utils.data.DataLoader(valid_subset, batch_size=128,
                                          shuffle=False, num_workers=2)

# Load the test set
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=128,
                                         shuffle=False, num_workers=2)

# Verify the sizes of the datasets
print(f'Training set size: {len(train_subset)}')
print(f'Validation set size: {len(valid_subset)}')
print(f'Test set size: {len(testset)}')

Files already downloaded and verified
Files already downloaded and verified
Training set size: 40000
Validation set size: 10000
Test set size: 10000


In [5]:
# Training transform: random horizontal flip (the only augmentation), then
# tensor conversion and normalization to roughly [-1, 1] per channel.
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Evaluation transform: identical normalization but no random flip, so the
# validation and test images are not randomly augmented.
transform_eval100 = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Two views of the same CIFAR-100 training images: augmented for training,
# deterministic for validation. random_split() would share one dataset object
# (and therefore the augmenting transform) between both subsets.
full_trainset100 = torchvision.datasets.CIFAR100(root='./data', train=True,
                                                 download=True, transform=transform)
full_validset100 = torchvision.datasets.CIFAR100(root='./data', train=True,
                                                 download=False, transform=transform_eval100)

# Split the indices 80% / 20% with a seeded generator so the split is
# reproducible across kernel restarts.
train_size100 = int(0.8 * len(full_trainset100))  # 80% for training
valid_size100 = len(full_trainset100) - train_size100  # 20% for validation
split_generator100 = torch.Generator().manual_seed(42)
shuffled_indices100 = torch.randperm(len(full_trainset100), generator=split_generator100).tolist()
train_subset100 = torch.utils.data.Subset(full_trainset100, shuffled_indices100[:train_size100])
valid_subset100 = torch.utils.data.Subset(full_validset100, shuffled_indices100[train_size100:])

# Create DataLoaders (only the training loader is shuffled)
trainloader100 = torch.utils.data.DataLoader(train_subset100, batch_size=128,
                                             shuffle=True, num_workers=2)
validloader100 = torch.utils.data.DataLoader(valid_subset100, batch_size=128,
                                             shuffle=False, num_workers=2)

# Load the test set with the deterministic evaluation transform
testset100 = torchvision.datasets.CIFAR100(root='./data', train=False,
                                           download=True, transform=transform_eval100)
testloader100 = torch.utils.data.DataLoader(testset100, batch_size=128,
                                            shuffle=False, num_workers=2)

# Verify the sizes of the datasets
print(f'Training set size: {len(train_subset100)}')
print(f'Validation set size: {len(valid_subset100)}')
print(f'Test set size: {len(testset100)}')

Files already downloaded and verified
Files already downloaded and verified
Training set size: 40000
Validation set size: 10000
Test set size: 10000


### Define the ResNet50 model

In [6]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions, each followed by batch norm.

    The block output is ``relu(main_branch(x) + shortcut(x))``. The shortcut is
    an empty ``nn.Sequential`` (identity) unless the stride or the channel count
    changes, in which case it is a strided 1x1 convolution plus batch norm.
    """

    # Output channels = expansion * planes.
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        """
        Parameters:
            in_planes (int): Number of input channels.
            planes (int): Number of channels produced by both 3x3 convolutions.
            stride (int): Stride of the first convolution (and of the shortcut projection).
        """
        super().__init__()
        out_planes = self.expansion * planes

        # Main branch: only the first convolution carries the stride.
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # Shortcut branch: project only when the shapes would not line up.
        shapes_match = stride == 1 and in_planes == out_planes
        if shapes_match:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Run the two-convolution branch, add the shortcut, and apply ReLU."""
        hidden = self.bn1(self.conv1(x))
        hidden = F.relu(hidden)
        main = self.bn2(self.conv2(hidden))
        skip = self.shortcut(x)
        return F.relu(main + skip)


class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 -> 3x3 -> 1x1 convolutions with batch norm.

    The last 1x1 convolution widens the features to ``expansion * planes``
    channels. The block output is ``relu(main_branch(x) + shortcut(x))``; the
    shortcut is an empty ``nn.Sequential`` (identity) unless the stride or the
    channel count changes, in which case it is a strided 1x1 convolution plus
    batch norm.
    """

    # Output channels = expansion * planes.
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        """
        Parameters:
            in_planes (int): Number of input channels.
            planes (int): Channel width of the first two convolutions.
            stride (int): Stride of the 3x3 convolution (and of the shortcut projection).
        """
        super().__init__()
        out_planes = self.expansion * planes

        # Main branch: the stride is applied by the middle 3x3 convolution.
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Shortcut branch: project only when the shapes would not line up.
        shapes_match = stride == 1 and in_planes == out_planes
        if shapes_match:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Run the three-convolution branch, add the shortcut, and apply ReLU."""
        reduced = F.relu(self.bn1(self.conv1(x)))
        spatial = F.relu(self.bn2(self.conv2(reduced)))
        main = self.bn3(self.conv3(spatial))
        skip = self.shortcut(x)
        return F.relu(main + skip)

class ResNet(nn.Module):
    """ResNet backbone with a 3x3 stride-1 stem and four residual stages.

    The stem keeps the input resolution; stages 2-4 each halve it. The final
    feature map is globally average-pooled and fed to a linear classifier, so
    the network accepts any input resolution (for 32x32 inputs the result is
    the same as pooling the final 4x4 map with a 4x4 window).
    """

    def __init__(self, block, num_blocks, num_classes=10):
        """
        Parameters:
            block: Residual block class. It must expose an ``expansion`` class
                attribute and accept ``(in_planes, planes, stride)``.
            num_blocks (sequence of int): Number of blocks in each of the four stages.
            num_classes (int): Size of the classifier output.
        """
        super(ResNet, self).__init__()
        # Running count of channels entering the next block; updated by _make_layer.
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage: the first block uses ``stride``, the rest use stride 1."""
        block_strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for block_stride in block_strides:
            layers.append(block(self.in_planes, planes, block_stride))
            # The next block consumes this block's (expanded) output channels.
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Global average pooling: unlike a fixed 4x4 window, this always yields
        # one value per channel, so the classifier input size never depends on
        # the image resolution.
        out = F.adaptive_avg_pool2d(out, 1)
        out = torch.flatten(out, 1)
        out = self.linear(out)
        return out

In [None]:
# Bottleneck blocks stacked 3-4-6-3 over the four stages, with a 10-way
# classifier; the model is moved to the selected device before summarizing.
resnet50 = ResNet(Bottleneck, [3, 4, 6, 3], num_classes=10).to(device)
summary(resnet50, (3, 32, 32))

### Define organized train and test functions for the model

In [8]:
def train_model(model: nn.Module, train_loader: DataLoader, val_loader: DataLoader,
                epochs: int = 10, learning_rate: float = 0.005, gamma_lr: float = 0.1,
                milestones: list = None, save_path: str = 'model.pth', Weight_decay: float = 5e-4) -> tuple:
    """
    Trains the model with SGD and evaluates it on the validation set after each epoch.

    After the last epoch the model's state dict is written to ``save_path`` and
    the per-epoch training/validation losses are plotted.

    Parameters:
        model (nn.Module): The neural network model to train. Batches are moved to
            the device that holds the model's parameters.
        train_loader (DataLoader): DataLoader for the training dataset.
        val_loader (DataLoader): DataLoader for the validation dataset.
        epochs (int): The number of epochs to train the model.
        learning_rate (float): The initial learning rate for the optimizer.
        gamma_lr (float): Factor by which the learning rate is multiplied at each milestone.
        milestones (list): Epoch indices at which to adjust the learning rate.
            Defaults to ``[5, 15]`` when omitted.
        save_path (str): Path to save the trained model state.
        Weight_decay (float): Weight decay (L2 penalty) passed to the SGD optimizer.

    Returns:
        tuple: ``(train_losses, val_losses)``, two lists holding the average
        per-batch training and validation loss of every epoch.
    """
    # Avoid a shared mutable default argument.
    if milestones is None:
        milestones = [5, 15]

    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=Weight_decay)
    criterion = nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=milestones, gamma=gamma_lr)

    # Follow the model's own device instead of relying on a notebook-level global,
    # so inputs always end up where the weights are.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        model.train()  # Set model to training mode
        train_loss = 0.0

        # Training loop
        for data, target in train_loader:
            data, target = data.to(model_device), target.to(model_device)
            optimizer.zero_grad()  # Zero the parameter gradients
            output = model(data)  # Forward pass
            loss = criterion(output, target)  # Loss calculation
            loss.backward()  # Backward pass (backpropagation)
            optimizer.step()  # Optimize model parameters
            train_loss += loss.item()

        # Store average training loss (mean of the per-batch losses)
        train_losses.append(train_loss / len(train_loader))

        # Validation loop
        val_loss = 0.0
        model.eval()  # Set model to evaluation mode
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(model_device), target.to(model_device)
                output = model(data)
                val_loss += criterion(output, target).item()

        # Store average validation loss (mean of the per-batch losses)
        val_losses.append(val_loss / len(val_loader))

        # Print epoch summary
        print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}')

        scheduler.step()  # Adjust learning rate once per epoch

    # Save the trained model state
    torch.save(model.state_dict(), save_path)

    # Plot training and validation losses
    plt.figure(figsize=(10, 4))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    return train_losses, val_losses

In [9]:
def test_model(model: nn.Module, test_loader: DataLoader, load_path: str = 'vit_mnist.pth') -> None:
    """
    Evaluates the model on the test dataset.

    Parameters:
        model (nn.Module): The neural network model to be evaluated.
        test_loader (DataLoader): DataLoader for the test dataset.
        load_path (str): Path to the file from which the model state is loaded.
    """
    # Load the saved model state
    model.load_state_dict(torch.load(load_path))
    model.eval()  # Set the model to evaluation mode

    test_loss = 0
    correct = 0
    all_preds = []
    all_targets = []
    criterion = nn.CrossEntropyLoss()

    with torch.no_grad():  # No gradient calculation for inference
        for data, target in test_loader:
            # Move data and target to the same device as the model
            data, target = data.to(device), target.to(device)

            # Forward pass and loss calculation
            output = model(data)
            test_loss += criterion(output, target).item()

            # Prediction and accuracy calculation
            pred = output.argmax(dim=1, keepdim=True).squeeze()
            correct += pred.eq(target).sum().item()

            # Storing all predictions and targets for F1 score calculation
            all_preds.extend(pred.tolist())
            all_targets.extend(target.tolist())

    # Calculate average test loss and accuracy
    test_loss /= len(test_loader)
    accuracy = correct / len(test_loader.dataset)

    # Calculate F1 score
    f1 = f1_score(all_targets, all_preds, average='weighted')

    # Print results
    print(f'\n\n Test set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy}%), F1 Score: {f1:.4f}')


## CIFAR10

In [None]:
# Train on CIFAR-10: 200 epochs starting at lr 0.1, multiplied by 0.2 at
# epochs 60, 120 and 160; the final weights go to ./resnet50_cifar10.pth.
train_losses, val_losses = train_model(
    resnet50,
    trainloader,
    validloader,
    epochs=200,
    learning_rate=0.1,
    gamma_lr=0.2,
    milestones=[60, 120, 160],
    save_path='./resnet50_cifar10.pth',
    Weight_decay=5e-4,
)

In [None]:
# Reload the CIFAR-10 checkpoint and report loss, accuracy and F1 on the test set.
test_model(model=resnet50, test_loader=testloader, load_path="./resnet50_cifar10.pth")

## CIFAR100

In [None]:
# CIFAR-100 has 100 classes, so the 10-class network trained on CIFAR-10 cannot
# be reused: its classifier only has 10 outputs and labels >= 10 are out of
# range for the cross-entropy loss. Build a fresh ResNet-50 with a 100-way
# classifier. The name `resnet50` is rebound on purpose so that the evaluation
# cell that follows tests this CIFAR-100 model.
resnet50 = ResNet(Bottleneck, [3, 4, 6, 3], num_classes=100).to(device)

# Same schedule as for CIFAR-10: 200 epochs starting at lr 0.1, multiplied by
# 0.2 at epochs 60, 120 and 160.
train_losses, val_losses = train_model(resnet50, trainloader100, validloader100,
                                       epochs=200, learning_rate=0.1, gamma_lr=0.2,
                                       milestones=[60, 120,160], save_path='./resnet50_cifar100.pth', Weight_decay=5e-4)

In [None]:
# Reload the CIFAR-100 checkpoint and report loss, accuracy and F1 on the test set.
test_model(model=resnet50, test_loader=testloader100, load_path="./resnet50_cifar100.pth")