In [1]:
import torch
import os
import wandb
import numpy as np
import torch
import random
import torch.nn as nn
from PIL import Image
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, Subset, random_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

## Question 1

In [None]:
class CNNModel(nn.Module):
    """Configurable CNN classifier.

    The network is a stack of ``conv -> (batch norm) -> activation -> max-pool``
    blocks (one block per entry of ``filters_per_layer``), followed by a
    flatten, one hidden dense layer with optional batch norm and dropout, and
    a linear output layer that returns raw logits.

    Args:
        input_channels: Number of channels of the input images.
        filters_per_layer: Output channels of each convolutional block.
        kernel_size: Convolution kernel size (``padding="same"`` is used, so
            the convolutions keep the spatial size).
        pool_sizes: Max-pool window (and stride). Either one int shared by all
            blocks or a sequence with one entry per block.
        conv_activation: Activation name used after each convolution.
        dense_units: Width of the hidden dense layer.
        dense_activation: Activation name used after the hidden dense layer.
        num_classes: Number of output logits.
        dropout_rate: Dropout probability applied after the hidden dense layer.
        use_batch_norm: Whether to insert batch-norm layers.
        input_size: Spatial size of the input images, either one int (square
            images) or a ``(height, width)`` pair. Used only to size the first
            dense layer; the default 224 reproduces the former hard-coded 7x7
            feature map for five pooling stages of size 2.

    Raises:
        ValueError: If ``filters_per_layer`` is empty, if ``pool_sizes`` is a
            sequence of the wrong length, or if the pooling stages shrink the
            feature map to nothing.
    """

    def __init__(
        self,
        input_channels=3,
        filters_per_layer=(32, 64, 128, 256, 512),
        kernel_size=3,
        pool_sizes=2,
        conv_activation='relu',
        dense_units=256,
        dense_activation='relu',
        num_classes=10,
        dropout_rate=0.5,
        use_batch_norm=True,
        input_size=224
    ):

        super().__init__()

        filters_per_layer = list(filters_per_layer)
        if not filters_per_layer:
            raise ValueError("filters_per_layer must contain at least one entry")

        # Normalise pool_sizes to one window size per block.
        if isinstance(pool_sizes, int):
            pool_per_layer = [pool_sizes] * len(filters_per_layer)
        else:
            pool_per_layer = list(pool_sizes)
            if len(pool_per_layer) != len(filters_per_layer):
                raise ValueError(
                    "pool_sizes must be an int or have one entry per convolutional block"
                )

        if isinstance(input_size, int):
            height = width = input_size
        else:
            height, width = input_size

        self.kernel_size = kernel_size
        self.pool_sizes = pool_sizes
        self.conv_activation = conv_activation
        self.dense_activation = dense_activation

        # Initializing Convolutional, Batch Norm, and pooling layers
        self.conv_layers = nn.ModuleList()
        self.batch_norm_layers = nn.ModuleList()
        self.pool_layers = nn.ModuleList()

        in_channels = input_channels

        for filters, pool in zip(filters_per_layer, pool_per_layer):
            # Convolutional layer ("same" padding keeps height/width unchanged)
            self.conv_layers.append(
                nn.Conv2d(
                    in_channels=in_channels,
                    out_channels=filters,
                    kernel_size=kernel_size,
                    padding="same",
                )
            )

            # Batch normalization layer; a None placeholder keeps the three
            # lists aligned so forward() can zip over them.
            if use_batch_norm:
                self.batch_norm_layers.append(nn.BatchNorm2d(filters))
            else:
                self.batch_norm_layers.append(None)

            # Max pooling layer (window == stride, no padding)
            self.pool_layers.append(nn.MaxPool2d(kernel_size=pool, stride=pool))

            # Update in_channels for next layer
            in_channels = filters

            # With window == stride and no padding, pooling floors the size.
            height //= pool
            width //= pool

        if height < 1 or width < 1:
            raise ValueError(
                "input_size is too small for the configured pooling stages"
            )

        # Number of features entering the dense layer after flattening.
        conv_output_size = height * width * filters_per_layer[-1]

        # First flatten the image to pass it to the dense layer
        self.flatten = nn.Flatten()

        # Dense layer
        self.fc1 = nn.Linear(conv_output_size, dense_units)
        self.fc_bn = nn.BatchNorm1d(dense_units) if use_batch_norm else None
        self.dropout1 = nn.Dropout(dropout_rate)

        # Output layer
        self.fc2 = nn.Linear(dense_units, num_classes)
        # Kept as an attribute for compatibility; forward() does not apply it.
        self.dropout2 = nn.Dropout(dropout_rate)

    def activation_func(self, activation, x):
        """Apply the activation called ``activation`` (case-insensitive) to ``x``.

        Recognised names: 'relu', 'gelu', 'silu'/'swish', 'mish', 'leakyrelu'.
        Any other name falls back to ReLU.
        """
        name = activation.lower()
        if name == 'relu':
            return F.relu(x)
        if name == 'gelu':
            return F.gelu(x)
        if name in ('silu', 'swish'):
            return F.silu(x)
        if name == 'mish':
            return x * torch.tanh(F.softplus(x))
        if name == 'leakyrelu':
            return F.leaky_relu(x, negative_slope=0.01)
        # Default to ReLU
        return F.relu(x)

    def forward(self, x):
        """Forward pass through the network; returns the output-layer logits."""
        # Convolutional blocks
        for conv, bn, pool in zip(self.conv_layers, self.batch_norm_layers, self.pool_layers):
            x = conv(x)
            if bn is not None:
                x = bn(x)
            x = self.activation_func(self.conv_activation, x)
            x = pool(x)

        # Flatten
        x = self.flatten(x)

        # Dense layer
        x = self.fc1(x)
        if self.fc_bn is not None:
            x = self.fc_bn(x)
        x = self.activation_func(self.dense_activation, x)
        x = self.dropout1(x)

        # Output layer
        x = self.fc2(x)

        return x

