In [21]:
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import random
import copy
from torch.utils.data import random_split, DataLoader

# ---------- Reproducibility ---------- #
def set_seed(seed=42):
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

# ---------- Show sample images ---------- #
def imshow(img):
    img = img / 2 + 0.5
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.axis('off')
    plt.show()

In [22]:
# ---------- EarlyStopping Helper ---------- #
class EarlyStopping:
    def __init__(self, patience=10, delta=0):
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.delta = delta

    def __call__(self, val_acc):
        score = val_acc
        if self.best_score is None:
            self.best_score = score
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.counter = 0


In [23]:
def mixup_data(x, y, alpha=0.2):
    '''Returns mixed inputs, pairs of targets, and lambda'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    index = torch.randperm(batch_size).to(x.device)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

In [24]:
# ---------- Models ---------- #
class SimpleCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 8 * 8)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

class ImprovedCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.4)
        self.fc1 = nn.Linear(128 * 4 * 4, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = x.view(-1, 128 * 4 * 4)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# ---------- Residual Block ---------- #
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, downsample=False):
        super().__init__()
        stride = 2 if downsample else 1

        self.block = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels)
        )

        self.relu = nn.ReLU(inplace=True)

        self.downsample = nn.Sequential()
        if downsample or in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        identity = self.downsample(x)
        out = self.block(x)
        out += identity
        return self.relu(out)

# ---------- ResNet Style CNN ---------- #
class ResNetStyleCNN(nn.Module):
    def __init__(self):
        super().__init__()

        self.initial = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )

        self.layer1 = nn.Sequential(
            ResidualBlock(64, 64),
            ResidualBlock(64, 64)
        )

        self.layer2 = nn.Sequential(
            ResidualBlock(64, 128, downsample=True),
            ResidualBlock(128, 128)
        )

        self.layer3 = nn.Sequential(
            ResidualBlock(128, 256, downsample=True),
            ResidualBlock(256, 256)
        )

        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, 10)

    def forward(self, x):
        x = self.initial(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.global_pool(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

In [25]:
# ---------- Prepare CIFAR-10 ---------- #
def prepare_data(batch_size=64):
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness = 0.2, contrast = 0.2, saturation = 0.2),
        transforms.ToTensor(),
        transforms.RandomErasing(p=0.25, scale=(0.02, 0.1), ratio=(0.3, 3.3)),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    full_trainset = torchvision.datasets.CIFAR10(root='.', train=True, download=True, transform=transform)
    testset = torchvision.datasets.CIFAR10(root='.', train=False, download=True, transform=transform)
    train_size = int(0.8 * len(full_trainset))
    val_size = len(full_trainset) - train_size
    trainset, valset = random_split(full_trainset, [train_size, val_size])
    kwargs = {'num_workers': 2, 'pin_memory': torch.cuda.is_available()}
    trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True, **kwargs)
    valloader = DataLoader(valset, batch_size=batch_size, shuffle=False, **kwargs)
    testloader = DataLoader(testset, batch_size=batch_size, shuffle=False, **kwargs)
    return trainloader, valloader, testloader, full_trainset.classes

# ---------- Train, Validate, Test ---------- #
def train_evaluate(model, trainloader, valloader, testloader, device, name="Model", epochs=5, use_mixup = True):
    model = model.to(device)
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max = epochs)
    early_stopper = EarlyStopping(patience = 10, delta = 0.001)
    best_val_acc = 0.0

    train_losses, val_losses, train_accs, val_accs = [], [], [], []

    for epoch in range(epochs):
        model.train()
        running_loss, correct, total = 0.0, 0, 0
        for x, y in trainloader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()

            if use_mixup:
                inputs, targets_a, targets_b, lam = mixup_data(x, y, alpha=0.2)
                out = model(inputs)
                loss = mixup_criterion(criterion, out, targets_a, targets_b, lam)
            else:
                out = model(x)
                loss = criterion(out, y)

            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, pred = torch.max(out, 1)
            correct += (pred == y).sum().item()
            total += y.size(0)

        train_losses.append(running_loss / len(trainloader))
        train_accs.append(100 * correct / total)

        # Validation
        model.eval()
        val_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for x, y in valloader:
                x, y = x.to(device), y.to(device)
                out = model(x)
                val_loss += criterion(out, y).item()
                _, pred = torch.max(out, 1)
                correct += (pred == y).sum().item()
                total += y.size(0)

        val_losses.append(val_loss / len(valloader))
        val_acc = 100 * correct / total
        val_accs.append(val_acc)
        print(f"[{name}] Epoch {epoch+1} - Train Acc: {train_accs[-1]:.2f}% | Val Acc: {val_acc:.2f}%")

        scheduler.step()

        # Save model if validation improves
        if val_accs[-1] > best_val_acc:
            best_val_acc = val_accs[-1]
            torch.save(model.state_dict(), f"{name}_best_model.pth")

        # Early stopping check
        early_stopper(val_accs[-1])
        if early_stopper.early_stop:
            print(f"Early stopping at epoch {epoch+1}")
            break


    # Test
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for x, y in testloader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            _, pred = torch.max(out, 1)
            correct += (pred == y).sum().item()
            total += y.size(0)
    print(f"[{name}] Test Accuracy: {100 * correct / total:.2f}%")

    return train_losses, val_losses, train_accs, val_accs


In [26]:
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

def evaluate_model(model, dataloader, device, class_names):
    model.eval()
    all_preds, all_targets = [], []

    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            _, preds = torch.max(out, 1)
            all_preds.extend(preds.cpu().numpy())
            all_targets.extend(y.cpu().numpy())

    print("\nðŸ“Š Classification Report:")
    print(classification_report(all_targets, all_preds, target_names=class_names))

    cm = confusion_matrix(all_targets, all_preds)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap="Blues", xticks_rotation=45)
    plt.title("Confusion Matrix")
    plt.show()

In [27]:
def plot_curves(train_losses, val_losses, train_accs, val_accs, title="Model"):
    epochs = range(1, len(train_losses) + 1)

    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_losses, label="Train Loss")
    plt.plot(epochs, val_losses, label="Val Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.title(f"{title} - Loss Curve")
    plt.legend()
    plt.grid()

    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_accs, label="Train Accuracy")
    plt.plot(epochs, val_accs, label="Val Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy (%)")
    plt.title(f"{title} - Accuracy Curve")
    plt.legend()
    plt.grid()

    plt.tight_layout()
    plt.show()

In [28]:
def summarize_results(results):
    print("\n Summary of Final Validation Accuracies:\n")
    for name, metrics in results.items():
        print(f"{name:20s} - Final Val Acc: {metrics['val_accs'][-1]:.2f}%")

In [29]:
def load_best_model(model_class, path, device):
    model = model_class().to(device)
    model.load_state_dict(torch.load(path, map_location=device))
    model.eval()
    print(f"Loaded best model from {path}")
    return model

In [30]:
def run_all_models(model_classes, epochs=5, use_mixup=True):
    set_seed()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    trainloader, valloader, testloader, classes = prepare_data()

    results = {}

    for model_class in model_classes:
        name = model_class.__name__
        print(f"\n Running model: {name}\n")
        model = model_class()

        train_losses, val_losses, train_accs, val_accs = train_evaluate(
            model, trainloader, valloader, testloader, device, name, epochs, use_mixup=use_mixup
        )

        # Plot training curves
        plot_curves(train_losses, val_losses, train_accs, val_accs, title=name)

        # Load and evaluate best model
        best_model = load_best_model(model_class, f"{name}_best_model.pth", device)
        evaluate_model(best_model, testloader, device, classes)

        results[name] = {
            "train_losses": train_losses,
            "val_losses": val_losses,
            "train_accs": train_accs,
            "val_accs": val_accs
        }

    summarize_results(results)
    return results


In [31]:
#if __name__ == "__main__":
#    run_all_models([SimpleCNN, ImprovedCNN, ResNetStyleCNN], epochs=40, use_mixup = False)

In [None]:
if __name__ == "__main__":
    run_all_models([ResNetStyleCNN], epochs=40, use_mixup = False)


 Running model: ResNetStyleCNN

[ResNetStyleCNN] Epoch 1 - Train Acc: 31.28% | Val Acc: 40.51%
[ResNetStyleCNN] Epoch 2 - Train Acc: 48.81% | Val Acc: 55.16%
