<a href="https://colab.research.google.com/github/Sai-sakunthala/Assignment2/blob/main/test_data_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install lightning

In [None]:
import os
import random
import torch
import wandb
import numpy as np
import matplotlib.pyplot as plt
import pytorch_lightning as pl
from torchvision import transforms, datasets
from torchvision.datasets import ImageFolder
from torch import nn
import torch.nn.functional as functional
from torch.utils.data import DataLoader, random_split, Subset
from collections import defaultdict
from pytorch_lightning.loggers import WandbLogger

class CNN(pl.LightningModule):
    """Configurable CNN image classifier implemented as a LightningModule.

    The network is a stack of ``num_conv_layers`` stages, each made of
    Conv2d -> (BatchNorm2d) -> activation -> (Dropout) -> MaxPool2d, followed
    by one hidden dense layer and a final linear classification layer.

    Args:
        initial_in_channels: Number of channels of the input images.
        num_classes: Number of output logits.
        num_conv_layers: Number of convolution stages.
        num_filters: Number of filters in the first convolution stage.
        kernel_size: Side of the square convolution kernel; the padding is
            ``kernel_size // 2``.
        activation_fn: Zero-argument callable (e.g. an ``nn.Module`` class)
            that builds an activation layer.
        dense_neurons: Width of the hidden dense layer.
        learning_rate: Learning rate passed to the Adam optimizer.
        use_batchnorm: Whether to add batch normalization after every
            convolution and after the hidden dense layer.
        dropout_rate: Dropout probability; ``0`` disables dropout.
        filter_organization: ``'double'`` doubles the filter count after each
            stage, ``'half'`` halves it (never below 4); any other value
            keeps it constant.
        data_augmentation: Not used when building the model; it is only
            recorded together with the other hyperparameters.
    """

    def __init__(self, initial_in_channels=3, num_classes=10, num_conv_layers=5, num_filters=64, kernel_size=3, activation_fn=nn.SiLU,
                 dense_neurons=256, learning_rate=1e-3, use_batchnorm=True, dropout_rate=0.3, filter_organization='same', data_augmentation = True):

        super().__init__()
        #record every constructor argument in self.hparams
        self.save_hyperparameters()

        #list collecting all layers of the convolution block
        layers_conv = []

        #number of input image channels which is 3 in our case
        input_channels = initial_in_channels

        #variable to track filters in current layer
        current_filters = num_filters

        #loop over number of convolution layers
        for _ in range(num_conv_layers):
            #convolution layer with padding
            layers_conv.append(nn.Conv2d(input_channels, current_filters, kernel_size=kernel_size, padding=kernel_size // 2))

            #if batch normalization is specified use it
            if use_batchnorm:
                layers_conv.append(nn.BatchNorm2d(current_filters))

            #activation layer
            layers_conv.append(activation_fn())

            #dropout is added after activation layer, only for a non-zero rate
            #(the previous `== 0` test added Dropout(0) and ignored real rates)
            #NOTE(review): nn.Sequential names modules by position, so the
            #presence of Dropout changes the conv_block.* state_dict keys;
            #confirm the checkpoint was trained with this same layout.
            if dropout_rate > 0:
                layers_conv.append(nn.Dropout(dropout_rate))

            #maxpool layer
            layers_conv.append(nn.MaxPool2d(kernel_size=2, stride=2))

            #output of this stage is the input of the next one
            input_channels = current_filters

            #update number of filters for following layers based on configuration
            if filter_organization == 'double':
                current_filters *= 2
            elif filter_organization == 'half':
                current_filters = max(4, current_filters // 2)

        #add all layers as convolution block
        self.conv_block = nn.Sequential(*layers_conv)

        #dense layer; LazyLinear infers its input size on the first forward pass
        self.fc1 = nn.LazyLinear(dense_neurons)
        self.bn_fc1 = nn.BatchNorm1d(dense_neurons) if use_batchnorm else None
        self.activation_dense = activation_fn()
        self.dropout_fc1 = nn.Dropout(dropout_rate) if dropout_rate > 0 else None

        #final classification layer
        self.fc2 = nn.Linear(dense_neurons, num_classes)
        self.learning_rate = learning_rate

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        x = self.conv_block(x)
        #flatten everything except the batch dimension
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        #optional layers are None when disabled, so test the modules themselves
        if self.bn_fc1 is not None:
            x = self.bn_fc1(x)
        x = self.activation_dense(x)
        if self.dropout_fc1 is not None:
            x = self.dropout_fc1(x)
        return self.fc2(x)

    def _loss_and_accuracy(self, batch):
        """Compute cross-entropy loss and mean accuracy for one ``(x, y)`` batch."""
        x, y = batch
        y_hat = self(x)
        loss = functional.cross_entropy(y_hat, y)
        acc = (y_hat.argmax(dim=1) == y).float().mean()
        return loss, acc

    def training_step(self, batch, batch_idx):
        """Log ``train_loss``/``train_acc`` and return the loss for backprop."""
        loss, acc = self._loss_and_accuracy(batch)

        #log metrics
        self.log("train_loss", loss, prog_bar=True)
        self.log("train_acc", acc, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log ``val_loss``/``val_acc`` for one validation batch."""
        loss, acc = self._loss_and_accuracy(batch)

        #log metrics
        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", acc, prog_bar=True)

    def test_step(self, batch, batch_idx):
        """Log ``test_loss``/``test_acc`` for one test batch and return both."""
        loss, acc = self._loss_and_accuracy(batch)

        #log metrics
        self.log("test_loss", loss, prog_bar=True)
        self.log("test_acc", acc, prog_bar=True)
        return {"test_loss": loss, "test_acc": acc}

    def configure_optimizers(self):
        """Return Adam (with weight decay) and a cosine-annealing LR scheduler."""
        #adam optimizer with weight decay
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate, weight_decay=5e-5)

        #learning rate scheduler
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=30)
        return [optimizer], [scheduler]

#Load best model from sweep
#Identify the sweep on W&B
sweep_id = '1f9810oq'
sweep_path = f"sai-sakunthala-indian-institute-of-technology-madras/cnn-sweep/{sweep_id}"
api = wandb.Api()
sweep = api.sweep(sweep_path)


def _val_acc(run):
    #Runs whose summary has no 'val_acc' entry are ranked as 0
    return run.summary.get('val_acc', 0)


#Pick the run with the highest validation accuracy and fetch its first logged artifact
best_run = max(sweep.runs, key=_val_acc)
artifact = best_run.logged_artifacts()[0]
artifact_dir = artifact.download()
ckpt_path = os.path.join(artifact_dir, "model.ckpt")

#Load the model
best_model = CNN.load_from_checkpoint(ckpt_path)
best_model.eval()

#Test transform: resize, convert to tensor, then normalize each channel
test_transform = transforms.Compose(
    [
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

#Load test set
test_dir = "/content/inaturalist_data/inaturalist_12K/val"
test_dataset = ImageFolder(test_dir, transform=test_transform)
class_names = test_dataset.classes

#Fixed order (no shuffling) over the test images
test_loader = DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
)

#Evaluate test accuracy
test_run = wandb.init(project="cnn-sweep", name="final_test_evaluation", job_type="evaluation")
test_trainer = pl.Trainer(accelerator='auto', logger=False)
test_results = test_trainer.test(best_model, dataloaders=test_loader)

from collections import defaultdict

#Store per-class correct and total counts
#Both are keyed by integer class index; missing classes read as 0
class_total = defaultdict(int)
class_correct = defaultdict(int)

best_model.eval()
with torch.no_grad():
    for images, labels in test_loader:
        #Run the batch on whichever device the model currently lives on
        images = images.to(best_model.device)
        labels = labels.to(best_model.device)
        predictions = best_model(images).argmax(dim=1)

        #Convert each batch to plain ints once, then tally
        for label, pred in zip(labels.tolist(), predictions.tolist()):
            class_total[label] += 1
            if label == pred:
                class_correct[label] += 1

In [None]:
#Compute accuracy per class
#Maps class name -> accuracy in percent; classes with no samples get 0.0
class_wise_accuracy = {}
for class_idx, class_name in enumerate(class_names):
    hits, seen = class_correct[class_idx], class_total[class_idx]
    class_wise_accuracy[class_name] = 100 * hits / seen if seen > 0 else 0.0

#Log runs
#Send the per-class accuracy dict to the evaluation run and close it
test_run.log({"📊 Class-wise Test Accuracy": class_wise_accuracy})
test_run.finish()

#Start a second run that holds the same numbers as a table
table_run = wandb.init(project="cnn-sweep", name="final_test_table", job_type="evaluation")

#Create the table with columns for class names and accuracy values
table = wandb.Table(columns=["Class", "Accuracy"])

#One row per class, taken from the class_wise_accuracy dictionary
for row in class_wise_accuracy.items():
    table.add_data(*row)

#Log the table to wandb
table_run.log({"Class-wise Accuracy Table": table})

#Finish the wandb run
table_run.finish()

In [None]:
import cv2  #For adding borders

#Number of example images to log for each class
target_total_per_class = 3

#Start the wandb run that will receive the image grid
wandb.init(project="inatiralist_finetune", name="final_test_images_2", job_type="evaluation")

#Inference only: switch the model to evaluation mode
best_model.eval()

#One empty bucket per class index, filled with wandb images later
samples_per_class = {class_idx: [] for class_idx, _ in enumerate(class_names)}

#Function to add border (green if correct, red otherwise)
def add_border_to_image(img, is_correct):
    """Convert a float image to uint8 and surround it with a coloured border.

    Args:
        img: NumPy image array with float values, expected in [0, 1].
            NOTE(review): the colours below assume RGB channel order - confirm
            against the caller.
        is_correct: Truthy selects the (0, 255, 0) border, falsy selects
            (255, 0, 0).

    Returns:
        The uint8 image enlarged by a 5-pixel constant border on every side.
    """
    # Clip so out-of-range values cannot wrap around in uint8, and round
    # instead of truncating so float round-off does not darken a pixel by one.
    pixels = np.rint(np.clip(img, 0.0, 1.0) * 255).astype(np.uint8)
    color = (0, 255, 0) if is_correct else (255, 0, 0)
    thickness = 5
    return cv2.copyMakeBorder(pixels, thickness, thickness, thickness, thickness, cv2.BORDER_CONSTANT, value=color)

# Collect predictions and prepare images
# Per-channel statistics applied by transforms.Normalize, needed to undo it for display
norm_std = np.array([0.229, 0.224, 0.225])
norm_mean = np.array([0.485, 0.456, 0.406])

with torch.no_grad():
    # ImageFolder exposes every sample's label in `targets`, so samples of a
    # class that already has enough images are skipped before the image is
    # decoded or passed through the model.
    for idx, true_label in enumerate(test_dataset.targets):
        bucket = samples_per_class[true_label]

        # Collect up to target_total_per_class samples per class
        if len(bucket) >= target_total_per_class:
            continue

        img, _ = test_dataset[idx]
        input_tensor = img.unsqueeze(0).to(best_model.device)
        output = best_model(input_tensor)
        pred_label = output.argmax(dim=1).item()

        # De-normalize image (channels-last for display)
        img_disp = img.permute(1, 2, 0).cpu().numpy()
        img_disp = img_disp * norm_std + norm_mean
        img_disp = np.clip(img_disp, 0, 1)

        # Determine prediction correctness (for display only)
        correct = pred_label == true_label
        img_with_border = add_border_to_image(img_disp, correct)
        caption = f"True: {class_names[true_label]} | Pred: {class_names[pred_label]}"
        bucket.append(wandb.Image(img_with_border, caption=caption))

        # Exit once all classes are satisfied
        if all(len(v) >= target_total_per_class for v in samples_per_class.values()):
            break

#flatten to a list
wandb_imgs = [img for images in samples_per_class.values() for img in images]

# Log to wandb
wandb.log({"Classwise Predictions Grid": wandb_imgs})
wandb.finish()