# This notebook is the task that was given as a pre requisite for the project 5 of SRIP 2024
## Pratham Sharda(pratham.sharda@iitgn.ac.in)

# Binary classification with resnet

So here first i have divided the dataset for two classes(I have take it for crab and flamingo) and first have run it without crossvalidation due to time constraint

In [None]:
import torch
import torchvision
from torchvision import datasets, transforms

traindir = "D:/SRIP/Binary_task/Training"
testdir = "D:/SRIP/Binary_task/Validation"

#transformations
train_transforms = transforms.Compose([transforms.Resize((224,224)),
                                       transforms.ToTensor(),                                
                                       torchvision.transforms.Normalize(
                                           mean=[0.485, 0.456, 0.406],
                                           std=[0.229, 0.224, 0.225],
    ),
                                       ])
test_transforms = transforms.Compose([transforms.Resize((224,224)),
                                      transforms.ToTensor(),
                                      torchvision.transforms.Normalize(
                                          mean=[0.485, 0.456, 0.406],
                                          std=[0.229, 0.224, 0.225],
    ),
                                      ])

#datasets
train_data = datasets.ImageFolder(traindir,transform=train_transforms)
test_data = datasets.ImageFolder(testdir,transform=test_transforms)

#dataloader
trainloader = torch.utils.data.DataLoader(train_data, shuffle = True, batch_size=16)
testloader = torch.utils.data.DataLoader(test_data, shuffle = True, batch_size=16)

In [None]:

def make_train_step(model, optimizer, loss_fn):
  def train_step(x,y):
    #make prediction
    yhat = model(x)
    #enter train mode
    model.train()
    #compute loss
    loss = loss_fn(yhat,y)

    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    #optimizer.cleargrads()

    return loss
  return train_step

In [None]:
from torchvision import datasets, models, transforms
import torch.nn as nn

device = "cuda" if torch.cuda.is_available() else "cpu"
model = models.resnet18(pretrained=True)

#freeze all params
for params in model.parameters():
  params.requires_grad_ = False

#add a new final layer
nr_filters = model.fc.in_features  #number of input features of last layer
model.fc = nn.Linear(nr_filters, 1)

model = model.to(device)

In [None]:
from torch.nn.modules.loss import BCEWithLogitsLoss
from torch.optim import lr_scheduler

#loss
loss_fn = BCEWithLogitsLoss() #binary cross entropy with sigmoid, so no need to use sigmoid in the model

#optimizer
optimizer = torch.optim.Adam(model.fc.parameters()) 

#train step
train_step = make_train_step(model, optimizer, loss_fn)

In [None]:
%%capture
!pip install tqdm
from tqdm import tqdm


losses = []
val_losses = []

epoch_train_losses = []
epoch_test_losses = []

n_epochs = 10
early_stopping_tolerance = 3
early_stopping_threshold = 0.03

for epoch in range(n_epochs):
  epoch_loss = 0
  for i ,data in tqdm(enumerate(trainloader), total = len(trainloader)): #iterate ove batches
    x_batch , y_batch = data
    x_batch = x_batch.to(device) #move to gpu
    y_batch = y_batch.unsqueeze(1).float() #convert target to same nn output shape
    y_batch = y_batch.to(device) #move to gpu


    loss = train_step(x_batch, y_batch)
    epoch_loss += loss/len(trainloader)
    losses.append(loss)
    
  epoch_train_losses.append(epoch_loss)
  print('\nEpoch : {}, train loss : {}'.format(epoch+1,epoch_loss))

  #validation doesnt requires gradient
  with torch.no_grad():
    cum_loss = 0
    for x_batch, y_batch in testloader:
      x_batch = x_batch.to(device)
      y_batch = y_batch.unsqueeze(1).float() #convert target to same nn output shape
      y_batch = y_batch.to(device)

      #model to eval mode
      model.eval()

      yhat = model(x_batch)
      val_loss = loss_fn(yhat,y_batch)
      cum_loss += loss/len(testloader)
      val_losses.append(val_loss.item())


    epoch_test_losses.append(cum_loss)
    print('Epoch : {}, val loss : {}'.format(epoch+1,cum_loss))  
    
    best_loss = min(epoch_test_losses)
    
    #save best model
    if cum_loss <= best_loss:
      best_model_wts = model.state_dict()
    
    #early stopping
    early_stopping_counter = 0
    if cum_loss > best_loss:
      early_stopping_counter +=1

    if (early_stopping_counter == early_stopping_tolerance) or (best_loss <= early_stopping_threshold):
      print("/nTerminating: early stopping")
      break #terminate training
    
