<a href="https://colab.research.google.com/github/AdiY2j/CS6910_Assignment2/blob/main/partA/cnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install tqdm



In [2]:
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import defaultdict
from torch.utils.data.sampler import SubsetRandomSampler
import torch.nn as nn
import torchvision.transforms as transforms
import torch.optim as optim
import torch.nn.functional as F
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from tqdm.notebook import tqdm

In [3]:
torch.cuda.is_available()

True

In [4]:
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
device

'cuda:0'

In [5]:
!cp -a '/content/drive/MyDrive/inaturalist_12K/train' '/content/inaturalist/'

In [7]:

# Define the directory containing your dataset
train_dir = '/kaggle/input/inaturalist-12k/inaturalist_12K/train'

data_aug = False

if data_aug :
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
else :
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])

# Create a PyTorch dataset from the image folder
train_dataset = ImageFolder(train_dir, transform=transform)


validation_ratio = 0.2
class_labels = [label for _, label in train_dataset]

# Create a dictionary to store indices for each class
class_indices = defaultdict(list)
for idx, label in enumerate(class_labels):
    class_indices[label].append(idx)

# Initialize lists to store training and validation indices
train_indices = []
val_indices = []

# Split each class into training and validation indices
for label, indices in class_indices.items():
    num_samples = len(indices)
    num_validation_samples = int(validation_ratio * num_samples)
    np.random.shuffle(indices)  # Shuffle indices for random selection
    train_indices.extend(indices[num_validation_samples:])
    val_indices.extend(indices[:num_validation_samples])

# Create SubsetRandomSampler for training and validation sets
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)

# Create dataloaders for training and validation sets
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
val_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=val_sampler)


In [None]:
temp = []
for imag, lab in train_loader:
    temp.append(lab)

In [None]:
c = 0
mp = {}
for i in range(len(temp)):
    for j in range(len(temp[i])):
        #print(temp[i][j])
        if temp[i][j].item() in mp :
            mp[temp[i][j].item()] += 1
        else :
            mp[temp[i][j].item()] = 1
        c += 1

print(mp)

