In [2]:
import torch.nn as nn
import torchvision

In [None]:
!pip install wandb
import wandb
wandb.init(project='Assignment 2', entity='CS23M028')

In [9]:
from PIL import Image
from numpy import asarray
import os
import numpy as np
import math

import torch
from torchvision import datasets, transforms
import torch
import torch.nn.functional as F  # Parameterless functions, like (some) activation functions
import torchvision.datasets as datasets  # Standard datasets
import torchvision.transforms as transforms  # Transformations we can perform on our dataset for augmentation
from torch import optim  # For optimizers like SGD, Adam, etc.
from torch import nn  # All neural network modules
from torch.utils.data import (
    DataLoader,Subset
) 
from tqdm import tqdm

In [11]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
#Applying the transformation to scale down the image for computational reasons 
resizeHeight = 256
resizeWidth = 256

transform = transforms.Compose([transforms.Resize((resizeHeight,resizeWidth)),
transforms.ToTensor(),transforms.Lambda(lambda x: x/255.0)])


augmen_transform = transforms.Compose([

    transforms.RandomHorizontalFlip(),
    transforms.RandomApply([transforms.GaussianBlur(kernel_size=3)], p=0.5),

])

In [None]:
#Loading the data from the disk
data_dir = '/kaggle/input/inaturalist12k/Data/inaturalist_12K/train/'
train_dataset = datasets.ImageFolder(data_dir, transform=transform)

In [None]:
from torch.utils.data import DataLoader, random_split


# Define the percentage of validation data
validation_ratio = 0.2  # 20%

num_validation_samples = int(validation_ratio * len(train_dataset))

train_size = len(train_dataset) - num_validation_samples
train_dataset, val_dataset = random_split(train_dataset, [train_size, num_validation_samples])

In [None]:
class_indices = [[] for _ in range(len(train_dataset.classes))]

for i, (image, label) in enumerate(train_dataset):
    class_indices[label].append(i)

train_indices = []
val_indices = []
Augtest_indices = []

for indices in class_indices:
  n = len(indices)
  train_size = int(0.8 * n)
  val_size = int(0.2 * n)
  Augtest_size = n/5

  train_indices.extend(indices[:train_size])
  val_indices.extend(indices[train_size:train_size+val_size])
  Augtest_indices.extend(indices[:Augtest_size])

new_train_dataset = Subset(train_dataset, train_indices)
new_val_dataset = Subset(train_dataset, val_indices)
Augtest_dataset = Subset(train_dataset, Augtest_indices)

In [None]:
data_dir = '/kaggle/input/inaturalist12k/Data/inaturalist_12K/val'
test_dataset = datasets.ImageFolder(data_dir, transform=transform)

In [7]:
#Computing the dense layer input feature count from the necessary input parameters
def cnn_calc(n,p,f,s):
  return int(((math.floor(n + 2*p - f)/(s*1.0)) + 1))


def calculate_denseLayer(filter_seq,channel_list):
  padding = 1
  stride = 1
  hidden_count = 5
  initial_nodes_w = 256
  initial_nodes_b = 256

  for layers in range(1,hidden_count+1):
    initial_nodes_w = cnn_calc(initial_nodes_w,padding,filter_seq[layers-1],stride)
    initial_nodes_w = cnn_calc(initial_nodes_w,0,2,2)

    initial_nodes_b = cnn_calc(initial_nodes_b,padding,filter_seq[layers-1],stride)
    initial_nodes_b = cnn_calc(initial_nodes_b,0,2,2)

  lastConvNodes = initial_nodes_w*initial_nodes_b*channel_list[-1]
  return lastConvNodes

