In [1]:
import PIL
import torch
import torchvision.transforms as T
from PIL import Image
import glob
from torch.utils.data import Dataset, DataLoader
from matplotlib import pyplot as plt
import os
import torch.nn as nn
import torch.optim as optim

## Q1

Implement L2 regularization on the cat-dog classification neural network. Train the model on the dataset, and observe the impact of the regularization on the weight parameters. (Do not use data augmentation.)

a. L2 regularization using optimizer’s weight decay

b. L2 regularization using a loop to compute the L2 norm of the weights

In [2]:
import torch
import torchvision.transforms as T
from PIL import Image
import os
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim

# Shared preprocessing pipeline applied to every image: resize to 128x128,
# convert to a tensor, then normalize each of the three channels.
# NOTE(review): the mean/std values look like the usual ImageNet channel
# statistics — confirm that this is the intended normalization.
preprocess = T.Compose([
    T.Resize((128, 128)),  # fixed size: the models' first Linear layer (64 * 32 * 32 inputs) depends on it
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

class MyDataset(Dataset):
    """Image dataset that reads one sub-directory per class from a split folder.

    The split folder is ``../data/cats_and_dogs_filtered/<split>``. Every
    immediate sub-directory is treated as a class, and every file inside it
    whose name ends with ``.jpg`` becomes one sample.

    Attributes are plain Python objects:
        imgs_path: path of the split folder.
        data: list of ``[image_path, class_name]`` pairs.
        class_map: mapping from class directory name to integer label.
        transform: optional callable applied to the loaded RGB image.
    """

    def __init__(self, transform=None, split="train"):
        self.imgs_path = os.path.join("../data/cats_and_dogs_filtered", split)

        samples = []
        for class_name in os.listdir(self.imgs_path):
            class_dir = os.path.join(self.imgs_path, class_name)
            # Loose files sitting next to the class folders are ignored.
            if not os.path.isdir(class_dir):
                continue
            for file_name in os.listdir(class_dir):
                if file_name.endswith('.jpg'):
                    samples.append([os.path.join(class_dir, file_name), class_name])

        self.data = samples
        self.class_map = {"dogs": 0, "cats": 1}
        self.transform = transform

    def __len__(self):
        """Return the number of collected samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Load sample ``index`` and return ``(image, label_tensor)``.

        The label lookup raises ``KeyError`` for a class directory that is not
        present in ``class_map``.
        """
        path, label_name = self.data[index]
        image = Image.open(path).convert('RGB')
        label = torch.tensor(self.class_map[label_name])
        if self.transform:
            image = self.transform(image)
        return image, label

class CatsDogsCNN(nn.Module):
    """Small convolutional classifier producing two logits per image.

    Three 3x3 convolutions (16, 32, 64 channels), each followed by batch
    normalization and ReLU, with 2x2 max-pooling after the second and third.
    The flattened features feed a 512-unit hidden layer and a 2-way output.
    The first Linear layer expects 64 * 32 * 32 features, i.e. the size
    produced by a 128x128 input after the two pooling steps.
    """

    def __init__(self):
        super().__init__()
        feature_layers = [
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        classifier_layers = [
            nn.Flatten(),
            nn.Linear(64 * 32 * 32, 512),
            nn.ReLU(),
            nn.Linear(512, 2),
        ]
        # A single Sequential keeps the parameter names as ``net.<index>.*``.
        self.net = nn.Sequential(*feature_layers, *classifier_layers)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        logits = self.net(x)
        return logits

def get_weight_magnitude(model):
    """Return the mean L2 norm over the model's trainable parameter tensors.

    Each tensor with ``requires_grad`` set contributes one norm; the result is
    the plain average of those norms (not weighted by tensor size). Returns 0
    when the model has no trainable parameters.
    """
    norms = [
        torch.norm(tensor, p=2).item()
        for tensor in model.parameters()
        if tensor.requires_grad
    ]
    if not norms:
        return 0
    return sum(norms, 0.0) / len(norms)

def train_epoch(model, dataloader, criterion, optimizer, device, use_explicit_l2=False, lambda_l2=0.01):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: module to train; it is switched to training mode.
        dataloader: iterable yielding ``(images, labels)`` batches. It does not
            need to support ``len()``; batches are counted while iterating.
        criterion: loss callable taking ``(outputs, labels)``.
        optimizer: optimizer stepping the model's parameters.
        device: device the batches are moved to.
        use_explicit_l2: when true, add ``lambda_l2 * sum(w ** 2)`` over all
            trainable parameters to the loss before back-propagation.
        lambda_l2: coefficient of the explicit penalty.

    Returns:
        ``(mean_loss_per_batch, accuracy_percent)``. The reported loss includes
        the explicit penalty when it is enabled. ``(0.0, 0.0)`` is returned
        when the dataloader yields no batch.

    Note:
        The explicit penalty contributes ``2 * lambda_l2 * w`` to each
        gradient. An optimizer-side ``weight_decay=wd`` (as passed to Adam in
        ``train_model``) adds ``wd * w`` instead, so equal numeric values of
        ``lambda_l2`` and ``wd`` do not give equal regularization strength.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)

        if use_explicit_l2:
            # Sum of squares directly: same value as ||w||_2 ** 2 without the
            # sqrt/square round trip, and the accumulator lives wherever the
            # parameters live (no separate leaf tensor to move across devices).
            l2_reg = sum(
                param.pow(2).sum()
                for param in model.parameters()
                if param.requires_grad
            )
            loss = loss + lambda_l2 * l2_reg

        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # Guard the averages so an empty loader does not raise ZeroDivisionError.
    mean_loss = running_loss / num_batches if num_batches else 0.0
    accuracy = 100 * correct / total if total else 0.0
    return mean_loss, accuracy

def train_model(weight_decay=0, use_explicit_l2=False):
    """Train a fresh ``CatsDogsCNN`` for five epochs and track weight magnitude.

    Args:
        weight_decay: value forwarded to Adam's ``weight_decay`` argument.
        use_explicit_l2: forwarded to ``train_epoch`` to enable the in-loss
            L2 penalty (with ``train_epoch``'s default coefficient).

    Returns:
        List with one ``get_weight_magnitude`` value per epoch.
    """
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    loader = DataLoader(
        MyDataset(transform=preprocess, split="train"),
        batch_size=32,
        shuffle=True,
    )

    model = CatsDogsCNN().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=weight_decay)

    history = []
    for epoch_number in range(1, 6):
        loss, acc = train_epoch(model, loader, criterion, optimizer, device, use_explicit_l2)
        magnitude = get_weight_magnitude(model)
        history.append(magnitude)
        print(f"Epoch {epoch_number}: Loss {loss:.4f}, Acc {acc:.2f}%, Weight Mag {magnitude:.6f}")

    return history

# Part (a): L2 regularization through the optimizer's weight_decay argument.
print("Weight Decay Experiment")
wd_magnitudes = train_model(weight_decay=0.01, use_explicit_l2=False)

# Part (b): L2 regularization added to the loss by looping over the parameters.
print("\nExplicit L2 Experiment")
explicit_magnitudes = train_model(weight_decay=0, use_explicit_l2=True)

# Per-epoch mean parameter norms of both runs, side by side.
print("\nResults")
print("Weight Decay:", ["{:.6f}".format(magnitude) for magnitude in wd_magnitudes])
print("Explicit L2:", ["{:.6f}".format(magnitude) for magnitude in explicit_magnitudes])

Weight Decay Experiment
Epoch 1: Loss 4.9545, Acc 55.50%, Weight Mag 2.828387
Epoch 2: Loss 0.6619, Acc 64.35%, Weight Mag 2.473258
Epoch 3: Loss 0.6123, Acc 68.80%, Weight Mag 2.280278
Epoch 4: Loss 0.5812, Acc 70.40%, Weight Mag 2.213264
Epoch 5: Loss 0.5353, Acc 73.85%, Weight Mag 2.155546

Explicit L2 Experiment
Epoch 1: Loss 9.5080, Acc 54.00%, Weight Mag 2.601229
Epoch 2: Loss 3.1271, Acc 61.80%, Weight Mag 2.220500
Epoch 3: Loss 2.5091, Acc 65.00%, Weight Mag 2.044233
Epoch 4: Loss 2.2283, Acc 67.95%, Weight Mag 1.990941
Epoch 5: Loss 2.0344, Acc 72.00%, Weight Mag 1.932163

Results
Weight Decay: ['2.828387', '2.473258', '2.280278', '2.213264', '2.155546']
Explicit L2: ['2.601229', '2.220500', '2.044233', '1.990941', '1.932163']


In [4]:
class CatsDogsCNNWithDropout(nn.Module):
    """Convolutional cat/dog classifier with built-in dropout layers.

    Same convolution/batch-norm/ReLU/pooling stack as ``CatsDogsCNN``, with
    ``nn.Dropout2d`` after each max-pooling step and ``nn.Dropout`` before the
    output layer. All three dropout layers share ``dropout_rate``.

    Args:
        dropout_rate: drop probability handed to every dropout layer.
    """

    def __init__(self, dropout_rate=0.5):
        super().__init__()
        conv_stage = [
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout2d(dropout_rate),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout2d(dropout_rate),
        ]
        head_stage = [
            nn.Flatten(),
            nn.Linear(64 * 32 * 32, 512),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(512, 2),
        ]
        # A single Sequential keeps the parameter names as ``net.<index>.*``.
        self.net = nn.Sequential(*conv_stage, *head_stage)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        logits = self.net(x)
        return logits

def evaluate_model(model, dataloader, criterion, device):
    """Compute mean loss and accuracy of ``model`` over ``dataloader``.

    The model is switched to evaluation mode (and left in it) and gradients
    are disabled for the whole pass.

    Args:
        model: module to evaluate.
        dataloader: iterable yielding ``(images, labels)`` batches. It does not
            need to support ``len()``; batches are counted while iterating.
        criterion: loss callable taking ``(outputs, labels)``.
        device: device the batches are moved to.

    Returns:
        ``(mean_loss_per_batch, accuracy_percent)``, or ``(0.0, 0.0)`` when the
        dataloader yields no batch.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            num_batches += 1
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Guard the averages so an empty loader does not raise ZeroDivisionError.
    mean_loss = running_loss / num_batches if num_batches else 0.0
    accuracy = 100 * correct / total if total else 0.0
    return mean_loss, accuracy

def train_model_with_eval(model_class, dropout_rate=0, weight_decay=0):
    """Train ``model_class`` for five epochs, validating after every epoch.

    Args:
        model_class: model constructor. If it accepts a ``dropout_rate``
            keyword (or ``**kwargs``), ``dropout_rate`` is always forwarded —
            including ``0`` — so the class default is never used silently.
            Constructors without such a parameter are called with no arguments.
        dropout_rate: dropout probability forwarded to the constructor.
        weight_decay: value forwarded to Adam's ``weight_decay`` argument.

    Returns:
        ``(train_losses, train_accs, val_losses, val_accs, weight_magnitudes)``,
        each a list with one entry per epoch.
    """
    import inspect  # local import: only needed to inspect the constructor

    device = "cuda" if torch.cuda.is_available() else "cpu"
    batch_size = 32

    train_dataset = MyDataset(transform=preprocess, split="train")
    val_dataset = MyDataset(transform=preprocess, split="validation")
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Decide from the constructor's signature, not from the value of
    # dropout_rate: the previous ``dropout_rate > 0`` test made
    # ``dropout_rate=0`` fall back to ``model_class()``, which for a dropout
    # model meant training with its default rate instead of no dropout.
    try:
        init_params = inspect.signature(model_class).parameters.values()
        takes_dropout = any(
            param.name == "dropout_rate" or param.kind is inspect.Parameter.VAR_KEYWORD
            for param in init_params
        )
    except (TypeError, ValueError):
        # Signature not introspectable: keep the historical behaviour.
        takes_dropout = dropout_rate > 0

    if takes_dropout:
        model = model_class(dropout_rate=dropout_rate).to(device)
    else:
        model = model_class().to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=weight_decay)

    train_losses, train_accs, val_losses, val_accs = [], [], [], []
    weight_magnitudes = []

    for epoch in range(5):
        train_loss, train_acc = train_epoch(model, train_dataloader, criterion, optimizer, device)
        val_loss, val_acc = evaluate_model(model, val_dataloader, criterion, device)
        weight_mag = get_weight_magnitude(model)

        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)
        weight_magnitudes.append(weight_mag)

        print(f"Epoch {epoch+1}:")
        print(f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.2f}%")
        print(f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.2f}%")
        print(f"Weight Mag: {weight_mag:.6f}")

    return train_losses, train_accs, val_losses, val_accs, weight_magnitudes

def compare_dropout_experiments():
    """Train the plain and the dropout model and print a side-by-side summary.

    Both runs go through ``train_model_with_eval``. The summary shows the
    last-epoch train/validation loss and accuracy of each run, followed by the
    train-minus-validation accuracy gap as a rough overfitting indicator.
    """
    print("Training without Dropout")
    no_dropout_results = train_model_with_eval(CatsDogsCNN, dropout_rate=0)

    print("\nTraining with Dropout (rate=0.5)")
    dropout_results = train_model_with_eval(CatsDogsCNNWithDropout, dropout_rate=0.5)

    print("\nResults Comparison:")
    for heading, results in (
        ("Without Dropout:", no_dropout_results),
        ("\nWith Dropout:", dropout_results),
    ):
        print(heading)
        print(f"Final Train Loss: {results[0][-1]:.4f}, Train Acc: {results[1][-1]:.2f}%")
        print(f"Final Val Loss: {results[2][-1]:.4f}, Val Acc: {results[3][-1]:.2f}%")

    # Gap = final train accuracy minus final validation accuracy.
    no_dropout_gap = no_dropout_results[1][-1] - no_dropout_results[3][-1]
    dropout_gap = dropout_results[1][-1] - dropout_results[3][-1]
    print("\nOverfitting Analysis:")
    print(f"Without Dropout - Train-Val Acc Gap: {no_dropout_gap:.2f}%")
    print(f"With Dropout    - Train-Val Acc Gap: {dropout_gap:.2f}%")

# Run the dropout vs. no-dropout comparison (trains two models, five epochs each).
compare_dropout_experiments()

Training without Dropout
Epoch 1:
Train - Loss: 5.1461, Acc: 58.25%
Val   - Loss: 0.6986, Acc: 62.30%
Weight Mag: 3.933904
Epoch 2:
Train - Loss: 0.6063, Acc: 68.15%
Val   - Loss: 0.6332, Acc: 68.30%
Weight Mag: 3.955605
Epoch 3:
Train - Loss: 0.5339, Acc: 72.90%
Val   - Loss: 0.6223, Acc: 68.80%
Weight Mag: 3.975358
Epoch 4:
Train - Loss: 0.4853, Acc: 75.50%
Val   - Loss: 0.6528, Acc: 66.60%
Weight Mag: 3.995235
Epoch 5:
Train - Loss: 0.4574, Acc: 78.40%
Val   - Loss: 0.6360, Acc: 67.40%
Weight Mag: 4.020451

Training with Dropout (rate=0.5)
Epoch 1:
Train - Loss: 4.5760, Acc: 51.70%
Val   - Loss: 0.6933, Acc: 52.80%
Weight Mag: 3.973997
Epoch 2:
Train - Loss: 0.7046, Acc: 53.60%
Val   - Loss: 0.6718, Acc: 58.60%
Weight Mag: 4.032490
Epoch 3:
Train - Loss: 0.6901, Acc: 55.70%
Val   - Loss: 0.6723, Acc: 59.80%
Weight Mag: 4.078828
Epoch 4:
Train - Loss: 0.6960, Acc: 57.55%
Val   - Loss: 0.6662, Acc: 59.80%
Weight Mag: 4.131607
Epoch 5:
Train - Loss: 0.6812, Acc: 57.90%
Val   - Loss: 0.

In [None]:
import torch.distributions as dist

class CustomDropout(nn.Module):
    """Hand-written element-wise (inverted) dropout.

    During training every element is kept independently with probability
    ``1 - p`` and the kept elements are scaled by ``1 / (1 - p)``. In
    evaluation mode the input is returned unchanged.

    Args:
        p: probability of zeroing an element; must lie in ``[0, 1]``.

    Raises:
        ValueError: if ``p`` is outside ``[0, 1]``.
    """

    def __init__(self, p=0.5):
        super().__init__()
        # Fail at construction time instead of on the first forward pass.
        if not 0.0 <= p <= 1.0:
            raise ValueError(f"dropout probability has to be between 0 and 1, but got {p}")
        self.p = p

    def forward(self, x):
        """Apply dropout to ``x`` when the module is in training mode."""
        if not self.training or self.p == 0:
            return x
        if self.p == 1:
            # Everything is dropped. Scaling by 1 / (1 - p) would divide by
            # zero and produce NaN, so return zeros while keeping ``x`` in the
            # autograd graph.
            return x * 0
        keep_prob = 1 - self.p
        # Sample the keep-mask directly with x's shape, dtype and device
        # instead of sampling elsewhere and copying it over.
        mask = torch.empty_like(x).bernoulli_(keep_prob)
        return x * mask / keep_prob

class CatsDogsCNNWithCustomDropout(nn.Module):
    """Convolutional cat/dog classifier regularized with ``CustomDropout``.

    The layer stack mirrors ``CatsDogsCNNWithDropout`` with every built-in
    dropout layer replaced by ``CustomDropout``. Note that the built-in
    variant uses ``nn.Dropout2d`` after the pooling layers, whereas
    ``CustomDropout`` masks individual elements.

    Args:
        dropout_rate: drop probability handed to every ``CustomDropout`` layer.
    """

    def __init__(self, dropout_rate=0.5):
        super().__init__()
        conv_stage = [
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
            CustomDropout(dropout_rate),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
            CustomDropout(dropout_rate),
        ]
        head_stage = [
            nn.Flatten(),
            nn.Linear(64 * 32 * 32, 512),
            nn.ReLU(),
            CustomDropout(dropout_rate),
            nn.Linear(512, 2),
        ]
        # A single Sequential keeps the parameter names as ``net.<index>.*``.
        self.net = nn.Sequential(*conv_stage, *head_stage)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        logits = self.net(x)
        return logits

def compare_dropout_implementations():
    """Train with built-in and with custom dropout and print a comparison.

    Both runs go through ``train_model_with_eval`` with a rate of 0.5. The
    summary shows the last-epoch train/validation loss and accuracy of each
    run, followed by the train-minus-validation accuracy gap.
    """
    print("Training with Built-in Dropout")
    builtin_results = train_model_with_eval(CatsDogsCNNWithDropout, dropout_rate=0.5)

    print("\nTraining with Custom Dropout")
    custom_results = train_model_with_eval(CatsDogsCNNWithCustomDropout, dropout_rate=0.5)

    print("\nResults Comparison:")
    for heading, results in (
        ("Built-in Dropout:", builtin_results),
        ("\nCustom Dropout:", custom_results),
    ):
        print(heading)
        print(f"Final Train Loss: {results[0][-1]:.4f}, Train Acc: {results[1][-1]:.2f}%")
        print(f"Final Val Loss: {results[2][-1]:.4f}, Val Acc: {results[3][-1]:.2f}%")

    # Gap = final train accuracy minus final validation accuracy.
    builtin_gap = builtin_results[1][-1] - builtin_results[3][-1]
    custom_gap = custom_results[1][-1] - custom_results[3][-1]
    print("\nOverfitting Analysis:")
    print(f"Built-in Dropout - Train-Val Acc Gap: {builtin_gap:.2f}%")
    print(f"Custom Dropout   - Train-Val Acc Gap: {custom_gap:.2f}%")

In [None]:
def train_model_with_early_stopping(model_class, dropout_rate=0.5, patience=2, weight_decay=0, max_epochs=100):
    """Train ``model_class`` until the validation loss stops improving.

    Training halts once the validation loss has failed to improve for
    ``patience`` consecutive epochs, or after ``max_epochs`` epochs. In both
    cases the weights from the epoch with the lowest validation loss are
    loaded back into the model before returning.

    Args:
        model_class: model constructor accepting a ``dropout_rate`` keyword.
        dropout_rate: dropout probability forwarded to the constructor.
        patience: number of consecutive non-improving epochs tolerated.
        weight_decay: value forwarded to Adam's ``weight_decay`` argument.
        max_epochs: upper bound on the number of epochs (default 100).

    Returns:
        ``(train_losses, train_accs, val_losses, val_accs, weight_magnitudes)``,
        each a list with one entry per epoch that was actually run.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    batch_size = 32

    train_dataset = MyDataset(transform=preprocess, split="train")
    val_dataset = MyDataset(transform=preprocess, split="validation")
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    model = model_class(dropout_rate=dropout_rate).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=weight_decay)

    train_losses, train_accs, val_losses, val_accs = [], [], [], []
    weight_magnitudes = []

    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_model_state = None

    for epoch in range(max_epochs):
        train_loss, train_acc = train_epoch(model, train_dataloader, criterion, optimizer, device)
        val_loss, val_acc = evaluate_model(model, val_dataloader, criterion, device)
        weight_mag = get_weight_magnitude(model)

        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)
        weight_magnitudes.append(weight_mag)

        print(f"Epoch {epoch+1}:")
        print(f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.2f}%")
        print(f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.2f}%")
        print(f"Weight Mag: {weight_mag:.6f}")

        # Early stopping logic
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0
            # state_dict() hands out references to the live parameters and
            # buffers, so take real copies; otherwise later optimizer steps
            # overwrite the "best" snapshot in place.
            best_model_state = {
                key: value.detach().clone()
                for key, value in model.state_dict().items()
            }
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f"\nEarly stopping triggered after {epoch+1} epochs")
            break

    # Restore the best weights whether training stopped early or ran out of
    # epochs. The snapshot is None only if no epoch ever improved on +inf
    # (e.g. NaN validation loss, or max_epochs == 0).
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return train_losses, train_accs, val_losses, val_accs, weight_magnitudes

