In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only input tree and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    full_paths = (os.path.join(dirname, filename) for filename in filenames)
    for full_path in full_paths:
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import pandas as pd
import os
import json
from glob import glob

# Load listing metadata.
# Every listings_*.json file is read line by line, one JSON document per line.
json_dir = '/kaggle/input/vrmini2/abo-listings/listings/metadata/'
# glob() returns matches in arbitrary order. Sort them so the row order of the
# merged CSV is reproducible -- a later cell selects rows by position.
json_files = sorted(glob(os.path.join(json_dir, 'listings_*.json')))
if not json_files:
    # Fail early with a clear message instead of pd.concat's
    # "No objects to concatenate".
    raise FileNotFoundError(f"No listings_*.json files found in {json_dir}")

dataframes = []
for file in json_files:
    with open(file, 'r', encoding='utf-8') as f:
        # Stream the file instead of materialising every line with readlines();
        # blank lines are skipped.
        records = [json.loads(line) for line in f if line.strip()]
    # Flatten nested JSON objects into dotted column names.
    df = pd.json_normalize(records)
    dataframes.append(df)

listing_df = pd.concat(dataframes, ignore_index=True)

# Load image metadata
image_df = pd.read_csv('/kaggle/input/vrmini2/abo-images-small/images/metadata/images.csv')

# Rename 'main_image_id' to 'image_id' in listing_df for matching
listing_df_renamed = listing_df.rename(columns={'main_image_id': 'image_id'})

# Merge listing and image metadata on image_id.
# Inner join: listings without a matching image row (and vice versa) are dropped.
combined_metadata2 = pd.merge(listing_df_renamed, image_df, on='image_id', how='inner')

# Export the combined DataFrame to a CSV file
combined_metadata2.to_csv('combined_metadata2.csv', index=False)

print("Combined metadata saved to 'combined_metadata2.csv'.")


In [None]:
import pandas as pd

# Directory that is prepended to every relative image path.
base_path = '/kaggle/input/vrmini2/abo-images-small/images/small/'

# Read the merged metadata and derive 'full_path' from the relative 'path' column.
df = pd.read_csv('combined_metadata2.csv').assign(
    full_path=lambda frame: base_path + frame['path'].astype(str)
)

# Persist the augmented table for the next stage.
df.to_csv('/kaggle/working/combined_metadata2_with_full_path.csv', index=False)

print("✅ full_path added and saved as combined_metadata2_with_full_path.csv")


In [None]:
import json
import os
import base64
import pandas as pd
from tqdm import tqdm
import time
from google.generativeai import configure, GenerativeModel
from google.api_core.exceptions import ResourceExhausted, ServiceUnavailable

# Configuration variables - modify these as needed
INPUT_FILE = 'combined_metadata2_with_full_path.csv'
OUTPUT_FILE = 'vqa_training_data_9.csv'  # Changed to CSV output
# Credentials must not be stored in the notebook: the Gemini key is read from the
# GEMINI_API_KEY environment variable (empty string when the variable is unset).
API_KEY = os.environ.get('GEMINI_API_KEY', '')
IMAGE_BASE_DIR = "/kaggle/input/vrmini2/abo-images-small/images/small/"
RETRY_ATTEMPTS = 3  # maximum API calls per item
DELAY = 4.5  # Warning: This risks hitting the 15 RPM and 1,500 RPD limits quickly; consider 60.0 for safety

# Range variables: positional row range [START_INDEX, END_INDEX) of INPUT_FILE to process
START_INDEX = 53078
END_INDEX = 53578
APPEND_RESULTS = True  # append to an existing OUTPUT_FILE instead of overwriting it

def encode_image(image_path):
    """Read the file at ``image_path`` and return its bytes as a base64 text string.

    Any failure (for example a missing or unreadable file) is reported on
    stdout and ``None`` is returned instead of raising.
    """
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
        encoded = base64.b64encode(raw_bytes)
        return encoded.decode('utf-8')
    except Exception as err:
        print(f"Error encoding image {image_path}: {err}")
        return None

def _parse_vqa_response(response_text):
    """Extract the VQA JSON object from a model reply; return None when unusable."""
    json_start = response_text.find('{')
    json_end = response_text.rfind('}') + 1

    if json_start < 0 or json_end <= json_start:
        print(f"No valid JSON found in response: {response_text}")
        return None

    json_str = response_text[json_start:json_end]
    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError:
        print(f"Failed to parse JSON response: {json_str}")
        return None

    # Callers iterate over parsed['questions']; reject replies without that list
    # so a malformed answer is retried instead of failing later.
    if not isinstance(parsed, dict) or not isinstance(parsed.get('questions'), list):
        print(f"Response JSON has no 'questions' list: {json_str}")
        return None
    return parsed


def generate_vqa_data(model, metadata, image_path, retry_attempts=RETRY_ATTEMPTS, delay=DELAY):
    """Generate VQA data for a single product using Gemini API.

    Args:
        model: Object exposing ``generate_content(contents=..., generation_config=...)``
            whose result has a ``.text`` attribute.
        metadata: Product metadata; serialised as JSON and appended to the prompt.
        image_path: Path of the image sent together with the prompt.
        retry_attempts: Maximum number of API calls made for this item.
        delay: Pause in seconds between attempts. For quota / availability
            errors the pause is ``delay * 2 ** attempt``.

    Returns:
        The parsed reply as a dict containing a ``"questions"`` list, or ``None``
        when the image cannot be read or every attempt fails.
    """
    import mimetypes  # local import: only this function needs it

    prompt = """You are an AI assistant helping to generate training data for a Visual Question Answering (VQA) model.
You are provided with:
- A product image
- Detailed product metadata (brand, style, color, features, description, etc.)

Your task is to generate diverse and meaningful questions that require both visual understanding and contextual reasoning from the metadata. The goal is to help train a robust VQA model that generalizes well to unseen product types and questions.

Use both the image and the metadata together to craft the questions. Make sure each question is visually answerable using the image while being enhanced by the metadata. Do not copy metadata text directly into answers — paraphrase or infer instead. Encourage variety in question types and phrasing. Avoid overfitting by ensuring questions are not repeated across images or overly templated.

Guidelines:
- Generate exactly 3 diverse questions per image.
- Questions must be answerable based on the image, optionally supported by metadata.
- Keep answers short and specific (1 word max).
- Use a mix of question types as appropriate for the image:
  - Descriptive
  - Counting
  - Comparative
  - Color recognition
  - Function-based
  - Reasoning-based

Output Format (strict JSON format):
{
  "image_id": "IMAGE_ID_HERE",
  "questions": [
    {
      "question": "QUESTION TEXT HERE",
      "answer": "ANSWER HERE"
    },
    {
      "question": "QUESTION TEXT HERE",
      "answer": "ANSWER HERE"
    },
    {
      "question": "QUESTION TEXT HERE",
      "answer": "ANSWER HERE"
    }
  ]
}

Product Metadata:
"""
    # default=str keeps serialisation from failing on values json cannot encode natively.
    prompt += json.dumps(metadata, indent=2, default=str)

    # Encode image to base64
    image_data = encode_image(image_path)
    if not image_data:
        return None

    # Derive the MIME type from the file extension instead of assuming JPEG;
    # fall back to JPEG when the extension is unknown or not an image type.
    mime_type, _ = mimetypes.guess_type(str(image_path))
    if not mime_type or not mime_type.startswith("image/"):
        mime_type = "image/jpeg"
    image_part = {"mime_type": mime_type, "data": image_data}

    for attempt in range(retry_attempts):
        has_attempts_left = attempt < retry_attempts - 1
        try:
            response = model.generate_content(
                contents=[
                    {"role": "user", "parts": [{"text": prompt}, {"inline_data": image_part}]}
                ],
                generation_config={
                    "temperature": 0.4,
                    "max_output_tokens": 1024,
                }
            )

            parsed = _parse_vqa_response(response.text)
            if parsed is not None:
                return parsed

            if has_attempts_left:
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)

        except (ResourceExhausted, ServiceUnavailable) as e:
            print(f"API limit exceeded or service unavailable: {e}")
            if has_attempts_left:
                # Exponential back-off for quota / availability errors.
                sleep_time = delay * (2 ** attempt)
                print(f"Retrying in {sleep_time} seconds...")
                time.sleep(sleep_time)
            else:
                print("Maximum retry attempts reached.")
                return None
        except Exception as e:
            print(f"Error calling Gemini API: {e}")
            if has_attempts_left:
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                return None

    return None

def append_to_csv_file(data, filename, append=None):
    """Write VQA results to ``filename`` as rows of (full_path, question, answer).

    Args:
        data: Iterable of result dicts, each with a ``'full_path'`` value and a
            ``'questions'`` list of ``{'question': ..., 'answer': ...}`` dicts.
            Malformed items or question entries are reported and skipped, so a
            single bad reply no longer discards the whole batch.
        filename: Destination CSV path.
        append: When true and ``filename`` exists, the new rows are added after
            the existing ones; otherwise the file is overwritten. ``None``
            (default) uses the module-level ``APPEND_RESULTS`` setting.

    Returns:
        True on success, False if reading or writing the CSV failed.
    """
    if append is None:
        append = APPEND_RESULTS
    columns = ['full_path', 'question', 'answer']

    try:
        # Prepare data as a list of rows for the CSV
        csv_rows = []
        for item in data:
            if not isinstance(item, dict) or 'full_path' not in item:
                print(f"Skipping malformed VQA item: {item!r}")
                continue
            questions = item.get('questions')
            if not isinstance(questions, list):
                print(f"Skipping item without a questions list: {item['full_path']}")
                continue
            for q in questions:
                if not isinstance(q, dict) or 'question' not in q or 'answer' not in q:
                    print(f"Skipping malformed question for {item['full_path']}: {q!r}")
                    continue
                csv_rows.append({
                    'full_path': item['full_path'],
                    'question': q['question'],
                    'answer': q['answer']
                })

        # Explicit columns keep the header intact even when no row survived.
        df_new = pd.DataFrame(csv_rows, columns=columns)

        if append and os.path.exists(filename):
            df_existing = pd.read_csv(filename)
            # If the existing file has an 'image_id' column, rename it to 'full_path' for consistency
            if 'image_id' in df_existing.columns:
                df_existing = df_existing.rename(columns={'image_id': 'full_path'})
            frames = [df_existing]
            if not df_new.empty:
                frames.append(df_new)
            pd.concat(frames, ignore_index=True).to_csv(filename, index=False)
        else:
            df_new.to_csv(filename, index=False)

        return True
    except Exception as e:
        print(f"Error writing to CSV file: {e}")
        return False

def main():
    """Generate VQA question/answer pairs for a positional row range of INPUT_FILE.

    Reads INPUT_FILE, selects rows START_INDEX..END_INDEX (clamped to the table),
    calls ``generate_vqa_data`` once per row and finally writes every successful
    result to OUTPUT_FILE through ``append_to_csv_file``. Problems are reported
    on stdout; the function returns None in every case.
    """
    if not API_KEY:
        print("No Gemini API key configured; set API_KEY before running.")
        return

    # Configure Gemini API
    configure(api_key=API_KEY)
    model = GenerativeModel('gemini-1.5-flash')

    # Read the input CSV file
    try:
        df = pd.read_csv(INPUT_FILE)

        # Clamp the requested range to the rows that actually exist.
        start_idx = max(START_INDEX, 0)
        end_idx = len(df) if END_INDEX is None else min(END_INDEX, len(df))
        if start_idx >= end_idx:
            print(f"Invalid range: start ({start_idx}) must be less than end ({end_idx})")
            return

        # Select the range of items to process
        data_to_process = df.iloc[start_idx:end_idx]

        if data_to_process.empty:
            print("No valid data found in the specified range.")
            return

        print(f"Processing items from index {start_idx} to {end_idx-1} ({len(data_to_process)} items)")
    except Exception as e:
        print(f"Error reading input file: {e}")
        return

    results = []

    # Process each row in the DataFrame
    try:
        rows = tqdm(data_to_process.iterrows(), total=len(data_to_process), desc="Processing items")
        for idx, (_, row_data) in enumerate(rows):
            # Reset per row so the error message never shows an undefined or stale id.
            item_id = None
            try:
                item_id = row_data['image_id']
                metadata = row_data.drop(['full_path', 'path']).to_dict()
                image_path = row_data['full_path']

                print(f"Processing item {start_idx + idx} (ID: {item_id})")

                # Generate VQA data
                vqa_data = generate_vqa_data(model, metadata, image_path, delay=DELAY)

                if vqa_data:
                    vqa_data['image_id'] = item_id
                    vqa_data['full_path'] = image_path  # Add full_path to vqa_data for CSV output
                    results.append(vqa_data)
            except Exception as e:
                print(f"Error processing item {item_id}: {e}")

            # Pace every item, failed ones included, so a run of failures does
            # not fire requests back to back.
            time.sleep(DELAY)
    except KeyboardInterrupt:
        # Keep what has been generated so far instead of losing the whole run.
        print("Interrupted; saving the results collected so far.")

    # Write results to output file
    try:
        if results:
            success = append_to_csv_file(results, OUTPUT_FILE)
            if success:
                print(f"Saved VQA data for {len(results)} items to {OUTPUT_FILE}")
        else:
            print("No VQA data generated to save.")
    except Exception as e:
        print(f"Error writing output file: {e}")

if __name__ == "__main__":
    main()

In [None]:
# blip2_base_raw.py

# Import required libraries
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from sklearn.metrics import f1_score
from bert_score import score as bert_score
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Output CSV
output_path = Path("vqa_test_predictions_blip2.csv")

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hugging Face token: read from the HF_TOKEN environment variable so that no
# credential is stored in the notebook. It is None when the variable is unset,
# which from_pretrained accepts (anonymous access).
import os
hf_token = os.environ.get("HF_TOKEN")

# Load BLIP-2
processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b", token=hf_token)
model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b", torch_dtype=torch.float16, token=hf_token
).to(device)

# Load dataset
train_df = pd.read_csv("/kaggle/input/vr1234/train_split.csv")
test_df = pd.read_csv("/kaggle/input/vr1234/test_split.csv")

# Evaluate on test set
predictions = []
references = test_df['answer'].tolist()

for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA on Test Set"):
    try:
        image_path = row['full_path']
        question = row['question']
        ground_truth = row['answer']
        image = Image.open(image_path).convert("RGB")
        # Inputs are moved to the model's device and cast to half precision.
        inputs = processor(images=image, text=question, return_tensors="pt").to(device, torch.float16)
        output = model.generate(**inputs)
        answer = processor.decode(output[0], skip_special_tokens=True)
    except Exception as e:
        # Best effort: a failing row is recorded as an empty prediction so the
        # predictions list stays aligned with test_df.
        print(f"Failed on {row['full_path']}: {e}")
        answer = ""
    predictions.append(answer)

test_df['predicted_answer'] = predictions
# Case-insensitive exact match after trimming surrounding whitespace.
test_df['correct'] = test_df['predicted_answer'].str.strip().str.lower() == test_df['answer'].str.strip().str.lower()

# Save predictions to CSV
test_df.to_csv(output_path, index=False)
print(f"Predictions saved to {output_path}")

# Evaluation Metrics
tqdm.pandas()  # enables DataFrame.progress_apply

# Load predictions.
# keep_default_na=False keeps empty cells as "" instead of NaN; with the default
# parsing an empty prediction would become NaN and then, through astype(str),
# be scored as the literal string "nan".
df = pd.read_csv("vqa_test_predictions_blip2.csv", keep_default_na=False)

# Normalize text
def normalize_text(s):
    """Return ``s`` as a string, lower-cased and without surrounding whitespace."""
    return str(s).strip().lower()

df['answer'] = df['answer'].astype(str).apply(normalize_text)
df['predicted_answer'] = df['predicted_answer'].astype(str).apply(normalize_text)