In [3]:
#Model declaration and layer declaration
class CNN(nn.Module):
    def __init__(self, in_channels=3, num_classes=10,channel_list=[32,32,64,64,128],kernel = [3,3,3,3,3],fcLayerNodes = 192,dropOut = 0,activation_function = torch.nn.ReLU(),batchNorm='false'):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(
            in_channels=in_channels,
            out_channels=channel_list[0],
            kernel_size=kernel[0],
            stride=1,
            padding=1,
        )
        self.dropout1 = nn.Dropout(dropOut)
        self.batchNorm1 = nn.BatchNorm2d(channel_list[0])
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv2d(
            in_channels=channel_list[0],
            out_channels=channel_list[1],
            kernel_size=kernel[1],
            stride=1,
            padding=1,
        )
        self.dropout2 = nn.Dropout(dropOut)
        self.batchNorm2 = nn.BatchNorm2d(channel_list[1])
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(
            in_channels=channel_list[1],
            out_channels=channel_list[2],
            kernel_size=kernel[2],
            stride=1,
            padding=1,
        )
        self.dropout3 = nn.Dropout(dropOut)
        self.batchNorm3 = nn.BatchNorm2d(channel_list[2])
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv4 = nn.Conv2d(
            in_channels=channel_list[2],
            out_channels=channel_list[3],
            kernel_size=kernel[3],
            stride=1,
            padding=1,
        )
        self.dropout4 = nn.Dropout(dropOut)
        self.batchNorm4 = nn.BatchNorm2d(channel_list[3])
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv5 = nn.Conv2d(
            in_channels=channel_list[3],
            out_channels=channel_list[4],
            kernel_size=kernel[4],
            stride=1,
            padding=1,
        )
        self.dropout5 = nn.Dropout(dropOut)
        self.batchNorm5 = nn.BatchNorm2d(channel_list[4])
        self.pool5 = nn.MaxPool2d(kernel_size=2, stride=2)

        #self.flatten = nn.Flatten()

        self.fc1 = nn.Linear(calculate_denseLayer(kernel,channel_list), fcLayerNodes)
        self.dropout6 = nn.Dropout(dropOut)
        self.batchNorm6 = nn.BatchNorm1d(fcLayerNodes)
        self.fc2 = nn.Linear(fcLayerNodes, num_classes)
        self.activation = activation_function
        self.isBatchNorm = batchNorm

#Forward Pass

    def forward(self, x):
        if(self.isBatchNorm == 'false'):
            x = self.activation(self.conv1(x))
            x = self.dropout1(x)
            x = self.pool1(x)

            x = self.activation(self.conv2(x))
            x = self.dropout2(x)
            x = self.pool2(x)

            x = self.activation(self.conv3(x))
            x = self.dropout3(x)
            x = self.pool3(x)

            x = self.activation(self.conv4(x))
            x = self.dropout4(x)
            x = self.pool4(x)

            x = self.activation(self.conv5(x))
            x = self.dropout5(x)
            x = self.pool5(x)

            x = x.reshape(x.shape[0], -1)
            x = self.activation(self.fc1(x))
            x = self.dropout6(x)
            x = self.fc2(x)
        else:
            x = self.activation(self.batchNorm1(self.conv1(x)))
            x = self.dropout1(x)
            x = self.pool1(x)

            x = self.activation(self.batchNorm2(self.conv2(x)))
            x = self.dropout2(x)
            x = self.pool2(x)

            x = self.activation(self.batchNorm3(self.conv3(x)))
            x = self.dropout3(x)
            x = self.pool3(x)

            x = self.activation(self.batchNorm4(self.conv4(x)))
            x = self.dropout4(x)
            x = self.pool4(x)

            x = self.activation(self.batchNorm5(self.conv5(x)))
            x = self.dropout5(x)
            x = self.pool5(x)

            x = x.reshape(x.shape[0], -1)
            x = self.activation(self.batchNorm6(self.fc1(x)))
            x = self.dropout6(x)
            x = self.fc2(x)
            
        return x

In [None]:
#Training the CNN from the appropriate parameters
def trainCnn(config,train_data,val_data,test_data):
  num_classes = 10
  input_channels = 3
  num_epochs = config.epochs #epochs
  filter_seq =  config.filter_customization
  activation = config.activation #activation
  dropout = config.dropout #dropout
  learning_rate = config.learning_rate #learning_rate
  batch_Size = config.batch_size #batch_size
  data_augmen = config.data_aug #data_aug
  denseLayerNodes = config.fc_layer_nodes #fc_layer_nodes
  fullyConnect = 2
  kernel_list = config.kernel_customization
  optimizer = config.optimizer
  augmentation = config.data_aug
  weightDecay = config.weight_decay
  isBatchNorm = config.batch_norm
  activation_fun = config.activation
  if(config.activation == 'relu'):
    activation_fun = torch.nn.ReLU()
  elif(config.activation == 'selu'):
    activation_fun = torch.nn.SELU()
  else:
    activation_fun = torch.nn.ELU()

#Model Object creation
  model = CNN( in_channels=input_channels, num_classes=num_classes,channel_list=filter_seq,kernel=kernel_list,fcLayerNodes=denseLayerNodes,dropOut = dropout,activation_function=activation_fun,batchNorm = isBatchNorm).to(device)
#Setting up the loss function with the optimizers
  criterion = nn.CrossEntropyLoss()
  if(optimizer == 'adam'):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate,weight_decay = config.weight_decay)
  elif(optimizer == 'sgd'):
    optimizer = optim.SGD(model.parameters(), lr=learning_rate,weight_decay = config.weight_decay)
  else:
    optimizer = optim.NAdam(model.parameters(), lr=learning_rate,weight_decay = config.weight_decay)