## Question 2

In [None]:
# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Data paths. Set the INATURALIST_DATASET_PATH environment variable to point
# at the dataset on another machine; otherwise the original local default is used.
DATASET_PATH = os.environ.get("INATURALIST_DATASET_PATH", r"E:\IITM\2nd sem\inaturalist_12K")
TRAIN_DIR = os.path.join(DATASET_PATH, "train")
# The dataset's "val" folder is used as the held-out test set in this notebook.
TEST_DIR = os.path.join(DATASET_PATH, "val")

# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
# Hyperparameter sweep configuration (Bayesian search maximizing val_accuracy).
# Hyperparameters the sweep chooses between, with their candidate values.
_searched_options = {
    'base_filters': [32, 64],
    'conv_activation': ['relu', 'gelu', 'silu', 'mish'],
    'dense_activation': ['relu', 'gelu', 'silu', 'leakyrelu'],
    'filter_organization': ['same', 'doubling', 'halving'],
    'data_augmentation': [True, False],
    'use_batch_norm': [True, False],
    'dropout_rate': [0, 0.2, 0.3, 0.5],
    'dense_neurons': [128, 256, 512, 1024],
    'learning_rate': [0.0001, 0.001, 0.01],
}

# Settings held constant for every run of the sweep.
_fixed_settings = {
    'epochs': 10,
    'batch_size': 32,
    'image_size': 224,
    'validation_split': 0.2,
}

sweep_config = {
    'method': 'bayes',
    'metric': {'name': 'val_accuracy', 'goal': 'maximize'},
    'parameters': {
        **{name: {'values': options} for name, options in _searched_options.items()},
        **{name: {'value': constant} for name, constant in _fixed_settings.items()},
    },
}


