In [None]:
import os
import re
import pickle
import nltk
import pandas as pd
import numpy as np
import tensorflow as tf
import torch
import torchvision
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from keras.callbacks import TensorBoard

from PIL import Image
from tqdm.notebook import tqdm
from keras.models import Model
from keras.applications.vgg19 import VGG19, preprocess_input
from keras.utils import load_img, img_to_array
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoTokenizer, AutoFeatureExtractor, AutoModel,            
    TrainingArguments, Trainer, logging
)
from datasets import load_dataset, set_caching_enabled, Dataset
# from nltk.corpus import wordnet

# # nltk setup
# nltk.download('wordnet')

# Runtime configuration: environment flags first, then library logging, then
# the torch device that the rest of the notebook refers to as `device`.
os.environ.update({
    "WANDB_DISABLED": "true",
    "TOKENIZERS_PARALLELISM": "false",
    # 'HF_HOME': os.path.join(".", "cache"),   (left disabled)
    'CUDA_VISIBLE_DEVICES': '0',
})
# set_caching_enabled(True)   (left disabled)

# `logging` here is the transformers logging module imported above.
logging.set_verbosity_error()

# Use the first CUDA device when torch reports one, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)
torch.cuda.empty_cache()


In [None]:
# Read the sampled training CSV into `df`; the bare `df` on the last line
# makes the notebook render the frame as the cell's output.
df = pd.read_csv("/kaggle/input/sampled-dataset-for-vqa/sampled_train_dataset_kaggle.csv")
df

In [None]:
# Show the rows whose `hashed_image_id` equals 5.
df.loc[df['hashed_image_id'] == 5]

In [None]:
# Dataset root that every `kaggle_image_path` entry is joined onto.
dir_path = "/kaggle/input/sampled-dataset-for-vqa"
questions = df['question']

# One joined path per row of the training frame; tqdm reports progress.
images = [
    os.path.join(dir_path, relative_path)
    for relative_path in tqdm(df["kaggle_image_path"])
]

In [None]:
# Read the sampled validation CSV into `test_df` (used below as the
# evaluation split) and display it as the cell's output.
test_df = pd.read_csv("/kaggle/input/sampled-dataset-for-vqa/sampled_validation_dataset_kaggle.csv")
test_df

# Preparing the classification problem on the visual question answering portion

In [None]:
# Answer vocabulary: the distinct `multiple_choice_answer` values of the
# training frame, in the order `unique()` returns them. `label_encoder`
# below uses an answer's position in this list as its class id.
list_vocabulary = list(df['multiple_choice_answer'].unique())

In [None]:
def label_encoder(word):
    """Map an answer to its integer class id.

    Answers present in the module-level ``list_vocabulary`` are encoded as
    their position in that list. Every other value shares one extra id,
    ``len(list_vocabulary)``, i.e. one past the last known answer.
    """
    if word not in list_vocabulary:
        return len(list_vocabulary)
    return list_vocabulary.index(word)

In [None]:
# Number of distinct training answers; the unknown id produced by
# `label_encoder` equals this number, so labels range over one more value.
print(len(list_vocabulary))

In [None]:
# df

In [None]:
# Add the integer class id of every training answer as a new `Labels` column,
# then display the updated frame.
df['Labels'] = df['multiple_choice_answer'].apply(label_encoder)
df

In [None]:
# Same encoding for the validation answers; answers that are not in the
# training vocabulary receive the shared id len(list_vocabulary).
test_df['Labels'] = test_df['multiple_choice_answer'].apply(label_encoder)
test_df

In [None]:
# df[df['image_id']==25]
# /kaggle/input/sampled-dataset-for-vqa/train2014/train2014/COCO_train2014_000000000025.jpg

In [None]:
# Plain assignments: `df_train` / `df_test` are additional names for the same
# DataFrame objects as `df` / `test_df`, not copies.
df_train = df
df_test = test_df
df_test

In [None]:

