<a href="https://colab.research.google.com/github/bythyag/overfitting-analysis/blob/main/scratchpad.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Experiment: Overfitting, Underfitting, Regularization (L1, L2, ElasticNet), Early Stopping

Dataset: CIFAR-10
Model: Custom CNN
Framework: PyTorch

In [1]:
#load libraries

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset, random_split
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report
import copy
import time
import os

In [2]:
# --- Compute device ---------------------------------------------------------
# Train on the GPU when CUDA is usable, otherwise fall back to the CPU.
_CUDA_OK = torch.cuda.is_available()
DEVICE = torch.device("cuda") if _CUDA_OK else torch.device("cpu")
print(f"Using device: {DEVICE}")

# --- Data -------------------------------------------------------------------
BATCH_SIZE = 128
VALIDATION_SPLIT = 0.1  # fraction of the training set held out for validation

# --- Optimisation -----------------------------------------------------------
EPOCHS = 100  # long enough for the large model to visibly overfit
LEARNING_RATE = 0.001

# --- Regularisation strengths -----------------------------------------------
# Elastic Net combines both of the values below.
WEIGHT_DECAY_L2 = 1e-4  # weight-decay coefficient (L2 experiments)
L1_LAMBDA = 1e-5        # multiplier on the L1 penalty (L1 experiments)

# --- Early stopping ---------------------------------------------------------
EARLY_STOPPING_PATIENCE = 7       # epochs without improvement before stopping
EARLY_STOPPING_MIN_DELTA = 0.001  # smallest val-loss drop counted as improvement

# --- Reproducibility --------------------------------------------------------
# Seed PyTorch (CPU and, when present, every CUDA device) and NumPy.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
if _CUDA_OK:
    torch.cuda.manual_seed_all(SEED)

Using device: cuda


In [3]:
print("\n--- Loading Data ---")

# Per-channel mean / std conventionally used to normalise CIFAR-10 images.
# Defined once so the train and eval pipelines cannot drift apart.
CIFAR10_MEAN = (0.4914, 0.4822, 0.4465)
CIFAR10_STD = (0.2023, 0.1994, 0.2010)

# Training pipeline: light augmentation, then tensor conversion + normalisation.
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),     # Basic augmentation
    transforms.RandomCrop(32, padding=4),  # Basic augmentation
    transforms.ToTensor(),
    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
])

# Evaluation pipeline: no augmentation, same normalisation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
])

# Load CIFAR-10 dataset
full_train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                                  download=True, transform=transform_train)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                            download=True, transform=transform_test)

# Split training data into training and validation sets.
num_train = len(full_train_dataset)
indices = list(range(num_train))
split = int(np.floor(VALIDATION_SPLIT * num_train))
# Shuffle with a dedicated generator seeded from SEED instead of the global
# NumPy stream. Re-running this cell therefore always reproduces the same
# split (the global stream would have advanced and produced a different one).
# A fresh RandomState(SEED) emits the same sequence as the global generator
# right after np.random.seed(SEED), so a clean Restart & Run All is unchanged
# provided nothing consumed the global stream in between.
np.random.RandomState(SEED).shuffle(indices)

train_idx, val_idx = indices[split:], indices[:split]
# Guard against any overlap between the two index sets (data leakage).
assert len(train_idx) + len(val_idx) == num_train
assert set(train_idx).isdisjoint(val_idx)

train_subset = Subset(full_train_dataset, train_idx)
# NOTE: this view of the validation indices goes through the *augmenting*
# training transform. It is kept for backward compatibility only; evaluation
# must use `val_subset_eval` below.
val_subset = Subset(full_train_dataset, val_idx)

# Validation must not be augmented, so build a second CIFAR-10 instance over
# the same files with the evaluation transform and index it with val_idx.
val_dataset_clean = torchvision.datasets.CIFAR10(root='./data', train=True,
                                                 download=False, transform=transform_test)
val_subset_eval = Subset(val_dataset_clean, val_idx)


print(f"Full training set size: {len(full_train_dataset)}")
print(f"Training subset size: {len(train_subset)}")
print(f"Validation subset size: {len(val_subset_eval)}")
print(f"Test set size: {len(test_dataset)}")