In [None]:
class iNaturalistDataset(Dataset):
    """Image-folder dataset: one sub-directory of ``root_dir`` per class.

    Class indices follow the alphabetical order of the class directory names.
    Image files inside each class directory are also visited in sorted order,
    so the sample index -> file mapping does not depend on the order in which
    the file system happens to list directory entries.
    """

    # File extensions (compared case-insensitively) that count as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Directory with class subdirectories.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.root_dir = root_dir
        self.transform = transform

        # Get class directories and create class-to-idx mapping
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}

        # Get all image paths and corresponding labels
        self.image_paths = []
        self.labels = []

        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            # os.listdir returns entries in arbitrary order; sort for a stable index.
            for img_name in sorted(os.listdir(class_dir)):
                if img_name.lower().endswith(self.IMAGE_EXTENSIONS):
                    self.image_paths.append(os.path.join(class_dir, img_name))
                    self.labels.append(self.class_to_idx[class_name])

    def __len__(self):
        """Number of image files found under ``root_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``; the image is RGB and
        has ``transform`` applied when one was given."""
        # The context manager closes the file handle; convert() returns a new
        # image, so it stays usable after the file is closed.
        with Image.open(self.image_paths[idx]) as raw_image:
            image = raw_image.convert('RGB')
        label = self.labels[idx]

        if self.transform is not None:
            image = self.transform(image)

        return image, label


In [None]:
def _stratified_split_indices(labels, num_classes, val_fraction, rng):
    """Split sample indices class by class into ``(train_indices, val_indices)``.

    For every class, its indices are shuffled with ``rng`` and the first
    ``int(count * val_fraction)`` go to validation, the rest to training.
    """
    # Group indices by label in a single pass over the labels.
    indices_by_class = [[] for _ in range(num_classes)]
    for index, label in enumerate(labels):
        indices_by_class[label].append(index)

    train_indices = []
    val_indices = []
    for class_indices in indices_by_class:
        rng.shuffle(class_indices)
        val_count = int(len(class_indices) * val_fraction)
        val_indices.extend(class_indices[:val_count])
        train_indices.extend(class_indices[val_count:])
    return train_indices, val_indices


def load_and_split_data(config, seed=42):
    """
    Load data and split into train and validation sets,
    ensuring equal class representation in validation set.

    Args:
        config: Object exposing ``image_size``, ``data_augmentation``,
            ``batch_size`` and ``validation_split`` as attributes.
        seed: Seed of the private random generator used for the split. Because
            the generator is local, every call yields the same
            train/validation partition, so runs of one sweep are validated on
            the same images.

    Returns:
        ``(train_loader, val_loader, test_loader, num_classes)``.
    """
    normalize_steps = [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    resize = transforms.Resize((config.image_size, config.image_size))

    # Base transforms
    base_transform = transforms.Compose([resize] + normalize_steps)

    # Data augmentation transform
    augment_transform = transforms.Compose([
        resize,
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    ] + normalize_steps)

    # Choose transform based on config
    train_transform = augment_transform if config.data_augmentation else base_transform

    # Two views of the training folder: training samples may be augmented,
    # validation samples always use the plain transform.
    full_dataset = iNaturalistDataset(root_dir=TRAIN_DIR, transform=train_transform)
    val_source = iNaturalistDataset(root_dir=TRAIN_DIR, transform=base_transform)
    test_dataset = iNaturalistDataset(root_dir=TEST_DIR, transform=base_transform)

    # Create stratified split
    train_indices, val_indices = _stratified_split_indices(
        full_dataset.labels,
        len(full_dataset.classes),
        config.validation_split,
        np.random.RandomState(seed),
    )

    # Create subset datasets
    train_dataset = Subset(full_dataset, train_indices)
    val_dataset = Subset(val_source, val_indices)

    print(f"Total training samples: {len(train_dataset)}")
    print(f"Total validation samples: {len(val_dataset)}")

    # Create data loaders; only the training loader shuffles.
    loader_options = dict(batch_size=config.batch_size, num_workers=4, pin_memory=True)
    train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
    val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)
    test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

    return train_loader, val_loader, test_loader, len(full_dataset.classes)


