In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found under the Kaggle input directory.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 

# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Installing WandB
!pip install wandb -qqq
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets
from torchvision.transforms import ToTensor

# Use the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print("Device: ", device)

from tqdm import tqdm
import wandb, os
# SECURITY: never hardcode the W&B API key in the notebook. The key that was
# previously written on this line has been exposed and should be revoked and
# replaced. Provide the key through the WANDB_API_KEY environment variable
# (for example, populated from the platform's secret store before this cell
# runs); wandb.login() reads it from there.
if not os.environ.get('WANDB_API_KEY'):
    print("WANDB_API_KEY is not set; wandb.login() will fall back to stored "
          "credentials or prompt for a key.")
wandb.login()

!pip install lightning

# Part A

## Question 1

In [None]:
import torch
import torch.nn as nn

class ConvNet(nn.Module):
    """Configurable CNN classifier.

    The network is a stack of ``Conv2d -> [BatchNorm2d] -> activation ->
    MaxPool2d(2) -> [Dropout2d]`` blocks (one block per entry of
    ``conv_filters``), followed by one hidden dense layer with dropout and a
    linear output layer that produces ``num_classes`` logits.

    Args:
        input_shape: ``(channels, height, width)`` of a single input image.
        conv_filters: Number of output channels of each convolutional block.
        filter_sizes: Kernel size of each convolutional block; must have the
            same length as ``conv_filters``.
        activation_fn: Zero-argument callable that returns the activation
            module used inside the convolutional blocks (e.g. ``nn.ReLU``).
        dense_units: Width of the hidden dense layer.
        dense_activation_fn: Zero-argument callable that returns the
            activation module applied after the hidden dense layer.
        dropout_rate: Dropout probability. ``Dropout2d`` is added after each
            convolutional block only when this is greater than 0; ``Dropout``
            after the dense layer is always created with this probability.
        batch_norm: Whether to insert ``BatchNorm2d`` after each convolution.
        num_classes: Number of output logits.

    Raises:
        ValueError: If ``conv_filters`` and ``filter_sizes`` differ in length,
            or if the input is too small to survive all pooling stages.
    """

    def __init__(
        self,
        input_shape=(3, 224, 224),
        conv_filters=(32, 64, 128, 256, 512),
        filter_sizes=(3, 3, 3, 3, 3),
        activation_fn=nn.ReLU,
        dense_units=256,
        dense_activation_fn=nn.ReLU,
        dropout_rate=0.3,
        batch_norm=True,
        num_classes=10
    ):
        super().__init__()

        if len(conv_filters) != len(filter_sizes):
            raise ValueError(
                f"conv_filters has {len(conv_filters)} entries but "
                f"filter_sizes has {len(filter_sizes)}; they must match"
            )

        self.conv_blocks = nn.Sequential()
        in_channels = input_shape[0]
        h, w = input_shape[1], input_shape[2]

        # One Conv-BN-Activation-Pool block per configured filter count.
        # Module names are 1-based (conv1, bn1, ...).
        for i, (out_channels, kernel_size) in enumerate(zip(conv_filters, filter_sizes), start=1):
            padding = kernel_size // 2  # keeps the spatial size for odd kernels

            self.conv_blocks.add_module(
                f"conv{i}",
                nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=padding),
            )
            if batch_norm:
                self.conv_blocks.add_module(f"bn{i}", nn.BatchNorm2d(out_channels))
            self.conv_blocks.add_module(f"act{i}", activation_fn())
            self.conv_blocks.add_module(f"pool{i}", nn.MaxPool2d(2))
            if dropout_rate > 0:
                self.conv_blocks.add_module(f"dropout{i}", nn.Dropout2d(dropout_rate))

            in_channels = out_channels
            # Stride-1 convolution output size is (size + 2*padding - kernel + 1);
            # for even kernels this is size + 1, not size. MaxPool2d(2) then
            # floors the halved size.
            h = (h + 2 * padding - kernel_size + 1) // 2
            w = (w + 2 * padding - kernel_size + 1) // 2

        if h < 1 or w < 1:
            raise ValueError(
                f"input_shape {tuple(input_shape)} is too small for "
                f"{len(conv_filters)} pooling stages"
            )

        # Number of features fed to the dense layer after flattening.
        self.flattened_size = in_channels * h * w

        self.fc1 = nn.Linear(self.flattened_size, dense_units)
        self.fc1_act = dense_activation_fn()
        self.dropout = nn.Dropout(dropout_rate)

        self.output_layer = nn.Linear(dense_units, num_classes)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        x = self.conv_blocks(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = self.dropout(self.fc1_act(self.fc1(x)))
        return self.output_layer(x)


## Question 2

In [None]:
import os
import torch
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split, Subset
from sklearn.model_selection import StratifiedShuffleSplit
import numpy as np

def get_dataloaders(data_dir, batch_size=64, val_split=0.2, augment=True):
    """Build stratified train/validation loaders from an image-folder dataset.

    Args:
        data_dir: Root directory read with ``ImageFolder``.
        batch_size: Batch size used by both loaders.
        val_split: Fraction of the samples assigned to the validation subset.
        augment: When true, the training pipeline adds a random horizontal
            flip and a random rotation of up to 10 degrees.

    Returns:
        ``(train_loader, val_loader, num_classes)``.
    """
    def plain_pipeline():
        # Resize to 224x224 and convert to a tensor; no randomness.
        return transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor()
        ])

    if augment:
        train_pipeline = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.ToTensor()
        ])
    else:
        train_pipeline = plain_pipeline()
    eval_pipeline = plain_pipeline()

    # Two views of the same folder so each subset gets its own transform.
    train_source = ImageFolder(root=data_dir, transform=train_pipeline)
    eval_source = ImageFolder(root=data_dir, transform=eval_pipeline)

    # Stratified split on the class labels with a fixed seed.
    class_ids = np.array(train_source.targets)
    stratifier = StratifiedShuffleSplit(n_splits=1, test_size=val_split, random_state=42)
    train_idx, val_idx = next(stratifier.split(np.zeros(len(class_ids)), class_ids))

    shared_kwargs = dict(batch_size=batch_size, num_workers=2)
    train_loader = DataLoader(Subset(train_source, train_idx), shuffle=True, **shared_kwargs)
    val_loader = DataLoader(Subset(eval_source, val_idx), shuffle=False, **shared_kwargs)

    return train_loader, val_loader, len(train_source.classes)


In [None]:
import torch.nn.functional as F
import wandb