# Create DataLoaders. Pinned host memory only helps host->GPU copies, so it is
# requested only when the run is actually on CUDA.
_loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=2,
                      pin_memory=(DEVICE.type == "cuda"))
train_loader = DataLoader(train_subset, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_subset_eval, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_kwargs)

# Human-readable class names, listed in label-index order.
classes = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')



--- Loading Data ---


100%|██████████| 170M/170M [00:03<00:00, 43.4MB/s]


Full training set size: 50000
Training subset size: 45000
Validation subset size: 5000
Test set size: 10000


In [4]:
print("\n--- Defining Models ---")

# Model capable of overfitting CIFAR-10
class OverfittingCNN(nn.Module):
    """High-capacity CNN used for the overfitting / regularisation experiments.

    Three convolutional stages, each halving the spatial size with a 2x2
    max-pool (32x32 -> 16x16 -> 8x8 -> 4x4), feed a three-layer fully
    connected classifier. Dropout is intentionally left out: the notebook
    studies L1 / L2 / Elastic Net and early stopping only.

    Args:
        num_classes: width of the final linear layer (number of logits).
    """

    def __init__(self, num_classes=10):
        super().__init__()

        def make_stage(c_in, c_mid, c_out):
            # conv -> batch-norm -> ReLU -> conv -> ReLU -> 2x2 max-pool.
            # Only the first convolution of a stage is batch-normalised.
            return nn.Sequential(
                nn.Conv2d(in_channels=c_in, out_channels=c_mid, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_mid),
                nn.ReLU(inplace=True),
                nn.Conv2d(in_channels=c_mid, out_channels=c_out, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=2, stride=2),
            )

        # Stages are created in forward order so parameter initialisation
        # consumes the RNG in a fixed, predictable sequence.
        self.conv_layer1 = make_stage(3, 32, 64)      # 32x32 -> 16x16
        self.conv_layer2 = make_stage(64, 128, 128)   # 16x16 -> 8x8
        self.conv_layer3 = make_stage(128, 256, 256)  # 8x8  -> 4x4

        flat_features = 256 * 4 * 4  # 256 channels * 4x4 feature map
        self.fc_layer = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flat_features, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Return the class logits for a batch of images."""
        for stage in (self.conv_layer1, self.conv_layer2, self.conv_layer3, self.fc_layer):
            x = stage(x)
        return x

# Simpler model for demonstrating Underfitting
class UnderfittingCNN(nn.Module):
    """Deliberately small CNN used to demonstrate underfitting.

    Two 3x3 convolutions, each followed by ReLU and a shared 2x2 max-pool
    (32 -> 16 -> 8), then a 64-unit hidden layer and the output layer.

    Args:
        num_classes: width of the final linear layer (number of logits).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)  # reused after both convolutions
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(32 * 8 * 8, 64)  # 32 channels * 8x8 map after two pools
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return the class logits for a batch of images."""
        for conv in (self.conv1, self.conv2):
            x = self.pool(torch.relu(conv(x)))
        # Collapse everything except the batch dimension before the classifier.
        hidden = torch.relu(self.fc1(x.flatten(start_dim=1)))
        return self.fc2(hidden)


--- Defining Models ---


In [5]:
print("\n--- Defining Training & Evaluation ---")

def calculate_l1_loss(model):
    """Return the summed absolute value of every weight parameter of `model`.

    A parameter is included when its name contains the substring 'weight',
    so biases are skipped. The match is purely by name, which means it also
    picks up normalisation-layer scale parameters that PyTorch names `weight`.
    The result stays attached to the autograd graph; for a model with no
    matching parameter it is the plain integer 0.
    """
    penalty = 0
    for param_name, tensor in model.named_parameters():
        if 'weight' not in param_name:
            continue
        penalty = penalty + tensor.abs().sum()
    return penalty

def evaluate_model(model, data_loader, criterion, device):
    """Evaluate `model` on every batch of `data_loader` without tracking gradients.

    The model is switched to eval mode and left in it; callers that keep
    training must call `model.train()` again themselves.

    Args:
        model: network mapping a batch of inputs to per-class scores.
        data_loader: iterable yielding `(inputs, labels)` batches.
        criterion: loss returning the mean loss of a batch.
        device: device the batches are moved to before the forward pass.

    Returns:
        `(avg_loss, accuracy)` as Python floats: the per-sample average loss
        and the fraction of samples whose arg-max prediction equals the label.

    Raises:
        ValueError: if the loader yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    num_correct = 0
    num_samples = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            batch_size = labels.size(0)
            # criterion returns a batch mean; re-weight by batch size so a
            # smaller final batch does not skew the average.
            loss_sum += criterion(outputs, labels).item() * batch_size
            num_correct += (outputs.argmax(dim=1) == labels).sum().item()
            num_samples += batch_size

    if num_samples == 0:
        raise ValueError("evaluate_model received a data loader that yielded no samples")

    # Normalise by the samples actually seen rather than len(loader.dataset):
    # the two differ when the loader uses a sampler or drop_last=True.
    return loss_sum / num_samples, num_correct / num_samples