In [None]:
def _mean_loss_and_accuracy(model, loader, criterion=None):
    """Evaluate ``model`` on ``loader`` without gradients.

    Returns ``(mean loss per batch, accuracy)``; the loss is 0.0 when no
    ``criterion`` is given.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            if criterion is not None:
                loss_sum += criterion(outputs, labels).item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    return loss_sum / len(loader), correct / total


def train_model(config):
    """Train the model with current hyperparameter configuration.

    Trains for ``config.epochs`` epochs with Adam and cross-entropy loss, logs
    per-epoch metrics to wandb, and uploads the weights of the best epoch (by
    validation accuracy) and of the final epoch as wandb artifacts. The test
    accuracy of the final-epoch model is logged once at the end.

    Checkpoints are written as ``state_dict`` files, which is the format
    ``load_state_dict`` expects when the model is rebuilt for evaluation.
    """
    # Load and split data
    train_loader, val_loader, test_loader, num_classes = load_and_split_data(config)

    # Create model based on hyperparameters
    if config.filter_organization == 'same':
        filters = [config.base_filters] * 5
    elif config.filter_organization == 'doubling':
        filters = [config.base_filters * (2**i) for i in range(5)]
    elif config.filter_organization == 'halving':
        filters = [config.base_filters * (2**(4-i)) for i in range(5)]
    else:
        filters = [32, 64, 128, 256, 512]  # Default

    model = CNNModel(
        input_channels=3,
        num_classes=num_classes,
        filters_per_layer=filters,
        kernel_size=3,
        conv_activation=config.conv_activation,
        dense_units=config.dense_neurons,
        dense_activation=config.dense_activation,
        dropout_rate=config.dropout_rate,
        use_batch_norm=config.use_batch_norm
    )

    # Move model to device
    model = model.to(device)

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)

    # Initialize WandB for tracking
    wandb.watch(model, criterion, log="all", log_freq=100)

    best_model_file = f"best_model_{wandb.run.id}.pth"
    final_model_file = f"final_model_{wandb.run.id}.pth"

    # Training loop
    best_val_acc = 0.0

    for epoch in range(config.epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            # Statistics (the loss must be accumulated here, otherwise the
            # logged train_loss stays at zero)
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        train_acc = correct / total

        # Validation phase
        val_loss, val_acc = _mean_loss_and_accuracy(model, val_loader, criterion)

        # Log metrics to wandb
        wandb.log({
            "epoch": epoch + 1,
            "train_loss": running_loss / len(train_loader),
            "train_accuracy": train_acc,
            "val_loss": val_loss,
            "val_accuracy": val_acc
        })

        print(f'Epoch: {epoch + 1}, Val Loss: {val_loss:.3f}, Val Acc: {100 * val_acc:.2f}%')

        # Save the best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # Save the weights only (state_dict), not the pickled module
            torch.save(model.state_dict(), best_model_file)
            # Log the model to wandb
            artifact = wandb.Artifact('model', type='model')
            artifact.add_file(best_model_file)
            wandb.log_artifact(artifact)

    # Test accuracy of the model as it is after the last epoch
    _, test_acc = _mean_loss_and_accuracy(model, test_loader)
    print(f'Test Accuracy: {100 * test_acc:.2f}%')
    wandb.log({"test_accuracy": test_acc})

    # Save final model weights
    torch.save(model.state_dict(), final_model_file)

    # Log the model to wandb
    artifact = wandb.Artifact('model', type='model')
    artifact.add_file(final_model_file)
    wandb.log_artifact(artifact)


In [None]:
def sweep_train():
    """Entry point for one sweep trial: start a wandb run, name it after its
    hyperparameters, and train with the run's configuration."""
    wandb.init()

    # After init, wandb.config holds the hyperparameters of this run.
    config = wandb.config

    # Human-readable run name encoding the main architecture choices.
    wandb.run.name = (
        f'bf_{config.base_filters}'
        f'_fo_{config.filter_organization}'
        f'_dn_{config.dense_neurons}'
        f'_ca_{config.conv_activation}'
        f'_da_{config.dense_activation}'
        f'_v5'
    )

    train_model(config)