def train(model, train_loader, val_loader, optimizer, criterion, device, epochs=10):
    """Train ``model`` for ``epochs`` epochs, validating after each one.

    After every epoch the mean per-batch train/validation loss and the
    train/validation accuracy (as fractions) are logged with ``wandb.log``
    and the accuracies are printed.
    """
    model.to(device)

    for epoch_no in range(1, epochs + 1):
        # ---- training pass ----
        model.train()
        loss_sum = 0
        hits = 0
        for batch, target in train_loader:
            batch = batch.to(device)
            target = target.to(device)
            optimizer.zero_grad()

            logits = model(batch)
            batch_loss = criterion(logits, target)
            batch_loss.backward()
            optimizer.step()

            loss_sum += batch_loss.item()
            hits += (logits.argmax(dim=1) == target).sum().item()

        train_accuracy = hits / len(train_loader.dataset)

        # ---- validation pass (no gradients) ----
        model.eval()
        val_hits = 0
        val_loss_sum = 0
        with torch.no_grad():
            for batch, target in val_loader:
                batch = batch.to(device)
                target = target.to(device)
                logits = model(batch)
                batch_loss = criterion(logits, target)

                val_loss_sum += batch_loss.item()
                val_hits += (logits.argmax(dim=1) == target).sum().item()

        val_accuracy = val_hits / len(val_loader.dataset)

        epoch_metrics = {
            "epoch": epoch_no,
            "train_loss": loss_sum / len(train_loader),
            "train_accuracy": train_accuracy,
            "val_loss": val_loss_sum / len(val_loader),
            "val_accuracy": val_accuracy
        }
        wandb.log(epoch_metrics)

        print(f"Epoch {epoch_no} - Train Acc: {train_accuracy:.4f}, Val Acc: {val_accuracy:.4f}")


In [None]:
from torchvision import models
from torch import optim
import torch.nn as nn
import wandb


def main():
    """Run one W&B training run of ``ConvNet`` configured by ``wandb.config``.

    Intended as the function executed by a W&B sweep agent: the
    hyperparameters (conv_filters, filter_sizes, activation_fn, dense_units,
    dropout, batch_norm, batch_size, augment, lr, epochs) are read from the
    run's config.
    """
    wandb.init(project="DL_A2")

    config = wandb.config

    activation_map = {
        "ReLU": nn.ReLU,
        "GELU": nn.GELU,
        "SiLU": nn.SiLU,
        "Mish": nn.Mish
    }

    # Load the data first so the output layer can be sized from the dataset
    # instead of a hard-coded class count.
    train_loader, val_loader, classes = get_dataloaders(
        data_dir="/kaggle/input/nature-12k/inaturalist_12K/train",
        batch_size=config.batch_size,
        augment=config.augment
    )
    # get_dataloaders in this cell group returns a class count, while the
    # versions defined in later cells return the list of class names; accept
    # both so the function does not depend on which definition is active.
    num_classes = classes if isinstance(classes, int) else len(classes)

    model = ConvNet(
        input_shape=(3, 224, 224),
        conv_filters=config.conv_filters,
        filter_sizes=config.filter_sizes,
        activation_fn=activation_map[config.activation_fn],
        dense_units=config.dense_units,
        dense_activation_fn=activation_map[config.activation_fn],
        dropout_rate=config.dropout,
        batch_norm=config.batch_norm,
        num_classes=num_classes
    )

    optimizer = optim.SGD(model.parameters(), lr=config.lr, momentum=0.9, weight_decay=1e-5)
    criterion = nn.CrossEntropyLoss()

    train(
        model, train_loader, val_loader, optimizer, criterion,
        device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
        epochs=config.epochs
    )

    # Close the run explicitly so consecutive sweep runs do not share state.
    wandb.finish()



In [None]:
# Random-search sweep definition handed to wandb.sweep(); the goal is to
# maximize the "val_accuracy" metric logged during training.
# The parameter names below are the ones read from wandb.config by main().
sweep_config = {
    "method": "random",
    "metric": {
        "name": "val_accuracy",
        "goal": "maximize"
    },
    "parameters": {
        # Output channels of the five convolutional blocks.
        "conv_filters": {
            "values": [[32, 32, 64, 64, 128], [32, 64, 128, 256, 512]]
        },
        # Kernel size of each convolutional block.
        "filter_sizes": {
            "values": [[3, 3, 3, 3, 3]]
        },
        # Key into main()'s activation_map (used for conv and dense layers).
        "activation_fn": {
            "values": ["ReLU", "GELU", "SiLU", "Mish"]
        },
        # Dropout probability passed to ConvNet as dropout_rate.
        "dropout": {
            "values": [0.2, 0.3]
        },
        # Width of the hidden dense layer.
        "dense_units": {
            "values": [128, 256]
        },
        # Whether BatchNorm2d follows each convolution.
        "batch_norm": {
            "values": [True, False]
        },
        # Whether the training transform includes random flip/rotation.
        "augment": {
            "values": [True, False]
        },
        "batch_size": {
            "values": [64, 128]
        },
        # Learning rate of the SGD optimizer.
        "lr": {
            "values": [0.01, 0.001]
        },
        # Fixed for every run of the sweep.
        "epochs": {
            "value": 10
        }
    }
}


In [None]:
# Register the sweep in the 'DL_A2' project, then run an agent in this
# process that calls main() once per run, for at most 50 runs.
sweep_id = wandb.sweep(sweep=sweep_config, project='DL_A2')
wandb.agent(sweep_id, function=main, count=50)


## Question 3

In [None]:

# Wider random-search sweep definition (more filter layouts, kernel sizes,
# dropout values and batch sizes than the earlier sweep); the goal is to
# maximize the logged "val_accuracy" metric.
# NOTE(review): these keys only take effect if the function passed to
# wandb.agent() reads them from wandb.config - verify for the entry point
# used with this sweep.
sweep_config = {
    "method": "random",
    "metric": {
        "name": "val_accuracy",
        "goal": "maximize"
    },
    "parameters": {
        # Candidate channel layouts, both widening and narrowing with depth.
        "conv_filters": {
            "values": [[32, 32, 64, 64, 128],[512,256,128,64,32],[256,128,64,64,32], [32, 64, 128, 256, 512]]
        },
        # Candidate kernel sizes per block (uniform and decreasing layouts).
        "filter_sizes": {
            "values": [[3, 3, 3, 3, 3],[5,5,5,5,5],[7,7,7,7,7],[7,7,5,5,3],[7,5,3,3,3]]
        },
        "activation_fn": {
            "values": ["ReLU", "GELU", "SiLU", "Mish"]
        },
        "dropout": {
            "values": [0.0,0.2, 0.3]
        },
        "dense_units": {
            "values": [128, 256]
        },
        "batch_norm": {
            "values": [True]
        },
        "augment": {
            "values": [True, False]
        },
        "batch_size": {
            "values": [64, 128,256]
        },
        "lr": {
            "values": [0.01, 0.001]
        },
        # Fixed for every run of the sweep.
        "epochs": {
            "value": 10
        }
    }
}



In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import StratifiedShuffleSplit
import numpy as np
import wandb


