# Installing Dependencies

In [None]:
!pip install -U transformers accelerate peft
!pip install bitsandbytes
!pip install -U evaluate
!pip install -U rouge_score

# Imports


In [None]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration
import torch
from PIL import Image
import pandas as pd
from tqdm import tqdm
import os
from peft import LoraConfig, get_peft_model,PeftModel
from transformers import TrainingArguments, Trainer
import os
from torch.utils.data import Dataset
import evaluate
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

# Paths and Device

In [None]:
# Dataset locations on Kaggle -- change both when running somewhere else.
csv_path = "/kaggle/input/vr-dataset-final-20k/annotations.csv"  # <-- Change this
image_folder = "/kaggle/input/vr-dataset-final-20k/images/unique_images"  # <-- Change this

# Use the first CUDA device when one is available, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

## Loading the Model

In [None]:
# LoRA hyperparameters: rank 16, alpha 32, dropout 0.05, no bias training,
# with adapters requested for the "all-linear" target set.
config = LoraConfig(
    target_modules="all-linear",
    bias="none",
    lora_dropout=0.05,
    lora_alpha=32,
    r=16,
)

# The processor bundles the image preprocessor and the text tokenizer.
processor = Blip2Processor.from_pretrained("Salesforce/blip2-flan-t5-xl")

# Load the base checkpoint in 8-bit on GPU 0 and wrap it with the LoRA adapters
# in one step; `model` is the PEFT-wrapped model from here on.
model = get_peft_model(
    Blip2ForConditionalGeneration.from_pretrained(
        "Salesforce/blip2-flan-t5-xl",
        torch_dtype=torch.float16,
        load_in_8bit=True,
        device_map={"": 0},
    ),
    config,
)

# Report how many parameters the adapters leave trainable.
model.print_trainable_parameters()

## Defining the Dataset

In [None]:
class VQADataset(Dataset):
    """Visual question answering samples read from a CSV file.

    Every CSV row is expected to provide the columns ``image_name`` (file name
    relative to ``image_folder``), ``question`` and ``answer``.  Each item is a
    dict holding the processor outputs for the image/question pair plus a
    ``labels`` tensor built from the *first word* of the answer.

    Args:
        csv_path: Path of the annotations CSV.
        image_folder: Directory that contains the image files.
        processor: Callable processor (image + text) exposing a ``tokenizer``.
        max_samples: If given, keep only the first ``max_samples`` rows.
        max_length: Length every question and label sequence is padded or
            truncated to.

    Items are returned as CPU tensors: moving them to the GPU here would force
    a device round trip per sample and would break multi-worker data loading;
    the consumer (``Trainer`` or the evaluation loop) places them on the
    model's device.
    """

    def __init__(self, csv_path, image_folder, processor, max_samples=None, max_length=128):
        self.data = pd.read_csv(csv_path)

        if max_samples is not None:
            self.data = self.data[:max_samples]  # Take only the first max_samples rows

        self.image_folder = image_folder
        self.processor = processor
        self.max_length = max_length

        print(f"[INFO] Loaded {len(self.data)} samples from '{csv_path}'")
        print(f"[INFO] Image folder: {image_folder}")

    def __len__(self):
        """Return the number of rows kept from the CSV."""
        return len(self.data)

    @staticmethod
    def _first_word(answer):
        """Return the first whitespace-separated word of ``answer`` as a string.

        Non-string answers (e.g. numbers parsed by pandas) are converted with
        ``str``.

        Raises:
            ValueError: If the answer is missing (NaN/None) or blank.
        """
        words = [] if pd.isna(answer) else str(answer).split()
        if not words:
            raise ValueError(f"Empty or missing answer: {answer!r}")
        return words[0]

    def _build_labels(self, answer):
        """Tokenize the first word of ``answer`` and mask padding with -100."""
        tokenized = self.processor.tokenizer(
            self._first_word(answer),
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
        )
        input_ids = tokenized["input_ids"].squeeze(0)
        attention_mask = tokenized["attention_mask"].squeeze(0)

        # Keep only real tokens (attention mask == 1); every padded position
        # becomes -100.
        return input_ids.masked_fill(attention_mask != 1, -100)

    def __getitem__(self, idx):
        """Return the processed image/question inputs and labels for row ``idx``."""
        row = self.data.iloc[idx]
        image_path = os.path.join(self.image_folder, row['image_name'])
        # Close the underlying file as soon as the pixels have been converted.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        question = row['question']

        # Prepare inputs (question + image); drop the batch dimension the
        # processor adds so that a collator can stack the samples.
        inputs = self.processor(images=image, text=question, return_tensors="pt",
                                padding="max_length", truncation=True,
                                max_length=self.max_length)
        inputs = {k: v.squeeze(0) for k, v in inputs.items()}

        inputs['labels'] = self._build_labels(row['answer'])

        return inputs

# Training

In [None]:
# Number of CSV rows to use. 5 is a smoke-test size; set to None to train on
# the full annotation file.
MAX_SAMPLES = 5
# Fixed seed so the train/test split is identical after every kernel restart;
# otherwise the held-out set used for evaluation changes from run to run.
SPLIT_SEED = 42

# Load dataset
full_dataset = VQADataset(csv_path, image_folder, processor, MAX_SAMPLES)