In [None]:
if __name__ == "__main__":
    # Register the sweep defined by sweep_config, then let one agent execute
    # a single trial of it through sweep_train.
    sweep_id = wandb.sweep(sweep=sweep_config, project="DA6401-A2-V4")
    wandb.agent(sweep_id, function=sweep_train, count=1)

## Question 4

In [None]:
# Path to the test data. Set the INATURALIST_DATASET_PATH environment variable
# to point at the dataset on another machine; otherwise the original local
# default is used.
DATASET_PATH = os.environ.get("INATURALIST_DATASET_PATH", r"E:\IITM\2nd sem\inaturalist_12K")
# The dataset's "val" folder serves as the test set.
TEST_DIR = os.path.join(DATASET_PATH, "val")

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Image size (side length, in pixels, that test images are resized to)
IMAGE_SIZE = 224


In [None]:
def get_best_run_from_sweep(sweep_id, entity, project):
    """Return the run of the given sweep with the highest 'val_accuracy' summary value.

    Runs without a 'val_accuracy' summary entry are ignored; on ties the run
    encountered first wins.

    Raises:
        ValueError: If no run of the sweep reports a usable 'val_accuracy'.
    """
    sweep = wandb.Api().sweep(f"{entity}/{project}/{sweep_id}")

    # Collect (accuracy, run) pairs for the runs that reported the metric.
    scored_runs = []
    for candidate in sweep.runs:
        accuracy = candidate.summary.get("val_accuracy")
        if accuracy is None or not accuracy > -1:
            continue
        scored_runs.append((accuracy, candidate))

    if not scored_runs:
        raise ValueError("No runs with 'val_accuracy' found in the sweep")

    # max() keeps the first of several equal maxima.
    best_val_acc, best_run = max(scored_runs, key=lambda pair: pair[0])

    print(f"Best run: {best_run.name}, val_accuracy: {best_val_acc:.4f}")
    return best_run


In [None]:
def download_best_model(run, artifact_name="model", output_dir="downloaded_model"):
    """
    Download the final-model checkpoint that ``run`` logged to wandb.

    Args:
        run: The wandb run object whose model should be downloaded.
        artifact_name: Name of the artifact collection inside the run's project.
        output_dir: Local directory the artifact files are downloaded into.

    Returns:
        model_path: Local path to the downloaded model
        config: Configuration of the best model

    Raises:
        FileNotFoundError: If no artifact logged by ``run`` contains a
            ``final_model*.pth`` file for this run.
    """
    api = wandb.Api()

    # List all artifacts with this name in the project
    artifact_versions = api.artifacts(name=f"{run.project}/{artifact_name}", type_name='model')

    model_path = None
    for artifact in artifact_versions:
        # Match the artifact to the run that created it; logged_by() has to be
        # called, and it may report no producing run at all.
        producer = artifact.logged_by()
        if producer is None or producer.id != run.id:
            continue

        print(f"Found artifact version: {artifact.version} from run: {run.name}")
        artifact_dir = artifact.download(root=output_dir)

        # Every version is downloaded into the same directory, so require the
        # run id in the file name (training saves "final_model_<run id>.pth");
        # this keeps leftovers from other runs from being picked up.
        for file_name in sorted(os.listdir(artifact_dir)):
            if (file_name.startswith("final_model")
                    and file_name.endswith(".pth")
                    and run.id in file_name):
                model_path = os.path.join(artifact_dir, file_name)
                break

        if model_path is not None:
            break

    if model_path is None:
        raise FileNotFoundError(
            f"No 'final_model*.pth' file found in '{artifact_name}' artifacts logged by run {run.id}"
        )

    print(f"Downloaded model file: {model_path}")

    # Get the model configuration from the run
    config = {
        'base_filters': run.config.get('base_filters', 32),
        'dense_activation': run.config.get('dense_activation', 'relu'),
        'filter_organization': run.config.get('filter_organization', 'doubling'),
        'dense_neurons': run.config.get('dense_neurons', 512),
        'dropout_rate': run.config.get('dropout_rate', 0.3),
        'use_batch_norm': run.config.get('use_batch_norm', True),
        'conv_activation': run.config.get('conv_activation', 'mish')
    }

    return model_path, config