def get_dataloaders(data_dir, batch_size=256, val_split=0.2, augment=True):
    """Build stratified train/validation loaders with normalized inputs.

    Args:
        data_dir: Root directory read with ``datasets.ImageFolder``.
        batch_size: Batch size used by both loaders.
        val_split: Fraction of the samples assigned to the validation subset.
        augment: When true, the training pipeline uses a random resized crop,
            horizontal flip, rotation of up to 15 degrees and colour jitter;
            otherwise it is the same deterministic pipeline as validation.

    Returns:
        ``(train_loader, val_loader, class_names)`` where ``class_names`` is
        the ``classes`` list of the underlying ``ImageFolder``.
    """
    channel_mean = (0.485, 0.456, 0.406)
    channel_std = (0.229, 0.224, 0.225)

    def normalize():
        # Fresh Normalize instance with the per-channel statistics above.
        return transforms.Normalize(mean=list(channel_mean), std=list(channel_std))

    def deterministic_pipeline():
        # Resize, centre-crop to 224, convert to tensor, normalize.
        return transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            normalize()
        ])

    if augment:
        train_pipeline = transforms.Compose([
            transforms.Resize(256),
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            transforms.ToTensor(),
            normalize()
        ])
    else:
        train_pipeline = deterministic_pipeline()
    val_pipeline = deterministic_pipeline()

    # Two views of the same folder so each subset gets its own transform.
    train_source = datasets.ImageFolder(root=data_dir, transform=train_pipeline)
    val_source = datasets.ImageFolder(root=data_dir, transform=val_pipeline)

    # Stratified split on the class labels with a fixed seed.
    class_ids = np.array(train_source.targets)
    stratifier = StratifiedShuffleSplit(n_splits=1, test_size=val_split, random_state=42)
    train_idx, val_idx = next(stratifier.split(np.zeros(len(class_ids)), class_ids))

    shared_kwargs = dict(batch_size=batch_size, num_workers=4, pin_memory=True)
    train_loader = DataLoader(Subset(train_source, train_idx), shuffle=True, **shared_kwargs)
    val_loader = DataLoader(Subset(val_source, val_idx), shuffle=False, **shared_kwargs)

    return train_loader, val_loader, train_source.classes

class OptimizedCNN(nn.Module):
    """Fixed five-stage CNN classifier.

    Each stage is ``Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(3, stride 2)``.
    The stages are followed by global average pooling and a two-layer
    classifier (512 -> 1024 -> ``num_classes``).
    """

    # (in_channels, out_channels, kernel_size, stride) of each stage:
    # larger kernels in the early stages, 3x3 kernels afterwards.
    _STAGES = (
        (3, 64, 7, 2),
        (64, 128, 5, 1),
        (128, 256, 3, 1),
        (256, 512, 3, 1),
        (512, 512, 3, 1),
    )

    def __init__(self, num_classes=10):
        super().__init__()

        # Layers are appended in order so the indices inside ``conv_blocks``
        # (and therefore the state_dict keys) are 0..19, four per stage.
        stage_layers = []
        for c_in, c_out, k, stride in self._STAGES:
            stage_layers.extend([
                nn.Conv2d(c_in, c_out, kernel_size=k, stride=stride, padding=k // 2),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            ])
        self.conv_blocks = nn.Sequential(*stage_layers)

        # Classifier head on the globally pooled 512-channel features.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = nn.Sequential(
            nn.Linear(512, 1024),
            nn.ReLU(),
            nn.Linear(1024, num_classes),
        )

        self._initialize_weights()

    def _initialize_weights(self):
        """Re-initialize conv, batch-norm and linear parameters in place."""
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                init.normal_(layer.weight, 0, 0.01)
                init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.BatchNorm2d):
                init.constant_(layer.weight, 1)
                init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.Conv2d):
                init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                if layer.bias is not None:
                    init.constant_(layer.bias, 0)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        features = self.avgpool(self.conv_blocks(x))
        return self.classifier(torch.flatten(features, 1))

def train(model, train_loader, val_loader, optimizer, criterion, scheduler, device, epochs=20):
    """Train ``model`` and keep the weights with the best validation accuracy.

    After each epoch the scheduler is stepped with the summed validation
    loss, metrics are logged with ``wandb.log`` and printed, and the model's
    state_dict is written to ``best_model.pth`` whenever validation accuracy
    improves.

    Returns:
        The best validation accuracy seen, in percent.
    """
    model.to(device)
    best_val_acc = 0.0

    for epoch_no in range(1, epochs + 1):
        # ---- training pass ----
        model.train()
        train_loss_sum = 0.0
        train_hits = 0
        for batch, target in train_loader:
            batch = batch.to(device)
            target = target.to(device)

            optimizer.zero_grad()
            logits = model(batch)
            batch_loss = criterion(logits, target)
            batch_loss.backward()
            optimizer.step()

            train_loss_sum += batch_loss.item()
            train_hits += logits.max(1).indices.eq(target).sum().item()

        train_acc = 100 * train_hits / len(train_loader.dataset)

        # ---- validation pass (no gradients) ----
        model.eval()
        val_loss_sum = 0.0
        val_hits = 0
        with torch.no_grad():
            for batch, target in val_loader:
                batch = batch.to(device)
                target = target.to(device)
                logits = model(batch)
                batch_loss = criterion(logits, target)

                val_loss_sum += batch_loss.item()
                val_hits += logits.argmax(1).eq(target).sum().item()

        val_acc = 100 * val_hits / len(val_loader.dataset)

        # The scheduler monitors the summed validation loss.
        scheduler.step(val_loss_sum)

        mean_train_loss = train_loss_sum / len(train_loader)
        mean_val_loss = val_loss_sum / len(val_loader)

        # Log metrics (learning rate is read after the scheduler step).
        wandb.log({
            "epoch": epoch_no,
            "train_loss": mean_train_loss,
            "train_accuracy": train_acc,
            "val_loss": mean_val_loss,
            "val_accuracy": val_acc,
            "lr": optimizer.param_groups[0]['lr']
        })

        print(f"Epoch {epoch_no}/{epochs} - "
              f"Train Loss: {mean_train_loss:.4f}, "
              f"Train Acc: {train_acc:.2f}%, "
              f"Val Loss: {mean_val_loss:.4f}, "
              f"Val Acc: {val_acc:.2f}%")

        # Checkpoint on strict improvement only.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')

    return best_val_acc

def main():
    """Train ``OptimizedCNN`` for one W&B run (sweep entry point).

    ``batch_size``, ``augment``, ``lr`` and ``epochs`` are taken from
    ``wandb.config`` when the sweep provides them, so runs launched by
    ``wandb.agent`` actually differ from each other. When a key is absent the
    value this function previously hard-coded is used.

    ``OptimizedCNN`` only accepts ``num_classes``, so architecture-related
    sweep keys (filters, kernel sizes, activation, dropout, dense units) are
    not applied here.
    """
    wandb.init(project="DL_A2")
    config = wandb.config

    # Sweep-provided values win; fallbacks keep a plain run unchanged.
    batch_size = config.get("batch_size", 256)
    augment = config.get("augment", True)
    learning_rate = config.get("lr", 0.1)
    epochs = config.get("epochs", 20)

    # Device configuration
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Get data loaders
    train_loader, val_loader, classes = get_dataloaders(
        data_dir="/kaggle/input/nature-12k/inaturalist_12K/train",
        batch_size=batch_size,
        augment=augment
    )

    # Initialize model with one output per dataset class
    model = OptimizedCNN(num_classes=len(classes))

    # Loss function
    criterion = nn.CrossEntropyLoss()

    # Optimizer with momentum and weight decay
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Reduce the learning rate 10x when the monitored loss stops improving.
    # The deprecated `verbose` argument is not passed; train() already logs
    # the current learning rate every epoch.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.1, patience=3)

    # Train the model
    best_val_acc = train(
        model, train_loader, val_loader,
        optimizer, criterion, scheduler,
        device=device, epochs=epochs
    )

    wandb.summary["best_val_acc"] = best_val_acc
    wandb.finish()



