In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms, models
import numpy as np
import matplotlib.pyplot as plt
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
import pandas as pd
import time
import os
from tqdm.auto import tqdm


## Experiment Configuration and Details

### Variables:
- **Datasets**: MNIST, FashionMNIST
- **Models**: ResNet-18, ResNet-50
- **Batch Size**: 16
- **Optimizers**: SGD, Adam
- **Learning Rates**: 0.001, 0.0001
- **Epochs**: 3, 5
- **Pin Memory**: False, True

### Constants:
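- **USE_AMP**: True (Automatic Mixed Precision; only applied when training on a CUDA device)
- **Train-Val-Test Split**: 70%-10%-20% of the official training set (the 20% remainder is split off but not used; test accuracy is measured on each dataset's official test set)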
- **Image Size**: 64x64 (resized from 28x28)

## FLOPs Calculation

The `count_model_flops()` function calculates actual FLOPs (Floating Point Operations) by:
1. **Profiling the model** with a 64×64×3 input tensor
2. **Counting operations** in:
   - Conv2D layers: kernel_height × kernel_width × (in_channels / groups) × output_elements
   - Linear layers: in_features × out_features  
   - BatchNorm layers: 2 × number_of_elements

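This gives a **measured operation count** for each model architecture at the 64×64 input size, rather than a hand-derived estimate. Each multiply–accumulate in a Conv2D or Linear layer is counted once; bias additions, activations, pooling and residual additions are not counted.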


In [None]:
# Prefer the GPU when one is visible to PyTorch; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Output folders for CSV/PNG results and saved weights (no error if they already exist).
for output_dir in ('results', 'models'):
    os.makedirs(output_dir, exist_ok=True)


In [None]:
def load_and_split_dataset(dataset_name, batch_size, pin_memory=False):
    """Build train / validation / test DataLoaders for MNIST or FashionMNIST.

    Every image is converted to 3-channel grayscale, resized so its shorter
    side is 64 px, turned into a tensor and normalised with mean 0.5 and
    std 0.5 per channel.

    The official training split is divided 70% / 10% / 20% with a fixed seed
    (42). Only the 70% and 10% parts are used (training and validation); the
    20% remainder is discarded and the test loader serves the dataset's
    official test split instead.

    Args:
        dataset_name: 'MNIST' or 'FashionMNIST'.
        batch_size: Batch size used by all three loaders.
        pin_memory: Forwarded to every DataLoader.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``. Only the training
        loader shuffles.

    Raises:
        ValueError: If ``dataset_name`` is not one of the supported names.
    """
    # Fail fast on typos: previously any name other than 'MNIST' silently
    # loaded FashionMNIST, which would mislabel every downstream result.
    if dataset_name not in ('MNIST', 'FashionMNIST'):
        raise ValueError(
            f"Unknown dataset_name {dataset_name!r}; expected 'MNIST' or 'FashionMNIST'"
        )

    transform_mnist = transforms.Compose([
        transforms.Grayscale(3),
        transforms.Resize(64),
        transforms.ToTensor(),
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
    ])

    dataset_cls = datasets.MNIST if dataset_name == 'MNIST' else datasets.FashionMNIST
    full_dataset = dataset_cls('./data', train=True, download=True, transform=transform_mnist)
    test_dataset = dataset_cls('./data', train=False, download=True, transform=transform_mnist)

    train_size = int(0.7 * len(full_dataset))
    val_size = int(0.1 * len(full_dataset))
    # Whatever is left after the integer truncation above; split off but unused.
    remaining = len(full_dataset) - train_size - val_size

    # Seeded generator so the train/val membership is identical on every call.
    train_dataset, val_dataset, _ = random_split(
        full_dataset,
        [train_size, val_size, remaining],
        generator=torch.Generator().manual_seed(42)
    )

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=pin_memory)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=pin_memory)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=pin_memory)

    return train_loader, val_loader, test_loader


In [None]:
def get_model(model_name, num_classes=10):
    """Create a randomly initialised ResNet with a classifier head of the requested size.

    Args:
        model_name: 'ResNet-18' or 'ResNet-50'.
        num_classes: Number of output units of the replacement ``fc`` layer.

    Returns:
        The torchvision model (no pretrained weights) with ``fc`` replaced by
        a new ``nn.Linear`` that maps to ``num_classes`` outputs.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name == 'ResNet-18':
        model = models.resnet18(weights=None)
    elif model_name == 'ResNet-50':
        model = models.resnet50(weights=None)
    else:
        # Without this branch an unknown name hit `return model` with `model`
        # unbound and surfaced as an UnboundLocalError.
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected 'ResNet-18' or 'ResNet-50'"
        )

    # Swap the stock head for one sized to this task.
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model


In [None]:
def train_epoch(model, loader, criterion, optimizer, device, scaler, use_amp=True):
    """Run one optimisation pass over ``loader`` and report mean loss and accuracy.

    Args:
        model: Network to train; switched to training mode.
        loader: Sized iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: ``torch.device`` the batches are moved to.
        scaler: Gradient scaler used on the mixed-precision path; not touched
            on the full-precision path.
        use_amp: Request mixed precision. It is only honoured when ``device``
            is a CUDA device.

    Returns:
        ``(mean_loss, accuracy)`` where ``mean_loss`` is the sum of per-batch
        losses divided by ``len(loader)`` and ``accuracy`` is the percentage
        of correctly classified samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    # Mixed precision is CUDA-only here; everything else takes the fp32 path.
    amp_enabled = use_amp and device.type == 'cuda'

    pbar = tqdm(loader, desc='Training', leave=False)
    for batches_done, (inputs, labels) in enumerate(pbar, start=1):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        if amp_enabled:
            with torch.cuda.amp.autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            # Scale the loss before backward; the scaler then steps the
            # optimizer and updates its scale factor.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        # Show the mean over the batches processed so far. Dividing by
        # len(loader) here under-reports the loss until the final batch.
        pbar.set_postfix({'loss': f'{running_loss/batches_done:.4f}', 'acc': f'{100.*correct/total:.2f}%'})

    return running_loss / len(loader), 100. * correct / total


In [None]:
def validate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    The model is put into evaluation mode. Returns ``(mean_loss, accuracy)``:
    the sum of per-batch losses divided by ``len(loader)``, and the
    percentage of samples whose highest-scoring class equals the label.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch_inputs, batch_labels in loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_labels).item()

            # Index of the largest score along the class dimension.
            predictions = logits.max(1)[1]
            n_seen += batch_labels.size(0)
            n_correct += (predictions == batch_labels).sum().item()

    mean_loss = loss_sum / len(loader)
    accuracy = 100. * n_correct / n_seen
    return mean_loss, accuracy


In [None]:
def train_model(model, train_loader, val_loader, optimizer, criterion, device, epochs=3, use_amp=True):
    """Train for ``epochs`` epochs and restore the weights with the best validation accuracy.

    Args:
        model: Network to train (updated in place).
        train_loader: Batches used for optimisation.
        val_loader: Batches used to score each epoch.
        optimizer: Optimizer bound to ``model``'s parameters.
        criterion: Loss callable.
        device: ``torch.device`` the batches are moved to.
        epochs: Number of passes over ``train_loader``.
        use_amp: Request mixed precision; only effective on a CUDA device.

    Returns:
        ``(model, train_losses, train_accs, val_losses, val_accs)`` with one
        list entry per epoch. ``model`` carries the weights from the epoch
        with the highest validation accuracy; if no epoch exceeds 0.0 it
        keeps the final weights.
    """
    train_losses, train_accs = [], []
    val_losses, val_accs = [], []
    best_val_acc = 0.0
    best_model_state = None
    # Enable the scaler only where the AMP path can actually run; this avoids
    # building an enabled CUDA scaler on CPU-only runs.
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp and device.type == 'cuda')

    for epoch in tqdm(range(epochs), desc='Epochs'):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device, scaler, use_amp)
        val_loss, val_acc = validate(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        tqdm.write(f'Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() values share storage with the live parameters, and
            # dict.copy() is shallow, so the snapshot must clone every tensor.
            # Otherwise later epochs overwrite it and the restore below does
            # nothing.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, train_losses, train_accs, val_losses, val_accs


In [None]:
def plot_training_curves(train_losses, train_accs, val_losses, val_accs, save_path):
    """Save a two-panel figure of per-epoch loss (left) and accuracy (right).

    Each panel overlays the training and validation series. The figure is
    written to ``save_path`` at 150 dpi and then closed.
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    # (train series, train label, val series, val label, y-axis label) per panel.
    panels = (
        (train_losses, 'Train Loss', val_losses, 'Val Loss', 'Loss'),
        (train_accs, 'Train Accuracy', val_accs, 'Val Accuracy', 'Accuracy (%)'),
    )
    for ax, (train_series, train_label, val_series, val_label, y_label) in zip(axes, panels):
        ax.plot(train_series, label=train_label)
        ax.plot(val_series, label=val_label)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.legend()
        ax.grid(True)

    plt.tight_layout()
    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.close(fig)


In [None]:
# Q1(a) sweep settings.
USE_AMP = True
epoch_values = [3, 5]
pin_memory_values = [False, True]

# One dict per finished experiment is appended here by the sweep.
results_q1a = []

# Optimizer x learning-rate grid at a fixed batch size of 16
# (order: SGD 1e-3, SGD 1e-4, Adam 1e-3, Adam 1e-4).
configs = [
    {'batch_size': 16, 'optimizer': optimizer_name, 'lr': learning_rate}
    for optimizer_name in ('SGD', 'Adam')
    for learning_rate in (0.001, 0.0001)
]


In [None]:
# Q1(a) sweep: train and test every dataset x optimizer config x pin_memory
# x model x epoch-budget combination, saving weights, curves and a result row
# for each run.
# Total run count, used only for the "Experiment i/N" banner.
total_experiments = len(['MNIST', 'FashionMNIST']) * len(configs) * len(['ResNet-18', 'ResNet-50']) * len(epoch_values) * len(pin_memory_values)
exp_counter = 0

for dataset_name in ['MNIST', 'FashionMNIST']:
    for config in configs:
        batch_size = config['batch_size']
        opt_name = config['optimizer']
        lr = config['lr']
        
        for pin_mem in pin_memory_values:
            # The loaders depend only on (dataset, batch size, pin_memory), so they
            # are built once here and shared by every model/epoch run below.
            train_loader, val_loader, test_loader = load_and_split_dataset(dataset_name, batch_size, pin_mem)
            
            for model_name in ['ResNet-18', 'ResNet-50']:
                for epochs in epoch_values:
                    exp_counter += 1
                    # The run name doubles as the file stem for the saved weights and plot.
                    exp_name = f"{dataset_name}_{model_name}_bs{batch_size}_{opt_name}_lr{lr}_ep{epochs}_pm{pin_mem}"
                    print(f"\n{'='*80}")
                    print(f"Experiment {exp_counter}/{total_experiments}: {exp_name}")
                    print(f"{'='*80}")
                    
                    # A fresh model is created for every epoch budget, so the 5-epoch
                    # run does not continue from the 3-epoch run.
                    # NOTE(review): no random seed is set before initialisation, so
                    # repeated runs are not expected to reproduce exactly.
                    model = get_model(model_name, num_classes=10).to(device)
                    criterion = nn.CrossEntropyLoss()
                    
                    if opt_name == 'SGD':
                        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
                    else:
                        optimizer = optim.Adam(model.parameters(), lr=lr)
                    
                    model, train_losses, train_accs, val_losses, val_accs = train_model(
                        model, train_loader, val_loader, optimizer, criterion, device, epochs, USE_AMP
                    )
                    
                    # Only the accuracy is kept from the test evaluation; the loss is discarded.
                    _, test_acc = validate(model, test_loader, criterion, device)
                    
                    torch.save(model.state_dict(), f'models/{exp_name}.pth')
                    plot_training_curves(train_losses, train_accs, val_losses, val_accs, 
                                       f'results/{exp_name}.png')
                    
                    print(f"✓ Completed - Test Accuracy: {test_acc:.2f}%")
                    
                    # 'Test Accuracy' is stored as a two-decimal string, not a float.
                    # 'USE_AMP' records the configured flag; train_epoch only applies
                    # autocast when the device is CUDA.
                    results_q1a.append({
                        'Dataset': dataset_name,
                        'Batch Size': batch_size,
                        'Optimizer': opt_name,
                        'Learning Rate': lr,
                        'Epochs': epochs,
                        'Pin Memory': pin_mem,
                        'USE_AMP': USE_AMP,
                        'Model': model_name,
                        'Test Accuracy': f"{test_acc:.2f}"
                    })


In [None]:
# Tabulate the Q1(a) result rows, write them to CSV (without the index column),
# and leave the table as the cell's displayed value.
df_q1a = pd.DataFrame(results_q1a)
df_q1a.to_csv('results/q1a_results.csv', index=False)
df_q1a


In [None]:
def load_dataset_for_svm(dataset_name, sample_size=10000, seed=42):
    """Load MNIST or FashionMNIST as flat, [0, 1]-scaled NumPy arrays for an SVM.

    Pixels are read straight from the datasets' raw ``.data`` tensors, so no
    torchvision transform is involved. Each image is flattened to one row and
    divided by 255. The training set is randomly subsampled without
    replacement; the test set is returned in full.

    Args:
        dataset_name: 'MNIST' or 'FashionMNIST'.
        sample_size: Maximum number of training rows to keep.
        seed: Seed for the subsampling generator, so the same rows are picked
            on every call. Pass ``None`` for a different sample each call.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)`` of NumPy arrays.

    Raises:
        ValueError: If ``dataset_name`` is not one of the supported names.
    """
    # Previously any name other than 'MNIST' silently loaded FashionMNIST.
    if dataset_name not in ('MNIST', 'FashionMNIST'):
        raise ValueError(
            f"Unknown dataset_name {dataset_name!r}; expected 'MNIST' or 'FashionMNIST'"
        )

    dataset_cls = datasets.MNIST if dataset_name == 'MNIST' else datasets.FashionMNIST
    # No transform is passed: it would only apply through __getitem__, and the
    # raw .data / .targets tensors are read directly below.
    train_dataset = dataset_cls('./data', train=True, download=True)
    test_dataset = dataset_cls('./data', train=False, download=True)

    X_train = train_dataset.data.numpy().reshape(len(train_dataset), -1) / 255.0
    y_train = train_dataset.targets.numpy()
    X_test = test_dataset.data.numpy().reshape(len(test_dataset), -1) / 255.0
    y_test = test_dataset.targets.numpy()

    # Seeded subsample keeps SVM results comparable across notebook re-runs.
    rng = np.random.default_rng(seed)
    n_keep = min(sample_size, len(X_train))
    indices = rng.choice(len(X_train), n_keep, replace=False)
    X_train = X_train[indices]
    y_train = y_train[indices]

    return X_train, y_train, X_test, y_test


In [None]:
# Q1(b): fit polynomial- and RBF-kernel SVMs on each dataset and record test
# accuracy and fit time.
results_q1b = []

print(f"\n{'='*80}")
print("Q1(b) - SVM Training")
print(f"{'='*80}")

for dataset_name in ['MNIST', 'FashionMNIST']:
    # Loaded once per dataset and shared by both kernels.
    X_train, y_train, X_test, y_test = load_dataset_for_svm(dataset_name)

    for kernel in ['poly', 'rbf']:
        print(f"\nTraining SVM on {dataset_name} with {kernel} kernel...")
        # perf_counter is a monotonic, high-resolution clock meant for measuring
        # intervals; time.time() can jump if the system clock is adjusted.
        start_time = time.perf_counter()

        svm = SVC(kernel=kernel, gamma='scale', max_iter=1000)
        svm.fit(X_train, y_train)

        # Elapsed fit time in milliseconds (prediction is not included).
        train_time = (time.perf_counter() - start_time) * 1000

        y_pred = svm.predict(X_test)
        test_acc = accuracy_score(y_test, y_pred) * 100

        print(f"✓ Completed - Test Accuracy: {test_acc:.2f}%, Training Time: {train_time:.2f}ms")

        # Both metrics are stored as two-decimal strings.
        results_q1b.append({
            'Dataset': dataset_name,
            'Kernel': kernel,
            'Test Accuracy': f"{test_acc:.2f}",
            'Training Time (ms)': f"{train_time:.2f}"
        })


In [None]:
# Tabulate the Q1(b) SVM result rows, write them to CSV (without the index
# column), and leave the table as the cell's displayed value.
df_q1b = pd.DataFrame(results_q1b)
df_q1b.to_csv('results/q1b_results.csv', index=False)
df_q1b


In [None]:
def count_model_flops(model, input_size=64, device='cpu'):
    """Count operations for one forward pass of ``model`` on a 1x3xHxW input.

    Forward hooks are attached to every Conv2d, Linear and BatchNorm1d/2d
    module and a single random image is pushed through the model on the CPU:

    * Conv2d: ``kernel_h * kernel_w * (in_channels / groups)`` per output element.
    * Linear: ``in_features`` per output element.
    * BatchNorm: 2 per input element.

    Conv and Linear layers are therefore counted as one operation per
    multiply-accumulate. Biases, activations, pooling and other operations
    are not counted.

    Args:
        model: Module to profile. It is moved to the CPU for the pass (the
            same object, not a copy) and then moved to ``device``.
        input_size: Height and width of the square probe image.
        device: Device the model is moved to before returning.

    Returns:
        Total operation count as an ``int``.
    """
    # Remember every sub-module's train/eval flag so profiling does not leave
    # the caller's model stuck in eval mode.
    training_flags = {module: module.training for module in model.modules()}

    model.to('cpu')
    model.eval()

    input_tensor = torch.randn(1, 3, input_size, input_size)

    total_flops = 0

    def count_conv2d(m, x, y):
        nonlocal total_flops
        kernel_ops = m.kernel_size[0] * m.kernel_size[1] * (m.in_channels // m.groups)
        total_flops += kernel_ops * y.numel()

    def count_linear(m, x, y):
        nonlocal total_flops
        # y.numel() equals out_features for a single 2-D sample and also
        # accounts for any leading dimensions.
        total_flops += m.in_features * y.numel()

    def count_bn(m, x, y):
        nonlocal total_flops
        # x is the tuple of positional inputs; x[0] is the activation tensor.
        total_flops += 2 * x[0].numel()

    hooks = []
    try:
        for module in model.modules():
            if isinstance(module, nn.Conv2d):
                hooks.append(module.register_forward_hook(count_conv2d))
            elif isinstance(module, nn.Linear):
                hooks.append(module.register_forward_hook(count_linear))
            elif isinstance(module, (nn.BatchNorm2d, nn.BatchNorm1d)):
                hooks.append(module.register_forward_hook(count_bn))

        with torch.no_grad():
            model(input_tensor)
    finally:
        # Always detach the hooks and restore mode and device, even if the
        # forward pass raised.
        for hook in hooks:
            hook.remove()
        for module, was_training in training_flags.items():
            module.training = was_training
        model.to(device)

    return total_flops


In [None]:
# Profile one untrained instance of each architecture at the 64x64 input size.
sample_resnet18 = get_model('ResNet-18', num_classes=10)
flops_r18 = count_model_flops(sample_resnet18, input_size=64)

sample_resnet50 = get_model('ResNet-50', num_classes=10)
flops_r50 = count_model_flops(sample_resnet50, input_size=64)

print("Actual FLOPs for 64x64 input:")
for arch_label, arch_flops in (('ResNet-18', flops_r18), ('ResNet-50', flops_r50)):
    print(f"{arch_label}: {arch_flops:,} FLOPs ({arch_flops:.2e})")

# The probe models are only needed for the count above.
del sample_resnet18, sample_resnet50


In [None]:
# Q2 (CPU vs GPU) settings: a single dataset and batch size, both epoch
# budgets and both pin_memory settings.
dataset_name = 'FashionMNIST'
batch_size = 16
USE_AMP_Q2 = True
epoch_values_q2 = [3, 5]
pin_memory_values_q2 = [False, True]

# One dict per finished experiment is appended here by the Q2 loop.
results_q2 = []

# Both optimizers at the same learning rate.
configs_q2 = [
    {'optimizer': optimizer_name, 'lr': 0.001}
    for optimizer_name in ('SGD', 'Adam')
]


In [None]:
# Q2: train the same configurations on CPU and (if available) GPU and record
# test accuracy, wall-clock training time and the model's operation count.
print(f"\n{'='*80}")
print("Q2 - CPU vs GPU Performance")
print(f"{'='*80}")

for compute in ['CPU', 'GPU']:
    device_q2 = torch.device('cpu' if compute == 'CPU' else 'cuda')

    if compute == 'GPU' and not torch.cuda.is_available():
        print(f"\n⚠ GPU not available, skipping GPU experiments")
        continue

    print(f"\n--- Running on {compute} ---")

    for pin_mem in pin_memory_values_q2:
        # The loaders depend only on pin_memory here, so they are built once
        # and shared by every run below.
        train_loader, val_loader, test_loader = load_and_split_dataset(dataset_name, batch_size, pin_mem)

        for config in configs_q2:
            opt_name = config['optimizer']
            lr = config['lr']

            for model_name in ['ResNet-18', 'ResNet-50']:
                for epochs in epoch_values_q2:
                    exp_name = f"{compute}_{model_name}_{opt_name}_lr{lr}_ep{epochs}_pm{pin_mem}"
                    print(f"\n{exp_name}")

                    model = get_model(model_name, num_classes=10).to(device_q2)
                    criterion = nn.CrossEntropyLoss()

                    if opt_name == 'SGD':
                        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
                    else:
                        optimizer = optim.Adam(model.parameters(), lr=lr)

                    # CUDA work is queued asynchronously. Drain the queue before
                    # starting and before stopping the clock so the interval
                    # covers exactly this run's GPU work.
                    if device_q2.type == 'cuda':
                        torch.cuda.synchronize(device_q2)
                    # perf_counter is monotonic and intended for interval timing.
                    start_time = time.perf_counter()
                    # Index the result instead of unpacking into `_`, which would
                    # rebind IPython's "last output" variable.
                    model = train_model(
                        model, train_loader, val_loader, optimizer, criterion, device_q2, epochs, USE_AMP_Q2
                    )[0]
                    if device_q2.type == 'cuda':
                        torch.cuda.synchronize(device_q2)
                    # Milliseconds; includes the per-epoch validation done inside train_model.
                    train_time = (time.perf_counter() - start_time) * 1000

                    test_acc = validate(model, test_loader, criterion, device_q2)[1]

                    # Profiling moves the model to the CPU and then to `device`.
                    flops = count_model_flops(model, input_size=64, device=device_q2)

                    print(f"✓ Test Accuracy: {test_acc:.2f}%, Training Time: {train_time:.2f}ms, FLOPs: {flops:.2e}")

                    # Metrics are stored as formatted strings. 'USE_AMP' records
                    # the configured flag; train_epoch only applies autocast on CUDA.
                    results_q2.append({
                        'Compute': compute,
                        'Batch Size': batch_size,
                        'Optimizer': opt_name,
                        'Learning Rate': lr,
                        'Epochs': epochs,
                        'Pin Memory': pin_mem,
                        'USE_AMP': USE_AMP_Q2,
                        'Model': model_name,
                        'Test Accuracy': f"{test_acc:.2f}",
                        'Training Time (ms)': f"{train_time:.2f}",
                        'FLOPs': f"{flops:.2e}"
                    })


In [None]:
# Tabulate the Q2 CPU/GPU result rows, write them to CSV (without the index
# column), and leave the table as the cell's displayed value.
df_q2 = pd.DataFrame(results_q2)
df_q2.to_csv('results/q2_results.csv', index=False)
df_q2
