<a href="https://colab.research.google.com/github/Swapnil7-lab/DA6401_Assignment_2/blob/main/DA6401_DL_2_partA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
print(torch.device('cuda:0'))
print(torch.__version__)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0
2.6.0+cu124
cuda


In [8]:
!wget 'https://storage.googleapis.com/wandb_datasets/nature_12K.zip'
!unzip -q nature_12K.zip

--2025-04-03 04:19:56--  https://storage.googleapis.com/wandb_datasets/nature_12K.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.31.207, 142.251.111.207, 142.251.16.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.31.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3816687935 (3.6G) [application/zip]
Saving to: ‘nature_12K.zip’


2025-04-03 04:20:33 (98.5 MB/s) - ‘nature_12K.zip’ saved [3816687935/3816687935]



In [9]:
# imports
# Standard library imports
import os
import random
import pathlib

# Third-party library imports
import math
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

# PyTorch imports
import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split

# Torchvision imports
import torchvision
from torchvision import transforms, datasets
from torchvision.datasets import ImageFolder

# Set seeds for reproducibility
torch.manual_seed(1)
np.random.seed(1)

# Define the device (GPU if available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")



# Data Preparation and Transformation
def load_data(bs, augment_data=False):
    # Configuration parameters
    img_size = (300, 300)
    norm_mean = (0.5, 0.5, 0.5)
    norm_std = (0.5, 0.5, 0.5)

    # Base image transformations
    base_transform = [
        transforms.Resize(img_size),
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std)
    ]

    # Augmentation additions
    augmentation_layers = [
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10)
    ] if augment_data else []

    # Create transformation pipelines
    train_transforms = transforms.Compose([*augmentation_layers, *base_transform])
    test_transforms = transforms.Compose(base_transform)

    # Dataset paths configuration
    data_root = "/content/inaturalist_12K"
    train_dir = os.path.join(data_root, 'train')
    eval_dir = os.path.join(data_root, 'val')

    # Dataset preparation
    train_full = ImageFolder(train_dir, transform=train_transforms)
    eval_set = ImageFolder(eval_dir, transform=test_transforms)

    # Data partitioning
    total_train = len(train_full)
    val_portion = 0.2
    train_samples = int(total_train * (1 - val_portion))
    val_samples = total_train - train_samples

    # Dataset splitting
    train_subset, val_subset = random_split(train_full, [train_samples, val_samples])

    # Data loading configuration
    loader_config = {
        'batch_size': bs,
        'num_workers': 2,
        'pin_memory': True
    }

    # Create data loaders
    train_loader = DataLoader(train_subset, shuffle=True, **loader_config)
    val_loader = DataLoader(val_subset, shuffle=False, **loader_config)
    test_loader = DataLoader(eval_set, shuffle=False, **loader_config)

    # Class label extraction
    class_labels = [item.name for item in pathlib.Path(train_dir).iterdir()]
    class_labels.sort()

    return train_loader, val_loader, test_loader, class_labels





In [10]:
#Simple CNN
def flatten(k=[11, 9, 7, 5, 3], w=300, s=1, p=1):
    r = w
    i = 0  # Initialize the counter for the while loop

    while i < len(k):  # Loop until the counter reaches the length of k
        print("r", r)
        r = (r + 2 * p - k[i]) + 1
        r = int(r / 2) + 1
        i += 1  # Increment the counter

    return r



class CNN(nn.Module):
    def __init__(self, in_channels=3, num_class=10, num_filters=4, kernel_sizes=[11,9,7,5,3],
                 fc_neurons=64, batch_norm=True, dropout=0.3, filter_multiplier=2, activation='LeakyRelu'):

        super(CNN, self).__init__()
        # Preserve original parameter assignments
        self.in_channels = in_channels
        self.num_class = num_class
        self.num_filters = num_filters
        self.kernel_sizes = kernel_sizes
        self.fc_neurons = fc_neurons
        self.activation = activation
        self.batch_norm = batch_norm
        self.dropout = dropout
        self.filter_multiplier = filter_multiplier

        # Layer construction through systematic pattern
        prev_channels = in_channels
        for layer_idx in range(len(kernel_sizes)):
            # Calculate output channels using exponential growth
            out_channels = num_filters * (filter_multiplier ** layer_idx)

            # Convolutional block components
            setattr(self, f'conv{layer_idx+1}', nn.Conv2d(
                prev_channels, out_channels,
                kernel_size=kernel_sizes[layer_idx],
                stride=1,
                padding=1
            ).to(device))

            if batch_norm:
                setattr(self, f'bn{layer_idx+1}', nn.BatchNorm2d(out_channels))

            setattr(self, f'relu{layer_idx+1}', nn.LeakyReLU())
            setattr(self, f'pool{layer_idx+1}', nn.MaxPool2d(2, 2, padding=1))

            prev_channels = out_channels

        # Calculate spatial dimension reduction
        self.r = flatten(kernel_sizes)
        print("ok pool5")
        print("ok flatten")
        print(self.r)

        # Fully connected section with dynamic sizing
        final_channels = num_filters * (filter_multiplier ** (len(kernel_sizes)-1))
        self.fc1 = nn.Linear(
            final_channels * self.r * self.r,
            fc_neurons
        )
        self.relu6 = nn.LeakyReLU()
        self.drop = nn.Dropout(dropout)
        self.fc2 = nn.Linear(fc_neurons, num_class)

    def forward(self, x):
        # Unified processing loop for convolutional blocks
        for block_idx in range(1, 6):
            x = getattr(self, f'conv{block_idx}')(x)
            if self.batch_norm:
                x = getattr(self, f'bn{block_idx}')(x)
            x = getattr(self, f'relu{block_idx}')(x)
            x = getattr(self, f'pool{block_idx}')(x)

        # Flatten and classify
        x = x.view(x.size(0), -1)
        x = self.relu6(self.fc1(x))
        return self.fc2(self.drop(x))






