<a href="https://colab.research.google.com/github/Mustaq7777777/DL-Assignment-2/blob/main/DA6401_Assignment2_partA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Importing required Libraries

In [None]:
import os
import math
import wandb
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from torch.amp import GradScaler, autocast

Setup and Configuration

In [None]:
# # Mount Google Drive for data access
# from google.colab import drive
# drive.mount('/content/drive')

# NOTE(review): the original note here said "using kaggle", but BASE_PATH
# below points at a Colab Google Drive mount — confirm which environment
# this notebook is meant to run in.

# Authenticate with Weights & Biases.
# The API key is a secret and must never be committed to source control, so
# it is read from the WANDB_API_KEY environment variable. When the variable is
# unset, key=None is passed and wandb uses its own credential lookup.
# NOTE(review): the key that used to be hard-coded on this line has been
# exposed and should be revoked in the W&B account settings.
wandb.login(key=os.environ.get("WANDB_API_KEY"))

# Define path to dataset
BASE_PATH = '/content/drive/MyDrive/DL-Assignment2-data/inaturalist_12K'

Utility Functions

In [None]:
# Calculate output dimensions after convolution operation
def calculate_output_dimensions(input_size, kernel_size, stride=1, padding=0):
    """Return the output size along one spatial axis of a sliding-window op.

    Evaluates floor((input_size - kernel_size + 2 * padding) / stride) + 1.

    Args:
        input_size: Size of the input along the axis.
        kernel_size: Size of the window along the axis.
        stride: Step between consecutive window positions.
        padding: Padding added on each side of the axis.

    Returns:
        The resulting size as an int.
    """
    # Extent over which the window's leading edge can travel.
    travel = input_size + 2 * padding - kernel_size
    # Whole strides that fit in that extent; the starting position adds one.
    full_steps = math.floor(travel / stride)
    return full_steps + 1

Data Preparation (train, validation and test)

In [None]:

def get_data_loaders(cfg):
    """
    Prepare data loaders for training, validation and testing.

    The images under ``BASE_PATH/train`` are split 80/20 (stratified by
    class, fixed random seed) into a training and a validation subset.
    The images under ``BASE_PATH/val`` are used as the test set.

    Only the training subset ever receives random augmentation; validation
    and test images always go through the deterministic resize/normalize
    pipeline so that their metrics are comparable across epochs and runs.

    Args:
        cfg: Configuration object providing ``img_size``, ``augmentation``
            and ``batch_size``.

    Returns:
        Tuple of (train_loader, val_loader, test_loader)
    """
    # Deterministic steps shared by every pipeline. The normalization
    # constants match the standard ImageNet per-channel mean / std.
    resize = transforms.Resize((cfg.img_size, cfg.img_size))
    tensor_and_normalize = [
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]

    # Evaluation pipeline (validation and test): no augmentation.
    eval_transforms = transforms.Compose([resize, *tensor_and_normalize])

    if cfg.augmentation:
        # Random augmentations are inserted between resize and tensor conversion.
        train_transforms = transforms.Compose([
            resize,
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(30),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            *tensor_and_normalize,
        ])
    else:
        # Without augmentation the training pipeline equals the evaluation one.
        train_transforms = eval_transforms

    # Load datasets. The train folder is opened twice: a Subset shares the
    # transform of the dataset it wraps, so the validation split needs its own
    # dataset view to avoid inheriting the training augmentations. Both views
    # list the same folder, so an index refers to the same image in each.
    train_dir = os.path.join(BASE_PATH, 'train')
    train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
    val_dataset = datasets.ImageFolder(train_dir, transform=eval_transforms)
    test_dataset = datasets.ImageFolder(os.path.join(BASE_PATH, 'val'), transform=eval_transforms)

    # Split training data to create validation set
    indices = list(range(len(train_dataset)))
    train_indices, val_indices = train_test_split(
        indices,
        test_size=0.2,  # 20% for validation
        stratify=train_dataset.targets,  # Maintain class distribution
        random_state=42  # For reproducibility
    )

    # Create subsets (disjoint index sets over the two views of the folder)
    train_subset = Subset(train_dataset, train_indices)
    val_subset = Subset(val_dataset, val_indices)

    # Use at most 2 workers to avoid the DataLoader worker-count warning
    num_workers = min(2, os.cpu_count() or 1)

    # Create and return data loaders; only the training loader is shuffled
    return (
        DataLoader(train_subset, batch_size=cfg.batch_size, shuffle=True,
                   num_workers=num_workers, pin_memory=True),
        DataLoader(val_subset, batch_size=cfg.batch_size, shuffle=False,
                   num_workers=num_workers, pin_memory=True),
        DataLoader(test_dataset, batch_size=cfg.batch_size, shuffle=False,
                   num_workers=num_workers, pin_memory=True)
    )


