In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
input_paths = (
    os.path.join(root, name)
    for root, _dirs, names in os.walk('/kaggle/input')
    for name in names
)
for input_path in input_paths:
    print(input_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Load the first 25% of the train split and the first 25% of the validation
# split of the "HuggingFaceM4/VQAv2" dataset. Because `split` is a list,
# `dataset` is indexed positionally below: dataset[0] is the train slice and
# dataset[1] is the validation slice.

from datasets import load_dataset

# dataset = load_dataset("HuggingFaceM4/VQAv2")
dataset = load_dataset("HuggingFaceM4/VQAv2", split=["train[:25%]", "validation[:25%]"])
dataset

In [None]:
# Access the train split (first entry of the `split` list passed to load_dataset)
train_dataset = dataset[0]

# Print the first row to inspect the available fields
print(train_dataset[0])

In [None]:
# Bare expression: the notebook renders the first training example's image as the cell output.
train_dataset[0]['image']

In [None]:
# Access the validation split (second entry of the `split` list passed to load_dataset)
validation_dataset = dataset[1]

# Print the first row to inspect the available fields
print(validation_dataset[0])

In [None]:
# Bare expression: the notebook renders the first validation example's image as the cell output.
validation_dataset[0]['image']

# Preprocessing image

In [None]:
# Load only the configuration of the fine-tuned ViLT VQA checkpoint.
# The rest of the notebook uses it purely as the answer vocabulary:
# `config.label2id` maps answer strings to class ids and `config.id2label`
# maps class ids back to answer strings.
from transformers import ViltConfig
config = ViltConfig.from_pretrained("dandelin/vilt-b32-finetuned-vqa")

In [None]:
# Dump the whole id -> answer mapping (the label space used for the targets below).
print(config.id2label)

In [None]:
# Number of examples in the loaded train slice.
len(train_dataset)

In [None]:
# Number of examples in the loaded validation slice.
len(validation_dataset)

In [None]:
from tqdm.notebook import tqdm

def get_score(count: int) -> float:
    """Turn the number of annotators who gave an answer into a soft score.

    The score grows linearly as ``count / 3`` and is capped at 1.0, so three
    or more agreeing annotators yield the full score.
    """
    score = count / 3
    return score if score < 1.0 else 1.0

def add_labels_scores(annotation):
    """Attach answer-vocabulary class ids and soft scores to one annotation.

    Counts how often each distinct answer string occurs in
    ``annotation['answers']`` (a list of dicts with an ``"answer"`` key),
    keeps only answers present in ``config.label2id``, and stores two
    parallel lists on the annotation:

    * ``annotation['labels']`` - class ids from ``config.label2id``
    * ``annotation['scores']`` - ``get_score(count)`` for the same answers

    The annotation dict is modified in place and also returned.

    When ``answers`` is ``None`` or empty, both lists are set to ``[]`` so
    that downstream code reading ``annotation['labels']`` /
    ``annotation['scores']`` does not fail with ``KeyError``.
    """
    from collections import Counter  # local import keeps this cell self-contained

    # `or ()` makes a missing answer list (None) behave like an empty one.
    answers = annotation['answers'] or ()
    # Counter keeps first-seen order, so labels keep the order of first occurrence.
    answer_count = Counter(answer["answer"] for answer in answers)

    labels = []
    scores = []
    for answer, count in answer_count.items():
        # Answers outside the model's answer vocabulary cannot be used as targets.
        if answer not in config.label2id:
            continue
        labels.append(config.label2id[answer])
        scores.append(get_score(count))

    annotation['labels'] = labels
    annotation['scores'] = scores
    return annotation


In [None]:
from PIL import Image
import numpy as np
from IPython.display import display


# No subsetting happens here: the range covers every row, so `subset_train`
# selects the whole train slice. The actual random subsampling is done later,
# in the "Subsetting" section.
# (`num_samples_to_display` is simply the full length of the split.)
num_samples_to_display = len(train_dataset)
subset_train = train_dataset.select(range(num_samples_to_display))

In [None]:
from PIL import Image
import numpy as np
from IPython.display import display

# No subsetting happens here: the range covers every row, so `subset_val`
# selects the whole validation slice. The actual random subsampling is done
# later, in the "Subsetting" section.
# (`num_samples_to_display` is simply the full length of the split.)
num_samples_to_display = len(validation_dataset)
subset_val = validation_dataset.select(range(num_samples_to_display))

In [None]:
def showImage(istrain=True, id=None):
    """Display one example together with its question, answers and targets.

    Args:
        istrain: if true, read from ``subset_train``; otherwise from ``subset_val``.
        id: row index to show. When ``None`` a random row is picked.

    The row is passed through ``add_labels_scores`` so the derived label ids
    and scores can be printed next to the raw answers.
    """
    data = subset_train if istrain else subset_val

    # Identity check: `== None` can be overridden by `__eq__` (e.g. on numpy
    # scalars), whereas `is None` only matches the default sentinel.
    if id is None:
        # Convert to a builtin int; numpy integers are not accepted as an
        # index by every dataset implementation.
        id = int(np.random.randint(len(data)))

    modified_item = add_labels_scores(data[id])
    image = modified_item['image']

    print(image)
    display(image)

    print("Question:\t", modified_item["question"])
    print("Answer:\t", modified_item["answers"])
    print("Labels:\t", modified_item["labels"])
    print("Scores:\t", modified_item["scores"])
    # This line prints the answer strings behind the label ids, not scores,
    # so the caption says "Answers".
    print("Answers for these labels:\t", [config.id2label[label] for label in modified_item["labels"]])

In [None]:
# Show a random training example (no id given, so an index is drawn at random).
showImage(True)

In [None]:
# Show a random validation example (no id given, so an index is drawn at random).
showImage(False)

In [None]:
import torch
from PIL import Image

class VQADataset(torch.utils.data.Dataset):
    """VQA (v2) dataset yielding model-ready tensors for one question/image pair.

    Args:
        questions: indexable collection of dicts with a ``'question'`` string.
        annotations: indexable collection (same order as ``questions``) of dicts
            with ``'image'`` (an object offering ``.convert("RGB")``),
            ``'labels'`` (class ids) and ``'scores'`` (soft scores).
        preprocessor: callable applied to the RGB image array, called as
            ``preprocessor(image, return_tensors="pt")``.
        tokenizer: callable applied to the question text.
        num_labels: size of the target vector. ``None`` (default) falls back
            to ``len(config.id2label)`` from the notebook-level ``config``.
        max_length: padded/truncated token length of the question (default 24).

    Each item is the preprocessor output, extended with ``input_ids``,
    ``token_type_ids``, ``attention_mask`` and a dense ``labels`` score vector,
    all with the leading batch dimension removed.
    """

    def __init__(self, questions, annotations, preprocessor, tokenizer,
                 num_labels=None, max_length=24):
        self.questions = questions
        self.annotations = annotations
        self.preprocessor = preprocessor
        self.tokenizer = tokenizer
        self.num_labels = num_labels
        self.max_length = max_length

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        annotation = self.annotations[idx]
        question = self.questions[idx]

        # Force three channels so every image has the same layout.
        image = np.array(annotation['image'].convert("RGB"))

        encoding = self.preprocessor(image, return_tensors="pt")
        encoded_text = self.tokenizer(
            text=question['question'],
            padding='max_length',
            max_length=self.max_length,
            truncation=True,
            return_tensors='pt',
            return_token_type_ids=True,
            return_attention_mask=True,
        )
        for key in ("input_ids", "token_type_ids", "attention_mask"):
            encoding[key] = encoded_text[key]

        # Remove ONLY the leading batch dimension. A bare `.squeeze()` would
        # also drop any other size-1 axis (e.g. a length-1 sequence) and
        # break stacking in the collate function.
        for key in list(encoding.keys()):
            encoding[key] = encoding[key].squeeze(0)

        # Dense soft-score target vector over the answer vocabulary.
        # based on: https://github.com/dandelin/ViLT/blob/762fd3975c180db6fc88f577cf39549983fa373a/vilt/modules/objectives.py#L301
        num_labels = self.num_labels if self.num_labels is not None else len(config.id2label)
        targets = torch.zeros(num_labels)
        for label, score in zip(annotation['labels'], annotation['scores']):
            targets[label] = score
        encoding["labels"] = targets

        return encoding


# Subsetting

In [None]:
import random

# Specify the number of samples you want to use
num_samples = 15000

# Draw the subset from a private, seeded generator. The notebook's global
# seeding happens in a later cell, so an unseeded `random.sample` here would
# pick a different training subset on every run.
train_subset_rng = random.Random(42)

# Never ask for more rows than the split holds (`sample` raises ValueError otherwise).
indices = train_subset_rng.sample(range(len(subset_train)), min(num_samples, len(subset_train)))

# Fetch every sampled row exactly once and derive both lists from it,
# instead of reading the same row twice from the dataset.
subset_annotations = [add_labels_scores(subset_train[i]) for i in indices]
subset_questions = [{'question': annotation['question']} for annotation in subset_annotations]

In [None]:
# Specify the number of samples you want to use
num_samples = 3000

# Draw the subset from a private, seeded generator so the validation subset
# is identical across runs (global seeding happens in a later cell).
val_subset_rng = random.Random(42)

# Never ask for more rows than the split holds (`sample` raises ValueError otherwise).
val_indices = val_subset_rng.sample(range(len(subset_val)), min(num_samples, len(subset_val)))

# Fetch every sampled row exactly once and derive both lists from it.
subset_val_annotations = [add_labels_scores(subset_val[i]) for i in val_indices]
subset_val_questions = [{'question': annotation['question']} for annotation in subset_val_annotations]

In [None]:
# Sanity check: first sampled training question and its annotation (now carrying 'labels' and 'scores').
print(subset_questions[0])
print(subset_annotations[0])
     

In [None]:
# Sanity check: first sampled validation question and its annotation (now carrying 'labels' and 'scores').
print(subset_val_questions[0])
print(subset_val_annotations[0])

In [None]:
from transformers import AutoTokenizer,AutoFeatureExtractor
# Checkpoint names for the two modalities. Note that `text` and `image` are
# model identifiers (strings), not a question or a picture.
text='roberta-base'
image='google/vit-base-patch16-224-in21k'
# Tokenizer for the questions and feature extractor for the images; both are
# handed to VQADataset below.
# NOTE(review): newer transformers releases may prefer AutoImageProcessor over
# AutoFeatureExtractor - confirm against the installed version.
tokenizer = AutoTokenizer.from_pretrained(text)
preprocessor=AutoFeatureExtractor.from_pretrained(image)

In [None]:
# Training dataset: sampled questions/annotations plus the shared tokenizer and image preprocessor.
vqa2_dataset = VQADataset(
    questions=subset_questions,
    annotations=subset_annotations,
    tokenizer=tokenizer,
    preprocessor=preprocessor,
)

In [None]:
# Validation dataset: sampled questions/annotations plus the shared tokenizer and image preprocessor.
vqa2_dataset_val = VQADataset(
    questions=subset_val_questions,
    annotations=subset_val_annotations,
    tokenizer=tokenizer,
    preprocessor=preprocessor,
)

In [None]:
# Field names produced for one training item (model inputs plus 'labels').
vqa2_dataset[0].keys()

In [None]:
# Field names produced for one validation item (model inputs plus 'labels').
vqa2_dataset_val[0].keys()

In [None]:
from torch.utils.data import DataLoader

def collate_fn(batch):
    """Stack per-example tensors into one batch dictionary.

    ``batch`` is a list of items, each providing ``pixel_values``,
    ``input_ids``, ``token_type_ids``, ``attention_mask`` and ``labels``
    tensors. Every field is stacked along a new leading dimension; no
    padding and no pixel mask is created, so all examples must already
    share the same shapes.
    """
    field_order = (
        'pixel_values',
        'input_ids',
        'token_type_ids',
        'attention_mask',
        'labels',
    )
    return {
        field: torch.stack([sample[field] for sample in batch])
        for field in field_order
    }

# Training batches are reshuffled every epoch; validation keeps dataset order.
train_dataloader = DataLoader(
    vqa2_dataset,
    batch_size=4,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn,
)
val_dataloader = DataLoader(
    vqa2_dataset_val,
    batch_size=4,
    num_workers=4,
    collate_fn=collate_fn,
)

In [None]:
# Pull one training batch and list the shape of every field as a sanity check.
batch = next(iter(train_dataloader))
for field in batch:
    print(field, batch[field].shape)

In [None]:
## PyTorch
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim

from typing import Dict, List, Optional, Tuple

# Define the Model

In [None]:
class MultimodalVQAModel(nn.Module):
    """Late-fusion VQA classifier over a text encoder and an image encoder.

    Both encoders are loaded with ``AutoModel.from_pretrained``. Their
    ``pooler_output`` vectors are concatenated, passed through a
    Linear -> ReLU -> Dropout(0.5) fusion block of width ``intermediate_dim``
    and classified into ``num_labels`` answers.

    NOTE(review): the loss is ``nn.CrossEntropyLoss``. The dataset in this
    notebook supplies float soft-score vectors as ``labels`` (not class
    indices, despite the ``LongTensor`` annotation), and those vectors are
    not normalised to sum to 1 - confirm this is the intended objective.
    """

    def __init__(
            self,
            num_labels: int = len(config.id2label),
            intermediate_dim: int = 512,
            pretrained_text_name: str = 'roberta-base',
            pretrained_image_name: str = 'google/vit-base-patch16-224-in21k'):
        super().__init__()
        self.num_labels = num_labels
        self.pretrained_text_name = pretrained_text_name
        self.pretrained_image_name = pretrained_image_name

        # Pretrained backbones, one per modality.
        self.text_encoder = AutoModel.from_pretrained(self.pretrained_text_name)
        self.image_encoder = AutoModel.from_pretrained(self.pretrained_image_name)

        # The fusion input is the concatenation of both pooled representations.
        fused_dim = (
            self.text_encoder.config.hidden_size
            + self.image_encoder.config.hidden_size
        )
        self.fusion = nn.Sequential(
            nn.Linear(fused_dim, intermediate_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
        )

        self.classifier = nn.Linear(intermediate_dim, self.num_labels)

        self.criterion = nn.CrossEntropyLoss()

    def forward(
            self,
            pixel_values: torch.FloatTensor,
            input_ids: torch.LongTensor,
            token_type_ids: Optional[torch.LongTensor] = None,
            attention_mask: Optional[torch.LongTensor] = None,
            labels: Optional[torch.LongTensor] = None):
        """Return ``{"logits": ...}`` and, when ``labels`` is given, also ``"loss"``."""
        text_features = self.text_encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=True,
        )['pooler_output']
        image_features = self.image_encoder(
            pixel_values=pixel_values,
            return_dict=True,
        )['pooler_output']

        # Text features first, image features second, joined on the feature axis.
        joint_features = torch.cat([text_features, image_features], dim=1)
        logits = self.classifier(self.fusion(joint_features))

        if labels is None:
            return {"logits": logits}
        return {"logits": logits, "loss": self.criterion(logits, labels)}

In [None]:
# Function for setting the seed
def set_seed(seed):
    """Seed Python's ``random``, NumPy and PyTorch (CPU and, if present, CUDA)."""
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    if not torch.cuda.is_available():
        return
    # Seed the current CUDA device and then all visible devices.
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
set_seed(42)

# Ask cuDNN for deterministic kernels and turn off its auto-tuner, for reproducibility on GPU.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Using device", device)

# Model Structure

In [None]:
from transformers import AutoModel

In [None]:
# Build the model with its default arguments (encoder checkpoints and sizes from MultimodalVQAModel.__init__).
model = MultimodalVQAModel()
# Bare expression: shows the module structure as the cell output.
model

# Forward Pass

In [None]:
example = vqa2_dataset[0]
print(example.keys())
# add batch dimension + move to the selected device
example = {k: v.unsqueeze(0).to(device) for k, v in example.items()}
# Show the shapes only; printing the raw tensors floods the cell output.
print({k: tuple(v.shape) for k, v in example.items()})
# forward pass
model.to(device)
# This is only a smoke test. Without no_grad the global `outputs` would keep
# the whole autograd graph (and its activations) alive for the rest of the session.
with torch.no_grad():
    outputs = model(**example)

In [None]:
def showAnswers(ids):
    """Print the answer strings that ``config.id2label`` assigns to ``ids``."""
    answers = []
    for label_id in ids:
        answers.append(config.id2label[label_id])
    print(answers)


# Five random class ids each, used only to demonstrate the id -> answer lookup.
labels = np.random.randint(len(config.id2label), size=5)
preds = np.random.randint(len(config.id2label), size=5)

showAnswers(labels)
showAnswers(preds)

# Number of parameters of the model

In [None]:
def countTrainableParameters(model):
    """Print the total element count of all parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    print("No. of trainable parameters:\t{0:,}".format(total))


In [None]:
countTrainableParameters(model) # For the RoBERTa + ViT model (default encoder checkpoints)

# Using Lightning module for training

In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel,
# so the import in the next cell sees the package.
# TODO: pin a version (pytorch-lightning==X.Y.Z) for reproducibility.
%pip install pytorch-lightning

In [None]:
import torch
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

class LightningMultimodalVQAModel(pl.LightningModule):
    """Lightning wrapper that trains and evaluates a wrapped VQA model.

    Args:
        model: module whose forward returns a dict with ``"logits"`` and,
            when labels are passed, ``"loss"``.

    Every step logs ``<stage>_loss`` plus ``<stage>_accuracy``,
    ``<stage>_precision``, ``<stage>_recall`` and ``<stage>_f1`` where
    ``<stage>`` is ``train``, ``val`` or ``test``. Metrics are computed per
    batch: the prediction is the arg-max logit and the reference class is the
    arg-max of the target score vector.
    """

    def __init__(self, model):
        super().__init__()
        self.model = model

    def forward(self, pixel_values, input_ids, token_type_ids=None, attention_mask=None, labels=None):
        return self.model(pixel_values=pixel_values, input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask, labels=labels)

    def _shared_step(self, batch, stage):
        """Run one batch, log loss and metrics under the ``stage`` prefix, return the loss."""
        output = self(**batch)
        loss = output["loss"]
        self.log(f'{stage}_loss', loss)

        # Reduce logits and soft targets to a single class id per example.
        preds = torch.argmax(output["logits"], dim=1).detach().cpu().numpy()
        labels = torch.argmax(batch["labels"], dim=1).detach().cpu().numpy()

        # Small batches rarely contain every class, which makes precision,
        # recall and F1 undefined for some labels. zero_division=0 gives the
        # same 0.0 value as the default but without a warning on every step.
        metrics = {
            'accuracy': accuracy_score(labels, preds),
            'precision': precision_score(labels, preds, average='weighted', zero_division=0),
            'recall': recall_score(labels, preds, average='weighted', zero_division=0),
            'f1': f1_score(labels, preds, average='weighted', zero_division=0),
        }
        for name, value in metrics.items():
            self.log(f'{stage}_{name}', value, on_step=True, on_epoch=True, prog_bar=True)

        return loss

    def training_step(self, batch, batch_idx):
        # Lightning optimises the returned loss.
        return self._shared_step(batch, 'train')

    def validation_step(self, batch, batch_idx):
        self._shared_step(batch, 'val')

    def test_step(self, batch, batch_idx):
        self._shared_step(batch, 'test')

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=5e-5)
        return optimizer



In [None]:
from sklearn.metrics import accuracy_score,f1_score



In [None]:
# Wrap the model built above in the Lightning module
# (`multimodal_model` is just another name for the same `model` object).
multimodal_model = model
lightning_model = LightningMultimodalVQAModel(multimodal_model)

# Define the checkpoint callback: keep the single checkpoint with the highest
# "val_accuracy", the metric logged by validation_step.
checkpoint_callback = ModelCheckpoint(
    dirpath="my_model/checkpoint/",
    save_top_k=1,  # Save only the best model
    verbose=True,
    monitor="val_accuracy",
    mode="max"
)

# Initialize the trainer (up to 15 epochs, with the checkpoint callback attached)
trainer = pl.Trainer(max_epochs=15,  callbacks=[checkpoint_callback])

# Fit the model on the training loader, validating on the validation loader
trainer.fit(lightning_model, train_dataloader, val_dataloader)

In [None]:
# %load_ext tensorboard
# %tensorboard --logdir lightning_logs/ --port 4000

# Inference

In [None]:
# # Load the model from the checkpoint
# checkpoint_path = "/kaggle/working/my_model/checkpoint/epoch=0-step=1250.ckpt"
# lightning_model = LightningMultimodalVQAModel.load_from_checkpoint(checkpoint_path, model=MultimodalVQAModel())

# # Place the model into evaluation mode and move it to the correct device
# lightning_model = lightning_model.to(device)
# lightning_model.eval()

# For Validation

In [None]:
# id=56
# showImage(False,id)
# example = vqa2_dataset_val[id]
# example = {k: v.unsqueeze(0).to(device) for k,v in example.items()}

# # Forward pass
# with torch.no_grad():
#     input_ids = example["input_ids"]
#     print(input_ids.shape)
#     outputs = lightning_model(**example)
#     logits = outputs["logits"]
#     top2_values, top2_indices = logits.topk(2, dim=-1)
#     predicted_classes = top2_indices.squeeze().tolist()
#     print("Predicted answers:", [config.id2label[predicted_class] for predicted_class in predicted_classes])

In [None]:
# id=41
# showImage(False,id)
# example = vqa2_dataset_val[id]
# example = {k: v.unsqueeze(0).to(device) for k,v in example.items()}

# # Forward pass
# with torch.no_grad():
#     outputs = lightning_model(**example)
#     logits = outputs["logits"]
#     top2_values, top2_indices = logits.topk(2, dim=-1)
#     predicted_classes = top2_indices.squeeze().tolist()
#     print("Predicted answers:", [config.id2label[predicted_class] for predicted_class in predicted_classes])

In [None]:
# id=90
# showImage(False,id)
# example = vqa2_dataset_val[id]
# example = {k: v.unsqueeze(0).to(device) for k,v in example.items()}
# example

# # Forward pass
# with torch.no_grad():
#     outputs = lightning_model(**example)
#     logits = outputs["logits"]
#     top2_values, top2_indices = logits.topk(2, dim=-1)
#     predicted_classes = top2_indices.squeeze().tolist()
#     print("Predicted answers:", [config.id2label[predicted_class] for predicted_class in predicted_classes])

In [None]:
# id=154
# showImage(False,id)
# example = vqa2_dataset_val[id]
# example = {k: v.unsqueeze(0).to(device) for k,v in example.items()}

# # Forward pass
# with torch.no_grad():
#     outputs = lightning_model(**example)
#     logits = outputs["logits"]
#     top2_values, top2_indices = logits.topk(2, dim=-1)
#     predicted_classes = top2_indices.squeeze().tolist()
#     print("Predicted answers:", [config.id2label[predicted_class] for predicted_class in predicted_classes])