def train_model(model, criterion, optimizer, train_loader, val_loader, epochs, device,
                l1_lambda=0.0, early_stopping_patience=None, early_stopping_min_delta=0.0,
                model_name="Model"):
    """Train `model`, optionally with an L1 penalty and/or early stopping.

    Args:
        model: network to optimise (updated in place).
        criterion: loss returning the mean loss of a batch.
        optimizer: optimiser already bound to `model`'s parameters.
        train_loader: iterable of `(inputs, labels)` training batches.
        val_loader: loader passed to `evaluate_model` after every epoch.
        epochs: maximum number of epochs.
        device: device the batches are moved to.
        l1_lambda: when > 0, `l1_lambda * calculate_l1_loss(model)` is added
            to the loss of every batch.
        early_stopping_patience: number of consecutive epochs without a
            validation-loss improvement after which training stops; `None`
            disables early stopping and best-weight tracking.
        early_stopping_min_delta: amount by which the validation loss must
            drop below the best value so far to count as an improvement.
        model_name: label used in the progress messages.

    Returns:
        `(model, history, stop_epoch)`. `history` holds one entry per
        completed epoch under 'train_loss', 'train_acc', 'val_loss' and
        'val_acc'. `stop_epoch` is the 1-based epoch at which early stopping
        fired, or `epochs` if it never did.

    Notes:
        * 'train_loss' is the optimised objective, so it includes the L1
          penalty when `l1_lambda > 0`; 'val_loss' is `criterion` only.
        * Whenever early stopping is enabled, the returned model carries the
          weights of the best validation epoch, including when training ran
          through all `epochs` without the patience being exhausted.
    """
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_model_state = None
    best_epoch = None
    stopped_early = False
    stop_epoch = epochs

    print(f"\n--- Training {model_name} ---")
    start_time = time.time()

    for epoch in range(epochs):
        model.train()  # evaluate_model() leaves the model in eval mode
        running_loss = 0.0
        correct_train = 0
        total_train = 0

        for i, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if l1_lambda > 0:
                # The L1 term is part of the loss, so it is differentiated
                # together with the data term.
                loss = loss + l1_lambda * calculate_l1_loss(model)

            loss.backward()
            optimizer.step()

            # Statistics (weighted by batch size; the last batch may be smaller).
            batch_size = labels.size(0)
            running_loss += loss.item() * batch_size
            predicted = outputs.detach().argmax(dim=1)
            total_train += batch_size
            correct_train += (predicted == labels).sum().item()

            if (i + 1) % 100 == 0:  # Print progress every 100 batches
                print(f'Epoch [{epoch+1}/{epochs}], Batch [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

        # Average over the samples actually processed; len(train_loader.dataset)
        # over-counts when the loader drops the last batch or uses a sampler.
        epoch_train_loss = running_loss / total_train
        epoch_train_acc = correct_train / total_train
        history['train_loss'].append(epoch_train_loss)
        history['train_acc'].append(epoch_train_acc)

        # Validation step
        epoch_val_loss, epoch_val_acc = evaluate_model(model, val_loader, criterion, device)
        history['val_loss'].append(epoch_val_loss)
        history['val_acc'].append(epoch_val_acc)

        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {epoch_train_loss:.4f}, Train Acc: {epoch_train_acc:.4f} | "
              f"Val Loss: {epoch_val_loss:.4f}, Val Acc: {epoch_val_acc:.4f}")

        # Early Stopping check
        if early_stopping_patience is not None:
            if epoch_val_loss < best_val_loss - early_stopping_min_delta:
                best_val_loss = epoch_val_loss
                best_epoch = epoch + 1
                epochs_no_improve = 0
                # Deep copy: state_dict() returns references to the live tensors.
                best_model_state = copy.deepcopy(model.state_dict())
                print(f"  -> Validation loss improved to {best_val_loss:.4f}. Saving model.")
            else:
                epochs_no_improve += 1
                print(f"  -> Validation loss did not improve for {epochs_no_improve} epoch(s).")

            if epochs_no_improve >= early_stopping_patience:
                print(f"\nEarly stopping triggered after epoch {epoch+1}!")
                stop_epoch = epoch + 1
                stopped_early = True
                if best_model_state is not None:
                    model.load_state_dict(best_model_state)
                    print("  -> Restored best model weights.")
                else:
                    print("  -> Warning: Early stopping triggered, but no best model state was saved.")
                break  # Exit the training loop

    # Early stopping was enabled but never fired: the final epochs may still be
    # worse than the tracked best one, so hand back the best weights here too.
    # (epochs_no_improve == 0 means the last epoch *is* the best one.)
    if (early_stopping_patience is not None and not stopped_early
            and best_model_state is not None and epochs_no_improve > 0):
        model.load_state_dict(best_model_state)
        print(f"  -> Completed all {epochs} epochs; restored best model weights from epoch {best_epoch}.")

    end_time = time.time()
    print(f"--- Training Finished for {model_name} in {(end_time - start_time)/60:.2f} minutes ---")

    return model, history, stop_epoch