In [None]:
# Register the sweep in the 'DL_A2' project, then run an agent in this
# process that calls main() once per run, for at most 20 runs.
sweep_id = wandb.sweep(sweep=sweep_config, project='DL_A2')
wandb.agent(sweep_id, function=main, count=20)


## Question 4

In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import StratifiedShuffleSplit
import numpy as np
import wandb



# Narrowed random-search sweep definition (single activation, single dense
# width, batch norm always on, 30 epochs); the goal is to maximize the
# logged "val_accuracy" metric.
# NOTE(review): these keys only take effect if the function passed to
# wandb.agent() reads them from wandb.config - verify for the entry point
# used with this sweep.
sweep_config = {
    "method": "random",
    "metric": {
        "name": "val_accuracy",
        "goal": "maximize"
    },
    "parameters": {
        "conv_filters": {
            "values": [[256,128,64,64,32], [32, 64, 128, 256, 512]]
        },
        "filter_sizes": {
            "values": [[3, 3, 3, 3, 3],[7,7,7,7,7]]
        },
        "activation_fn": {
            "values": [ "GELU"]
        },
        "dropout": {
            "values": [0.2, 0.3]
        },
        "dense_units": {
            "values": [128]
        },
        "batch_norm": {
            "values": [True]
        },
        "augment": {
            "values": [False]
        },
        "batch_size": {
            "values": [64,256]
        },
        "lr": {
            "values": [0.01, 0.001]
        },
        # Fixed for every run of the sweep.
        "epochs": {
            "value": 30
        }
    }
}



def get_dataloaders(data_dir, batch_size=256, val_split=0.2, augment=True):
    """Build stratified train/validation loaders with normalized inputs.

    Args:
        data_dir: Root directory read with ``datasets.ImageFolder``.
        batch_size: Batch size used by both loaders.
        val_split: Fraction of the samples assigned to the validation subset.
        augment: When true, the training pipeline uses a random resized crop,
            horizontal flip, rotation of up to 15 degrees and colour jitter;
            otherwise it is the same deterministic pipeline as validation.

    Returns:
        ``(train_loader, val_loader, class_names)`` where ``class_names`` is
        the ``classes`` list of the underlying ``ImageFolder``.
    """
    channel_mean = (0.485, 0.456, 0.406)
    channel_std = (0.229, 0.224, 0.225)

    def normalize():
        # Fresh Normalize instance with the per-channel statistics above.
        return transforms.Normalize(mean=list(channel_mean), std=list(channel_std))

    def deterministic_pipeline():
        # Resize, centre-crop to 224, convert to tensor, normalize.
        return transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            normalize()
        ])

    if augment:
        train_pipeline = transforms.Compose([
            transforms.Resize(256),
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
            transforms.ToTensor(),
            normalize()
        ])
    else:
        train_pipeline = deterministic_pipeline()
    val_pipeline = deterministic_pipeline()

    # Two views of the same folder so each subset gets its own transform.
    train_source = datasets.ImageFolder(root=data_dir, transform=train_pipeline)
    val_source = datasets.ImageFolder(root=data_dir, transform=val_pipeline)

    # Stratified split on the class labels with a fixed seed.
    class_ids = np.array(train_source.targets)
    stratifier = StratifiedShuffleSplit(n_splits=1, test_size=val_split, random_state=42)
    train_idx, val_idx = next(stratifier.split(np.zeros(len(class_ids)), class_ids))

    shared_kwargs = dict(batch_size=batch_size, num_workers=4, pin_memory=True)
    train_loader = DataLoader(Subset(train_source, train_idx), shuffle=True, **shared_kwargs)
    val_loader = DataLoader(Subset(val_source, val_idx), shuffle=False, **shared_kwargs)

    return train_loader, val_loader, train_source.classes

class OptimizedCNN(nn.Module):
    """Fixed five-stage CNN classifier.

    Each stage is ``Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(3, stride 2)``.
    The stages are followed by global average pooling and a two-layer
    classifier (512 -> 1024 -> ``num_classes``).
    """

    # (in_channels, out_channels, kernel_size, stride) of each stage:
    # larger kernels in the early stages, 3x3 kernels afterwards.
    _STAGES = (
        (3, 64, 7, 2),
        (64, 128, 5, 1),
        (128, 256, 3, 1),
        (256, 512, 3, 1),
        (512, 512, 3, 1),
    )

    def __init__(self, num_classes=10):
        super().__init__()

        # Layers are appended in order so the indices inside ``conv_blocks``
        # (and therefore the state_dict keys) are 0..19, four per stage.
        stage_layers = []
        for c_in, c_out, k, stride in self._STAGES:
            stage_layers.extend([
                nn.Conv2d(c_in, c_out, kernel_size=k, stride=stride, padding=k // 2),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            ])
        self.conv_blocks = nn.Sequential(*stage_layers)

        # Classifier head on the globally pooled 512-channel features.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = nn.Sequential(
            nn.Linear(512, 1024),
            nn.ReLU(),
            nn.Linear(1024, num_classes),
        )

        self._initialize_weights()

    def _initialize_weights(self):
        """Re-initialize conv, batch-norm and linear parameters in place."""
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                init.normal_(layer.weight, 0, 0.01)
                init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.BatchNorm2d):
                init.constant_(layer.weight, 1)
                init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.Conv2d):
                init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                if layer.bias is not None:
                    init.constant_(layer.bias, 0)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        features = self.avgpool(self.conv_blocks(x))
        return self.classifier(torch.flatten(features, 1))

