In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

In [4]:
# This time the training split gets light augmentation (random horizontal
# flips); both splits are converted to tensors and normalized identically.

# Per-channel (R, G, B) mean / std passed to Normalize -- presumably the
# CIFAR-10 training-set statistics; confirm before reusing on other data.
NORM_MEAN = (0.4914, 0.4822, 0.4465)
NORM_STD = (0.2470, 0.2435, 0.2616)

# Evaluation pipeline: deterministic, no augmentation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD),
])

# Training pipeline: same steps preceded by the random flip.
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD),
])

In [5]:
# Arguments shared by both CIFAR-10 splits (stored under ./data, downloaded on demand).
dataset_kwargs = dict(root="./data", download=True)

trainset = torchvision.datasets.CIFAR10(
    train=True, transform=transform_train, **dataset_kwargs
)
testset = torchvision.datasets.CIFAR10(
    train=False, transform=transform_test, **dataset_kwargs
)

# Both loaders use the same batch size and worker count; only the training
# loader shuffles, so the test batches always come in the same order.
loader_kwargs = dict(batch_size=64, num_workers=2)

trainloader = DataLoader(trainset, shuffle=True, **loader_kwargs)
testloader = DataLoader(testset, shuffle=False, **loader_kwargs)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 170498071/170498071 [00:10<00:00, 16106840.30it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


In [6]:
class SimpleCNN(nn.Module):
    """Small two-stage convolutional classifier for 3-channel 32x32 images.

    Two Conv -> BatchNorm -> ReLU -> MaxPool stages halve the spatial size
    twice (32 -> 16 -> 8). The resulting 64 x 8 x 8 feature map is flattened
    and passed through a 128-unit hidden layer with dropout to produce the
    class logits.

    Args:
        num_classes: Number of output logits. Defaults to 10.
        dropout: Drop probability applied before the final linear layer.
            Defaults to 0.3.
    """

    def __init__(self, num_classes=10, dropout=0.3):
        super().__init__()
        # Layer order (and therefore the Sequential indices) is kept stable so
        # that previously saved state dicts keep loading.
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 16x16

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),  # 8x8
        )

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 8 * 8, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        """Return the raw (un-normalized) class logits for a batch of images."""
        x = self.features(x)
        x = self.classifier(x)
        return x

model = SimpleCNN()

In [7]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
model = model.to(device)

# The optimizer is created after the model has been moved to `device`.
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

In [8]:
def train_one_epoch(epoch):
    """Run one optimisation pass over ``trainloader`` and print its metrics.

    Relies on the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``device``, ``trainloader`` and ``trainset``. The model is switched to
    training mode and left in that mode.

    Args:
        epoch: Epoch number; only used to label the printed summary line.
    """
    model.train()
    loss_sum, n_correct = 0.0, 0

    for batch_x, batch_y in trainloader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        optimizer.zero_grad()
        logits = model(batch_x)

        batch_loss = criterion(logits, batch_y)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by the batch size so that dividing by the
        # dataset size below gives a per-sample average (assumes the criterion
        # returns a batch mean, the nn.CrossEntropyLoss default).
        loss_sum += batch_loss.item() * batch_x.size(0)
        hits = logits.argmax(dim=1) == batch_y
        n_correct += hits.sum().item()

    n_samples = len(trainset)
    epoch_loss = loss_sum / n_samples
    epoch_acc = n_correct / n_samples
    print(f"Epoch {epoch}: Loss={epoch_loss:.4f}, Acc={epoch_acc:.4f}")

In [None]:
# Halve the learning rate when the monitored metric stops improving for more
# than `patience` epochs. mode="max" because the metric passed to
# scheduler.step() is an accuracy (higher is better).
#
# NOTE: the `verbose` flag is intentionally not passed: it is deprecated since
# PyTorch 2.2 and slated for removal. Read the current learning rate from
# optimizer.param_groups[0]["lr"] instead.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="max",
    factor=0.5,
    patience=2,
)

