<a href="https://colab.research.google.com/github/PramodC140/Multimodel-VQA/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
# Install required libraries (comment out the line below if they are already installed)
!pip install transformers nltk numpy datasets==2.16.0




In [13]:
import numpy as np
import pandas as pd
import os
import random
from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

from datasets import load_dataset, set_caching_enabled
from PIL import Image
import torch
import torch.nn as nn

from transformers import (
    AutoTokenizer, AutoFeatureExtractor,
    AutoModel, AutoConfig,
    TrainingArguments, Trainer,
    logging
)

import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

import nltk
nltk.download('wordnet')
nltk.download('omw-1.4')
from nltk.corpus import wordnet

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# Set up caching for Hugging Face libraries
# NOTE(review): HF_HOME is assigned after `datasets` and `transformers` were
# imported above. If those libraries resolve their cache directory at import
# time, this assignment does not affect them — confirm, or move it above the
# imports.
os.environ['HF_HOME'] = os.path.join(".", "cache")
set_caching_enabled(True)
# `logging` here is the transformers logging module imported above, not the stdlib one.
logging.set_verbosity_error()

# Check for GPU
# `device` is reused by later cells to place the model and the inference tensors.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
if device.type == 'cuda':
    print(torch.cuda.get_device_name(0))

# Mount Google Drive
# Later cells read the images from, and write checkpoints to, paths under
# /content/drive/MyDrive, so the mount has to happen before they run.
from google.colab import drive
drive.mount('/content/drive')