# Token-level macro F1
def compute_token_f1(pred, gt):
    """Token-overlap F1 between a predicted and a ground-truth string.

    Both strings are split on whitespace. The overlap is counted as a multiset
    intersection, so a token that occurs twice in both strings counts twice;
    counting distinct tokens only made identical strings with repeated words
    score below 1.0.

    Args:
        pred: Predicted answer text.
        gt: Ground-truth answer text.

    Returns:
        F1 in [0, 1]; 0.0 when either string has no tokens or nothing overlaps.
    """
    from collections import Counter  # local import keeps the function self-contained

    pred_tokens = pred.split()
    gt_tokens = gt.split()
    if not pred_tokens or not gt_tokens:
        return 0.0
    overlap = sum((Counter(pred_tokens) & Counter(gt_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(gt_tokens)
    return 2 * precision * recall / (precision + recall)

# Per-row token F1, averaged over the test set.
def _row_token_f1(row):
    return compute_token_f1(row['predicted_answer'], row['answer'])

df['token_f1'] = df.progress_apply(_row_token_f1, axis=1)
avg_token_f1 = df['token_f1'].mean()

# Accuracy: share of rows whose normalized prediction equals the normalized answer.
accuracy = df['answer'].eq(df['predicted_answer']).mean()

# ROUGE Score (ROUGE-L F-measure, with stemming)
rouge = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)

def _row_rouge_l(row):
    return rouge.score(row['answer'], row['predicted_answer'])['rougeL'].fmeasure

df['rougeL'] = df.progress_apply(_row_rouge_l, axis=1)
avg_rougeL = df['rougeL'].mean()

# BERTScore (Precision/Recall/F1); only the mean F1 is reported.
P, R, F1 = bert_score(
    df['predicted_answer'].tolist(),
    df['answer'].tolist(),
    lang='en',
    verbose=True,
)
bert_f1 = F1.mean().item()

# BLEU Score (sentence level, smoothing method 1)
smooth_fn = SmoothingFunction().method1

def _row_bleu(row):
    reference_tokens = [row['answer'].split()]
    hypothesis_tokens = row['predicted_answer'].split()
    return sentence_bleu(reference_tokens, hypothesis_tokens, smoothing_function=smooth_fn)

df['bleu'] = df.progress_apply(_row_bleu, axis=1)
avg_bleu = df['bleu'].mean()

# Print metrics
print(f"Token F1      : {avg_token_f1:.4f}")
print(f"Accuracy      : {accuracy:.4f}")
print(f"ROUGE-L       : {avg_rougeL:.4f}")
print(f"BERTScore F1  : {bert_f1:.4f}")
print(f"BLEU          : {avg_bleu:.4f}")

In [None]:
!pip install transformers torch pandas tqdm Pillow

In [None]:
# blip2_base_raw.py

# Install evaluation dependencies
import subprocess
import sys

def install_packages():
    """Install the evaluation dependencies with pip, one package per invocation.

    As soon as an install command fails, a message is printed and the
    interpreter is asked to exit with status 1.
    """
    for requirement in ['bert-score', 'rouge-score', 'nltk']:
        pip_command = [sys.executable, "-m", "pip", "install", requirement]
        try:
            subprocess.check_call(pip_command)
        except subprocess.CalledProcessError as err:
            print(f"Failed to install {requirement}: {err}")
            sys.exit(1)

install_packages()

# Import required libraries
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from sklearn.metrics import f1_score
from bert_score import score as bert_score
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Output CSV
output_path = Path("vqa_test_predictions_blip2.csv")

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hugging Face token: read from the HF_TOKEN environment variable so that no
# credential is stored in the notebook. It is None when the variable is unset,
# which from_pretrained accepts (anonymous access).
import os
hf_token = os.environ.get("HF_TOKEN")

# Load BLIP-2 with the token
processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b", token=hf_token)
model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b", torch_dtype=torch.float16, token=hf_token
).to(device)

# Load dataset
train_df = pd.read_csv("/kaggle/input/vr1234/train_split.csv")
test_df = pd.read_csv("/kaggle/input/vr1234/test_split.csv")

# Evaluate on test set
predictions = []
references = test_df['answer'].tolist()

for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA on Test Set"):
    try:
        image_path = row['full_path']
        question = row['question']
        ground_truth = row['answer']
        image = Image.open(image_path).convert("RGB")
        # Inputs are moved to the model's device and cast to half precision.
        inputs = processor(images=image, text=question, return_tensors="pt").to(device, torch.float16)
        output = model.generate(**inputs)
        answer = processor.decode(output[0], skip_special_tokens=True)
    except Exception as e:
        # Best effort: a failing row is recorded as an empty prediction so the
        # predictions list stays aligned with test_df.
        print(f"Failed on {row['full_path']}: {e}")
        answer = ""
    predictions.append(answer)

test_df['predicted_answer'] = predictions
# Case-insensitive exact match after trimming surrounding whitespace.
test_df['correct'] = test_df['predicted_answer'].str.strip().str.lower() == test_df['answer'].str.strip().str.lower()

# Save predictions to CSV
test_df.to_csv(output_path, index=False)
print(f"Predictions saved to {output_path}")

# Evaluation Metrics
tqdm.pandas()  # enables DataFrame.progress_apply

# Load predictions.
# keep_default_na=False keeps empty cells as "" instead of NaN; with the default
# parsing an empty prediction would become NaN and then, through astype(str),
# be scored as the literal string "nan".
df = pd.read_csv("vqa_test_predictions_blip2.csv", keep_default_na=False)

# Normalize text
def normalize_text(s):
    """Return ``s`` as a string, lower-cased and without surrounding whitespace."""
    return str(s).strip().lower()

df['answer'] = df['answer'].astype(str).apply(normalize_text)
df['predicted_answer'] = df['predicted_answer'].astype(str).apply(normalize_text)

# Token-level macro F1
def compute_token_f1(pred, gt):
    """Token-overlap F1 between a predicted and a ground-truth string.

    Both strings are split on whitespace. The overlap is counted as a multiset
    intersection, so a token that occurs twice in both strings counts twice;
    counting distinct tokens only made identical strings with repeated words
    score below 1.0.

    Args:
        pred: Predicted answer text.
        gt: Ground-truth answer text.

    Returns:
        F1 in [0, 1]; 0.0 when either string has no tokens or nothing overlaps.
    """
    from collections import Counter  # local import keeps the function self-contained

    pred_tokens = pred.split()
    gt_tokens = gt.split()
    if not pred_tokens or not gt_tokens:
        return 0.0
    overlap = sum((Counter(pred_tokens) & Counter(gt_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(gt_tokens)
    return 2 * precision * recall / (precision + recall)

# Per-row token F1, averaged over the test set.
def _row_token_f1(row):
    return compute_token_f1(row['predicted_answer'], row['answer'])

df['token_f1'] = df.progress_apply(_row_token_f1, axis=1)
avg_token_f1 = df['token_f1'].mean()

# Accuracy: share of rows whose normalized prediction equals the normalized answer.
accuracy = df['answer'].eq(df['predicted_answer']).mean()

# ROUGE Score (ROUGE-L F-measure, with stemming)
rouge = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)

def _row_rouge_l(row):
    return rouge.score(row['answer'], row['predicted_answer'])['rougeL'].fmeasure

df['rougeL'] = df.progress_apply(_row_rouge_l, axis=1)
avg_rougeL = df['rougeL'].mean()

# BERTScore (Precision/Recall/F1); only the mean F1 is reported.
P, R, F1 = bert_score(
    df['predicted_answer'].tolist(),
    df['answer'].tolist(),
    lang='en',
    verbose=True,
)
bert_f1 = F1.mean().item()

# BLEU Score (sentence level, smoothing method 1)
smooth_fn = SmoothingFunction().method1

def _row_bleu(row):
    reference_tokens = [row['answer'].split()]
    hypothesis_tokens = row['predicted_answer'].split()
    return sentence_bleu(reference_tokens, hypothesis_tokens, smoothing_function=smooth_fn)

df['bleu'] = df.progress_apply(_row_bleu, axis=1)
avg_bleu = df['bleu'].mean()

# Print metrics
print(f"Token F1      : {avg_token_f1:.4f}")
print(f"Accuracy      : {accuracy:.4f}")
print(f"ROUGE-L       : {avg_rougeL:.4f}")
print(f"BERTScore F1  : {bert_f1:.4f}")
print(f"BLEU          : {avg_bleu:.4f}")

In [None]:
# blip2_base_raw.py

# Set environment variable to reduce memory fragmentation
# NOTE(review): presumably this has to be set before PyTorch first initialises
# its CUDA allocator; if an earlier cell in the same kernel already used the
# GPU, the setting may have no effect -- confirm against the PyTorch docs.
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Install evaluation dependencies
import subprocess
import sys
import re
import torch

def install_packages():
    """Install the evaluation dependencies with pip, one package per invocation.

    As soon as an install command fails, a message is printed and the
    interpreter is asked to exit with status 1.
    """
    for requirement in ['bert-score', 'rouge-score', 'nltk']:
        pip_command = [sys.executable, "-m", "pip", "install", requirement]
        try:
            subprocess.check_call(pip_command)
        except subprocess.CalledProcessError as err:
            print(f"Failed to install {requirement}: {err}")
            sys.exit(1)

install_packages()

# Import required libraries
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from sklearn.metrics import f1_score
from bert_score import score as bert_score
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Output CSV
output_path = Path("vqa_test_predictions_blip2.csv")

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hugging Face token: read from the HF_TOKEN environment variable so that no
# credential is stored in the notebook. It is None when the variable is unset,
# which from_pretrained accepts (anonymous access).
hf_token = os.environ.get("HF_TOKEN")

# Load BLIP-2 with the correct model ID
processor = Blip2Processor.from_pretrained("Salesforce/blip2-flan-t5-xl", token=hf_token)
model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-flan-t5-xl", torch_dtype=torch.float16, token=hf_token
).to(device)

# Load dataset
train_df = pd.read_csv("/kaggle/input/vr1234/train_split.csv")
test_df = pd.read_csv("/kaggle/input/vr1234/test_split.csv")

# Post-process model output
def post_process_answer(answer, question):
    """Lower-case and trim a model answer; reduce yes/no answers to one word.

    A question counts as yes/no when it contains "yes or no" or when a sentence
    in it starts with an auxiliary verb ("is", "are", "does", ...). For such
    questions the first stand-alone "yes" or "no" in the answer is returned.

    Whole-word matching is deliberate: substring tests treated "What is ..." as
    a yes/no question and turned answers such as "polyester" (contains "yes")
    or "canopy" (contains "no") into "yes"/"no".

    Args:
        answer: Raw decoded model output.
        question: The question text given to the model.

    Returns:
        "yes" or "no" for a yes/no question whose answer contains that word,
        otherwise the trimmed, lower-cased answer.
    """
    answer = answer.strip().lower()
    question_text = question.strip().lower()

    # Auxiliary verb at the start of the text or right after punctuation,
    # e.g. "Is it red?" or "Answer in one word: does it fold?".
    starts_with_auxiliary = re.search(
        r"(?:^|[,;:.?!]\s*)"
        r"(?:is|are|was|were|do|does|did|can|could|will|would|should|has|have|had)\b",
        question_text,
    )
    if "yes or no" in question_text or starts_with_auxiliary:
        verdict = re.search(r"\b(yes|no)\b", answer)
        if verdict:
            return verdict.group(1)
    return answer

# Evaluate on test set: one generated answer per test row, "" when a row fails.
references = test_df['answer'].tolist()
predictions = []

_test_rows = tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA on Test Set")
for idx, row in _test_rows:
    try:
        image_path = row['full_path']
        question = row['question']
        ground_truth = row['answer']
        image = Image.open(image_path).convert("RGB")
        inputs = processor(images=image, text=question, return_tensors="pt")
        inputs = inputs.to(device, torch.float16)
        output = model.generate(**inputs)
        answer = post_process_answer(
            processor.decode(output[0], skip_special_tokens=True),
            question,
        )
        # Release cached GPU memory after every prediction.
        torch.cuda.empty_cache()
    except Exception as e:
        print(f"Failed on {row['full_path']}: {e}")
        answer = ""
    predictions.append(answer)

test_df['predicted_answer'] = predictions

# Improved normalization
def normalize_text(s):
    """Canonicalise text for comparison.

    Converts ``s`` to a string, lower-cases it, removes every character that is
    neither a word character nor whitespace, collapses whitespace runs into a
    single space and trims the result. Trimming happens last because removing
    punctuation can expose new leading/trailing spaces ("red ." -> "red").
    """
    s = str(s).lower()
    s = re.sub(r'[^\w\s]', '', s)  # Remove punctuation
    s = re.sub(r'\s+', ' ', s)     # Normalize internal whitespace
    return s.strip()

# Normalize both text columns the same way before comparing them.
for _column in ('answer', 'predicted_answer'):
    test_df[_column] = test_df[_column].astype(str).apply(normalize_text)

# Compute accuracy: a row is correct when the normalized strings are identical.
test_df['correct'] = test_df['answer'].eq(test_df['predicted_answer'])

# Debug mismatches: show the first 5 rows that were not matched.
mismatches = test_df.loc[~test_df['correct']].head(5)
for idx, row in mismatches.iterrows():
    print(f"Question: {row['question']}")
    print(f"Ground Truth: {row['answer']}")
    print(f"Predicted: {row['predicted_answer']}")
    print("---")

# Save predictions to CSV
test_df.to_csv(output_path, index=False)
print(f"Predictions saved to {output_path}")

# Evaluation Metrics
tqdm.pandas()  # enables DataFrame.progress_apply

# Load predictions.
# keep_default_na=False keeps empty cells as "" instead of NaN; with the default
# parsing an empty prediction would become NaN and then, through astype(str),
# be scored as the literal string "nan".
df = pd.read_csv("vqa_test_predictions_blip2.csv", keep_default_na=False)

# Normalize again for evaluation
df['answer'] = df['answer'].astype(str).apply(normalize_text)
df['predicted_answer'] = df['predicted_answer'].astype(str).apply(normalize_text)

# Token-level macro F1
def compute_token_f1(pred, gt):
    """Token-overlap F1 between a predicted and a ground-truth string.

    Both strings are split on whitespace. The overlap is counted as a multiset
    intersection, so a token that occurs twice in both strings counts twice;
    counting distinct tokens only made identical strings with repeated words
    score below 1.0.

    Args:
        pred: Predicted answer text.
        gt: Ground-truth answer text.

    Returns:
        F1 in [0, 1]; 0.0 when either string has no tokens or nothing overlaps.
    """
    from collections import Counter  # local import keeps the function self-contained

    pred_tokens = pred.split()
    gt_tokens = gt.split()
    if not pred_tokens or not gt_tokens:
        return 0.0
    overlap = sum((Counter(pred_tokens) & Counter(gt_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(gt_tokens)
    return 2 * precision * recall / (precision + recall)

# Per-row token F1, averaged over the test set.
def _row_token_f1(row):
    return compute_token_f1(row['predicted_answer'], row['answer'])

df['token_f1'] = df.progress_apply(_row_token_f1, axis=1)
avg_token_f1 = df['token_f1'].mean()

# Accuracy: share of rows whose normalized prediction equals the normalized answer.
accuracy = df['answer'].eq(df['predicted_answer']).mean()

# ROUGE Score (ROUGE-L F-measure, with stemming)
rouge = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)

def _row_rouge_l(row):
    return rouge.score(row['answer'], row['predicted_answer'])['rougeL'].fmeasure

df['rougeL'] = df.progress_apply(_row_rouge_l, axis=1)
avg_rougeL = df['rougeL'].mean()

# BERTScore (Precision/Recall/F1); only the mean F1 is reported.
P, R, F1 = bert_score(
    df['predicted_answer'].tolist(),
    df['answer'].tolist(),
    lang='en',
    verbose=True,
)
bert_f1 = F1.mean().item()

# BLEU Score (sentence level, smoothing method 1)
smooth_fn = SmoothingFunction().method1

def _row_bleu(row):
    reference_tokens = [row['answer'].split()]
    hypothesis_tokens = row['predicted_answer'].split()
    return sentence_bleu(reference_tokens, hypothesis_tokens, smoothing_function=smooth_fn)

df['bleu'] = df.progress_apply(_row_bleu, axis=1)
avg_bleu = df['bleu'].mean()

# Print metrics
print(f"Token F1      : {avg_token_f1:.4f}")
print(f"Accuracy      : {accuracy:.4f}")
print(f"ROUGE-L       : {avg_rougeL:.4f}")
print(f"BERTScore F1  : {bert_f1:.4f}")
print(f"BLEU          : {avg_bleu:.4f}")

In [None]:
# blip2_base_raw.py

# Set environment variable to reduce memory fragmentation
# NOTE(review): presumably this has to be set before PyTorch first initialises
# its CUDA allocator; if an earlier cell in the same kernel already used the
# GPU, the setting may have no effect -- confirm against the PyTorch docs.
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Install evaluation dependencies
import subprocess
import sys
import re
import torch

def install_packages():
    """Install the evaluation dependencies with pip, one package per invocation.

    As soon as an install command fails, a message is printed and the
    interpreter is asked to exit with status 1.
    """
    for requirement in ['bert-score', 'rouge-score', 'nltk']:
        pip_command = [sys.executable, "-m", "pip", "install", requirement]
        try:
            subprocess.check_call(pip_command)
        except subprocess.CalledProcessError as err:
            print(f"Failed to install {requirement}: {err}")
            sys.exit(1)

install_packages()

# Import required libraries
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from sklearn.metrics import f1_score
from bert_score import score as bert_score
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Output CSV
output_path = Path("vqa_test_predictions_blip2.csv")

# Device setup with fallback to CPU
try:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
except Exception as e:
    print(f"Error setting device: {e}, defaulting to CPU")
    device = torch.device("cpu")

# Hugging Face token: read from the HF_TOKEN environment variable so that no
# credential is stored in the notebook. It is None when the variable is unset,
# which from_pretrained accepts (anonymous access).
hf_token = os.environ.get("HF_TOKEN")

# Load BLIP-2 with the correct model ID
try:
    processor = Blip2Processor.from_pretrained("Salesforce/blip2-flan-t5-xl", token=hf_token)
    model = Blip2ForConditionalGeneration.from_pretrained(
        "Salesforce/blip2-flan-t5-xl", torch_dtype=torch.float16, token=hf_token
    ).to(device)
except torch.cuda.OutOfMemoryError:
    # The GPU could not hold the model: reload it on the CPU. The processor was
    # created before the model load and is reused.
    # NOTE(review): the weights stay in float16 on the CPU; half-precision CPU
    # inference may be slow or unsupported depending on the PyTorch build -- confirm.
    print("CUDA out of memory, switching to CPU...")
    device = torch.device("cpu")
    model = Blip2ForConditionalGeneration.from_pretrained(
        "Salesforce/blip2-flan-t5-xl", torch_dtype=torch.float16, token=hf_token
    ).to(device)

# Load dataset with error handling
try:
    train_df = pd.read_csv("/kaggle/input/vr1234/train_split.csv")
    test_df = pd.read_csv("/kaggle/input/vr1234/test_split.csv")
except FileNotFoundError as e:
    print(f"Dataset not found: {e}")
    sys.exit(1)

# Post-process model output (only for yes/no questions, since prompt should ensure 1-word answers)
def post_process_answer(answer, question):
    """Lower-case and trim a model answer; reduce yes/no answers to one word.

    A question counts as yes/no when it contains "yes or no" or when a sentence
    in it starts with an auxiliary verb ("is", "are", "does", ...), also after a
    prefix such as "Answer in one word: ". For such questions the first
    stand-alone "yes" or "no" in the answer is returned.

    Whole-word matching is deliberate: substring tests treated "What is ..." as
    a yes/no question and turned answers such as "polyester" (contains "yes")
    or "canopy" (contains "no") into "yes"/"no".

    Args:
        answer: Raw decoded model output.
        question: The question text given to the model.

    Returns:
        "yes" or "no" for a yes/no question whose answer contains that word,
        otherwise the trimmed, lower-cased answer.
    """
    answer = answer.strip().lower()
    question_text = question.strip().lower()

    # Auxiliary verb at the start of the text or right after punctuation,
    # e.g. "Is it red?" or "Answer in one word: does it fold?".
    starts_with_auxiliary = re.search(
        r"(?:^|[,;:.?!]\s*)"
        r"(?:is|are|was|were|do|does|did|can|could|will|would|should|has|have|had)\b",
        question_text,
    )
    if "yes or no" in question_text or starts_with_auxiliary:
        verdict = re.search(r"\b(yes|no)\b", answer)
        if verdict:
            return verdict.group(1)
    return answer  # Return the answer as-is, expecting it to be 1 word due to prompt

# Evaluate on test set: one generated answer per test row, "" when a row fails.
references = test_df['answer'].tolist()
predictions = []

_test_rows = tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA on Test Set")
for idx, row in _test_rows:
    try:
        image_path = row['full_path']
        # The instruction prefix asks the model for a one-word answer.
        question = "Answer in one word: " + row['question']
        ground_truth = row['answer']
        image = Image.open(image_path).convert("RGB")
        # Image and question are encoded together.
        inputs = processor(images=image, text=question, return_tensors="pt")
        inputs = inputs.to(device, torch.float16)
        # max_new_tokens caps the length of the generated answer.
        output = model.generate(**inputs, max_new_tokens=5)
        answer = post_process_answer(
            processor.decode(output[0], skip_special_tokens=True),
            question,
        )
        # Release cached GPU memory after every prediction.
        torch.cuda.empty_cache()
    except Exception as e:
        print(f"Failed on {row['full_path']}: {e}")
        answer = ""
    predictions.append(answer)

test_df['predicted_answer'] = predictions

# Improved normalization
def normalize_text(s):
    """Canonicalise text for comparison.

    Converts ``s`` to a string, lower-cases it, removes every character that is
    neither a word character nor whitespace, collapses whitespace runs into a
    single space and trims the result. Trimming happens last because removing
    punctuation can expose new leading/trailing spaces ("red ." -> "red").
    """
    s = str(s).lower()
    s = re.sub(r'[^\w\s]', '', s)  # Remove punctuation
    s = re.sub(r'\s+', ' ', s)     # Normalize internal whitespace
    return s.strip()

# Normalize answers
test_df['answer'] = test_df['answer'].astype(str).apply(normalize_text)
test_df['predicted_answer'] = test_df['predicted_answer'].astype(str).apply(normalize_text)

# Compute accuracy with relaxed matching: a non-empty prediction is correct when
# it equals the whole answer or is one of the answer's words. The equality test
# is needed because a multi-word prediction can never be a single element of
# answer.split(), so exact multi-word matches were counted as wrong.
test_df['correct'] = test_df.apply(
    lambda row: row['predicted_answer'] != '' and (
        row['predicted_answer'] == row['answer']
        or row['predicted_answer'] in row['answer'].split()
    ),
    axis=1
)

# Debug mismatches
mismatches = test_df[~test_df['correct']].head(5)  # First 5 mismatches
for idx, row in mismatches.iterrows():
    print(f"Question: {row['question']}")
    print(f"Ground Truth: {row['answer']}")
    print(f"Predicted: {row['predicted_answer']}")
    print("---")

# Save predictions to CSV
test_df.to_csv(output_path, index=False)
print(f"Predictions saved to {output_path}")

# Evaluation Metrics
tqdm.pandas()  # enables DataFrame.progress_apply

# Load predictions.
# keep_default_na=False keeps empty cells as "" instead of NaN; with the default
# parsing an empty prediction would become NaN and then, through astype(str),
# be scored as the literal string "nan".
df = pd.read_csv("vqa_test_predictions_blip2.csv", keep_default_na=False)

# Normalize again for evaluation
df['answer'] = df['answer'].astype(str).apply(normalize_text)
df['predicted_answer'] = df['predicted_answer'].astype(str).apply(normalize_text)

# Token-level macro F1
def compute_token_f1(pred, gt):
    """Token-overlap F1 between a predicted and a ground-truth string.

    Both strings are split on whitespace. The overlap is counted as a multiset
    intersection, so a token that occurs twice in both strings counts twice;
    counting distinct tokens only made identical strings with repeated words
    score below 1.0.

    Args:
        pred: Predicted answer text.
        gt: Ground-truth answer text.

    Returns:
        F1 in [0, 1]; 0.0 when either string has no tokens or nothing overlaps.
    """
    from collections import Counter  # local import keeps the function self-contained

    pred_tokens = pred.split()
    gt_tokens = gt.split()
    if not pred_tokens or not gt_tokens:
        return 0.0
    overlap = sum((Counter(pred_tokens) & Counter(gt_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(gt_tokens)
    return 2 * precision * recall / (precision + recall)

# Per-row token F1, averaged over the test set.
df['token_f1'] = df.progress_apply(lambda row: compute_token_f1(row['predicted_answer'], row['answer']), axis=1)
avg_token_f1 = df['token_f1'].mean()

# Accuracy with relaxed matching: a non-empty prediction is correct when it
# equals the whole answer or is one of the answer's words. The equality test is
# needed because a multi-word prediction can never be a single element of
# answer.split(), so exact multi-word matches were counted as wrong.
accuracy = df.apply(
    lambda row: row['predicted_answer'] != '' and (
        row['predicted_answer'] == row['answer']
        or row['predicted_answer'] in row['answer'].split()
    ),
    axis=1
).mean()

# ROUGE Score (ROUGE-L F-measure, with stemming)
rouge = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
df['rougeL'] = df.progress_apply(lambda row: rouge.score(row['answer'], row['predicted_answer'])['rougeL'].fmeasure, axis=1)
avg_rougeL = df['rougeL'].mean()

# BERTScore (Precision/Recall/F1); only the mean F1 is reported.
P, R, F1 = bert_score(df['predicted_answer'].tolist(), df['answer'].tolist(), lang='en', verbose=True)
bert_f1 = F1.mean().item()

# BLEU Score (sentence level, smoothing method 1)
smooth_fn = SmoothingFunction().method1
df['bleu'] = df.progress_apply(
    lambda row: sentence_bleu([row['answer'].split()], row['predicted_answer'].split(), smoothing_function=smooth_fn),
    axis=1
)
avg_bleu = df['bleu'].mean()

# Print metrics
print(f"Token F1      : {avg_token_f1:.4f}")
print(f"Accuracy      : {accuracy:.4f}")
print(f"ROUGE-L       : {avg_rougeL:.4f}")
print(f"BERTScore F1  : {bert_f1:.4f}")
print(f"BLEU          : {avg_bleu:.4f}")

In [None]:
# Set environment variable to reduce memory fragmentation
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Install evaluation dependencies BEFORE importing them: on a fresh kernel the
# bert_score / rouge_score imports below would otherwise raise ImportError
# before the installer ever runs.
import subprocess
import sys

def install_packages():
    """pip-install the evaluation packages into the running interpreter.

    Each package is installed with ``sys.executable -m pip``. The first
    failure is printed and the cell is aborted through ``sys.exit(1)``.
    """
    packages = ['bert-score', 'rouge-score', 'nltk']
    for package in packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        except subprocess.CalledProcessError as e:
            print(f"Failed to install {package}: {e}")
            sys.exit(1)

install_packages()

import re
import torch
from transformers import Blip2Processor, Blip2ForConditionalGeneration, Trainer, TrainingArguments
from PIL import Image
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from sklearn.metrics import f1_score
from bert_score import score as bert_score
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import numpy as np
from sklearn.model_selection import train_test_split

# Output CSV
# Relative path: the predictions file is written to the current working directory.
output_path = Path("vqa_test_predictions_blip3_compatible.csv")

# Device setup with fallback to CPU
# Uses CUDA when torch reports it as available; any exception raised while
# probing is reported and CPU is used instead.
try:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
except Exception as e:
    print(f"Error setting device: {e}, defaulting to CPU")
    device = torch.device("cpu")

# Hugging Face token: read from the environment instead of committing a secret
# to the notebook (the token previously hardcoded here should be revoked).
# When HF_TOKEN is unset this is None, i.e. the hub is accessed anonymously.
hf_token = os.environ.get("HF_TOKEN")

# Load BLIP-2 with the correct model ID
# The processor and the half-precision model are loaded together; if moving the
# model to the GPU runs out of memory, the model is reloaded on the CPU.
# NOTE(review): the CPU fallback keeps torch.float16 weights — confirm that the
# operators needed for training/generation support half precision on CPU.
try:
    processor = Blip2Processor.from_pretrained("Salesforce/blip2-flan-t5-xl", token=hf_token)
    model = Blip2ForConditionalGeneration.from_pretrained(
        "Salesforce/blip2-flan-t5-xl", torch_dtype=torch.float16, token=hf_token
    ).to(device)
except torch.cuda.OutOfMemoryError:
    print("CUDA out of memory, switching to CPU...")
    device = torch.device("cpu")
    model = Blip2ForConditionalGeneration.from_pretrained(
        "Salesforce/blip2-flan-t5-xl", torch_dtype=torch.float16, token=hf_token
    ).to(device)

# Load the train and test datasets from the vr4567 Kaggle input
# A missing file is reported and the cell is aborted via sys.exit(1).
try:
    train_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_train.csv")
    test_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_test.csv")
except FileNotFoundError as e:
    print(f"Dataset not found: {e}")
    print("Please ensure the CSV files are at /kaggle/input/vr4567/more_and_12049_train.csv and /kaggle/input/vr4567/more_and_12049_test.csv")
    sys.exit(1)

# Verify that images are accessible from vrmini2
# Only prints sample paths for manual inspection; nothing is checked on disk here.
print("Sample image paths from training dataset:")
print(train_df['full_path'].head(5))
print("Ensure these paths exist in /kaggle/input/vrmini2/abo-images-small/images/small/")

# Analyze dataset distribution for better defaults
print("Most common answers in training set:")
print(train_df['answer'].value_counts().head(10))

# Expanded vocabulary of common answers
# Keys are vocabulary categories; post_process_answer looks answers up in these lists.
common_answers = {
    "colors": ["white", "black", "blue", "red", "green", "yellow", "pink", "purple", "orange", "grey", "gray", "brown", "beige", "tan", "silver", "gold", "crimson", "navy", "violet"],
    "materials": ["plastic", "silicone", "metal", "wood", "leather", "suede", "cotton", "polyester", "fabric", "synthetic", "brass", "steel", "canvas", "cloth", "rubber"],
    "yes_no": ["yes", "no"],
    "numbers": ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"],
    "phone_models": ["iphone", "samsung", "vivo", "oppo", "motorola", "lenovo", "redmi", "note", "moto", "lg", "nokia"],
    "styles": ["modern", "floral", "geometric", "abstract", "minimalist", "rustic", "classic", "ankle", "block", "sling", "vintage"],
    "objects": ["heart", "skull", "flowers", "moon", "stars", "cat", "dog", "lion", "butterflies", "teddy", "tree"]
}

# Flatten the vocabulary for post-processing
# valid_answers is the union of every category list above.
valid_answers = set()
for category in common_answers.values():
    valid_answers.update(category)

# Synonym mapping
# Maps a variant word to the canonical form that post-processing should emit.
synonym_map = {
    "grey": "gray",
    "tan": "beige",
    "cloth": "fabric",
    "crimson": "red",
    "navy": "blue",
    "violet": "purple"
}

# Custom Dataset for fine-tuning
class VQADataset(torch.utils.data.Dataset):
    """Torch dataset yielding BLIP-2 training examples from a VQA DataFrame.

    Each row must provide ``full_path`` (image file), ``question`` and
    ``answer``. Prompts and answers are padded/truncated to fixed lengths so
    that a default collator can stack examples into a batch, and padding
    positions in the labels are set to -100 so they are ignored by the loss.

    Args:
        df: DataFrame with ``full_path``, ``question`` and ``answer`` columns.
        processor: BLIP-2 processor exposing ``__call__`` and ``.tokenizer``.
        max_prompt_length: Fixed token length for the prompt + question.
        max_answer_length: Fixed token length for the answer labels.
    """

    def __init__(self, df, processor, max_prompt_length=128, max_answer_length=16):
        self.df = df
        self.processor = processor
        self.max_prompt_length = max_prompt_length
        self.max_answer_length = max_answer_length

    def __len__(self):
        """Number of rows in the underlying DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Build the tensors for row ``idx`` (positional index).

        Returns:
            Dict with ``pixel_values``, ``input_ids``, ``attention_mask`` and
            ``labels``, each without a leading batch dimension.
        """
        row = self.df.iloc[idx]
        image_path = row['full_path']
        question = row['question']
        # Adjust prompt based on question type during training
        q_type, prompt = get_question_type(question)
        full_prompt = prompt + question
        # CSV parsing can yield non-string answers (e.g. numbers); the tokenizer needs text.
        answer = str(row['answer'])

        # Load and process image from vrmini2
        try:
            image = Image.open(image_path).convert("RGB")
        except Exception as e:
            print(f"Failed to load image {image_path}: {e}")
            # Create a blank image as a fallback
            image = Image.new('RGB', (224, 224), color='gray')

        # padding=True on a single sample pads nothing; a fixed max_length gives
        # every example the same shape so batches can be stacked.
        inputs = self.processor(
            images=image,
            text=full_prompt,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=self.max_prompt_length,
        )

        # Process the answer (target)
        tokenizer = self.processor.tokenizer
        labels = tokenizer(
            answer,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=self.max_answer_length,
        ).input_ids.squeeze(0)
        # Padding positions must not contribute to the loss.
        pad_id = tokenizer.pad_token_id
        if pad_id is not None:
            labels = labels.masked_fill(labels == pad_id, -100)

        # Remove batch dimension
        return {
            "pixel_values": inputs["pixel_values"].squeeze(0),
            "input_ids": inputs["input_ids"].squeeze(0),
            "attention_mask": inputs["attention_mask"].squeeze(0),
            "labels": labels,
        }

# Function to classify question type and adjust prompt
def get_question_type(question):
    """Classify a question and pick the instruction prefix used to prompt for it.

    Yes/no intent is detected from the literal phrase "yes or no" or from a
    leading auxiliary verb ("Is ...", "Are ...", "Does ..."), not from "is " /
    "are " appearing anywhere in the text, so "What is ..." and
    "How many ... are there?" are no longer swallowed by the yes/no branch.
    Counting questions are checked next, then topic keywords.

    Args:
        question: Question text (coerced with ``str``).

    Returns:
        Tuple ``(q_type, prompt)``; ``q_type`` is one of "yes_no", "number",
        "color", "material", "phone_model", "style" or "object".
    """
    question = str(question).lower().strip()
    words = question.split()
    first_word = words[0] if words else ""
    leading_auxiliaries = {
        "is", "are", "was", "were", "do", "does", "did",
        "can", "could", "has", "have", "will", "would", "should",
    }

    if "yes or no" in question or first_word in leading_auxiliaries:
        return "yes_no", "Answer with yes or no: "
    if "how many" in question:
        return "number", "Answer with a number (e.g., one, two, three): "
    if "color" in question or "colour" in question:
        return "color", "Answer with a color (e.g., red, blue, green): "
    if "material" in question:
        return "material", "Answer with a material (e.g., plastic, leather, metal): "
    if "phone" in question or "model" in question:
        return "phone_model", "Answer with a phone model (e.g., iPhone, Samsung): "
    if "style" in question or "pattern" in question:
        return "style", "Answer with a style or pattern (e.g., floral, modern): "
    return "object", "Answer with an object (e.g., heart, skull): "

# Improved post-processing
def post_process_answer(answer, question, q_type):
    """Map raw generated text onto a single canonical answer word.

    The text is lower-cased and split into word tokens (punctuation dropped);
    digits are spelled out and synonyms canonicalised through ``synonym_map``.
    The first token that belongs to the vocabulary of ``q_type`` is returned,
    so "light blue" yields "blue" instead of falling through to the default.
    Yes/no answers are matched as whole tokens, not as substrings.

    Args:
        answer: Raw decoded model output.
        question: Original question text (unused; kept for call compatibility).
        q_type: Question type as returned by ``get_question_type``.

    Returns:
        One lower-case answer word: a vocabulary match, or the per-type default.
    """
    # q_type -> (key into common_answers, fallback when no token matches)
    vocab_and_default = {
        "yes_no": ("yes_no", "yes"),
        "color": ("colors", "black"),
        "material": ("materials", "plastic"),
        "number": ("numbers", "two"),
        "phone_model": ("phone_models", "iphone"),
        "style": ("styles", "modern"),
        "object": ("objects", "heart"),
    }
    # "0" -> "zero", ..., "10" -> "ten", following the order of the numbers list.
    digit_words = {str(i): word for i, word in enumerate(common_answers["numbers"])}

    tokens = []
    for token in re.findall(r"\w+", str(answer).lower()):
        token = digit_words.get(token, token)
        tokens.append(synonym_map.get(token, token))

    if q_type in vocab_and_default:
        vocab_key, default = vocab_and_default[q_type]
        vocab = common_answers[vocab_key]
        return next((token for token in tokens if token in vocab), default)

    # Fallback for unknown question types
    return next((token for token in tokens if token in valid_answers), "yes")

# Split train_df into training and validation sets
# 80/20 split with a fixed seed. train_df is rebound to the 80% part, so
# re-running this cell splits the already-reduced frame again.
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)
train_dataset = VQADataset(train_df, processor)
val_dataset = VQADataset(val_df, processor)

# Define training arguments for fine-tuning
# NOTE(review): `eval_strategy` is the newer keyword spelling; a later cell in
# this notebook states that transformers 4.40.2 expects `evaluation_strategy`.
# Calculate steps per epoch to evaluate after each epoch
# steps_per_epoch is computed but not referenced again in this cell.
steps_per_epoch = len(train_dataset) // 2  # Batch size is 2
# NOTE(review): fp16=True is requested while the model was loaded with
# torch_dtype=torch.float16 — confirm the Trainer accepts that combination.
training_args = TrainingArguments(
    output_dir="./blip2_finetuned",
    num_train_epochs=3,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    warmup_steps=50,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    save_steps=500,
    save_total_limit=1,
    fp16=True,
    eval_strategy="epoch",     # Evaluate after each epoch
    remove_unused_columns=False  # keep every key returned by VQADataset.__getitem__
)

# Initialize the Trainer for fine-tuning
# No data_collator is passed, so the Trainer's default collation is used.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

# Fine-tune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Evaluate on test set
# One generate() call per test row; predictions are collected in row order.
predictions = []
references = test_df['answer'].tolist()  # not used again in this cell

for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA on Test Set"):
    try:
        image_path = row['full_path']
        question = row['question']
        # Same type-specific prompt prefix as used when building training examples.
        q_type, prompt = get_question_type(question)
        full_prompt = prompt + question
        ground_truth = row['answer']  # read but not used inside the loop
        image = Image.open(image_path).convert("RGB")
        inputs = processor(images=image, text=full_prompt, return_tensors="pt").to(device, torch.float16)
        # Short beam-search generation: at most 3 new tokens, 3 beams.
        output = model.generate(**inputs, max_new_tokens=3, num_beams=3)
        answer = processor.decode(output[0], skip_special_tokens=True)
        answer = post_process_answer(answer, question, q_type)
        torch.cuda.empty_cache()
    except Exception as e:
        # Any failure (missing image, generation error) is logged and scored as "yes".
        print(f"Failed on {row['full_path']}: {e}")
        answer = "yes"
    predictions.append(answer)

test_df['predicted_answer'] = predictions

# Add question type for analysis
test_df['question_type'] = test_df['question'].apply(lambda q: get_question_type(q)[0])

# Improved normalization
def normalize_text(s):
    """Lower-case ``s``, drop punctuation and collapse whitespace.

    Whitespace is stripped last: removing punctuation can expose a leading or
    trailing space ("red ." -> "red "), which would otherwise survive and break
    exact/membership comparisons downstream.

    Args:
        s: Any value; it is converted with ``str``.

    Returns:
        The normalized string with single spaces and no surrounding whitespace.
    """
    s = str(s).lower()
    s = re.sub(r'[^\w\s]', '', s)  # Remove punctuation
    s = re.sub(r'\s+', ' ', s)     # Normalize internal whitespace
    return s.strip()

# Normalize answers
# Both columns are overwritten in place with their normalized text.
test_df['answer'] = test_df['answer'].astype(str).apply(normalize_text)
test_df['predicted_answer'] = test_df['predicted_answer'].astype(str).apply(normalize_text)

# Compute overall accuracy with relaxed matching
# Correct when the whole predicted string equals one whitespace token of the ground truth.
test_df['correct'] = test_df.apply(
    lambda row: row['predicted_answer'] in row['answer'].split(),
    axis=1
)

# Compute accuracy by question type
# The lambda parameter `df` is local to the lambda and only shadows the outer name inside it.
accuracy_by_type = test_df.groupby('question_type').apply(
    lambda df: df['correct'].mean()
)
print("\nAccuracy by question type:")
print(accuracy_by_type)

# Debug mismatches
# Show the first five incorrect rows for manual inspection.
mismatches = test_df[~test_df['correct']].head(5)
for idx, row in mismatches.iterrows():
    print(f"Question: {row['question']}")
    print(f"Ground Truth: {row['answer']}")
    print(f"Predicted: {row['predicted_answer']}")
    print("---")

# Save predictions to CSV
test_df.to_csv(output_path, index=False)
print(f"Predictions saved to {output_path}")

# Evaluation Metrics
# Enable the DataFrame.progress_apply calls used below.
tqdm.pandas()

# Load predictions
# Same file name as output_path above, spelled out as a literal.
df = pd.read_csv("vqa_test_predictions_blip3_compatible.csv")

# Normalize again for evaluation
# astype(str) guards against values that the CSV round-trip parsed as non-strings.
df['answer'] = df['answer'].astype(str).apply(normalize_text)
df['predicted_answer'] = df['predicted_answer'].astype(str).apply(normalize_text)

# Token-level macro F1
def compute_token_f1(pred, gt):
    """Return the token-level F1 overlap between a prediction and a reference.

    Both strings are split on whitespace. Overlap is a multiset intersection,
    so a token repeated in both strings is credited once per matching
    occurrence (a set intersection would under-count it against the
    list-length denominators).

    Args:
        pred: Predicted answer text.
        gt: Ground-truth answer text.

    Returns:
        F1 in [0.0, 1.0]; 0.0 when either side has no tokens or nothing overlaps.
    """
    from collections import Counter  # local import: the cell's import list stays untouched

    pred_tokens = pred.split()
    gt_tokens = gt.split()
    if not pred_tokens or not gt_tokens:
        return 0.0
    # Counter & Counter keeps the minimum count of every shared token.
    overlap = sum((Counter(pred_tokens) & Counter(gt_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(gt_tokens)
    return 2 * precision * recall / (precision + recall)

# Per-row token F1, then the mean over all rows.
df['token_f1'] = df.progress_apply(lambda row: compute_token_f1(row['predicted_answer'], row['answer']), axis=1)
avg_token_f1 = df['token_f1'].mean()

# Accuracy with relaxed matching
# A row counts as correct when the whole predicted string equals one of the
# whitespace-separated tokens of the ground truth.
accuracy = df.apply(
    lambda row: row['predicted_answer'] in row['answer'].split(),
    axis=1
).mean()

# ROUGE Score
# rouge.score is called as (answer, predicted_answer); only the ROUGE-L F-measure is kept.
rouge = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
df['rougeL'] = df.progress_apply(lambda row: rouge.score(row['answer'], row['predicted_answer'])['rougeL'].fmeasure, axis=1)
avg_rougeL = df['rougeL'].mean()

# BERTScore (Precision/Recall/F1)
# Predictions are passed first, references second; only the mean F1 is reported.
P, R, F1 = bert_score(df['predicted_answer'].tolist(), df['answer'].tolist(), lang='en', verbose=True)
bert_f1 = F1.mean().item()

# BLEU Score
# Sentence-level BLEU with one reference per row, smoothed with method1.
smooth_fn = SmoothingFunction().method1
df['bleu'] = df.progress_apply(
    lambda row: sentence_bleu([row['answer'].split()], row['predicted_answer'].split(), smoothing_function=smooth_fn),
    axis=1
)
avg_bleu = df['bleu'].mean()

# Print metrics
print(f"Token F1      : {avg_token_f1:.4f}")
print(f"Accuracy      : {accuracy:.4f}")
print(f"ROUGE-L       : {avg_rougeL:.4f}")
print(f"BERTScore F1  : {bert_f1:.4f}")
print(f"BLEU          : {avg_bleu:.4f}")

In [None]:
# Inspect the installed torch / transformers packages (shell escapes; output only).
!pip show torch
!pip show transformers

In [None]:
!pip uninstall -y torch torchvision transformers
!pip install torch==2.0.1 torchvision==0.15.2 transformers==4.31.0

In [None]:
# Print the versions of transformers and torch that this kernel actually imports.
import transformers
print(transformers.__version__)
import torch
print(torch.__version__)

In [None]:
# Print torch, torchvision and transformers versions on one line.
import torch
import torchvision
import transformers
print(torch.__version__, torchvision.__version__, transformers.__version__)

In [None]:
# --- Install and Import Dependencies ---
import sys
import subprocess

def run(cmd):
    """Echo a shell command line, then execute it through the shell.

    Args:
        cmd: Command line as a single string.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    print(f"Running: {cmd}")
    subprocess.run(cmd, shell=True, check=True)

# Uninstall current torch and transformers
run(f"{sys.executable} -m pip uninstall -y torch torchvision torchaudio transformers")

# Install compatible torch and transformers
# torch wheels are taken from the CUDA 12.1 index given by --index-url.
# NOTE(review): install_packages() below also requests 'transformers>=4.51.0',
# which conflicts with the 4.40.2 pin here — confirm which version is intended.
run(f"{sys.executable} -m pip install torch==2.2.2 torchvision==0.17.2 torchaudio==2.2.2 --index-url https://download.pytorch.org/whl/cu121")
run(f"{sys.executable} -m pip install transformers==4.40.2")

# Install other dependencies
run(f"{sys.executable} -m pip install bert-score rouge-score nltk pillow pandas tqdm scikit-learn")

def install_packages():
    """pip-install each required package into the running interpreter.

    Packages are installed one at a time with ``sys.executable -m pip``. The
    first failure is printed and the cell is aborted through ``sys.exit(1)``.
    """
    requirements = (
        'torch', 'transformers>=4.51.0', 'bert-score', 'rouge-score', 'nltk',
        'pillow', 'pandas', 'tqdm', 'scikit-learn',
    )
    for requirement in requirements:
        pip_command = [sys.executable, "-m", "pip", "install", requirement]
        try:
            subprocess.check_call(pip_command)
        except subprocess.CalledProcessError as e:
            print(f"Failed to install {requirement}: {e}")
            sys.exit(1)

install_packages()

import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

import torch
from transformers import Blip2Processor, Blip2ForConditionalGeneration, Trainer, TrainingArguments
from PIL import Image
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from sklearn.model_selection import train_test_split
import re
from sklearn.metrics import f1_score
from bert_score import score as bert_score
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import numpy as np

# --- Device Setup ---
# Use CUDA when torch reports it as available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# --- Hugging Face Token ---
# Read from the environment instead of committing a secret to the notebook
# (the token previously hardcoded here should be revoked). When HF_TOKEN is
# unset this is None, i.e. the hub is accessed anonymously.
hf_token = os.environ.get("HF_TOKEN")

# --- Load Model and Processor ---
# If moving the half-precision model to the GPU runs out of memory, the model
# is reloaded on the CPU.
# NOTE(review): the CPU fallback keeps torch.float16 weights — confirm that the
# operators needed for training/generation support half precision on CPU.
try:
    processor = Blip2Processor.from_pretrained("Salesforce/blip2-flan-t5-xl", token=hf_token)
    model = Blip2ForConditionalGeneration.from_pretrained(
        "Salesforce/blip2-flan-t5-xl", torch_dtype=torch.float16, token=hf_token
    ).to(device)
except torch.cuda.OutOfMemoryError:
    print("CUDA OOM, switching to CPU...")
    device = torch.device("cpu")
    model = Blip2ForConditionalGeneration.from_pretrained(
        "Salesforce/blip2-flan-t5-xl", torch_dtype=torch.float16, token=hf_token
    ).to(device)

# --- Load Data ---
# A missing file is reported and the cell is aborted via sys.exit(1).
try:
    train_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_train.csv")
    test_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_test.csv")
except FileNotFoundError as e:
    print(f"Dataset not found: {e}")
    sys.exit(1)

# --- Common Answers and Synonyms ---
# Keys are vocabulary categories; post_process_answer looks answers up in these lists.
common_answers = {
    "colors": ["white", "black", "blue", "red", "green", "yellow", "pink", "purple", "orange", "grey", "gray", "brown", "beige", "tan", "silver", "gold", "crimson", "navy", "violet"],
    "materials": ["plastic", "silicone", "metal", "wood", "leather", "suede", "cotton", "polyester", "fabric", "synthetic", "brass", "steel", "canvas", "cloth", "rubber"],
    "yes_no": ["yes", "no"],
    "numbers": ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"],
    "phone_models": ["iphone", "samsung", "vivo", "oppo", "motorola", "lenovo", "redmi", "note", "moto", "lg", "nokia"],
    "styles": ["modern", "floral", "geometric", "abstract", "minimalist", "rustic", "classic", "ankle", "block", "sling", "vintage"],
    "objects": ["heart", "skull", "flowers", "moon", "stars", "cat", "dog", "lion", "butterflies", "teddy", "tree"]
}
# valid_answers is the union of every category list above.
valid_answers = set()
for category in common_answers.values():
    valid_answers.update(category)
# Maps a variant word to the canonical form that post-processing should emit.
synonym_map = {
    "grey": "gray", "tan": "beige", "cloth": "fabric", "crimson": "red", "navy": "blue", "violet": "purple"
}

# --- Helper Functions ---
def get_question_type(question):
    """Classify a question and pick the instruction prefix used to prompt for it.

    Yes/no intent is detected from the literal phrase "yes or no" or from a
    leading auxiliary verb ("Is ...", "Are ...", "Does ..."), not from "is " /
    "are " appearing anywhere in the text, so "What is ..." and
    "How many ... are there?" are no longer swallowed by the yes/no branch.
    Counting questions are checked next, then topic keywords.

    Args:
        question: Question text (coerced with ``str``).

    Returns:
        Tuple ``(q_type, prompt)``; ``q_type`` is one of "yes_no", "number",
        "color", "material", "phone_model", "style" or "object".
    """
    question = str(question).lower().strip()
    words = question.split()
    first_word = words[0] if words else ""
    leading_auxiliaries = {
        "is", "are", "was", "were", "do", "does", "did",
        "can", "could", "has", "have", "will", "would", "should",
    }

    if "yes or no" in question or first_word in leading_auxiliaries:
        return "yes_no", "Answer with yes or no: "
    if "how many" in question:
        return "number", "Answer with a number (e.g., one, two, three): "
    if "color" in question or "colour" in question:
        return "color", "Answer with a color (e.g., red, blue, green): "
    if "material" in question:
        return "material", "Answer with a material (e.g., plastic, leather, metal): "
    if "phone" in question or "model" in question:
        return "phone_model", "Answer with a phone model (e.g., iPhone, Samsung): "
    if "style" in question or "pattern" in question:
        return "style", "Answer with a style or pattern (e.g., floral, modern): "
    return "object", "Answer with an object (e.g., heart, skull): "

def post_process_answer(answer, question, q_type):
    """Map raw generated text onto a single canonical answer word.

    The text is lower-cased and split into word tokens (punctuation dropped);
    digits are spelled out and synonyms canonicalised through ``synonym_map``.
    The first token that belongs to the vocabulary of ``q_type`` is returned,
    so "light blue" yields "blue" instead of falling through to the default.
    Yes/no answers are matched as whole tokens, not as substrings.

    Args:
        answer: Raw decoded model output.
        question: Original question text (unused; kept for call compatibility).
        q_type: Question type as returned by ``get_question_type``.

    Returns:
        One lower-case answer word: a vocabulary match, or the per-type default.
    """
    # q_type -> (key into common_answers, fallback when no token matches)
    vocab_and_default = {
        "yes_no": ("yes_no", "yes"),
        "color": ("colors", "black"),
        "material": ("materials", "plastic"),
        "number": ("numbers", "two"),
        "phone_model": ("phone_models", "iphone"),
        "style": ("styles", "modern"),
        "object": ("objects", "heart"),
    }
    # "0" -> "zero", ..., "10" -> "ten", following the order of the numbers list.
    digit_words = {str(i): word for i, word in enumerate(common_answers["numbers"])}

    tokens = []
    for token in re.findall(r"\w+", str(answer).lower()):
        token = digit_words.get(token, token)
        tokens.append(synonym_map.get(token, token))

    if q_type in vocab_and_default:
        vocab_key, default = vocab_and_default[q_type]
        vocab = common_answers[vocab_key]
        return next((token for token in tokens if token in vocab), default)

    # Unknown question type: accept any known answer word, else the final default.
    return next((token for token in tokens if token in valid_answers), "yes")

def normalize_text(s):
    """Lower-case ``s``, drop punctuation and collapse whitespace.

    Whitespace is stripped last: removing punctuation can expose a leading or
    trailing space ("red ." -> "red "), which would otherwise survive and break
    exact/membership comparisons downstream.

    Args:
        s: Any value; it is converted with ``str``.

    Returns:
        The normalized string with single spaces and no surrounding whitespace.
    """
    s = str(s).lower()
    s = re.sub(r'[^\w\s]', '', s)  # remove punctuation
    s = re.sub(r'\s+', ' ', s)     # collapse whitespace runs
    return s.strip()

# --- Custom Dataset ---
class VQADataset(torch.utils.data.Dataset):
    """Torch dataset yielding BLIP-2 training examples from a VQA DataFrame.

    Each row must provide ``full_path`` (image file), ``question`` and
    ``answer``. Prompts and answers are padded/truncated to fixed lengths so
    that a default collator can stack examples into a batch, and padding
    positions in the labels are set to -100 so they are ignored by the loss.

    Args:
        df: DataFrame with ``full_path``, ``question`` and ``answer`` columns.
        processor: BLIP-2 processor exposing ``__call__`` and ``.tokenizer``.
        max_prompt_length: Fixed token length for the prompt + question.
        max_answer_length: Fixed token length for the answer labels.
    """

    def __init__(self, df, processor, max_prompt_length=128, max_answer_length=16):
        self.df = df
        self.processor = processor
        self.max_prompt_length = max_prompt_length
        self.max_answer_length = max_answer_length

    def __len__(self):
        """Number of rows in the underlying DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Build the tensors for row ``idx`` (positional index).

        Returns:
            Dict with ``pixel_values``, ``input_ids``, ``attention_mask`` and
            ``labels``, each without a leading batch dimension.
        """
        row = self.df.iloc[idx]
        image_path = row['full_path']
        question = row['question']
        q_type, prompt = get_question_type(question)
        full_prompt = prompt + question
        # CSV parsing can yield non-string answers (e.g. numbers); the tokenizer needs text.
        answer = str(row['answer'])
        try:
            image = Image.open(image_path).convert("RGB")
        except Exception as e:
            print(f"Failed to load image {image_path}: {e}")
            # Blank placeholder so one unreadable file does not abort training.
            image = Image.new('RGB', (224, 224), color='gray')

        # padding=True on a single sample pads nothing; a fixed max_length gives
        # every example the same shape so batches can be stacked.
        inputs = self.processor(
            images=image,
            text=full_prompt,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=self.max_prompt_length,
        )
        tokenizer = self.processor.tokenizer
        labels = tokenizer(
            answer,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=self.max_answer_length,
        ).input_ids.squeeze(0)
        # Padding positions must not contribute to the loss.
        pad_id = tokenizer.pad_token_id
        if pad_id is not None:
            labels = labels.masked_fill(labels == pad_id, -100)

        return {
            "pixel_values": inputs["pixel_values"].squeeze(0),
            "input_ids": inputs["input_ids"].squeeze(0),
            "attention_mask": inputs["attention_mask"].squeeze(0),
            "labels": labels,
        }

# --- Data Split ---
# 80/20 split with a fixed seed. train_df is rebound to the 80% part, so
# re-running this cell splits the already-reduced frame again.
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)
train_dataset = VQADataset(train_df, processor)
val_dataset = VQADataset(val_df, processor)

# --- Training Arguments (this cell uses the 'eval_strategy' keyword) ---
# NOTE(review): a later cell states that transformers 4.40.2 expects
# 'evaluation_strategy' instead — confirm against the installed version.
# steps_per_epoch is computed but not referenced again in this cell.
steps_per_epoch = len(train_dataset) // 2  # Batch size is 2
# NOTE(review): fp16=True is requested while the model was loaded with
# torch_dtype=torch.float16 — confirm the Trainer accepts that combination.
training_args = TrainingArguments(
    output_dir="./blip2_finetuned",
    num_train_epochs=3,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    warmup_steps=50,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    save_steps=500,
    save_total_limit=1,
    fp16=True,
    eval_strategy="epoch",   # evaluate on val_dataset after every epoch
    remove_unused_columns=False  # keep every key returned by VQADataset.__getitem__
)

# --- Trainer ---
# No data_collator is passed, so the Trainer's default collation is used.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# --- Inference on Test Set ---
# One generate() call per test row; predictions are collected in row order.
predictions = []
references = test_df['answer'].tolist()  # not used again in this cell
for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA on Test Set"):
    try:
        image_path = row['full_path']
        question = row['question']
        # Same type-specific prompt prefix as used when building training examples.
        q_type, prompt = get_question_type(question)
        full_prompt = prompt + question
        image = Image.open(image_path).convert("RGB")
        inputs = processor(images=image, text=full_prompt, return_tensors="pt").to(device, torch.float16)
        # Short beam-search generation: at most 3 new tokens, 3 beams.
        output = model.generate(**inputs, max_new_tokens=3, num_beams=3)
        answer = processor.decode(output[0], skip_special_tokens=True)
        answer = post_process_answer(answer, question, q_type)
        torch.cuda.empty_cache()
    except Exception as e:
        # Any failure (missing image, generation error) is logged and scored as "yes".
        print(f"Failed on {row['full_path']}: {e}")
        answer = "yes"
    predictions.append(answer)

test_df['predicted_answer'] = predictions
# Question type is stored per row for the per-type accuracy breakdown.
test_df['question_type'] = test_df['question'].apply(lambda q: get_question_type(q)[0])

# Normalize both columns in place before comparing them.
test_df['answer'] = test_df['answer'].astype(str).apply(normalize_text)
test_df['predicted_answer'] = test_df['predicted_answer'].astype(str).apply(normalize_text)

# Relaxed matching: correct when the whole predicted string equals one
# whitespace token of the ground truth.
test_df['correct'] = test_df.apply(
    lambda row: row['predicted_answer'] in row['answer'].split(),
    axis=1
)
# The lambda parameter `df` is local to the lambda and only shadows the outer name inside it.
accuracy_by_type = test_df.groupby('question_type').apply(
    lambda df: df['correct'].mean()
)
print("\nAccuracy by question type:")
print(accuracy_by_type)

# Show the first five incorrect rows for manual inspection.
mismatches = test_df[~test_df['correct']].head(5)
for idx, row in mismatches.iterrows():
    print(f"Question: {row['question']}")
    print(f"Ground Truth: {row['answer']}")
    print(f"Predicted: {row['predicted_answer']}")
    print("---")

# Persist predictions to the working directory.
output_path = Path("vqa_test_predictions_blip3_compatible.csv")
test_df.to_csv(output_path, index=False)
print(f"Predictions saved to {output_path}")

# --- Evaluation Metrics ---
# Enable DataFrame.progress_apply, then re-read the file that was just written.
tqdm.pandas()
df = pd.read_csv(output_path)
# astype(str) guards against values that the CSV round-trip parsed as non-strings.
df['answer'] = df['answer'].astype(str).apply(normalize_text)
df['predicted_answer'] = df['predicted_answer'].astype(str).apply(normalize_text)

def compute_token_f1(pred, gt):
    """Return the token-level F1 overlap between a prediction and a reference.

    Both strings are split on whitespace. Overlap is a multiset intersection,
    so a token repeated in both strings is credited once per matching
    occurrence (a set intersection would under-count it against the
    list-length denominators).

    Args:
        pred: Predicted answer text.
        gt: Ground-truth answer text.

    Returns:
        F1 in [0.0, 1.0]; 0.0 when either side has no tokens or nothing overlaps.
    """
    from collections import Counter  # local import: the cell's import list stays untouched

    pred_tokens = pred.split()
    gt_tokens = gt.split()
    if not pred_tokens or not gt_tokens:
        return 0.0
    # Counter & Counter keeps the minimum count of every shared token.
    overlap = sum((Counter(pred_tokens) & Counter(gt_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(gt_tokens)
    return 2 * precision * recall / (precision + recall)

# Per-row token F1, then the mean over all rows.
df['token_f1'] = df.progress_apply(lambda row: compute_token_f1(row['predicted_answer'], row['answer']), axis=1)
avg_token_f1 = df['token_f1'].mean()
# Relaxed accuracy: the whole predicted string must equal one whitespace token of the answer.
accuracy = df.apply(
    lambda row: row['predicted_answer'] in row['answer'].split(),
    axis=1
).mean()

# ROUGE-L F-measure; rouge.score is called as (answer, predicted_answer).
rouge = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
df['rougeL'] = df.progress_apply(lambda row: rouge.score(row['answer'], row['predicted_answer'])['rougeL'].fmeasure, axis=1)
avg_rougeL = df['rougeL'].mean()

# BERTScore: predictions first, references second; only the mean F1 is reported.
P, R, F1 = bert_score(df['predicted_answer'].tolist(), df['answer'].tolist(), lang='en', verbose=True)
bert_f1 = F1.mean().item()

# Sentence-level BLEU with one reference per row, smoothed with method1.
smooth_fn = SmoothingFunction().method1
df['bleu'] = df.progress_apply(
    lambda row: sentence_bleu([row['answer'].split()], row['predicted_answer'].split(), smoothing_function=smooth_fn),
    axis=1
)
avg_bleu = df['bleu'].mean()

# Summary of all metrics, four decimals each.
print(f"Token F1      : {avg_token_f1:.4f}")
print(f"Accuracy      : {accuracy:.4f}")
print(f"ROUGE-L       : {avg_rougeL:.4f}")
print(f"BERTScore F1  : {bert_f1:.4f}")
print(f"BLEU          : {avg_bleu:.4f}")


In [None]:
import sys
import subprocess

def run(cmd):
    """Echo a shell command line, then execute it through the shell.

    Args:
        cmd: Command line as a single string.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    print(f"Running: {cmd}")
    subprocess.run(cmd, shell=True, check=True)

# Uninstall incompatible versions
run(f"{sys.executable} -m pip uninstall -y torch torchvision torchaudio torchao transformers")

# Install compatible torch and transformers
# torch wheels are taken from the CUDA 12.1 index given by --index-url.
run(f"{sys.executable} -m pip install torch==2.2.2 torchvision==0.17.2 torchaudio==2.2.2 --index-url https://download.pytorch.org/whl/cu121")
run(f"{sys.executable} -m pip install transformers==4.40.2")

# Install other dependencies
# These packages are installed without version pins.
run(f"{sys.executable} -m pip install bert-score rouge-score nltk pillow pandas tqdm scikit-learn")

# The kernel must be restarted so the newly installed versions are the ones imported.
print("✅ All dependencies installed. Please RESTART the runtime/kernel, then run the next cell.")


In [None]:
import sys
import subprocess

def run(cmd):
    """Echo a shell command line, then execute it through the shell.

    Args:
        cmd: Command line as a single string.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    print(f"Running: {cmd}")
    subprocess.run(cmd, shell=True, check=True)

# Remove peft from the kernel's environment (-y skips the confirmation prompt).
run(f"{sys.executable} -m pip uninstall -y peft")


In [None]:
# Re-install the pinned torch stack and transformers.
# Relies on `run` and `sys` defined by an earlier cell in the same kernel session.
run(f"{sys.executable} -m pip install torch==2.2.2 torchvision==0.17.2 torchaudio==2.2.2 --index-url https://download.pytorch.org/whl/cu121")
run(f"{sys.executable} -m pip install transformers==4.40.2")


In [None]:
!pip install peft --upgrade


In [None]:
import sys
import subprocess

def run(cmd):
    """Echo a shell command line, then execute it through the shell.

    Args:
        cmd: Command line as a single string.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    print(f"Running: {cmd}")
    subprocess.run(cmd, shell=True, check=True)

# Uninstall incompatible versions
run(f"{sys.executable} -m pip uninstall -y torch torchvision torchaudio torchao transformers")

# Install compatible torch and transformers
# torch wheels are taken from the CUDA 12.1 index given by --index-url.
run(f"{sys.executable} -m pip install torch==2.2.2 torchvision==0.17.2 torchaudio==2.2.2 --index-url https://download.pytorch.org/whl/cu121")
run(f"{sys.executable} -m pip install transformers==4.40.2")

# Install other dependencies
# These packages are installed without version pins.
run(f"{sys.executable} -m pip install bert-score rouge-score nltk pillow pandas tqdm scikit-learn")

# The kernel must be restarted so the newly installed versions are the ones imported.
print("✅ All dependencies installed. Please RESTART the runtime/kernel, then run the next cell.")



In [None]:
import os
import torch
from transformers import Blip2Processor, Blip2ForConditionalGeneration, Trainer, TrainingArguments
from PIL import Image
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from sklearn.model_selection import train_test_split
import re
from sklearn.metrics import f1_score
from bert_score import score as bert_score
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import numpy as np

# Allocator setting intended to reduce CUDA memory fragmentation.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# --- Device Setup ---
# Use CUDA when torch reports it as available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# --- Hugging Face Token ---
# Read from the environment instead of committing a secret to the notebook
# (the token previously hardcoded here should be revoked). When HF_TOKEN is
# unset this is None, i.e. the hub is accessed anonymously.
hf_token = os.environ.get("HF_TOKEN")

# --- Load Model and Processor ---
# If moving the half-precision model to the GPU runs out of memory, the model
# is reloaded on the CPU.
# NOTE(review): the CPU fallback keeps torch.float16 weights — confirm that the
# operators needed for training/generation support half precision on CPU.
try:
    processor = Blip2Processor.from_pretrained("Salesforce/blip2-flan-t5-xl", token=hf_token)
    model = Blip2ForConditionalGeneration.from_pretrained(
        "Salesforce/blip2-flan-t5-xl", torch_dtype=torch.float16, token=hf_token
    ).to(device)
except torch.cuda.OutOfMemoryError:
    print("CUDA OOM, switching to CPU...")
    device = torch.device("cpu")
    model = Blip2ForConditionalGeneration.from_pretrained(
        "Salesforce/blip2-flan-t5-xl", torch_dtype=torch.float16, token=hf_token
    ).to(device)

# --- Load Data ---
# A missing file is reported and the original FileNotFoundError is re-raised.
try:
    train_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_train.csv")
    test_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_test.csv")
except FileNotFoundError as e:
    print(f"Dataset not found: {e}")
    raise

# --- Common Answers and Synonyms ---
# Keys are vocabulary categories; post_process_answer looks answers up in these lists.
common_answers = {
    "colors": ["white", "black", "blue", "red", "green", "yellow", "pink", "purple", "orange", "grey", "gray", "brown", "beige", "tan", "silver", "gold", "crimson", "navy", "violet"],
    "materials": ["plastic", "silicone", "metal", "wood", "leather", "suede", "cotton", "polyester", "fabric", "synthetic", "brass", "steel", "canvas", "cloth", "rubber"],
    "yes_no": ["yes", "no"],
    "numbers": ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"],
    "phone_models": ["iphone", "samsung", "vivo", "oppo", "motorola", "lenovo", "redmi", "note", "moto", "lg", "nokia"],
    "styles": ["modern", "floral", "geometric", "abstract", "minimalist", "rustic", "classic", "ankle", "block", "sling", "vintage"],
    "objects": ["heart", "skull", "flowers", "moon", "stars", "cat", "dog", "lion", "butterflies", "teddy", "tree"]
}
# valid_answers is the union of every category list above.
valid_answers = set()
for category in common_answers.values():
    valid_answers.update(category)
# Maps a variant word to the canonical form that post-processing should emit.
synonym_map = {
    "grey": "gray", "tan": "beige", "cloth": "fabric", "crimson": "red", "navy": "blue", "violet": "purple"
}

# --- Helper Functions ---
def get_question_type(question):
    """Classify a question and pick the instruction prefix used to prompt for it.

    Yes/no intent is detected from the literal phrase "yes or no" or from a
    leading auxiliary verb ("Is ...", "Are ...", "Does ..."), not from "is " /
    "are " appearing anywhere in the text, so "What is ..." and
    "How many ... are there?" are no longer swallowed by the yes/no branch.
    Counting questions are checked next, then topic keywords.

    Args:
        question: Question text (coerced with ``str``).

    Returns:
        Tuple ``(q_type, prompt)``; ``q_type`` is one of "yes_no", "number",
        "color", "material", "phone_model", "style" or "object".
    """
    question = str(question).lower().strip()
    words = question.split()
    first_word = words[0] if words else ""
    leading_auxiliaries = {
        "is", "are", "was", "were", "do", "does", "did",
        "can", "could", "has", "have", "will", "would", "should",
    }

    if "yes or no" in question or first_word in leading_auxiliaries:
        return "yes_no", "Answer with yes or no: "
    if "how many" in question:
        return "number", "Answer with a number (e.g., one, two, three): "
    if "color" in question or "colour" in question:
        return "color", "Answer with a color (e.g., red, blue, green): "
    if "material" in question:
        return "material", "Answer with a material (e.g., plastic, leather, metal): "
    if "phone" in question or "model" in question:
        return "phone_model", "Answer with a phone model (e.g., iPhone, Samsung): "
    if "style" in question or "pattern" in question:
        return "style", "Answer with a style or pattern (e.g., floral, modern): "
    return "object", "Answer with an object (e.g., heart, skull): "

def post_process_answer(answer, question, q_type):
    """Map raw generated text onto a single canonical answer word.

    The text is lower-cased and split into word tokens (punctuation dropped);
    digits are spelled out and synonyms canonicalised through ``synonym_map``.
    The first token that belongs to the vocabulary of ``q_type`` is returned,
    so "light blue" yields "blue" instead of falling through to the default.
    Yes/no answers are matched as whole tokens, not as substrings.

    Args:
        answer: Raw decoded model output.
        question: Original question text (unused; kept for call compatibility).
        q_type: Question type as returned by ``get_question_type``.

    Returns:
        One lower-case answer word: a vocabulary match, or the per-type default.
    """
    # q_type -> (key into common_answers, fallback when no token matches)
    vocab_and_default = {
        "yes_no": ("yes_no", "yes"),
        "color": ("colors", "black"),
        "material": ("materials", "plastic"),
        "number": ("numbers", "two"),
        "phone_model": ("phone_models", "iphone"),
        "style": ("styles", "modern"),
        "object": ("objects", "heart"),
    }
    # "0" -> "zero", ..., "10" -> "ten", following the order of the numbers list.
    digit_words = {str(i): word for i, word in enumerate(common_answers["numbers"])}

    tokens = []
    for token in re.findall(r"\w+", str(answer).lower()):
        token = digit_words.get(token, token)
        tokens.append(synonym_map.get(token, token))

    if q_type in vocab_and_default:
        vocab_key, default = vocab_and_default[q_type]
        vocab = common_answers[vocab_key]
        return next((token for token in tokens if token in vocab), default)

    # Unknown question type: accept any known answer word, else the final default.
    return next((token for token in tokens if token in valid_answers), "yes")

def normalize_text(s):
    """Normalise text for answer comparison.

    The value is converted to ``str``, lower-cased, stripped of every
    character that is neither a word character nor whitespace, and has runs
    of whitespace collapsed to a single space.

    Args:
        s: Any value; it is passed through ``str()`` first.

    Returns:
        The normalised string with no leading or trailing whitespace.
    """
    s = str(s).lower()
    s = re.sub(r'[^\w\s]', '', s)
    # Trim last: removing punctuation can expose whitespace at either end
    # (e.g. "red ." -> "red "), which an early strip() would leave behind.
    return re.sub(r'\s+', ' ', s).strip()

# --- Custom Dataset ---
class VQADataset(torch.utils.data.Dataset):
    """Map-style dataset that turns one DataFrame row into model inputs.

    Each row must provide ``full_path`` (image file), ``question`` and
    ``answer``. The question is prefixed with the type-specific instruction
    returned by ``get_question_type`` before being handed to the processor.
    """

    def __init__(self, df, processor):
        """Keep references to the sample frame and the processor."""
        self.df = df
        self.processor = processor

    def __len__(self):
        """Number of rows in the underlying frame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Build the input dict for the row at position ``idx``.

        An image that cannot be opened is reported on stdout and replaced by
        a gray 224x224 placeholder so that the sample is still returned.

        Returns:
            dict with ``pixel_values``, ``input_ids``, ``attention_mask`` and
            ``labels``, each with the leading batch dimension squeezed away.
        """
        sample = self.df.iloc[idx]
        img_file = sample['full_path']
        question_text = sample['question']
        _, instruction = get_question_type(question_text)
        prompt_text = instruction + question_text
        target_text = sample['answer']

        try:
            picture = Image.open(img_file).convert("RGB")
        except Exception as e:
            print(f"Failed to load image {img_file}: {e}")
            picture = Image.new('RGB', (224, 224), color='gray')

        encoded = self.processor(images=picture, text=prompt_text, return_tensors="pt", padding=True)
        label_ids = self.processor.tokenizer(
            target_text, return_tensors="pt", padding=True, truncation=True
        ).input_ids

        # The processor returns batch-of-one tensors; drop that leading axis.
        for field in encoded:
            encoded[field] = encoded[field].squeeze(0)

        return {
            "pixel_values": encoded["pixel_values"],
            "input_ids": encoded["input_ids"],
            "attention_mask": encoded["attention_mask"],
            "labels": label_ids.squeeze(0),
        }

# --- Data Split ---
# Hold out 20% of the training frame for validation; random_state=42 keeps the
# split reproducible.
# NOTE(review): `train_df` is rebound to the 80% portion, so re-running this
# statement would split the already-reduced frame again.
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)
train_dataset = VQADataset(train_df, processor)
val_dataset = VQADataset(val_df, processor)

# --- Training Arguments ---
# NOTE(review): steps_per_epoch is not referenced again in this cell.
steps_per_epoch = len(train_dataset) // 2  # Batch size is 2
training_args = TrainingArguments(
    output_dir="./blip2_finetuned",
    num_train_epochs=3,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    warmup_steps=50,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    save_steps=500,
    save_total_limit=1,
    fp16=True,
    evaluation_strategy="epoch",   # transformers 4.40.2 uses 'evaluation_strategy'
    remove_unused_columns=False
)

# --- Trainer ---
# No data_collator or compute_metrics is passed, so the Trainer defaults apply.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# --- Inference on Test Set ---
# Put the model in eval mode explicitly so generation does not depend on the
# mode the Trainer happened to leave it in.
model.eval()

predictions = []
references = test_df['answer'].tolist()
for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA on Test Set"):
    try:
        image_path = row['full_path']
        question = row['question']
        # Prefix the question with the instruction for its detected type.
        q_type, prompt = get_question_type(question)
        full_prompt = prompt + question
        image = Image.open(image_path).convert("RGB")
        inputs = processor(images=image, text=full_prompt, return_tensors="pt").to(device, torch.float16)
        output = model.generate(**inputs, max_new_tokens=3, num_beams=3)
        answer = processor.decode(output[0], skip_special_tokens=True)
        answer = post_process_answer(answer, question, q_type)
        torch.cuda.empty_cache()
    except Exception as e:
        # Best effort: a failing sample is reported and answered with "yes".
        print(f"Failed on {row['full_path']}: {e}")
        answer = "yes"
    predictions.append(answer)

test_df['predicted_answer'] = predictions
test_df['question_type'] = test_df['question'].apply(lambda q: get_question_type(q)[0])

test_df['answer'] = test_df['answer'].astype(str).apply(normalize_text)
test_df['predicted_answer'] = test_df['predicted_answer'].astype(str).apply(normalize_text)

# A prediction counts as correct when it equals any whitespace-separated
# token of the normalised ground truth.
test_df['correct'] = test_df.apply(
    lambda row: row['predicted_answer'] in row['answer'].split(),
    axis=1
)
# Column-wise mean per group; avoids groupby.apply over whole sub-frames.
accuracy_by_type = test_df.groupby('question_type')['correct'].mean()
print("\nAccuracy by question type:")
print(accuracy_by_type)

# Show the first five misses for a quick qualitative check.
mismatches = test_df[~test_df['correct']].head(5)
for idx, row in mismatches.iterrows():
    print(f"Question: {row['question']}")
    print(f"Ground Truth: {row['answer']}")
    print(f"Predicted: {row['predicted_answer']}")
    print("---")

output_path = Path("vqa_test_predictions_blip3_compatible.csv")
test_df.to_csv(output_path, index=False)
print(f"Predictions saved to {output_path}")

# --- Evaluation Metrics ---
# Register DataFrame.progress_apply with tqdm (used for the per-row metrics below).
tqdm.pandas()
# Reload the predictions written to `output_path` and normalise both text
# columns again; astype(str) guards against values parsed as non-strings.
df = pd.read_csv(output_path)
df['answer'] = df['answer'].astype(str).apply(normalize_text)
df['predicted_answer'] = df['predicted_answer'].astype(str).apply(normalize_text)

def compute_token_f1(pred, gt):
    """Token-level F1 between a prediction and a ground-truth string.

    Both strings are split on whitespace. The overlap is counted as a
    multiset intersection, so a token that occurs twice in both strings
    contributes two matches; a set-based overlap would score two identical
    strings with repeated tokens below 1.0.

    Args:
        pred: Predicted answer text.
        gt: Ground-truth answer text.

    Returns:
        F1 in [0.0, 1.0]; 0.0 when either side has no tokens or nothing
        overlaps.
    """
    from collections import Counter  # local import keeps the cell self-contained

    pred_tokens = pred.split()
    gt_tokens = gt.split()
    if not pred_tokens or not gt_tokens:
        return 0.0

    overlap = sum((Counter(pred_tokens) & Counter(gt_tokens)).values())
    if overlap == 0:
        return 0.0

    precision = overlap / len(pred_tokens)
    recall = overlap / len(gt_tokens)
    return 2 * precision * recall / (precision + recall)

# Per-row token F1, averaged over the test set.
df['token_f1'] = df.progress_apply(lambda row: compute_token_f1(row['predicted_answer'], row['answer']), axis=1)
avg_token_f1 = df['token_f1'].mean()
# Accuracy: the prediction must equal one whitespace-separated token of the answer.
accuracy = df.apply(
    lambda row: row['predicted_answer'] in row['answer'].split(),
    axis=1
).mean()

# ROUGE-L F-measure with stemming; the ground truth is passed first, the prediction second.
rouge = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
df['rougeL'] = df.progress_apply(lambda row: rouge.score(row['answer'], row['predicted_answer'])['rougeL'].fmeasure, axis=1)
avg_rougeL = df['rougeL'].mean()

# BERTScore over the whole column at once; only the mean F1 is reported.
P, R, F1 = bert_score(df['predicted_answer'].tolist(), df['answer'].tolist(), lang='en', verbose=True)
bert_f1 = F1.mean().item()

# Sentence-level BLEU with a single reference per row, smoothed with method1.
smooth_fn = SmoothingFunction().method1
df['bleu'] = df.progress_apply(
    lambda row: sentence_bleu([row['answer'].split()], row['predicted_answer'].split(), smoothing_function=smooth_fn),
    axis=1
)
avg_bleu = df['bleu'].mean()

# Summary table of the averaged metrics.
print(f"Token F1      : {avg_token_f1:.4f}")
print(f"Accuracy      : {accuracy:.4f}")
print(f"ROUGE-L       : {avg_rougeL:.4f}")
print(f"BERTScore F1  : {bert_f1:.4f}")
print(f"BLEU          : {avg_bleu:.4f}")


In [None]:
import shutil
# Delete the cached BLIP-2 OPT-2.7B snapshot (presumably to force a clean
# re-download); ignore_errors=True makes this a no-op when the directory is absent.
# NOTE(review): the path is hard-coded to /root, so it only matches when the
# kernel's home directory is /root.
shutil.rmtree("/root/.cache/huggingface/hub/models--Salesforce--blip2-opt-2.7b", ignore_errors=True)


In [None]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import csv
from pathlib import Path
import shutil

import os  # cell-local import: the access token is read from the environment

# Clear any potentially corrupted cached model files
shutil.rmtree("/root/.cache/huggingface/hub/models--Salesforce--blip2-opt-2.7b", ignore_errors=True)

# Output CSV
output_path = Path("vqa_test_predictions.csv")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hugging Face token: read from the HF_TOKEN environment variable (e.g. a
# Kaggle secret) instead of embedding a credential in the notebook. When the
# variable is unset this is None and the load is attempted anonymously.
# The token that used to be committed here should be revoked.
hf_token = os.environ.get("HF_TOKEN")

# Load the processor and model with trust_remote_code
processor = Blip2Processor.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    token=hf_token,
    trust_remote_code=True
)

model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",
    torch_dtype=torch.float16,
    token=hf_token,
    trust_remote_code=True
)
model.eval()

# Load data
train_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_train.csv")
test_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_test.csv")

# Run model on test set
predictions = []
references = test_df['answer'].tolist()

for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA on Test Set"):
    try:
        image_path = row['full_path']
        question = row['question']

        image = Image.open(image_path).convert("RGB")
        inputs = processor(images=image, text=question, return_tensors="pt").to(device, torch.float16)
        generated_ids = model.generate(**inputs, max_new_tokens=50)
        answer = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
    except Exception as e:
        # Best effort: report the failure and record an empty prediction.
        print(f"Failed on {row['full_path']}: {e}")
        answer = ""
    predictions.append(answer)

test_df['predicted_answer'] = predictions
# Exact match after trimming and lower-casing both sides.
test_df['correct'] = test_df['predicted_answer'].str.strip().str.lower() == test_df['answer'].str.strip().str.lower()

# Save to CSV
test_df.to_csv(output_path, index=False)
print(f"Predictions saved to {output_path}")


In [None]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import shutil
import os

# Force delete corrupted cache (for Kaggle/Colab only)
model_cache_path = os.path.expanduser("~/.cache/huggingface/hub/models--Salesforce--blip2-opt-2.7b")
if os.path.exists(model_cache_path):
    shutil.rmtree(model_cache_path, ignore_errors=True)

# Output CSV path
output_path = Path("vqa_test_predictions.csv")

# Choose device and the matching floating-point precision
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
compute_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

# Hugging Face token: read from the HF_TOKEN environment variable (e.g. a
# Kaggle secret) instead of embedding a credential in the notebook. When the
# variable is unset this is None and the load is attempted anonymously.
# The token that used to be committed here should be revoked.
hf_token = os.environ.get("HF_TOKEN")

# Load processor (DO NOT use trust_remote_code here)
processor = Blip2Processor.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    token=hf_token
)

# Load model (trust_remote_code is required here)
model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",
    torch_dtype=compute_dtype,
    token=hf_token,
    trust_remote_code=True
)
model.eval()

# Load test dataset
test_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_test.csv")

# Predict answers
predictions = []
references = test_df['answer'].tolist()

for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA on Test Set"):
    try:
        image_path = row['full_path']
        question = row['question']

        # Load image and run processor
        image = Image.open(image_path).convert("RGB")
        # BatchFeature.to(device, dtype) casts only floating-point tensors, so
        # pixel_values follow the model precision while input_ids and
        # attention_mask stay integer. Casting every tensor to float would
        # turn the token ids into floats and break the embedding lookup.
        inputs = processor(images=image, text=question, return_tensors="pt").to(device, compute_dtype)

        # Generate prediction
        generated_ids = model.generate(**inputs, max_new_tokens=50)
        answer = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
    except Exception as e:
        # Best effort: report the failure and record an empty prediction.
        print(f"Failed on {row['full_path']}: {e}")
        answer = ""
    predictions.append(answer)

# Save results
test_df['predicted_answer'] = predictions
# Exact match after trimming and lower-casing both sides.
test_df['correct'] = test_df['predicted_answer'].str.strip().str.lower() == test_df['answer'].str.strip().str.lower()
test_df.to_csv(output_path, index=False)

print(f"\n✅ Predictions saved to: {output_path}")


In [None]:
pip install transformers==4.36.2 tokenizers==0.14.1 huggingface-hub>=0.19.3 --force-reinstall


In [None]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import os
import shutil

# Output file path
output_path = Path("vqa_test_predictions.csv")

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hugging Face token: read from the HF_TOKEN environment variable (e.g. a
# Kaggle secret) instead of embedding a credential in the notebook. When the
# variable is unset this is None and the load is attempted anonymously.
# The token that used to be committed here should be revoked.
hf_token = os.environ.get("HF_TOKEN")

# Clear the local cache to avoid corrupt downloads
model_id = "Salesforce/blip2-opt-2.7b"
cache_path = os.path.expanduser(f"~/.cache/huggingface/hub/models--{model_id.replace('/', '--')}")
if os.path.exists(cache_path):
    shutil.rmtree(cache_path)

# Load processor and model
processor = Blip2Processor.from_pretrained(
    model_id,
    token=hf_token,
    force_download=True
)
# device_map="auto" already places the weights on the available device(s);
# the model is therefore not moved again with .to(device).
model = Blip2ForConditionalGeneration.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=torch.float16,
    token=hf_token,
    force_download=True
)
model.eval()

# Load dataset
train_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_train.csv")
test_df = pd.read_csv("/kaggle/input/vr4567/more_and_12049_test.csv")

# Predict answers
predictions = []
for _, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA"):
    try:
        image_path = row['full_path']
        question = row['question']
        image = Image.open(image_path).convert("RGB")

        # Preprocess
        inputs = processor(images=image, text=question, return_tensors="pt").to(device, torch.float16)

        # Generate answer
        generated_ids = model.generate(**inputs, max_new_tokens=50)
        answer = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
    except Exception as e:
        # Read the path from the row: the local `image_path` may be unbound
        # or left over from the previous iteration when the failure is early.
        print(f"Failed on {row.get('full_path', '<unknown>')}: {e}")
        answer = ""
    predictions.append(answer)

# Save results
test_df['predicted_answer'] = predictions
# Exact match after lower-casing and trimming both sides.
test_df['correct'] = test_df['predicted_answer'].str.lower().str.strip() == test_df['answer'].str.lower().str.strip()
test_df.to_csv(output_path, index=False)
print(f"Predictions saved to {output_path}")


In [None]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import os
import logging
import warnings

# Configure logging
# INFO and above, formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Suppress specific warnings
# (every UserWarning, whatever its origin)
warnings.filterwarnings("ignore", category=UserWarning)

# Configure paths
# Change these paths to match your dataset location
TRAIN_DATA_PATH = "/kaggle/input/vr1234/train_split.csv"
TEST_DATA_PATH = "/kaggle/input/vr1234/test_split.csv"
# Relative path: the predictions CSV lands in the current working directory.
OUTPUT_PATH = Path("vqa_test_predictions.csv")

# Set device - use MPS for Mac M1/M2 if available
# Checked in this order: MPS, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
    logger.info("Using MPS device")
elif torch.cuda.is_available():
    device = torch.device("cuda")
    logger.info(f"Using CUDA device: {torch.cuda.get_device_name(0)}")
else:
    device = torch.device("cpu")
    logger.info("Using CPU")

def load_model(model_retry=0):
    """Load the BLIP-2 OPT-2.7B processor and model, retrying on failure.

    Every attempt first deletes the local Hugging Face cache directory of the
    checkpoint and then loads anonymously. If that fails and the ``HF_TOKEN``
    environment variable is set, the load is repeated with that token; without
    a token there is nothing different to try, so the error goes straight to
    the retry logic.

    Args:
        model_retry: Zero-based attempt counter used by the recursive retry;
            callers normally leave it at 0.

    Returns:
        ``(processor, model)`` with the model in eval mode. After three failed
        attempts the result of ``load_alternative_model()`` is returned.
    """
    model_id = "Salesforce/blip2-opt-2.7b"

    def _load(token=None):
        """Load processor and model once, optionally authenticating with ``token``."""
        use_device_map = device.type == "cuda"
        processor = Blip2Processor.from_pretrained(model_id, token=token)
        model = Blip2ForConditionalGeneration.from_pretrained(
            model_id,
            device_map="auto" if use_device_map else None,
            torch_dtype=torch.float16 if device.type in ["cuda", "mps"] else torch.float32,
            token=token,
        )
        # With device_map="auto" the weights are already placed; only move
        # the model by hand when no device map was used.
        if not use_device_map:
            model = model.to(device)
        return processor, model

    try:
        logger.info(f"Loading model {model_id}, attempt {model_retry + 1}")

        # Clear the cache for this model before loading to avoid corrupt files
        cache_path = os.path.expanduser(f"~/.cache/huggingface/hub/models--{model_id.replace('/', '--')}")
        if os.path.exists(cache_path):
            import shutil
            shutil.rmtree(cache_path)
            logger.info(f"Cleared cache at {cache_path}")

        # First try without the token
        try:
            processor, model = _load()
            logger.info("Successfully loaded model without token")
        except Exception as no_token_error:
            logger.warning(f"Failed to load without token: {no_token_error}")

            # Optional credential, supplied through the environment rather
            # than written into the notebook.
            hf_token = os.environ.get("HF_TOKEN")
            if not hf_token:
                # An identical anonymous load would just fail again.
                raise
            logger.info("Trying with token...")
            processor, model = _load(hf_token)

        model.eval()
        return processor, model

    except Exception as e:
        if model_retry < 2:  # Try up to 3 times
            logger.warning(f"Failed to load model: {e}. Retrying...")
            return load_model(model_retry + 1)
        else:
            # If all attempts fail, try an alternative model
            logger.warning(f"Failed to load BLIP-2. Trying alternative model...")
            return load_alternative_model()

def load_alternative_model():
    """Last-resort loader used when the main BLIP-2 checkpoint cannot be loaded.

    Returns:
        ``(processor, model)`` with the model in eval mode.

    Raises:
        RuntimeError: If loading fails; the underlying exception is attached
            as ``__cause__``.
    """
    logger.info("Attempting to load smaller BLIP-2 model")

    try:
        # NOTE(review): confirm that this checkpoint id exists on the
        # Hugging Face Hub; if it does not, this fallback can never succeed.
        alt_model_id = "Salesforce/blip2-opt-1.7b"
        use_device_map = device.type == "cuda"

        processor = Blip2Processor.from_pretrained(alt_model_id)
        model = Blip2ForConditionalGeneration.from_pretrained(
            alt_model_id,
            device_map="auto" if use_device_map else None,
            torch_dtype=torch.float16 if device.type in ["cuda", "mps"] else torch.float32
        )
        # With device_map="auto" the weights are already placed; only move
        # the model by hand when no device map was used.
        if not use_device_map:
            model = model.to(device)

        model.eval()
        logger.info("Successfully loaded alternative BLIP-2 model")
        return processor, model

    except Exception as e:
        logger.error(f"Failed to load alternative model: {e}")
        # Chain explicitly so the root cause stays attached to the error.
        raise RuntimeError("All model loading attempts failed") from e

def process_data():
    """Read the train and test CSVs and return the validated test frame.

    The training CSV is read and its size logged, but only the test frame is
    returned. The test frame must contain the ``full_path`` and ``question``
    columns.

    Returns:
        pandas.DataFrame: the test split.

    Raises:
        FileNotFoundError: If either CSV is missing (logged, then re-raised).
        ValueError: If a required column is absent from the test data.
    """
    try:
        logger.info(f"Loading training data from: {TRAIN_DATA_PATH}")
        train_df = pd.read_csv(TRAIN_DATA_PATH)
        logger.info(f"Loaded training data with {len(train_df)} samples")

        logger.info(f"Loading test data from: {TEST_DATA_PATH}")
        test_df = pd.read_csv(TEST_DATA_PATH)
        logger.info(f"Loaded test data with {len(test_df)} samples")

        # Report the first required column that is missing, if any.
        col = next((name for name in ('full_path', 'question') if name not in test_df.columns), None)
        if col is not None:
            raise ValueError(f"Required column '{col}' not found in test data")

        return test_df
    except Exception as e:
        # Missing files get a more specific hint; everything is re-raised.
        if isinstance(e, FileNotFoundError):
            logger.error(f"Data file not found: {e}")
            logger.error("Please check the TRAIN_DATA_PATH and TEST_DATA_PATH variables at the top of the script")
        else:
            logger.error(f"Error loading data: {e}")
        raise

def get_answer(processor, model, image_path, question):
    """Answer one question about one image with the given processor/model.

    Args:
        processor: Callable that builds model inputs and offers ``batch_decode``.
        model: Model exposing ``generate``.
        image_path: Path of the image file to open.
        question: Question text passed to the processor as-is.

    Returns:
        The decoded, stripped answer; an empty string if anything fails (the
        error is logged rather than raised).
    """
    try:
        rgb_image = Image.open(image_path).convert("RGB")
        batch = processor(images=rgb_image, text=question, return_tensors="pt").to(device)

        # Inference only: no gradients are needed for generation.
        with torch.no_grad():
            token_ids = model.generate(**batch, max_new_tokens=50)
            decoded = processor.batch_decode(token_ids, skip_special_tokens=True)

        return decoded[0].strip()
    except Exception as e:
        logger.error(f"Error processing {image_path}: {e}")
        return ""

def main():
    """Run the full pipeline: load the model, predict every test row, save a CSV.

    Predictions are stored in ``predicted_answer``. When the test frame has an
    ``answer`` column, a case-insensitive exact-match ``correct`` column and
    the overall accuracy are added and logged. Any error is logged and
    re-raised.
    """
    try:
        processor, model = load_model()
        test_df = process_data()

        predictions = []
        successes = failures = 0

        for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA"):
            try:
                image_path = row['full_path']
                question = row['question']

                predictions.append(get_answer(processor, model, image_path, question))
                successes += 1

                # Progress line every 50 rows, skipping row 0.
                if idx % 50 == 0 and idx > 0:
                    logger.info(f"Processed {idx}/{len(test_df)} samples")

            except Exception as e:
                logger.error(f"Failed on sample {idx}: {e}")
                predictions.append("")
                failures += 1

        test_df['predicted_answer'] = predictions

        if 'answer' in test_df.columns:
            # Exact match after lower-casing and trimming both sides.
            predicted_norm = test_df['predicted_answer'].str.lower().str.strip()
            expected_norm = test_df['answer'].str.lower().str.strip()
            test_df['correct'] = predicted_norm == expected_norm
            accuracy = test_df['correct'].mean()
            logger.info(f"Accuracy: {accuracy:.4f}")

        test_df.to_csv(OUTPUT_PATH, index=False)
        logger.info(f"Predictions saved to {OUTPUT_PATH}")
        logger.info(f"Successfully processed: {successes}, Failed: {failures}")

    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise

# Run the pipeline when the cell/script is executed directly.
if __name__ == "__main__":
    main()

In [None]:
from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import os
import logging
import warnings
import time

# Configure logging
# INFO and above, formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Suppress specific warnings
# (every UserWarning, whatever its origin)
warnings.filterwarnings("ignore", category=UserWarning)

# Configure paths - IMPORTANT: Modify these to match your setup
# NOTE(review): process_data() in this cell reads only TEST_DATA_PATH.
TRAIN_DATA_PATH = "/kaggle/input/vr1234/train_split.csv"  # Updated path
TEST_DATA_PATH = "/kaggle/input/vr1234/test_split.csv"    # Updated path
IMAGES_BASE_DIR = "/kaggle/input/vrmini2/abo-images-small/images/small/"  # Base directory for images
# Relative path: the predictions CSV lands in the current working directory.
OUTPUT_PATH = Path("vqa_test_predictions3.csv")

# Set device
# Checked in this order: CUDA, then MPS, then CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
    logger.info(f"Using CUDA device: {torch.cuda.get_device_name(0)}")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    logger.info("Using MPS device")
else:
    device = torch.device("cpu")
    logger.info("Using CPU")

def load_model():
    """Load the BLIP VQA processor and model onto ``device``.

    The ``Salesforce/blip-vqa-base`` checkpoint is a question-answering
    checkpoint, so it is loaded with ``BlipForQuestionAnswering`` rather than
    the captioning class ``BlipForConditionalGeneration``.

    Returns:
        ``(processor, model)`` with the model in eval mode; half precision on
        CUDA, full precision otherwise.

    Raises:
        RuntimeError: If loading fails; the underlying exception is attached
            as ``__cause__``.
    """
    logger.info("Loading BLIP model")

    try:
        # Local import: the cell-level import only brings in the captioning class.
        from transformers import BlipForQuestionAnswering

        # Using the base BLIP model which is more reliable than BLIP-2
        model_id = "Salesforce/blip-vqa-base"

        processor = BlipProcessor.from_pretrained(model_id)
        model = BlipForQuestionAnswering.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if device.type == "cuda" else torch.float32
        ).to(device)

        model.eval()
        logger.info("Successfully loaded BLIP model")
        return processor, model

    except Exception as e:
        logger.error(f"Failed to load model: {e}")
        # Chain explicitly so the root cause stays attached to the error.
        raise RuntimeError("Model loading failed") from e

def verify_image_path(path):
    """Resolve ``path`` to an existing image file, trying fallbacks in order.

    Search order:
      1. ``path`` itself;
      2. ``path`` with a leading ``/kaggle`` removed;
      3. the bare file name inside ``IMAGES_BASE_DIR``;
      4. the bare file name inside each known image directory;
      5. the last one to three path components inside each of those directories.

    Args:
        path: Image path as stored in the dataset.

    Returns:
        The first candidate that exists. If none exists, two warnings are
        logged and the original ``path`` is returned unchanged.
    """
    if os.path.exists(path):
        return path

    def _fallbacks():
        """Yield alternative locations lazily, in priority order."""
        if path.startswith('/kaggle'):
            yield path.replace('/kaggle', '', 1)

        filename = os.path.basename(path)
        yield os.path.join(IMAGES_BASE_DIR, filename)

        image_dirs = [
            IMAGES_BASE_DIR,                            # Direct images directory
            "",                                         # Current directory
            'images',                                   # Common subdir
            'data/images',                              # Alternative subdir
            '../images',                                # Parent dir
            'input/images',                             # Another common structure
            'vrmini2/abo-images-small/images',          # Explicit path
            '/kaggle/input/vrmini2/abo-images-small/images'  # Full Kaggle path
        ]
        for base in image_dirs:
            yield os.path.join(base, filename)

        # Keep progressively more of the original directory structure,
        # e.g. "category/image.jpg", and retry under every base directory.
        parts = Path(path).parts
        for depth in range(1, min(4, len(parts))):
            tail = os.path.join(*parts[-depth:])
            for base in image_dirs:
                yield os.path.join(base, tail)

    found = next((candidate for candidate in _fallbacks() if os.path.exists(candidate)), None)
    if found is not None:
        return found

    logger.warning(f"Could not find image at any location: {path}")
    logger.warning(f"Tried IMAGES_BASE_DIR: {IMAGES_BASE_DIR}")

    # Hand the original path back even though it does not exist.
    return path

def process_data():
    """Read the test CSV, validate its columns and return it whole.

    The frame must contain ``full_path`` and ``question``. The first five
    image paths are logged as a debugging aid.

    Returns:
        pandas.DataFrame: the complete test split.

    Raises:
        ValueError: If required columns are missing.
        Exception: Any read error is logged and re-raised unchanged.
    """
    try:
        # Training data is not needed here; only the test split is read.
        logger.info(f"Loading test data from: {TEST_DATA_PATH}")
        test_df = pd.read_csv(TEST_DATA_PATH)
        logger.info(f"Loaded test data with {len(test_df)} samples")

        missing_cols = []
        for col in ['full_path', 'question']:
            if col not in test_df.columns:
                missing_cols.append(col)
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        # Log a handful of paths so path problems are visible early.
        logger.info("Sample paths from dataset:")
        for i, path in enumerate(test_df['full_path'].head(5)):
            logger.info(f"Sample {i}: {path}")

        # No sub-sampling: every row is returned.
        logger.info("Processing full dataset")
        return test_df

    except Exception as e:
        logger.error(f"Error loading data: {e}")
        raise

def get_answer(processor, model, image_path, question):
    """Answer one question about one image with the BLIP processor/model.

    The image path is first resolved through ``verify_image_path``.

    Args:
        processor: Callable that builds model inputs and offers ``decode``.
        model: Model exposing ``generate``.
        image_path: Image path as stored in the dataset.
        question: Question text.

    Returns:
        The stripped answer; ``"Image not found"`` when the resolved path does
        not exist; ``"Error: <first 50 chars>"`` when processing raises.
    """
    try:
        resolved = verify_image_path(image_path)

        if not os.path.exists(resolved):
            logger.warning(f"Image not found: {resolved}")
            return "Image not found"

        rgb_image = Image.open(resolved).convert("RGB")
        batch = processor(rgb_image, question, return_tensors="pt").to(device)

        # Inference only: no gradients are needed for generation.
        with torch.no_grad():
            generated = model.generate(**batch)
            text = processor.decode(generated[0], skip_special_tokens=True)

        return text.strip()

    except Exception as e:
        logger.error(f"Error processing {image_path}: {e}")
        return f"Error: {str(e)[:50]}"  # Include short error message in prediction

def main():
    """Run the full pipeline: load the model, predict every test row, save a CSV.

    The first five rows are logged in detail (paths, question, answer). An
    answer containing "Error" or "not found" is counted as a failure. When the
    test frame has an ``answer`` column, a case-insensitive exact-match
    ``correct`` column and the overall accuracy are added and logged. Any
    error is logged and re-raised.
    """
    try:
        # Load model - using simpler BLIP model which is more reliable
        processor, model = load_model()
        test_df = process_data()

        # Report what the image base directory looks like.
        logger.info(f"Image base directory: {IMAGES_BASE_DIR}")
        if not os.path.exists(IMAGES_BASE_DIR):
            logger.warning(f"Image directory does not exist: {IMAGES_BASE_DIR}")
        else:
            logger.info(f"Image directory exists and contains {len(os.listdir(IMAGES_BASE_DIR))} files/directories")

        predictions = []
        successes = failures = 0

        for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA"):
            try:
                image_path = row['full_path']
                question = row['question']
                show_debug = idx < 5  # detailed logging for the first few rows only

                if show_debug:
                    logger.info(f"Processing sample {idx}")
                    logger.info(f"Original image path: {image_path}")
                    verified_path = verify_image_path(image_path)
                    logger.info(f"Verified image path: {verified_path}")
                    logger.info(f"Image exists: {os.path.exists(verified_path)}")
                    logger.info(f"Question: {question}")

                answer = get_answer(processor, model, image_path, question)

                if show_debug:
                    logger.info(f"Generated answer: {answer}")

                predictions.append(answer)

                # get_answer signals problems through its return text.
                if "Error" in answer or "not found" in answer:
                    failures += 1
                else:
                    successes += 1

                # Progress line every 20 rows, skipping row 0.
                if idx % 20 == 0 and idx > 0:
                    logger.info(f"Processed {idx}/{len(test_df)} samples - Success: {successes}, Failures: {failures}")

                # Small delay to prevent overloading
                time.sleep(0.01)

            except Exception as e:
                logger.error(f"Failed on sample {idx}: {e}")
                predictions.append(f"Error: {str(e)[:50]}")
                failures += 1

        test_df['predicted_answer'] = predictions

        if 'answer' in test_df.columns:
            # Exact match after lower-casing and trimming both sides.
            predicted_norm = test_df['predicted_answer'].str.lower().str.strip()
            expected_norm = test_df['answer'].str.lower().str.strip()
            test_df['correct'] = predicted_norm == expected_norm
            accuracy = test_df['correct'].mean()
            logger.info(f"Accuracy: {accuracy:.4f}")

        test_df.to_csv(OUTPUT_PATH, index=False)
        logger.info(f"Predictions saved to {OUTPUT_PATH}")
        logger.info(f"Successfully processed: {successes}, Failed: {failures}")

    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise

# Run the pipeline when the cell/script is executed directly.
if __name__ == "__main__":
    main()

In [None]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import os
import logging
import warnings
import time

# Configure logging
# INFO and above, formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Suppress specific warnings
# (every UserWarning, whatever its origin)
warnings.filterwarnings("ignore", category=UserWarning)

# Configure paths - IMPORTANT: Modify these to match your setup
TRAIN_DATA_PATH = "/kaggle/input/vr1234/train_split.csv"  # Updated path
TEST_DATA_PATH = "/kaggle/input/vr1234/test_split.csv"    # Updated path
IMAGES_BASE_DIR = "/kaggle/input/vrmini2/abo-images-small/images/small/"  # Base directory for images
# Relative path: the predictions CSV lands in the current working directory.
OUTPUT_PATH = Path("vqa_test_predictions4.csv")

# Set device
# Checked in this order: CUDA, then MPS, then CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
    logger.info(f"Using CUDA device: {torch.cuda.get_device_name(0)}")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    logger.info("Using MPS device")
else:
    device = torch.device("cpu")
    logger.info("Using CPU")

def load_model():
    """Load the BLIP-2 OPT-2.7B processor and model onto ``device``.

    Returns:
        ``(processor, model)`` with the model in eval mode; half precision on
        CUDA, full precision otherwise.

    Raises:
        RuntimeError: If loading fails; the underlying exception is attached
            as ``__cause__``.
    """
    logger.info("Loading BLIP-2 model")

    try:
        # Use BLIP-2 model
        model_id = "Salesforce/blip2-opt-2.7b"  # BLIP-2 model with OPT backbone

        processor = Blip2Processor.from_pretrained(model_id)
        model = Blip2ForConditionalGeneration.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if device.type == "cuda" else torch.float32
        ).to(device)

        model.eval()
        logger.info("Successfully loaded BLIP-2 model")
        return processor, model

    except Exception as e:
        logger.error(f"Failed to load BLIP-2 model: {e}")
        # Chain explicitly so the root cause stays attached to the error.
        raise RuntimeError("Model loading failed") from e

def verify_image_path(path):
    """Return the first existing location for ``path``, trying fallbacks in order.

    Candidates are probed lazily in this order: the path itself, the path
    without its leading ``/kaggle``, the bare filename inside
    ``IMAGES_BASE_DIR``, the bare filename inside each known image directory,
    and finally the last 1-3 path components inside each of those directories.

    Args:
        path: Image path as stored in the dataset.

    Returns:
        The first candidate that exists, or ``path`` unchanged (after logging
        two warnings) when none does.
    """
    def _candidates():
        # 1) exactly as given
        yield path

        # 2) without the /kaggle prefix
        if path.startswith('/kaggle'):
            yield path.replace('/kaggle', '', 1)

        # 3) bare filename directly under the configured base directory
        filename = os.path.basename(path)
        yield os.path.join(IMAGES_BASE_DIR, filename)

        # 4) bare filename under a set of common directory layouts
        image_dirs = [
            IMAGES_BASE_DIR,                            # Direct images directory
            "",                                         # Current directory
            'images',                                   # Common subdir
            'data/images',                              # Alternative subdir
            '../images',                                # Parent dir
            'input/images',                             # Another common structure
            'vrmini2/abo-images-small/images',          # Explicit path
            '/kaggle/input/vrmini2/abo-images-small/images'  # Full Kaggle path
        ]
        for base in image_dirs:
            yield os.path.join(base, filename)

        # 5) trailing path components, e.g. "category/image.jpg" taken from
        #    "/kaggle/something/category/image.jpg"
        parts = Path(path).parts
        for depth in range(1, min(4, len(parts))):
            tail = os.path.join(*parts[-depth:])
            for base in image_dirs:
                yield os.path.join(base, tail)

    for candidate in _candidates():
        if os.path.exists(candidate):
            return candidate

    # Nothing matched: report it and hand back the original (non-existent) path.
    logger.warning(f"Could not find image at any location: {path}")
    logger.warning(f"Tried IMAGES_BASE_DIR: {IMAGES_BASE_DIR}")
    return path

def process_data():
    """Read the test split from ``TEST_DATA_PATH`` and validate its columns.

    The training split is not loaded. The first five ``full_path`` values are
    logged to help debug path problems.

    Returns:
        pandas.DataFrame: The full test split.

    Raises:
        ValueError: If ``full_path`` or ``question`` is missing.
        Exception: Any other load error is logged and re-raised unchanged.
    """
    try:
        logger.info(f"Loading test data from: {TEST_DATA_PATH}")
        frame = pd.read_csv(TEST_DATA_PATH)
        logger.info(f"Loaded test data with {len(frame)} samples")

        # Collect whichever required columns are absent, in declaration order.
        absent = []
        for column in ('full_path', 'question'):
            if column not in frame.columns:
                absent.append(column)
        if absent:
            raise ValueError(f"Missing required columns: {absent}")

        # Show a few paths for debugging.
        logger.info("Sample paths from dataset:")
        for position, sample_path in enumerate(frame['full_path'].head(5)):
            logger.info(f"Sample {position}: {sample_path}")

        logger.info("Processing full dataset")
        return frame

    except Exception as e:
        logger.error(f"Error loading data: {e}")
        raise

def get_answer(processor, model, image_path, question):
    """Answer one visual question about one image with BLIP-2.

    Args:
        processor: BLIP-2 processor; called to build model inputs and to
            decode generated token ids.
        model: BLIP-2 model exposing ``generate`` and ``dtype``.
        image_path: Dataset path of the image; resolved via ``verify_image_path``.
        question: Question text inserted into the VQA prompt.

    Returns:
        str: The stripped answer; ``"Image not found"`` when the image cannot
        be located; ``"Unable to determine from image"`` when the model
        returns nothing or echoes the question; ``"Error: <message>"`` (message
        truncated to 50 characters) when any exception occurs.
    """
    try:
        # Resolve the path and bail out early if the file is really absent.
        verified_path = verify_image_path(image_path)
        if not os.path.exists(verified_path):
            logger.warning(f"Image not found: {verified_path}")
            return "Image not found"

        # The context manager releases the file handle once the RGB copy exists.
        with Image.open(verified_path) as raw_image:
            image = raw_image.convert("RGB")

        # BLIP-2 VQA prompt format.
        prompt = f"Question: {question} Answer:"
        # Move inputs to the device and cast floating-point tensors to the
        # model's dtype (the model is loaded in float16 on CUDA).
        inputs = processor(images=image, text=prompt, return_tensors="pt").to(device, model.dtype)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=50,
                num_beams=5,
                early_stopping=True
            )
        answer = processor.decode(outputs[0], skip_special_tokens=True)

        # Keep only the text after the prompt when the prompt is echoed back.
        if prompt in answer:
            answer = answer.split(prompt)[1].strip()

        # Empty output or a verbatim repeat of the question is not an answer.
        if not answer or answer.strip() == question.strip():
            answer = "Unable to determine from image"

        return answer.strip()

    except Exception as e:
        logger.error(f"Error processing {image_path}: {e}")
        return f"Error: {str(e)[:50]}"  # Include short error message in prediction

def main():
    """Run BLIP-2 VQA over the test split and save predictions to ``OUTPUT_PATH``.

    Loads the model and the test DataFrame, generates one answer per row, adds
    a ``predicted_answer`` column (plus ``correct`` and a logged accuracy when
    an ``answer`` column exists) and writes the frame to CSV.

    Raises:
        Exception: Errors outside the per-sample loop are logged and re-raised.
            Per-sample errors are recorded as ``"Error: ..."`` predictions.
    """
    try:
        # Load BLIP-2 model
        processor, model = load_model()

        # Load dataset
        test_df = process_data()

        # Log image directory structure
        logger.info(f"Image base directory: {IMAGES_BASE_DIR}")
        if os.path.exists(IMAGES_BASE_DIR):
            logger.info(f"Image directory exists and contains {len(os.listdir(IMAGES_BASE_DIR))} files/directories")
        else:
            logger.warning(f"Image directory does not exist: {IMAGES_BASE_DIR}")

        # Predict answers
        predictions = []
        successes = 0
        failures = 0

        for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA with BLIP-2"):
            try:
                image_path = row['full_path']
                question = row['question']

                # Extra debug output for the first few samples only.
                if idx < 5:
                    logger.info(f"Processing sample {idx}")
                    logger.info(f"Original image path: {image_path}")
                    verified_path = verify_image_path(image_path)
                    logger.info(f"Verified image path: {verified_path}")
                    logger.info(f"Image exists: {os.path.exists(verified_path)}")
                    logger.info(f"Question: {question}")

                answer = get_answer(processor, model, image_path, question)

                if idx < 5:
                    logger.info(f"Generated answer: {answer}")

                predictions.append(answer)

                # get_answer reports failure with these exact sentinels. Matching
                # them precisely keeps genuine answers that merely contain
                # "Error" or "not found" from being counted as failures.
                if answer == "Image not found" or answer.startswith("Error:"):
                    failures += 1
                else:
                    successes += 1

                # Log progress periodically
                if idx % 20 == 0 and idx > 0:
                    logger.info(f"Processed {idx}/{len(test_df)} samples - Success: {successes}, Failures: {failures}")

                # Small delay to prevent overloading
                time.sleep(0.01)

            except Exception as e:
                logger.error(f"Failed on sample {idx}: {e}")
                predictions.append(f"Error: {str(e)[:50]}")
                failures += 1

        # Save results
        test_df['predicted_answer'] = predictions

        if 'answer' in test_df.columns:
            # Cast both sides to str first: a purely numeric ground-truth column
            # has no `.str` accessor and would abort the run here, after all the
            # inference work and before the CSV is written.
            predicted_norm = test_df['predicted_answer'].astype(str).str.lower().str.strip()
            expected_norm = test_df['answer'].astype(str).str.lower().str.strip()
            test_df['correct'] = predicted_norm == expected_norm
            accuracy = test_df['correct'].mean()
            logger.info(f"Accuracy: {accuracy:.4f}")

        test_df.to_csv(OUTPUT_PATH, index=False)
        logger.info(f"Predictions saved to {OUTPUT_PATH}")
        logger.info(f"Successfully processed: {successes}, Failed: {failures}")

    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise

# Entry point: call main() only when this code runs as the top-level module.
if __name__ == "__main__":
    main()

In [None]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import os
import logging
import warnings
import time

# Logging setup: INFO threshold, "<time> - <level> - <message>" layout.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

# Drop UserWarning messages.
warnings.filterwarnings("ignore", category=UserWarning)

# Paths - IMPORTANT: change these for your own environment.
TRAIN_DATA_PATH = "/kaggle/input/vr1234/train_split.csv"
TEST_DATA_PATH = "/kaggle/input/vr1234/test_split.csv"
IMAGES_BASE_DIR = "/kaggle/input/vrmini2/abo-images-small/images/small/"  # image root directory
OUTPUT_PATH = Path("vqa_test_predictions5.csv")


# Device selection: CUDA wins, then MPS, then CPU.
if not torch.cuda.is_available():
    if torch.backends.mps.is_available():
        device = torch.device("mps")
        logger.info("Using MPS device")
    else:
        device = torch.device("cpu")
        logger.info("Using CPU")
else:
    device = torch.device("cuda")
    logger.info(f"Using CUDA device: {torch.cuda.get_device_name(0)}")

def load_model():
    """Load the BLIP-2 processor and model used for VQA.

    Uses the ``Salesforce/blip2-opt-2.7b`` checkpoint: float16 when the
    module-level ``device`` is CUDA, float32 otherwise. The model is moved to
    ``device`` and switched to eval mode.

    Returns:
        tuple: ``(processor, model)``.

    Raises:
        RuntimeError: On any loading failure, with the original exception
            attached as ``__cause__``.
    """
    logger.info("Loading BLIP-2 model")

    try:
        model_id = "Salesforce/blip2-opt-2.7b"  # BLIP-2 model with OPT backbone

        processor = Blip2Processor.from_pretrained(model_id)
        model = Blip2ForConditionalGeneration.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if device.type == "cuda" else torch.float32
        ).to(device)

        model.eval()
        logger.info("Successfully loaded BLIP-2 model")
        return processor, model

    except Exception as e:
        logger.error(f"Failed to load BLIP-2 model: {e}")
        # Explicit chaining keeps the real cause attached to the RuntimeError.
        raise RuntimeError("Model loading failed") from e

def verify_image_path(path):
    """Locate ``path`` on disk, falling back to a series of alternative spots.

    Probe order: the path as given; the path minus a leading ``/kaggle``; the
    filename under ``IMAGES_BASE_DIR``; the filename under each known image
    directory; then the last 1-3 path components under each of those
    directories.

    Args:
        path: Image path as stored in the dataset.

    Returns:
        The first probe that exists, otherwise ``path`` itself (two warnings
        are logged in that case).
    """
    def _first_hit(directories, relative):
        # Return the first directory/relative combination that exists, else None.
        for directory in directories:
            candidate = os.path.join(directory, relative)
            if os.path.exists(candidate):
                return candidate
        return None

    # Exact path first.
    if os.path.exists(path):
        return path

    # Same path with the /kaggle prefix removed.
    if path.startswith('/kaggle'):
        stripped = path.replace('/kaggle', '', 1)
        if os.path.exists(stripped):
            return stripped

    filename = os.path.basename(path)

    image_dirs = [
        IMAGES_BASE_DIR,                            # Direct images directory
        "",                                         # Current directory
        'images',                                   # Common subdir
        'data/images',                              # Alternative subdir
        '../images',                                # Parent dir
        'input/images',                             # Another common structure
        'vrmini2/abo-images-small/images',          # Explicit path
        '/kaggle/input/vrmini2/abo-images-small/images'  # Full Kaggle path
    ]

    # Bare filename: base directory first, then every known layout.
    match = _first_hit([IMAGES_BASE_DIR] + image_dirs, filename)
    if match is not None:
        return match

    # Trailing components of the original path ("category/image.jpg", ...),
    # shortest tail first.
    parts = Path(path).parts
    depth = 1
    while depth < min(4, len(parts)):
        match = _first_hit(image_dirs, os.path.join(*parts[-depth:]))
        if match is not None:
            return match
        depth += 1

    # Give up: warn and return the original path even though it does not exist.
    logger.warning(f"Could not find image at any location: {path}")
    logger.warning(f"Tried IMAGES_BASE_DIR: {IMAGES_BASE_DIR}")
    return path

def process_data():
    """Load the test split CSV at ``TEST_DATA_PATH`` and check required columns.

    Only the test data is read. Up to five ``full_path`` values are logged as
    a debugging aid.

    Returns:
        pandas.DataFrame: The complete test split.

    Raises:
        ValueError: When ``full_path`` or ``question`` is not a column.
        Exception: Other failures are logged and propagated unchanged.
    """
    try:
        logger.info(f"Loading test data from: {TEST_DATA_PATH}")
        loaded = pd.read_csv(TEST_DATA_PATH)
        logger.info(f"Loaded test data with {len(loaded)} samples")

        # Required columns that are not present, in declaration order.
        needed = ['full_path', 'question']
        missing = list(filter(lambda name: name not in loaded.columns, needed))
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        logger.info("Sample paths from dataset:")
        first_paths = loaded['full_path'].iloc[:5]
        for number, shown in enumerate(first_paths):
            logger.info(f"Sample {number}: {shown}")

        logger.info("Processing full dataset")
        return loaded

    except Exception as e:
        logger.error(f"Error loading data: {e}")
        raise

def get_answer(processor, model, image_path, question):
    """Answer one visual question about one image with BLIP-2.

    Args:
        processor: BLIP-2 processor; called to build model inputs and to
            decode generated token ids.
        model: BLIP-2 model exposing ``generate`` and ``dtype``.
        image_path: Dataset path of the image; resolved via ``verify_image_path``.
        question: Question text inserted into the VQA prompt.

    Returns:
        str: The stripped answer; ``"Image not found"`` when the image cannot
        be located; ``"Unable to determine from image"`` when the model
        returns nothing or echoes the question; ``"Error: <message>"`` (message
        truncated to 50 characters) when any exception occurs.
    """
    try:
        # Resolve the path and stop early if the file does not exist.
        verified_path = verify_image_path(image_path)
        if not os.path.exists(verified_path):
            logger.warning(f"Image not found: {verified_path}")
            return "Image not found"

        # Open and process image; the context manager releases the file handle.
        with Image.open(verified_path) as raw_image:
            image = raw_image.convert("RGB")

        # BLIP-2 specific processing
        # Format with the VQA prompt
        prompt = f"Question: {question} Answer:"
        # Move inputs to the device and cast floating-point tensors to the
        # model's dtype (the model is loaded in float16 on CUDA).
        inputs = processor(images=image, text=prompt, return_tensors="pt").to(device, model.dtype)

        with torch.no_grad():
            # Generate answer with BLIP-2
            outputs = model.generate(
                **inputs,
                max_new_tokens=50,
                num_beams=5,
                early_stopping=True
            )
        answer = processor.decode(outputs[0], skip_special_tokens=True)

        # Try to extract just the answer part (strip the prompt)
        if prompt in answer:
            answer = answer.split(prompt)[1].strip()

        # Check if answer is empty or just repeating the question
        if not answer or answer.strip() == question.strip():
            answer = "Unable to determine from image"

        return answer.strip()

    except Exception as e:
        logger.error(f"Error processing {image_path}: {e}")
        return f"Error: {str(e)[:50]}"  # Include short error message in prediction

def main():
    """Run BLIP-2 VQA over the test split and save predictions to ``OUTPUT_PATH``.

    Loads the model and the test DataFrame, generates one answer per row, adds
    a ``predicted_answer`` column (plus ``correct`` and a logged accuracy when
    an ``answer`` column exists) and writes the frame to CSV.

    Raises:
        Exception: Errors outside the per-sample loop are logged and re-raised.
            Per-sample errors are recorded as ``"Error: ..."`` predictions.
    """
    try:
        # Load BLIP-2 model
        processor, model = load_model()

        # Load dataset
        test_df = process_data()

        # Log image directory structure
        logger.info(f"Image base directory: {IMAGES_BASE_DIR}")
        if os.path.exists(IMAGES_BASE_DIR):
            logger.info(f"Image directory exists and contains {len(os.listdir(IMAGES_BASE_DIR))} files/directories")
        else:
            logger.warning(f"Image directory does not exist: {IMAGES_BASE_DIR}")

        # Predict answers
        predictions = []
        successes = 0
        failures = 0

        for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA with BLIP-2"):
            try:
                image_path = row['full_path']
                question = row['question']

                # Extra debug output for the first few samples only.
                if idx < 5:
                    logger.info(f"Processing sample {idx}")
                    logger.info(f"Original image path: {image_path}")
                    verified_path = verify_image_path(image_path)
                    logger.info(f"Verified image path: {verified_path}")
                    logger.info(f"Image exists: {os.path.exists(verified_path)}")
                    logger.info(f"Question: {question}")

                answer = get_answer(processor, model, image_path, question)

                if idx < 5:
                    logger.info(f"Generated answer: {answer}")

                predictions.append(answer)

                # Failures are reported as "Image not found" or "Error: ...".
                # Exact matching keeps genuine answers that merely contain
                # those words from being counted as failures.
                if answer == "Image not found" or answer.startswith("Error:"):
                    failures += 1
                else:
                    successes += 1

                # Log progress periodically
                if idx % 20 == 0 and idx > 0:
                    logger.info(f"Processed {idx}/{len(test_df)} samples - Success: {successes}, Failures: {failures}")

                # Small delay to prevent overloading
                time.sleep(0.01)

            except Exception as e:
                logger.error(f"Failed on sample {idx}: {e}")
                predictions.append(f"Error: {str(e)[:50]}")
                failures += 1

        # Save results
        test_df['predicted_answer'] = predictions

        if 'answer' in test_df.columns:
            # Cast to str before using `.str`: a numeric ground-truth column
            # would otherwise raise here and the predictions would never be saved.
            predicted_norm = test_df['predicted_answer'].astype(str).str.lower().str.strip()
            expected_norm = test_df['answer'].astype(str).str.lower().str.strip()
            test_df['correct'] = predicted_norm == expected_norm
            accuracy = test_df['correct'].mean()
            logger.info(f"Accuracy: {accuracy:.4f}")

        test_df.to_csv(OUTPUT_PATH, index=False)
        logger.info(f"Predictions saved to {OUTPUT_PATH}")
        logger.info(f"Successfully processed: {successes}, Failed: {failures}")

    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise

# Entry point: call main() only when this code runs as the top-level module.
if __name__ == "__main__":
    main()

In [None]:
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import os
import logging
import warnings
import time

# Logging: INFO and above, formatted as "<time> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# Ignore UserWarning messages.
warnings.filterwarnings("ignore", category=UserWarning)

# Input / output locations.
TRAIN_DATA_PATH = "/kaggle/input/vr1234/train_split.csv"
TEST_DATA_PATH = "/kaggle/input/vr1234/test_split.csv"
IMAGES_BASE_DIR = "/kaggle/input/vrmini2/abo-images-small/images/small/"
OUTPUT_PATH = Path("vqa_test_predictions6.csv")

# Pick the compute device (CUDA > MPS > CPU) ...
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# ... then report which one was chosen.
if device.type == "cuda":
    logger.info(f"Using CUDA device: {torch.cuda.get_device_name(0)}")
elif device.type == "mps":
    logger.info("Using MPS device")
else:
    logger.info("Using CPU")

def load_model():
    """Load the BLIP-2 processor and model used for VQA.

    Loads ``Salesforce/blip2-opt-2.7b`` in float16 when the module-level
    ``device`` is CUDA (float32 otherwise), moves it to ``device`` and sets
    eval mode.

    Returns:
        tuple: ``(processor, model)``.

    Raises:
        RuntimeError: On any loading failure; the original exception is
            attached as ``__cause__``.
    """
    logger.info("Loading BLIP-2 model")
    try:
        model_id = "Salesforce/blip2-opt-2.7b"
        processor = Blip2Processor.from_pretrained(model_id)
        model = Blip2ForConditionalGeneration.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if device.type == "cuda" else torch.float32
        ).to(device)
        model.eval()
        logger.info("Successfully loaded BLIP-2 model")
        return processor, model
    except Exception as e:
        logger.error(f"Failed to load BLIP-2 model: {e}")
        # Chain explicitly so the root cause is not lost.
        raise RuntimeError("Model loading failed") from e