# Write both splits (including the new `Labels` column) to the working
# directory so they can be reloaded with `load_dataset("csv", ...)` below.
# NOTE(review): `index=None` is passed where pandas documents a bool;
# presumably it is treated as falsy (no index column) - confirm, or use False.
df_train.to_csv("data_train.csv", index=None)
df_test.to_csv("data_eval.csv", index=None)

In [None]:
# Reload the two CSV files written above through the `datasets` CSV loader,
# exposing them under the split names "train" and "test".
dataset = load_dataset(
    "csv", 
    data_files={
        "train": "data_train.csv",
        "test": "data_eval.csv"
    }
)

In [None]:
# Print the "train" split object of the dataset loaded above.
print(dataset['train'])

# Multimodal Coattention Layer

In [None]:
class FeedForwardNeuralNetwork(nn.Module):
    """Feed-forward block that widens to twice the input width and projects back.

    GELU is applied after *both* linear layers, so the value returned by
    ``forward`` has already been passed through the activation.

    Args:
        embedding_dim: size of the last dimension of the input and the output.
    """

    def __init__(self, embedding_dim):
        super().__init__()
        widened_dim = 2 * embedding_dim
        self.fc1 = nn.Linear(embedding_dim, widened_dim)
        self.fc2 = nn.Linear(widened_dim, embedding_dim)
        self.activation_function = nn.GELU()

    def forward(self, input_data):
        """Return ``GELU(fc2(GELU(fc1(input_data))))``."""
        hidden = self.activation_function(self.fc1(input_data))
        return self.activation_function(self.fc2(hidden))
        

In [None]:
class AttentionBert(nn.Module):
    """Attention block: multi-head attention, add & norm, feed-forward, add & norm.

    ``query`` attends over ``input_features`` (which serve as both keys and
    values); passing the same tensor for both arguments gives self-attention.

    Args:
        hidden_size: feature width of the query, key and value tensors.
        num_heads: number of heads handed to ``nn.MultiheadAttention``.
    """

    def __init__(self, hidden_size, num_heads):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        # Each sub-module is moved to the module-level `device` as it is built.
        self.layer_norm1 = nn.LayerNorm(self.hidden_size).to(device)
        self.ffn = FeedForwardNeuralNetwork(self.hidden_size).to(device)
        self.multihead_attention = nn.MultiheadAttention(self.hidden_size, self.num_heads).to(device)

    def forward(self, query, input_features):
        """Return the block output, with the same leading layout as ``query``."""
        # The attention module is built without batch_first, so swap the first
        # two axes going in: (batch, seq, hidden) -> (seq, batch, hidden).
        seq_first_query = query.transpose(0, 1)
        seq_first_key = input_features.transpose(0, 1)
        seq_first_value = input_features.transpose(0, 1)

        attended, _ = self.multihead_attention(seq_first_query, seq_first_key, seq_first_value)

        # Residual around the attention, swap the axes back, then normalise.
        attention_block = self.layer_norm1((attended + seq_first_query).transpose(0, 1))

        # Feed-forward sub-layer with its own residual connection.
        feed_forward_out = self.ffn.forward(attention_block)
        # NOTE(review): the same LayerNorm instance (layer_norm1) normalises
        # both sub-layers, so they share one set of scale/shift weights -
        # confirm this sharing is intended.
        return self.layer_norm1(feed_forward_out + attention_block)

In [None]:
class SelfTRM(nn.Module):
    """Stack of ``num_layers`` self-attention blocks applied one after another.

    The blocks are stored in an ``nn.ModuleList`` rather than a plain Python
    list. Only then does PyTorch register them as sub-modules, which is what
    makes their weights show up in ``parameters()`` (and so reach the
    optimizer), in ``state_dict()`` (and so get saved) and follow ``.to(...)``.

    Args:
        num_layers: number of stacked ``AttentionBert`` blocks.
        num_heads: attention heads per block.
        input_size: feature width of the sequence being encoded.
    """

    def __init__(self, num_layers, num_heads, input_size):
        super(SelfTRM, self).__init__()
        self.num_layers = num_layers
        self.layers = nn.ModuleList(
            [AttentionBert(input_size, num_heads) for _ in range(num_layers)]
        )

    def forward(self, input):
        """Feed ``input`` through every block, using it as both query and context.

        Returns the output of the last block; with zero layers the input is
        returned unchanged instead of raising on an unset variable.
        """
        output = input
        for layer in self.layers:
            output = layer(output, output)
        return output