In [None]:
class EarlyStopping:
    """Signal when a higher-is-better score has stopped improving.

    A score counts as an improvement only when it is not below
    ``best_score + min_delta``. Every non-improving call to :meth:`step`
    increments a counter; an improvement records the new best and resets it.

    Args:
        patience: Number of non-improving scores (since the last improvement)
            after which :meth:`step` reports a stop.
        min_delta: Minimum gain over the best score needed to count as an
            improvement.
    """

    def __init__(self, patience=5, min_delta=0):
        self.patience, self.min_delta = patience, min_delta
        self.counter = 0
        self.best_score = None  # no score recorded yet

    def step(self, score):
        """Record ``score`` and report whether training should stop.

        Returns:
            bool: True once the non-improvement counter has reached
            ``patience``; False otherwise (including on the very first call).
        """
        # The very first score simply becomes the baseline.
        if self.best_score is None:
            self.best_score = score
            return False

        stalled = score < self.best_score + self.min_delta
        if not stalled:
            # Large enough gain: new baseline, start counting from scratch.
            self.best_score = score
            self.counter = 0
            return False

        self.counter += 1
        return self.counter >= self.patience

In [None]:
# Stop once 5 evaluations in a row fail to beat the best score by at least 0.001.
early_stopping = EarlyStopping(min_delta=0.001, patience=5)

In [None]:
# Main training loop: train, evaluate, checkpoint the best weights, adapt the
# learning rate and stop early once the accuracy plateaus.
#
# NOTE(review): the test split drives LR scheduling, early stopping and
# checkpoint selection here, so the reported test accuracy is optimistically
# biased; consider carving out a separate validation split for these decisions.
best_test_acc = 0.0

for epoch in range(1, 50):  # epochs 1..49 at most; early stopping may end sooner
    print(f"\nEpoch {epoch}")

    # ---- Training ----
    train_one_epoch(epoch)

    # ---- Evaluation ----
    model.eval()
    correct = 0
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            correct += (outputs.argmax(dim=1) == labels).sum().item()

    test_acc = correct / len(testset)
    print(f"Test Accuracy = {test_acc:.4f}")

    # ---- Save best model ----
    # Must happen before the early-stopping check: the epoch that triggers the
    # stop can still beat the best accuracy by less than min_delta, and the
    # `break` below would otherwise skip saving those weights.
    if test_acc > best_test_acc:
        best_test_acc = test_acc
        torch.save(model.state_dict(), "best_cifar10_model.pth")

    # ---- Scheduler Step ----
    scheduler.step(test_acc)
    # Read the learning rate straight from the optimizer so the log does not
    # depend on the scheduler's own (version-dependent) verbosity.
    print(f"Learning rate = {optimizer.param_groups[0]['lr']:.6g}")

    # ---- Early Stopping ----
    if early_stopping.step(test_acc):
        print("Early stopping triggered!")
        break

In [9]:
def evaluate():
    """Measure the accuracy of the notebook-level ``model`` on ``testloader``.

    Switches the model to eval mode (and leaves it there), runs the forward
    passes without gradient tracking, prints the accuracy and returns it so
    callers can feed it to a scheduler, early stopping or checkpoint logic.

    Returns:
        float: Fraction of ``testset`` samples whose arg-max prediction
        matches the label.
    """
    model.eval()
    correct = 0

    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            correct += (outputs.argmax(dim=1) == labels).sum().item()

    test_acc = correct / len(testset)
    print(f"Test Accuracy: {test_acc:.4f}")
    return test_acc


In [10]:
NUM_EPOCHS = 10

# Fixed-length run: train for NUM_EPOCHS epochs and report test accuracy after each.
for epoch in range(1, NUM_EPOCHS + 1):
    train_one_epoch(epoch)
    evaluate()

Epoch 1: Loss=1.3060, Acc=0.5341
Test Accuracy: 0.6131
Epoch 2: Loss=0.9532, Acc=0.6664
Test Accuracy: 0.6917
Epoch 3: Loss=0.8253, Acc=0.7115
Test Accuracy: 0.7116
Epoch 4: Loss=0.7393, Acc=0.7419
Test Accuracy: 0.7124
Epoch 5: Loss=0.6649, Acc=0.7683
Test Accuracy: 0.7243
Epoch 6: Loss=0.6067, Acc=0.7881
Test Accuracy: 0.7414
Epoch 7: Loss=0.5597, Acc=0.8049
Test Accuracy: 0.7427
Epoch 8: Loss=0.5127, Acc=0.8203
Test Accuracy: 0.7448
Epoch 9: Loss=0.4634, Acc=0.8382
Test Accuracy: 0.7480
Epoch 10: Loss=0.4320, Acc=0.8492
Test Accuracy: 0.7507
