In [None]:
import numpy as np
import os
from glob import glob
from PIL import Image
import torch
from torch.utils import data
import torchvision.transforms as transforms
import numpy as n
import torch.nn as nn
import torch.optim as optim
import argparse
import sys
import csv
from sklearn.preprocessing import normalize
import pandas as pd
import IPython.display as ipd
import torch.nn.functional as F
import torchvision
from torchvision import models, datasets


In [None]:
class CustomDataset(data.Dataset):
    """Image dataset pairing each picture with its class index and predicate vector.

    Images are discovered as ``<image_dir>/<class_name>/*.jpg`` for every class
    listed in ``classes_file``.

    Args:
        image_dir: Root directory containing one sub-folder per class name.
        classes_file: Text file with one ``<1-based index> <class name>`` pair
            per line; blank lines are ignored.
        predicate_file: Whitespace-separated numeric matrix readable by
            ``np.genfromtxt``; row ``i`` is returned as the predicate vector
            for class index ``i``.
        transform: Optional callable applied to the PIL image in ``__getitem__``.
    """

    def __init__(self, image_dir, classes_file, predicate_file, transform=None):
        self.transform = transform
        self.predicate_continuous_mat = np.array(np.genfromtxt(predicate_file, dtype='float32'))

        # Only strictly positive entries are divided by 100; everything else is
        # left untouched (the original author notes that -1 is used as a
        # negative marker that should not be rescaled).
        self.predicate_continuous_mat[self.predicate_continuous_mat > 0] /= 100.0

        # Map class name -> zero-based index (the file itself is 1-based).
        self.class_to_index = {}
        with open(classes_file) as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Tolerate blank / trailing empty lines instead of failing
                    # on tuple unpacking.
                    continue
                index, class_name = fields
                self.class_to_index[class_name.strip()] = int(index) - 1

        # Parallel lists: img_names[i] is a file path, img_index[i] its class index.
        self.img_names = []
        self.img_index = []
        for class_name, class_index in self.class_to_index.items():
            folder_dir = os.path.join(image_dir, class_name)
            file_descriptor = os.path.join(folder_dir, '*.jpg')
            # glob() returns files in arbitrary filesystem order; sorting makes
            # sample indices (and any index-based split) reproducible.
            files = sorted(glob(file_descriptor))
            for file_name in files:
                self.img_names.append(file_name)
                self.img_index.append(class_index)

        print(f"Loaded {len(self.img_names)} images from {image_dir}")

    def __len__(self):
        """Return the number of discovered image files."""
        return len(self.img_names)

    def __getitem__(self, index):
        """Return ``(image, predicate, img_path, img_index)`` for sample ``index``.

        ``image`` is the (optionally transformed) RGB image, ``predicate`` the
        row of the predicate matrix belonging to the image's class,
        ``img_path`` the file path and ``img_index`` the zero-based class index.
        """
        img_path = self.img_names[index]
        # Always decode to 3-channel RGB: the previous check only converted
        # images whose first band was 'L', so RGBA / palette / CMYK files kept
        # a different channel layout. The context manager closes the file
        # handle once the pixels are converted.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        if self.transform:
            image = self.transform(image)
        img_index = self.img_index[index]
        predicate = self.predicate_continuous_mat[img_index, :]
        return image, predicate, img_path, img_index

# Pads an image to a square before it is resized: resizing a rectangular image directly to a square would lose information.
class PadToSquare:
    """Transform that pads an image with a constant fill of 0 until it is square.

    The shorter dimension is extended to the length of the longer one. The
    padding is split between the two sides; when the difference is odd, the
    extra pixel goes to the right / bottom side.
    """

    def __call__(self, image):
        """Return ``image`` padded so that its width equals its height."""
        width, height = image.size
        side = max(width, height)
        extra_w = side - width
        extra_h = side - height

        # Leading sides get the floor half, trailing sides get the remainder.
        left, top = extra_w // 2, extra_h // 2
        right, bottom = extra_w - left, extra_h - top

        padding = (left, top, right, bottom)
        return transforms.functional.pad(image, padding, fill=0, padding_mode='constant')