--- Defining Training & Evaluation ---


In [None]:
criterion = nn.CrossEntropyLoss()
results = {}         # name -> per-epoch history, used for plotting
final_models = {}    # name -> trained model, used for the final test evaluation
stopped_epochs = {}  # name -> epoch at which training stopped


def _reset_seeds(seed=SEED):
    """Re-seed PyTorch (CPU + CUDA) and NumPy.

    Called before every experiment so that each one starts from the same RNG
    state: the OverfittingCNN variants then share their initial weights and
    their batch / augmentation sequence, leaving the regulariser as the only
    difference between runs. It also makes this cell give the same results
    when it is executed again.
    """
    torch.manual_seed(seed)
    np.random.seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


def _run_experiment(name, model, optimizer, **train_kwargs):
    """Train `model` under the label `name` and record the three outputs.

    `train_kwargs` is forwarded to `train_model` (l1_lambda, early-stopping
    options); anything omitted keeps `train_model`'s default.
    """
    trained, history, stop_epoch = train_model(
        model, criterion, optimizer, train_loader, val_loader,
        epochs=EPOCHS, device=DEVICE, model_name=name, **train_kwargs
    )
    final_models[name] = trained
    results[name] = history
    stopped_epochs[name] = stop_epoch


# --- Experiment 1: Underfitting ---
print("\n=== Experiment 1: Underfitting ===")
_reset_seeds()
underfit_model = UnderfittingCNN(num_classes=10).to(DEVICE)
optimizer_underfit = optim.Adam(underfit_model.parameters(), lr=LEARNING_RATE)
_run_experiment("Underfitting", underfit_model, optimizer_underfit)

# --- Experiment 2: Baseline (Potential Overfitting) ---
print("\n=== Experiment 2: Baseline (Overfitting) ===")
_reset_seeds()
baseline_model = OverfittingCNN(num_classes=10).to(DEVICE)
optimizer_baseline = optim.Adam(baseline_model.parameters(), lr=LEARNING_RATE, weight_decay=0)  # No L2
_run_experiment("Baseline", baseline_model, optimizer_baseline, l1_lambda=0.0)  # No L1

# --- Experiment 3: L2 Regularization ---
print("\n=== Experiment 3: L2 Regularization ===")
_reset_seeds()
l2_model = OverfittingCNN(num_classes=10).to(DEVICE)
# AdamW applies decoupled weight decay: the shrinkage is applied to the weights
# directly instead of being added to the gradient as an L2 loss term.
optimizer_l2 = optim.AdamW(l2_model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY_L2)
_run_experiment("L2 Regularization", l2_model, optimizer_l2, l1_lambda=0.0)  # No L1