def verify_image_path(path):
    """Resolve ``path`` to an existing file location, trying fallbacks in order.

    Order of attempts: ``path`` itself; ``path`` without a leading
    ``/kaggle``; the filename under ``IMAGES_BASE_DIR``; the filename under
    each known image directory; the last 1-3 components of ``path`` under each
    of those directories.

    Args:
        path: Image path as stored in the dataset.

    Returns:
        The first attempt that exists on disk, or ``path`` unchanged (with two
        warnings logged) when nothing matches.
    """
    def _attempts():
        # Lazily produce every location to probe, in priority order.
        yield path
        if path.startswith('/kaggle'):
            yield path.replace('/kaggle', '', 1)
        filename = os.path.basename(path)
        yield os.path.join(IMAGES_BASE_DIR, filename)
        image_dirs = [
            IMAGES_BASE_DIR,
            "",
            'images',
            'data/images',
            '../images',
            'input/images',
            'vrmini2/abo-images-small/images',
            '/kaggle/input/vrmini2/abo-images-small/images'
        ]
        for folder in image_dirs:
            yield os.path.join(folder, filename)
        pieces = Path(path).parts
        for count in range(1, min(4, len(pieces))):
            suffix = os.path.join(*pieces[-count:])
            for folder in image_dirs:
                yield os.path.join(folder, suffix)

    located = next((spot for spot in _attempts() if os.path.exists(spot)), None)
    if located is not None:
        return located

    logger.warning(f"Could not find image at any location: {path}")
    logger.warning(f"Tried IMAGES_BASE_DIR: {IMAGES_BASE_DIR}")
    return path