Convolutional Neural Network implemented as a class

In [None]:

class CNN(nn.Module):
    """
    Convolutional Neural Network with configurable architecture
    - Variable number of convolutional layers
    - Configurable filter sizes and counts
    - Choice of activation functions
    - Optional batch normalization

    The configuration object must provide: ``num_filters`` and
    ``filter_sizes`` (parallel sequences, one entry per conv block),
    ``batch_norm``, ``activation``, ``fc_hidden_sizes`` (width of the single
    hidden dense layer) and ``dropout``.
    """
    def __init__(self, cfg):
        """Build the conv blocks, the adaptive pooling layer and the classifier from cfg."""
        super().__init__()
        self.cfg = cfg

        # Convolutional feature extractor, one block per (filters, kernel) pair
        self.conv_blocks = nn.ModuleList()

        in_channels = 3  # RGB input
        for out_channels, kernel_size in zip(cfg.num_filters, cfg.filter_sizes):
            # Each block is conv -> optional batchnorm -> activation -> max-pool
            self.conv_blocks.append(
                self._create_conv_block(
                    in_channels,
                    out_channels,
                    kernel_size,
                    use_batchnorm=cfg.batch_norm,
                    activation=cfg.activation
                )
            )
            in_channels = out_channels

        # Adaptive pooling always emits 6x6 feature maps, so the classifier
        # input width depends only on the last block's channel count and no
        # manual tracking of the spatial size is required.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((6, 6))

        # Fully connected classification layers
        self.classifier = self._create_classifier(
            in_channels * 6 * 6,  # Flattened feature maps
            cfg.fc_hidden_sizes,
            10,  # Number of classes
            cfg.dropout,
            cfg.batch_norm,
            cfg.activation
        )

    def _create_conv_block(self, in_channels, out_channels, kernel_size,
                           use_batchnorm=True, activation='relu'):
        """Create a convolutional block with optional batch normalization.

        Layer order: Conv2d (padding=1) -> [BatchNorm2d] -> activation ->
        MaxPool2d(2, 2).
        """
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size, padding=1),
        ]

        if use_batchnorm:
            layers.append(nn.BatchNorm2d(out_channels))

        layers.append(self._get_activation_function(activation))
        layers.append(nn.MaxPool2d(2, 2))

        return nn.Sequential(*layers)

    def _create_classifier(self, in_features, hidden_size, num_classes,
                           dropout_rate, use_batchnorm, activation):
        """Create the classifier part of the network.

        Layer order: Linear -> [BatchNorm1d] -> activation -> Dropout ->
        Linear producing ``num_classes`` logits.
        """
        layers = [
            nn.Linear(in_features, hidden_size),
        ]

        if use_batchnorm:
            layers.append(nn.BatchNorm1d(hidden_size))

        layers.extend([
            self._get_activation_function(activation),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, num_classes)
        ])

        return nn.Sequential(*layers)

    def _get_activation_function(self, name):
        """Return a new activation module for ``name`` (case-insensitive).

        Unrecognized names fall back to ReLU.
        """
        # Map names to classes so only the requested module is instantiated.
        activation_classes = {
            'relu': nn.ReLU,
            'gelu': nn.GELU,
            'silu': nn.SiLU,
            'mish': nn.Mish,
            'elu': nn.ELU,
            'selu': nn.SELU
        }
        return activation_classes.get(name.lower(), nn.ReLU)()

    def forward(self, x):
        """Forward pass through the network; returns one row of class logits per input."""
        # Pass input through convolutional blocks
        for block in self.conv_blocks:
            x = block(x)

        # Pool to a fixed 6x6 grid, then flatten everything but the batch axis
        x = self.adaptive_pool(x)
        x = torch.flatten(x, 1)

        # Classification
        return self.classifier(x)