# Split the dataset 80/20 into train and test subsets
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    full_dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Training arguments
training_args = TrainingArguments(
    output_dir="./blip2-vqa-lora",
    per_device_train_batch_size=8,
    gradient_accumulation_steps=2,
    num_train_epochs=40,
    learning_rate=2e-4,
    save_total_limit=2,
    weight_decay=0.01,
    fp16=True,
    report_to="none",
    logging_dir="./logs",                       # optional
    logging_strategy="epoch"
)

In [None]:
# Sanity check: print shape/dtype of every tensor in the first training sample
# and decode the question and label token ids back to text.
sample = train_dataset[0]
for name, tensor in sample.items():
    print(f"{name}: shape = {tensor.shape}, dtype = {tensor.dtype}")

    if name == 'input_ids':
        token_ids = tensor.tolist()
    elif name == 'labels':
        # -100 marks ignored positions and is not a real token id.
        token_ids = tensor[tensor != -100].tolist()
    else:
        continue

    print(f"{name} tokens: {processor.tokenizer.decode(token_ids, skip_special_tokens=True)}")


In [None]:
import inspect  # local to this cell: only needed to pick the right keyword below

# Newer transformers releases take the tokenizer through `processing_class`
# and deprecate the old `tokenizer` keyword. Choose whichever keyword the
# installed Trainer accepts so the cell works across versions.
_trainer_params = inspect.signature(Trainer.__init__).parameters
_tokenizer_kwarg = "processing_class" if "processing_class" in _trainer_params else "tokenizer"

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    **{_tokenizer_kwarg: processor.tokenizer},
)

# Train
trainer.train()

# Inference

In [None]:
# Metrics used by the evaluation cell.
bleu = evaluate.load("bleu")
rouge = evaluate.load("rouge")
accuracy_metric = evaluate.load("accuracy")

# LoRA checkpoint written by the Trainer; adjust the step number to a
# checkpoint directory that actually exists under the output directory.
checkpoint_path = "/kaggle/working/blip2-vqa-lora/checkpoint-500"

processor = Blip2Processor.from_pretrained("Salesforce/blip2-flan-t5-xl")

# Load the frozen base weights under their own name: the adapter loader below
# needs the base model as its first argument.
base_model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-flan-t5-xl",
    device_map={"": 0},
    load_in_8bit=True,
    torch_dtype=torch.float16
)

# Attach the fine-tuned adapter weights and switch to inference mode.
model = PeftModel.from_pretrained(base_model, checkpoint_path).eval()

In [None]:
# One sample per batch: the decoding below reads element 0 of every batch.
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
predictions, references = [], []

# Send inputs to wherever the model actually lives instead of a fixed GPU index.
eval_device = model.device

for batch in tqdm(test_loader):
    pixel_values = batch['pixel_values'].to(eval_device)      # shape: [1, 3, H, W]
    input_ids = batch['input_ids'].to(eval_device)            # question tokens
    attention_mask = batch['attention_mask'].to(eval_device)  # marks padded positions
    labels = batch['labels']                                  # ground truth answer tokens

    with torch.no_grad():
        # The question is padded to a fixed length, so the attention mask is
        # required for generation to ignore the padding tokens.
        outputs = model.generate(pixel_values=pixel_values, input_ids=input_ids,
                                 attention_mask=attention_mask, max_new_tokens=30)

    pred = processor.tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]

    # -100 marks ignored label positions and is not a valid token id, so it
    # has to be removed before decoding.
    label_ids = labels[0][labels[0] != -100]
    ref = processor.tokenizer.decode(label_ids.tolist(), skip_special_tokens=True)

    predictions.append(pred.strip())
    references.append(ref.strip())

# BLEU expects list of list of references and list of predictions
bleu_score = bleu.compute(predictions=predictions, references=[[ref] for ref in references])
rouge_score = rouge.compute(predictions=predictions, references=references)

# Accuracy (exact match, case-insensitive)
exact_matches = [int(p.lower().strip() == r.lower().strip()) for p, r in zip(predictions, references)]
accuracy = sum(exact_matches) / len(exact_matches) if exact_matches else 0.0

print("BLEU:", bleu_score)
print("ROUGE:", rouge_score)
print("Exact Match Accuracy:", accuracy)

In [None]:
def predict(image_path, question):
    """Answer ``question`` about the image stored at ``image_path``.

    Uses the notebook-level ``processor`` and ``model``.

    Returns:
        A ``(answer, image)`` tuple: the decoded, stripped answer text and the
        RGB image that was fed to the model.
    """
    image = Image.open(image_path).convert("RGB")

    # Encode the image/question pair and place it on the model's device.
    model_inputs = processor(text=question, images=image, return_tensors="pt").to(model.device)

    # Greedy decoding (no sampling, single beam) of at most five new tokens.
    generation_options = dict(max_new_tokens=5, do_sample=False, num_beams=1)
    with torch.no_grad():
        generated = model.generate(**model_inputs, **generation_options)

    answer_text = processor.tokenizer.decode(generated[0], skip_special_tokens=True)
    return answer_text.strip(), image

# Pick one annotated example (row 20 of the CSV) and run the model on it.
df = pd.read_csv(csv_path)
row = df.iloc[20]  # Adjust this to select a different row
question = row["question"]
image_name = row["image_name"]
image_path = os.path.join(image_folder, image_name)

predicted_answer, image = predict(image_path, question)

# Show the image with the question, the prediction and the ground truth.
fig, ax = plt.subplots(figsize=(5, 5))
ax.imshow(image)
ax.axis('off')
ax.set_title(f"Q: {question}\nPredicted A: {predicted_answer}\nGT: {row['answer']}")
plt.show()
