<a href="https://colab.research.google.com/github/SaiRajesh228/DA6401_Assignment2/blob/main/DA6401_Assignment2_PartA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
import time
from tqdm.notebook import tqdm

import numpy as np

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
import wandb

import torch.optim as optim
from torch.utils.data import Dataset, DataLoader,ChainDataset, ConcatDataset
from torch.utils.data.distributed import DistributedSampler

import matplotlib.pyplot as plt

In [3]:
# Colab-only: mount the user's Google Drive at /content/drive.
# The call is interactive on first use (Colab asks for authorisation).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
def compute_image_dimensions(dataset):
    """
    Print the smallest and largest image height and width found in a dataset.

    Heights are reported on the "X" line and widths on the "Y" line.

    Args:
        dataset: Indexable, sized collection whose items are ``(image, label)``
            pairs; each image must expose a three-element ``shape`` that is
            read as (channels, height, width).
    """
    sizes = []
    for position in range(len(dataset)):
        image, _label = dataset[position]  # label is not needed here
        _channels, rows, cols = image.shape
        sizes.append((rows, cols))

    all_rows = [rows for rows, _ in sizes]
    all_cols = [cols for _, cols in sizes]

    print(f"X min: {min(all_rows)}\tX Max: {max(all_rows)}")
    print(f"Y min: {min(all_cols)}\tY Max: {max(all_cols)}")

In [None]:
import torch
import torch.nn as nn