In [13]:
# Configure device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define hyperparameters
in_channels = 3
num_class = 10
learning_rate = 0.0005
batch_size = 64
epochs = 15
data_aug = True

# Load dataset
train_loader, val_loader, test_loader, classes = load_data(batch_size, data_aug)
print(classes)

# Display a batch of training data
trainfeature, trainlabel = next(iter(train_loader))
print(f"Feature Batch Shape: {trainfeature.size()}")
print(f"Label Batch Shape: {trainlabel.size()}")

# Initialize the model
model = CNN(in_channels, num_class, 16, [3, 3, 3, 3, 3], 128, False, 0, 2, 'LeakyRelu').to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.NAdam(model.parameters(), lr=learning_rate, weight_decay=0.0001)

# Training loop using while
epoch = 0
while epoch < epochs:
    model.train()  # Set model to training mode

    train_iter = iter(train_loader)
    batch_idx = 0

    while batch_idx < len(train_loader):
        # Get the next batch of data and targets
        data, targets = next(train_iter)

        # Transfer data to the appropriate device
        data, targets = data.to(device), targets.to(device)

        # Zero out gradients from the previous step
        optimizer.zero_grad()

        # Forward pass through the model
        scores = model(data)
        loss = criterion(scores, targets)

        # Backward pass and optimization step
        loss.backward()
        optimizer.step()

        batch_idx += 1

    # Evaluation mode for validation/testing
    model.eval()
    test_loss = 0.0
    num_correct = 0
    num_samples = 0

    test_iter = iter(test_loader)
    test_idx = 0

    with torch.no_grad():
        while test_idx < len(test_loader):
            # Get the next batch of validation/testing data and targets
            data, targets = next(test_iter)

            # Transfer data to the appropriate device
            data, targets = data.to(device), targets.to(device)

            # Forward pass for validation/testing
            scores = model(data)
            test_loss += criterion(scores, targets).item()

            # Calculate predictions and accuracy
            _, predictions = scores.max(1)
            num_correct += (predictions == targets).sum().item()
            num_samples += predictions.size(0)

            test_idx += 1

    # Compute average loss and accuracy for validation/testing
    test_loss /= len(test_loader)
    test_acc = num_correct / num_samples

    # Print epoch statistics
    print(f'Epoch [{epoch + 1}/{epochs}], Train Loss: {loss.item():.4f}, '
          f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc * 100:.2f}%')

    epoch += 1

# Save the best model to a file
best_model_path = 'best_model.pth'
torch.save(model.state_dict(), best_model_path)
print(f"Best model saved to {best_model_path}")


['.DS_Store', 'Amphibia', 'Animalia', 'Arachnida', 'Aves', 'Fungi', 'Insecta', 'Mammalia', 'Mollusca', 'Plantae', 'Reptilia']
Feature Batch Shape: torch.Size([64, 3, 300, 300])
Label Batch Shape: torch.Size([64])
r 300
r 151
r 76
r 39
r 20
ok pool5
ok flatten
11


KeyboardInterrupt: 

In [None]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
in_channels = 3
num_class = 10
learning_rate = 0.0001
batch_size = 32
epochs = 15
data_aug=False

# Load data
train_loader,val_loader,test_loader,classes=load_data(batch_size,data_aug)
print(classes)
trainfeature, trainlabel = next(iter(train_loader))
print(f"Feature Batch Shape: {trainfeature.size()}")
print(f"Label Batch Shape: {trainlabel.size()}")



# Initialize network
model = CNN(3,10,16,[7,5,5,3,3],64,True,0.2,2,'Mish').to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer=optim.NAdam(model.parameters(),lr=learning_rate,weight_decay=0.0001)