class CustomResNet(nn.Module):
    """ResNet-50 backbone followed by a predicate head and a class head.

    The class head takes the predicate head's output as its input, so class
    logits are a linear function of the predicted predicates.

    Args:
        num_predicates: Output size of the predicate head.
        num_classes: Output size of the class head.
    """

    def __init__(self, num_predicates, num_classes):
        super().__init__()

        # Backbone loaded with pretrained weights.
        # NOTE(review): newer torchvision releases prefer the `weights=` argument
        # over `pretrained=` - confirm the installed version before changing it.
        backbone = torchvision.models.resnet50(pretrained=True)
        # Replace the stock fully-connected layer with a pass-through so the
        # backbone returns its pooled features.
        backbone.fc = nn.Identity()
        self.resnet = backbone

        # Width of the features fed into the predicate head.
        feature_dim = 2048
        # Head 1: backbone features -> predicates.
        self.fc_predicates = nn.Linear(feature_dim, num_predicates)
        # Head 2: predicates -> class logits.
        self.fc_classes = nn.Linear(num_predicates, num_classes)

    def forward(self, x):
        """Return ``(class_logits, predicates)`` for the input batch ``x``."""
        features = self.resnet(x)
        predicates = self.fc_predicates(features)
        return self.fc_classes(predicates), predicates




# Output sizes of the two heads: predicates and classes.
num_predicates, num_classes = 85, 50

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Build the network, then move its parameters to the selected device.
model = CustomResNet(num_predicates=num_predicates, num_classes=num_classes)
model = model.to(device)




# cross entropy with label smoothing
class LabelSmoothingCrossEntropy(nn.Module):
    """Cross-entropy loss with label smoothing.

    The loss is ``(1 - smoothing) * NLL(true class) + smoothing * mean over
    classes of the negative log-probability``, averaged over the batch.

    Args:
        smoothing: Weight given to the uniform term; the true-class term gets
            ``1 - smoothing``.
    """

    def __init__(self, smoothing=0.1):
        super().__init__()
        self.smoothing = smoothing
        self.confidence = 1.0 - smoothing

    def forward(self, x, target):
        """Return the scalar smoothed loss for logits ``x`` and class indices ``target``."""
        log_probs = F.log_softmax(x, dim=-1)

        # Negative log-likelihood of the true class, one value per sample.
        target_column = target.unsqueeze(1)
        true_class_nll = (-log_probs.gather(dim=-1, index=target_column)).squeeze(1)

        # Average negative log-probability over all classes (the uniform term).
        uniform_nll = -log_probs.mean(dim=-1)

        per_sample = self.confidence * true_class_nll + self.smoothing * uniform_nll
        return per_sample.mean()




#vse loss function
class VSELoss(nn.Module):
    """Squared-error loss with an extra fourth-power penalty.

    For the element-wise error ``e = y_pred - y_true`` the loss is the mean of
    ``e**2 * (1 + scale_factor * e**2)`` over all elements.

    Args:
        scale_factor: Multiplier of the squared error inside the parentheses.
    """

    def __init__(self, scale_factor=1.0):
        super().__init__()
        self.scale_factor = scale_factor

    def forward(self, y_pred, y_true):
        """Return the scalar loss between predictions and targets."""
        squared_error = (y_pred - y_true) ** 2
        weighted = squared_error * (1 + self.scale_factor * squared_error)
        return torch.mean(weighted)




# Loss applied to the predicate head's output.
criterion_predicates = VSELoss(1.0)
# Loss applied to the class logits.
criterion_classes = LabelSmoothingCrossEntropy(0.1)