#load best model
model.load_state_dict(best_model_wts)

In [None]:
import matplotlib.pyplot as plt 

def inference(test_data):
  idx = torch.randint(1, len(test_data), (1,))
  sample = torch.unsqueeze(test_data[idx][0], dim=0).to(device)

  if torch.sigmoid(model(sample)) < 0.5:
    print("Prediction : Crab")
  else:
    print("Prediction : Flamingo")


  plt.imshow(test_data[idx][0].permute(1, 2, 0))

inference(test_data)

# Binary classification with cross validation(3 fold) 

In [None]:
import torch
import torchvision
from torchvision import datasets, models, transforms
import torch.nn as nn
from torch.nn.modules.loss import BCEWithLogitsLoss
from torch.optim import lr_scheduler
import numpy as np
from sklearn.model_selection import KFold
import copy
from tqdm import tqdm
import matplotlib.pyplot as plt

# Define directories (adjust paths as needed)
traindir = "D:/SRIP/Binary_task/Training"  # Assuming this now includes both training and validation data

# Transformations
transforms = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Dataset
dataset = datasets.ImageFolder(traindir, transform=transforms)

# KFold configuration
kfold = KFold(n_splits=3, shuffle=True, random_state=42)

# Training parameters
n_epochs = 10
batch_size = 16

# Loss function
loss_fn = BCEWithLogitsLoss()

# Cross-validation
best_loss_overall = np.inf
best_model_wts_overall = None

for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset)):
    print(f"Fold {fold+1}/{kfold.n_splits}")

    # Sampler for splitting data
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)

    # Data loaders for training and validation in this fold
    trainloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=train_subsampler)
    testloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=test_subsampler)

    # Model setup
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = models.resnet18(pretrained=True)
    for param in model.parameters():
        param.requires_grad = False
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, 1)
    model.to(device)

    # Optimizer
    optimizer = torch.optim.Adam(model.fc.parameters(), lr=0.001)

    best_loss = np.inf
    best_model_wts = copy.deepcopy(model.state_dict())

    # Training loop
    for epoch in range(n_epochs):
        model.train()
        running_loss = 0.0

        for inputs, labels in tqdm(trainloader, desc=f"Epoch {epoch+1}/{n_epochs}, Fold {fold+1}"):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            labels = labels.unsqueeze(1).float()  # Ensure labels are float for BCELoss
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)

        epoch_loss = running_loss / len(trainloader.sampler)

        # Validation phase
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in testloader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                labels = labels.unsqueeze(1).float()
                loss = loss_fn(outputs, labels)
                val_loss += loss.item() * inputs.size(0)

        val_loss /= len(testloader.sampler)
        print(f"Fold {fold+1}, Epoch {epoch+1}: Train Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}")

        if val_loss < best_loss:
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())

    if best_loss < best_loss_overall:
        best_loss_overall = best_loss
        best_model_wts_overall = copy.deepcopy(best_model_wts)

# Load the best model weights from cross-validation
model.load_state_dict(best_model_wts_overall)

# You can now proceed with testing or inference using this model
# For example, here's a simple function to do inference on a single image from the dataset
def inference(dataset, model, device):
    model.eval()
    idx = np.random.randint(0, len(dataset))
    img, _ = dataset[idx]
    img = img.unsqueeze(0).to(device)  # Add batch dimension and transfer to device
    
    with torch.no_grad():
        output = model(img)
        prediction = torch.sigmoid(output).item()

    plt.imshow(dataset[idx][0].permute(1, 2, 0))
    plt.title(f"Prediction: {'Flamingo' if prediction >= 0.5 else 'Crab'}")
    plt.show()

# Example inference
inference(dataset, model, device)


# So now i have created the dataset for 5 class and one vs all classification

So I have take here for one vs rest classification the example of elephant that is is elephant or is not elephant
For 5 class I have converted the dataset into 5 groups where each group is divided based on some particular characteristics(mammals,reptiles,birds,insects and aquatic animals).This has been explicitly mentioned in the notebook below


