In [None]:
import wandb
from kaggle_secrets import UserSecretsClient
# Read the Weights & Biases API key from the Kaggle secret named "wandb-key".
user_secrets = UserSecretsClient()
personal_key_for_api = user_secrets.get_secret("wandb-key")

# Authenticate through the Python API instead of `! wandb login $key`, so the
# secret is never expanded into a shell command line.
wandb.login(key=personal_key_for_api)

In [None]:
import copy
import io
import os

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import wandb
from PIL import Image
from torch.optim.lr_scheduler import OneCycleLR
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

In [None]:
# Random-search sweep definition: every trial draws one value per hyperparameter
# and the sweep is scored on the logged "validationAccuracy" (higher is better).
sweep_config = {
    "name": "CNN's hyperparameter search",
    "metric": {"name": "validationAccuracy", "goal": "maximize"},
    "method": "random",
    "parameters": {
        hyperparameter: {"values": candidates}
        for hyperparameter, candidates in (
            ("filters", [32, 64]),
            ("activationFunction", ['ReLU', 'GELU', 'SiLU']),
            ("filterOrganisation", ['same', 'half']),
            ("batchNormalisation", ['yes', 'no']),
            ("dropout", [0.2, 0.3]),
        )
    },
}

In [None]:
class CNN(nn.Module):
    """Five-block convolutional classifier for square RGB images.

    Every block is Conv2d(3x3, padding=1) -> optional BatchNorm2d -> activation
    -> MaxPool2d(2) -> optional Dropout. The flattened feature map then goes
    through Linear(256) -> activation -> Dropout -> Linear(num_classes).

    Args:
        filters: Output channels of the first convolution.
        activationFunction: 'ReLU', 'GELU' or 'SiLU'. Any other value falls
            back to ReLU.
        filterOrganisation: 'half' halves the channel count after every block
            (never going below 8); any other value keeps it constant.
        batchNormalisation: 'yes' adds BatchNorm2d after each convolution.
        dropout: Dropout probability. Dropout inside the convolutional blocks
            is omitted when this is not positive.
        num_classes: Number of output logits.
        input_size: Height and width (in pixels) of the square input images.
            Defaults to 224, the size the classifier head was originally
            hard-wired for.

    Raises:
        ValueError: If ``input_size`` is so small that the feature map would
            vanish after the five 2x2 poolings.
    """

    def __init__(self,
                 filters=32,
                 activationFunction='ReLU',
                 filterOrganisation='same',
                 batchNormalisation='yes',
                 dropout=0.2,
                 num_classes=10,
                 input_size=224
                 ):
        super().__init__()

        # Channel plan: the first block uses `filters`, the remaining four
        # either repeat it or halve it each time (floor of 8 channels).
        filter_list = [filters]
        for _ in range(4):
            if filterOrganisation == 'half':
                filters = max(8, filters // 2)
            filter_list.append(filters)

        # Each MaxPool2d(2) floors the spatial size in half, so after all
        # blocks the side length is input_size // 2**num_blocks
        # (224 -> 112 -> 56 -> 28 -> 14 -> 7 for the default).
        spatial_size = input_size // (2 ** len(filter_list))
        if spatial_size < 1:
            raise ValueError(
                f"input_size={input_size} is too small for {len(filter_list)} pooling stages"
            )

        # Select activation function (unknown names fall back to ReLU)
        activations = {
            'ReLU': nn.ReLU,
            'GELU': nn.GELU,
            'SiLU': nn.SiLU
        }
        self.activation = activations.get(activationFunction, nn.ReLU)()

        # Setup convolutional layers
        layers = []
        in_channels = 3  # RGB image

        for out_channels in filter_list:
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
            if batchNormalisation == 'yes':
                layers.append(nn.BatchNorm2d(out_channels))
            layers.append(self.activation)
            layers.append(nn.MaxPool2d(2))  # Reduce H, W by half
            if dropout > 0:
                layers.append(nn.Dropout(dropout))
            in_channels = out_channels

        self.conv_layers = nn.Sequential(*layers)

        self.flattened_size = filter_list[-1] * spatial_size * spatial_size

        self.classifier = nn.Sequential(
            nn.Linear(self.flattened_size, 256),
            self.activation,
            nn.Dropout(dropout),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        """Map a batch of images to a (batch, num_classes) tensor of logits."""
        x = self.conv_layers(x)
        x = x.flatten(1)  # Flatten everything except the batch dimension
        x = self.classifier(x)
        return x

In [None]:
# Preprocessing shared by both image folders: resize to 224x224, convert to a
# tensor, then normalise each RGB channel with mean 0.5 / std 0.5 (-> [-1, 1]).
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)

# The dataset's "train" folder feeds training/validation; its "val" folder is
# used as the test set.
train_dataset = datasets.ImageFolder(
    root='/kaggle/input/inaturalist/inaturalist_12K/train',
    transform=transform,
)
test_dataset = datasets.ImageFolder(
    root='/kaggle/input/inaturalist/inaturalist_12K/val',
    transform=transform,
)

In [None]:
from torch.utils.data import Subset, random_split
import random

# Fixed seed for subset selection so the train/validation/test subsets are the
# same on every kernel restart (otherwise accuracies from different sessions
# are measured on different images and cannot be compared).
SPLIT_SEED = 42
subset_rng = random.Random(SPLIT_SEED)

# Define the number of samples you want
num_train_samples = 6000
train_indices = subset_rng.sample(range(len(train_dataset)), num_train_samples)
train_subset = Subset(train_dataset, train_indices)

# Carve the validation set out of the sampled training pool; train_size is
# derived so the two lengths always add up to num_train_samples.
val_size = 1000
train_size = num_train_samples - val_size
train_subset, val_subset = random_split(
    train_subset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

num_test_samples = 1000
test_indices = subset_rng.sample(range(len(test_dataset)), num_test_samples)
test_subset = Subset(test_dataset, test_indices)

# Create data loaders (only the training loader is shuffled)
train_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_subset, batch_size=64, shuffle=False)

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [None]:
def _accuracy_percent(model, loader):
    """Return the top-1 accuracy of `model` over `loader`, in percent.

    The model is switched to eval mode and gradients are disabled. An empty
    loader yields 0.0 instead of a division by zero.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            _, predicted = model(images).max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    return 100. * correct / total if total else 0.0


def train(filters, activationFunction, filterOrganisation, batchNormalisation, dropout,
          epochs=42, lr=0.001):
    """Train a CNN on `train_loader` and score it once on `val_loader`.

    Args:
        filters, activationFunction, filterOrganisation, batchNormalisation,
        dropout: Forwarded unchanged to the `CNN` constructor.
        epochs: Number of passes over `train_loader` (default 42).
        lr: Adam learning rate (default 0.001).

    Returns:
        Tuple ``(train_acc, val_acc, model)``: training accuracy of the last
        epoch in percent (0.0 if no epoch ran), validation accuracy in percent
        measured after the final epoch, and the trained model.

    Per-epoch metrics are sent to Weights & Biases only while a run is active,
    so the function can also be called outside of a run (for example after
    ``wandb.finish()``).
    """
    model = CNN(filters = filters,
                activationFunction = activationFunction ,
                filterOrganisation = filterOrganisation ,
                batchNormalisation = batchNormalisation,
                dropout = dropout).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    train_acc = 0.0  # keeps the return value defined when epochs == 0

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # criterion returns the batch mean; weight it by the batch size so
            # the epoch figure is a true per-sample average.
            running_loss += loss.item() * labels.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        train_acc = 100. * correct / total if total else 0.0
        train_loss = running_loss / total if total else 0.0

        # wandb.log raises when no run is active, so only log inside a run.
        if wandb.run is not None:
            wandb.log({
                    "epoch" : epoch,
                    "train_acc" : train_acc,
                    "train_loss" : train_loss
                })

        print('Epoch -', epoch, '- Train accuracy -', train_acc)

    val_acc = _accuracy_percent(model, val_loader)

    return train_acc, val_acc, model

In [None]:
def sweep_hyperparameters() :
    """Run one sweep trial and return the trained model.

    Starts a W&B run in project "assignment2" of entity "da6401-assignments",
    reads the hyperparameters from the run config (the defaults below apply to
    any value the sweep does not provide), names the run after them, trains
    via `train`, and logs the final train/validation accuracy under the keys
    the sweep metric refers to.
    """
    default = {
        'filters' : 32,
        'activationFunction' : 'ReLU',
        'filterOrganisation' : 'same',
        'batchNormalisation' : 'yes',
        'dropout' : 0.2
    }

    # One init call carries project, entity and the default config together;
    # a second wandb.init() while a run is already active is at best ignored
    # and at worst starts a different run. The context manager also closes the
    # run if training raises.
    with wandb.init(project= "assignment2", entity= "da6401-assignments", config= default) as run:
        config = run.config

        filters = config.filters
        activationFunction = config.activationFunction
        filterOrganisation = config.filterOrganisation
        batchNormalisation = config.batchNormalisation
        dropout = config.dropout

        # Human-readable run name, e.g. "32#ReLU#same#yes#0.2"
        run.name = '#'.join(map(str,(
            filters, activationFunction, filterOrganisation, batchNormalisation, dropout
        )))

        trainacc, valacc, model = train(filters, activationFunction, filterOrganisation, batchNormalisation, dropout)

        run.log({
            'validationAccuracy' : valacc,
            'trainAccuracy' : trainacc
        })

    return model

In [None]:
# Register the sweep, let the in-process agent execute 48 trials of
# sweep_hyperparameters, then close whatever run is still open.
sweepId = wandb.sweep(
    sweep_config,
    project="assignment2",
    entity="da6401-assignments",
)
wandb.agent(sweepId, function=sweep_hyperparameters, count=48)
wandb.finish()

In [None]:
# Look up the run with the highest logged validation accuracy in the project.
api = wandb.Api()
runs = api.runs("da6401-assignments/assignment2")


def _validation_accuracy(run):
    """Return the run's numeric validationAccuracy, or -inf when it has none.

    Runs that never logged the metric (crashed, still running, other
    experiments) must rank last under max(); ranking them at +inf would make
    them win.
    """
    value = run.summary.get("validationAccuracy")
    return value if isinstance(value, (int, float)) else float("-inf")


# default=None avoids a ValueError when the project has no runs yet.
best_run = max(runs, key=_validation_accuracy, default=None)

if best_run is None:
    print("No runs found in da6401-assignments/assignment2.")
else:
    print(f"Best run name: {best_run.name}. \nValidation Accuracy: {best_run.summary.get('validationAccuracy')}")

In [None]:
# Hyperparameters for the final model (hard-coded here; presumably the best
# configuration reported by the sweep - verify against the lookup above).
best_config = {
    'filters': 64,
    'activationFunction': 'ReLU',
    'filterOrganisation': 'half',
    'batchNormalisation': 'yes',
    'dropout': 0.3,
}

# The sweep cell ends with wandb.finish(), so no run is active at this point
# and the wandb.log calls made during training would fail. Open a dedicated
# run for the final training unless one is already active.
if wandb.run is None:
    wandb.init(
        project="assignment2",
        entity="da6401-assignments",
        name="best-model",
        config=best_config,
    )

train_acc, val_acc, model = train(**best_config)

# Record the final metrics under the same keys the sweep trials use.
wandb.log({
    'validationAccuracy': val_acc,
    'trainAccuracy': train_acc
})

In [None]:
# Measure top-1 accuracy of the final model on the test subset.
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

# Guard against an empty loader, and report the figure instead of silently
# discarding it.
test_acc = 100. * correct / total if total else 0.0
print(f"Test accuracy: {test_acc:.2f}%")

In [None]:
# Take the first test batch and keep (at most) its first 30 samples for the
# prediction grid.
images, labels = (batch_part[:30] for batch_part in next(iter(test_loader)))

In [None]:
# Move the sample images to the device selected earlier so they can be passed to the model.
images = images.to(device)

In [None]:
# Put the model in evaluation mode before running inference on the sample batch.
model.eval()

In [None]:
# Predict a class index for every sample image; no gradients are needed.
with torch.no_grad():
    outputs = model(images)
    _, preds = outputs.max(dim=1)

In [None]:
# Class names indexed by label id. Take them from the ImageFolder dataset
# itself: listing the directory would also pick up stray non-directory entries
# (hidden files and the like) and shift every index after them.
label_class = list(train_dataset.classes)

In [None]:
# Move tensors to CPU for plotting
images = images.cpu()
preds = preds.cpu()
labels = labels.cpu()

# Create grid (10 rows x 3 columns = up to 30 images)
fig, axes = plt.subplots(10, 3, figsize=(10, 30))
axes = axes.flatten()

# zip stops at the shorter sequence, so a batch with fewer than 30 images no
# longer raises an IndexError.
for ax, img, pred, label in zip(axes, images, preds, labels):
    # Undo Normalize([0.5]*3, [0.5]*3): map [-1, 1] back to [0, 1] so imshow
    # renders the true colours instead of clipping the negative half.
    img = (img * 0.5 + 0.5).clamp(0, 1)
    img = img.permute(1, 2, 0).numpy()  # CHW -> HWC

    ax.imshow(img)
    pred_index = pred.item()
    true_index = label.item()
    pred_label = label_class[pred_index]
    true_label = label_class[true_index]

    # Colour the title by correctness
    color = "green" if pred_index == true_index else "red"
    ax.set_title(f'Pred: {pred_label}\nAct: {true_label}', color=color)
    ax.axis('off')

# Blank out any grid cells that received no image
for ax in axes[len(images):]:
    ax.axis('off')

plt.tight_layout()

# Save figure to a buffer instead of showing
buf = io.BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
plt.close(fig)

# wandb.log needs an active run; the sweep cell finishes its run, so start one
# here if nothing is active.
if wandb.run is None:
    wandb.init(project="assignment2", entity="da6401-assignments")

# Convert buffer to PIL Image and log to wandb
wandb.log({"Predictions Grid": wandb.Image(Image.open(buf))})