In [None]:
class MLP_classifier(nn.Module):
    """Two-layer classification head that returns un-normalised scores.

    Args:
        input_size: width of the incoming feature vector.
        output_size: number of scores produced per input row.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        hidden_size = input_size * 2
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.activation_function1 = nn.GELU()
        self.linear2 = nn.Linear(hidden_size, output_size)
        # Constructed but never applied in forward(): the head returns the
        # output of linear2 directly.
        self.activation_function2 = nn.Softmax()

    def forward(self, input):
        """Return ``linear2(GELU(linear1(input)))`` with no softmax applied."""
        return self.linear2(self.activation_function1(self.linear1(input)))
        

In [None]:
class BiattentionforImageandWord(nn.Module):
    """Co-attention stack that updates an image stream and a word stream together.

    Each layer owns two blocks per stream, stored consecutively: index ``2*i``
    is the cross-attention block (the stream queries the *other* stream) and
    index ``2*i + 1`` is a self-attention block over that result.

    Both block lists are ``nn.ModuleList`` objects. With plain Python lists
    PyTorch does not register the blocks, so their weights would be absent
    from ``parameters()`` and ``state_dict()`` and would never be trained,
    saved, or moved by ``.to(...)``.

    Args:
        num_layers: number of co-attention layers.
        num_heads: attention heads per block.
        input_size: feature width shared by both streams.
    """

    def __init__(self, num_layers, num_heads, input_size):
        super(BiattentionforImageandWord, self).__init__()
        self.layers_image = nn.ModuleList()
        self.layers_word = nn.ModuleList()
        self.input_size = input_size
        self.num_heads = num_heads
        self.num_layers = num_layers
        # Blocks are created in the same interleaved order as before, so a
        # seeded run draws its initial weights in the same sequence.
        for _ in range(self.num_layers):
            self.layers_image.append(AttentionBert(input_size, num_heads))
            self.layers_image.append(AttentionBert(input_size, num_heads))
            self.layers_word.append(AttentionBert(input_size, num_heads))
            self.layers_word.append(AttentionBert(input_size, num_heads))

    def forward(self, query_image, key_image, query_word, key_word):
        """Run both streams through every layer.

        Returns ``(image_output, word_output)`` of the last layer. With zero
        layers the incoming queries are returned unchanged instead of raising
        on unset variables.
        """
        output_image, output_word = query_image, query_word
        for i in range(self.num_layers):
            # Image stream: attend over the words, then over itself.
            cross_image = self.layers_image[2 * i](query_image, key_word)
            output_image = self.layers_image[2 * i + 1](cross_image, cross_image)
            # Word stream: attend over the images (as they entered this
            # layer), then over itself.
            cross_word = self.layers_word[2 * i](query_word, key_image)
            output_word = self.layers_word[2 * i + 1](cross_word, cross_word)
            # The outputs of this layer feed the next one as query and key.
            query_image = key_image = output_image
            query_word = key_word = output_word
        return output_image, output_word

In [None]:
class MultimodalBert(nn.Module):
    """Question/image classifier built from two pretrained encoders.

    Pipeline of ``forward``: encode the question with ``word_model`` and the
    image with ``image_model``; refine the word features with ``wordTRM``;
    exchange information between both streams with ``CoTRM``; concatenate the
    first position of each stream and score it with ``MLPclassifier``.

    Args:
        text: name or path handed to ``AutoModel.from_pretrained`` for the text encoder.
        image: name or path handed to ``AutoModel.from_pretrained`` for the image encoder.
        num_layers: layer count for both ``SelfTRM`` and the co-attention stack.
        num_heads: attention heads for those layers.
        input_size: feature width used by the attention layers; the classifier
            head takes twice this width.
        output_size: number of scores the classifier head produces.
    """

    def __init__(self, text, image, num_layers, num_heads, input_size, output_size):
        super().__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.word_model = AutoModel.from_pretrained(text)
        self.image_model = AutoModel.from_pretrained(image)
        self.CoTRM = BiattentionforImageandWord(num_layers, num_heads, self.input_size)
        self.wordTRM = SelfTRM(num_layers, num_heads, self.input_size)
        self.MLPclassifier = MLP_classifier(self.input_size * 2, self.output_size)
        self.criterion = nn.CrossEntropyLoss()

    def forward(self,
            input_ids: torch.LongTensor,
            pixel_values: torch.FloatTensor,
            attention_mask: Optional[torch.LongTensor] = None,
            token_type_ids: Optional[torch.LongTensor] = None,
            labels: Optional[torch.LongTensor] = None):
        """Return ``{"out": scores}``, plus ``"loss"`` when ``labels`` is given."""
        # Encode each modality and keep the per-position hidden states.
        text_encoding = self.word_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=True,
        )
        word_features = text_encoding.last_hidden_state
        image_encoding = self.image_model(
            pixel_values=pixel_values,
            return_dict=True,
        )
        image_features = image_encoding.last_hidden_state

        # Word-only self-attention before the two streams meet.
        refined_words = self.wordTRM.forward(word_features)

        # Co-attention: every stream starts out as its own query and key.
        fused_image, fused_word = self.CoTRM.forward(
            image_features, image_features, refined_words, refined_words
        )

        # Position 0 of each stream, concatenated along the feature axis.
        pooled = torch.cat([fused_image[:, 0, :], fused_word[:, 0, :]], dim=1)
        scores = self.MLPclassifier.forward(pooled)

        result = {"out": scores}
        if labels is not None:
            result["loss"] = self.criterion(scores, labels)
        return result

In [None]:
# Batch collator for the VQA model. `@dataclass` generates __init__/__repr__
# from the annotated fields below, which keeps the class free of boilerplate.
@dataclass
class MultimodalCollator:
    """Turn raw dataset rows into the tensors the model's ``forward`` expects.

    Fields:
        tokenizer: callable text tokenizer applied to the questions.
        preprocessor: callable image feature extractor applied to the images.
        image_root: folder that every ``kaggle_image_path`` entry is joined
            onto. Defaults to the Kaggle dataset folder used so far.
    """
    tokenizer: AutoTokenizer

    preprocessor: AutoFeatureExtractor

    image_root: str = "/kaggle/input/sampled-dataset-for-vqa"

    def tokenize_text(self, texts: List[str]):
        """Tokenize the questions, padded to the longest one in the batch.

        Returns ``input_ids``, ``token_type_ids`` and ``attention_mask``.
        The mask has to travel with the ids: the batch is padded, and without
        it the text encoder would attend to the padding positions.
        """
        encoded_text = self.tokenizer(
            text=texts,
            padding='longest',
            max_length=40,
            truncation=True,
            return_tensors='pt',
            return_token_type_ids=True,
            return_attention_mask=True,
        )
        # No .squeeze(): it would also remove the batch axis whenever a batch
        # holds a single question (e.g. the last, incomplete batch).
        return {
            "input_ids": encoded_text['input_ids'],
            "token_type_ids": encoded_text['token_type_ids'],
            "attention_mask": encoded_text['attention_mask'],
        }

    def _load_rgb(self, image_name: str):
        """Open one image below ``image_root`` and return it converted to RGB."""
        # Paths in the CSV may use backslashes; normalise them before joining.
        image_path = os.path.join(self.image_root, image_name.replace("\\", "/"))
        # The context manager closes the file handle once the pixel data has
        # been copied into the converted image.
        with Image.open(image_path) as image_file:
            return image_file.convert('RGB')

    def preprocess_images(self, images: List[str]):
        """Load the listed images and run them through the preprocessor.

        Returns ``pixel_values`` exactly as produced by the preprocessor.
        """
        processed_images = self.preprocessor(
            images=[self._load_rgb(image_name) for image_name in images],
            return_tensors="pt",
        )
        # No .squeeze() here either, so a one-image batch keeps its batch axis.
        return {
            "pixel_values": processed_images['pixel_values'],
        }

    def __call__(self, raw_batch_dict):
        """Collate a batch given as a dict of columns or as a list of row dicts."""
        if isinstance(raw_batch_dict, dict):
            questions = raw_batch_dict['question']
            image_names = raw_batch_dict["kaggle_image_path"]
            label_ids = raw_batch_dict['Labels']
        else:
            questions = [row['question'] for row in raw_batch_dict]
            image_names = [row["kaggle_image_path"] for row in raw_batch_dict]
            label_ids = [row['Labels'] for row in raw_batch_dict]
        return {
            **self.tokenize_text(questions),
            **self.preprocess_images(image_names),
            'labels': torch.tensor(label_ids, dtype=torch.int64),
        }

In [None]:
def createMultimodalVQACollatorAndModel(text='bert-base-uncased', image='google/vit-base-patch16-224-in21k', num_layers=3, num_heads=3, hidden_size=768, vocabulary_size=9129):
    """Build the batch collator and the multimodal model for one encoder pair.

    Args:
        text: checkpoint name for the tokenizer and the text encoder.
        image: checkpoint name for the feature extractor and the image encoder.
        num_layers: attention layer count passed to ``MultimodalBert``.
        num_heads: attention head count passed to ``MultimodalBert``.
        hidden_size: passed to ``MultimodalBert`` as ``input_size``.
        vocabulary_size: passed to ``MultimodalBert`` as ``output_size``.

    Returns:
        ``(collator, model)``; the model has been moved to the module-level ``device``.
    """
    # The collator pairs the text tokenizer with the image feature extractor.
    multimodal_collator = MultimodalCollator(
        tokenizer=AutoTokenizer.from_pretrained(text),
        preprocessor=AutoFeatureExtractor.from_pretrained(image),
    )

    # The model loads its own pretrained encoders from the same two names.
    multimodal_model = MultimodalBert(
        text,
        image,
        num_layers=num_layers,
        num_heads=num_heads,
        input_size=hidden_size,
        output_size=vocabulary_size,
    )
    return multimodal_collator, multimodal_model.to(device)

In [None]:
!pip install evaluate

import evaluate

metric = evaluate.load("accuracy")
precision_metric = evaluate.load("precision")
recall_metric = evaluate.load('recall')

In [None]:
# # Wrapper around the wup_measure(...) function to process batch inputs
# # def batch_wup_measure(labels, preds):
# #     wup_scores = [wup_measure(label_encoder(label), label_encoder(pred)) for label, pred in zip(labels, preds)]
# #     return np.mean(wup_scores)

# # # Function to compute all relevant performance metrics, to be passed into the trainer
# # def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
# #     logits, labels = eval_tuple
# #     preds = logits.argmax(axis=-1)
# #     return {
# # #         "wups": batch_wup_measure(labels, preds),
# #         "acc": accuracy_score(labels, preds),
# #         "f1": f1_score(labels, preds, average='weighted')
# #     }
# def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]):
#     """
#     Computes evaluation metrics for a given set of logits and labels.