# Train Network
for epoch in range(epochs):
    # Set the model to training mode
    model.train()

    for batch_idx, (data, targets) in enumerate(train_loader):
        # Get data to cuda if possible
        data = data.to(device=device)
        targets = targets.to(device=device)

        optimizer.zero_grad()
        # forward
        scores = model(data)
        loss = criterion(scores, targets)

        # backward

        loss.backward()

        # gradient descent or adam step
        optimizer.step()

    # Set the model to evaluation mode
    model.eval()

    # Track the total loss and number of correct predictions
    test_loss = 0
    num_correct = 0
    num_samples = 0

    # Evaluate the model on the validation set
    with torch.no_grad():
        for data, targets in test_loader:
            data = data.to(device=device)
            targets = targets.to(device=device)

            scores = model(data)
            test_loss += criterion(scores, targets).item()

            _, predictions = scores.max(1)
            num_correct += (predictions == targets).sum()
            num_samples += predictions.size(0)

    # Calculate the average validation loss and accuracy
    test_loss /= len(test_loader)
    test_acc = float(num_correct) / num_samples

    # Print the epoch number, loss, and accuracy
    print('Epoch [{}/{}], Train Loss: {:.4f}, Test Loss: {:.4f}, Test Acc: {:.2f}%'
          .format(epoch+1, epochs, loss.item(), test_loss, test_acc*100))

# Check accuracy on training & test to see how good our model
# Save best model
best_model_path = 'best_model.pth'
torch.save(model.state_dict(), best_model_path)
print(f"Best model saved to {best_model_path}")

In [None]:
#loading the best model and testing it on Test Data
best_model_path = 'best_model.pth'

loaded_model = CNN(3,10,16,[7,5,5,3,3],64,True,0.2,2,'Mish').to(device)
loaded_model.load_state_dict(torch.load(best_model_path)) # it takes the loaded dictionary, not the path file itself