In [None]:
# def visualize_conv_layers(model, input_image):
#     # Assuming input_image is a torch.Tensor of shape (C, H, W) and normalized
#     activation = {}
#     def get_activation(name):
#         def hook(model, input, output):
#             activation[name] = output.detach()
#         return hook
    
#     # Register hooks
#     model.conv1.register_forward_hook(get_activation('conv1'))
#     model.conv2.register_forward_hook(get_activation('conv2'))
#     model.conv3.register_forward_hook(get_activation('conv3'))
    
#     # Forward pass
#     output = model(input_image.unsqueeze(0)) # Add batch dimension
    
#     # Plotting
#     for name, act in activation.items():
#         num_feature_maps = act.size(1)
#         # Plot feature maps
#         # You can use matplotlib to create subplots and plot each feature map


Creating one vs rest dataset

In [None]:
import os
import shutil

# Define the source directory where the current dataset is stored
source_directory = 'D:/SRIP/archive/animals/animals'

# Define the destination directory where the new folder structure will be created
destination_directory = 'D:/SRIP/one_vs_rest_dataset'

# Define the names of the new subdirectories
elephant_dir = os.path.join(destination_directory, 'elephant')
other_animals_dir = os.path.join(destination_directory, 'other_animals')

# Create the destination directory and subdirectories if they don't already exist
os.makedirs(elephant_dir, exist_ok=True)
os.makedirs(other_animals_dir, exist_ok=True)

# Loop through each folder in the source directory
for folder_name in os.listdir(source_directory):
    # Define the path to the current folder
    current_folder_path = os.path.join(source_directory, folder_name)
    
    # Check if the current folder is indeed a directory
    if os.path.isdir(current_folder_path):
        # Determine the destination directory based on whether the folder is 'elephant' or not
        if folder_name.lower() == 'elephant':
            dest_dir = elephant_dir
        else:
            dest_dir = other_animals_dir
        
        # Loop through each file in the current folder
        for filename in os.listdir(current_folder_path):
            # Define the source and destination file paths
            source_file_path = os.path.join(current_folder_path, filename)
            destination_file_path = os.path.join(dest_dir, filename)
            
            # Copy the file from the source to the destination
            shutil.copy(source_file_path, destination_file_path)

print("Dataset reorganization complete.")


3 fold dataset creation for elephant vs rest animals 

In [None]:
import os
import shutil
from sklearn.model_selection import KFold
import numpy as np

# Define the path to the directory containing the elephant and other_animals folders
dataset_directory = 'D:/SRIP/one_vs_rest_dataset'

# Define the main categories
categories = ['elephant', 'other_animals']

# Initialize KFold with 3 splits
kf = KFold(n_splits=3, shuffle=True, random_state=42)

# Process each category separately to split them into folds
for category in categories:
    # Path to the specific category directory
    category_path = os.path.join(dataset_directory, category)
    
    # List all files in the category directory
    files = np.array(os.listdir(category_path))
    
    # Apply KFold splitting
    for fold, (train_idx, val_idx) in enumerate(kf.split(files)):
        # Paths for train and validation directories for this fold
        fold_dir = os.path.join(dataset_directory, f'fold_{fold+1}', category)
        
        # Create the fold directory if it doesn't exist
        os.makedirs(fold_dir, exist_ok=True)
        
        # Validation files for this fold
        val_files = files[val_idx]
        
        # Copy validation files to the fold directory
        for file in val_files:
            src_file_path = os.path.join(category_path, file)
            dst_file_path = os.path.join(fold_dir, file)
            shutil.copy(src_file_path, dst_file_path)

print("3-Fold dataset split complete.")


5 Class dataset creation from the original dataset

In [None]:
import os
import shutil

# Define your source and destination folders
src_folder = 'D:/SRIP/archive/animals/animals'
dest_folder = 'D:/SRIP/5_class_dataset'