In [None]:
def load_test_data():
    """Build the test dataset from TEST_DIR and a non-shuffling loader over it.

    Returns:
        ``(test_loader, test_dataset)``
    """
    # Deterministic preprocessing: resize, tensor conversion, normalization.
    preprocessing_steps = [
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    dataset = iNaturalistDataset(
        root_dir=TEST_DIR,
        transform=transforms.Compose(preprocessing_steps),
    )

    loader_options = {
        'batch_size': 32,
        'shuffle': False,
        'num_workers': 4,
        'pin_memory': True,
    }
    loader = DataLoader(dataset, **loader_options)

    return loader, dataset


In [None]:
def _load_checkpoint_state_dict(model_path, map_location):
    """Read a checkpoint file and return a ``state_dict``.

    Accepts both a saved ``state_dict`` and a fully pickled ``nn.Module``
    (written by ``torch.save(model, path)``); in the second case the module's
    ``state_dict()`` is returned.
    """
    import pickle  # local import: only needed for the fallback below

    try:
        checkpoint = torch.load(model_path, map_location=map_location)
    except pickle.UnpicklingError:
        # NOTE(review): PyTorch versions whose torch.load defaults to
        # weights_only=True refuse fully pickled modules. Retrying with
        # weights_only=False unpickles arbitrary objects, so only use this on
        # checkpoints from a trusted source (here: our own wandb project).
        checkpoint = torch.load(model_path, map_location=map_location, weights_only=False)

    if isinstance(checkpoint, nn.Module):
        return checkpoint.state_dict()
    return checkpoint


def evaluate_model(sweep_id='uf2dfd5t', entity='da24m008-iit-madras', project='DA6401-A2-V4'):
    """Evaluate the best model on the test set

    Starts a wandb run of job type "evaluation", picks the sweep's best run,
    downloads its final checkpoint, rebuilds the network from the run's
    hyperparameters and measures accuracy on the test set. The wandb run is
    left open, so later ``wandb.log`` calls are recorded in the same run.

    Returns:
        ``(model, test_loader, test_dataset, all_labels, all_predictions, class_names)``
        where ``all_labels`` / ``all_predictions`` are lists with one entry
        per test sample, in loader order.

    Raises:
        Any exception raised along the way is printed and re-raised unchanged.
    """
    # Initialize wandb
    wandb.init(project=project, job_type="evaluation")

    try:
        # Get the best run and download its model
        best_run = get_best_run_from_sweep(
            sweep_id=sweep_id,
            entity=entity,
            project=project
        )

        model_path, best_config = download_best_model(best_run)

        # Log the best run information
        wandb.log({"best_run_id": best_run.id, "best_run_name": best_run.name})

        # Load test data
        test_loader, test_dataset = load_test_data()

        # Get class names and count
        class_names = test_dataset.classes
        num_classes = len(class_names)
        print(f"Number of classes: {num_classes}")
        print(f"Class names: {class_names}")

        # Update config with the correct number of classes
        best_config['num_classes'] = num_classes

        # Create model with the best configuration
        if best_config['filter_organization'] == 'same':
            filters = [best_config['base_filters']] * 5
        elif best_config['filter_organization'] == 'doubling':
            filters = [best_config['base_filters'] * (2**i) for i in range(5)]
        elif best_config['filter_organization'] == 'halving':
            filters = [best_config['base_filters'] * (2**(4-i)) for i in range(5)]
        else:
            filters = [32, 64, 128, 256, 512]  # Default

        model = CNNModel(
            input_channels=3,
            num_classes=num_classes,
            filters_per_layer=filters,
            kernel_size=3,
            conv_activation=best_config['conv_activation'],
            dense_units=best_config['dense_neurons'],
            dropout_rate=best_config['dropout_rate'],
            use_batch_norm=best_config['use_batch_norm'],
            dense_activation=best_config['dense_activation']
        )

        # Load the best model weights; the checkpoint may hold either a
        # state_dict or a whole pickled module.
        model.load_state_dict(_load_checkpoint_state_dict(model_path, device))
        model = model.to(device)
        model.eval()  # Set to evaluation mode

        # Evaluate model
        correct = 0
        total = 0
        all_labels = []
        all_predictions = []

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = outputs.max(1)

                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

                # Store for confusion matrix
                all_labels.extend(labels.cpu().numpy())
                all_predictions.extend(predicted.cpu().numpy())

        test_accuracy = correct / total
        print(f"Test accuracy: {test_accuracy:.4f}")

        # Log to wandb
        wandb.log({
            "best_model_test_accuracy": test_accuracy
        })

        return model, test_loader, test_dataset, all_labels, all_predictions, class_names

    except Exception as e:
        print(f"Error during evaluation: {e}")
        # Bare raise keeps the original traceback intact.
        raise


In [None]:
def create_prediction_grid(model, test_dataset, class_names):
    """Plot up to 30 randomly chosen test images (10 rows x 3 columns) titled
    with their true and predicted class, save the grid to
    'prediction_grid.png', and log the grid and the individual captioned
    images to wandb."""
    model.eval()

    # Display version of the preprocessing: resized but not normalized.
    display_transform = transforms.Compose([
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
    ])

    # Pick distinct random samples; a 10x3 grid holds at most 30 images.
    sample_count = min(30, len(test_dataset))
    chosen = np.random.choice(len(test_dataset), sample_count, replace=False)

    plt.figure(figsize=(15, 25))

    captioned_images = []

    with torch.no_grad():
        for cell, sample_idx in enumerate(chosen):
            # Normalized tensor (model input) and ground-truth label
            model_input, label = test_dataset[sample_idx]

            # Reload the same file without normalization for display
            display_source = Image.open(test_dataset.image_paths[sample_idx]).convert('RGB')
            display_tensor = display_transform(display_source)

            # Predict on a batch of one
            logits = model(model_input.unsqueeze(0).to(device))
            _, top_class = logits.max(1)
            prediction = top_class.item()

            # Draw the image in its grid cell (channels-last for matplotlib)
            plt.subplot(10, 3, cell + 1)
            img_array = display_tensor.permute(1, 2, 0).numpy()
            plt.imshow(img_array)

            true_class = class_names[label]
            pred_class = class_names[prediction]

            is_correct = label == prediction
            color = 'green' if is_correct else 'red'
            caption = (
                f"True: {true_class} | Pred: {pred_class} ✓"
                if is_correct
                else f"True: {true_class} | Pred: {pred_class} ✗"
            )

            plt.title(caption, color=color)
            plt.axis('off')

            captioned_images.append(wandb.Image(img_array, caption=caption))

    plt.tight_layout()

    # Persist the grid, then log it to wandb
    plt.savefig('prediction_grid.png')
    wandb.log({"prediction_grid": wandb.Image('prediction_grid.png')})

    # Also log the individual images with captions
    wandb.log({"test_predictions": captioned_images})


In [None]:
def create_confusion_matrix(all_labels, all_predictions, class_names):
    """Create and log a confusion matrix

    Plots the confusion matrix on a 12x10 inch figure, saves it to
    'confusion_matrix.png', logs the image to wandb, and logs the per-class
    accuracy (diagonal divided by row total) in a single wandb.log call.

    Args:
        all_labels: True class indices, one per sample.
        all_predictions: Predicted class indices, one per sample.
        class_names: Class names; position ``i`` names class index ``i``.
    """
    # Fix the label set so the matrix always has one row/column per class,
    # even when a class never occurs in the labels or predictions.
    label_ids = list(range(len(class_names)))
    cm = confusion_matrix(all_labels, all_predictions, labels=label_ids)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)

    # Pass the axes explicitly: without ax=, disp.plot() opens its own
    # default-sized figure and the requested figsize is never used.
    fig, ax = plt.subplots(figsize=(12, 10))
    disp.plot(cmap=plt.cm.Blues, ax=ax)
    plt.xticks(rotation=90)
    fig.tight_layout()

    # Save the figure
    fig.savefig('confusion_matrix.png')

    # Log the figure to wandb
    wandb.log({"confusion_matrix": wandb.Image('confusion_matrix.png')})

    # Also log a summary of class-wise accuracies. Classes without any true
    # sample get 0.0 instead of a division-by-zero NaN.
    row_totals = cm.sum(axis=1)
    class_accuracy = np.divide(
        cm.diagonal(),
        row_totals,
        out=np.zeros(len(label_ids), dtype=float),
        where=row_totals > 0,
    )
    wandb.log({
        f"class_accuracy/{class_name}": float(accuracy)
        for class_name, accuracy in zip(class_names, class_accuracy)
    })