def process_data():
    """Read the test split from ``TEST_DATA_PATH`` and validate its columns.

    Logs the first five ``full_path`` values for debugging.

    Returns:
        pandas.DataFrame: The full test split.

    Raises:
        ValueError: If ``full_path`` or ``question`` is missing.
        Exception: Any other error is logged and re-raised as-is.
    """
    try:
        logger.info(f"Loading test data from: {TEST_DATA_PATH}")
        table = pd.read_csv(TEST_DATA_PATH)
        logger.info(f"Loaded test data with {len(table)} samples")

        present = table.columns
        not_present = [wanted for wanted in ['full_path', 'question'] if wanted not in present]
        if len(not_present) > 0:
            raise ValueError(f"Missing required columns: {not_present}")

        logger.info("Sample paths from dataset:")
        for ordinal, example in enumerate(table['full_path'].head(5)):
            logger.info(f"Sample {ordinal}: {example}")
        logger.info("Processing full dataset")
        return table
    except Exception as e:
        logger.error(f"Error loading data: {e}")
        raise

def get_answer(processor, model, image_path, question):
    """Generate a one-word answer for one image/question pair using BLIP-2.

    Generation is limited to a single new token and only the first
    whitespace-separated word of the decoded text is kept.

    Args:
        processor: BLIP-2 processor; called to build model inputs and to
            decode generated token ids.
        model: BLIP-2 model exposing ``generate`` and ``dtype``.
        image_path: Dataset path of the image; resolved via ``verify_image_path``.
        question: Question text inserted into the VQA prompt.

    Returns:
        str: The answer word, or one of the sentinels ``"Missing"`` (image not
        found), ``"Unknown"`` (empty output or echo of the question) or
        ``"Error"`` (any exception).
    """
    try:
        verified_path = verify_image_path(image_path)
        if not os.path.exists(verified_path):
            logger.warning(f"Image not found: {verified_path}")
            return "Missing"

        # The context manager releases the file handle once the RGB copy exists.
        with Image.open(verified_path) as raw_image:
            image = raw_image.convert("RGB")

        prompt = f"Question: {question} Answer:"
        # Move inputs to the device and cast floating-point tensors to the
        # model's dtype (the model is loaded in float16 on CUDA).
        inputs = processor(images=image, text=prompt, return_tensors="pt").to(device, model.dtype)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=1,  # Limit to one token
                num_beams=3,
                early_stopping=True
            )
        answer = processor.decode(outputs[0], skip_special_tokens=True)

        # Drop the echoed prompt, if present.
        if prompt in answer:
            answer = answer.split(prompt)[1].strip()
        # Extract first word
        answer = answer.split()[0] if answer.strip() else "Unknown"
        if not answer or answer.strip() == question.strip():
            answer = "Unknown"
        return answer.strip()
    except Exception as e:
        logger.error(f"Error processing {image_path}: {e}")
        return "Error"