[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


Using device: cuda
Tesla T4
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
from google.colab import files

# Set paths to dataset files
train_path = "/content/data_train.csv"
test_path = "/content/data_eval.csv"
answer_space_path = "/content/answer_space.txt"

# Only prompt for an upload when an expected file is missing. Uploading a file
# whose name already exists stores it under a new name such as
# "data_train (1).csv" (see the cell output), so an unconditional re-upload
# never replaces the files read below and merely piles up copies.
# To load a new version of a file, delete the old one first and re-run this cell.
uploaded = {}
if not (os.path.exists(train_path) and os.path.exists(test_path)):
    uploaded = files.upload()  # Upload data_train.csv and data_eval.csv
if not os.path.exists(answer_space_path):
    uploaded = files.upload()  # Upload answer_space.txt

# Load dataset using the datasets library
dataset = load_dataset(
    "csv",
    data_files={
        "train": train_path,
        "test": test_path
    }
)

# Load answer space and process it (each line contains one answer)
with open(answer_space_path) as f:
    answer_space = f.read().splitlines()

# Answer string -> index of its first occurrence in answer_space. This gives the
# same result as answer_space.index(...) but with a constant-time lookup per row
# instead of a linear scan of the whole answer list.
answer_to_label = {}
for label_id, answer_text in enumerate(answer_space):
    answer_to_label.setdefault(answer_text, label_id)


def _first_answer_label(raw_answer):
    """Return the label id of the first answer listed in a dataset cell.

    All spaces are removed from the cell, it is split on commas, and the first
    entry is looked up in the answer space.

    Raises:
        ValueError: if that first answer is not present in answer_space.
    """
    first_answer = raw_answer.replace(" ", "").split(",")[0]
    try:
        return answer_to_label[first_answer]
    except KeyError:
        raise ValueError(
            f"{first_answer!r} (from answer cell {raw_answer!r}) is not in answer_space"
        ) from None


# Map dataset to include numerical labels corresponding to answer_space entries.
# For multiple answers in a cell, we select the first answer (after removing all spaces).
dataset = dataset.map(
    lambda examples: {
        'label': [_first_answer_label(ans) for ans in examples['answer']]
    },
    batched=True
)


Saving data_eval.csv to data_eval (1).csv
Saving data_train.csv to data_train (1).csv


Saving answer_space.txt to answer_space (1).txt


In [15]:
import IPython.display as display

def show_example(train=True, idx=None,
                 image_dir="/content/drive/MyDrive/projectIE/dataset/dataset/images"):
    """Display one example (image, question, answer) from the dataset.

    Args:
        train: If True pick from the "train" split, otherwise from "test".
        idx: Row index to show; a random row is chosen when None.
        image_dir: Directory that contains the `<image_id>.png` files.
            Adjust it to match your Drive folder structure.

    Returns:
        The raw answer string of the displayed example.
    """
    dataset_type = "train" if train else "test"
    data = dataset[dataset_type]
    if idx is None:
        idx = np.random.randint(len(data))

    # Read the row once instead of re-fetching it for every field.
    example = data[idx]

    image_path = os.path.join(image_dir, f"{example['image_id']}.png")
    with Image.open(image_path) as image:
        display.display(image)

    question = example["question"]
    answer = example["answer"]
    label = example["label"]

    print(f"Question:\t {question}")
    print(f"Answer:\t\t {answer} (Label: {label})")
    return answer

# Test the function (uncomment to test)
# show_example()


In [16]:
@dataclass
class MultimodalCollator:
    """Turn raw VQA examples into one batch of model inputs.

    Each example must provide a 'question', an 'image_id' and a 'label'.
    The questions are tokenized, the images are loaded from `image_dir` and
    run through the image preprocessor, and the labels become a long tensor.
    """
    tokenizer: AutoTokenizer
    preprocessor: AutoFeatureExtractor
    # Directory holding one `<image_id>.png` file per example.
    image_dir: str = "/content/drive/MyDrive/projectIE/dataset/dataset/images"
    # Questions are truncated to at most this many tokens.
    max_length: int = 24

    def tokenize_text(self, texts: List[str]) -> Dict[str, torch.Tensor]:
        """Tokenize a batch of questions, padded to the longest one in the batch.

        Returns a dict with 'input_ids', 'token_type_ids' and 'attention_mask'.
        """
        encoded_text = self.tokenizer(
            text=texts,
            padding='longest',
            max_length=self.max_length,
            truncation=True,
            return_tensors='pt',
            return_token_type_ids=True,
            return_attention_mask=True,
        )
        return {
            "input_ids": encoded_text['input_ids'],
            "token_type_ids": encoded_text['token_type_ids'],
            "attention_mask": encoded_text['attention_mask'],
        }

    def preprocess_images(self, image_ids: List[str]) -> Dict[str, torch.Tensor]:
        """Load `<image_dir>/<image_id>.png` for every id and preprocess the batch.

        Returns a dict with the single key 'pixel_values'.
        """
        images = []
        for image_id in image_ids:
            image_path = os.path.join(self.image_dir, f"{image_id}.png")
            # convert() returns a new in-memory image, so the file can be
            # closed as soon as the conversion is done.
            with Image.open(image_path) as raw_image:
                images.append(raw_image.convert('RGB'))
        processed_images = self.preprocessor(
            images=images,
            return_tensors="pt",
        )
        return {"pixel_values": processed_images['pixel_values']}

    def __call__(self, raw_batch_dict) -> Dict[str, torch.Tensor]:
        """Collate a batch given either as a dict of columns or a list of row dicts.

        Returns the tokenized text fields, 'pixel_values' and 'labels'.
        """
        if isinstance(raw_batch_dict, dict):
            question_batch = raw_batch_dict['question']
            image_id_batch = raw_batch_dict['image_id']
            label_batch = raw_batch_dict['label']
        else:
            question_batch = [d['question'] for d in raw_batch_dict]
            image_id_batch = [d['image_id'] for d in raw_batch_dict]
            label_batch = [d['label'] for d in raw_batch_dict]

        tokenized = self.tokenize_text(question_batch)
        images = self.preprocess_images(image_id_batch)
        tokenized.update(images)
        tokenized['labels'] = torch.tensor(label_batch, dtype=torch.long)
        return tokenized


In [17]:
class ImprovedMultimodalVQAModel(nn.Module):
    """Late-fusion VQA classifier over a text encoder and an image encoder.

    One feature vector is taken from each encoder, the two are concatenated,
    passed through an MLP fusion block, and classified over `num_labels`
    answers. When labels are supplied the cross-entropy loss is returned too.
    """

    def __init__(self, num_labels, intermediate_dim=512,
                 text_model_name='bert-base-uncased',
                 image_model_name='microsoft/beit-base-patch16-224-pt22k-ft22k'):
        super().__init__()
        self.num_labels = num_labels

        # Pretrained backbones: text first, then image.
        self.text_encoder = AutoModel.from_pretrained(text_model_name)
        self.image_encoder = AutoModel.from_pretrained(image_model_name)

        # Both encoder configs are expected to expose `hidden_size`.
        text_dim = self.text_encoder.config.hidden_size
        image_dim = self.image_encoder.config.hidden_size
        fused_input_dim = text_dim + image_dim

        # Fusion MLP: two linear layers, BatchNorm after the first activation,
        # dropout after each activation.
        fusion_layers = [
            nn.Linear(fused_input_dim, intermediate_dim),
            nn.ReLU(),
            nn.BatchNorm1d(intermediate_dim),
            nn.Dropout(0.5),
            nn.Linear(intermediate_dim, intermediate_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
        ]
        self.fusion = nn.Sequential(*fusion_layers)

        # Answer classifier and its training loss.
        self.classifier = nn.Linear(intermediate_dim, self.num_labels)
        self.criterion = nn.CrossEntropyLoss()

    @staticmethod
    def _pooled_or_first_token(encoder_output):
        """Return `pooler_output` when present and not None, else the first token's hidden state."""
        pooled = getattr(encoder_output, 'pooler_output', None)
        if pooled is None:
            return encoder_output.last_hidden_state[:, 0]
        return pooled

    def forward(self, input_ids, pixel_values, attention_mask=None, token_type_ids=None, labels=None):
        """Run both encoders, fuse their features and classify.

        Returns a dict with 'logits' and, when `labels` is given, 'loss'.
        """
        text_features = self._pooled_or_first_token(
            self.text_encoder(
                input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids,
            )
        )
        image_features = self._pooled_or_first_token(self.image_encoder(pixel_values))

        # Concatenate along the feature dimension, then fuse and classify.
        joint_features = torch.cat([text_features, image_features], dim=1)
        logits = self.classifier(self.fusion(joint_features))

        if labels is None:
            return {"logits": logits}
        return {"logits": logits, "loss": self.criterion(logits, labels)}


In [18]:
def wup_measure(a, b, similarity_threshold=0.925):
    """Score how close two answer words are using Wu-Palmer similarity.

    Identical words score 1.0. An empty word, or a word with no WordNet noun
    synsets, scores 0. Otherwise the best Wu-Palmer similarity over all pairs
    of noun synsets is returned, scaled by 0.1 when it falls below
    `similarity_threshold`.
    """
    if a == b:
        return 1.0
    if "" in (a, b):
        return 0

    noun_synsets_a = wordnet.synsets(a, pos=wordnet.NOUN)
    noun_synsets_b = wordnet.synsets(b, pos=wordnet.NOUN)
    if not (noun_synsets_a and noun_synsets_b):
        return 0

    # Best pairwise similarity; pairs with no score (None/0) are ignored and
    # the result never drops below 0.0.
    pair_scores = (
        syn_a.wup_similarity(syn_b)
        for syn_a in noun_synsets_a
        for syn_b in noun_synsets_b
    )
    best_score = max((s for s in pair_scores if s and s > 0.0), default=0.0)

    # Matches below the threshold are heavily down-weighted rather than dropped.
    if best_score < similarity_threshold:
        return best_score * 0.1
    return best_score * 1.0

def batch_wup_measure(labels, preds):
    """Mean `wup_measure` over paired label/prediction indices.

    Each index is resolved to its answer string through `answer_space` before
    the pair is scored.
    """
    pair_scores = []
    for label_idx, pred_idx in zip(labels, preds):
        true_answer = answer_space[label_idx]
        predicted_answer = answer_space[pred_idx]
        pair_scores.append(wup_measure(true_answer, predicted_answer))
    return np.mean(pair_scores)

def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
    """Compute evaluation metrics from a `(logits, labels)` pair.

    Predictions are the argmax over the last axis of the logits. The result
    holds 'wups', 'acc' and macro-averaged 'f1', 'precision' and 'recall'.
    """
    logits, labels = eval_tuple
    preds = logits.argmax(axis=-1)

    metrics = {
        "wups": batch_wup_measure(labels, preds),
        "acc": accuracy_score(labels, preds),
    }
    macro_scorers = (
        ("f1", f1_score),
        ("precision", precision_score),
        ("recall", recall_score),
    )
    for metric_name, scorer in macro_scorers:
        metrics[metric_name] = scorer(labels, preds, average='macro')
    return metrics


In [19]:
def create_multimodal_collator_and_model(image_model='microsoft/beit-base-patch16-224-pt22k-ft22k',
                                         text_model='bert-base-uncased'):
    """Build the data collator and the VQA model for a pair of pretrained encoders.

    Args:
        image_model: Checkpoint name used for both the image preprocessor and
            the image encoder.
        text_model: Checkpoint name used for both the tokenizer and the text
            encoder, so the two cannot drift apart.

    Returns:
        A `(collator, model)` tuple; the model has one output per entry of
        `answer_space` and is already moved to `device`.
    """
    # Initialize tokenizer and feature extractor
    tokenizer = AutoTokenizer.from_pretrained(text_model)
    preprocessor = AutoFeatureExtractor.from_pretrained(image_model)

    multimodal_collator = MultimodalCollator(
        tokenizer=tokenizer,
        preprocessor=preprocessor,
    )

    multimodal_model = ImprovedMultimodalVQAModel(
        num_labels=len(answer_space),
        intermediate_dim=512,
        text_model_name=text_model,
        image_model_name=image_model
    ).to(device)

    return multimodal_collator, multimodal_model

# Build the collator and the model with the default encoders; the model is returned already on `device`.
collator, model = create_multimodal_collator_and_model()


In [20]:
# Training configuration: evaluate, log and checkpoint once per epoch, and
# reload the checkpoint with the best 'wups' score when training finishes.
args = TrainingArguments(
    output_dir=os.path.join("/content/drive/MyDrive/projectIE/dataset/dataset", "BEIT_BERT_improved"),
    seed=12345,
    evaluation_strategy="epoch",
    logging_strategy="epoch",
    save_strategy="epoch",
    metric_for_best_model='wups',
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    # The collator needs the raw 'question' / 'image_id' columns, so they must not be dropped.
    remove_unused_columns=False,
    num_train_epochs=35,
    # Only request mixed precision when a GPU was detected; an unconditional
    # fp16=True breaks the CPU fallback selected in the setup cell.
    fp16=(device.type == 'cuda'),
    dataloader_num_workers=8,
    load_best_model_at_end=True,
    learning_rate=2e-5,
    warmup_steps=500,
    weight_decay=0.01
)

from transformers import Trainer

# The test split doubles as the evaluation set used for model selection.
trainer = Trainer(
    model,
    args,
    train_dataset=dataset['train'],
    eval_dataset=dataset['test'],
    data_collator=collator,
    compute_metrics=compute_metrics
)


In [None]:
# Train the model
# Runs the full schedule configured in `args` (evaluation and checkpointing every epoch).
train_multi_metrics = trainer.train()

# Evaluate the model
# Evaluates on the Trainer's eval_dataset (dataset['test']).
# NOTE(review): this extra evaluation presumably appends one more entry to
# trainer.state.log_history, which the plotting cell reads — confirm.
eval_multi_metrics = trainer.evaluate()
print("Evaluation Metrics:")
print(eval_multi_metrics)


{'loss': 6.4422, 'grad_norm': 29.83637046813965, 'learning_rate': 8.400000000000001e-06, 'epoch': 1.0}
{'eval_loss': 6.315776348114014, 'eval_wups': 0.03809254720618344, 'eval_acc': 0.0033491979552265115, 'eval_f1': 0.0004139393921622247, 'eval_precision': 0.0014036525845060501, 'eval_recall': 0.000368567234223855, 'eval_runtime': 148.3858, 'eval_samples_per_second': 38.231, 'eval_steps_per_second': 1.2, 'epoch': 1.0}
{'loss': 6.2225, 'grad_norm': 22.21900177001953, 'learning_rate': 1.684e-05, 'epoch': 2.0}
{'eval_loss': 5.939362049102783, 'eval_wups': 0.1325058713511131, 'eval_acc': 0.07949938304248193, 'eval_f1': 0.003921868823150953, 'eval_precision': 0.0030069070952878424, 'eval_recall': 0.008136913046446282, 'eval_runtime': 148.2577, 'eval_samples_per_second': 38.264, 'eval_steps_per_second': 1.201, 'epoch': 2.0}
{'loss': 5.5699, 'grad_norm': 14.575221061706543, 'learning_rate': 1.9614665708123655e-05, 'epoch': 3.0}
{'eval_loss': 5.149524211883545, 'eval_wups': 0.18995782003096673

In [None]:
# Persist only the weights (state_dict), not the whole model object.
# NOTE(review): the path is relative, so the file is written to the current
# working directory rather than to the Drive folder used for `output_dir` —
# confirm that it does not need to outlive the session.
torch.save(model.state_dict(), 'BEIT_BERT_improved_weights.pth')


In [None]:
# Reload the model (if needed)
model = ImprovedMultimodalVQAModel(
    num_labels=len(answer_space),
    intermediate_dim=512,
    text_model_name='bert-base-uncased',
    image_model_name='microsoft/beit-base-patch16-224-pt22k-ft22k'
)
# The checkpoint is a plain state_dict, so weights_only=True is sufficient and
# stops torch.load from unpickling arbitrary objects from the file.
state_dict = torch.load('BEIT_BERT_improved_weights.pth', map_location=device, weights_only=True)
model.load_state_dict(state_dict)
model.to(device)
model.eval()  # disable dropout and use BatchNorm running statistics

# Sample up to 10 random indices from the test dataset; random.sample raises
# when asked for more items than the split contains.
num_test_examples = len(dataset["test"])
random_sample_indices = random.sample(range(num_test_examples), k=min(10, num_test_examples))
sample_batch = [dataset["test"][i] for i in random_sample_indices]
sample = collator(sample_batch)

input_ids = sample["input_ids"].to(device)
token_type_ids = sample["token_type_ids"].to(device)
attention_mask = sample["attention_mask"].to(device)
pixel_values = sample["pixel_values"].to(device)
labels_tensor = sample["labels"].to(device)

with torch.no_grad():
    # Keyword arguments guard against silently swapping the mask tensors.
    outputs = model(
        input_ids,
        pixel_values,
        attention_mask=attention_mask,
        token_type_ids=token_type_ids,
        labels=labels_tensor,
    )
    logits = outputs["logits"]
    predictions = logits.argmax(dim=-1).cpu().numpy()

ground_truth = labels_tensor.cpu().numpy()

print("Predictions:")
print([answer_space[pred] for pred in predictions])
print("Ground Truth:")
print([answer_space[label] for label in ground_truth])

# Compute and display Wu-Palmer similarity for this batch
print("Batch Wu-Palmer Similarity:", batch_wup_measure(ground_truth, predictions))


In [None]:
# Extract log history
log_history = trainer.state.log_history

# Separate training and evaluation logs
train_logs = [log for log in log_history if 'loss' in log and 'eval_loss' not in log]
eval_logs = [log for log in log_history if 'eval_loss' in log]

# One x-value per evaluation entry; fall back to the entry's position when no epoch was logged.
epochs = [log.get('epoch', idx) for idx, log in enumerate(eval_logs)]

# Pair each evaluation with the training loss logged at (approximately) the
# same epoch; None marks an epoch without a matching training log.
train_loss = []
for epoch in epochs:
    epoch_losses = [log['loss'] for log in train_logs if abs(log.get('epoch', 0) - epoch) < 0.01]
    train_loss.append(epoch_losses[-1] if epoch_losses else None)

val_loss = [log['eval_loss'] for log in eval_logs]
# Keep one entry per evaluation log (None when 'eval_acc' is missing) so the
# series always has the same length as `epochs`; filtering the entries out
# would make the plot call fail with a length mismatch.
val_acc = [log.get('eval_acc') for log in eval_logs]

fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 6))

loss_ax.plot(epochs, train_loss, 'b-', label='Training Loss')
loss_ax.plot(epochs, val_loss, 'r-', label='Validation Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Training vs. Validation Loss')
loss_ax.legend()

acc_ax.plot(epochs, val_acc, 'orange', label='Validation Accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.set_title('Validation Accuracy')
acc_ax.legend()

fig.tight_layout()
plt.show()


In [None]:
def count_trainable_parameters(model):
    """Print the total number of elements across the model's trainable parameters.

    Only parameters with `requires_grad` set are counted. Nothing is returned.
    """
    trainable_total = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable_total += param.numel()
    print(f"Number of trainable parameters: {trainable_total:,}")

# Report the trainable-parameter count of the object currently bound to `model`.
count_trainable_parameters(model)