#     Args:
#         eval_tuple (Tuple): Tuple containing logits and corresponding ground truth labels.

#     Returns:
#         Dict: Dictionary of computed metrics, including WUP similarity, accuracy, and F1 score.
#     """
#     logits, labels = eval_tuple

#     # Calculate predictions
#     preds = logits.argmax(axis=-1)
# #     print(preds)
# #     print("Done")
# #     print("\n")
# #     Compute metrics
#     metrics = {
#         "eval_acc": accuracy_score(labels, preds),
#         "eval_f1": f1_score(labels, preds, average='weighted')
#     }
#     return metrics
# # #     print( metric.compute(predictions=preds, references=labels))
# #     return metric.compute(predictions=preds, references=labels)

In [None]:
# Wrapper around the wup_measure(...) function to process batch inputs
# def batch_wup_measure(labels, preds):
#     wup_scores = [wup_measure(label_encoder(label), label_encoder(pred)) for label, pred in zip(labels, preds)]
#     return np.mean(wup_scores)

# # Function to compute all relevant performance metrics, to be passed into the trainer
# def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
#     logits, labels = eval_tuple
#     preds = logits.argmax(axis=-1)
#     return {
# #         "wups": batch_wup_measure(labels, preds),
#         "acc": accuracy_score(labels, preds),
#         "f1": f1_score(labels, preds, average='weighted')
#     }
from sklearn.metrics import recall_score
def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]):
    """
    Computes evaluation metrics for a given set of logits and labels.

    Args:
        eval_tuple (Tuple): Tuple containing logits and corresponding ground truth labels.
            The predicted class is the argmax of the logits over the last axis.

    Returns:
        Dict: One scalar per key - ``eval_acc`` (accuracy) and the
        support-weighted ``eval_f1``, ``eval_precision`` and ``eval_recall``.
    """
    logits, labels = eval_tuple

    # Calculate predictions
    preds = logits.argmax(axis=-1)

    # `evaluate` metrics return a dict such as {"precision": value}; unwrap it
    # so that every entry of the result is a plain number like the others.
    weighted_precision = precision_metric.compute(
        predictions=preds, references=labels, average="weighted"
    )["precision"]

    metrics = {
        "eval_acc": accuracy_score(labels, preds),
        "eval_f1": f1_score(labels, preds, average='weighted'),
        "eval_precision": weighted_precision,
        "eval_recall": recall_score(labels, preds, average='weighted')
    }
    return metrics