def main():
    """Run one-word BLIP-2 VQA over the test split and save to ``OUTPUT_PATH``.

    Loads the model and the test DataFrame, generates one answer per row, adds
    a ``predicted_answer`` column (plus ``correct`` and a logged accuracy when
    an ``answer`` column exists) and writes the frame to CSV. Answers equal to
    ``"Error"``, ``"Missing"`` or ``"Unknown"`` count as failures.

    Raises:
        Exception: Errors outside the per-sample loop are logged and re-raised.
            Per-sample errors are recorded as ``"Error"`` predictions.
    """
    try:
        processor, model = load_model()
        test_df = process_data()

        logger.info(f"Image base directory: {IMAGES_BASE_DIR}")
        if os.path.exists(IMAGES_BASE_DIR):
            logger.info(f"Image directory exists and contains {len(os.listdir(IMAGES_BASE_DIR))} files/directories")
        else:
            logger.warning(f"Image directory does not exist: {IMAGES_BASE_DIR}")

        predictions = []
        successes = 0
        failures = 0
        for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Running VQA with BLIP-2"):
            try:
                image_path = row['full_path']
                question = row['question']
                # Extra debug output for the first few samples only.
                if idx < 5:
                    logger.info(f"Processing sample {idx}")
                    logger.info(f"Original image path: {image_path}")
                    verified_path = verify_image_path(image_path)
                    logger.info(f"Verified image path: {verified_path}")
                    logger.info(f"Image exists: {os.path.exists(verified_path)}")
                    logger.info(f"Question: {question}")
                answer = get_answer(processor, model, image_path, question)
                if idx < 5:
                    logger.info(f"Generated answer: {answer}")
                predictions.append(answer)
                if answer not in ["Error", "Missing", "Unknown"]:
                    successes += 1
                else:
                    failures += 1
                if idx % 20 == 0 and idx > 0:
                    logger.info(f"Processed {idx}/{len(test_df)} samples - Success: {successes}, Failures: {failures}")
                time.sleep(0.01)
            except Exception as e:
                logger.error(f"Failed on sample {idx}: {e}")
                predictions.append("Error")
                failures += 1

        test_df['predicted_answer'] = predictions
        if 'answer' in test_df.columns:
            # Cast to str before using `.str`: a numeric ground-truth column
            # would otherwise raise here, after all inference work and before
            # the CSV is written.
            predicted_norm = test_df['predicted_answer'].astype(str).str.lower().str.strip()
            expected_norm = test_df['answer'].astype(str).str.lower().str.strip()
            test_df['correct'] = predicted_norm == expected_norm
            accuracy = test_df['correct'].mean()
            logger.info(f"Accuracy: {accuracy:.4f}")
        test_df.to_csv(OUTPUT_PATH, index=False)
        logger.info(f"Predictions saved to {OUTPUT_PATH}")
        logger.info(f"Successfully processed: {successes}, Failed: {failures}")
    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise

# Entry point: call main() only when this code runs as the top-level module.
if __name__ == "__main__":
    main()

In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # Suppress TensorFlow CUDA warnings

from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import logging
import warnings
import time
import re
# Fail fast with an install hint when word2number is not available.
try:
    from word2number import w2n
except ImportError as exc:
    raise ImportError("Please install word2number: `pip install word2number`") from exc
from bert_score import score
# scikit-learn names this helper `precision_recall_fscore_support`; importing
# `precision_recall_f1_support` raises ImportError because no such name exists.
from sklearn.metrics import precision_recall_fscore_support
# Alias so any code that refers to the previous name keeps working.
precision_recall_f1_support = precision_recall_fscore_support

# Logging: INFO threshold with a "<time> - <level> - <message>" layout.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Ignore UserWarning messages.
warnings.filterwarnings("ignore", category=UserWarning)

# Input / output locations.
TRAIN_DATA_PATH = "/kaggle/input/vr1234/train_split.csv"
TEST_DATA_PATH = "/kaggle/input/vr1234/test_split.csv"
IMAGES_BASE_DIR = "/kaggle/input/vrmini2/abo-images-small/images/small/"
# NOTE(review): an earlier cell in this notebook also writes
# "vqa_test_predictions5.csv" - confirm that overwriting it is intended.
OUTPUT_PATH = Path("vqa_test_predictions5.csv")

