## Modeling

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from collections import defaultdict
import time
import pickle

# Pick the best available accelerator: CUDA first, then Apple-silicon MPS, else CPU.
# `torch.backends.mps` is missing on PyTorch builds that predate the MPS backend,
# so look it up defensively instead of raising AttributeError there.
mps_backend = getattr(torch.backends, 'mps', None)
if torch.cuda.is_available():
    device = torch.device('cuda')
elif mps_backend is not None and mps_backend.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')

print(f"Using device: {device}")

# NOTE(review): pickle.load can execute arbitrary code embedded in the file.
# Only load a 'cifar10_loaders.pkl' that you created yourself or otherwise trust.
with open('cifar10_loaders.pkl', 'rb') as f:
    data = pickle.load(f)

# Unpack the saved bundle into the names the rest of the notebook uses.
train_loader = data['train_loader']
val_loader = data['val_loader']
test_loader = data['test_loader']
class_names = data['class_names']
batch_size = data['batch_size']

print("DataLoaders loaded successfully!")
print(f"Training batches: {len(train_loader)}")


Using device: mps
DataLoaders loaded successfully!
Training batches: 625


## Network Architecture

In [None]:
class CIFAR10Net(nn.Module):
    """Flexible CNN for CIFAR-10 with configurable activation and regularization.

    Architecture: three stages of (3x3 conv -> BatchNorm -> activation) x 2 followed
    by 2x2 max-pooling (32, 64, then 128 channels), global average pooling, and a
    128 -> 256 -> 10 fully connected classifier that returns raw logits.

    Args:
        activation_name: One of 'relu', 'leaky_relu', 'elu' or 'tanh'. Any other
            value falls back to ReLU and emits a ``UserWarning``.
        regularization: ``'dropout'`` applies dropout after every pooling stage and
            after the first fully connected layer. Any other value leaves the
            forward pass without dropout; the value is only stored on the instance
            for callers to inspect.
        dropout_rate: Drop probability of the single shared ``nn.Dropout`` module.
    """

    # Name -> activation callable; consulted on every forward pass so that changing
    # `activation_name` after construction takes effect.
    _ACTIVATIONS = {
        'relu': F.relu,
        'leaky_relu': F.leaky_relu,
        'elu': F.elu,
        'tanh': torch.tanh,
    }

    def __init__(self, activation_name='relu', regularization=None, dropout_rate=0.5):
        super(CIFAR10Net, self).__init__()

        # Store configuration
        self.activation_name = activation_name
        self.regularization = regularization
        self.dropout_rate = dropout_rate

        # Convolutional stage 1: 3 -> 32 -> 32 channels
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(2)

        # Convolutional stage 2: 32 -> 64 -> 64 channels
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(2)

        # Convolutional stage 3: 64 -> 128 -> 128 channels
        self.conv5 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(128)
        self.conv6 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(2)

        # Global average pooling collapses each feature map to a single value
        self.gap = nn.AdaptiveAvgPool2d((1, 1))

        # Classifier
        self.fc1 = nn.Linear(128, 256)
        self.fc2 = nn.Linear(256, 10)

        # Dropout module; only applied in forward() when regularization == 'dropout'
        self.dropout = nn.Dropout(dropout_rate)

    def get_activation(self):
        """Return the activation callable selected by ``self.activation_name``.

        Unknown names keep the historical ReLU fallback, but now emit a
        ``UserWarning`` so that a misspelled name cannot silently turn an
        activation comparison into ReLU-vs-ReLU.
        """
        try:
            return self._ACTIVATIONS[self.activation_name]
        except (KeyError, TypeError):
            # TypeError covers unhashable values, which previously also fell back.
            import warnings

            warnings.warn(
                f"Unknown activation_name {self.activation_name!r}; falling back to "
                f"'relu'. Supported names: {sorted(self._ACTIVATIONS)}",
                UserWarning,
                stacklevel=2,
            )
            return F.relu

    def forward(self, x):
        """Compute class logits for a batch of images.

        Args:
            x: Input batch with 3 channels (the notebook feeds 32x32 images).

        Returns:
            Tensor of shape (batch, 10) containing unnormalized class scores.
        """
        act = self.get_activation()
        use_dropout = self.regularization == 'dropout'

        stages = (
            (self.conv1, self.bn1, self.conv2, self.bn2, self.pool1),
            (self.conv3, self.bn3, self.conv4, self.bn4, self.pool2),
            (self.conv5, self.bn5, self.conv6, self.bn6, self.pool3),
        )
        for conv_a, bn_a, conv_b, bn_b, pool in stages:
            x = act(bn_a(conv_a(x)))
            x = act(bn_b(conv_b(x)))
            x = pool(x)
            if use_dropout:
                x = self.dropout(x)

        # Classifier
        x = self.gap(x)
        # flatten() also handles non-contiguous inputs, unlike view()
        x = torch.flatten(x, 1)
        x = act(self.fc1(x))
        if use_dropout:
            x = self.dropout(x)
        return self.fc2(x)

# Smoke test: push one random batch through the network and report its size
banner = "=" * 60
print(banner)
print("TESTING NETWORK ARCHITECTURE")
print(banner)

test_net = CIFAR10Net(activation_name='relu', regularization='dropout').to(device)
test_input = torch.randn(4, 3, 32, 32).to(device)
test_output = test_net(test_input)

total_params = sum(weights.numel() for weights in test_net.parameters())
print("Network test successful!")
print(f"Input shape: {test_input.shape}")
print(f"Output shape: {test_output.shape}")
print(f"Total parameters: {total_params:,}")