def train(model, train_loader, val_loader, optimizer, criterion, scheduler, device, epochs=20):
    """Train ``model`` and keep the weights with the best validation accuracy.

    After each epoch the scheduler is stepped with the summed validation
    loss, metrics are logged with ``wandb.log`` and printed, and the model's
    state_dict is written to ``best_model.pth`` whenever validation accuracy
    improves.

    Returns:
        The best validation accuracy seen, in percent.
    """
    model.to(device)
    best_val_acc = 0.0

    for epoch_no in range(1, epochs + 1):
        # ---- training pass ----
        model.train()
        train_loss_sum = 0.0
        train_hits = 0
        for batch, target in train_loader:
            batch = batch.to(device)
            target = target.to(device)

            optimizer.zero_grad()
            logits = model(batch)
            batch_loss = criterion(logits, target)
            batch_loss.backward()
            optimizer.step()

            train_loss_sum += batch_loss.item()
            train_hits += logits.max(1).indices.eq(target).sum().item()

        train_acc = 100 * train_hits / len(train_loader.dataset)

        # ---- validation pass (no gradients) ----
        model.eval()
        val_loss_sum = 0.0
        val_hits = 0
        with torch.no_grad():
            for batch, target in val_loader:
                batch = batch.to(device)
                target = target.to(device)
                logits = model(batch)
                batch_loss = criterion(logits, target)

                val_loss_sum += batch_loss.item()
                val_hits += logits.argmax(1).eq(target).sum().item()

        val_acc = 100 * val_hits / len(val_loader.dataset)

        # The scheduler monitors the summed validation loss.
        scheduler.step(val_loss_sum)

        mean_train_loss = train_loss_sum / len(train_loader)
        mean_val_loss = val_loss_sum / len(val_loader)

        # Log metrics (learning rate is read after the scheduler step).
        wandb.log({
            "epoch": epoch_no,
            "train_loss": mean_train_loss,
            "train_accuracy": train_acc,
            "val_loss": mean_val_loss,
            "val_accuracy": val_acc,
            "lr": optimizer.param_groups[0]['lr']
        })

        print(f"Epoch {epoch_no}/{epochs} - "
              f"Train Loss: {mean_train_loss:.4f}, "
              f"Train Acc: {train_acc:.2f}%, "
              f"Val Loss: {mean_val_loss:.4f}, "
              f"Val Acc: {val_acc:.2f}%")

        # Checkpoint on strict improvement only.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')

    return best_val_acc

def main():
    """Train ``OptimizedCNN`` for one W&B run (sweep entry point).

    ``batch_size``, ``augment``, ``lr`` and ``epochs`` are taken from
    ``wandb.config`` when the sweep provides them, so runs launched by
    ``wandb.agent`` actually differ from each other. When a key is absent the
    value this function previously hard-coded is used.

    ``OptimizedCNN`` only accepts ``num_classes``, so architecture-related
    sweep keys (filters, kernel sizes, activation, dropout, dense units) are
    not applied here.
    """
    wandb.init(project="DL_A2")
    config = wandb.config

    # Sweep-provided values win; fallbacks keep a plain run unchanged.
    batch_size = config.get("batch_size", 256)
    augment = config.get("augment", True)
    learning_rate = config.get("lr", 0.1)
    epochs = config.get("epochs", 30)

    # Device configuration
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Get data loaders
    train_loader, val_loader, classes = get_dataloaders(
        data_dir="/kaggle/input/d/d4debeniitm/nature-12k/inaturalist_12K/train",
        batch_size=batch_size,
        augment=augment
    )

    # Initialize model with one output per dataset class
    model = OptimizedCNN(num_classes=len(classes))

    # Loss function
    criterion = nn.CrossEntropyLoss()

    # Optimizer with momentum and weight decay
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Reduce the learning rate 10x when the monitored loss stops improving.
    # The deprecated `verbose` argument is not passed; train() already logs
    # the current learning rate every epoch.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.1, patience=3)

    # Train the model
    best_val_acc = train(
        model, train_loader, val_loader,
        optimizer, criterion, scheduler,
        device=device, epochs=epochs
    )

    wandb.summary["best_val_acc"] = best_val_acc
    wandb.finish()



In [None]:
# Register the sweep in the 'DL_A2' project, then run an agent in this
# process that calls main() once per run, for at most 20 runs.
sweep_id = wandb.sweep(sweep=sweep_config, project='DL_A2')
wandb.agent(sweep_id, function=main, count=20)


### Visualization on Test dataset.

In [None]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from collections import defaultdict
import wandb
import random
from sklearn.metrics import confusion_matrix
import seaborn as sns

def create_prediction_grid(images, labels, preds, class_names, n_rows=10, n_cols=3, title="Predictions"):
    """Plot a grid of images titled with their true and predicted classes.

    Images are placed row by row. Titles are green for correct predictions
    and red for wrong ones. Cells without an image are left blank.

    Args:
        images: Sequence of normalized image tensors indexed in step with
            ``labels`` and ``preds``; each is transposed from channel-first
            to channel-last for display.
        labels: True class indices.
        preds: Predicted class indices.
        class_names: Sequence mapping a class index to its display name.
        n_rows: Number of grid rows.
        n_cols: Number of grid columns.
        title: Figure title.

    Returns:
        The matplotlib figure containing the grid.
    """
    # squeeze=False keeps `axes` two-dimensional even for one row or column.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, n_rows*2), squeeze=False)
    fig.suptitle(title, fontsize=16, y=1.02)

    # Statistics used to undo the Normalize step before display.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    # axes.flat walks the grid row by row, matching idx = row * n_cols + col.
    for idx, ax in enumerate(axes.flat):
        ax.axis('off')
        if idx >= len(images):
            continue  # fewer images than cells: leave the rest empty

        img = images[idx].detach().cpu().numpy().transpose((1, 2, 0))
        img = np.clip((img * std + mean), 0, 1)
        ax.imshow(img)

        true_label = class_names[labels[idx]]
        pred_label = class_names[preds[idx]]
        is_correct = preds[idx] == labels[idx]

        title_color = 'green' if is_correct else 'red'
        title_text = f"True: {true_label}\nPred: {pred_label}"
        ax.set_title(title_text, fontsize=9, color=title_color, pad=2)

    plt.tight_layout()
    return fig