# Memo of dataset path -> resolved path, filled by verify_image_path.
image_path_cache = {}

# Device: CUDA when available, else MPS, else CPU.
device = torch.device(
    "cuda" if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available()
    else "cpu"
)
if device.type == "cuda":
    logger.info(f"Using CUDA device: {torch.cuda.get_device_name(0)}")
elif device.type == "mps":
    logger.info("Using MPS device")
else:
    logger.info("Using CPU")

def load_model():
    """Load the BLIP-2 processor and model used for VQA.

    Loads ``Salesforce/blip2-opt-2.7b`` in float16 when the module-level
    ``device`` is CUDA (float32 otherwise), moves it to ``device`` and sets
    eval mode.

    Returns:
        tuple: ``(processor, model)``.

    Raises:
        RuntimeError: On any loading failure; the original exception is
            attached as ``__cause__``.
    """
    logger.info("Loading BLIP-2 model")
    try:
        model_id = "Salesforce/blip2-opt-2.7b"
        processor = Blip2Processor.from_pretrained(model_id)
        model = Blip2ForConditionalGeneration.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if device.type == "cuda" else torch.float32
        ).to(device)
        model.eval()
        logger.info("Successfully loaded BLIP-2 model")
        return processor, model
    except Exception as e:
        logger.error(f"Failed to load BLIP-2 model: {e}")
        # Chain explicitly so the root cause is not lost.
        raise RuntimeError("Model loading failed") from e

def _find_in_tree(root, filename):
    """Find a file named ``filename`` anywhere under ``root`` in a single walk.

    An exact-case name match is returned as soon as it is seen. Otherwise the
    first case-insensitive match is returned after the walk, or ``None`` when
    nothing matches.
    """
    wanted_lower = filename.lower()
    fallback = None
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if name == filename:
                candidate = os.path.join(dirpath, name)
                if os.path.isfile(candidate):
                    return candidate
            elif fallback is None and name.lower() == wanted_lower:
                candidate = os.path.join(dirpath, name)
                if os.path.isfile(candidate):
                    fallback = candidate
    return fallback


def verify_image_path(path):
    """Resolve a dataset image path to a file on disk, caching the result.

    Resolution order:
      1. a previous result stored in ``image_path_cache``;
      2. ``path`` itself;
      3. ``path`` with its leading ``/kaggle`` removed;
      4. the last 1-3 components of ``path`` joined onto each search directory
         (cheap direct probes);
      5. a recursive search of each search directory for the filename,
         preferring an exact-case match over a case-insensitive one.

    Search directories are normalised and de-duplicated, and the empty entry
    is treated as the current directory so it takes part in the recursive
    search as well.

    Args:
        path: Image path as stored in the dataset.

    Returns:
        str: The resolved path, or ``path`` unchanged when nothing was found
        (a warning is logged and the miss is cached too).
    """
    # Check cache first
    if path in image_path_cache:
        return image_path_cache[path]

    def _remember(resolved):
        # Store the outcome so later lookups of the same path are free.
        image_path_cache[path] = resolved
        return resolved

    # Original path
    if os.path.exists(path):
        return _remember(path)

    logger.debug(f"Attempting to find image for path: {path}")

    # Try exact path without /kaggle prefix
    if path.startswith('/kaggle'):
        alt_path = path.replace('/kaggle', '', 1)
        if os.path.exists(alt_path):
            logger.debug(f"Found image at: {alt_path}")
            return _remember(alt_path)

    filename = os.path.basename(path)

    # Define search directories
    search_dirs = [
        IMAGES_BASE_DIR,
        "/kaggle/input/vrmini2/abo-images-small/images/small",
        "/kaggle/input/vrmini2/abo-images-small/images",
        "/kaggle/input/vrmini2",
        "images",
        "data/images",
        "input/images",
        "",
        "../images",
        "/kaggle/input/vrmini2/abo-images-small"
    ]

    # Normalise ("" becomes "."), drop duplicates and directories that do not
    # exist, keeping the original priority order.
    roots = []
    for search_dir in search_dirs:
        root = os.path.normpath(search_dir)
        if root in roots:
            continue
        if not os.path.isdir(root):
            logger.debug(f"Search directory does not exist: {search_dir}")
            continue
        roots.append(root)

    # Try partial path components (e.g., last 1-3 subdirectories). These are
    # single stat calls, so they run before the expensive tree walks.
    path_parts = Path(path).parts
    for i in range(1, min(4, len(path_parts))):
        partial_path = os.path.join(*path_parts[-i:])
        for root in roots:
            alt_path = os.path.join(root, partial_path)
            if os.path.isfile(alt_path):
                logger.debug(f"Found image at partial path: {alt_path}")
                return _remember(alt_path)

    # Recursive search in each directory (exact name first, then ignoring case).
    if filename:
        for root in roots:
            found = _find_in_tree(root, filename)
            if found is not None:
                logger.debug(f"Found image at: {found}")
                return _remember(found)

    # Log failure and directory structure for debugging
    logger.warning(f"Could not find image: {path}")
    logger.warning(f"Tried directories: {search_dirs}")
    if os.path.exists(IMAGES_BASE_DIR):
        try:
            logger.warning(f"Contents of {IMAGES_BASE_DIR}:")
            for item in os.listdir(IMAGES_BASE_DIR)[:10]:  # Limit to first 10 for brevity
                logger.warning(f"  {item}")
        except Exception as e:
            logger.warning(f"Could not list {IMAGES_BASE_DIR}: {e}")

    # Cache the original path as a fallback (callers treat it as missing).
    return _remember(path)

def process_data():
    """Load the test split CSV at ``TEST_DATA_PATH`` and check required columns.

    Up to five ``full_path`` values are logged as a debugging aid.

    Returns:
        pandas.DataFrame: The complete test split.

    Raises:
        ValueError: When ``full_path`` or ``question`` is not a column.
        Exception: Other failures are logged and propagated unchanged.
    """
    try:
        logger.info(f"Loading test data from: {TEST_DATA_PATH}")
        samples = pd.read_csv(TEST_DATA_PATH)
        logger.info(f"Loaded test data with {len(samples)} samples")

        # Walk the required names in order and keep the ones that are absent.
        lacking = []
        for required in ['full_path', 'question']:
            if required in samples.columns:
                continue
            lacking.append(required)
        if lacking:
            raise ValueError(f"Missing required columns: {lacking}")

        logger.info("Sample paths from dataset:")
        for rank, preview_path in enumerate(samples['full_path'].head(5)):
            logger.info(f"Sample {rank}: {preview_path}")
        logger.info("Processing full dataset")
        return samples
    except Exception as e:
        logger.error(f"Error loading data: {e}")
        raise

def has_number(text):
    """Return True when ``text`` contains a digit or an English number word.

    Words are taken as runs of letters, so adjacent punctuation or hyphens do
    not hide a number word ("two." and "twenty-one" both count).

    Args:
        text: Value to inspect; anything that is not a ``str`` yields False.

    Returns:
        bool: True if a digit or a known number word is present.
    """
    if not isinstance(text, str):
        return False
    # Check for digits
    if re.search(r'\d', text):
        return True
    # Check for number words (set membership; matched case-insensitively)
    number_words = {
        'zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine',
        'ten', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen', 'sixteen',
        'seventeen', 'eighteen', 'nineteen', 'twenty', 'thirty', 'forty', 'fifty',
        'sixty', 'seventy', 'eighty', 'ninety', 'hundred', 'thousand'
    }
    # Splitting on whitespace alone would leave tokens such as "two." or
    # "twenty-one" that never match; extract alphabetic runs instead.
    words = re.findall(r'[a-z]+', text.lower())
    return any(word in number_words for word in words)

def get_answer(processor, model, image_path, question):
    """Generate a one-word answer to ``question`` about an image using BLIP-2.

    Args:
        processor: BLIP-2 processor used to build inputs and decode output.
        model: BLIP-2 generation model.
        image_path: Path of the image as stored in the dataset; it is resolved
            through ``verify_image_path``.
        question: Question text (non-strings are converted with ``str``).

    Returns:
        The first whitespace-delimited word of the decoded generation, or one
        of the sentinels "Missing" (image file not found), "Unknown" (empty
        answer or the question echoed back) or "Error" (any exception, which
        is logged rather than raised).
    """
    try:
        verified_path = verify_image_path(image_path)
        if not os.path.exists(verified_path):
            logger.warning(f"Image not found: {verified_path}")
            return "Missing"
        # The context manager releases the file handle as soon as the pixels
        # are decoded instead of leaving one open per processed sample.
        with Image.open(verified_path) as raw_image:
            image = raw_image.convert("RGB")
        # str() keeps a non-string question (e.g. NaN from an empty CSV cell)
        # from failing on .strip() after the expensive generation has run.
        question_text = str(question)
        prompt = f"Question: {question_text} Answer:"
        # Cast floating-point inputs to the model's dtype (float16 on CUDA) so
        # pixel values match the weights.
        inputs = processor(images=image, text=prompt, return_tensors="pt").to(device, model.dtype)
        with torch.no_grad():
            # NOTE(review): max_new_tokens=1 yields one token, which may be
            # only a fragment of a longer word — confirm this is intended.
            outputs = model.generate(
                **inputs,
                max_new_tokens=1,
                num_beams=3,
                early_stopping=True
            )
        answer = processor.decode(outputs[0], skip_special_tokens=True)
        # Drop the prompt when the decoded text echoes it back.
        if prompt in answer:
            answer = answer.split(prompt)[1].strip()
        answer = answer.split()[0] if answer.strip() else "Unknown"
        if not answer or answer.strip() == question_text.strip():
            answer = "Unknown"
        return answer.strip()
    except Exception as e:
        logger.error(f"Error processing {image_path}: {e}")
        return "Error"

def compute_metrics(test_df):
    """Compute exact-match accuracy, macro precision/recall/F1 and BERTScore.

    Answers are compared after converting to text, stripping and lowercasing.
    Precision, recall and F1 treat each normalised answer string as a class
    label and are macro-averaged over the labels.

    Args:
        test_df: DataFrame with a 'predicted_answer' column and, optionally, a
            ground-truth 'answer' column. A boolean 'correct' column is added
            to it in place.

    Returns:
        dict with keys 'accuracy', 'precision', 'recall', 'f1',
        'bertscore_precision', 'bertscore_recall' and 'bertscore_f1', or None
        when there is no 'answer' column.
    """
    if 'answer' not in test_df.columns:
        logger.warning("Ground truth answers not available, skipping metric computation")
        return None
    # scikit-learn's function is precision_recall_fscore_support; the name
    # precision_recall_f1_support does not exist in sklearn.metrics.
    from sklearn.metrics import precision_recall_fscore_support

    # astype(str) keeps the .str accessor valid when the column is numeric.
    pred_norm = test_df['predicted_answer'].fillna("").astype(str).str.strip().str.lower()
    ref_norm = test_df['answer'].fillna("").astype(str).str.strip().str.lower()
    # Exact match; rows without a ground-truth answer never count as correct.
    test_df['correct'] = (pred_norm == ref_norm) & test_df['answer'].notna()
    accuracy = float(test_df['correct'].mean())
    # Scoring the 'correct' flags against themselves would always give 1.0
    # (or 0.0), so score predicted labels against reference labels instead.
    precision, recall, f1, _ = precision_recall_fscore_support(
        ref_norm.tolist(), pred_norm.tolist(), average='macro', zero_division=0
    )
    # BERTScore
    preds = test_df['predicted_answer'].fillna("Unknown").astype(str).tolist()
    refs = test_df['answer'].fillna("Unknown").astype(str).tolist()
    P, R, F1 = score(preds, refs, lang="en", verbose=False)
    metrics = {
        'accuracy': accuracy,
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
        'bertscore_precision': P.mean().item(),
        'bertscore_recall': R.mean().item(),
        'bertscore_f1': F1.mean().item()
    }
    return metrics

def main():
    """Run BLIP-2 VQA over the test split, log metrics and save predictions.

    Loads the model and data, answers every row's question, records for each
    row the predicted answer and whether the prediction or the ground truth
    contains a number, computes metrics when ground truth is present, and
    writes the augmented frame to ``OUTPUT_PATH``. A failure on a single row
    is logged and recorded as "Error"; any other failure is logged and
    re-raised.
    """
    try:
        processor, model = load_model()
        test_df = process_data()
        logger.info(f"Image base directory: {IMAGES_BASE_DIR}")
        if os.path.exists(IMAGES_BASE_DIR):
            logger.info(f"Image directory exists and contains {len(os.listdir(IMAGES_BASE_DIR))} files/directories")
        else:
            logger.warning(f"Image directory does not exist: {IMAGES_BASE_DIR}")
        predictions = []
        contains_numbers = []
        successes = 0
        failures = 0
        total = len(test_df)
        rows = tqdm(test_df.iterrows(), total=total, desc="Running VQA with BLIP-2")
        # Count by position rather than by index label so the first-five
        # logging and the progress messages stay correct for any index.
        for position, (idx, row) in enumerate(rows):
            show_details = position < 5
            try:
                image_path = row['full_path']
                question = row['question']
                if show_details:
                    logger.info(f"Processing sample {idx}")
                    logger.info(f"Original image path: {image_path}")
                    verified_path = verify_image_path(image_path)
                    logger.info(f"Verified image path: {verified_path}")
                    logger.info(f"Image exists: {os.path.exists(verified_path)}")
                    logger.info(f"Question: {question}")
                answer = get_answer(processor, model, image_path, question)
                if show_details:
                    logger.info(f"Generated answer: {answer}")
                # Check for numbers in predicted or ground truth answer.
                # has_number() only inspects strings, so turn a numeric ground
                # truth into text and treat a missing one as empty.
                ground_truth = row.get('answer', '')
                if isinstance(ground_truth, str):
                    truth_text = ground_truth
                elif pd.isna(ground_truth):
                    truth_text = ''
                else:
                    truth_text = str(ground_truth)
                has_num = has_number(answer) or has_number(truth_text)
                # Append both together so the two lists cannot get out of step
                # if something above raises.
                predictions.append(answer)
                contains_numbers.append(has_num)
                if answer not in ["Error", "Missing", "Unknown"]:
                    successes += 1
                else:
                    failures += 1
                processed = position + 1
                if processed % 20 == 0:
                    logger.info(f"Processed {processed}/{total} samples - Success: {successes}, Failures: {failures}")
            except Exception as e:
                logger.error(f"Failed on sample {idx}: {e}")
                predictions.append("Error")
                contains_numbers.append(False)
                failures += 1
        test_df['predicted_answer'] = predictions
        test_df['contains_number'] = contains_numbers
        # Compute and log metrics
        metrics = compute_metrics(test_df)
        if metrics:
            logger.info("Evaluation Metrics:")
            logger.info(f"Accuracy: {metrics['accuracy']:.4f}")
            logger.info(f"Precision: {metrics['precision']:.4f}")
            logger.info(f"Recall: {metrics['recall']:.4f}")
            logger.info(f"F1-Score: {metrics['f1']:.4f}")
            logger.info(f"BERTScore Precision: {metrics['bertscore_precision']:.4f}")
            logger.info(f"BERTScore Recall: {metrics['bertscore_recall']:.4f}")
            logger.info(f"BERTScore F1: {metrics['bertscore_f1']:.4f}")
        test_df.to_csv(OUTPUT_PATH, index=False)
        logger.info(f"Predictions saved to {OUTPUT_PATH}")
        logger.info(f"Successfully processed: {successes}, Failed: {failures}")
    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise

if __name__ == "__main__":
    main()

In [None]:
!pip install word2number bert-score scikit-learn

In [None]:
!pip install scikit-learn==1.3.0

In [None]:
!pip install word2number bert-score scikit-learn==1.3.0

In [None]:
!pip uninstall -y scikit-learn
!pip install scikit-learn==1.3.0

In [None]:
# Sanity check of the scikit-learn install. The metric helper is named
# precision_recall_fscore_support; sklearn.metrics has no
# precision_recall_f1_support, so importing that name fails on every version.
import sklearn
from sklearn.metrics import precision_recall_fscore_support
print(sklearn.__version__)  # Should print 1.3.0 or similar
print(precision_recall_fscore_support)  # Should print function reference

In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # Suppress TensorFlow CUDA warnings

from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import logging
import warnings
import time
import re
try:
    from word2number import w2n
except ImportError:
    raise ImportError("Please install word2number: `pip install word2number`")
from bert_score import score
# scikit-learn names this function precision_recall_fscore_support; there is
# no precision_recall_f1_support, so importing that name raised ImportError
# regardless of the installed version.
from sklearn.metrics import precision_recall_fscore_support
# Alias so code in this cell that still uses the old name keeps resolving.
precision_recall_f1_support = precision_recall_fscore_support

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Suppress specific warnings
warnings.filterwarnings("ignore", category=UserWarning)

# Configure paths
TRAIN_DATA_PATH = "/kaggle/input/vr1234/train_split.csv"
TEST_DATA_PATH = "/kaggle/input/vr1234/test_split.csv"
IMAGES_BASE_DIR = "/kaggle/input/vrmini2/abo-images-small/images/small/"
OUTPUT_PATH = Path("vqa_test_predictions5.csv")

# Image path cache: maps a dataset path to the location it resolved to
image_path_cache = {}

# Set device: prefer CUDA, then Apple MPS, otherwise fall back to CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
    logger.info(f"Using CUDA device: {torch.cuda.get_device_name(0)}")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    logger.info("Using MPS device")
else:
    device = torch.device("cpu")
    logger.info("Using CPU")

def load_model():
    """Load the BLIP-2 (OPT-2.7B) processor and model for VQA.

    The weights are loaded in float16 when ``device`` is CUDA and float32
    otherwise, moved to ``device`` and switched to eval mode.

    Returns:
        tuple: ``(processor, model)``.

    Raises:
        RuntimeError: if loading fails; the original exception is chained as
            the cause.
    """
    logger.info("Loading BLIP-2 model")
    try:
        model_id = "Salesforce/blip2-opt-2.7b"
        processor = Blip2Processor.from_pretrained(model_id)
        model = Blip2ForConditionalGeneration.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if device.type == "cuda" else torch.float32
        ).to(device)
        model.eval()
        logger.info("Successfully loaded BLIP-2 model")
        return processor, model
    except Exception as e:
        logger.error(f"Failed to load BLIP-2 model: {e}")
        # Chain explicitly so the traceback shows the underlying failure as
        # the direct cause of this error.
        raise RuntimeError("Model loading failed") from e

def verify_image_path(path):
    """Resolve ``path`` to an existing image file, falling back to a directory search.

    Lookup order (first hit wins; every outcome is memoised in
    ``image_path_cache``):
      1. ``path`` exactly as given;
      2. ``path`` with a leading ``/kaggle`` removed;
      3. a recursive search of each candidate directory for the file name,
         preferring an exact-case match over a case-insensitive one;
      4. the last 1-3 components of ``path`` joined onto each candidate
         directory.

    Args:
        path: Image path string as stored in the dataset.

    Returns:
        The resolved path as a string, or ``path`` unchanged when nothing was
        found (callers detect that case with ``os.path.exists``).
    """
    # Check cache first (the dict is only mutated, never rebound, so no
    # `global` declaration is needed).
    if path in image_path_cache:
        return image_path_cache[path]

    # Original path
    if os.path.exists(path):
        image_path_cache[path] = path
        return path

    logger.debug(f"Attempting to find image for path: {path}")

    # Extract filename and its lowercase version
    filename = os.path.basename(path)
    filename_lower = filename.lower()

    # Define search directories ("" stands for the current directory)
    search_dirs = [
        IMAGES_BASE_DIR,
        "/kaggle/input/vrmini2/abo-images-small/images/small",
        "/kaggle/input/vrmini2/abo-images-small/images",
        "/kaggle/input/vrmini2",
        "images",
        "data/images",
        "input/images",
        "",
        "../images",
        "/kaggle/input/vrmini2/abo-images-small"
    ]

    # Try exact path without /kaggle prefix
    if path.startswith('/kaggle'):
        alt_path = path.replace('/kaggle', '', 1)
        if os.path.exists(alt_path):
            image_path_cache[path] = alt_path
            logger.debug(f"Found image at: {alt_path}")
            return alt_path

    # Recursive search in each directory. Several entries name the same tree
    # or a subtree of another entry, so remember what has been walked and skip
    # anything already covered; each tree is walked once, handling the exact
    # and the case-insensitive match in the same pass.
    scanned_roots = []
    for search_dir in search_dirs:
        root = Path(search_dir)  # Path("") is the current directory
        if not root.exists():
            logger.debug(f"Search directory does not exist: {search_dir}")
            continue
        try:
            resolved_root = root.resolve()
            if any(done == resolved_root or done in resolved_root.parents for done in scanned_roots):
                logger.debug(f"Search directory already covered: {search_dir}")
                continue
            scanned_roots.append(resolved_root)
            fallback = None  # first case-insensitive match, used if no exact one turns up
            for p in root.rglob("*"):
                # Compare names before is_file() so most entries need no stat call.
                if p.name == filename:
                    if p.is_file():
                        image_path_cache[path] = str(p)
                        logger.debug(f"Found image at: {str(p)}")
                        return str(p)
                elif fallback is None and p.name.lower() == filename_lower and p.is_file():
                    fallback = p
            if fallback is not None:
                image_path_cache[path] = str(fallback)
                logger.debug(f"Found image (case-insensitive) at: {str(fallback)}")
                return str(fallback)
        except Exception as e:
            logger.debug(f"Error searching in {search_dir}: {e}")

    # Try partial path components (e.g., the last 1-3 components of the path).
    # The recursive search above already looked for the file name everywhere
    # beneath each directory, so only the direct join is checked here.
    path_parts = Path(path).parts
    for i in range(1, min(4, len(path_parts))):
        partial_path = os.path.join(*path_parts[-i:])
        for search_dir in search_dirs:
            alt_path = os.path.join(search_dir, partial_path)
            if os.path.exists(alt_path):
                image_path_cache[path] = alt_path
                logger.debug(f"Found image at partial path: {alt_path}")
                return alt_path

    # Log failure and directory structure for debugging
    logger.warning(f"Could not find image: {path}")
    logger.warning(f"Tried directories: {search_dirs}")
    if os.path.exists(IMAGES_BASE_DIR):
        try:
            logger.warning(f"Contents of {IMAGES_BASE_DIR}:")
            for item in os.listdir(IMAGES_BASE_DIR)[:10]:  # Limit to first 10 for brevity
                logger.warning(f"  {item}")
        except Exception as e:
            logger.warning(f"Could not list {IMAGES_BASE_DIR}: {e}")

    # Cache the original path as a fallback (will be marked as missing)
    image_path_cache[path] = path
    return path

def process_data():
    """Read the test split CSV and check it has the columns the VQA run needs.

    Returns:
        The loaded DataFrame.

    Raises:
        ValueError: if 'full_path' or 'question' is absent.
        Exception: any read error is logged and re-raised unchanged.
    """
    try:
        logger.info(f"Loading test data from: {TEST_DATA_PATH}")
        frame = pd.read_csv(TEST_DATA_PATH)
        logger.info(f"Loaded test data with {len(frame)} samples")

        # Collect every absent column so the error names all of them at once.
        absent = []
        for needed in ('full_path', 'question'):
            if needed not in frame.columns:
                absent.append(needed)
        if absent:
            raise ValueError(f"Missing required columns: {absent}")

        # Show a handful of paths to make path-resolution problems easy to spot.
        logger.info("Sample paths from dataset:")
        preview = frame['full_path'].iloc[:5]
        for position, sample_path in enumerate(preview):
            logger.info(f"Sample {position}: {sample_path}")
        logger.info("Processing full dataset")
        return frame
    except Exception as err:
        logger.error(f"Error loading data: {err}")
        raise

def has_number(text):
    """Return True when ``text`` contains a digit or an English number word.

    Words are taken as runs of letters, so adjacent punctuation or hyphens do
    not hide a number word ("two." and "twenty-one" both count).

    Args:
        text: Value to inspect; anything that is not a ``str`` yields False.

    Returns:
        bool: True if a digit or a known number word is present.
    """
    if not isinstance(text, str):
        return False
    # Check for digits
    if re.search(r'\d', text):
        return True
    # Check for number words (set membership; matched case-insensitively)
    number_words = {
        'zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine',
        'ten', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen', 'sixteen',
        'seventeen', 'eighteen', 'nineteen', 'twenty', 'thirty', 'forty', 'fifty',
        'sixty', 'seventy', 'eighty', 'ninety', 'hundred', 'thousand'
    }
    # Splitting on whitespace alone would leave tokens such as "two." or
    # "twenty-one" that never match; extract alphabetic runs instead.
    words = re.findall(r'[a-z]+', text.lower())
    return any(word in number_words for word in words)