# Build every activation/regularization pairing and report its parameter count
print("\nTesting different configurations:")
configurations = [
    (activation, reg)
    for activation in ['relu', 'leaky_relu', 'elu', 'tanh']
    for reg in [None, 'dropout']
]
for activation, reg in configurations:
    net = CIFAR10Net(activation_name=activation, regularization=reg).to(device)
    params = sum(weights.numel() for weights in net.parameters())
    print(f"  {activation:12s} + {str(reg):8s}: {params:,} parameters")

TESTING NETWORK ARCHITECTURE
✅ Network test successful!
  Input shape: torch.Size([4, 3, 32, 32])
  Output shape: torch.Size([4, 10])
  Total parameters: 323,498

📊 Testing different configurations:
  relu         + None    : 323,498 parameters
  relu         + dropout : 323,498 parameters
  leaky_relu   + None    : 323,498 parameters
  leaky_relu   + dropout : 323,498 parameters
  elu          + None    : 323,498 parameters
  elu          + dropout : 323,498 parameters
  tanh         + None    : 323,498 parameters
  tanh         + dropout : 323,498 parameters


## Training

In [None]:
def _evaluate(model, loader, criterion=None):
    """Score `model` on `loader` in eval mode with gradients disabled.

    Returns:
        (mean_loss, accuracy_percent). mean_loss is the per-sample average of
        `criterion`, or 0.0 when `criterion` is None.

    Raises:
        ValueError: if `loader` yields no samples.
    """
    model.eval()
    loss_sum, correct, seen = 0.0, 0, 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            batch = labels.size(0)
            if criterion is not None:
                # criterion returns a batch mean; re-weight by batch size so a
                # smaller final batch does not skew the average
                loss_sum += criterion(outputs, labels).item() * batch
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            seen += batch
    if seen == 0:
        raise ValueError("Cannot evaluate on a loader that yields no samples")
    return loss_sum / seen, 100. * correct / seen


def train_model(model, train_loader, val_loader, epochs=15, lr=0.001, weight_decay=0,
                test_loader=None, l2_lambda=0.001):
    """
    Train a model and return history with timing information

    The model is moved to the module-level `device`, optimized with Adam on a
    cross-entropy loss, and the learning rate is halved whenever the validation
    loss has not improved for more than 3 epochs (ReduceLROnPlateau).

    Args:
        model: Network to train. If it has ``regularization == 'l2'`` and
            `weight_decay` is 0, an explicit squared-L2 penalty is added to the
            training objective.
        train_loader: Iterable of (inputs, labels) batches used for optimization.
        val_loader: Iterable of (inputs, labels) batches evaluated every epoch.
        epochs: Number of passes over `train_loader`.
        lr: Initial Adam learning rate.
        weight_decay: Adam weight decay; a non-zero value disables the manual penalty.
        test_loader: Batches for the final test evaluation. When None, the
            module-level `test_loader` is used if one exists (the original
            behavior); if there is none, the test evaluation is skipped.
        l2_lambda: Coefficient of the manual L2 penalty.

    Returns:
        dict with per-epoch lists 'train_loss', 'train_acc', 'val_loss',
        'val_acc', 'epoch_time', plus a scalar 'test_acc' when a test loader was
        available. Losses are per-sample cross-entropy means and never include
        the L2 penalty, so the train and validation curves are comparable.

    Raises:
        ValueError: if a loader that is evaluated yields no samples.
    """
    if test_loader is None:
        # Backward compatibility: earlier versions read the notebook-global loader
        test_loader = globals().get('test_loader')

    # Move model to device
    model = model.to(device)

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Learning rate scheduler (reduces LR when the validation loss plateaus)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)

    # Decide once whether the manual penalty applies; getattr keeps plain
    # nn.Modules without a `regularization` attribute trainable.
    manual_l2 = getattr(model, 'regularization', None) == 'l2' and weight_decay == 0

    # Training history
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': [],
        'epoch_time': []
    }

    start_time = time.time()

    for epoch in range(epochs):
        epoch_start = time.time()

        # Training phase
        model.train()
        loss_sum, train_correct, train_total = 0.0, 0, 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            data_loss = criterion(outputs, labels)

            objective = data_loss
            if manual_l2:
                # True L2 penalty: sum of squared weights (not the sum of
                # un-squared norms, which is a different regularizer)
                penalty = sum(param.pow(2).sum() for param in model.parameters())
                objective = data_loss + l2_lambda * penalty

            objective.backward()
            optimizer.step()

            batch = labels.size(0)
            # Log only the data term, weighted by batch size
            loss_sum += data_loss.item() * batch
            _, predicted = outputs.max(1)
            train_total += batch
            train_correct += predicted.eq(labels).sum().item()

        if train_total == 0:
            raise ValueError("train_loader yielded no samples")

        # Calculate metrics
        train_loss = loss_sum / train_total
        train_acc = 100. * train_correct / train_total
        val_loss, val_acc = _evaluate(model, val_loader, criterion)

        # Update learning rate scheduler
        scheduler.step(val_loss)

        # Store history
        epoch_time = time.time() - epoch_start
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['epoch_time'].append(epoch_time)

        # Print progress
        print(f'Epoch {epoch+1:2d}/{epochs} | '
                f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | '
                f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}% | '
                f'Time: {epoch_time:.2f}s')

    total_time = time.time() - start_time
    print(f'\nTraining completed in {total_time:.2f}s ({total_time/60:.2f} minutes)')

    # Add final test accuracy to history
    if test_loader is not None:
        _, history['test_acc'] = _evaluate(model, test_loader)
        print(f'Test Accuracy: {history["test_acc"]:.2f}%')

    return history