# #     print( metric.compute(predictions=preds, references=labels))
#     return metric.compute(predictions=preds, references=labels)

In [None]:
# from transformers import AutoModel
# model2 = AutoModel.from_pretrained("/kaggle/input/vqamodel1/", use_safetensors=True)

In [None]:

# Single source for the checkpoint folder, used by the Trainer and by the
# manual weight export below.
output_dir = "/kaggle/working/vilbert/"

multi_args = TrainingArguments(
    output_dir=output_dir,
    seed=12345,
    evaluation_strategy="epoch",
    logging_strategy="epoch",
    save_strategy="epoch",
    save_steps=100,                 # only consulted when save_strategy="steps"
    save_total_limit=1,             # models are large: checkpoint limit of 1
    metric_for_best_model="acc",    # compute_metrics reports it as "eval_acc"
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    remove_unused_columns=False,    # the collator needs the raw CSV columns
    num_train_epochs=5,
    fp16=False,
    dataloader_num_workers=4,
    load_best_model_at_end=True,
    report_to="tensorboard",
)

# Initialize the actual collator and multimodal model.
# The classifier needs one output per known answer plus one for the shared
# "unknown" id that label_encoder returns (len(list_vocabulary)); deriving the
# size here keeps every label inside the range the loss accepts.
collator, model = createMultimodalVQACollatorAndModel(
    "bert-base-uncased",
    "google/vit-base-patch16-224-in21k",
    vocabulary_size=len(list_vocabulary) + 1,
)

# Initialize the trainer with the dataset, collator, model, hyperparameters and evaluation metrics
multi_trainer = Trainer(
    model=model,
    args=multi_args,
    train_dataset=dataset['train'],
    eval_dataset=dataset['test'],
    data_collator=collator,
    compute_metrics=compute_metrics,
)

# Start the training loop
train_multi_metrics = multi_trainer.train()

# Export the weights of the in-memory model next to the Trainer checkpoints.
model_path = os.path.join(output_dir, "pytorch_model.bin")
torch.save(model.state_dict(), model_path)

# Run the model on the evaluation set to obtain final metrics
eval_multi_metrics = multi_trainer.evaluate()