# --- Experiment 4: L1 Regularization ---
print("\n=== Experiment 4: L1 Regularization ===")
_reset_seeds()
l1_model = OverfittingCNN(num_classes=10).to(DEVICE)
# Plain Adam; the L1 penalty is added to the loss inside train_model.
optimizer_l1 = optim.Adam(l1_model.parameters(), lr=LEARNING_RATE, weight_decay=0)  # No L2 here
_run_experiment("L1 Regularization", l1_model, optimizer_l1, l1_lambda=L1_LAMBDA)

# --- Experiment 5: Elastic Net Regularization ---
print("\n=== Experiment 5: Elastic Net Regularization ===")
_reset_seeds()
elastic_model = OverfittingCNN(num_classes=10).to(DEVICE)
# AdamW supplies the weight-decay part, train_model adds the L1 part.
optimizer_elastic = optim.AdamW(elastic_model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY_L2)
_run_experiment("Elastic Net", elastic_model, optimizer_elastic, l1_lambda=L1_LAMBDA)

# --- Experiment 6: Early Stopping ---
print("\n=== Experiment 6: Early Stopping ===")
_reset_seeds()
early_stop_model = OverfittingCNN(num_classes=10).to(DEVICE)
optimizer_early_stop = optim.Adam(early_stop_model.parameters(), lr=LEARNING_RATE, weight_decay=0)  # No L2
_run_experiment(
    "Early Stopping", early_stop_model, optimizer_early_stop,
    l1_lambda=0.0,  # No L1
    early_stopping_patience=EARLY_STOPPING_PATIENCE,
    early_stopping_min_delta=EARLY_STOPPING_MIN_DELTA,
)


=== Experiment 1: Underfitting ===

--- Training Underfitting ---
Epoch [1/100], Batch [100/352], Loss: 1.7056
Epoch [1/100], Batch [200/352], Loss: 1.4665
Epoch [1/100], Batch [300/352], Loss: 1.4363
Epoch 1/100 | Train Loss: 1.6597, Train Acc: 0.3951 | Val Loss: 1.4182, Val Acc: 0.4892
Epoch [2/100], Batch [100/352], Loss: 1.5268
Epoch [2/100], Batch [200/352], Loss: 1.3900
Epoch [2/100], Batch [300/352], Loss: 1.2421
Epoch 2/100 | Train Loss: 1.3814, Train Acc: 0.5052 | Val Loss: 1.2248, Val Acc: 0.5720
Epoch [3/100], Batch [100/352], Loss: 1.3527
Epoch [3/100], Batch [200/352], Loss: 1.4779
Epoch [3/100], Batch [300/352], Loss: 1.2782
Epoch 3/100 | Train Loss: 1.2677, Train Acc: 0.5492 | Val Loss: 1.1355, Val Acc: 0.5994
Epoch [4/100], Batch [100/352], Loss: 1.1106
Epoch [4/100], Batch [200/352], Loss: 1.0811
Epoch [4/100], Batch [300/352], Loss: 1.0783
Epoch 4/100 | Train Loss: 1.1814, Train Acc: 0.5801 | Val Loss: 1.0519, Val Acc: 0.6216
Epoch [5/100], Batch [100/352], Loss: 1.1

In [None]:
print("\n--- Plotting Results ---")