In [8]:
class SpeciesCNN(nn.Module):
    def __init__(self, num_classes, num_filters=32, filter_size=3, dense_neurons=512, activation = 'ReLU', batch_norm = False, dropout_val = 0.0, filter_org = 'double'):
        super(SpeciesCNN, self).__init__()
        act_func = nn.ReLU()
        filter_multiplier = 1
        match activation :
            case 'ReLU':
                act_func = nn.ReLU
            case 'GELU':
                act_func = nn.GELU
            case 'SiLU':
                act_func = nn.SiLU
            case 'Mish':
                act_func = nn.Mish
            case 'LeakyReLU':
                act_func = nn.LeakyReLU
            case 'Sigmoid':
                act_func = nn.Sigmoid

        match filter_org :
            case 'same':
                filters = [num_filters] * 5
            case 'double':
                filters = [32 * (2 ** i) for i in range(5)]
            case 'half':
                filters = [32 // (2 ** i) for i in range(5)]



        # Convolutional block 1

        self.conv_1 = nn.Conv2d(in_channels=3, out_channels=filters[0], kernel_size=filter_size, padding=1)
        self.act_1 = act_func()
        self.pool_1 = nn.MaxPool2d(kernel_size=2, stride=2)
        if batch_norm :
            self.batch_1 = nn.BatchNorm2d(filters[0])

        # Convolutional block 2
        self.conv_2 = nn.Conv2d(in_channels=filters[0], out_channels=filters[1], kernel_size=filter_size, padding=1)
        self.act_2 = act_func()
        self.pool_2 = nn.MaxPool2d(kernel_size=2, stride=2)
        if batch_norm :
            self.batch_2 = nn.BatchNorm2d(filters[1])

        # Convolutional block 3
        self.conv_3 = nn.Conv2d(in_channels=filters[1], out_channels=filters[2], kernel_size=filter_size, padding=1)
        self.act_3 = act_func()
        self.pool_3 = nn.MaxPool2d(kernel_size=2, stride=2)
        if batch_norm :
            self.batch_3 = nn.BatchNorm2d(filters[2])

        # Convolutional block 4
        self.conv_4 = nn.Conv2d(in_channels=filters[2], out_channels=filters[3], kernel_size=filter_size, padding=1)
        self.act_4 = act_func()
        self.pool_4 = nn.MaxPool2d(kernel_size=2, stride=2)
        if batch_norm :
            self.batch_4 = nn.BatchNorm2d(filters[3])

        # Convolutional block 5
        self.conv_5 = nn.Conv2d(in_channels=filters[3], out_channels=filters[4], kernel_size=filter_size, padding=1)
        self.act_5 = act_func()
        self.pool_5 = nn.MaxPool2d(kernel_size=2, stride=2)
        if batch_norm :
            self.batch_5 = nn.BatchNorm2d(filters[4])


        self.flatten = nn.Flatten()

        # Dense layers
        self.fc1 = nn.Linear(filters[4]*7*7, dense_neurons)
        self.fc1_activation = act_func()
        if dropout_val > 0.0 :
            self.dropout1 = nn.Dropout(dropout_val)
        self.fc2 = nn.Linear(dense_neurons, num_classes)

    def forward(self, x):
        x = self.pool_1(self.act_1(self.conv_1((x))))
        if hasattr(self, 'batch_1'):
            x = self.batch_1(x)

        x = self.pool_2(self.act_2(self.conv_2((x))))
        if hasattr(self, 'batch_2'):
            x = self.batch_2(x)

        x = self.pool_3(self.act_3(self.conv_3((x))))
        if hasattr(self, 'batch_3'):
            x = self.batch_3(x)

        x = self.pool_4(self.act_4(self.conv_4((x))))
        if hasattr(self, 'batch_4'):
            x = self.batch_4(x)

        x = self.pool_5(self.act_5(self.conv_5((x))))
        if hasattr(self, 'batch_5'):
            x = self.batch_5(x)

        x = self.flatten(x)
        x = self.fc1_activation(self.fc1(x))
        if hasattr(self, 'dropout1'):
            x = self.dropout1(x)
        x = F.softmax(self.fc2(x), dim=1)
        return x

# Define the model
model = SpeciesCNN(num_classes=10)

# Print the model architecture
print(model)


SmallCNN(
  (conv_1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (act_1): ReLU()
  (pool_1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv_2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (act_2): ReLU()
  (pool_2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv_3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (act_3): ReLU()
  (pool_3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv_4): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (act_4): ReLU()
  (pool_4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv_5): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (act_5): ReLU()
  (pool_5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_featur

In [None]:
for i, data in enumerate(train_loader):
    inp, lab = data
    print(f'Ip Shape : {inp.shape}')
    print(f'N/w Shape : {model(inp).shape}')
    break

In [None]:
num_params = 0
for x in model.parameters():
    num_params += len(torch.flatten(x))

print(f'Number of parameters in the model: {num_params : ,}')

In [None]:
def train_per_epoch(model, train_loader, loss_func, optimizer, epoch):
    model.train(True)  # Set the model to training mode

    train_loss = 0.0
    correct_ans = 0
    num_samples = 0

    for data in tqdm(train_loader):
        inputs, labels = data[0].to(device), data[1].to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = loss_func(outputs, labels)

        loss.backward()
        optimizer.step()

        train_loss += loss.item() * inputs.size(0)
        z, predicted = torch.max(outputs, 1)
        correct_ans += (predicted == labels).sum().item()
        num_samples += labels.size(0)


    epoch_loss = train_loss / num_samples
    epoch_accuracy = correct_ans / num_samples

    return epoch_loss, epoch_accuracy

In [None]:
def val_per_epoch(model, val_loader, loss_func, optimizer, epoch):
    model.eval()

    correct_ans = 0
    num_samples = 0
    val_loss = 0.0

    for data in tqdm(val_loader):
        inputs, labels = data[0].to(device), data[1].to(device)

        with torch.no_grad():
            outputs = model(inputs)
            loss = loss_func(outputs, labels)

            val_loss += loss.item() * inputs.size(0)
            z, predicted = torch.max(outputs, 1)
            correct_ans += (predicted == labels).sum().item()
            num_samples += labels.size(0)

    epoch_val_loss = val_loss / num_samples
    epoch_val_accuracy = correct_ans / num_samples

    return epoch_val_loss, epoch_val_accuracy


In [11]:
# Define the model
model = SpeciesCNN(num_classes=10, num_filters = 64, activation = 'LeakyReLU', batch_norm = True, dropout_val=0.2, filter_org = 'double')

model.to(device)

# Define loss function and optimizer
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.005)

# Function to train the model
def train_model(model, train_loader, val_loader, loss_func, optimizer, num_epochs=5):
    for epoch in range(num_epochs):
        train_loss, train_acc = train_per_epoch(model, train_loader, loss_func, optimizer, epoch)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}')
        val_loss, val_acc = val_per_epoch(model, val_loader, loss_func, optimizer, epoch)
        print(f'Epoch {epoch+1}/{num_epochs}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}')

# Train the model
train_model(model, train_loader, val_loader, loss_func, optimizer, num_epochs=10)

  0%|          | 0/251 [00:00<?, ?it/s]

8008
Epoch 1/5, Loss: 2.2628, Accuracy: 0.1444


  0%|          | 0/251 [00:00<?, ?it/s]

8008
Epoch 2/5, Loss: 2.1317, Accuracy: 0.2248


  0%|          | 0/251 [00:00<?, ?it/s]

8008
Epoch 3/5, Loss: 2.0067, Accuracy: 0.2876


  0%|          | 0/251 [00:00<?, ?it/s]

8008
Epoch 4/5, Loss: 1.9309, Accuracy: 0.3167


  0%|          | 0/251 [00:00<?, ?it/s]

8008
Epoch 5/5, Loss: 1.8785, Accuracy: 0.3400