def compare_early_stopping():
    """Train the dropout model with and without early stopping and compare.

    The fixed-length run uses ``train_model_with_eval``; the early-stopped run
    uses ``train_model_with_early_stopping`` with ``patience=2``. The summary
    prints the last recorded train/validation loss and accuracy of each run
    and the train-minus-validation accuracy gap.

    NOTE(review): the "final" numbers are taken from the last epoch that ran
    (index -1). For the early-stopped run that is not the epoch with the
    lowest validation loss — confirm this is the intended figure to report.
    """
    print("Training without Early Stopping (5 epochs)")
    no_es_results = train_model_with_eval(CatsDogsCNNWithDropout, dropout_rate=0.5)

    print("\nTraining with Early Stopping (patience=2)")
    es_results = train_model_with_early_stopping(CatsDogsCNNWithDropout, dropout_rate=0.5, patience=2)

    print("\nResults Comparison:")
    for heading, results in (
        ("Without Early Stopping:", no_es_results),
        ("\nWith Early Stopping:", es_results),
    ):
        print(heading)
        print(f"Final Train Loss: {results[0][-1]:.4f}, Train Acc: {results[1][-1]:.2f}%")
        print(f"Final Val Loss: {results[2][-1]:.4f}, Val Acc: {results[3][-1]:.2f}%")

    # Gap = final train accuracy minus final validation accuracy.
    no_es_gap = no_es_results[1][-1] - no_es_results[3][-1]
    es_gap = es_results[1][-1] - es_results[3][-1]
    print("\nOverfitting Analysis:")
    print(f"Without Early Stopping - Train-Val Acc Gap: {no_es_gap:.2f}%")
    print(f"With Early Stopping    - Train-Val Acc Gap: {es_gap:.2f}%")