# Mapping of animals to their classes
class_mapping = {
    'Mammals': ["Antelope", "Bear", "Bison", "Cat", "Chimpanzee", "Cow", "Coyote", "Deer", "Dog", "Dolphin", "Elephant", "Fox", "Gorilla", "Kangaroo", "Koala", "Leopard", "Lion", "Otter", "Panda", "Porcupine", "Raccoon", "Reindeer", "Rhinoceros", "Tiger", "Whale", "Wolf", "Zebra"],
    'Birds': ["Bat", "Eagle", "Flamingo", "Hummingbird", "Owl", "Parrot", "Pelecaniformes", "Penguin", "Pigeon", "Sparrow", "Turkey", "Woodpecker"],
    'Aquatic and Amphibious Animals': ["Crab", "Dolphin", "Goldfish", "Jellyfish", "Lobster", "Octopus", "Oyster", "Seahorse", "Seal", "Shark", "Starfish"],
    'Insects and Arthropods': ["Bee", "Beetle", "Butterfly", "Caterpillar", "Cockroach", "Dragonfly", "Fly", "Grasshopper", "Ladybugs", "Mosquito", "Moth"],
    'Reptiles and Others': ["Lizard", "Snake", "Turtle"]  # Assuming 'Crocodile' and 'Tortoise' aren't in your list but could be added if they were.
}

# Create the destination folder if it doesn't exist
if not os.path.exists(dest_folder):
    os.makedirs(dest_folder)

# Create class folders and move files
for class_name, animals in class_mapping.items():
    class_folder = os.path.join(dest_folder, class_name)
    if not os.path.exists(class_folder):
        os.makedirs(class_folder)
    
    for animal in animals:
        animal_folder = os.path.join(src_folder, animal)
        if os.path.exists(animal_folder):
            for filename in os.listdir(animal_folder):
                src_file = os.path.join(animal_folder, filename)
                dest_file = os.path.join(class_folder, filename)
                # To avoid overwriting files with the same name from different folders, you could add a check here
                shutil.move(src_file, dest_file)

print("Images have been successfully reorganized into class-based folders.")


Craeting 3 fold dataset from the 5 class dataset so that we can have 3 folds for training and testing

In [None]:
import os
import shutil
from sklearn.model_selection import KFold
import numpy as np

# Define the path to the directory containing the elephant and other_animals folders
dataset_directory = 'D:/SRIP/5_class_dataset'

# Define the main categories
categories = ['Mammals', 'Birds', 'Aquatic and Amphibious Animals', 'Insects and Arthropods', 'Reptiles and Others']

# Initialize KFold with 3 splits
kf = KFold(n_splits=3, shuffle=True, random_state=42)

# Process each category separately to split them into folds
for category in categories:
    # Path to the specific category directory
    category_path = os.path.join(dataset_directory, category)
    
    # List all files in the category directory
    files = np.array(os.listdir(category_path))
    
    # Apply KFold splitting
    for fold, (train_idx, val_idx) in enumerate(kf.split(files)):
        # Paths for train and validation directories for this fold
        fold_dir = os.path.join(dataset_directory, f'fold_{fold+1}', category)
        
        # Create the fold directory if it doesn't exist
        os.makedirs(fold_dir, exist_ok=True)
        
        # Validation files for this fold
        val_files = files[val_idx]
        
        # Copy validation files to the fold directory
        for file in val_files:
            src_file_path = os.path.join(category_path, file)
            dst_file_path = os.path.join(fold_dir, file)
            shutil.copy(src_file_path, dst_file_path)

print("3-Fold dataset split complete.")


In [None]:
# import torch.nn as nn
# import torch.nn.functional as F

# class CustomCNN(nn.Module):
#     def __init__(self, num_classes=2): # Default binary for one-vs-rest
#         super(CustomCNN, self).__init__()
#         self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
#         self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
#         self.pool = nn.MaxPool2d(2, 2)
#         self.fc1 = nn.Linear(128 * 28 * 28, 512) # Adjust the size according to your input
#         self.fc2 = nn.Linear(512, num_classes)

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = self.pool(F.relu(self.conv2(x)))
#         x = self.pool(F.relu(self.conv3(x)))
#         x = x.view(-1, 128 * 28 * 28) # Adjust the size according to your input
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x


# MODEL FOR 5 CLASS 

Now that we have created the datset for the model.


This custom CNN is designed for image classification tasks. It consists of two convolutional layers followed by max pooling layers for feature extraction and spatial downsampling. The convolutional layers use ReLU activation functions. The output from the convolutional layers is flattened and passed through two fully connected layers with ReLU activation and at last having softmax activation function as it is multiclass classification.

In [None]:
# import torch.nn as nn
# import torch.nn.functional as F