def evaluate_testset(model_path, test_dir):
    """Evaluate a saved ``OptimizedCNN`` checkpoint and log results to W&B.

    Prints the overall accuracy and logs to the ``DL_A2`` project: the
    overall accuracy, a per-class accuracy table, a confusion-matrix image
    and a 10x3 grid of randomly sampled predictions.

    Args:
        model_path: Path to a state_dict saved from ``OptimizedCNN``.
        test_dir: Root directory of the evaluation images, read with
            ``datasets.ImageFolder``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Deterministic preprocessing: resize, centre-crop, tensor, normalize.
    transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # Load test dataset
    test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=4)
    class_names = test_dataset.classes

    # Load model
    model = OptimizedCNN(num_classes=len(class_names))
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()

    # Collect predictions and ground truth. Image tensors are deliberately
    # NOT accumulated: keeping the whole dataset in memory is unnecessary
    # because only a small sample is displayed later.
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images.to(device))
            _, preds = torch.max(outputs, 1)

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    # Calculate overall accuracy
    accuracy = 100 * np.sum(np.array(all_labels) == np.array(all_preds)) / len(all_labels)
    print(f"Test Accuracy: {accuracy:.2f}%")

    # Initialize wandb
    wandb.init(project="DL_A2", name="test_evaluation", job_type="eval")
    wandb.log({"test_accuracy": accuracy})

    # Calculate class-wise accuracy
    class_correct = defaultdict(int)
    class_total = defaultdict(int)

    for label, pred in zip(all_labels, all_preds):
        class_total[label] += 1
        if label == pred:
            class_correct[label] += 1

    # Create accuracy table (NaN for classes without samples)
    accuracy_table = wandb.Table(columns=["Class", "Accuracy", "Samples"])
    for class_idx in range(len(class_names)):
        if class_total[class_idx] > 0:
            acc = 100 * class_correct[class_idx] / class_total[class_idx]
        else:
            acc = float('nan')
        accuracy_table.add_data(class_names[class_idx], acc, class_total[class_idx])

    wandb.log({"class_accuracy": accuracy_table})

    # Create confusion matrix (only for classes with samples)
    present_classes = [c for c in range(len(class_names)) if class_total[c] > 0]
    if present_classes:
        cm = confusion_matrix(all_labels, all_preds, labels=present_classes)
        plt.figure(figsize=(12, 10))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                   xticklabels=[class_names[c] for c in present_classes],
                   yticklabels=[class_names[c] for c in present_classes])
        plt.title('Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.xticks(rotation=45, ha='right')
        plt.yticks(rotation=0)
        wandb.log({"confusion_matrix": wandb.Image(plt)})
        plt.close()

    # Create 10x3 prediction grid
    num_samples = 30  # 10 rows x 3 columns
    if len(test_dataset) >= num_samples:
        indices = random.sample(range(len(test_dataset)), num_samples)
        # The loader is not shuffled and the transform has no randomness, so
        # test_dataset[i] is the same tensor the i-th prediction was made on.
        sample_images = [test_dataset[i][0] for i in indices]
        sample_labels = [all_labels[i] for i in indices]
        sample_preds = [all_preds[i] for i in indices]

        grid_fig = create_prediction_grid(
            sample_images, sample_labels, sample_preds,
            class_names, title="Test Set Predictions (Random Sample)"
        )
        wandb.log({"prediction_grid": wandb.Image(grid_fig)})
        plt.close(grid_fig)
    else:
        print(f"Not enough samples ({len(test_dataset)}) to create full 10x3 grid")

    wandb.finish()

# Entry point: evaluate the saved checkpoint and log the results to W&B.
# NOTE(review): both paths are hard-coded Kaggle input locations, and the
# directory named "val" is passed as the test set - confirm this is intended.
if __name__ == "__main__":
    evaluate_testset(
        model_path='/kaggle/input/cnn/pytorch/default/1/best_model.pth',
        test_dir='/kaggle/input/d/d4debeniitm/nature-12k/inaturalist_12K/val'
    )

### Feature map and Filter Analysis

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from torchvision import transforms, datasets
import wandb

def _min_max_scale(values):
    """Rescale a NumPy array to [0, 1]; a constant array maps to zeros instead of NaN."""
    lo, hi = values.min(), values.max()
    span = hi - lo
    if span == 0:
        # Avoid 0/0 when every entry is identical (e.g. an all-zero feature map).
        return np.zeros_like(values)
    return (values - lo) / span


def visualize_first_layer(model, test_dir):
    """Visualize the first conv layer of `model` on one random image and log it to wandb.

    Args:
        model: Network exposing `conv_blocks[0]` as a convolution layer with
            `out_channels` and `weight` attributes.
        test_dir: Root of an `ImageFolder`-style directory to draw the image from.

    Returns:
        dict with keys "num_filters", "filter_magnitudes" (per-filter L2 norm),
        "activation_means" and "activation_max" (per-filter statistics of the
        raw first-layer response to the selected image).

    Side effects:
        Switches `model` to eval mode, opens a wandb run named
        "filter_visualization", logs five figures plus the image's class name,
        finishes the run and closes every matplotlib figure.
    """
    # Get the first convolutional layer
    first_conv = model.conv_blocks[0]
    num_filters = first_conv.out_channels  # Dynamically get number of filters

    # Run on whatever device the layer already lives on, so the input can never
    # end up on a different device than the weights.
    device = first_conv.weight.device

    # Same deterministic preprocessing as the evaluation pipeline
    transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                           std=[0.229, 0.224, 0.225])
    ])

    # Load test dataset and get random image
    test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)
    random_idx = np.random.randint(0, len(test_dataset))
    img, label = test_dataset[random_idx]
    img = img.unsqueeze(0).to(device)  # Add batch dimension

    # Normalize filters to 0-1 for visualization
    filters = _min_max_scale(first_conv.weight.data.cpu().numpy())

    # Calculate grid size (square as possible)
    grid_size = int(np.ceil(np.sqrt(num_filters)))

    # Plot filters
    filters_fig = plt.figure(figsize=(12, 12))
    for i in range(num_filters):
        plt.subplot(grid_size, grid_size, i+1)
        # Show first channel only (assuming RGB input)
        plt.imshow(filters[i, 0], cmap='gray')
        plt.axis('off')
    plt.suptitle(f'First Layer Filters ({num_filters} total)', fontsize=16)
    plt.tight_layout()

    # Get feature maps (raw convolution output, before any later layer)
    model.eval()
    with torch.no_grad():
        feature_maps = first_conv(img)

    # Normalize feature maps jointly so relative strength between filters stays visible
    fmaps = _min_max_scale(feature_maps.squeeze(0).cpu().numpy())

    # Plot feature maps
    fmap_fig = plt.figure(figsize=(12, 12))
    for i in range(num_filters):
        plt.subplot(grid_size, grid_size, i+1)
        plt.imshow(fmaps[i], cmap='viridis')
        plt.axis('off')
    plt.suptitle(f'Feature Maps ({num_filters} total)', fontsize=16)
    plt.tight_layout()

    # Show original image (denormalized)
    img_denorm = img.squeeze(0).cpu().numpy().transpose(1, 2, 0)
    img_denorm = img_denorm * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
    img_denorm = np.clip(img_denorm, 0, 1)

    orig_fig = plt.figure(figsize=(8, 8))
    plt.imshow(img_denorm)
    plt.title(f'Original Test Image\nClass: {test_dataset.classes[label]}')
    plt.axis('off')

    # Additional analysis
    # 1. Filter magnitude distribution: one L2 norm per output filter
    filter_magnitudes = torch.norm(
        first_conv.weight.data.reshape(num_filters, -1), p=2, dim=1
    ).cpu().numpy()

    magnitude_fig = plt.figure(figsize=(10, 5))
    plt.hist(filter_magnitudes, bins=20)
    plt.title('Filter Magnitude Distribution')
    plt.xlabel('Magnitude (L2 norm)')
    plt.ylabel('Count')

    # 2. Activation statistics, reduced over batch and spatial axes
    activation_means = feature_maps.mean(dim=(0, 2, 3)).cpu().numpy()
    activation_max = feature_maps.amax(dim=(0, 2, 3)).cpu().numpy()

    activation_fig = plt.figure(figsize=(10, 5))
    plt.bar(range(num_filters), activation_means, alpha=0.5, label='Mean')
    plt.bar(range(num_filters), activation_max, alpha=0.5, label='Max')
    plt.title('Feature Map Activation Statistics')
    plt.xlabel('Filter Index')
    plt.ylabel('Activation Value')
    plt.legend()

    # Log to wandb
    wandb.init(project="DL_A2", name="filter_visualization")
    wandb.log({
        "original_image": wandb.Image(orig_fig),
        "first_layer_filters": wandb.Image(filters_fig),
        "feature_maps": wandb.Image(fmap_fig),
        "filter_magnitudes": wandb.Image(magnitude_fig),
        "activation_stats": wandb.Image(activation_fig),
        "selected_class": test_dataset.classes[label]
    })
    # Close the run here so it does not stay open and absorb later logging.
    wandb.finish()

    plt.close('all')
    return {
        "num_filters": num_filters,
        "filter_magnitudes": filter_magnitudes,
        "activation_means": activation_means,
        "activation_max": activation_max
    }

# Rebuild the CNN with one output per class folder, restore its checkpoint,
# and run the first-layer visualization on the validation images.
eval_dir = '/kaggle/input/d/d4debeniitm/nature-12k/inaturalist_12K/val'
checkpoint_path = '/kaggle/input/cnn/pytorch/default/1/best_model.pth'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The dataset is only needed here to count the classes.
test_dataset = datasets.ImageFolder(root=eval_dir, transform=transforms.ToTensor())
num_classes = len(test_dataset.classes)

model = OptimizedCNN(num_classes=num_classes)
state_dict = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(state_dict)
model = model.to(device)

# Run visualization
results = visualize_first_layer(model, test_dir=eval_dir)

# Part B

## Question 7

### Training the model

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets, models
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import StratifiedShuffleSplit
import numpy as np
import wandb


# ------------------------ Sweep Config ------------------------ #
# Random search that maximizes `val_accuracy`; augmentation, batch size and
# learning rate each take one of two values, while epochs is pinned to 10.
sweep_config = dict(
    method="random",
    metric=dict(name="val_accuracy", goal="maximize"),
    parameters=dict(
        augment=dict(values=[True, False]),
        batch_size=dict(values=[64, 256]),
        lr=dict(values=[0.01, 0.001]),
        epochs=dict(value=10),
    ),
)


# ------------------------ Data Loader ------------------------ #
def get_dataloaders(data_dir, batch_size=256, val_split=0.2, augment=True):
    """Build stratified train/validation loaders over one image folder.

    Args:
        data_dir: Root of an `ImageFolder`-style directory.
        batch_size: Batch size used by both loaders.
        val_split: Fraction of samples held out for validation.
        augment: When True the training subset uses random crop / flip /
            rotation / colour jitter; otherwise it uses the same deterministic
            pipeline as validation.

    Returns:
        (train_loader, val_loader, class_names)
    """
    def _deterministic_pipeline():
        # Resize -> centre crop -> tensor -> ImageNet normalization.
        return transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

    if augment:
        train_transforms = transforms.Compose([
            transforms.Resize(256),
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            transforms.ColorJitter(0.2, 0.2, 0.2),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    else:
        train_transforms = _deterministic_pipeline()
    val_transforms = _deterministic_pipeline()

    # The folder is opened once per transform so the validation subset never
    # goes through the training-time augmentation.
    full_dataset = datasets.ImageFolder(root=data_dir, transform=train_transforms)
    labels = np.array(full_dataset.targets)

    # Single stratified split with a fixed seed (same partition on every call).
    stratified = StratifiedShuffleSplit(n_splits=1, test_size=val_split, random_state=42)
    placeholder_features = np.zeros(len(labels))
    train_idx, val_idx = next(stratified.split(placeholder_features, labels))

    val_dataset = datasets.ImageFolder(root=data_dir, transform=val_transforms)
    train_set = Subset(full_dataset, train_idx)
    val_set = Subset(val_dataset, val_idx)

    shared_loader_args = dict(batch_size=batch_size, num_workers=4, pin_memory=True)
    train_loader = DataLoader(train_set, shuffle=True, **shared_loader_args)
    val_loader = DataLoader(val_set, shuffle=False, **shared_loader_args)

    return train_loader, val_loader, full_dataset.classes


# ------------------------ Training Function ------------------------ #
def train(model, train_loader, val_loader, optimizer, criterion, scheduler, device, epochs=30,
          unfreeze_epoch=5,
          unfreeze_patterns=("encoder.layer.10", "encoder.layer.11",
                             "encoder.layers.encoder_layer_10.",
                             "encoder.layers.encoder_layer_11."),
          save_path="best_model_vit.pth"):
    """Train `model`, validate every epoch, log to wandb and keep the best checkpoint.

    Args:
        model: Network to train; moved to `device`.
        train_loader, val_loader: Loaders yielding `(inputs, targets)` batches;
            accuracy is computed against `len(loader.dataset)`.
        optimizer: Optimizer stepped once per training batch.
        criterion: Loss called as `criterion(outputs, targets)`.
        scheduler: Stepped once per epoch with the mean validation loss
            (i.e. a `ReduceLROnPlateau`-style `step(metric)` interface).
        device: Device used for the model and every batch.
        epochs: Number of epochs to run.
        unfreeze_epoch: Zero-based epoch at whose start the matching parameters
            get `requires_grad = True`.
        unfreeze_patterns: Substrings of parameter names to unfreeze. The
            defaults cover the last two encoder blocks under both the
            `encoder.layer.N` and the torchvision `encoder.layers.encoder_layer_N`
            naming schemes.
        save_path: File that receives the `state_dict` whenever validation
            accuracy improves.

    Returns:
        Best validation accuracy (percent) seen during training.
    """
    model.to(device)
    best_val_acc = 0

    for epoch in range(epochs):
        # Progressive unfreezing. This only changes training if the optimizer
        # was constructed over these parameters in the first place.
        if epoch == unfreeze_epoch:
            for name, param in model.named_parameters():
                if any(pattern in name for pattern in unfreeze_patterns):
                    param.requires_grad = True

        model.train()
        train_loss, correct = 0, 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            out = model(x)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            correct += (out.argmax(1) == y).sum().item()
        train_acc = 100. * correct / len(train_loader.dataset)

        # Validation
        model.eval()
        val_loss, val_correct = 0, 0
        with torch.no_grad():
            for x, y in val_loader:
                x, y = x.to(device), y.to(device)
                out = model(x)
                loss = criterion(out, y)
                val_loss += loss.item()
                val_correct += (out.argmax(1) == y).sum().item()
        val_acc = 100. * val_correct / len(val_loader.dataset)

        # Per-batch means; the scheduler sees the same value that is logged.
        mean_train_loss = train_loss / len(train_loader)
        mean_val_loss = val_loss / len(val_loader)
        scheduler.step(mean_val_loss)

        wandb.log({
            "epoch": epoch + 1,
            "train_loss": mean_train_loss,
            "train_accuracy": train_acc,
            "val_loss": mean_val_loss,
            "val_accuracy": val_acc,
            "lr": optimizer.param_groups[0]['lr']
        })

        print(f"Epoch {epoch+1}: Train Acc: {train_acc:.2f}%, Val Acc: {val_acc:.2f}%")

        # Checkpoint only on strict improvement.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), save_path)

    return best_val_acc


# ------------------------ Main Function ------------------------ #
def main():
    """Run one wandb trial: fine-tune a pre-trained ViT-B/16 with the sweep's hyperparameters.

    Reads `batch_size`, `augment`, `lr` and `epochs` from `wandb.config`, builds
    the data loaders, replaces the ViT classification head, trains via `train`
    and records the best validation accuracy in the run summary.
    """
    wandb.init(project="DL_A2")
    config = wandb.config
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    train_loader, val_loader, classes = get_dataloaders(
        data_dir="/kaggle/input/d/d4debeniitm/nature-12k/inaturalist_12K/train",
        batch_size=config.batch_size,
        augment=config.augment
    )

    # Load pre-trained ViT
    vit = models.vit_b_16(weights=models.ViT_B_16_Weights.DEFAULT)

    # Freeze all layers first
    for param in vit.parameters():
        param.requires_grad = False

    # Replace classifier head (freshly created layers are trainable by default)
    in_features = vit.heads[0].in_features
    vit.heads = nn.Sequential(
        nn.Linear(in_features, 512),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(512, len(classes))
    )

    # Enable training on new head
    for param in vit.heads.parameters():
        param.requires_grad = True

    # The optimizer is given every parameter, frozen ones included, so that any
    # parameter unfrozen later during training is already registered with it.
    optimizer = optim.SGD(vit.parameters(), lr=config.lr, momentum=0.9, weight_decay=1e-5)
    # `verbose` is intentionally not passed: it is deprecated and newer PyTorch
    # releases reject it. The current learning rate is logged each epoch by `train`.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)
    criterion = nn.CrossEntropyLoss()

    best_val_acc = train(
        model=vit,
        train_loader=train_loader,
        val_loader=val_loader,
        optimizer=optimizer,
        criterion=criterion,
        scheduler=scheduler,
        device=device,
        epochs=config.epochs
    )

    wandb.summary["best_val_acc"] = best_val_acc
    wandb.finish()


# ------------------------ Start Sweep ------------------------ #
# Register the sweep, then let a single agent run `main` for the requested
# number of trials.
SWEEP_PROJECT = "DL_A2"
SWEEP_TRIALS = 20

sweep_id = wandb.sweep(sweep_config, project=SWEEP_PROJECT)
wandb.agent(sweep_id, function=main, count=SWEEP_TRIALS)

### Test the model

In [None]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import wandb
from sklearn.metrics import confusion_matrix
import random


def evaluate_vit_test(model_path, test_dir):
    """Evaluate a fine-tuned ViT-B/16 checkpoint on an image folder and log results to wandb.

    Args:
        model_path: Path to a `state_dict` saved from a ViT-B/16 whose head was
            replaced by Linear(…, 512) -> ReLU -> Dropout -> Linear(512, n_classes).
        test_dir: Root of an `ImageFolder`-style directory with the test images.

    Returns:
        Overall accuracy in percent.

    Side effects:
        Prints the accuracy and opens a wandb run ("ViT Test Evaluation") that
        receives the accuracy, a confusion-matrix image and a grid of up to 30
        randomly chosen predictions; the run is finished before returning.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Same normalization statistics as the training pipeline (get_dataloaders);
    # evaluating with different statistics would shift every input.
    norm_mean = [0.485, 0.456, 0.406]
    norm_std = [0.229, 0.224, 0.225]

    # Transforms
    transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std)
    ])

    test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=2)
    class_names = test_dataset.classes

    # Architecture only; every weight is restored from the checkpoint below.
    model = models.vit_b_16(weights=None)

    # Rebuild classifier head to match training setup
    in_features = model.heads[0].in_features
    model.heads = nn.Sequential(
        nn.Linear(in_features, 512),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(512, len(class_names))
    )

    # Load trained weights
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()

    all_preds, all_labels = [], []
    correct = 0

    with torch.no_grad():
        for imgs, labels in test_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            preds = model(imgs).argmax(dim=1)
            correct += (preds == labels).sum().item()

            # Keep only class indices; image tensors are re-read on demand for
            # the preview grid instead of being held in memory for the whole set.
            all_preds.extend(preds.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    accuracy = 100 * correct / len(test_dataset)
    print(f"Test Accuracy: {accuracy:.2f}%")

    wandb.init(project="DL_A2", name="ViT Test Evaluation")
    wandb.log({"vit_test_accuracy": accuracy})

    # Confusion matrix; `labels` pins one row/column per entry of class_names
    # even if some class never occurs among the labels or predictions.
    cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(class_names))))
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names, cmap='Blues')
    plt.title("Confusion Matrix - ViT")
    plt.ylabel("True Label")
    plt.xlabel("Predicted Label")
    plt.xticks(rotation=45)
    wandb.log({"confusion_matrix_vit": wandb.Image(plt)})
    plt.close()

    # Sample prediction grid
    def show_predictions(images, labels, preds):
        """Draw up to 30 images in a 5x6 grid, titled green when correct and red otherwise."""
        mean = np.array(norm_mean)
        std = np.array(norm_std)
        fig, axes = plt.subplots(5, 6, figsize=(15, 10))
        for idx, ax in enumerate(axes.flat):
            if idx >= len(images): break
            img = images[idx].permute(1, 2, 0).numpy()
            img = img * std + mean  # undo Normalize (channels are on the last axis)
            ax.imshow(np.clip(img, 0, 1))
            color = 'green' if labels[idx] == preds[idx] else 'red'
            ax.set_title(f"True: {class_names[labels[idx]]}\nPred: {class_names[preds[idx]]}", color=color, fontsize=8)
            ax.axis('off')
        plt.tight_layout()
        return fig

    # The loader is not shuffled, so position i in the prediction lists is
    # dataset item i and the image can be fetched straight from the dataset.
    indices = random.sample(range(len(test_dataset)), min(30, len(test_dataset)))
    sample_imgs_subset = [test_dataset[i][0] for i in indices]
    sample_labels_subset = [all_labels[i] for i in indices]
    sample_preds_subset = [all_preds[i] for i in indices]

    fig = show_predictions(sample_imgs_subset, sample_labels_subset, sample_preds_subset)
    wandb.log({"vit_test_predictions": wandb.Image(fig)})
    plt.close(fig)

    wandb.finish()
    return accuracy


# Example usage: evaluate the uploaded ViT checkpoint on the dataset's `val` folder.
if __name__ == "__main__":
    _vit_eval_kwargs = {
        "model_path": "/kaggle/input/vit/pytorch/default/1/vit_best_model.pth",
        "test_dir": "/kaggle/input/d/d4debeniitm/nature-12k/inaturalist_12K/val",
    }
    evaluate_vit_test(**_vit_eval_kwargs)