Training

In [None]:

def _select_optimizer(name, parameters, learning_rate, weight_decay):
    """Build the optimizer called ``name`` (case-insensitive); unknown names fall back to NAdam."""
    optimizer_classes = {
        'adam': optim.Adam,
        'nadam': optim.NAdam,
        'rmsprop': optim.RMSprop,
    }
    optimizer_cls = optimizer_classes.get(name.lower(), optim.NAdam)
    return optimizer_cls(parameters, lr=learning_rate, weight_decay=weight_decay)


def _evaluate(model, loader, criterion, device):
    """Run one gradient-free pass over ``loader``; return (mean per-batch loss, accuracy in percent)."""
    # Always evaluate in eval mode, regardless of what the caller did before.
    model.eval()
    loss_sum, correct, total = 0.0, 0, 0

    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss_sum += criterion(outputs, targets).item()
            correct += (outputs.argmax(1) == targets).sum().item()
            total += targets.size(0)

    return loss_sum / len(loader), 100 * correct / total


def train():
    """Main training function that handles the entire training process.

    Starts a wandb run, builds the data loaders and the CNN from the run's
    configuration, trains for a fixed number of epochs while logging
    train/validation metrics to wandb, saves the weights with the best
    validation accuracy to ``best_model.pth`` and finally reports the test
    accuracy of that checkpoint.
    """
    # Initialize wandb with default configuration.
    # NOTE(review): DEFAULT_CONFIG is not defined in the visible part of this
    # file — confirm it is provided elsewhere before this function runs.
    wandb.init(config=DEFAULT_CONFIG, reinit=True)
    cfg = wandb.config

    # Set fixed image size
    cfg.img_size = 400

    # Create run name in the requested format
    wandb.run.name = "optimizer {} activation {} num_filters {} dropout {} filter_sizes {} batch_size {} augmentation {} weight_decay {} batch_norm {}".format(
        cfg.optimizer,
        cfg.activation,
        cfg.num_filters,
        cfg.dropout,
        cfg.filter_sizes,
        cfg.batch_size,
        cfg.augmentation,
        cfg.weight_decay,
        cfg.batch_norm
    )

    # Set device (GPU if available, otherwise CPU)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    torch.backends.cudnn.benchmark = True  # For faster training

    # Get data loaders
    train_loader, val_loader, test_loader = get_data_loaders(cfg)

    # Initialize model
    model = CNN(cfg)
    model = nn.DataParallel(model)
    model = model.to(device)

    # Select optimizer based on configuration
    optimizer = _select_optimizer(
        cfg.optimizer, model.parameters(), cfg.learning_rate, cfg.weight_decay
    )

    # Loss function
    criterion = nn.CrossEntropyLoss()

    # Mixed precision is only meaningful on CUDA. On CPU both the scaler and
    # autocast are explicitly disabled, making the training step a plain
    # full-precision backward/step instead of relying on them to disable
    # themselves with warnings.
    use_amp = device.type == 'cuda'
    scaler = GradScaler('cuda', enabled=use_amp)

    # Start below every reachable accuracy so the first epoch always writes a
    # checkpoint; otherwise a run stuck at 0% would later load a missing or
    # stale file left behind by a previous run.
    best_val_accuracy = float('-inf')
    checkpoint_path = "best_model.pth"
    epochs = 10  # Fixed number of epochs

    # Training loop
    for epoch in range(1, epochs+1):
        # ---------- TRAINING PHASE ----------
        model.train()
        running_loss, correct, total = 0, 0, 0

        # Process batches
        for inputs, targets in tqdm(train_loader, desc=f"Training epoch {epoch}/{epochs}"):
            inputs, targets = inputs.to(device), targets.to(device)

            # Zero gradients
            optimizer.zero_grad()

            # Forward pass with mixed precision (when enabled)
            with autocast('cuda', enabled=use_amp):
                outputs = model(inputs)
                loss = criterion(outputs, targets)

            # Backward pass with gradient scaling
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Update statistics
            running_loss += loss.item()
            predictions = outputs.argmax(1)
            correct += (predictions == targets).sum().item()
            total += targets.size(0)

        # Calculate epoch metrics (loss is averaged over batches)
        train_loss = running_loss / len(train_loader)
        train_accuracy = 100 * correct / total

        # ---------- VALIDATION PHASE ----------
        val_loss, val_accuracy = _evaluate(model, val_loader, criterion, device)

        # Log metrics to wandb
        wandb.log({
            "epoch": epoch,
            "train_accuracy": train_accuracy,
            "train_loss": train_loss,
            "val_accuracy": val_accuracy,
            "val_loss": val_loss
        })

        # Print epoch summary
        print(f"Epoch {epoch}/{epochs}")
        print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_accuracy:.2f}%")
        print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_accuracy:.2f}%")

        # Save best model
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), checkpoint_path)

    # ---------- TESTING PHASE ----------
    # Load best model for final evaluation; _evaluate switches to eval mode
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    _, test_accuracy = _evaluate(model, test_loader, criterion, device)

    wandb.log({"test_accuracy": test_accuracy})
    print(f"Test Accuracy: {test_accuracy:.2f}%")