def calculate_accuracy(model, test_loader,criterion):
    model.eval()
    total = 0
    correct = 0
    cost=0
    acc=0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            cost +=criterion(outputs,labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            del images
            del labels
    acc=100 * correct / total
    cost/=len(test_loader)

    return cost,acc
loss,acc=calculate_accuracy(loaded_model,test_loader,nn.CrossEntropyLoss())
print(loss,acc)

print(loaded_model.state_dict())


In [None]:
best_model_path = 'best_model.pth'

loaded_model = CNN(3,10,16,[7,5,5,3,3],64,True,0.2,2,'Mish')
loaded_model.load_state_dict(torch.load(best_model_path)) # it takes the loaded dictionary, not the path file itself

# Initialize wandb
wandb.init(project="CS6910_Assignment_2_Q2")


# Define a function to generate predictions and sample images from the test data
def generate_predictions(model, data_loader):
    # Set the model to evaluation mode
    model.eval()

    # Create a list to store the predictions and sample images
    predictions = []
    sample_images = []

    # Generate predictions and sample images
    with torch.no_grad():
        for batch, _ in data_loader:
            # Forward pass through the model
            output = model(batch)

            # Get the predicted class labels
            _, predicted = torch.max(output, 1)

            # Convert the predicted labels to image tensors
            predicted_images = torchvision.utils.make_grid(batch[predicted])

            # Append the predictions and sample images to the lists
            predictions.append(predicted_images)
            sample_images.append(torchvision.utils.make_grid(batch))

    # Concatenate the predictions and sample images into grids
    prediction_grid = torchvision.utils.make_grid(predictions, nrow=3)
    sample_grid = torchvision.utils.make_grid(sample_images, nrow=3)

    # Return the grids
    return prediction_grid, sample_grid

# Generate the prediction and sample image grids
prediction_grid, sample_grid = generate_predictions(loaded_model, test_loader)

# Log the grids to wandb
wandb.log({
    'Predictions': wandb.Image(prediction_grid),
    'Sample Images': wandb.Image(sample_grid)
})

# Finish the run
wandb.finish()


In [None]:

from signal import signal,SIGPIPE, SIG_DFL
signal(SIGPIPE,SIG_DFL)
!pip install wandb -qU
import wandb
!wandb login --relogin 3d199b9bde866b3494cda2f8bb7c7a633c9fdade

In [None]:

sweep_config = {
    "name" : "DA6401_Assignment_2",
    "method" : "bayes",
    'metric': {
        'name': 'val_acc',
        'goal': 'maximize'
    },
    "parameters" : {
        "optimizer" : {
            "values" : ['adam','nadam','sgd']
        },
        "activation" : {
            "values" : ['LeakyRelu','Selu','Gelu','Mish']
        },
        "batch_size": {
            "values": [32, 64, 128]
        },
        'learning_rate':{
            "values": [0.001,0.0001,0.0003,0.0005]
        },
        "dropout": {
            "values": [0,0.2,0.3]
        },
        "batch_norm": {
              "values": [True,False]
        },
        "data_aug": {
              "values": [True,False]
        },
        'kernel_sizes':{
            'values': [[3,3,3,3,3],[5,5,5,5,5],[7,5,5,3,3], [11,9,7,5,3]]
        },
        'filter_multiplier': {
            'values': [1, 2, 0.5]
        },
        'num_filters': {
            'values': [4,8,16]
        },
        "fc_neurons": {
              "values": [32, 64, 128]
          }
    }
}
def opti(model,opt='adam',lr=0.0005):
    print("in opti")
    if opt == "sgd":
        opt= optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    elif opt == "adam":
        opt = optim.Adam(model.parameters(),lr=lr,weight_decay=0.0001)
    elif opt == "nadam":
        opt = optim.NAdam(model.parameters(),lr=lr,weight_decay=0.0001)
    print('exit opti')
    return opt

def calculate_accuracy(model, test_loader,criterion):
    model.eval()
    total = 0
    correct = 0
    cost=0
    acc=0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            cost +=criterion(outputs,labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            del images
            del labels
    acc=100 * correct / total
    cost/=len(test_loader)

    return cost,acc
def train():
    config_default={
      'epochs':15,
      'batch_size':32,
      'learning_rate':0.001,
      'dropout':0.3,
      'batch_norm':True,
      'data_aug':True,
      'kernel_sizes':[5,5,5,5,5],
      'filter_multiplier': 2,
      'num_filters': 16,
      "fc_neurons": 64
  }
    wandb.init(config=config_default)
    c= wandb.config
    name = "nfliter_"+str(c.num_filters)+"op_"+str(c.optimizer)+"_ac_"+str(c.activation)+"_n_"+str(c.learning_rate)+"_bs_"+str(c.batch_size)+"_dp_"+str(c.dropout)+"_bn_"+str(c.batch_norm)

    wandb.init(name=name)

    # Retrieve the hyperparameters from the config
    lr = c.learning_rate
    bs = c.batch_size
    epochs = 15
    act= c.activation
    opt= c.optimizer

    dp = c.dropout
    bn = c.batch_norm
    da=c.data_aug
    ks=c.kernel_sizes
    fm=c.filter_multiplier
    nf=c.num_filters
    fc=c.fc_neurons


    # Load the dataset
    train_loader,val_loader,test_loader,classes=load_data(bs,da)

    print("data loaded ====================================================")

    # Initialize network
    model= CNN(in_channels=3, num_class=10,num_filters=nf,kernel_sizes=ks,fc_neurons=fc,batch_norm=bn,dropout=dp,filter_multiplier=fm,activation=act).to(device)
    print("model ini==============================================================")
    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer=opti(model,opt,lr)
    print("done")
    # Train Network
    for epoch in range(epochs):
        print('epoch enter')
        # Set the model to training mode
        model.train()

        for batch_idx, (data, targets) in enumerate(train_loader):
            # Get data to cuda if possible
            data = data.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            # forward
            scores = model(data)
            loss = criterion(scores, targets)

            # backward

            loss.backward()

            # gradient descent or adam step
            optimizer.step()
            del data
            del targets

        # Calculate the test accuracy
        train_loss,train_acc = calculate_accuracy(model, train_loader,criterion)
        val_loss,val_acc = calculate_accuracy(model, val_loader,criterion)
        test_loss,test_acc = calculate_accuracy(model, test_loader,criterion)

        torch.cuda.empty_cache()
        # Log the metrics to WandB
        wandb.log({'epoch': epoch+1,'loss':loss.item(), 'train_loss': loss.item(),'test_loss':test_loss,'val_loss':val_loss,'test_acc': test_acc,'train_acc': train_acc,'val_acc': val_acc})


    # Save the best model
    wandb.save('model.h5')
    return


In [None]:

# Initialize the WandB sweep
sweep_id = wandb.sweep(sweep_config, project='DA6401_Assignment_2')
wandb.agent(sweep_id, function=train,count=5)


In [None]:

# Initialize the WandB sweep
sweep_id = wandb.sweep(sweep_config, project='DA6401_Assignment_2')
wandb.agent(sweep_id, function=train,count=5)


In [None]:

# Initialize the WandB sweep
sweep_id = wandb.sweep(sweep_config, project='DA6401_Assignment_2')
wandb.agent(sweep_id, function=train,count=5)


In [None]:

# Initialize the WandB sweep
sweep_id = wandb.sweep(sweep_config, project='DA6401_Assignment_2')
wandb.agent(sweep_id, function=train,count=5)