In [None]:
def generate_classification_report(all_labels, all_predictions, class_names):
    """Compute the scikit-learn classification report and log it to wandb.

    Logs one entry per class found in the report (precision, recall,
    f1-score) and then one entry with the overall accuracy plus the macro and
    weighted averages.
    """
    report = classification_report(
        all_labels,
        all_predictions,
        target_names=class_names,
        output_dict=True,
    )

    # Per-class metrics: one wandb.log call per class present in the report.
    per_class_metrics = ('precision', 'recall', 'f1-score')
    for class_name in class_names:
        if class_name not in report:
            continue
        class_scores = report[class_name]
        wandb.log({
            f"metrics/{class_name}/{metric}": class_scores[metric]
            for metric in per_class_metrics
        })

    # Overall metrics: accuracy first, then each averaging mode.
    overall = {"metrics/accuracy": report['accuracy']}
    metric_suffixes = (('precision', 'precision'), ('recall', 'recall'), ('f1-score', 'f1'))
    for average in ('macro avg', 'weighted avg'):
        key_prefix = "metrics/" + average.replace(' ', '_')
        for metric, suffix in metric_suffixes:
            overall[f"{key_prefix}_{suffix}"] = report[average][metric]
    wandb.log(overall)


In [None]:
# Evaluate the best run of the sweep on the test set. The returned model,
# dataset, labels, predictions and class names are reused by the cells below.
# NOTE(review): the sweep id / entity / project repeat evaluate_model's
# defaults — update them here when evaluating a different sweep.
model, test_loader, test_dataset, all_labels, all_predictions, class_names = evaluate_model(
    sweep_id='uf2dfd5t',
    entity='da24m008-iit-madras',
    project='DA6401-A2-V4'
)


In [None]:
# Create the prediction grid from random test images and log it to wandb
# (requires the evaluation cell above to have run).
create_prediction_grid(model, test_dataset, class_names)

In [None]:
# Create the confusion matrix from the test-set labels and predictions
# collected by the evaluation cell, and log it to wandb.
create_confusion_matrix(all_labels, all_predictions, class_names)

In [None]:
# Generate the classification report (per-class and averaged precision,
# recall and f1-score) for the same predictions and log it to wandb.
generate_classification_report(all_labels, all_predictions, class_names)