Sweep

In [None]:

if __name__ == "__main__":
    # Hyperparameter sweep configuration. Every parameter is a discrete set
    # of values; each run's combination is delivered through wandb.config.
    sweep_configuration = {
        'method': 'bayes',  # Bayesian optimization for efficient hyperparameter search
        'name': 'cnn-hyperparameter-tuning',
        'metric': {
            'name': 'val_accuracy',  # Metric to optimize (logged by train())
            'goal': 'maximize'  # We want to maximize accuracy
        },
        'parameters': {
            # Different filter configurations (one entry per conv block)
            'num_filters': {
                'values': [
                    [64, 128, 256, 512, 1024],  # Wide architecture
                    [32, 32, 32, 32, 32],       # Uniform width
                    [32, 64, 64, 128, 128],     # Gradually increasing
                    [128, 128, 64, 64, 32],     # Gradually decreasing
                    [32, 64, 128, 256, 512]     # Standard pyramid
                ]
            },
            # Kernel size variations (one entry per conv block)
            'filter_sizes': {
                'values': [
                    [3, 3, 3, 3, 3],  # All small kernels
                    [5, 5, 5, 5, 5],  # All large kernels
                    [5, 3, 5, 3, 5]   # Mixed kernel sizes
                ]
            },
            # Regularization strength
            # NOTE(review): 0.5 is orders of magnitude larger than the other
            # options — confirm it is intended and not a typo.
            'weight_decay': {
                'values': [0, 0.0005, 0.5]
            },
            # Data augmentation toggle
            'augmentation': {
                'values': [True, False]
            },
            # Dropout rates
            'dropout': {
                'values': [0, 0.2, 0.4]
            },
            # Learning rates
            'learning_rate': {
                'values': [1e-3, 1e-4]
            },
            # Activation functions
            'activation': {
                'values': ['relu', 'elu', 'selu', 'silu', 'gelu', 'mish']
            },
            # Optimizer choices
            'optimizer': {
                'values': ['nadam', 'adam', 'rmsprop']
            },
            # Batch normalization toggle
            'batch_norm': {
                'values': [True, False]
            },
            # Batch sizes
            'batch_size': {
                'values': [32, 64]
            },
            # Width of the hidden fully connected layer
            'fc_hidden_sizes': {
                'values': [128, 256, 512]
            }
        }
    }

    # Initialize and run sweep. The agent must be attached to the sweep that
    # was just created: passing a hard-coded id from an earlier session would
    # leave this new sweep empty and feed results into an unrelated one.
    sweep_project = "DA6401-Assignment-2"
    sweep_id = wandb.sweep(sweep_configuration, project=sweep_project)
    wandb.agent(sweep_id, function=train, project=sweep_project, count=20)  # Run 20 trials