#Training of the model
  for epoch in range(num_epochs):
      for batch_idx, (data, targets) in enumerate(tqdm(train_data)):
          data = data.to(device=device)
          targets = targets.to(device=device)

          scores = model(data)
          loss = criterion(scores, targets)

          optimizer.zero_grad()
          loss.backward()
          optimizer.step()
      val_acc,val_loss = check_accuracy(val_data, model)
      #train_acc1,train_loss1 = check_accuracy(train_data, model)  
      train_acc,train_loss = check_accuracy(train_data, model)
      wandb.log({"Acc_val":val_acc,"Acc_val_loss":val_loss,"Acc_train":train_acc,"Acc_train_loss":train_loss,"epoch":epoch})

In [None]:
#Function to check the accuracy of the model
def check_accuracy(loader, model):
    num_correct = 0
    num_samples = 0
    total_loss = 0
    model.eval()

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device=device)
            y = y.to(device=device)

            scores = model(x)
            loss = F.cross_entropy(scores, y)
            total_loss += loss.item()

            _, predictions = scores.max(1) #Compute the classified labels for the input features
            num_correct += (predictions == y).sum().item()
            num_samples += predictions.size(0)

    accuracy = num_correct / num_samples
    average_loss = total_loss / len(loader)

    model.train()
    return accuracy, average_loss

In [None]:
#Parameter value declaration
sweep_config = {
    'name':"my-sweep-cnn",
    'method': 'bayes',
    'metric': {
      'name': 'accuracy',
      'goal': 'maximize'
    },

    'parameters': {
        'epochs': {
            'values': [5,10,20]
        },
        'filter_customization': {
            'values':[[8,16,32,64,128],[16,32,64,128,256],[32,64,128,256,512],[32,64,64,128,256],[16,64,96,256,460],[32,32,32,32,32]]
        },
        'kernel_customization': {
            'values':[[3,3,3,3,3],[3,3,5,5,7],[3,3,5,5,3],[3,5,7,5,5]]
        },
        'activation': {
            'values': ['elu','selu','relu','ELU']
        },
        'dropout': {
            'values':[0,0.1,0.2]
        },
        'learning_rate': {
            'values': [1e-3,1e-4]
        },
        'weight_decay':
        {
            'values':[0,0.0005]
        },
        'optimizer': {
            'values': ['adam','sgd','nadam']
        },
        'batch_size' : {
            'values':[16,32,48,64,80]
        },
        'data_aug' : {
            'values':['true','false']
        },
        'batch_norm' : {
            'values':['true','false']
        },
        
        'fc_layer_nodes' : {
            'values':[128,192,256,512]
        }


        }
}


sweep_id = wandb.sweep(sweep_config, entity="CS23M028", project="Assignment 2")

In [None]:
model1 = CNN()
def fitModel():
  with wandb.init() as run:
    config = wandb.config

    wandb.run.name = "hidden_" + str(config.epochs)+"filter_size"+str(config.filter_customization)+"_acc_"+ str(config.fc_layer_nodes)
    np.random.seed(1)

    train_data = torch.utils.data.DataLoader(train_dataset, batch_size=wandb.config.batch_size, shuffle=True,num_workers = 4, pin_memory=True)
    val_data = torch.utils.data.DataLoader(val_dataset, batch_size=wandb.config.batch_size, shuffle=True,num_workers = 4, pin_memory=True)

    test_data = torch.utils.data.DataLoader(test_dataset, batch_size=wandb.config.batch_size, shuffle=True, num_workers = 4,pin_memory=True)


    trainCnn(config,train_data,val_data,test_data)

wandb.agent(sweep_id,fitModel,entity="CS23M028", project="Assignment 2")