# class CustomCNN(nn.Module):
#     def __init__(self, num_classes=2): # Default binary for one-vs-rest
#         super(CustomCNN, self).__init__()
#         self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
#         self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
#         self.pool = nn.MaxPool2d(2, 2)
#         self.fc1 = nn.Linear(128 * 28 * 28, 512) # Adjust the size according to your input
#         self.fc2 = nn.Linear(512, num_classes)

#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))
#         x = self.pool(F.relu(self.conv2(x)))
#         x = self.pool(F.relu(self.conv3(x)))
#         x = x.view(-1, 128 * 28 * 28) # Adjust the size according to your input
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x


In [110]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, ConcatDataset
from torchvision import datasets, transforms
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import numpy as np

class CustomCNN(nn.Module):
    def __init__(self):
        super(CustomCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2)
        self.fc1 = nn.Linear(64 * 56 * 56, 1024)
        self.fc2 = nn.Linear(1024, 5)  # Assuming 5 classes

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(-1, 64 * 56 * 56)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x


Now i am dividing each fold into training and test dataset and then fitting on the model and calculating the metrics to check its performance

In [111]:
def load_datasets(data_dir, fold):
    image_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    train_datasets = []
    test_dataset = None
    for i in range(1, 4):  # For fold 1, fold 2, fold 3
        fold_path = f'{data_dir}/fold_{i}'
        if i == fold:
            test_dataset = datasets.ImageFolder(root=fold_path, transform=image_transform)
        else:
            train_datasets.append(datasets.ImageFolder(root=fold_path, transform=image_transform))

    train_dataset = ConcatDataset(train_datasets)
    return train_dataset, test_dataset


In [109]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CustomCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

data_dir = 'D:/SRIP/5_class_dataset'

for fold in range(1, 4):
    print(f"Training on Fold {fold}")
    train_dataset, test_dataset = load_datasets(data_dir, fold)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    # Training
    model.train()
    for epoch in range(10):  # Number of epochs
        
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            print(f"Epoch {epoch+1}, Loss: {loss.item()}")
            
    
    # Evaluation
    model.eval()
    correct = 0
    total = 0
    true_labels = []
    predicted_labels = []
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            true_labels.extend(labels.cpu().numpy())
            predicted_labels.extend(predicted.cpu().numpy())
    print(f'Accuracy of the network on fold {fold} test images: {100 * correct / total}%')
    
    # Compute confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels)
    sns.heatmap(cm, annot=True, fmt='d')
    plt.title(f'Confusion Matrix for Fold {fold}')
    plt.ylabel('Actual Label')
    plt.xlabel('Predicted Label')
    plt.show()
    
    # Print classification report
    print(f'Classification Report for Fold {fold}:')
    print(classification_report(true_labels, predicted_labels, target_names=train_dataset.classes))


Training on Fold 1
Epoch 1, Loss: 1.6240241527557373
Epoch 1, Loss: 8.057486534118652
Epoch 1, Loss: 5.731037139892578
Epoch 1, Loss: 6.722264289855957
Epoch 1, Loss: 3.2569351196289062
Epoch 1, Loss: 1.7579600811004639
Epoch 1, Loss: 1.6667135953903198
Epoch 1, Loss: 1.6080600023269653
Epoch 1, Loss: 1.5718687772750854
Epoch 1, Loss: 1.5815346240997314
Epoch 1, Loss: 1.6057149171829224
Epoch 1, Loss: 1.6013282537460327
Epoch 1, Loss: 1.5987441539764404
Epoch 1, Loss: 1.534095287322998
Epoch 1, Loss: 1.5452967882156372
Epoch 1, Loss: 1.5654211044311523
Epoch 1, Loss: 1.5082027912139893
Epoch 1, Loss: 1.6085922718048096
Epoch 1, Loss: 1.6096221208572388
Epoch 1, Loss: 1.4163612127304077
Epoch 1, Loss: 1.7120956182479858
Epoch 1, Loss: 1.5537605285644531
Epoch 1, Loss: 1.4108872413635254
Epoch 1, Loss: 1.3780244588851929
Epoch 1, Loss: 1.5497326850891113
Epoch 1, Loss: 1.4746969938278198
Epoch 1, Loss: 1.4788645505905151
Epoch 1, Loss: 1.4680649042129517
Epoch 1, Loss: 1.4550009965896606

KeyboardInterrupt: 