This notebook is meant for doing things which are necessary for report point of view

In [1]:
# Plotting the prediction images from the test folder
# Importing the necessary libraries #
import torch
import numpy as np
import torch.nn as nn
import torchvision.transforms.functional as F
import lightning as L
from typing import List
from lightning.pytorch import Trainer
from torchvision.datasets import MNIST
from torchvision import transforms
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import precision_score
from torchmetrics import Accuracy

In [2]:
torch.cuda.empty_cache()
torch.set_float32_matmul_precision("medium")

In [3]:
import wandb
from lightning.pytorch.loggers import WandbLogger
wandb.login()
# Initializing wandb logger #
wandb_logger = WandbLogger(
    entity="A2_DA6401_DL",
    project="Lightning_CNN",       
)

[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Currently logged in as: [33mae21b105[0m to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [4]:
wandb.login(key="5ef7c4bbfa350a2ffd3c198cb9289f544e3a0910")

[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /home/joel/.netrc


True

In [5]:
# Function to give the activation function #
def return_activation_function(activation : str = "ReLU"):
    possible_activations = ["ReLU", "Mish", "GELU", "SELU", "SiLU", "LeakyReLU" ]
    # Assertion to be made for the activations possible #
    assert activation in possible_activations, f"activation not in {possible_activations}"

    if activation == "ReLU":
        return nn.ReLU()
    elif activation == "GELU":
        return nn.GELU()
    elif activation == "SiLU":
        return nn.SiLU()
    elif activation == "SELU":
        return nn.SELU()
    elif activation == "Mish":
        return nn.Mish()
    else:
        return nn.LeakyReLU()

In [6]:
class CNN_(nn.Module):
    def __init__(self, config = None):
        super().__init__()
        # Configuration to build the CNN #
        self.config = config
        
        # Some assertions to be made #
        assert config["no_of_conv_blocks"]==len(config["no_of_filters"]), "The filter number do not match with number of conv layers"
        assert config["no_of_conv_blocks"]==len(config["filter_sizes"]), "The filter sizes do not match with number of conv layers"
        assert config["no_of_conv_blocks"]==len(config["conv_strides"]), "The strides do not match with number of conv layers"
        assert config["no_of_conv_blocks"]==len(config["conv_padding"]), "The padding do not match with number of conv layers"
        assert config["no_of_conv_blocks"]==len(config["max_pooling_stride"]), "The max pooling stride do not match with number of conv layers"

        # building the convolution blocks #
        conv_blocks = []
        for block_no in range(config["no_of_conv_blocks"]):
            # Getting the hyper-parameters from the config #
            if block_no == 0:
                in_channels = config["input_channels"]
            else:
                in_channels = config["no_of_filters"][block_no-1]
            out_channels = config["no_of_filters"][block_no]
            filter_size = config["filter_sizes"][block_no]
            stride = config["conv_strides"][block_no]
            padding = config["conv_padding"][block_no]
            if padding == None:
                padding = int((filter_size - 1)/2) if filter_size > 1 else 0
            # Defining the block to add to conv_blocks #
            block_add = nn.Sequential(
                nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=filter_size, stride=stride, padding=padding),
                nn.BatchNorm2d(num_features=out_channels) if config["batch_norm_conv"] else nn.Identity(),
                return_activation_function(activation=config["conv_activation"][block_no]),
                nn.MaxPool2d(kernel_size=config["max_pooling_kernel_size"][block_no],stride=config["max_pooling_stride"][block_no]) if config["max_pooling_stride"][block_no] != None else nn.Identity(),
                nn.Dropout(config["dropout_conv"]) if config["dropout_conv"]>0 else nn.Identity(),
            )
            # Appending the blocks to the total #
            conv_blocks.append(block_add)

        # Converting the list to a sequential module #
        self.conv_blocks = nn.Sequential(*conv_blocks)

        # Calculating the size of the output #
        dummy_in = torch.randn(size=(1, config["input_channels"],config["input_size"][0], config["input_size"][1]))
        dummy_out = self.conv_blocks(dummy_in).flatten()
        flat_size = len(dummy_out)

        # building the fc blocks #
        fc_blocks = []
        for block_no in range(config["no_of_fc_layers"]):
            if block_no == 0:
                in_channels = flat_size
            else:
                in_channels = config["fc_neurons"][block_no-1]
            out_channels = config["fc_neurons"][block_no]
            block_add = nn.Sequential(
                nn.Linear(in_features=in_channels, out_features=out_channels),
                nn.BatchNorm1d(out_channels) if config["batch_norm_fc"] else nn.Identity(),
                return_activation_function(activation=config["fc_activations"][block_no]),
                nn.Dropout(config["dropout_fc"]) if config["dropout_fc"]>0 else nn.Identity(),
            )
            # Appending to the fc final
            fc_blocks.append(block_add)

        # converting the list to a sequential module #
        self.fc_layers = nn.Sequential(*fc_blocks)

        # Output layer #
        self.output_layer = nn.Sequential(
            nn.Linear(in_features=config["fc_neurons"][-1], out_features=config["num_classes"]),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        x = self.conv_blocks(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        x = self.output_layer(x)
        return x

In [7]:
class Lightning_CNN(L.LightningModule):
    def __init__(self, config):
        super().__init__()
        self.save_hyperparameters()

        # Define the model
        self.model = CNN_(config=config)

        # Defining the loss and optimizers
        self.loss_fn = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.parameters(), lr = config["learning_rate"])

        # Defining the metrics
        self.acc_metric = Accuracy(task="multiclass", num_classes=config["num_classes"], average="weighted")

    def forward(self, x):
        return self.model(x)
    
    def training_step(self, batch, batch_idx):
        input_, target_ = batch
        output_ = self(input_)
        # Finding the loss to backprop #
        loss = self.loss_fn(output_, target_)
        # Logging the metrics #
        self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True, sync_dist=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        input_, target_ = batch
        output_ = self(input_)
        # Finding the loss to backprop #
        loss = self.loss_fn(output_, target_)

        output_pred = torch.argmax(output_, dim=1) 
        acc = self.acc_metric(output_pred, target_)
        # Logging the metrics #
        self.log("val_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        self.log("val_acc", acc, prog_bar=True, logger=True, sync_dist=True)
        return loss
    
    def test_step(self, batch, batch_idx):
        input_, target_ = batch
        output_ = self(input_)
        # Finding the loss to backprop #
        loss = self.loss_fn(output_, target_)
        
        output_pred = torch.argmax(output_, dim=1) 
        acc = self.acc_metric(output_pred, target_)
        # Logging the metrics #
        self.log("test_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        self.log("test_acc", acc, prog_bar=True, logger=True, sync_dist=True)
        return loss
    
    def configure_optimizers(self):
        return self.optimizer


In [9]:
# testing 
best_model_path = "/home/joel/Pytorch_CUDA/checkpoints/best-checkpoint_2.ckpt"
model = Lightning_CNN.load_from_checkpoint(best_model_path)

In [11]:
# Get labels for stratified split
labels = [sample[1] for sample in test_dataset.samples]
# Stratified split #
from sklearn.model_selection import train_test_split

_ , print_indices = train_test_split(
    np.arange(len(labels)),
    test_size=30,
    stratify=labels,
)

# Create subsets
print_dataset = Subset(test_dataset, print_indices)
print_dataset.dataset.transform = data_transforms["orient_"] 

batch_size = 30
num_workers = 2 # Adaptive number of workers

print_loader = DataLoader(
    print_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
    #pin_memory=True,
    drop_last=True  # Helps with batch norm stability
)


In [12]:
wandb.init(config=None, entity="A2_DA6401_DL", project="Reporter")
table = wandb.Table(columns=["Ground Truth", "Image 1", "Image 2", "Image 3"])

val = 0
for batch in print_loader:
    x,y = batch

pred = torch.argmax(model(x), dim=1)

for i in range(10):
    true_label = print_dataset.dataset.classes[i]

    table.add_data(
        true_label,
        wandb.Image(x[y==i][0], caption=print_dataset.dataset.classes[pred[y==i][0]]),
        wandb.Image(x[y==i][1], caption=print_dataset.dataset.classes[pred[y==i][1]]),
        wandb.Image(x[y==i][2], caption=print_dataset.dataset.classes[pred[y==i][2]]),
    )
    
    val += torch.sum(pred == i)
wandb.log({"Grid of test predictions table": table})

val

    

tensor(30)

In [13]:
wandb.finish()

In [14]:
print_dataset.dataset.classes

['Amphibia',
 'Animalia',
 'Arachnida',
 'Aves',
 'Fungi',
 'Insecta',
 'Mammalia',
 'Mollusca',
 'Plantae',
 'Reptilia']

In [15]:
val = 0
for batch in print_loader:
    x, y = batch
    input = x
    
    print(torch.argmax(model(input),dim=1))
    print(y)

    val += torch.sum(y == torch.argmax(model(input),dim=1))

val

tensor([3, 8, 2, 4, 3, 0, 3, 8, 5, 7, 7, 9, 4, 6, 9, 0, 2, 1, 7, 8, 8, 5, 9, 9,
        1, 0, 2, 5, 4, 8])
tensor([6, 9, 2, 8, 9, 7, 3, 0, 6, 7, 3, 0, 4, 3, 2, 9, 5, 1, 7, 8, 8, 5, 0, 6,
        1, 2, 4, 5, 1, 4])


tensor(13)