In [4]:
#Use this to return the model which can be used later to evaluate on test data
def ModelCnn(config,train_data):
  num_classes = 10
  input_channels = 3
  num_epochs = config['epochs'] #epochs
  filter_seq = config['filter_customization']
  activation = config['activation'] #activation
  dropout = config['dropout'] #dropout
  learning_rate = config['learning_rate'] #learning_rate
  batch_Size = config['batch_size'] #batch_size
  data_augmen = config['data_aug'] #data_aug
  denseLayerNodes = config['fc_layer_nodes'] #fc_layer_nodes
  fullyConnect = 2
  kernel_list = config['kernel_customization']
  optimizer = config['optimizer']
  augmentation = config['data_aug']
  weightDecay = config['weight_decay']
  isBatchNorm = config['batch_norm']
  activation_fun = config['activation']
  if(config.activation == 'relu'):
    activation_fun = torch.nn.ReLU()
  elif(config.activation == 'selu'):
    activation_fun = torch.nn.SELU()
  else:
    activation_fun = torch.nn.ELU()


  model = CNN( in_channels=input_channels, num_classes=num_classes,channel_list=filter_seq,kernel=kernel_list,fcLayerNodes=denseLayerNodes,dropOut = dropout,activation_function=activation_fun,batchNorm = isBatchNorm).to(device)

  criterion = nn.CrossEntropyLoss()
  if(optimizer == 'adam'):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate,weight_decay = config.weight_decay)
  elif(optimizer == 'sgd'):
    optimizer = optim.SGD(model.parameters(), lr=learning_rate,weight_decay = config.weight_decay)
  else:
    optimizer = optim.NAdam(model.parameters(), lr=learning_rate,weight_decay = config.weight_decay)


  for epoch in range(num_epochs):
      for batch_idx, (data, targets) in enumerate(tqdm(train_data)):
          data = data.to(device=device)
          targets = targets.to(device=device)

          scores = model(data)
          loss = criterion(scores, targets)

          optimizer.zero_grad()
          loss.backward()
          optimizer.step()
      #val_acc,val_loss = check_accuracy(val_data, model)
      #train_acc1,train_loss1 = check_accuracy(train_data, model)  
      train_acc,train_loss = check_accuracy(train_data, model)
      #wandb.log({"Acc_val":val_acc,"Acc_val_loss":val_loss,"Acc_train":train_acc,"Acc_train_loss":train_loss,"epoch":epoch})
  return model

In [None]:
#Best configuration Model
config = {
  "epochs": 10,
  "filter_customization": [32, 64, 128,256,512],
  "kernel_customization": [3, 3, 3, 3, 3],
  "activation": "relu",
  "dropout": 0.1,
  "learning_rate": 0.0001,
  "weight_decay": 0.0005,
  "optimizer": "adam",
  "batch_size": 32,
  "data_aug": "true",
  "batch_norm": "true",
  "fc_layer_nodes": 256
}
train_data = torch.utils.data.DataLoader(train_dataset, batch_size=wandb.config.batch_size, shuffle=True,num_workers = 4, pin_memory=True)
testmodel = ModelCnn(config,train_data)

In [None]:
#To generate the 10 x 3 grid test data classification
class RandomSamplesDataset(Dataset):
    def __init__(self, root, num_samples=3, transform=None):
        self.dataset = datasets.ImageFolder(root, transform=transform)
        self.num_samples = num_samples
        self.classes = self.dataset.classes

    def __len__(self):
        return len(self.classes) * self.num_samples

    def __getitem__(self, idx):
        class_idx = idx // self.num_samples
        class_label = self.classes[class_idx]

        class_indices = [i for i, (_, label) in enumerate(self.dataset.samples) if label == class_idx]
        random_indices = np.random.choice(class_indices, self.num_samples, replace=False)
        
        samples = [self.dataset[i][0] for i in random_indices]
        
        return samples, class_idx

# Specify your dataset root directory
root_dir = '/kaggle/input/inaturalist12k/Data/inaturalist_12K/val'


dataset = RandomSamplesDataset(root_dir, num_samples=3, transform=transform)

# Creating a DataLoader
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)


In [None]:
#Various classes available in the dataset
labels = ['Amphibia','Animalia','Arachnida','Aves','Fungi','Insecta','Mammalia','Mollusca','Plantae','Reptilia']

In [None]:
#Testing the model on the test dataset for classification of the filtered random features to generate 10 x 3 grid
testmodel.to(device)
testmodel.eval()

with torch.no_grad():
    for x, y in dataloader:
        x = x.to(device=device)
        y = y.to(device=device)

        scores = testmodel(x)
        loss = F.cross_entropy(scores, y)
        total_loss += loss.item()

        _, predictions = scores.max(1)
testmodel.train()

# Initialize wandb
wandb.init(project='Assignment 2_1', entity='CS23M028')

# Define the number of rows and columns for the grid
num_rows = 10
num_cols = 3

# Create a new figure for the grid
fig, axs = plt.subplots(num_rows, num_cols, figsize=(10, 30))

count = 0
for batch in dataloader:
    i = int(count/3)
    j = count%3
    
    
    images_list, labels_list = batch
    image_array = images_list[0][0]
    label = int(labels_list[0])

    image_array = np.array(image_array) * 255.0

    image_array = np.transpose(image_array, (1, 2, 0))

    # Plot the image
    axs[i, j].imshow(image_array)
    axs[i, j].axis('off')
    text = "Original:" + str(labels[label])
    text = text + " Predict: " + str(labels[int(predictions[count])])
    
    axs[i, j].text(0.5, -0.1, text, ha='center', transform=axs[i, j].transAxes)
    count = count + 1

# Log the figure to wandb
wandb.log({"example_images_grid": wandb.Image(fig)})