def plot_history(results, stopped_epochs, title_prefix=""):
    """Plot train / validation loss and accuracy curves for several runs.

    Args:
        results: mapping run name -> history dict with the per-epoch lists
            'train_loss', 'val_loss', 'train_acc' and 'val_acc'.
        stopped_epochs: mapping run name -> number of epochs to show for
            that run; each curve is cut off after that many entries.
        title_prefix: text placed in front of the figure title.

    The figure has two panels (loss on the left, accuracy on the right);
    training curves are solid and validation curves dashed.
    """
    # Matplotlib 3.6 renamed the bundled seaborn styles; fall back to the old
    # name (or the default style) when the new one is not installed.
    style_name = 'seaborn-v0_8-darkgrid'
    if style_name not in plt.style.available:
        style_name = 'seaborn-darkgrid' if 'seaborn-darkgrid' in plt.style.available else 'default'

    # (history key for train, history key for val, legend suffix) per panel.
    panels = (
        ('train_loss', 'val_loss', 'Loss'),
        ('train_acc', 'val_acc', 'Acc'),
    )

    # Apply the style only while this figure is built so that it does not
    # leak into every later plot of the session.
    with plt.style.context(style_name):
        fig, axs = plt.subplots(1, 2, figsize=(18, 6))
        fig.suptitle(f'{title_prefix} Training History Comparison', fontsize=16)

        for ax, (train_key, val_key, suffix) in zip(axs, panels):
            for name, history in results.items():
                epochs_ran = stopped_epochs[name]
                train_curve = history[train_key][:epochs_ran]
                val_curve = history[val_key][:epochs_ran]
                # Size the x-axis from the sliced data, so a history shorter
                # than `epochs_ran` cannot cause an x/y length mismatch.
                ax.plot(range(1, len(train_curve) + 1), train_curve,
                        label=f'{name} Train {suffix}')
                ax.plot(range(1, len(val_curve) + 1), val_curve,
                        label=f'{name} Val {suffix}', linestyle='--')

        axs[0].set_title('Loss per Epoch')
        axs[0].set_xlabel('Epochs')
        axs[0].set_ylabel('Loss')
        axs[0].legend(loc='best')
        axs[0].grid(True)
        axs[0].set_ylim(bottom=0)  # Loss cannot be negative

        axs[1].set_title('Accuracy per Epoch')
        axs[1].set_xlabel('Epochs')
        axs[1].set_ylabel('Accuracy')
        axs[1].legend(loc='best')
        axs[1].grid(True)
        axs[1].set_ylim(0, 1.05)  # Accuracy between 0 and 1

        plt.tight_layout(rect=[0, 0.03, 1, 0.95])  # Leave room for the suptitle
        plt.show()

# Overview figure: every experiment on the same pair of axes.
plot_history(results, stopped_epochs)

# The overview gets crowded, so also draw a focused comparison of the
# unregularised baseline against L2 and early stopping.
focus_names = ['Baseline', 'L2 Regularization', 'Early Stopping']
focus_results = {run: results[run] for run in focus_names}
focus_stops = {run: stopped_epochs[run] for run in focus_names}
plot_history(focus_results, focus_stops,
             title_prefix="Baseline vs L2 vs Early Stopping")


In [None]:
print("\n--- Final Evaluation on Test Set ---")

evaluation_summary = []      # one row per model for the summary table
classification_reports = {}  # model name -> per-class report dict

for name, model in final_models.items():
    print(f"Evaluating {name}...")
    test_loss, test_acc = evaluate_model(model, test_loader, criterion, DEVICE)

    # Second pass to collect the individual predictions needed for the
    # per-class report (evaluate_model only returns aggregates).
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.to(DEVICE))
            all_preds.extend(outputs.argmax(dim=1).cpu().tolist())
            all_labels.extend(labels.tolist())
    report = classification_report(all_labels, all_preds, target_names=classes, output_dict=True)
    # Keep every model's report instead of overwriting it on each iteration.
    classification_reports[name] = report

    # Both passes score the same model on the same data, so the accuracies
    # must agree; report it loudly if they do not.
    test_acc_from_report = report['accuracy']
    if not np.isclose(test_acc_from_report, test_acc):
        print(f"  -> Warning: accuracy mismatch for {name}: "
              f"{test_acc:.4f} (evaluate_model) vs {test_acc_from_report:.4f} (classification_report)")

    evaluation_summary.append({
        "Model": name,
        "Test Loss": test_loss,
        "Test Accuracy": test_acc,
        "Stopped Epoch": stopped_epochs[name]
    })
    print(f"{name} - Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Stopped Epoch: {stopped_epochs[name]}")


# Create and print a Pandas DataFrame for a nice table
evaluation_df = pd.DataFrame(evaluation_summary)
print("\n--- Evaluation Summary Table ---")
print(evaluation_df.to_string(index=False))
