<a href="https://colab.research.google.com/github/dhamu2908/DeepLearningAssignment2/blob/main/DeepLearningAssignment2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
Deep Learning Assignment 2
CS24M027 Dhamodharan Muthu Muniyandi
IIT MADRAS
"""

'\nDeep Learning Assignment 2\nCS24M027 Dhamodharan Muthu Muniyandi\nIIT MADRAS\n'

In [None]:
import os

# Data access check: confirm that the expected dataset folders are mounted.

dataset_path = "/kaggle/input/dldata/inaturalist_12K"
print("Exploring contents of /kaggle/input/dldata:")

# Both split directories live directly under the dataset root.
train_path, val_path = (os.path.join(dataset_path, split) for split in ("train", "val"))

# Record whether each split directory exists.
is_train_available, is_val_available = map(os.path.isdir, (train_path, val_path))

print(f"Train directory found: {is_train_available}")
print(f"Validation directory found: {is_val_available}")

# Show up to five entries of each directory that is present.
if is_train_available:
    print("Sample items in training folder:", os.listdir(train_path)[:5])
if is_val_available:
    print("Sample items in validation folder:", os.listdir(val_path)[:5])

In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import time
import wandb

In [None]:
# Initialize WandB.
# The API key is taken from the WANDB_API_KEY environment variable so that no
# credential has to be typed into (or committed with) the notebook. When the
# variable is unset or empty, `key=None` is passed and wandb uses its own
# lookup instead of an empty-string key.
wandb.login(key=os.environ.get("WANDB_API_KEY") or None)

In [None]:
# Training configuration for the fine-tuning run (keys are read by the
# pipeline/model/training functions below and logged to wandb).
config = dict(
    num_epochs=10,
    lr=1e-4,
    batch_sz=32,
    backbone="resnet50",
    unfreeze_layers=1,
)

# Side length images are resized to, and the number of output classes.
IMG_DIM, NUM_CLASSES = 224, 10

# Function to divide dataset into training and validation sets while preserving class balance
def stratified_split(dataset, train_frac):
    """Split ``dataset`` into train/validation subsets class by class.

    For every class, the first ``int(n * train_frac)`` indices go to the
    training subset and the remaining ones to the validation subset, so both
    subsets keep the class proportions of the full dataset.

    Class membership is taken from ``dataset.targets`` when the dataset exposes
    it (one label per sample, in index order). Datasets without ``targets``
    fall back to the fixed index ranges below, which assume ten contiguous
    classes laid out exactly as in the original data arrangement.

    Args:
        dataset: Indexable dataset, optionally with a ``targets`` sequence.
        train_frac: Fraction (0..1) of each class assigned to training.

    Returns:
        Tuple ``(training_data, validation_data)`` of ``Subset`` objects.
    """
    labels = getattr(dataset, "targets", None)

    if labels is not None:
        # Group sample indices by label; indices stay in ascending order.
        indices_by_class = {}
        for idx, label in enumerate(labels):
            indices_by_class.setdefault(int(label), []).append(idx)
        class_groups = [indices_by_class[label] for label in sorted(indices_by_class)]
    else:
        # Index ranges for each class (based on specific dataset arrangement)
        class_boundaries = [
            (0, 999), (1000, 1999), (2000, 2999), (3000, 3999), (4000, 4998),
            (4999, 5998), (5999, 6998), (6999, 7998), (7999, 8998), (8999, 9998)
        ]
        class_groups = [list(range(lower, upper + 1)) for lower, upper in class_boundaries]

    train_ids = []
    val_ids = []
    for indices in class_groups:
        split_point = int(len(indices) * train_frac)
        train_ids.extend(indices[:split_point])
        val_ids.extend(indices[split_point:])

    return Subset(dataset, train_ids), Subset(dataset, val_ids)


In [None]:
# Responsible for creating data loaders and returning dataset statistics
def build_data_pipeline(config):
    """Build the train/validation/test data loaders and report dataset sizes.

    Images are resized to ``IMG_DIM x IMG_DIM`` and converted to tensors. The
    training folder is split 80/20 into train and validation subsets with
    ``stratified_split``; the ``val`` folder on disk is used as the test set.

    Args:
        config: Mapping holding the batch size under ``"batch_sz"`` (the key
            used by the notebook's ``config`` dict) or ``"batch_size"`` (the
            key used by the sweep configuration).

    Returns:
        Dict with ``train_size``, ``val_size``, ``test_size`` (sample counts)
        and ``train_loader``, ``val_loader``, ``test_loader`` (DataLoaders).
    """
    size = (IMG_DIM, IMG_DIM)

    def get_transform():
        return transforms.Compose([
            transforms.Resize(size),
            transforms.ToTensor()
        ])

    # Define paths for training and testing datasets.
    # NOTE(review): these point at the "nature1" Kaggle dataset, while other
    # cells of this notebook read from "/kaggle/input/dldata" - confirm which
    # mount is intended.
    path_train = "/kaggle/input/nature1/inaturalist_12K/train"
    path_test = "/kaggle/input/nature1/inaturalist_12K/val"

    # Load raw datasets (ImageFolder is reached through the imported
    # `torchvision.datasets` module).
    complete_train_dataset = datasets.ImageFolder(root=path_train, transform=get_transform())
    test_dataset = datasets.ImageFolder(root=path_test, transform=get_transform())

    # Split training set into train and validation sets
    train_subset, val_subset = stratified_split(complete_train_dataset, 0.8)

    # Create data loaders; accept either spelling of the batch-size key.
    bs = config["batch_sz"] if "batch_sz" in config else config["batch_size"]
    loader_train = DataLoader(train_subset, batch_size=bs, shuffle=True)
    loader_val = DataLoader(val_subset, batch_size=bs, shuffle=True)
    loader_test = DataLoader(test_dataset, batch_size=bs, shuffle=True)

    # Return everything in a structured dictionary
    return {
        "train_size": len(train_subset),
        "val_size": len(val_subset),
        "test_size": len(test_dataset),
        "train_loader": loader_train,
        "val_loader": loader_val,
        "test_loader": loader_test
    }


In [None]:
# Constructs and customizes a ResNet-50 model based on the provided configuration
def build_resnet_model(config):
    """Create an ImageNet-pretrained ResNet-50 with a new ``NUM_CLASSES`` head.

    All pretrained weights are frozen first. The replacement classifier head
    (``fc``) is always left trainable, since it starts from random weights.
    ``config["unfreeze_layers"]`` counts how many trailing top-level blocks
    that own parameters are trainable, with the head counted as the first one:
    a value of 1 (or less) trains only the head, 2 additionally unfreezes the
    block before it, and so on.

    Args:
        config: Mapping with an integer ``"unfreeze_layers"`` entry.

    Returns:
        The configured ``torchvision`` ResNet-50 module.
    """
    from torchvision.models import resnet50

    # Load pretrained ResNet-50 architecture
    resnet = resnet50(weights="IMAGENET1K_V1")
    input_features = resnet.fc.in_features

    # Replace the final layer to match the number of output classes
    resnet.fc = torch.nn.Linear(input_features, NUM_CLASSES)

    # Initially freeze all layers
    for param in resnet.parameters():
        param.requires_grad = False

    # The new head must learn, otherwise there is nothing to optimise.
    for param in resnet.fc.parameters():
        param.requires_grad = True

    # Unfreeze whole trailing blocks rather than individual parameter tensors,
    # so a block's weights and biases are always unfrozen together.
    unfreeze_count = config["unfreeze_layers"]
    if unfreeze_count > 1:
        parameterised_blocks = [
            block for block in resnet.children()
            if next(block.parameters(), None) is not None
        ]
        for block in parameterised_blocks[-unfreeze_count:]:
            for param in block.parameters():
                param.requires_grad = True

    return resnet


In [None]:
# Trains a model using the provided configuration and dataset, logs metrics to Weights & Biases
def run_training(config, dataset_bundle):
    """Fine-tune the ResNet model and log per-epoch metrics to Weights & Biases.

    Runs ``config["num_epochs"]`` epochs of Adam optimisation with
    cross-entropy loss, evaluates on the validation loader after every epoch,
    logs the metrics with ``wandb.log`` (an active wandb run is expected) and
    finally writes the model weights to ``./model.pth``.

    Args:
        config: Mapping with ``"lr"``, ``"num_epochs"`` and the keys needed by
            ``build_resnet_model``.
        dataset_bundle: Dict with ``train_loader``, ``val_loader``,
            ``train_size`` and ``val_size`` entries.
    """
    # Set device context
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Model setup
    model = build_resnet_model(config)
    model = torch.nn.DataParallel(model, device_ids=[0]).to(device)

    criterion = nn.CrossEntropyLoss()
    # Only hand the optimizer the parameters that are actually being trained.
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.Adam(trainable_params, lr=config["lr"])

    # Load training and validation data
    train_loader = dataset_bundle["train_loader"]
    val_loader = dataset_bundle["val_loader"]
    n_train = dataset_bundle["train_size"]
    n_val = dataset_bundle["val_size"]

    for ep in range(config["num_epochs"]):
        model.train()
        train_loss_accumulator = 0.0
        correct_train_preds = 0

        for step, (x_batch, y_batch) in enumerate(train_loader):
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            logits = model(x_batch)
            loss = criterion(logits, y_batch)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            batch_hits = logits.argmax(dim=1) == y_batch
            train_loss_accumulator += batch_loss
            correct_train_preds += batch_hits.sum().item()

            # Progress report every 10th batch
            if step % 10 == 0:
                batch_acc = batch_hits.float().mean().item()
                print(f"[Epoch {ep} | Batch {step}] Accuracy: {batch_acc:.4f}, Loss: {batch_loss:.4f}")

        # Evaluate on validation set
        model.eval()
        val_loss_accumulator = 0.0
        correct_val_preds = 0
        with torch.no_grad():
            for x_val, y_val in val_loader:
                x_val, y_val = x_val.to(device), y_val.to(device)
                val_logits = model(x_val)
                loss = criterion(val_logits, y_val)
                val_loss_accumulator += loss.item()
                correct_val_preds += (val_logits.argmax(1) == y_val).sum().item()

        # Calculate and log metrics: accuracy is per sample, loss is the mean
        # of the per-batch losses.
        train_accuracy = correct_train_preds / n_train
        train_loss = train_loss_accumulator / len(train_loader)
        val_accuracy = correct_val_preds / n_val
        val_loss = val_loss_accumulator / len(val_loader)

        print(f"Epoch {ep} | Train Acc: {train_accuracy:.4f} | Train Loss: {train_loss:.4f} | Val Acc: {val_accuracy:.4f} | Val Loss: {val_loss:.4f}")
        wandb.log({
            "epoch": ep,
            "train_accuracy": train_accuracy,
            "train_loss": train_loss,
            "val_accuracy": val_accuracy,
            "val_loss": val_loss
        })

    print("Training complete.")
    # Save the wrapped module's weights so the checkpoint keys carry no
    # "module." prefix and load directly into a plain (non-DataParallel) model.
    torch.save(model.module.state_dict(), "./model.pth")


In [None]:
# Assign config object
experiment_config = config  # renamed from h_params for consistency

# Prepare the dataset
data_bundle = build_data_pipeline(experiment_config)

# Name the run after its hyperparameters so runs are distinguishable in the UI
run_name = f"{experiment_config['backbone']}_epochs{experiment_config['num_epochs']}_bs{experiment_config['batch_sz']}_lr{experiment_config['lr']}_unfreeze{experiment_config['unfreeze_layers']}"

# Initialize Weights & Biases for logging and train inside the run's context
# manager, so the run is closed when training ends or raises and is no longer
# active when a later cell starts its own runs.
with wandb.init(
    project="DL_Assignment_2B",
    name=run_name,
    config=experiment_config
) as wandb_run:
    # Begin model training
    run_training(experiment_config, data_bundle)


In [None]:
# Define sweep configuration for hyperparameter optimization
# Bayesian sweep that maximises validation accuracy; every hyperparameter is
# drawn from a discrete list of candidate values.
sweep_config = dict(
    method="bayes",
    name="DL_Assignment2_Sweep",
    metric=dict(name="val_accuracy", goal="maximize"),
    parameters=dict(
        epochs=dict(values=[10]),
        learning_rate=dict(values=[0.0001, 0.001]),
        batch_size=dict(values=[32, 64]),
        num_of_filter=dict(values=[16, 32, 64]),
        # Per-layer kernel sizes: three uniform settings, then a shrinking
        # and a growing schedule.
        filter_size=dict(
            values=[[k] * 5 for k in (3, 5, 7)] + [[11, 9, 7, 5, 3], [3, 5, 7, 9, 11]]
        ),
        actv_func=dict(values=["elu", "gelu", "leaky_relu", "selu"]),
        filter_multiplier=dict(values=[1, 2]),
        data_aug=dict(values=[False]),
        use_batchnorm=dict(values=[True, False]),
        dropout_rate=dict(values=[0, 0.1, 0.2]),
        dense_units=dict(values=[64, 128, 256]),
        conv_layer_count=dict(values=[5]),
    ),
)

# Register the sweep and keep its id for the agent
sweep_id = wandb.sweep(sweep=sweep_config, project="DL_Assignment_2")

# Define training entry point for each sweep run
def sweep_main():
    """Run one sweep trial: start a wandb run, build the data, train, finish.

    ``wandb.init()`` is called exactly once per trial; the agent supplies the
    sampled hyperparameters through the run's config. The sweep's key names
    are translated to the names ``build_data_pipeline``, ``build_resnet_model``
    and ``run_training`` read.
    """
    with wandb.init() as run:
        cfg = run.config

        # Descriptive run name built from the sampled hyperparameters
        run.name = (
            f"{cfg.actv_func}_ep{cfg.epochs}_lr{cfg.learning_rate}_initF{cfg.num_of_filter}_"
            f"fsz{cfg.filter_size}_fmul{cfg.filter_multiplier}_aug{cfg.data_aug}_"
            f"bn{cfg.use_batchnorm}_drop{cfg.dropout_rate}_dense{cfg.dense_units}_bs{cfg.batch_size}"
        )

        # Map sweep keys onto the keys the training functions expect.
        # `unfreeze_layers` is not swept, so the baseline `config` value is used.
        # NOTE(review): the CNN-specific sweep parameters (num_of_filter,
        # filter_size, actv_func, ...) only appear in the run name here;
        # run_training builds the ResNet model and does not read them.
        train_cfg = {
            "num_epochs": cfg.epochs,
            "lr": cfg.learning_rate,
            "batch_size": cfg.batch_size,
            "batch_sz": cfg.batch_size,
            "unfreeze_layers": config["unfreeze_layers"],
        }

        dataset = build_data_pipeline(train_cfg)
        run_training(train_cfg, dataset)

# Launch the agent to perform sweeps
wandb.agent(sweep_id, function=sweep_main, count=10)


In [1]:
#Question 4 10 X 3

import torch
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
from torch.utils.data import Subset, DataLoader
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import wandb
import time
import matplotlib.pyplot as plt
import numpy as np

import os

# Configure the hyper parameters
h_params = {
    "epochs": 10,
    "learning_rate": 0.0001,
    "batch_size": 32,
    "num_of_filter": 64,
    "filter_size": [3, 3, 3, 3, 3],
    "actv_func": "gelu",
    "filter_multiplier": 2,
    "data_augumentation": False,
    "batch_normalization": True,
    "dropout": 0.4,
    "conv_layers": 5,
    "dense_layer_size": 256
}

# Input image side length, number of classes, and class display names
IMAGE_SIZE = 224
NUM_OF_CLASSES = 10
CLASS_NAMES = ["Amphibia", "Animalia", "Arachnida", "Aves", "Fungi",
               "Insecta", "Mammalia", "Mollusca", "Plantae", "Reptilia"]

# Initialize wandb.
# The API key is read from the WANDB_API_KEY environment variable instead of
# being written into the notebook; when it is unset or empty, `key=None` is
# passed and wandb uses its own lookup.
# NOTE(review): a literal API key used to be committed on this line; it should
# be treated as exposed and rotated.
wandb.login(key=os.environ.get("WANDB_API_KEY") or None)
run = wandb.init(project="DL_Assignment_2", config=h_params)

# Data preparation
def split_dataset_with_class_distribution(dataset, split_ratio):
    """Split ``dataset`` into train/validation subsets class by class.

    For every class, the first ``int(n * split_ratio)`` indices go to the
    training subset and the rest to the validation subset, so both subsets
    keep the class proportions of the full dataset.

    Class membership comes from ``dataset.targets`` when the dataset exposes it
    (one label per sample, in index order). Datasets without ``targets`` fall
    back to the fixed index ranges below, which assume ten contiguous classes
    laid out exactly as in the original data arrangement.

    Args:
        dataset: Indexable dataset, optionally with a ``targets`` sequence.
        split_ratio: Fraction (0..1) of each class assigned to training.

    Returns:
        Tuple ``(train_subset, val_subset)`` of ``Subset`` objects.
    """
    labels = getattr(dataset, "targets", None)

    if labels is not None:
        # Group sample indices by label; indices stay in ascending order.
        grouped = {}
        for idx, label in enumerate(labels):
            grouped.setdefault(int(label), []).append(idx)
        per_class_indices = [grouped[label] for label in sorted(grouped)]
    else:
        class_ranges = [(0, 999), (1000, 1999), (2000, 2999), (3000, 3999),
                       (4000, 4998), (4999, 5998), (5999, 6998),
                       (6999, 7998), (7999, 8998), (8999, 9998)]
        per_class_indices = [list(range(start, end + 1)) for start, end in class_ranges]

    train_indices = []
    val_indices = []
    for class_indices in per_class_indices:
        split_idx = int(len(class_indices) * split_ratio)
        train_indices.extend(class_indices[:split_idx])
        val_indices.extend(class_indices[split_idx:])

    return Subset(dataset, train_indices), Subset(dataset, val_indices)

def prepare_data(h_params):
    """Create shuffled train/val/test loaders plus the size of each split.

    The training folder is divided 80/20 by
    ``split_dataset_with_class_distribution``; the on-disk ``val`` folder is
    used as the test set. Every image is resized to
    ``IMAGE_SIZE x IMAGE_SIZE`` and converted to a tensor.

    Args:
        h_params: Mapping providing ``"batch_size"``.

    Returns:
        Dict with ``train_loader``, ``val_loader``, ``test_loader`` and the
        sample counts ``train_len``, ``val_len``, ``test_len``.
    """
    resize_to_tensor = transforms.Compose([
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor()
    ])

    train_data_dir = "/kaggle/input/dldata/inaturalist_12K/train"
    test_data_dir = "/kaggle/input/dldata/inaturalist_12K/val"

    # Datasets: the full training folder is split, the val folder is the test set
    full_train = ImageFolder(train_data_dir, transform=resize_to_tensor)
    train_part, val_part = split_dataset_with_class_distribution(full_train, 0.8)
    test_part = ImageFolder(test_data_dir, transform=resize_to_tensor)

    def shuffled_loader(split):
        # All three loaders share the batch size and are shuffled
        return DataLoader(split, batch_size=h_params["batch_size"], shuffle=True)

    bundle = {
        "train_loader": shuffled_loader(train_part),
        "val_loader": shuffled_loader(val_part),
        "test_loader": shuffled_loader(test_part),
    }
    bundle["train_len"] = len(train_part)
    bundle["val_len"] = len(val_part)
    bundle["test_len"] = len(test_part)
    return bundle

# Model definition
class CNN(nn.Module):
    """Configurable convolutional classifier.

    The network stacks ``h_params["conv_layers"]`` stages of
    conv -> (optional batch norm) -> activation -> 2x2 max-pool, then flattens
    into a dense layer with dropout and a final ``NUM_OF_CLASSES`` output layer.
    """

    def __init__(self, h_params):
        super().__init__()
        self.h_params = h_params

        # Channel count per stage: base * multiplier**stage
        depth = h_params["conv_layers"]
        base, growth = h_params["num_of_filter"], h_params["filter_multiplier"]
        self.filters = [int(base * (growth ** stage)) for stage in range(depth)]

        # Convolution stages; bn_layers stays empty when batch norm is disabled
        self.conv_layers = nn.ModuleList()
        self.bn_layers = nn.ModuleList()
        channels_in = 3
        for stage, channels_out in enumerate(self.filters):
            kernel = h_params["filter_size"][stage]
            self.conv_layers.append(nn.Conv2d(channels_in, channels_out, kernel))
            if h_params["batch_normalization"]:
                self.bn_layers.append(nn.BatchNorm2d(channels_out))
            channels_in = channels_out

        # Classifier head sized from the final feature-map side length
        side = self.calculate_fmap_size(IMAGE_SIZE)
        hidden_units = h_params["dense_layer_size"]
        self.fc1 = nn.Linear(self.filters[-1] * side * side, hidden_units)
        self.fc2 = nn.Linear(hidden_units, NUM_OF_CLASSES)
        self.dropout = nn.Dropout(p=h_params["dropout"])
        self.activation = self.get_activation(h_params["actv_func"])

    def forward(self, x):
        """Return class logits for a batch of images."""
        with_bn = self.h_params["batch_normalization"]
        for stage in range(self.h_params["conv_layers"]):
            x = self.conv_layers[stage](x)
            if with_bn:
                x = self.bn_layers[stage](x)
            x = F.max_pool2d(self.activation(x), 2, 2)

        # Flatten everything except the batch dimension
        flat = x.view(x.size(0), -1)
        hidden = self.dropout(self.activation(self.fc1(flat)))
        return self.fc2(hidden)

    def calculate_fmap_size(self, input_size):
        """Side length of the feature map after all conv + pool stages.

        Each stage applies an unpadded stride-1 convolution (side - k + 1)
        followed by a 2x2 pool (floor division by 2).
        """
        kernels = self.h_params["filter_size"]
        side = input_size
        for stage in range(self.h_params["conv_layers"]):
            side = (side - kernels[stage] + 1) // 2
        return side

    def get_activation(self, name):
        """Look up an activation function by name; unknown names give ReLU."""
        known = dict(
            relu=F.relu,
            gelu=F.gelu,
            silu=F.silu,
            selu=F.selu,
            leaky_relu=F.leaky_relu,
            elu=F.elu,
        )
        return known.get(name, F.relu)

# Training and evaluation
def train_model(model, data_loaders, optimizer, criterion, device, num_epochs):
    """Train ``model`` for ``num_epochs`` epochs, validating after each one.

    Per epoch, the mean batch loss and the sample accuracy on the training
    loader are computed, the validation loader is scored with
    ``evaluate_model``, and all four metrics are printed and sent to wandb.
    """
    batches = data_loaders["train_loader"]

    for epoch in range(num_epochs):
        model.train()
        loss_sum, hits, seen = 0.0, 0, 0

        for inputs, labels in batches:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # One optimisation step
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Running statistics for this epoch
            loss_sum += loss.item()
            predicted = torch.max(outputs.data, 1).indices
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()

        train_loss = loss_sum / len(batches)
        train_acc = hits / seen

        # Validation
        val_loss, val_acc = evaluate_model(model, data_loaders["val_loader"], criterion, device)

        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}")

        metrics = {"epoch": epoch}
        metrics.update(train_loss=train_loss, train_acc=train_acc)
        metrics.update(val_loss=val_loss, val_acc=val_acc)
        wandb.log(metrics)

def evaluate_model(model, loader, criterion, device):
    """Score ``model`` on ``loader`` without tracking gradients.

    Returns:
        Tuple ``(mean_batch_loss, accuracy)`` where the loss is averaged over
        batches and the accuracy over individual samples.
    """
    model.eval()
    loss_sum, hits, seen = 0.0, 0, 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss_sum += criterion(outputs, labels).item()

            # Predicted class = index of the largest logit
            predicted = torch.max(outputs.data, 1).indices
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()

    mean_loss = loss_sum / len(loader)
    accuracy = hits / seen
    return mean_loss, accuracy

def create_prediction_grid(model, loader, class_names, device, n_images=30):
    """Plot one batch of images with their true and predicted class names.

    The first batch drawn from ``loader`` is classified and shown in a
    three-column grid; titles are green for correct predictions and red for
    wrong ones. With the default ``n_images=30`` and a batch of at least 30
    images this is a 10 x 3 grid of size 15 x 30 inches.

    Args:
        model: Classifier returning one row of logits per image.
        loader: Iterable yielding ``(images, labels)`` batches; images are
            permuted from (C, H, W) to (H, W, C) for plotting.
        class_names: Sequence mapping class index to display name.
        device: Device the model runs on.
        n_images: Maximum number of images to show; capped at the batch size.

    Returns:
        The matplotlib ``Figure`` holding the grid.
    """
    model.eval()
    images, labels = next(iter(loader))

    # Never index past the batch, and treat negative requests as zero
    n_shown = max(0, min(n_images, images.size(0)))
    images = images[:n_shown].to(device)
    labels = labels[:n_shown]

    with torch.no_grad():
        preds = model(images).argmax(dim=1).cpu()
    images = images.cpu()

    # Enough rows of three for the images actually shown (30 images -> 10 rows)
    n_cols = 3
    n_rows = max(1, (n_shown + n_cols - 1) // n_cols)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 3 * n_rows), squeeze=False)

    for i, ax in enumerate(axes.flat):
        # Hide ticks/frames everywhere, including cells left without an image
        ax.axis('off')
        if i >= n_shown:
            continue

        img = images[i].permute(1, 2, 0).numpy()
        img = np.clip(img, 0, 1)

        true_label = class_names[int(labels[i])]
        pred_label = class_names[int(preds[i])]
        color = 'green' if true_label == pred_label else 'red'

        ax.imshow(img)
        ax.set_title(f"True: {true_label}\nPred: {pred_label}", color=color)

    plt.tight_layout()
    return fig

# Main execution
if __name__ == "__main__":
    # End-to-end run: build data and model, train, evaluate on the test set,
    # log a prediction grid, and save the final weights.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Prepare data
    data_loaders = prepare_data(h_params)

    # Initialize model; wrap for multi-GPU only when more than one is present
    model = CNN(h_params).to(device)
    if torch.cuda.device_count() > 1:
        model = nn.DataParallel(model)

    # Training setup
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=h_params["learning_rate"])

    # Train model
    train_model(model, data_loaders, optimizer, criterion, device, h_params["epochs"])

    # Evaluate on test set
    test_loss, test_acc = evaluate_model(model, data_loaders["test_loader"], criterion, device)
    print(f"Test Accuracy: {test_acc:.4f}")
    wandb.log({"test_acc": test_acc, "test_loss": test_loss})

    # Create and save prediction grid (saved through the figure itself rather
    # than whichever figure pyplot currently considers active)
    pred_grid = create_prediction_grid(model, data_loaders["test_loader"], CLASS_NAMES, device)
    pred_grid.savefig("prediction_grid.png")
    wandb.log({"prediction_grid": wandb.Image(pred_grid)})

    # Save model. Unwrap DataParallel first so the checkpoint keys are the same
    # (no "module." prefix) regardless of how many GPUs the run used.
    # The weights saved are those after the last epoch.
    model_to_save = model.module if isinstance(model, nn.DataParallel) else model
    torch.save(model_to_save.state_dict(), "best_model.pth")
    wandb.save("best_model.pth")

    wandb.finish()