# Training is wrapped in a single `train` function so that fine-tuning is easier and every parameter lives in one place; it is called with the parsed command-line arguments.
def train(num_epochs, eval_interval, learning_rate, output_filename, batch_size,
          data_root='/kaggle/input/vlg-recruitment-24-challenge/vlg-dataset',
          val_fraction=0.05, num_workers=3, seed=None):
    """Fine-tune the module-level ``model`` and save its ``state_dict``.

    Args:
        num_epochs: Number of passes over the training split.
        eval_interval: Training losses are printed every ``eval_interval`` steps.
        learning_rate: Learning rate passed to ``optim.AdamW``.
        output_filename: Path the final ``state_dict`` is written to.
        batch_size: Batch size used by both the training and validation loaders.
        data_root: Directory containing ``train/``, ``classes.txt`` and
            ``predicate-matrix-continuous.txt``.
        val_fraction: Fraction of the samples held out for validation.
        num_workers: Number of worker processes per ``DataLoader``.
        seed: Optional integer; when given, the train/validation split is
            reproducible. ``None`` keeps the global NumPy random state.
    """
    loader_params = {'batch_size': batch_size, 'num_workers': num_workers}

    # Augmentation + preprocessing pipeline.
    train_process_steps = transforms.Compose([
        PadToSquare(),
        transforms.RandomRotation(15),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.3, contrast=0.3),
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    dataset = CustomDataset(
        os.path.join(data_root, 'train'),
        os.path.join(data_root, 'classes.txt'),
        os.path.join(data_root, 'predicate-matrix-continuous.txt'),
        train_process_steps,
    )

    # Random index split into training and validation subsets.
    dataset_size = len(dataset)
    indices = list(range(dataset_size))
    split = int(np.floor(val_fraction * dataset_size))
    if seed is None:
        np.random.shuffle(indices)
    else:
        np.random.RandomState(seed).shuffle(indices)
    train_indices, val_indices = indices[split:], indices[:split]

    train_sampler = data.SubsetRandomSampler(train_indices)
    val_sampler = data.SubsetRandomSampler(val_indices)

    # NOTE(review): both loaders share one dataset object, so validation
    # images also go through the random augmentations above.
    train_loader = data.DataLoader(dataset, sampler=train_sampler, **loader_params)
    val_loader = data.DataLoader(dataset, sampler=val_sampler, **loader_params)

    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        model.train()
        for i, (images, predicates, img_names, indexes) in enumerate(train_loader):
            # Skip batches with fewer than two samples (only the final batch of
            # an epoch can be that short).
            if images.shape[0] < 2:
                continue
            images = images.to(device)
            predicates = predicates.to(device)
            class_targets = indexes.to(device)  # class indices serve as labels

            # Forward pass.
            class_logits, pred_predicates = model(images)
            loss, loss_predicates, loss_classes = _combined_loss(
                class_logits, pred_predicates, predicates, class_targets
            )

            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (i+1) % eval_interval == 0:
                print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}, Predicate Loss: {loss_predicates.item():.4f}, Class Loss: {loss_classes.item():.4f}")

        # Evaluate on the validation split.
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for images, predicates, img_names, indexes in val_loader:
                images = images.to(device)
                predicates = predicates.to(device)
                class_targets = indexes.to(device)

                class_logits, pred_predicates = model(images)
                loss, _, _ = _combined_loss(
                    class_logits, pred_predicates, predicates, class_targets
                )
                val_loss += loss.item()

        num_val_batches = len(val_loader)
        if num_val_batches > 0:
            val_loss /= num_val_batches
            print(f"Epoch [{epoch+1}/{num_epochs}], Validation Loss: {val_loss:.4f}")
        else:
            # A very small dataset yields an empty validation split; averaging
            # over zero batches would raise ZeroDivisionError.
            print(f"Epoch [{epoch+1}/{num_epochs}], Validation skipped: validation split is empty")

    # Persist the trained weights.
    torch.save(model.state_dict(), output_filename)


def _combined_loss(class_logits, pred_predicates, predicates, class_targets,
                   predicate_weight=0.5, class_weight=0.5):
    """Return ``(total, predicate_loss, class_loss)`` for one batch.

    Single place where the two loss terms are weighted, so training and
    validation always use the same combination.
    """
    loss_predicates = criterion_predicates(pred_predicates, predicates)
    loss_classes = criterion_classes(class_logits, class_targets)
    total = predicate_weight * loss_predicates + class_weight * loss_classes
    return total, loss_predicates, loss_classes


# Argument parsing is not typical in Python notebooks; this was an experiment to see whether it can be used efficiently in .ipynb files.
# Another reason: it keeps all hyperparameters in one place, so that no setting is accidentally left unchanged and a training run does not become meaningless.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Train a model with CustomResNet and combined loss')

    # One row per hyper-parameter: (flag, type, default, help text).
    cli_options = (
        ('--num_epochs', int, 10, 'Number of training epochs'),
        ('--eval_interval', int, 100, 'Evaluation interval'),
        ('--learning_rate', float, 0.00005, 'Learning rate'),
        ('--output_filename', str, 'model.pth', 'Output filename for the trained model'),
        ('--batch_size', int, 32, 'Batch size'),
    )
    for flag, arg_type, default_value, help_text in cli_options:
        parser.add_argument(flag, type=arg_type, default=default_value, help=help_text)

    # parse_known_args returns unrecognised arguments separately instead of
    # failing on them.
    args, unknown = parser.parse_known_args()

    train(
        num_epochs=args.num_epochs,
        eval_interval=args.eval_interval,
        learning_rate=args.learning_rate,
        output_filename=args.output_filename,
        batch_size=args.batch_size,
    )


In [None]:
# Inference cell: builds the evaluation-time preprocessing, reloads the trained
# weights and prepares the class lookup.

# By convention this would be defined near the top of the notebook; it is kept
# here because the code is not split into finer-grained cells.
# Same padding and resizing as in training, without the random augmentations.
test_transform = transforms.Compose([
    PadToSquare(),
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])


# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU can also be loaded on a CPU-only machine.
model.load_state_dict(torch.load('model.pth', map_location=device))
model.eval()

# NOTE: despite its name, this dict maps zero-based class index -> class name.
class_to_index = {}
with open('/kaggle/input/vlg-recruitment-24-challenge/vlg-dataset/classes.txt') as f:
    for line in f:
        fields = line.split()
        if not fields:
            # Ignore blank lines instead of failing on tuple unpacking.
            continue
        index, class_name = fields
        class_to_index[int(index) - 1] = class_name.strip()