class CustomImageClassifier(nn.Module):
    """
    Configurable CNN for image classification.

    The network is a stack of convolution / max-pool layers, followed by a
    stack of fully connected layers and a final output layer.

    Args:
        num_classes: Number of output units.
        conv_activation: Activation *class* (e.g. ``nn.ReLU``) instantiated
            after every convolution.
        fc_activation: Activation *class* instantiated after every hidden
            fully connected layer.
        conv_layers_config: Sequence of layer specs. ``["conv", filters,
            kernel, stride, padding]`` adds a convolution;
            ``["maxpool", kernel, stride]`` adds a max-pool. Specs with any
            other first element are ignored.
        hidden_dims: Sizes of the hidden fully connected layers. May be empty,
            in which case the output layer is attached directly to the
            flattened convolutional features.
        output_activation: Activation *class* applied after the output layer.
        use_conv_bn: Insert ``BatchNorm2d`` after every convolution.
        use_fc_bn: Insert ``BatchNorm1d`` after every hidden linear layer.
        dropout_rate: If truthy, ``Dropout`` with this rate follows every
            hidden fully connected block.
        input_size: Side length of the (square) input image.
        input_channels: Number of channels of the input image.

    Raises:
        ValueError: If the configured conv/pool stack reduces the feature map
            to a non-positive height or width.
    """

    def __init__(self, num_classes, conv_activation, fc_activation, conv_layers_config,
                 hidden_dims, output_activation, use_conv_bn=False, use_fc_bn=False,
                 dropout_rate=None, input_size=800, input_channels=3):
        super().__init__()

        # Inputs are assumed square: one size drives both spatial dimensions.
        self.height = self.width = input_size
        self.input_channels = input_channels

        # ---- convolutional feature extractor ----
        self.conv_blocks = nn.ModuleList()
        current_channels = input_channels

        for config in conv_layers_config:
            if config[0] == "conv":
                _, filters, kernel, stride, padding = config
                self.conv_blocks.append(nn.Conv2d(
                    current_channels, filters, kernel, stride, padding
                ))
                if use_conv_bn:
                    self.conv_blocks.append(nn.BatchNorm2d(filters))
                self.conv_blocks.append(conv_activation())
                current_channels = filters

            elif config[0] == "maxpool":
                _, kernel, stride = config
                self.conv_blocks.append(nn.MaxPool2d(kernel, stride))

        # ---- size of the flattened feature vector ----
        final_h, final_w, final_ch = self._compute_conv_output(conv_layers_config)
        if final_h <= 0 or final_w <= 0:
            raise ValueError(
                f"conv_layers_config reduces a {input_size}x{input_size} input to "
                f"{final_h}x{final_w}; use a larger input or fewer/smaller layers"
            )
        flattened_size = final_h * final_w * final_ch

        # ---- fully connected hidden layers ----
        self.dense_layers = nn.ModuleList()
        current_features = flattened_size

        for hidden_dim in hidden_dims:
            self.dense_layers.append(nn.Linear(current_features, hidden_dim))
            if use_fc_bn:
                self.dense_layers.append(nn.BatchNorm1d(hidden_dim))
            self.dense_layers.append(fc_activation())
            if dropout_rate:
                self.dense_layers.append(nn.Dropout(dropout_rate))
            current_features = hidden_dim

        # ---- output layer ----
        # Sized from the running feature count (not hidden_dims[-1]) so that an
        # empty hidden_dims connects the conv features straight to the output.
        self.output_layer = nn.Sequential(
            nn.Linear(current_features, num_classes),
            output_activation()
        )

        self._initialize_weights()

    def _compute_conv_output(self, layer_configs):
        """
        Compute the feature-map size produced by ``layer_configs``.

        Uses floor((size - kernel + 2 * padding) / stride) + 1 for
        convolutions and the same formula without padding for max-pools.

        Returns:
            Tuple ``(height, width, channels)`` after the last layer.
        """
        h, w = self.height, self.width
        channels = self.input_channels
        for config in layer_configs:
            if config[0] == "conv":
                _, filters, kernel, stride, padding = config
                h = (h - kernel + 2 * padding) // stride + 1
                w = (w - kernel + 2 * padding) // stride + 1
                channels = filters
            elif config[0] == "maxpool":
                _, kernel, stride = config
                h = (h - kernel) // stride + 1
                w = (w - kernel) // stride + 1
        return h, w, channels

    def _initialize_weights(self):
        """Kaiming-normal init for conv weights, Xavier-normal for linear weights; biases set to 0.01."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, nonlinearity='relu')
            elif isinstance(module, nn.Linear):
                nn.init.xavier_normal_(module.weight)
            else:
                continue
            if module.bias is not None:
                module.bias.data.fill_(0.01)

    def forward(self, x):
        """Run the conv stack, flatten everything but the batch dimension, then the dense stack and output layer."""
        for layer in self.conv_blocks:
            x = layer(x)
        x = torch.flatten(x, 1)
        for layer in self.dense_layers:
            x = layer(x)
        return self.output_layer(x)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Instantiate model with example parameters
model = CustomImageClassifier(
    num_classes=10,
    conv_activation=nn.ReLU,
    fc_activation=nn.ReLU,
    conv_layers_config=[
        ["conv", 32, 3, 1, 1],
        ["maxpool", 2, 2],
        ["conv", 64, 3, 1, 1],
        ["maxpool", 2, 2]
    ],
    hidden_dims=[512, 256],
    output_activation=nn.Sigmoid,
    input_size=800,
    input_channels=3
).to(device)

# Verify model structure
print("Model Architecture:")
print(model)

# Test with sample input.
# Only the output shape is needed, so run the forward pass without autograd
# bookkeeping; this avoids keeping 800x800 activations around for a backward
# pass that never happens.
test_input = torch.randn(1, 3, 800, 800).to(device)
with torch.no_grad():
    print("\nTest output shape:", model(test_input).shape)

In [None]:
import torch
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, ConcatDataset

class ImageDataManager:
    """
    Builds ``ImageFolder`` dataloaders with optional normalisation and augmentation.

    Args:
        img_size: Size handed to ``transforms.Resize``.
        data_root: Directory containing the dataset subsets. A trailing path
            separator is optional.
        device: Device on which the normalisation statistics are accumulated.
        standardize: When True, loaders normalise images with per-channel
            mean/std. The statistics are computed once, from the first subset
            a loader is requested for, and then reused for every later subset.
    """

    def __init__(self, img_size, data_root, device, standardize=False):
        self.img_size = img_size
        self.data_root = data_root
        self.device = device
        self.standardize = standardize
        # Filled lazily by _compute_stats.
        self.mean, self.std = None, None

    def _subset_dir(self, subset):
        """Return the directory of ``subset`` under ``data_root``.

        A separator is inserted only when ``data_root`` does not already end
        with one, so roots written with a trailing slash resolve exactly as
        plain concatenation would.
        """
        root = str(self.data_root)
        if root and not root.endswith(("/", "\\")):
            root += "/"
        return root + subset

    def _compute_stats(self, subset):
        """
        Compute per-channel mean and std over ``subset`` and cache them.

        The mean is the average of per-image channel means; the std is the
        square root of the average per-image channel variance.

        Returns:
            Tuple ``(mean, std)`` of CPU tensors with one entry per channel.
        """
        basic_transforms = transforms.Compose([
            transforms.Resize(self.img_size),
            transforms.ToTensor()
        ])

        dataset = datasets.ImageFolder(self._subset_dir(subset), basic_transforms)
        loader = DataLoader(dataset, batch_size=32, shuffle=False, num_workers=2)

        mean = torch.zeros(3).to(self.device)
        variance = torch.zeros(3).to(self.device)
        total = 0

        for images, _ in loader:
            images = images.to(self.device)
            batch = images.size(0)
            # Collapse the spatial dimensions: (batch, 3, H*W).
            images = images.view(batch, 3, -1)
            mean += images.mean(2).sum(0)
            variance += images.var(2).sum(0)
            total += batch

        self.mean, self.std = (mean/total).cpu(), (torch.sqrt(variance/total)).cpu()
        return self.mean, self.std

    def create_loader(self, subset, batch_size=32, augmentations=None):
        """
        Create a dataloader for one dataset subset.

        Args:
            subset: Sub-directory of ``data_root`` (e.g. ``"train/"``). Subsets
                whose name contains ``"train"`` are shuffled and may be augmented.
            batch_size: Batch size of the returned loader.
            augmentations: Optional list of transform lists. For a training
                subset, every entry adds one extra copy of the dataset in which
                those transforms run before the base resize/tensor/normalise
                pipeline; the copies are concatenated with the plain dataset.

        Returns:
            A ``DataLoader`` over the (possibly concatenated) dataset.
        """
        transforms_list = [
            transforms.Resize(self.img_size),
            transforms.ToTensor()
        ]

        if self.standardize:
            if self.mean is None:
                self._compute_stats(subset)
            transforms_list.append(transforms.Normalize(self.mean, self.std))

        subset_dir = self._subset_dir(subset)
        is_train = "train" in subset

        if is_train and augmentations:
            datasets_list = [datasets.ImageFolder(
                subset_dir,
                transforms.Compose(transforms_list)
            )]
            for aug in augmentations:
                datasets_list.append(datasets.ImageFolder(
                    subset_dir,
                    transforms.Compose([*aug, *transforms_list])
                ))
            final_dataset = ConcatDataset(datasets_list)
        else:
            final_dataset = datasets.ImageFolder(
                subset_dir,
                transforms.Compose(transforms_list)
            )

        return DataLoader(
            final_dataset,
            batch_size=batch_size,
            shuffle=is_train,
            num_workers=2,
            pin_memory=True
        )

# Example usage:
if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Data manager for 800x800 images with normalisation switched on.
    data_mgr = ImageDataManager(
        device=device,
        standardize=True,
        data_root="/content/data/",  # Update with your path
        img_size=(800, 800),
    )

    # Training loader: the plain dataset plus one horizontally-flipped copy.
    train_loader = data_mgr.create_loader(
        "train/",
        augmentations=[[transforms.RandomHorizontalFlip()]],
        batch_size=32,
    )

    # Validation loader: no augmentation.
    val_loader = data_mgr.create_loader("val/", batch_size=32)

    # Pull a single training batch and report the image tensor shape.
    sample_batch = next(iter(train_loader))
    print(f"Batch shape: {sample_batch[0].shape}")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, ConcatDataset, RandomSampler
import numpy as np
import time
from tqdm import tqdm
import matplotlib.pyplot as plt

class DLExperiment:
    """
    End-to-end deep learning pipeline for image classification.

    Bundles data loading (``ImageProcessor``), a configurable CNN
    (``CNNModel``), a training loop, evaluation and a prediction grid.

    Args:
        img_size: ``(height, width)`` images are resized to; also used as the
            CNN input size.
        data_root: Directory holding the ``train/``, ``val/`` and ``test/``
            image folders. A trailing separator is optional.
        device: Torch device; defaults to CUDA when available, otherwise CPU.
        use_wandb: Stored on the instance; no method of this class reads it.
    """

    def __init__(self, img_size, data_root='/content/data/', device=None, use_wandb=False):
        self.img_size = img_size
        self.data_root = data_root
        self.device = device or torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.use_wandb = use_wandb
        self.class_names = None
        # Normalisation statistics used by the loaders; kept so that
        # visualize_predictions can undo the normalisation before plotting.
        self.norm_mean, self.norm_std = None, None

    class ImageProcessor:
        """Loads ``ImageFolder`` subsets and wraps them in normalising dataloaders."""

        def __init__(self, img_size, data_root, device):
            self.img_size = img_size
            self.data_root = data_root
            self.device = device
            # Computed on the first get_loader call and reused afterwards.
            self.mean, self.std = None, None

        def _subset_dir(self, subset):
            """Return the directory of ``subset`` under ``data_root``.

            A separator is inserted only when ``data_root`` lacks a trailing
            one, so roots that already end in a slash resolve exactly as plain
            concatenation would.
            """
            root = str(self.data_root)
            if root and not root.endswith(("/", "\\")):
                root += "/"
            return root + subset

        def _compute_stats(self, subset):
            """
            Return per-channel ``(mean, std)`` CPU tensors for ``subset``.

            The mean averages per-image channel means; the std is the square
            root of the average per-image channel variance.
            """
            transform = transforms.Compose([
                transforms.Resize(self.img_size),
                transforms.ToTensor()
            ])
            dataset = datasets.ImageFolder(self._subset_dir(subset), transform=transform)
            loader = DataLoader(dataset, batch_size=32, shuffle=False, num_workers=2)

            mean = torch.zeros(3).to(self.device)
            variance = torch.zeros(3).to(self.device)
            total = 0

            for images, _ in loader:
                images = images.to(self.device)
                batch = images.size(0)
                # Collapse the spatial dimensions: (batch, 3, H*W).
                images = images.view(batch, 3, -1)
                mean += images.mean(2).sum(0)
                variance += images.var(2).sum(0)
                total += batch

            return (mean/total).cpu(), (torch.sqrt(variance/total)).cpu()

        def get_loader(self, subset, batch_size=32, augmentations=None):
            """
            Build a dataloader for ``subset``.

            Images are resized, converted to tensors and normalised with
            statistics computed from the first subset ever requested from this
            processor. Subsets whose name contains ``"train"`` are shuffled;
            when ``augmentations`` (a list of transform lists) is given for
            such a subset, one extra augmented copy of the dataset per entry
            is concatenated to the plain dataset.
            """
            transforms_list = [
                transforms.Resize(self.img_size),
                transforms.ToTensor()
            ]

            if self.mean is None:
                self.mean, self.std = self._compute_stats(subset)
            transforms_list.append(transforms.Normalize(self.mean, self.std))

            subset_dir = self._subset_dir(subset)
            is_train = "train" in subset

            if is_train and augmentations:
                dataset_list = [datasets.ImageFolder(
                    subset_dir,
                    transforms.Compose(transforms_list)
                )]
                for aug in augmentations:
                    dataset_list.append(datasets.ImageFolder(
                        subset_dir,
                        transforms.Compose(aug + transforms_list)
                    ))
                final_dataset = ConcatDataset(dataset_list)
            else:
                final_dataset = datasets.ImageFolder(
                    subset_dir,
                    transforms.Compose(transforms_list)
                )

            return DataLoader(
                final_dataset,
                batch_size=batch_size,
                shuffle=is_train,
                num_workers=2,
                pin_memory=True
            )

    class CNNModel(nn.Module):
        """
        Customizable CNN architecture.

        Args:
            input_size: ``(height, width)`` of the input images.
            in_channels: Number of input channels.
            num_classes: Number of output logits.
            conv_layers: List of dicts. ``{'type': 'conv', 'filters',
                'kernel', 'stride', 'padding', 'activation', 'bn'}`` adds a
                convolution (``'bn'`` defaults to False, ``'activation'`` to
                ``'relu'``); ``{'type': 'pool', 'size', 'stride'}`` adds a
                max-pool.
            fc_layers: Sizes of the hidden fully connected layers, each
                followed by ReLU.
            dropout: If truthy, ``Dropout`` with this rate follows every
                hidden fully connected layer.

        Raises:
            ValueError: If the conv/pool stack reduces the feature map to a
                non-positive height or width.
        """

        # Name -> activation class; a fresh module is instantiated per use.
        _ACTIVATIONS = {
            'relu': nn.ReLU,
            'sigmoid': nn.Sigmoid,
            'tanh': nn.Tanh,
            'silu': nn.SiLU,
        }

        def __init__(self, input_size, in_channels, num_classes, conv_layers, fc_layers,
                     dropout=None):
            super().__init__()
            self.features = nn.ModuleList()
            current_channels = in_channels
            h, w = input_size

            for layer in conv_layers:
                if layer['type'] == 'conv':
                    self.features.append(nn.Conv2d(
                        current_channels, layer['filters'], layer['kernel'],
                        layer['stride'], layer['padding']
                    ))
                    if layer.get('bn', False):
                        self.features.append(nn.BatchNorm2d(layer['filters']))
                    self.features.append(self._get_activation(layer.get('activation', 'relu')))
                    current_channels = layer['filters']
                    # Track the feature-map size to size the first linear layer.
                    h = (h - layer['kernel'] + 2*layer['padding'])//layer['stride'] + 1
                    w = (w - layer['kernel'] + 2*layer['padding'])//layer['stride'] + 1
                elif layer['type'] == 'pool':
                    self.features.append(nn.MaxPool2d(layer['size'], layer['stride']))
                    h = (h - layer['size'])//layer['stride'] + 1
                    w = (w - layer['size'])//layer['stride'] + 1

            if h <= 0 or w <= 0:
                raise ValueError(
                    f"conv_layers reduce a {tuple(input_size)} input to {h}x{w}; "
                    "use a larger input or fewer/smaller layers"
                )

            self.classifier = nn.ModuleList()
            in_features = h * w * current_channels
            for units in fc_layers:
                self.classifier.append(nn.Linear(in_features, units))
                self.classifier.append(self._get_activation('relu'))
                # Dropout is an explicit model option; it must not be looked
                # up on whatever conv/pool spec the loop above ended on.
                if dropout:
                    self.classifier.append(nn.Dropout(dropout))
                in_features = units

            self.output = nn.Linear(in_features, num_classes)
            self._init_weights()

        def _get_activation(self, name):
            """Return a new activation module for ``name`` (case-insensitive).

            Raises:
                KeyError: If ``name`` is not a supported activation.
            """
            return self._ACTIVATIONS[name.lower()]()

        def _init_weights(self):
            """Kaiming-normal (fan-out) conv weights, Xavier-normal linear weights, zero biases."""
            for m in self.modules():
                if isinstance(m, nn.Conv2d):
                    nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                elif isinstance(m, nn.Linear):
                    nn.init.xavier_normal_(m.weight)
                else:
                    continue
                if m.bias is not None:
                    m.bias.data.zero_()

        def forward(self, x):
            """Return class logits for a batch of images."""
            for layer in self.features:
                x = layer(x)
            x = torch.flatten(x, 1)
            for layer in self.classifier:
                x = layer(x)
            return self.output(x)

    @staticmethod
    def _dataset_classes(dataset):
        """Return the class-name list of ``dataset``.

        An augmented training set is a ``ConcatDataset`` of image folders that
        all read the same directory; ``ConcatDataset`` itself has no
        ``classes`` attribute, so the first member is consulted instead.
        """
        if isinstance(dataset, ConcatDataset):
            dataset = dataset.datasets[0]
        return dataset.classes

    def setup_data(self, batch_size=32, augmentations=None):
        """
        Create the train/val/test loaders and record the class names.

        The training loader is built first, so the normalisation statistics
        shared by all three loaders come from the training subset.

        Returns:
            Tuple ``(train_loader, val_loader, test_loader)``.
        """
        processor = self.ImageProcessor(self.img_size, self.data_root, self.device)
        self.train_loader = processor.get_loader('train/', batch_size, augmentations)
        self.val_loader = processor.get_loader('val/', batch_size)
        self.test_loader = processor.get_loader('test/', batch_size)
        self.norm_mean, self.norm_std = processor.mean, processor.std
        self.class_names = self._dataset_classes(self.train_loader.dataset)
        return self.train_loader, self.val_loader, self.test_loader

    def setup_model(self, conv_config, fc_config, num_classes, dropout=None):
        """
        Build the CNN on ``self.device`` and store it as ``self.model``.

        Args:
            conv_config: Conv/pool layer dicts (see ``CNNModel``).
            fc_config: Hidden fully connected layer sizes.
            num_classes: Number of output logits.
            dropout: Optional dropout rate applied after each hidden layer.

        Returns:
            The created model.
        """
        self.model = self.CNNModel(
            input_size=self.img_size,
            in_channels=3,
            num_classes=num_classes,
            conv_layers=conv_config,
            fc_layers=fc_config,
            dropout=dropout
        ).to(self.device)
        return self.model

    def train_model(self, epochs=10, lr=1e-3, weight_decay=1e-4):
        """
        Train ``self.model`` with Adam and cross-entropy loss.

        After every epoch the model is evaluated on the validation loader and
        the metrics are printed. Whenever validation accuracy improves, the
        weights are saved to ``best_model.pth`` in the working directory.
        """
        optimizer = optim.Adam(self.model.parameters(), lr=lr, weight_decay=weight_decay)
        criterion = nn.CrossEntropyLoss()

        best_acc = 0.0
        for epoch in tqdm(range(epochs)):
            self.model.train()
            running_loss = 0.0
            correct = 0
            total = 0

            for inputs, labels in self.train_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                # criterion returns the batch mean; weight it by batch size so
                # the epoch figure is a per-sample average.
                running_loss += loss.item() * inputs.size(0)
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

            train_loss = running_loss / total
            train_acc = 100 * correct / total
            val_loss, val_acc = self.evaluate(self.val_loader)

            print(f"Epoch {epoch+1}/{epochs}")
            print(f"Train Loss: {train_loss:.4f} | Acc: {train_acc:.2f}%")
            print(f"Val Loss: {val_loss:.4f} | Acc: {val_acc:.2f}%")

            if val_acc > best_acc:
                best_acc = val_acc
                torch.save(self.model.state_dict(), 'best_model.pth')

    def evaluate(self, loader):
        """
        Evaluate ``self.model`` on ``loader``.

        Returns:
            Tuple ``(mean cross-entropy loss per sample, accuracy in percent)``.
        """
        self.model.eval()
        total = 0
        correct = 0
        loss = 0.0
        criterion = nn.CrossEntropyLoss()

        with torch.no_grad():
            for inputs, labels in loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)
                loss += criterion(outputs, labels).item() * inputs.size(0)
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

        return loss/total, 100*correct/total

    def visualize_predictions(self, num_images=9):
        """
        Plot randomly drawn test images with true and predicted labels.

        Titles are green for correct predictions and red otherwise. The grid
        has three columns and at least three rows, growing as needed when
        ``num_images`` exceeds nine. If the loaders' normalisation statistics
        are known, images are de-normalised before display.
        """
        self.model.eval()
        test_dataset = self.test_loader.dataset
        sampler = RandomSampler(test_dataset)
        sample_loader = DataLoader(test_dataset, batch_size=num_images, sampler=sampler)
        images, labels = next(iter(sample_loader))

        with torch.no_grad():
            outputs = self.model(images.to(self.device))
            preds = outputs.argmax(dim=1).cpu()

        # Undo Normalize(mean, std) so colours look natural after clipping.
        if self.norm_mean is not None and self.norm_std is not None:
            mean = torch.as_tensor(self.norm_mean).view(1, -1, 1, 1)
            std = torch.as_tensor(self.norm_std).view(1, -1, 1, 1)
            images = images * std + mean

        # The dataset may hold fewer than num_images samples.
        shown = images.size(0)
        cols = 3
        rows = max(3, -(-shown // cols))  # ceil division, never below 3 rows

        plt.figure(figsize=(12, 4 * rows))
        for i in range(shown):
            plt.subplot(rows, cols, i+1)
            image = images[i].permute(1, 2, 0).cpu().numpy()
            plt.imshow(np.clip(image, 0, 1))
            true_idx, pred_idx = int(labels[i]), int(preds[i])
            color = 'green' if pred_idx == true_idx else 'red'
            plt.title(f"True: {self.class_names[true_idx]}\nPred: {self.class_names[pred_idx]}", color=color)
            plt.axis('off')
        plt.tight_layout()
        plt.show()

# Example usage in Colab:
if __name__ == "__main__":
    # Experiment on 600x600 inputs read from /content/data/.
    exp = DLExperiment(data_root='/content/data/', img_size=(600, 600))

    # Two conv + max-pool stages; only the filter count differs per stage.
    conv_config = [
        layer_spec
        for n_filters in (32, 64)
        for layer_spec in (
            {'type': 'conv', 'filters': n_filters, 'kernel': 3, 'stride': 1,
             'padding': 1, 'activation': 'relu', 'bn': True},
            {'type': 'pool', 'size': 2, 'stride': 2},
        )
    ]

    # Hidden fully connected layer sizes.
    fc_config = [512, 256]

    # Data first, then the network, then training.
    exp.setup_data(batch_size=32)
    exp.setup_model(conv_config, fc_config, num_classes=10)
    exp.train_model(epochs=10, lr=1e-3)

    # Report test accuracy and plot a grid of sample predictions.
    test_loss, test_acc = exp.evaluate(exp.test_loader)
    print(f"\nFinal Test Accuracy: {test_acc:.2f}%")
    exp.visualize_predictions()

In [None]:
# Authenticate with Weights & Biases.
# The API key is read from the WANDB_API_KEY environment variable instead of
# being written into the notebook; when the variable is unset, wandb falls
# back to its own credential lookup / interactive prompt.
wandb.login(key=os.environ.get("WANDB_API_KEY") or None)

# Random-search sweep definition.
# NOTE(review): the parameter names below (e.g. 'num_hidden_layers', 'lr')
# differ from the keys read by configure_image_experiment (e.g.
# 'dense_layers', 'learning_rate'); a sweep agent must translate them.
# NOTE(review): the cells visible here never log a metric called
# 'Validation accuracy'; confirm the training code reports it to wandb.
sweep_config = {
    'method': 'random',
    'name': 'PA2 Hyper Sweep Factor Two',
    'metric': {
        'name': 'Validation accuracy',
        'goal': 'maximize',
    },
    'parameters': {
        # Classifier head
        'num_hidden_layers': {'values': [1]},
        'hidden_size': {'values': [32, 64, 128, 256]},
        'activation': {'values': ['relu', 'silu', 'tanh']},
        'optimiser': {'values': ["adam", "rmsprop", "nadam"]},

        # Convolutional stack
        'num_conv_layers': {'values': [5]},
        'conv_filter_factor': {'values': [2]},
        'num_filters': {'values': [4, 8, 16]},
        'filter_size': {'values': [5, 7, 11]},

        # Optimisation and regularisation
        'lr': {'values': [1e-3, 1e-4, 3e-4]},
        'weight_decay': {'values': [0, 5e-3, 5e-4]},
        'batch_norm': {'values': [True, False]},
        'dropout': {'values': [None, 0.2, 0.3]},

        # Data and schedule
        'data_aug': {'values': [None, 1, 2]},
        'epochs': {'values': [8]},
        'Image_Crop_Size': {'values': [500, 600, 800]},
        'batch_size': {'values': [16, 32]},
    },
}

sweep_id = wandb.sweep(sweep=sweep_config, project='')

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms

def _conv_settings_from_config(config):
    """Translate flat hyperparameters into the conv/pool layer dicts for DLExperiment.setup_model."""
    kernel = config['kernel_size']
    settings = []
    filters = config['initial_filters']
    for layer_idx in range(config['conv_layers']):
        first = layer_idx == 0
        settings.append({
            'type': 'conv',
            'filters': filters,
            'kernel': kernel,
            # The first convolution keeps resolution (stride 1, padded);
            # every later one halves it (stride 2, unpadded).
            'stride': 1 if first else 2,
            'padding': (kernel - 1) if first else 0,
            'activation': 'relu',
            'bn': config['batch_norm']
        })
        settings.append({
            'type': 'pool',
            'size': 2,
            'stride': 1
        })
        filters *= config['filter_growth_factor']
    return settings


def configure_image_experiment(config, data_root="inaturalist_12K/", model_checkpoint=None, use_wandb=False):
    """
    Configure and execute the complete image classification workflow.

    Builds the data loaders and the CNN described by ``config``. Without a
    checkpoint the model is trained; with one, the saved weights are loaded
    into the freshly built network and evaluated on the test split instead.

    Args:
        config: Mapping with the keys ``crop_size``, ``batch_size``,
            ``data_aug``, ``conv_layers``, ``initial_filters``,
            ``kernel_size``, ``filter_growth_factor``, ``batch_norm``,
            ``dense_layers``, ``hidden_units`` and, when training,
            ``training_epochs``, ``learning_rate`` and ``l2_regularization``.
            ``data_aug`` is falsy for no augmentation, otherwise the number of
            augmentation presets (perspective, then colour jitter) to enable.
            Other keys (e.g. ``dropout_rate``) are not read by this function.
        data_root: Directory containing the dataset splits.
        model_checkpoint: Optional path to a ``state_dict`` saved from a model
            of the same architecture.
        use_wandb: Forwarded to ``DLExperiment``.

    Returns:
        The trained (or loaded) ``torch.nn.Module``.
    """

    # Device setup
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using hardware accelerator: {device}")

    # Initialize experiment manager
    experiment = DLExperiment(
        img_size=(config['crop_size'], config['crop_size']),
        data_root=data_root,
        device=device,
        use_wandb=use_wandb
    )

    # Configure data augmentations
    augmentation_presets = []
    if config['data_aug']:
        augmentations = [
            [transforms.RandomPerspective(p=1)],
            [transforms.ColorJitter(brightness=0.5, hue=0.5)]
        ]
        augmentation_presets = augmentations[:config['data_aug']]

    # Initialize data pipelines
    _, _, test_loader = experiment.setup_data(
        batch_size=config['batch_size'],
        augmentations=augmentation_presets
    )

    # Build the network before anything touches experiment.model: a checkpoint
    # can only be loaded into an existing module of matching architecture.
    # The class count comes from the names setup_data recorded, which also
    # works when the augmented training set is a ConcatDataset.
    experiment.setup_model(
        conv_config=_conv_settings_from_config(config),
        fc_config=[config['hidden_units']] * config['dense_layers'],
        num_classes=len(experiment.class_names)
    )

    # Load existing weights if provided and report test accuracy
    if model_checkpoint:
        # map_location lets a checkpoint saved on GPU load on a CPU-only host.
        state_dict = torch.load(model_checkpoint, map_location=device)
        experiment.model.load_state_dict(state_dict)
        _, test_acc = experiment.evaluate(test_loader)
        print(f"Loaded model test accuracy: {test_acc:.2f}%")
        return experiment.model

    # Execute training workflow
    experiment.train_model(
        epochs=config['training_epochs'],
        lr=config['learning_rate'],
        weight_decay=config['l2_regularization']
    )

    return experiment.model

# Example configuration dictionary (keys as read by configure_image_experiment)
sample_config = {
    'crop_size': 224,
    'batch_size': 64,
    'data_aug': 2,
    'conv_layers': 4,
    'initial_filters': 32,
    'kernel_size': 3,
    'filter_growth_factor': 2,
    'dense_layers': 2,
    'hidden_units': 512,
    'batch_norm': True,
    'dropout_rate': 0.5,
    'learning_rate': 1e-3,
    'l2_regularization': 1e-4,
    'training_epochs': 25
}

# Example execution
if __name__ == "__main__":
    model = configure_image_experiment(sample_config)
    # configure_image_experiment returns the network itself, while
    # visualize_predictions is defined on DLExperiment. Guard the call so the
    # cell does not end in an AttributeError.
    if hasattr(model, "visualize_predictions"):
        model.visualize_predictions()
    else:
        print(f"{type(model).__name__} has no visualize_predictions(); "
              "use DLExperiment.visualize_predictions to plot sample predictions.")

In [None]:
# Hyperparameters for the final run. Each trailing comment gives the name the
# same setting has in the sweep configuration.
optimized_model_config = dict(
    # --- classifier head ---
    dense_layers=1,              # sweep: num_hidden_layers
    hidden_units=64,             # sweep: hidden_size
    activation='relu',           # sweep: activation
    optimizer='rmsprop',         # sweep: optimiser

    # --- convolutional stack ---
    conv_layers=5,               # sweep: num_conv_layers
    filter_growth_factor=1,      # sweep: conv_filter_factor
    initial_filters=32,          # sweep: num_filters
    kernel_size=5,               # sweep: filter_size

    # --- optimisation / regularisation ---
    learning_rate=3e-4,          # sweep: lr
    l2_regularization=5e-4,      # sweep: weight_decay
    batch_norm=True,             # sweep: batch_norm
    dropout_rate=0.2,            # sweep: dropout

    # --- data / schedule ---
    data_aug=1,                  # sweep: data_aug
    training_epochs=8,           # sweep: epochs
    crop_size=600,               # sweep: Image_Crop_Size
    batch_size=16,               # sweep: batch_size
)

In [None]:
# Run 1: pass a checkpoint path so configure_image_experiment takes its
# load-and-evaluate branch instead of training.
# "best_model.pth" is resolved relative to the current working directory.
configure_image_experiment(
    config=optimized_model_config,
    model_checkpoint="best_model.pth",  # state_dict file to load
    data_root="/content/inaturalist_12K/",  # dataset location on the Colab VM
    use_wandb=False
)

# Run 2: no checkpoint, so the model is trained from scratch with the same
# hyperparameters. This call is independent of the one above.
configure_image_experiment(
    config=optimized_model_config,
    data_root="/content/inaturalist_12K/",
    use_wandb=True
)