# Model Optimization for CIFAR-10 Classification

This notebook presents the main process of selecting and optimizing the final model for CIFAR-10 classification.  

In [1]:
%matplotlib inline
import os
import pickle
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.transforms import RandAugment
from torch.utils.data import DataLoader, random_split
from torch.optim.lr_scheduler import CosineAnnealingLR
from PIL import Image
from torchsummary import summary
from torch.optim import RAdam
from torch.optim.lr_scheduler import ReduceLROnPlateau
import matplotlib.pyplot as plt

## Baseline Model (Version 1)

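The first version is a small ResNet-style network with three residual blocks (128, 256 and 512 channels), trained with AdamW, a cosine-annealed learning rate and label smoothing.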

In [3]:
# Seed the PyTorch and NumPy generators so that runs started from a fresh
# kernel draw the same random numbers (weight init, torch.randperm,
# np.random.beta). NOTE(review): some GPU kernels are non-deterministic, so
# results may still differ slightly between runs on CUDA.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# Automatically select CPU or GPU for training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Define CIFAR-10 dataset paths (relative to the notebook's working directory)
cifar10_dir = './data/cifar-10-python/cifar-10-batches-py'
cifar_test_path = './data/cifar_test_nolabel.pkl'

# Function to load CIFAR-10 batch files
def load_cifar_batch(file):
    """Unpickle a single CIFAR-10 batch file and return the stored object.

    Args:
        file: Path of the pickled batch file to read.

    Returns:
        The unpickled object. The file is decoded with ``encoding='bytes'``,
        so string keys written by Python 2 come back as ``bytes``
        (e.g. ``b'data'``, ``b'labels'``).
    """
    # SECURITY: pickle.load can execute arbitrary code embedded in the file.
    # Only call this on dataset files obtained from a trusted source.
    with open(file, 'rb') as fo:
        batch = pickle.load(fo, encoding='bytes')
    return batch

# Load metadata (class names are stored as byte strings)
meta_data_dict = load_cifar_batch(os.path.join(cifar10_dir, 'batches.meta'))
label_names = [label.decode('utf-8') for label in meta_data_dict[b'label_names']]

# Load the five training batches
train_data = []
train_labels = []
for i in range(1, 6):
    batch = load_cifar_batch(os.path.join(cifar10_dir, f'data_batch_{i}'))
    train_data.append(batch[b'data'])
    train_labels += batch[b'labels']

# Convert data to the correct format (HWC)
train_data = np.vstack(train_data).reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)
train_labels = np.array(train_labels)
assert len(train_data) == len(train_labels), "image/label count mismatch"

# Per-channel normalization statistics shared by every transform below
cifar_mean = (0.4914, 0.4822, 0.4465)
cifar_std = (0.247, 0.243, 0.261)

# Training-time data augmentation and normalization
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1)),  # Apply random affine transformations
    transforms.ToTensor(),
    transforms.Normalize(cifar_mean, cifar_std),
    transforms.RandomErasing(p=0.1, scale=(0.02, 0.08), value=0)  # Reduce random erasing strength
])

# Deterministic preprocessing for validation and test images: no random
# augmentation, so reported accuracy is measured on unmodified inputs.
eval_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(cifar_mean, cifar_std),
])


class ArrayImageDataset(torch.utils.data.Dataset):
    """Map-style dataset over an in-memory image array.

    The transform is applied inside ``__getitem__``, i.e. every time a sample
    is fetched, so random augmentations are re-drawn each epoch instead of
    being frozen once when the dataset is built.

    Args:
        images: Indexable collection of images (here HWC NumPy arrays).
        labels: Optional indexable collection of integer labels. When it is
            ``None`` the dataset yields images only.
        transform: Optional callable applied to each image on access.
    """

    def __init__(self, images, labels=None, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img = self.images[idx]
        if self.transform is not None:
            img = self.transform(img)
        if self.labels is None:
            return img
        return img, int(self.labels[idx])


# Split the raw arrays into training (90%) and validation (10%) BEFORE any
# transform is applied, so each subset can use its own preprocessing.
# A dedicated generator keeps the split identical across runs.
train_size = int(0.9 * len(train_data))
val_size = len(train_data) - train_size
split_generator = torch.Generator().manual_seed(42)
shuffled_idx = torch.randperm(len(train_data), generator=split_generator).numpy()
train_idx, val_idx = shuffled_idx[:train_size], shuffled_idx[train_size:]

train_dataset = ArrayImageDataset(train_data[train_idx], train_labels[train_idx], transform)
val_dataset = ArrayImageDataset(train_data[val_idx], train_labels[val_idx], eval_transform)
assert len(train_dataset) + len(val_dataset) == len(train_data)

# Load test dataset
# NOTE(review): assumes b'data' uses the same flattened channel-first layout
# as the training batches - confirm against the test file.
test_batch = load_cifar_batch(cifar_test_path)
test_images = test_batch[b'data'].astype(np.float32) / 255.0
test_images = test_images.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)

# The test images are already floats in [0, 1]; ToTensor only rescales uint8
# input, so they receive the same value range as the training images.
test_dataset = ArrayImageDataset(test_images, None, eval_transform)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=4)

# Create DataLoaders for training and validation
# NOTE(review): on platforms that spawn (rather than fork) worker processes,
# a Dataset class defined inside a notebook may not be importable by the
# workers; use num_workers=0 there.
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False, num_workers=4)

# Define the training function
def train_model(model, train_loader, val_loader, epochs=50):
    """Train ``model`` and report validation accuracy after every epoch.

    Uses cross-entropy with label smoothing (0.1), AdamW (lr=0.003,
    weight_decay=5e-5) and a cosine-annealed learning rate over ``epochs``.

    Args:
        model: Network to optimise in place. Batches are moved to the device
            of its parameters.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        epochs: Number of passes over ``train_loader``.

    Returns:
        list[dict]: One record per epoch with keys ``'epoch'``, ``'loss'``
        (mean training loss per batch) and ``'val_accuracy'`` (percent).
        A metric is ``nan`` when the corresponding loader yields no data.
    """
    # CrossEntropyLoss with label smoothing to improve generalization
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    # Use AdamW optimizer with weight decay
    optimizer = optim.AdamW(model.parameters(), lr=0.003, weight_decay=5e-5)
    # Cosine annealing learning rate scheduler
    scheduler = CosineAnnealingLR(optimizer, T_max=epochs)
    # Follow the model's own device rather than a notebook-level global, so
    # the function also works for a model that lives on another device.
    model_device = next(model.parameters()).device

    history = []
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0

        # Training loop
        for images, labels in train_loader:
            images, labels = images.to(model_device), labels.to(model_device)
            optimizer.zero_grad()
            outputs = model(images)  # Forward pass
            loss = criterion(outputs, labels)  # Compute loss
            loss.backward()  # Backpropagation
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

        # Validation phase
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(model_device), labels.to(model_device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Adjust learning rate
        scheduler.step()

        # Guard the averages so an empty loader reports nan instead of
        # raising ZeroDivisionError after a full epoch of work.
        avg_loss = running_loss / num_batches if num_batches else float('nan')
        val_accuracy = 100 * correct / total if total else float('nan')
        print(f'Epoch {epoch+1}, Loss: {avg_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')
        history.append({'epoch': epoch + 1, 'loss': avg_loss, 'val_accuracy': val_accuracy})

    return history


# Define Residual Block module
class ResidualBlock(nn.Module):
    """Residual unit: two 3x3 conv + BatchNorm layers, a shortcut, ReLU and dropout.

    When ``stride != 1`` or the channel count changes, the shortcut is a
    1x1 convolution followed by BatchNorm; otherwise the input is added
    unchanged.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        needs_projection = stride != 1 or in_channels != out_channels

        # Main branch
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut branch: an empty Sequential stands for the identity mapping
        if needs_projection:
            self.skip = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.skip = nn.Sequential()

        # Dropout applied to the block output
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        # Shortcut is evaluated first, then the main branch
        shortcut = self.skip(x) if len(self.skip) > 0 else x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += shortcut  # Add residual connection
        return self.dropout(self.relu(out))

# Define Custom ResNet model
class CustomResNet(nn.Module):
    """Baseline classifier: conv stem, three residual blocks, pooled linear head.

    Args:
        num_classes: Size of the output layer (number of logits).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        stem, w1, w2, w3 = 64, 128, 256, 512

        # Stem: 3 input channels -> 64 feature maps, resolution unchanged
        self.init_conv = nn.Conv2d(3, stem, kernel_size=3, stride=1, padding=1, bias=False)
        self.init_bn = nn.BatchNorm2d(stem)
        self.relu = nn.ReLU(inplace=True)

        # One residual block per stage; the stride-2 blocks downsample
        self.layer1 = ResidualBlock(stem, w1, stride=1)
        self.layer2 = ResidualBlock(w1, w2, stride=2)
        self.layer3 = ResidualBlock(w2, w3, stride=2)

        # Head: global average pooling followed by the classifier
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(w3, num_classes)

    def forward(self, x):
        features = self.relu(self.init_bn(self.init_conv(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            features = stage(features)
        # Pool to 1x1 and flatten to (batch, channels) for the linear layer
        pooled = torch.flatten(self.avg_pool(features), 1)
        return self.fc(pooled)


# Function to evaluate model performance
def evaluate_model(model, dataloader, dataset_name="Dataset"):
    """Compute and print the top-1 accuracy of ``model`` on ``dataloader``.

    Args:
        model: Network to evaluate; it is switched to eval mode and left there.
        dataloader: Iterable of ``(images, labels)`` batches.
        dataset_name: Label used in the printed summary line.

    Returns:
        float: Accuracy in percent.

    Raises:
        ValueError: If ``dataloader`` yields no samples (accuracy undefined).
    """
    model.eval()
    # Use the device the model actually lives on instead of a notebook-level
    # global; a parameter-free model leaves batches where they are.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:
            if model_device is not None:
                images, labels = images.to(model_device), labels.to(model_device)
            outputs = model(images)  # Forward pass
            _, predicted = torch.max(outputs, 1)  # Get class with highest score
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError(f"Cannot compute accuracy for {dataset_name}: dataloader is empty")

    # Compute final accuracy
    accuracy = 100 * correct / total
    print(f'Final {dataset_name} Accuracy: {accuracy:.2f}%')
    return accuracy

# Build the baseline network and place it on the selected device
model = CustomResNet()
model = model.to(device)
# Print the layer-by-layer parameter summary
summary(model, (3, 32, 32))
# Run 50 training epochs, then report the final validation accuracy
train_model(model, train_loader, val_loader, 50)
final_val_accuracy = evaluate_model(model, val_loader, dataset_name="Validation Set")

Using device: cuda
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 32, 32]           1,728
       BatchNorm2d-2           [-1, 64, 32, 32]             128
              ReLU-3           [-1, 64, 32, 32]               0
            Conv2d-4          [-1, 128, 32, 32]           8,192
       BatchNorm2d-5          [-1, 128, 32, 32]             256
            Conv2d-6          [-1, 128, 32, 32]          73,728
       BatchNorm2d-7          [-1, 128, 32, 32]             256
              ReLU-8          [-1, 128, 32, 32]               0
            Conv2d-9          [-1, 128, 32, 32]         147,456
      BatchNorm2d-10          [-1, 128, 32, 32]             256
             ReLU-11          [-1, 128, 32, 32]               0
          Dropout-12          [-1, 128, 32, 32]               0
    ResidualBlock-13          [-1, 128, 32, 32]               0
           Conv2d-14

## Model with MixUp (Version 2)

In this version, MixUp augmentation is introduced to prevent overfitting and improve generalization. 

In [4]:
#######
# Data loading and preprocessing are identical to the baseline model and are reused from the cell above
#######

# Define MixUp data augmentation function
def mixup_data(x, y, alpha=0.2):
    """Blend each sample of a batch with a randomly chosen partner (MixUp).

    Args:
        x: Batch tensor whose first dimension indexes samples.
        y: Label tensor aligned with ``x`` along the first dimension.
        alpha: Parameter of the Beta(alpha, alpha) distribution the mixing
            weight is drawn from. ``alpha <= 0`` disables mixing (weight 1).

    Returns:
        tuple: ``(mixed_x, y_a, y_b, lam)`` where
        ``mixed_x = lam * x + (1 - lam) * x[perm]``, ``y_a`` is ``y``,
        ``y_b`` is ``y[perm]`` and ``lam`` is the mixing weight as a float.
    """
    # float() keeps the weight a plain Python number rather than a NumPy scalar
    lam = float(np.random.beta(alpha, alpha)) if alpha > 0 else 1
    batch_size = x.size(0)
    # Put the permutation on the batch's own device; a notebook-level
    # `device` global may differ from where `x` actually lives.
    index = torch.randperm(batch_size).to(x.device)
    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index.to(y.device)]
    return mixed_x, y_a, y_b, lam
    
# Define the training function
def train_model(model, train_loader, val_loader, epochs=50):
    """Train ``model`` with MixUp and report validation accuracy every epoch.

    Each training batch is mixed by ``mixup_data`` (alpha=0.2) and the loss is
    ``lam * CE(outputs, y_a) + (1 - lam) * CE(outputs, y_b)``. Optimisation
    uses cross-entropy with label smoothing (0.1), AdamW (lr=0.003,
    weight_decay=5e-5) and a cosine-annealed learning rate over ``epochs``.

    Args:
        model: Network to optimise in place. Batches are moved to the device
            of its parameters.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        epochs: Number of passes over ``train_loader``.

    Returns:
        list[dict]: One record per epoch with keys ``'epoch'``, ``'loss'``
        (mean training loss per batch) and ``'val_accuracy'`` (percent).
        A metric is ``nan`` when the corresponding loader yields no data.
    """
    # CrossEntropyLoss with label smoothing to improve generalization
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    # Use AdamW optimizer with weight decay
    optimizer = optim.AdamW(model.parameters(), lr=0.003, weight_decay=5e-5)
    # Cosine annealing learning rate scheduler
    scheduler = CosineAnnealingLR(optimizer, T_max=epochs)
    # Follow the model's own device rather than a notebook-level global
    model_device = next(model.parameters()).device

    history = []
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0

        # Training loop
        for images, labels in train_loader:
            images, labels = images.to(model_device), labels.to(model_device)
            optimizer.zero_grad()

            # Apply MixUp data augmentation
            images, targets_a, targets_b, lam = mixup_data(images, labels, alpha=0.2)
            outputs = model(images)  # Forward pass
            loss = lam * criterion(outputs, targets_a) + (1 - lam) * criterion(outputs, targets_b)

            loss.backward()  # Backpropagation
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

        # Validation phase (no MixUp: original images and labels)
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(model_device), labels.to(model_device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Adjust learning rate
        scheduler.step()

        # Guard the averages so an empty loader reports nan instead of
        # raising ZeroDivisionError after a full epoch of work.
        avg_loss = running_loss / num_batches if num_batches else float('nan')
        val_accuracy = 100 * correct / total if total else float('nan')
        print(f'Epoch {epoch+1}, Loss: {avg_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')
        history.append({'epoch': epoch + 1, 'loss': avg_loss, 'val_accuracy': val_accuracy})

    return history


# Define Residual Block module
class ResidualBlock(nn.Module):
    """Residual unit: two 3x3 conv + BatchNorm layers, a shortcut, ReLU and dropout.

    The shortcut is a 1x1 convolution plus BatchNorm whenever the stride or
    the channel count changes; otherwise the input is added unchanged.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        needs_projection = stride != 1 or in_channels != out_channels

        # Main branch
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut branch: an empty Sequential stands for the identity mapping
        if needs_projection:
            self.skip = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.skip = nn.Sequential()

        # Dropout applied to the block output
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        # Shortcut is evaluated first, then the main branch
        shortcut = self.skip(x) if len(self.skip) > 0 else x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += shortcut  # Add residual connection
        return self.dropout(self.relu(out))


# Define Custom ResNet model
class CustomResNet(nn.Module):
    """Classifier with a conv stem, three residual blocks and a pooled linear head.

    Args:
        num_classes: Size of the output layer (number of logits).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        stem, w1, w2, w3 = 64, 128, 256, 512

        # Stem: 3 input channels -> 64 feature maps, resolution unchanged
        self.init_conv = nn.Conv2d(3, stem, kernel_size=3, stride=1, padding=1, bias=False)
        self.init_bn = nn.BatchNorm2d(stem)
        self.relu = nn.ReLU(inplace=True)

        # One residual block per stage; the stride-2 blocks downsample
        self.layer1 = ResidualBlock(stem, w1, stride=1)
        self.layer2 = ResidualBlock(w1, w2, stride=2)
        self.layer3 = ResidualBlock(w2, w3, stride=2)

        # Head: global average pooling followed by the classifier
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(w3, num_classes)

    def forward(self, x):
        features = self.relu(self.init_bn(self.init_conv(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            features = stage(features)
        # Pool to 1x1 and flatten to (batch, channels) for the linear layer
        pooled = torch.flatten(self.avg_pool(features), 1)
        return self.fc(pooled)

        
# Function to evaluate model performance
def evaluate_model(model, dataloader, dataset_name="Dataset"):
    """Compute and print the top-1 accuracy of ``model`` on ``dataloader``.

    Args:
        model: Network to evaluate; it is switched to eval mode and left there.
        dataloader: Iterable of ``(images, labels)`` batches.
        dataset_name: Label used in the printed summary line.

    Returns:
        float: Accuracy in percent.

    Raises:
        ValueError: If ``dataloader`` yields no samples (accuracy undefined).
    """
    model.eval()
    # Use the device the model actually lives on instead of a notebook-level
    # global; a parameter-free model leaves batches where they are.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:
            if model_device is not None:
                images, labels = images.to(model_device), labels.to(model_device)
            outputs = model(images)  # Forward pass
            _, predicted = torch.max(outputs, 1)  # Get class with highest score
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError(f"Cannot compute accuracy for {dataset_name}: dataloader is empty")

    # Compute final accuracy
    accuracy = 100 * correct / total
    print(f'Final {dataset_name} Accuracy: {accuracy:.2f}%')
    return accuracy

# Build a fresh network for the MixUp experiment on the selected device
model = CustomResNet()
model = model.to(device)
# Print the layer-by-layer parameter summary
summary(model, (3, 32, 32))
# Run 50 training epochs, then report the final validation accuracy
train_model(model, train_loader, val_loader, 50)
final_val_accuracy = evaluate_model(model, val_loader, dataset_name="Validation Set")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 32, 32]           1,728
       BatchNorm2d-2           [-1, 64, 32, 32]             128
              ReLU-3           [-1, 64, 32, 32]               0
            Conv2d-4          [-1, 128, 32, 32]           8,192
       BatchNorm2d-5          [-1, 128, 32, 32]             256
            Conv2d-6          [-1, 128, 32, 32]          73,728
       BatchNorm2d-7          [-1, 128, 32, 32]             256
              ReLU-8          [-1, 128, 32, 32]               0
            Conv2d-9          [-1, 128, 32, 32]         147,456
      BatchNorm2d-10          [-1, 128, 32, 32]             256
             ReLU-11          [-1, 128, 32, 32]               0
          Dropout-12          [-1, 128, 32, 32]               0
    ResidualBlock-13          [-1, 128, 32, 32]               0
           Conv2d-14          [-1, 256,

The accuracy of the second version (Model with MixUp) is slightly improved over the first version (Baseline Model), so the next step is to further optimize the model.

## Refined Model with Adjusted Residual Blocks (Version 3)

This version increases model depth by using two residual blocks per stage, and reduces the parameter count by narrowing the channel widths (stem 64 → 32, final stage 512 → 256).

In [5]:
#######
# Data loading, MixUp and the training loop are identical to the Model with MixUp version and are reused from the cells above
#######


# Define Residual Block module
class ResidualBlock(nn.Module):
    """Residual unit: two 3x3 conv + BatchNorm layers, a shortcut, ReLU and dropout.

    The shortcut is a 1x1 convolution plus BatchNorm whenever the stride or
    the channel count changes; otherwise the input is added unchanged.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        needs_projection = stride != 1 or in_channels != out_channels

        # Main branch: first conv may downsample, second keeps the resolution
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut branch: an empty Sequential stands for the identity mapping
        if needs_projection:
            self.skip = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.skip = nn.Sequential()

        # Dropout applied to the block output
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        # Shortcut is evaluated first, then the main branch
        shortcut = self.skip(x) if len(self.skip) > 0 else x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        # Add the skip connection (residual connection)
        out += shortcut
        return self.dropout(self.relu(out))

class CustomResNet(nn.Module):
    """Deeper, narrower classifier: conv stem, three two-block stages, pooled linear head.

    Args:
        num_classes: Size of the output layer (number of logits).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        stem, w1, w2, w3 = 32, 64, 128, 256

        # Initial convolutional layer: 3 input channels -> 32 feature maps
        self.init_conv = nn.Conv2d(3, stem, kernel_size=3, stride=1, padding=1, bias=False)
        self.init_bn = nn.BatchNorm2d(stem)
        self.relu = nn.ReLU(inplace=True)

        # Two residual blocks per stage; the first block of a stage changes
        # the width (and, with stride 2, the resolution)
        self.layer1 = nn.Sequential(
            ResidualBlock(stem, w1, stride=1),
            ResidualBlock(w1, w1, stride=1),
        )
        self.layer2 = nn.Sequential(
            ResidualBlock(w1, w2, stride=2),
            ResidualBlock(w2, w2, stride=1),
        )
        self.layer3 = nn.Sequential(
            ResidualBlock(w2, w3, stride=2),
            ResidualBlock(w3, w3, stride=1),
        )

        # Head: global average pooling and the final classification layer
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(w3, num_classes)

    def forward(self, x):
        features = self.relu(self.init_bn(self.init_conv(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            features = stage(features)
        # Pool to 1x1 and flatten to (batch, channels) for the linear layer
        pooled = torch.flatten(self.avg_pool(features), 1)
        return self.fc(pooled)

# Function to evaluate model performance
def evaluate_model(model, dataloader, dataset_name="Dataset"):
    """Compute and print the top-1 accuracy of ``model`` on ``dataloader``.

    Args:
        model: Network to evaluate; it is switched to eval mode and left there.
        dataloader: Iterable of ``(images, labels)`` batches.
        dataset_name: Label used in the printed summary line.

    Returns:
        float: Accuracy in percent.

    Raises:
        ValueError: If ``dataloader`` yields no samples (accuracy undefined).
    """
    model.eval()
    # Use the device the model actually lives on instead of a notebook-level
    # global; a parameter-free model leaves batches where they are.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:
            if model_device is not None:
                images, labels = images.to(model_device), labels.to(model_device)
            outputs = model(images)  # Forward pass
            _, predicted = torch.max(outputs, 1)  # Get class with highest score
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError(f"Cannot compute accuracy for {dataset_name}: dataloader is empty")

    # Compute final accuracy
    accuracy = 100 * correct / total
    print(f'Final {dataset_name} Accuracy: {accuracy:.2f}%')
    return accuracy

# Build the refined (deeper, narrower) network on the selected device
model = CustomResNet()
model = model.to(device)
# Print the layer-by-layer parameter summary
summary(model, (3, 32, 32))
# Run 50 training epochs, then report the final validation accuracy
train_model(model, train_loader, val_loader, 50)
final_val_accuracy = evaluate_model(model, val_loader, dataset_name="Validation Set")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 32, 32]             864
       BatchNorm2d-2           [-1, 32, 32, 32]              64
              ReLU-3           [-1, 32, 32, 32]               0
            Conv2d-4           [-1, 64, 32, 32]           2,048
       BatchNorm2d-5           [-1, 64, 32, 32]             128
            Conv2d-6           [-1, 64, 32, 32]          18,432
       BatchNorm2d-7           [-1, 64, 32, 32]             128
              ReLU-8           [-1, 64, 32, 32]               0
            Conv2d-9           [-1, 64, 32, 32]          36,864
      BatchNorm2d-10           [-1, 64, 32, 32]             128
             ReLU-11           [-1, 64, 32, 32]               0
          Dropout-12           [-1, 64, 32, 32]               0
    ResidualBlock-13           [-1, 64, 32, 32]               0
           Conv2d-14           [-1, 64,

After improving the model structure, the validation accuracy increased, demonstrating the effectiveness of architectural refinement.

## Optimized Model (Final Version)

In the final version, we applied the following optimizations on top of the Version 3 model:
1. Data Augmentation Strategy Updated: Applied augmentation only to training data, keeping test data unchanged to improve generalization.
2. Optimizer Change: Switched from AdamW to RAdam, with learning rate reduced from 0.003 → 0.001 and weight decay increased from 5e-5 → 1e-4 for better convergence.
3. Learning Rate Scheduler Adjustment: Replaced CosineAnnealingLR with ReduceLROnPlateau to dynamically adjust learning rate based on validation performance.
4. Enhanced Data Augmentation: Switched from manually defined augmentations (RandomCrop, ColorJitter, RandomAffine) to RandAugment, allowing automated augmentation selection.
5. Increased Training Epochs: Raised from 50 → 150 for better learning stability and improved accuracy.

The final model is implemented in `main.ipynb`, and its validation accuracy exceeded **90%**.