def get_answer(processor, model, image_path, question):
    """Generate a one-word answer to ``question`` about an image using BLIP-2.

    Args:
        processor: BLIP-2 processor used to build inputs and decode output.
        model: BLIP-2 generation model.
        image_path: Path of the image as stored in the dataset; it is resolved
            through ``verify_image_path``.
        question: Question text (non-strings are converted with ``str``).

    Returns:
        The first whitespace-delimited word of the decoded generation, or one
        of the sentinels "Missing" (image file not found), "Unknown" (empty
        answer or the question echoed back) or "Error" (any exception, which
        is logged rather than raised).
    """
    try:
        verified_path = verify_image_path(image_path)
        if not os.path.exists(verified_path):
            logger.warning(f"Image not found: {verified_path}")
            return "Missing"
        # The context manager releases the file handle as soon as the pixels
        # are decoded instead of leaving one open per processed sample.
        with Image.open(verified_path) as raw_image:
            image = raw_image.convert("RGB")
        # str() keeps a non-string question (e.g. NaN from an empty CSV cell)
        # from failing on .strip() after the expensive generation has run.
        question_text = str(question)
        prompt = f"Question: {question_text} Answer:"
        # Cast floating-point inputs to the model's dtype (float16 on CUDA) so
        # pixel values match the weights.
        inputs = processor(images=image, text=prompt, return_tensors="pt").to(device, model.dtype)
        with torch.no_grad():
            # NOTE(review): max_new_tokens=1 yields one token, which may be
            # only a fragment of a longer word — confirm this is intended.
            outputs = model.generate(
                **inputs,
                max_new_tokens=1,
                num_beams=3,
                early_stopping=True
            )
        answer = processor.decode(outputs[0], skip_special_tokens=True)
        # Drop the prompt when the decoded text echoes it back.
        if prompt in answer:
            answer = answer.split(prompt)[1].strip()
        answer = answer.split()[0] if answer.strip() else "Unknown"
        if not answer or answer.strip() == question_text.strip():
            answer = "Unknown"
        return answer.strip()
    except Exception as e:
        logger.error(f"Error processing {image_path}: {e}")
        return "Error"

def compute_metrics(test_df):
    """Compute exact-match accuracy, macro precision/recall/F1 and BERTScore.

    Answers are compared after converting to text, stripping and lowercasing.
    Precision, recall and F1 treat each normalised answer string as a class
    label and are macro-averaged over the labels.

    Args:
        test_df: DataFrame with a 'predicted_answer' column and, optionally, a
            ground-truth 'answer' column. A boolean 'correct' column is added
            to it in place.

    Returns:
        dict with keys 'accuracy', 'precision', 'recall', 'f1',
        'bertscore_precision', 'bertscore_recall' and 'bertscore_f1', or None
        when there is no 'answer' column.
    """
    if 'answer' not in test_df.columns:
        logger.warning("Ground truth answers not available, skipping metric computation")
        return None
    # scikit-learn's function is precision_recall_fscore_support; the name
    # precision_recall_f1_support does not exist in sklearn.metrics.
    from sklearn.metrics import precision_recall_fscore_support

    # astype(str) keeps the .str accessor valid when the column is numeric.
    pred_norm = test_df['predicted_answer'].fillna("").astype(str).str.strip().str.lower()
    ref_norm = test_df['answer'].fillna("").astype(str).str.strip().str.lower()
    # Exact match; rows without a ground-truth answer never count as correct.
    test_df['correct'] = (pred_norm == ref_norm) & test_df['answer'].notna()
    accuracy = float(test_df['correct'].mean())
    # Scoring the 'correct' flags against themselves would always give 1.0
    # (or 0.0), so score predicted labels against reference labels instead.
    precision, recall, f1, _ = precision_recall_fscore_support(
        ref_norm.tolist(), pred_norm.tolist(), average='macro', zero_division=0
    )
    # BERTScore
    preds = test_df['predicted_answer'].fillna("Unknown").astype(str).tolist()
    refs = test_df['answer'].fillna("Unknown").astype(str).tolist()
    P, R, F1 = score(preds, refs, lang="en", verbose=False)
    metrics = {
        'accuracy': accuracy,
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
        'bertscore_precision': P.mean().item(),
        'bertscore_recall': R.mean().item(),
        'bertscore_f1': F1.mean().item()
    }
    return metrics

def main():
    """Run BLIP-2 VQA over the test split, log metrics and save predictions.

    Loads the model and data, answers every row's question, records for each
    row the predicted answer and whether the prediction or the ground truth
    contains a number, computes metrics when ground truth is present, and
    writes the augmented frame to ``OUTPUT_PATH``. A failure on a single row
    is logged and recorded as "Error"; any other failure is logged and
    re-raised.
    """
    try:
        processor, model = load_model()
        test_df = process_data()
        logger.info(f"Image base directory: {IMAGES_BASE_DIR}")
        if os.path.exists(IMAGES_BASE_DIR):
            logger.info(f"Image directory exists and contains {len(os.listdir(IMAGES_BASE_DIR))} files/directories")
        else:
            logger.warning(f"Image directory does not exist: {IMAGES_BASE_DIR}")
        predictions = []
        contains_numbers = []
        successes = 0
        failures = 0
        total = len(test_df)
        rows = tqdm(test_df.iterrows(), total=total, desc="Running VQA with BLIP-2")
        # Count by position rather than by index label so the first-five
        # logging and the progress messages stay correct for any index.
        for position, (idx, row) in enumerate(rows):
            show_details = position < 5
            try:
                image_path = row['full_path']
                question = row['question']
                if show_details:
                    logger.info(f"Processing sample {idx}")
                    logger.info(f"Original image path: {image_path}")
                    verified_path = verify_image_path(image_path)
                    logger.info(f"Verified image path: {verified_path}")
                    logger.info(f"Image exists: {os.path.exists(verified_path)}")
                    logger.info(f"Question: {question}")
                answer = get_answer(processor, model, image_path, question)
                if show_details:
                    logger.info(f"Generated answer: {answer}")
                # Check for numbers in predicted or ground truth answer.
                # has_number() only inspects strings, so turn a numeric ground
                # truth into text and treat a missing one as empty.
                ground_truth = row.get('answer', '')
                if isinstance(ground_truth, str):
                    truth_text = ground_truth
                elif pd.isna(ground_truth):
                    truth_text = ''
                else:
                    truth_text = str(ground_truth)
                has_num = has_number(answer) or has_number(truth_text)
                # Append both together so the two lists cannot get out of step
                # if something above raises.
                predictions.append(answer)
                contains_numbers.append(has_num)
                if answer not in ["Error", "Missing", "Unknown"]:
                    successes += 1
                else:
                    failures += 1
                processed = position + 1
                if processed % 20 == 0:
                    logger.info(f"Processed {processed}/{total} samples - Success: {successes}, Failures: {failures}")
            except Exception as e:
                logger.error(f"Failed on sample {idx}: {e}")
                predictions.append("Error")
                contains_numbers.append(False)
                failures += 1
        test_df['predicted_answer'] = predictions
        test_df['contains_number'] = contains_numbers
        # Compute and log metrics
        metrics = compute_metrics(test_df)
        if metrics:
            logger.info("Evaluation Metrics:")
            logger.info(f"Accuracy: {metrics['accuracy']:.4f}")
            logger.info(f"Precision: {metrics['precision']:.4f}")
            logger.info(f"Recall: {metrics['recall']:.4f}")
            logger.info(f"F1-Score: {metrics['f1']:.4f}")
            logger.info(f"BERTScore Precision: {metrics['bertscore_precision']:.4f}")
            logger.info(f"BERTScore Recall: {metrics['bertscore_recall']:.4f}")
            logger.info(f"BERTScore F1: {metrics['bertscore_f1']:.4f}")
        test_df.to_csv(OUTPUT_PATH, index=False)
        logger.info(f"Predictions saved to {OUTPUT_PATH}")
        logger.info(f"Successfully processed: {successes}, Failed: {failures}")
    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise

if __name__ == "__main__":
    main()

In [None]:
!pip uninstall -y scikit-learn
!pip install scikit-learn==1.3.0

In [None]:
import sklearn
print(sklearn.__version__)  # Should print 1.3.0
!pip list | grep scikit-learn

In [None]:
!pip cache purge

In [None]:
!pip install word2number bert-score numpy

In [None]:
import word2number
print("word2number imported successfully")

In [None]:
!pip install word2number bert-score numpy

In [None]:
import bert_score
import numpy
print(bert_score.__version__)  # Should print version, e.g., 0.3.13
print(numpy.__version__)  # Should print version, e.g., 1.26.4
print("word2number imported successfully")

In [None]:
# Setup for the BLIP-2 VQA run: imports, logging, data paths and device choice.
import os
# Set before the imports below, which is presumably why it comes first.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # Suppress TensorFlow CUDA warnings

from transformers import Blip2Processor, Blip2ForConditionalGeneration
from PIL import Image
import torch
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import logging
import warnings
import time
import re
# Fail early with an install hint when word2number is missing.
try:
    from word2number import w2n
except ImportError:
    raise ImportError("Please install word2number: `pip install word2number`")
from bert_score import score
# Disabled import. NOTE(review): sklearn.metrics has no
# precision_recall_f1_support; the function is precision_recall_fscore_support.
#from sklearn.metrics import precision_recall_f1_support

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Suppress specific warnings
warnings.filterwarnings("ignore", category=UserWarning)

# Configure paths
TRAIN_DATA_PATH = "/kaggle/input/vr1234/train_split.csv"
TEST_DATA_PATH = "/kaggle/input/vr1234/test_split.csv"
IMAGES_BASE_DIR = "/kaggle/input/vrmini2/abo-images-small/images/small/"
OUTPUT_PATH = Path("vqa_test_predictions7.csv")

# Image path cache: maps a dataset path to the location it resolved to
image_path_cache = {}

# Set device: prefer CUDA, then Apple MPS, otherwise fall back to CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
    logger.info(f"Using CUDA device: {torch.cuda.get_device_name(0)}")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    logger.info("Using MPS device")
else:
    device = torch.device("cpu")
    logger.info("Using CPU")

def load_model():
    """Load the BLIP-2 (OPT-2.7B) processor and model for VQA.

    The weights are loaded in float16 when ``device`` is CUDA and float32
    otherwise, moved to ``device`` and switched to eval mode.

    Returns:
        tuple: ``(processor, model)``.

    Raises:
        RuntimeError: if loading fails; the original exception is chained as
            the cause.
    """
    logger.info("Loading BLIP-2 model")
    try:
        model_id = "Salesforce/blip2-opt-2.7b"
        processor = Blip2Processor.from_pretrained(model_id)
        model = Blip2ForConditionalGeneration.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if device.type == "cuda" else torch.float32
        ).to(device)
        model.eval()
        logger.info("Successfully loaded BLIP-2 model")
        return processor, model
    except Exception as e:
        logger.error(f"Failed to load BLIP-2 model: {e}")
        # Chain explicitly so the traceback shows the underlying failure as
        # the direct cause of this error.
        raise RuntimeError("Model loading failed") from e

def verify_image_path(path):
    """Resolve ``path`` to an existing image file, falling back to a directory search.

    Lookup order (first hit wins; every outcome is memoised in
    ``image_path_cache``):
      1. ``path`` exactly as given;
      2. ``path`` with a leading ``/kaggle`` removed;
      3. a recursive search of each candidate directory for the file name,
         preferring an exact-case match over a case-insensitive one;
      4. the last 1-3 components of ``path`` joined onto each candidate
         directory.

    Args:
        path: Image path string as stored in the dataset.

    Returns:
        The resolved path as a string, or ``path`` unchanged when nothing was
        found (callers detect that case with ``os.path.exists``).
    """
    # Check cache first (the dict is only mutated, never rebound, so no
    # `global` declaration is needed).
    if path in image_path_cache:
        return image_path_cache[path]

    # Original path
    if os.path.exists(path):
        image_path_cache[path] = path
        return path

    logger.debug(f"Attempting to find image for path: {path}")

    # Extract filename and its lowercase version
    filename = os.path.basename(path)
    filename_lower = filename.lower()

    # Define search directories ("" stands for the current directory)
    search_dirs = [
        IMAGES_BASE_DIR,
        "/kaggle/input/vrmini2/abo-images-small/images/small",
        "/kaggle/input/vrmini2/abo-images-small/images",
        "/kaggle/input/vrmini2",
        "images",
        "data/images",
        "input/images",
        "",
        "../images",
        "/kaggle/input/vrmini2/abo-images-small"
    ]

    # Try exact path without /kaggle prefix
    if path.startswith('/kaggle'):
        alt_path = path.replace('/kaggle', '', 1)
        if os.path.exists(alt_path):
            image_path_cache[path] = alt_path
            logger.debug(f"Found image at: {alt_path}")
            return alt_path

    # Recursive search in each directory. Several entries name the same tree
    # or a subtree of another entry, so remember what has been walked and skip
    # anything already covered; each tree is walked once, handling the exact
    # and the case-insensitive match in the same pass.
    scanned_roots = []
    for search_dir in search_dirs:
        root = Path(search_dir)  # Path("") is the current directory
        if not root.exists():
            logger.debug(f"Search directory does not exist: {search_dir}")
            continue
        try:
            resolved_root = root.resolve()
            if any(done == resolved_root or done in resolved_root.parents for done in scanned_roots):
                logger.debug(f"Search directory already covered: {search_dir}")
                continue
            scanned_roots.append(resolved_root)
            fallback = None  # first case-insensitive match, used if no exact one turns up
            for p in root.rglob("*"):
                # Compare names before is_file() so most entries need no stat call.
                if p.name == filename:
                    if p.is_file():
                        image_path_cache[path] = str(p)
                        logger.debug(f"Found image at: {str(p)}")
                        return str(p)
                elif fallback is None and p.name.lower() == filename_lower and p.is_file():
                    fallback = p
            if fallback is not None:
                image_path_cache[path] = str(fallback)
                logger.debug(f"Found image (case-insensitive) at: {str(fallback)}")
                return str(fallback)
        except Exception as e:
            logger.debug(f"Error searching in {search_dir}: {e}")

    # Try partial path components (e.g., the last 1-3 components of the path).
    # The recursive search above already looked for the file name everywhere
    # beneath each directory, so only the direct join is checked here.
    path_parts = Path(path).parts
    for i in range(1, min(4, len(path_parts))):
        partial_path = os.path.join(*path_parts[-i:])
        for search_dir in search_dirs:
            alt_path = os.path.join(search_dir, partial_path)
            if os.path.exists(alt_path):
                image_path_cache[path] = alt_path
                logger.debug(f"Found image at partial path: {alt_path}")
                return alt_path

    # Log failure and directory structure for debugging
    logger.warning(f"Could not find image: {path}")
    logger.warning(f"Tried directories: {search_dirs}")
    if os.path.exists(IMAGES_BASE_DIR):
        try:
            logger.warning(f"Contents of {IMAGES_BASE_DIR}:")
            for item in os.listdir(IMAGES_BASE_DIR)[:10]:  # Limit to first 10 for brevity
                logger.warning(f"  {item}")
        except Exception as e:
            logger.warning(f"Could not list {IMAGES_BASE_DIR}: {e}")

    # Cache the original path as a fallback (will be marked as missing)
    image_path_cache[path] = path
    return path

def process_data():
    """Read the test split CSV and check it has the columns the VQA run needs.

    Returns:
        The loaded DataFrame.

    Raises:
        ValueError: if 'full_path' or 'question' is absent.
        Exception: any read error is logged and re-raised unchanged.
    """
    try:
        logger.info(f"Loading test data from: {TEST_DATA_PATH}")
        frame = pd.read_csv(TEST_DATA_PATH)
        logger.info(f"Loaded test data with {len(frame)} samples")

        # Collect every absent column so the error names all of them at once.
        absent = []
        for needed in ('full_path', 'question'):
            if needed not in frame.columns:
                absent.append(needed)
        if absent:
            raise ValueError(f"Missing required columns: {absent}")

        # Show a handful of paths to make path-resolution problems easy to spot.
        logger.info("Sample paths from dataset:")
        preview = frame['full_path'].iloc[:5]
        for position, sample_path in enumerate(preview):
            logger.info(f"Sample {position}: {sample_path}")
        logger.info("Processing full dataset")
        return frame
    except Exception as err:
        logger.error(f"Error loading data: {err}")
        raise

def has_number(text):
    """Return True when ``text`` contains a digit or an English number word.

    Words are taken as runs of letters, so adjacent punctuation or hyphens do
    not hide a number word ("two." and "twenty-one" both count).

    Args:
        text: Value to inspect; anything that is not a ``str`` yields False.

    Returns:
        bool: True if a digit or a known number word is present.
    """
    if not isinstance(text, str):
        return False
    # Check for digits
    if re.search(r'\d', text):
        return True
    # Check for number words (set membership; matched case-insensitively)
    number_words = {
        'zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine',
        'ten', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen', 'sixteen',
        'seventeen', 'eighteen', 'nineteen', 'twenty', 'thirty', 'forty', 'fifty',
        'sixty', 'seventy', 'eighty', 'ninety', 'hundred', 'thousand'
    }
    # Splitting on whitespace alone would leave tokens such as "two." or
    # "twenty-one" that never match; extract alphabetic runs instead.
    words = re.findall(r'[a-z]+', text.lower())
    return any(word in number_words for word in words)

def get_answer(processor, model, image_path, question):
    """Generate a one-word answer to ``question`` about an image using BLIP-2.

    Args:
        processor: BLIP-2 processor used to build inputs and decode output.
        model: BLIP-2 generation model.
        image_path: Path of the image as stored in the dataset; it is resolved
            through ``verify_image_path``.
        question: Question text (non-strings are converted with ``str``).

    Returns:
        The first whitespace-delimited word of the decoded generation, or one
        of the sentinels "Missing" (image file not found), "Unknown" (empty
        answer or the question echoed back) or "Error" (any exception, which
        is logged rather than raised).
    """
    try:
        verified_path = verify_image_path(image_path)
        if not os.path.exists(verified_path):
            logger.warning(f"Image not found: {verified_path}")
            return "Missing"
        # The context manager releases the file handle as soon as the pixels
        # are decoded instead of leaving one open per processed sample.
        with Image.open(verified_path) as raw_image:
            image = raw_image.convert("RGB")
        # str() keeps a non-string question (e.g. NaN from an empty CSV cell)
        # from failing on .strip() after the expensive generation has run.
        question_text = str(question)
        prompt = f"Question: {question_text} Answer:"
        # Cast floating-point inputs to the model's dtype (float16 on CUDA) so
        # pixel values match the weights.
        inputs = processor(images=image, text=prompt, return_tensors="pt").to(device, model.dtype)
        with torch.no_grad():
            # NOTE(review): max_new_tokens=1 yields one token, which may be
            # only a fragment of a longer word — confirm this is intended.
            outputs = model.generate(
                **inputs,
                max_new_tokens=1,
                num_beams=3,
                early_stopping=True
            )
        answer = processor.decode(outputs[0], skip_special_tokens=True)
        # Drop the prompt when the decoded text echoes it back.
        if prompt in answer:
            answer = answer.split(prompt)[1].strip()
        answer = answer.split()[0] if answer.strip() else "Unknown"
        if not answer or answer.strip() == question_text.strip():
            answer = "Unknown"
        return answer.strip()
    except Exception as e:
        logger.error(f"Error processing {image_path}: {e}")
        return "Error"

def compute_metrics(test_df):
    """Compute exact-match accuracy, macro precision/recall/F1 and BERTScore.

    Answers are compared after converting to text, stripping and lowercasing.
    Precision, recall and F1 treat each normalised answer string as a class
    label and are macro-averaged over the labels.

    Args:
        test_df: DataFrame with a 'predicted_answer' column and, optionally, a
            ground-truth 'answer' column. A boolean 'correct' column is added
            to it in place.

    Returns:
        dict with keys 'accuracy', 'precision', 'recall', 'f1',
        'bertscore_precision', 'bertscore_recall' and 'bertscore_f1', or None
        when there is no 'answer' column.
    """
    if 'answer' not in test_df.columns:
        logger.warning("Ground truth answers not available, skipping metric computation")
        return None
    # With the scoring call commented out, precision/recall/f1 were never
    # assigned and building the result raised NameError. scikit-learn's
    # function is precision_recall_fscore_support (there is no
    # precision_recall_f1_support in sklearn.metrics).
    from sklearn.metrics import precision_recall_fscore_support

    # astype(str) keeps the .str accessor valid when the column is numeric.
    pred_norm = test_df['predicted_answer'].fillna("").astype(str).str.strip().str.lower()
    ref_norm = test_df['answer'].fillna("").astype(str).str.strip().str.lower()
    # Exact match; rows without a ground-truth answer never count as correct.
    test_df['correct'] = (pred_norm == ref_norm) & test_df['answer'].notna()
    accuracy = float(test_df['correct'].mean())
    # Scoring the 'correct' flags against themselves would always give 1.0
    # (or 0.0), so score predicted labels against reference labels instead.
    precision, recall, f1, _ = precision_recall_fscore_support(
        ref_norm.tolist(), pred_norm.tolist(), average='macro', zero_division=0
    )
    # BERTScore
    preds = test_df['predicted_answer'].fillna("Unknown").astype(str).tolist()
    refs = test_df['answer'].fillna("Unknown").astype(str).tolist()
    P, R, F1 = score(preds, refs, lang="en", verbose=False)
    metrics = {
        'accuracy': accuracy,
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
        'bertscore_precision': P.mean().item(),
        'bertscore_recall': R.mean().item(),
        'bertscore_f1': F1.mean().item()
    }
    return metrics

def main():
    """Run BLIP-2 VQA inference over the test set, evaluate it, and save the results.

    Loads the model and the test DataFrame, generates one answer per row,
    stores the answers in a ``predicted_answer`` column and a numeric-content
    flag in ``contains_number``, logs the metrics returned by
    ``compute_metrics`` (when it returns any) and writes the frame to
    ``OUTPUT_PATH``.

    A failure on a single sample is logged and recorded as ``"Error"`` without
    stopping the run. Exactly one prediction and one flag are recorded per
    row, so the result lists always match the frame length.

    Raises:
        Exception: any error outside the per-sample handling is logged and
            re-raised.
    """
    try:
        processor, model = load_model()
        test_df = process_data()
        logger.info(f"Image base directory: {IMAGES_BASE_DIR}")
        if os.path.exists(IMAGES_BASE_DIR):
            logger.info(f"Image directory exists and contains {len(os.listdir(IMAGES_BASE_DIR))} files/directories")
        else:
            logger.warning(f"Image directory does not exist: {IMAGES_BASE_DIR}")

        total = len(test_df)
        predictions = []
        contains_numbers = []
        successes = 0
        failures = 0
        rows = tqdm(test_df.iterrows(), total=total, desc="Running VQA with BLIP-2")
        # Gate debug/progress logging on the loop position rather than the
        # index label, so a filtered or non-integer index behaves the same.
        for position, (idx, row) in enumerate(rows):
            # Defaults used when the sample fails before a value is produced.
            answer = "Error"
            has_num = False
            verbose = position < 5
            try:
                image_path = row['full_path']
                question = row['question']
                if verbose:
                    logger.info(f"Processing sample {idx}")
                    logger.info(f"Original image path: {image_path}")
                    verified_path = verify_image_path(image_path)
                    logger.info(f"Verified image path: {verified_path}")
                    logger.info(f"Image exists: {os.path.exists(verified_path)}")
                    logger.info(f"Question: {question}")
                answer = get_answer(processor, model, image_path, question)
                if verbose:
                    logger.info(f"Generated answer: {answer}")
                # Check for numbers in predicted or ground truth answer
                ground_truth = row.get('answer', '')
                has_num = has_number(answer) or (has_number(ground_truth) if ground_truth else False)
            except Exception as e:
                # Whatever was produced before the failure is kept; nothing
                # is appended here, so a row can never be recorded twice.
                logger.error(f"Failed on sample {idx}: {e}")

            predictions.append(answer)
            contains_numbers.append(has_num)
            if answer in ("Error", "Missing", "Unknown"):
                failures += 1
            else:
                successes += 1
            if position % 20 == 0 and position > 0:
                logger.info(f"Processed {position}/{total} samples - Success: {successes}, Failures: {failures}")
            time.sleep(0.01)

        test_df['predicted_answer'] = predictions
        test_df['contains_number'] = contains_numbers
        # Compute and log metrics
        metrics = compute_metrics(test_df)
        if metrics:
            logger.info("Evaluation Metrics:")
            logger.info(f"Accuracy: {metrics['accuracy']:.4f}")
            logger.info(f"Precision: {metrics['precision']:.4f}")
            logger.info(f"Recall: {metrics['recall']:.4f}")
            logger.info(f"F1-Score: {metrics['f1']:.4f}")
            logger.info(f"BERTScore Precision: {metrics['bertscore_precision']:.4f}")
            logger.info(f"BERTScore Recall: {metrics['bertscore_recall']:.4f}")
            logger.info(f"BERTScore F1: {metrics['bertscore_f1']:.4f}")
        test_df.to_csv(OUTPUT_PATH, index=False)
        logger.info(f"Predictions saved to {OUTPUT_PATH}")
        logger.info(f"Successfully processed: {successes}, Failed: {failures}")
    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise

# Entry point: run the full VQA pipeline when this code is executed directly.
if __name__ == "__main__":
    main()