def get_modified_weights(model):
    """Return the weight matrix of ``model.fc_classes`` as a NumPy array.

    The tensor is detached from the autograd graph and moved to the CPU before
    conversion.
    """
    head_weights = model.fc_classes.weight
    return head_weights.detach().cpu().numpy()

# Finds the class whose weight vector has the highest similarity to the predicates predicted for an image.
def find_closest_class(model, image, unseen_weights, seen_weights, unseen_offset=40):
    """Return the class index whose weight vector best matches ``image``.

    The image is preprocessed with the module-level ``test_transform``, passed
    through ``model`` on the module-level ``device``, and the predicted
    predicate vector is L2-normalised. Its dot product with every row of
    ``seen_weights`` and ``unseen_weights`` is then compared.

    Args:
        model: Callable returning ``(class_logits, predicates)`` for a batch.
        image: Input accepted by ``test_transform``.
        unseen_weights: 2-D array with one row per unseen class.
        seen_weights: 2-D array with one row per seen class.
            NOTE(review): the dot products are cosine similarities only if the
            rows of both matrices are L2-normalised by the caller - confirm.
        unseen_offset: Class index of the first row of ``unseen_weights``.
            Defaults to 40, the value that was previously hard-coded.

    Returns:
        int: Row index into ``seen_weights`` when a seen class has the strictly
        highest similarity, otherwise ``unseen_offset`` plus the row index into
        ``unseen_weights`` (ties go to the unseen class).
    """
    with torch.no_grad():
        batch = test_transform(image).unsqueeze(0).to(device)
        _class_logits, predicates = model(batch)
        predicates = predicates.cpu().numpy().flatten()

        # Normalise the prediction so it is on the same scale as the weight rows.
        predicates = normalize(predicates.reshape(1, -1), norm='l2')[0]

    # Similarity of the prediction to every seen and every unseen class.
    seen_similarity = np.dot(seen_weights, predicates)
    unseen_similarity = np.dot(unseen_weights, predicates)

    # Pick the best class overall.
    if np.max(seen_similarity) > np.max(unseen_similarity):
        return int(np.argmax(seen_similarity))
    return int(unseen_offset + np.argmax(unseen_similarity))



# Weight matrix of the trained classification head.
modified_weights = get_modified_weights(model)

# Predicate matrix, with its positive entries divided by 100.
predicate_continuous_mat = np.array(np.genfromtxt('/kaggle/input/vlg-recruitment-24-challenge/vlg-dataset/predicate-matrix-continuous.txt', dtype='float32'))
positive_entries = predicate_continuous_mat > 0
predicate_continuous_mat[positive_entries] /= 100.0

# Seen classes: first 40 rows of the head weights.
# Unseen classes: rows 40 onwards of the predicate matrix.
# Every row is L2-normalised.
seen_weights = normalize(modified_weights[:40], norm='l2')
unseen_weights = normalize(predicate_continuous_mat[40:], norm='l2')

# Predict a class name for every test image.
test_image_dir = '/kaggle/input/vlg-recruitment-24-challenge/vlg-dataset/test'
test_image_pattern = os.path.join(test_image_dir, '*.jpg')
predictions = []

for test_image_path in glob(test_image_pattern):
    test_image = Image.open(test_image_path).convert('RGB')
    label_index = find_closest_class(model, test_image, unseen_weights, seen_weights)
    image_id = os.path.basename(test_image_path)
    # Indices missing from the lookup are reported as "Unknown".
    animal_name = class_to_index.get(label_index, "Unknown")
    predictions.append((image_id, animal_name))

# Write the header row followed by one row per prediction.
csv_filename = 'predictions.csv'
with open(csv_filename, mode='w', newline='') as csv_file:
    writer = csv.writer(csv_file)
    writer.writerow(['image_id', 'class'])
    writer.writerows(predictions)

print(f"CSV file '{csv_filename}' created successfully.")


In [None]:

# Sort the predictions by image id, save them, and show a download link.

sorted_csv_filename = 'sorted_predictions.csv'

df = pd.read_csv('predictions.csv')
df_sorted = df.sort_values(by='image_id')
df_sorted.to_csv(sorted_csv_filename, index=False)

print(f"Sorted CSV file '{sorted_csv_filename}' created successfully.")

# Make sure the file exists before rendering a link to it.
file_path = sorted_csv_filename
assert os.path.isfile(file_path), f"{file_path} does not exist."

download_link = f'<a href="{file_path}" download>Download sorted_predictions.csv</a>'
link_widget = ipd.HTML(download_link)
ipd.display(link_widget)
