In [None]:
# Install the extra packages used by this notebook (shell escapes, one pip call per package).
# bert-score is installed with --no-deps, so pip does not install or change its dependencies.
!pip install -q --no-deps bert-score
!pip install --quiet bitsandbytes
!pip install evaluate
!pip install nltk

**Imports and Initial Setup**

In [None]:
import os
import pandas as pd
from PIL import Image
from tqdm.auto import tqdm
import torch
from datasets import Dataset, DatasetDict
from transformers import (
    BlipProcessor,
    BlipForQuestionAnswering,
    default_data_collator,
    TrainingArguments,
    Trainer,
    BertTokenizer,
    BertModel
)
from peft import (
    prepare_model_for_kbit_training,
    LoraConfig,
    get_peft_model,
    PeftModel,
)
import wandb
import types
import re
from sklearn.metrics import accuracy_score, f1_score
from bert_score import score as bert_score
import nltk
from nltk.corpus import wordnet as wn
from torch.utils.data import IterableDataset

# Download required NLTK resources, one call per resource name
for nltk_resource in (
    'wordnet',
    'punkt',
    'averaged_perceptron_tagger',
    'averaged_perceptron_tagger_eng',
):
    nltk.download(nltk_resource)

# Prefer the GPU when CUDA is available; otherwise fall back to the CPU
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print(f"Using device: {DEVICE}")

**Patching BLIP Models**
* **Problem**: The BLIP model's forward methods assume inputs like input_ids (token IDs for text) or attention_mask have a batch dimension. For example, they expect input_ids to be shaped like [batch_size, sequence_length]. If you pass a single sample with shape [sequence_length], the model will fail because it can't process the unexpected shape.
* **Solution**: The patch checks whether an input (input_ids, attention_mask, or labels) is missing the batch dimension (i.e., is one-dimensional). If so, it adds one with unsqueeze(0), turning the shape [sequence_length] into [1, sequence_length], and then calls the original forward method with the corrected inputs.

In [None]:
def patch_blip_models():
    """Patch the BLIP text models so their forward passes accept unbatched (1-D) inputs.

    ``BlipTextModel.forward`` and ``BlipTextLMHeadModel.forward`` are replaced at class
    level by thin wrappers. When ``input_ids`` is 1-D the wrapper adds a leading batch
    dimension to it and to a 1-D ``attention_mask``; the LM-head wrapper does the same
    for 1-D ``labels``. All arguments are then forwarded by keyword to the original
    implementation.

    The patch is idempotent: a class whose ``forward`` is already one of these wrappers
    is left untouched, so re-running the notebook cell does not stack wrappers.
    """
    # Import BLIP text model classes for modification
    from transformers.models.blip.modeling_blip_text import BlipTextModel, BlipTextLMHeadModel

    # Attribute set on the wrapper functions so an earlier patch can be recognised
    patch_flag = "_adds_batch_dim"

    def add_batch_dim(tensor):
        """Return ``tensor`` with a leading batch axis if it is 1-D; otherwise unchanged."""
        if tensor is not None and len(tensor.shape) == 1:
            return tensor.unsqueeze(0)
        return tensor

    if not getattr(BlipTextModel.forward, patch_flag, False):
        # Store original forward method of BlipTextModel for patching
        original_text_forward = BlipTextModel.forward

        def patched_text_forward(self, input_ids=None, attention_mask=None, position_ids=None,
                                 head_mask=None, inputs_embeds=None, encoder_embeds=None,
                                 encoder_hidden_states=None, encoder_attention_mask=None,
                                 past_key_values=None, use_cache=None, output_attentions=None,
                                 output_hidden_states=None, return_dict=None, is_decoder=False):
            # Check if input_ids lacks batch dimension (i.e., is 1D)
            if input_ids is not None and len(input_ids.shape) == 1:
                # Add batch dimension to input_ids (e.g., [seq_len] -> [1, seq_len])
                input_ids = input_ids.unsqueeze(0)
                # Only a 1-D mask needs the extra axis; an already-batched mask must
                # not be turned into a 3-D tensor
                attention_mask = add_batch_dim(attention_mask)
            # Call original forward method with corrected inputs
            return original_text_forward(self, input_ids=input_ids, attention_mask=attention_mask,
                                         position_ids=position_ids, head_mask=head_mask,
                                         inputs_embeds=inputs_embeds, encoder_embeds=encoder_embeds,
                                         encoder_hidden_states=encoder_hidden_states,
                                         encoder_attention_mask=encoder_attention_mask,
                                         past_key_values=past_key_values, use_cache=use_cache,
                                         output_attentions=output_attentions,
                                         output_hidden_states=output_hidden_states,
                                         return_dict=return_dict, is_decoder=is_decoder)

        setattr(patched_text_forward, patch_flag, True)
        BlipTextModel.forward = patched_text_forward

    if not getattr(BlipTextLMHeadModel.forward, patch_flag, False):
        # Store original forward method of BlipTextLMHeadModel for patching
        original_lm_forward = BlipTextLMHeadModel.forward

        def patched_lm_forward(self, input_ids=None, attention_mask=None, position_ids=None,
                               head_mask=None, inputs_embeds=None, encoder_hidden_states=None,
                               encoder_attention_mask=None, labels=None, past_key_values=None,
                               use_cache=None, output_attentions=None, output_hidden_states=None,
                               return_dict=None, return_logits=False, is_decoder=True, reduction="mean"):
            # Ensure input_ids (and a 1-D attention_mask) has a batch dimension
            if input_ids is not None and len(input_ids.shape) == 1:
                input_ids = input_ids.unsqueeze(0)
                attention_mask = add_batch_dim(attention_mask)
            # Ensure labels has batch dimension for training
            labels = add_batch_dim(labels)
            # Call original forward method with corrected inputs and labels
            return original_lm_forward(self, input_ids=input_ids, attention_mask=attention_mask,
                                       position_ids=position_ids, head_mask=head_mask,
                                       inputs_embeds=inputs_embeds, encoder_hidden_states=encoder_hidden_states,
                                       encoder_attention_mask=encoder_attention_mask, labels=labels,
                                       past_key_values=past_key_values, use_cache=use_cache,
                                       output_attentions=output_attentions, output_hidden_states=output_hidden_states,
                                       return_dict=return_dict, return_logits=return_logits,
                                       is_decoder=is_decoder, reduction=reduction)

        setattr(patched_lm_forward, patch_flag, True)
        BlipTextLMHeadModel.forward = patched_lm_forward

# Apply the patches to BLIP models for compatibility with single-sample inputs
patch_blip_models()

**Custom Forward for LoRA Integration**
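* Wraps the forward method of the PEFT model's `base_model` so it works with the LoRA-wrapped BLIP model: unbatched `input_ids`/`attention_mask` get a batch dimension, and only the known BLIP arguments that are not `None` are passed on (any other keyword arguments are dropped).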

In [None]:
def create_custom_forward(model):
    """Wrap ``model.base_model.forward`` so only supported, non-None inputs reach it.

    The wrapper:
      * adds a batch dimension to a 1-D ``input_ids`` (and to a 1-D ``attention_mask``
        that accompanies it),
      * forwards only the explicitly listed arguments whose value is not ``None``,
      * silently drops any additional keyword arguments.

    Calling this function again on a model that is already wrapped is a no-op, so
    re-running the notebook cell does not stack wrappers.

    Args:
        model: Object exposing ``base_model.forward`` (here the PEFT-wrapped BLIP model).

    Returns:
        The same ``model`` object, with ``model.base_model.forward`` replaced.
    """
    # Bound methods expose the attributes of their underlying function, so this
    # detects a wrapper installed by an earlier call
    if getattr(model.base_model.forward, "_is_custom_forward", False):
        return model

    original_forward = model.base_model.forward

    def custom_forward(self, input_ids=None, attention_mask=None, pixel_values=None,
                       decoder_input_ids=None, decoder_attention_mask=None, output_attentions=None,
                       output_hidden_states=None, labels=None, return_dict=None,
                       interpolate_pos_encoding=None, **kwargs):
        # Add batch dimension if needed
        if input_ids is not None and len(input_ids.shape) == 1:
            input_ids = input_ids.unsqueeze(0)
            # Only a 1-D mask needs the extra axis; an already-batched mask must not
            # be turned into a 3-D tensor
            if attention_mask is not None and len(attention_mask.shape) == 1:
                attention_mask = attention_mask.unsqueeze(0)
        # Filter allowed inputs (extra **kwargs are intentionally not forwarded)
        allowed_inputs = {
            "input_ids": input_ids, "attention_mask": attention_mask, "pixel_values": pixel_values,
            "decoder_input_ids": decoder_input_ids, "decoder_attention_mask": decoder_attention_mask,
            "output_attentions": output_attentions, "output_hidden_states": output_hidden_states,
            "labels": labels, "return_dict": return_dict, "interpolate_pos_encoding": interpolate_pos_encoding
        }
        filtered_inputs = {k: v for k, v in allowed_inputs.items() if v is not None}
        return original_forward(**filtered_inputs)

    # Marker read by the idempotency check above
    custom_forward._is_custom_forward = True
    # Assign custom forward to model
    model.base_model.forward = types.MethodType(custom_forward, model.base_model)
    return model

**Model and Processor Setup**

In [None]:
# Checkpoint identifier shared by the processor and the model
MODEL_NAME = "Salesforce/blip-vqa-base"

# Load the BLIP processor and the VQA model from the same checkpoint, then run the
# model through PEFT's k-bit training preparation
processor = BlipProcessor.from_pretrained(MODEL_NAME)
model = prepare_model_for_kbit_training(
    BlipForQuestionAnswering.from_pretrained(MODEL_NAME, device_map="auto")
)

# LoRA settings: rank-16 adapters (alpha 32, dropout 0.1) on the listed module names,
# with bias terms left out of training
lora_settings = dict(
    r=16,
    lora_alpha=32,
    target_modules=["qkv", "projection", "self.query", "self.key", "self.value"],
    lora_dropout=0.1,
    bias="none",
    task_type="QUESTION_ANS",
)
lora_config = LoraConfig(**lora_settings)

# Attach the adapters, then install the input-filtering forward wrapper
model = create_custom_forward(get_peft_model(model, lora_config))

**Training Parameters and W&B Initialization**

In [None]:
# Input locations: the VQA CSV and the directory holding the images
DATA_CSV = "/kaggle/input/vrproject2dataset/vqa_dataset.csv"
IMAGE_DIR = "/kaggle/input/vrproject2/abo-images-small/images/small"

# Optimisation hyperparameters
BATCH_SIZE = 4  # Reduced for memory efficiency
NUM_EPOCHS = 5
LEARNING_RATE = 5e-5

# Hyperparameters recorded with the Weights & Biases run
wandb_run_config = dict(
    model_name=MODEL_NAME,
    batch_size=BATCH_SIZE,
    num_epochs=NUM_EPOCHS,
    learning_rate=LEARNING_RATE,
    lora_r=16,
    lora_alpha=32,
    lora_dropout=0.1,
)

# Initialize Weights & Biases for logging (offline mode)
wandb.init(
    project="blip-vqa-lora",
    name="blip-vqa-run",
    config=wandb_run_config,
    mode="offline",
)

**Dataset Loading and Splitting**

In [None]:
train_split_ratio = 0.9
# Load the full dataset. The first 90% of rows form the train/validation pool; the
# remaining rows are left untouched here and are sampled later as the test set.
full_df = pd.read_csv(DATA_CSV)
train_split = int(train_split_ratio * len(full_df))
df = full_df[:train_split].reset_index(drop=True)  # 90% for training/validation

# Sample 90% of the pool for training. The sampled frame keeps its original row labels
# until the validation rows have been derived from them: resetting the index first
# would make `drop` remove the first rows by position instead of the sampled rows,
# leaving validation rows that also appear in the training set.
train_sample = df.sample(frac=0.9, random_state=42)
val_df = df.drop(train_sample.index).reset_index(drop=True)  # Remaining 10% for validation
train_df = train_sample.reset_index(drop=True)  # 90% of df for training

# The two splits must partition the pool
assert len(train_df) + len(val_df) == len(df), "train/validation split does not partition df"

print(train_split)
SUBSET_SIZE = train_split

# Calculate training steps
gradient_accumulation_steps = 8  # Number of batches to accumulate gradients over before updating the model (used to simulate a larger batch size).
effective_batch_size = BATCH_SIZE * gradient_accumulation_steps
# Use the real number of training rows rather than re-deriving it from a ratio
total_train_samples = len(train_df)
max_steps = (total_train_samples // effective_batch_size) * NUM_EPOCHS


**VQA Dataset Generator**
* **Purpose:** The class processes each sample on-the-fly, converting images and text into the format required by the model (e.g., tokenized text and normalized images)

In [None]:
class VQADatasetGenerator(IterableDataset):
    """Iterable dataset that turns VQA rows into BLIP model inputs on the fly.

    Each row of ``df`` must provide ``path`` (image path relative to ``image_dir``),
    ``question`` and ``answer``. Rows are processed lazily, one at a time, in DataFrame
    order.
    """

    def __init__(self, df, image_dir, processor):
        """Store the data source and the preprocessing objects.

        Args:
            df: Pandas DataFrame with VQA data (columns: path, question, answer).
            image_dir: Directory containing the images.
            processor: BlipProcessor used for text and image preprocessing.
        """
        self.df = df
        self.image_dir = image_dir
        self.processor = processor

    def __iter__(self):
        """Yield one dict of unbatched tensors per DataFrame row.

        Keys: ``input_ids``, ``attention_mask``, ``pixel_values``, ``decoder_input_ids``
        and ``labels`` (``labels`` is the same tensor as ``decoder_input_ids``).
        """
        for _, row in self.df.iterrows():  # Iterate over DataFrame rows
            # Load image; the context manager releases the file handle as soon as the
            # RGB copy has been made instead of leaving it to garbage collection
            img_path = os.path.join(self.image_dir, row["path"])
            with Image.open(img_path) as img_file:
                image = img_file.convert("RGB")

            # Prepare question and answer
            question = f"Question: {row['question']} Answer:"  # Format question
            answer = str(row['answer']).strip().lower()  # Clean answer
            answer = re.sub(r'[^\w\s]', '', answer)  # Remove punctuation

            # Process inputs
            enc = self.processor(
                images=image,
                text=question,
                return_tensors="pt",  # Return PyTorch tensors
                padding="max_length",  # Pad to max length
                truncation=True,  # Truncate if too long
                max_length=32  # Max length for question
            )

            # Process answer for decoder
            decoder_tokens = self.processor.tokenizer(
                answer,
                padding="max_length",
                truncation=True,
                max_length=10,  # Max length for answer
                return_tensors="pt"
            )
            decoder_input_ids = decoder_tokens.input_ids.squeeze(0)  # Remove batch dimension

            # Yield processed example
            yield {
                "input_ids": enc.input_ids.squeeze(0),  # Token IDs for question
                "attention_mask": enc.attention_mask.squeeze(0),  # Attention mask for question
                "pixel_values": enc.pixel_values.squeeze(0),  # Processed image
                "decoder_input_ids": decoder_input_ids,  # Token IDs for answer
                "labels": decoder_input_ids  # Same as decoder_input_ids for training
            }

print(max_steps)  # Print the total number of training steps computed in the dataset-splitting cell

**Training Arguments**

In [None]:
# Configure training hyperparameters and settings for the Trainer.
# The options are grouped by theme and merged into a single TrainingArguments call.

# Batch sizes (per device) and gradient accumulation; the accumulation count is the
# same variable that was used to compute max_steps
batching_options = dict(
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    gradient_accumulation_steps=gradient_accumulation_steps,
)

# Optimiser schedule: linear decay after 100 warmup steps, mixed precision disabled.
# max_steps is passed alongside num_train_epochs and sets the total number of steps.
schedule_options = dict(
    num_train_epochs=NUM_EPOCHS,
    learning_rate=LEARNING_RATE,
    fp16=False,
    lr_scheduler_type="linear",
    warmup_steps=100,
    max_steps=max_steps,
)

# Logging, evaluation and checkpoint cadence (in steps), plus W&B reporting.
# NOTE(review): no evaluation strategy argument is passed here; confirm that
# eval_steps actually triggers evaluation with the installed transformers version.
tracking_options = dict(
    output_dir="./blip-vqa-output",
    logging_steps=100,
    eval_steps=200,
    save_steps=1500,
    save_total_limit=10,
    report_to="wandb",
    run_name="blip-vqa-run",
)

# Keep every column produced by the dataset and name the label field explicitly
column_options = dict(
    remove_unused_columns=False,
    label_names=["labels"],
)

training_args = TrainingArguments(
    **batching_options,
    **schedule_options,
    **tracking_options,
    **column_options,
)

**Compute Metrics**
* Evaluates model performance during training

In [None]:
def compute_metrics(eval_pred):
    """Compute exact-match accuracy and macro F1 from token-id predictions and labels.

    Both arrays are decoded with the module-level ``processor``, lower-cased, stripped
    and cleaned of punctuation before comparison. The metrics are also logged to
    Weights & Biases under ``eval_accuracy`` / ``eval_f1``.

    Args:
        eval_pred: Pair unpacked as ``(generated_ids, labels)``.
            NOTE(review): this assumes the predictions are already token ids; confirm
            what the Trainer passes for this model.

    Returns:
        dict with keys ``"accuracy"`` and ``"f1"`` (both 0.0 when there is nothing
        to evaluate).
    """
    import numpy as np  # local import: numpy is not imported at the top of the notebook

    generated_ids, labels = eval_pred
    pad_token_id = processor.tokenizer.pad_token_id

    def to_decodable(token_ids):
        """Replace the -100 ignore index with the pad token so the ids can be decoded."""
        token_ids = np.asarray(token_ids)
        if pad_token_id is None:
            return token_ids
        return np.where(token_ids == -100, pad_token_id, token_ids)

    # Decode predictions and labels
    pred_texts = processor.batch_decode(to_decodable(generated_ids), skip_special_tokens=True)
    label_texts = processor.batch_decode(to_decodable(labels), skip_special_tokens=True)
    pred_texts = [re.sub(r'[^\w\s]', '', p.strip().lower()) for p in pred_texts]
    label_texts = [re.sub(r'[^\w\s]', '', l.strip().lower()) for l in label_texts]

    # Calculate metrics; an empty evaluation set yields zeros instead of a ZeroDivisionError
    if pred_texts:
        acc = sum(1 for p, l in zip(pred_texts, label_texts) if p == l) / len(pred_texts)
        f1 = f1_score(label_texts, pred_texts, average="macro", zero_division=0)
    else:
        acc, f1 = 0.0, 0.0
    wandb.log({"eval_accuracy": acc, "eval_f1": f1})
    return {"accuracy": acc, "f1": f1}

**Trainer Setup and Training**

In [None]:
# Streaming datasets that convert the train/validation frames into model inputs
train_stream = VQADatasetGenerator(train_df, IMAGE_DIR, processor)
val_stream = VQADatasetGenerator(val_df, IMAGE_DIR, processor)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_stream,
    eval_dataset=val_stream,
    data_collator=default_data_collator,  # stacks individual examples into batches
    compute_metrics=compute_metrics,
)

trainer.train()

# Save trained model and processor into the same directory (model first)
for artifact in (model, processor):
    artifact.save_pretrained("blip-vqa-adapters")
print("Training complete!")
wandb.finish()

**Inference Setup**

In [None]:
SUBSET = 5000  # number of rows sampled for the test set in the inference cell
MODEL_DIR = "blip-vqa-adapters"  # directory the trained adapters were saved to

# Reload the processor and a fresh base model from the original checkpoint, then
# attach the saved adapters and move the result to the selected device
processor = BlipProcessor.from_pretrained(MODEL_NAME)
base_model = BlipForQuestionAnswering.from_pretrained(MODEL_NAME, device_map="auto")
model = PeftModel.from_pretrained(base_model, MODEL_DIR)
model = model.to(DEVICE)

# BERTScore is computed later through bert_score(...), which is called without a
# manually loaded BERT model, so no separate BERT tokenizer/model is set up here.

# Switch to evaluation mode for inference
model.eval()

**Metric Calculation Functions**

In [None]:
# Map NLTK POS tags to WordNet POS categories for WUP similarity
def get_wordnet_pos(word):
    """Return the WordNet POS constant for ``word``, defaulting to noun.

    The word is tagged on its own with ``nltk.pos_tag``; the upper-cased first letter
    of the resulting tag selects adjective (J), noun (N), verb (V) or adverb (R).
    Any other tag falls back to ``wn.NOUN``.
    """
    nltk_tag = nltk.pos_tag([word])[0][1]
    initial = nltk_tag[0].upper()
    if initial == "J":
        return wn.ADJ
    if initial == "V":
        return wn.VERB
    if initial == "R":
        return wn.ADV
    # "N" and every unmapped tag resolve to noun
    return wn.NOUN

# Calculate Wu-Palmer (WUP) similarity for lexical similarity between predictions and references
def calculate_wup_score(preds, refs):
    """Average Wu-Palmer (WUP) similarity over paired predictions and references.

    Each text is lower-cased, stripped of punctuation and split on whitespace. For
    every prediction token the best score against ANY reference token is taken:

      * a token that also occurs verbatim in the reference scores 1.0 (this covers
        tokens WordNet does not know, such as digits);
      * otherwise the score is the maximum ``wup_similarity`` over all synset pairs of
        the prediction token and every reference token.

    Prediction tokens without synsets, or whose best score is 0, are left out of the
    pair's average. A pair with an empty side scores 0.0.

    Args:
        preds: Iterable of predicted answer strings.
        refs: Iterable of reference answer strings, paired with ``preds`` via ``zip``.

    Returns:
        Mean pair score as a float, or 0.0 when there are no pairs.
    """
    def tokenize(text):
        """Lower-case, drop punctuation and split on whitespace."""
        return re.sub(r'[^\w\s]', '', text.lower()).split()

    def lookup_synsets(token):
        """Synsets matching the token's POS tag, falling back to all of its synsets."""
        return wn.synsets(token, pos=get_wordnet_pos(token)) or wn.synsets(token)

    def wup_sim(pred, ref):
        """Score a single prediction/reference pair."""
        pred_tokens = tokenize(pred)
        ref_tokens = tokenize(ref)
        # Return 0 if either token list is empty
        if not pred_tokens or not ref_tokens:
            return 0.0

        ref_vocab = set(ref_tokens)
        # Reference synsets are looked up at most once per pair, and only if some
        # prediction token actually needs a WordNet comparison
        ref_synsets = None
        max_similarities = []
        for p_token in pred_tokens:
            # Identical tokens are a perfect match even when WordNet has no entry
            if p_token in ref_vocab:
                max_similarities.append(1.0)
                continue
            p_synsets = lookup_synsets(p_token)
            if not p_synsets:
                continue
            if ref_synsets is None:
                ref_synsets = [lookup_synsets(r_token) for r_token in ref_tokens]
            # Running maximum across ALL reference tokens (a later reference token
            # must not overwrite a better score from an earlier one)
            token_max_sim = 0.0
            for r_synsets in ref_synsets:
                for p_syn in p_synsets:
                    for r_syn in r_synsets:
                        similarity = wn.wup_similarity(p_syn, r_syn) or 0.0
                        if similarity > token_max_sim:
                            token_max_sim = similarity
            if token_max_sim > 0:
                # Store non-zero maximum similarity
                max_similarities.append(token_max_sim)
        # Return average similarity or 0 if no valid similarities
        return sum(max_similarities) / len(max_similarities) if max_similarities else 0.0

    # Compute WUP similarity for all pred-ref pairs
    wup_scores = [wup_sim(p, r) for p, r in zip(preds, refs)]
    # Return average WUP score across all pairs
    return sum(wup_scores) / len(wup_scores) if wup_scores else 0.0

**Test Data Preparation and Inference**

In [None]:
# Prepare test dataset by sampling from the remaining data after train/validation split
holdout_df = full_df[train_split:]
# Cap the sample size at the number of available rows: sampling without replacement
# raises when more rows are requested than exist
test_df = (
    holdout_df
    .sample(n=min(SUBSET, len(holdout_df)), random_state=42)
    .reset_index(drop=True)
)

# Initialize lists to store predictions and ground-truth references
preds, refs = [], []
# Iterate over test data in batches for inference
for i in tqdm(range(0, len(test_df), BATCH_SIZE), desc="Inference"):
    batch = test_df.iloc[i:i + BATCH_SIZE]

    # Load and convert images to RGB format for the batch; each file handle is
    # released as soon as its RGB copy exists
    images = []
    for rel_path in batch["path"]:
        with Image.open(os.path.join(IMAGE_DIR, rel_path)) as img_file:
            images.append(img_file.convert("RGB"))

    # Format questions with standard prefix/suffix for model input
    questions = [f"Question: {q} Answer:" for q in batch["question"].tolist()]
    # Process images and questions using BlipProcessor for model-compatible tensors
    inputs = processor(images=images, text=questions, return_tensors="pt", padding=True).to(DEVICE)

    with torch.no_grad():
        # Generate answers with beam search (5 beams, at most 10 new tokens)
        generated_ids = model.generate(**inputs, max_new_tokens=10, num_beams=5, early_stopping=True)
    # Decode generated token IDs
    raw = processor.batch_decode(generated_ids, skip_special_tokens=True)

    # Clean predictions and ground-truth answers by removing punctuation, converting to lowercase, and trimming
    cleaned_p = [re.sub(r'[^\w\s]', '', r.strip().lower()) for r in raw]
    cleaned_r = [re.sub(r'[^\w\s]', '', str(r).strip().lower()) for r in batch["answer"]]

    preds.extend(cleaned_p)
    refs.extend(cleaned_r)

**Calculate Metrics**

In [None]:
# Exact string-match accuracy over the cleaned answers
acc = accuracy_score(refs, preds)
print(f"String-match Accuracy: {acc*100:.2f}%")

# BERTScore precision / recall / F1, each averaged over all pairs before printing
P, R, F1 = bert_score(
    preds,
    refs,
    lang='en',
    rescale_with_baseline=True,
    device=DEVICE,
)
bert_precision = P.mean().item()
bert_recall = R.mean().item()
bert_f1 = F1.mean().item()
print(f"BERTScore Precision: {bert_precision*100:.2f}%")
print(f"BERTScore Recall   : {bert_recall*100:.2f}%")
print(f"BERTScore F1       : {bert_f1*100:.2f}%")

# Macro-averaged F1, treating every distinct answer string as its own class
f1 = f1_score(refs, preds, average="macro")
print(f" • Macro F1  : {f1 * 100:.2f}%")

# WordNet-based Wu-Palmer similarity
wup_score = calculate_wup_score(preds, refs)
print(f"WUP Score: {wup_score*100:.2f}%")