<a href="https://colab.research.google.com/github/MeenakshiRajpurohit/CMPE-252-AI-and-Data-Engineering/blob/main/VQA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#pip install --upgrade transformers

In [None]:
# Install required packages
!pip install -q datasets torch torchvision
!pip install -q accelerate bitsandbytes peft
!pip install -q sentencepiece pillow tqdm
!pip install -q evaluate rouge-score
!pip install -q matplotlib seaborn

print("✓ All packages installed successfully!")

In [None]:
import os
import json
import torch
import requests
import numpy as np
import pandas as pd
from PIL import Image
from io import BytesIO
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

from transformers import (
    AutoProcessor,
    #AutoModelForVision2Seq,
    VisionEncoderDecoderModel,
    DonutProcessor,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer,
)

from peft import (
    LoraConfig,
    get_peft_model,
    prepare_model_for_kbit_training,
)

from datasets import Dataset, load_dataset

# Report whether CUDA is usable and, when it is, which device and how much memory we have.
cuda_ready = torch.cuda.is_available()
print(f"GPU Available: {cuda_ready}")
if cuda_ready:
    gpu_name = torch.cuda.get_device_name(0)
    gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f"GPU Name: {gpu_name}")
    print(f"GPU Memory: {gpu_memory_gb:.2f} GB")

In [None]:
# Load dataset
try:
    # Preferred path: pull the full train split through the `datasets` library.
    dataset = load_dataset("sujet-ai/Sujet-Finance-QA-Vision-100k", split="train")
    print(f"✓ Loaded {len(dataset)} samples from HuggingFace")
except Exception as load_error:
    # Catch Exception rather than using a bare `except:` so that kernel
    # interrupts still propagate, and report why the primary path failed.
    print(f"load_dataset failed ({type(load_error).__name__}: {load_error})")

    # Fallback: page through the dataset viewer REST API.
    # The /rows endpoint caps the page size, so the first 1000 rows are fetched
    # in pages of 100 instead of one oversized request.
    # NOTE(review): rows returned by this API may encode images as metadata
    # dicts rather than URL strings or PIL images — confirm downstream handling.
    print("Loading via API...")
    rows_url = "https://datasets-server.huggingface.co/rows"
    rows = []
    for offset in range(0, 1000, 100):
        response = requests.get(
            rows_url,
            params={
                "dataset": "sujet-ai/Sujet-Finance-QA-Vision-100k",
                "config": "default",
                "split": "train",
                "offset": offset,
                "length": 100,
            },
            timeout=60,
        )
        # Fail loudly on HTTP errors instead of parsing an error payload as data.
        response.raise_for_status()
        page = [row['row'] for row in response.json().get('rows', [])]
        if not page:
            break
        rows.extend(page)

    if not rows:
        raise RuntimeError("Dataset API returned no rows; cannot build a dataset")

    dataset = Dataset.from_list(rows)
    print(f"✓ Loaded {len(dataset)} samples via API")

In [None]:
# Peek at the schema and at the first record.
# default=str lets json.dumps render values that are not JSON-serializable.
first_record = dataset[0]
print("Dataset columns:", dataset.column_names)
print("\nFirst example:")
print(json.dumps(first_record, indent=2, default=str))

In [None]:
# Visualize sample
def visualize_sample(example):
    """Show one record: its image with the question as the title, then print the answer.

    Args:
        example: Mapping with an 'image' entry (a URL string, or an object
            that plt.imshow can draw), a 'question' or 'query' entry, and an
            'answer' or 'response' entry. Missing text falls back to 'N/A'.
    """
    raw_image = example['image']
    if isinstance(raw_image, str):
        # A string is treated as a URL and downloaded.
        payload = requests.get(raw_image).content
        picture = Image.open(BytesIO(payload))
    else:
        picture = raw_image

    question_text = example.get('question', example.get('query', 'N/A'))
    answer_text = example.get('answer', example.get('response', 'N/A'))

    plt.figure(figsize=(10, 6))
    plt.imshow(picture)
    plt.axis('off')
    plt.title(f"Question: {question_text}")
    plt.show()

    print(f"Answer: {answer_text}")

# Preview up to three records from the start of the dataset.
preview_count = min(3, len(dataset))
preview_rule = '=' * 80
for sample_idx in range(preview_count):
    print(f"\n{preview_rule}")
    print(f"Sample {sample_idx + 1}")
    print(preview_rule)
    visualize_sample(dataset[sample_idx])

In [None]:
# Hold out 10% of the data, then halve that hold-out into validation and test.
# Both splits use seed 42.
train_test_split = dataset.train_test_split(test_size=0.1, seed=42)
val_test_split = train_test_split['test'].train_test_split(test_size=0.5, seed=42)

train_dataset = train_test_split['train']
val_dataset = val_test_split['train']
test_dataset = val_test_split['test']

for split_label, split_rows in (
    ("Train", train_dataset),
    ("Validation", val_dataset),
    ("Test", test_dataset),
):
    print(f"{split_label}: {len(split_rows)} samples")

In [None]:
# Training configuration: the single place to tune a run.
CONFIG = {
    # Model selection (change this to train different models)
    'model_type': 'donut',  # Options: 'donut', 'blip', 'pix2struct'

    # Data configuration: how many rows of each split get preprocessed
    'num_train_samples': 500,  # Reduce for testing, increase for full training
    'num_val_samples': 100,

    # Training hyperparameters
    'epochs': 3,
    'batch_size': 2,  # Reduce if OOM
    'gradient_accumulation_steps': 8,  # effective batch = batch_size * this
    'learning_rate': 5e-5,
    # NOTE(review): with the sample counts above, a full run is only about
    # 95 optimizer steps, so a 100-step warmup never finishes — confirm intent.
    'warmup_steps': 100,

    # QLoRA hyperparameters
    'lora_r': 16,
    'lora_alpha': 32,
    'lora_dropout': 0.05,

    # Output (plain string: the original f-string had no placeholders)
    'output_dir': './finance-vqa-qlora',
}

print("Configuration:")
print(json.dumps(CONFIG, indent=2))

In [None]:
# Select processor based on model type.
# Each supported model type maps to (checkpoint name, processor class).
PROCESSOR_REGISTRY = {
    'donut': ("naver-clova-ix/donut-base", DonutProcessor),
    'blip': ("Salesforce/blip-vqa-base", AutoProcessor),
    'pix2struct': ("google/pix2struct-docvqa-base", AutoProcessor),
}

# Fail with a clear message on an unsupported model type, rather than leaving
# model_name / processor undefined and hitting a NameError further down.
if CONFIG['model_type'] not in PROCESSOR_REGISTRY:
    raise ValueError(
        f"Unsupported model_type {CONFIG['model_type']!r}; "
        f"expected one of {sorted(PROCESSOR_REGISTRY)}"
    )

model_name, processor_class = PROCESSOR_REGISTRY[CONFIG['model_type']]
processor = processor_class.from_pretrained(model_name)

print(f"✓ Loaded processor for {model_name}")

In [None]:
def process_example_donut(example, max_length=512):
    """Turn one dataset record into Donut training tensors.

    The prompt and the answer are concatenated into a single decoder sequence
    and then shifted by one position, so that at every step the decoder input
    holds the tokens seen so far and the label holds the next token. The
    earlier version padded the prompt and the answer separately, which lined
    answer token i up with prompt token i and never fed the answer to the
    decoder.

    Args:
        example: Mapping with 'image' (URL string or PIL image), 'question'
            or 'query', and 'answer' or 'response'.
        max_length: Length that decoder_input_ids and labels are padded or
            truncated to.

    Returns:
        dict with 'pixel_values', 'decoder_input_ids' and 'labels'. Label
        positions that belong to the prompt or to padding are -100 so the
        loss only covers answer tokens.

    Raises:
        requests.HTTPError: if the image URL responds with an error status.
    """
    # Load image
    if isinstance(example['image'], str):
        response = requests.get(example['image'], timeout=60)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
    else:
        image = example['image']
    # Convert in both branches: in-memory images may be RGBA, palette or grayscale.
    image = image.convert('RGB')

    question = example.get('question', example.get('query', ''))
    answer = example.get('answer', example.get('response', ''))

    # Format for Donut
    # NOTE(review): these task tags are used as plain text here; confirm
    # whether they should be registered as special tokens for this checkpoint.
    prompt = f"<s_docvqa><s_question>{question}</s_question><s_answer>"
    target = f"{answer}</s_answer></s_docvqa>"

    # Encode
    pixel_values = processor(image, return_tensors="pt").pixel_values

    tokenizer = processor.tokenizer
    # Tokenize the prompt on its own so its ids match what inference feeds in.
    prompt_ids = tokenizer(prompt, add_special_tokens=False).input_ids
    target_ids = tokenizer(target, add_special_tokens=False).input_ids
    if tokenizer.eos_token_id is not None:
        # Teach the model where to stop generating.
        target_ids = target_ids + [tokenizer.eos_token_id]

    # One extra token is kept so both shifted views can reach max_length.
    sequence = (prompt_ids + target_ids)[:max_length + 1]
    decoder_inputs = sequence[:-1]
    # The label at position t is sequence[t + 1]; mask it while it is still prompt.
    labels = [
        token if position + 1 >= len(prompt_ids) else -100
        for position, token in enumerate(sequence[1:])
    ]

    pad_count = max_length - len(decoder_inputs)
    decoder_inputs = decoder_inputs + [tokenizer.pad_token_id] * pad_count
    labels = labels + [-100] * pad_count

    return {
        'pixel_values': pixel_values.squeeze(0),
        'decoder_input_ids': torch.tensor(decoder_inputs, dtype=torch.long),
        'labels': torch.tensor(labels, dtype=torch.long)
    }

def process_example_blip(example):
    """Turn one dataset record into BLIP training tensors.

    Args:
        example: Mapping with 'image' (URL string or PIL image), 'question'
            or 'query', and 'answer' or 'response'.

    Returns:
        dict with 'pixel_values', 'input_ids', 'attention_mask' and 'labels'.
        Text tensors are padded or truncated to 128 tokens, and padding
        positions in 'labels' are set to -100.

    Raises:
        requests.HTTPError: if the image URL responds with an error status.
    """
    if isinstance(example['image'], str):
        response = requests.get(example['image'], timeout=60)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
    else:
        image = example['image']
    # Convert in both branches: in-memory images may be RGBA, palette or grayscale.
    image = image.convert('RGB')

    question = example.get('question', example.get('query', ''))
    answer = example.get('answer', example.get('response', ''))

    encoding = processor(
        images=image,
        text=question,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt"
    )

    # NOTE(review): the answer is tokenized separately from the question;
    # confirm this is the label layout the loaded BLIP class expects.
    labels = processor.tokenizer(
        answer,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt"
    ).input_ids

    # Exclude padding from the loss.
    labels[labels == processor.tokenizer.pad_token_id] = -100

    # squeeze(0) drops only the batch axis added by return_tensors="pt".
    return {
        'pixel_values': encoding['pixel_values'].squeeze(0),
        'input_ids': encoding['input_ids'].squeeze(0),
        'attention_mask': encoding['attention_mask'].squeeze(0),
        'labels': labels.squeeze(0)
    }

def process_example_pix2struct(example):
    """Turn one dataset record into Pix2Struct training tensors.

    Args:
        example: Mapping with 'image' (URL string or PIL image), 'question'
            or 'query', and 'answer' or 'response'.

    Returns:
        dict with 'flattened_patches', 'attention_mask' and 'labels'.
        Labels are padded or truncated to 128 tokens, with padding positions
        set to -100.

    Raises:
        requests.HTTPError: if the image URL responds with an error status.
    """
    if isinstance(example['image'], str):
        response = requests.get(example['image'], timeout=60)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
    else:
        image = example['image']
    # Convert in both branches: in-memory images may be RGBA, palette or grayscale.
    image = image.convert('RGB')

    question = example.get('question', example.get('query', ''))
    answer = example.get('answer', example.get('response', ''))

    encoding = processor(
        images=image,
        text=question,
        return_tensors="pt",
        max_patches=2048
    )

    # padding="max_length" needs an explicit max_length; without one the pad
    # width depends on the tokenizer's configured model_max_length. 128
    # matches the label length used for BLIP in this notebook.
    labels = processor.tokenizer(
        answer,
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt"
    ).input_ids

    # Exclude padding from the loss.
    labels[labels == processor.tokenizer.pad_token_id] = -100

    # squeeze(0) drops only the batch axis added by return_tensors="pt".
    return {
        'flattened_patches': encoding['flattened_patches'].squeeze(0),
        'attention_mask': encoding['attention_mask'].squeeze(0),
        'labels': labels.squeeze(0)
    }

# Pick the preprocessing function for the configured model type.
# Any type other than 'donut' or 'blip' falls back to the Pix2Struct processor.
process_fn = {
    'donut': process_example_donut,
    'blip': process_example_blip,
}.get(CONFIG['model_type'], process_example_pix2struct)

The cell below is an extra addition: it preprocesses the data with `Dataset.map()`.


In [None]:
# Use .map() instead of a for-loop to keep RAM usage near zero.
# Clamp the requested sample counts to the split sizes: select(range(n)) raises
# when a split has fewer than n rows (e.g. after the small API fallback load).
num_train_rows = min(CONFIG['num_train_samples'], len(train_dataset))
num_val_rows = min(CONFIG['num_val_samples'], len(val_dataset))

print("Processing training data...")
train_data = train_dataset.select(range(num_train_rows)).map(
    process_fn,
    remove_columns=train_dataset.column_names, # Clears old data to save space
    desc="Mapping train data"
)

print("\nProcessing validation data...")
val_data = val_dataset.select(range(num_val_rows)).map(
    process_fn,
    remove_columns=val_dataset.column_names,
    desc="Mapping val data"
)

print(f"\n✓ Processed {len(train_data)} training samples")
print(f"✓ Processed {len(val_data)} validation samples")

In [None]:
# Loop-based preprocessing: rebuilds train_data / val_data row by row and
# skips (but reports) any row whose preprocessing raises.
# tqdm is already imported in the notebook's import cell.
def _process_split(split, limit, desc):
    """Apply process_fn to the first `limit` rows of `split`.

    `limit` is clamped to the split size so select() cannot go out of range.
    Rows that raise are printed and skipped. Returns the processed rows.
    """
    row_count = min(limit, len(split))
    processed = []
    for example in tqdm(split.select(range(row_count)), desc=desc):
        try:
            processed.append(process_fn(example))
        except Exception as e:
            print(f"Error: {e}")
    skipped = row_count - len(processed)
    if skipped:
        print(f"Skipped {skipped} of {row_count} rows ({desc})")
    return processed

# Process datasets
print("Processing training data...")
processed_train = _process_split(train_dataset, CONFIG['num_train_samples'], "train")

print("\nProcessing validation data...")
processed_val = _process_split(val_dataset, CONFIG['num_val_samples'], "val")

train_data = Dataset.from_list(processed_train)
val_data = Dataset.from_list(processed_val)

print(f"\n✓ Processed {len(train_data)} training samples")
print(f"✓ Processed {len(val_data)} validation samples")

In [None]:
# BitsAndBytes config for 4-bit quantization (NF4 with double quantization,
# float16 compute)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True
)

# Attention projection layers are named differently in each architecture, so
# the LoRA targets depend on the model type. The previous fixed list used
# LLaMA-style names: "o_proj" does not exist in Donut's decoder (it is
# "out_proj"), and none of those names exist in BLIP or Pix2Struct.
# NOTE(review): verify these names against model.named_modules() for the
# installed transformers version.
LORA_TARGET_MODULES = {
    'donut': ["q_proj", "k_proj", "v_proj", "out_proj"],
    'blip': ["query", "key", "value"],
    'pix2struct': ["query", "key", "value"],
}

# LoRA config
# task_type is deliberately left unset. The SEQ_2_SEQ_LM wrapper forwards
# text-model keyword arguments such as input_ids to the base model, which the
# image encoders of Donut and Pix2Struct are not expected to accept; the
# generic PEFT wrapper passes the batch through unchanged.
lora_config = LoraConfig(
    r=CONFIG['lora_r'],
    lora_alpha=CONFIG['lora_alpha'],
    target_modules=LORA_TARGET_MODULES.get(
        CONFIG['model_type'],
        ["q_proj", "v_proj", "k_proj", "o_proj"],  # previous default for other types
    ),
    lora_dropout=CONFIG['lora_dropout'],
    bias="none",
)

print("✓ QLoRA configuration ready")

In [None]:
# Load model
print(f"Loading {model_name} with 4-bit quantization...")

if CONFIG['model_type'] == 'donut':
    model_class = VisionEncoderDecoderModel
else:
    # AutoModelForVision2Seq is commented out of the notebook's import cell,
    # so referencing it directly raised a NameError. Resolve it here, and fall
    # back to AutoModelForImageTextToText on transformers releases that no
    # longer ship the older auto class.
    try:
        from transformers import AutoModelForVision2Seq as model_class
    except ImportError:
        from transformers import AutoModelForImageTextToText as model_class

model = model_class.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto"
)

# Prepare for k-bit training
model = prepare_model_for_kbit_training(model)

# Add LoRA adapters
model = get_peft_model(model, lora_config)

# Print trainable parameters
model.print_trainable_parameters()

print("✓ Model loaded successfully!")

In [None]:
# Training arguments
# Newer transformers releases renamed `evaluation_strategy` to `eval_strategy`.
# Pick whichever field this installation defines so the cell runs on both.
eval_strategy_key = (
    "eval_strategy"
    if "eval_strategy" in TrainingArguments.__dataclass_fields__
    else "evaluation_strategy"
)

# NOTE(review): with the default CONFIG a run is only about 95 optimizer
# steps, so eval_steps/save_steps of 200 never trigger and
# load_best_model_at_end has no checkpoint to restore — confirm, or lower them.
training_args = TrainingArguments(
    output_dir=CONFIG['output_dir'],
    num_train_epochs=CONFIG['epochs'],
    per_device_train_batch_size=CONFIG['batch_size'],
    per_device_eval_batch_size=CONFIG['batch_size'],
    gradient_accumulation_steps=CONFIG['gradient_accumulation_steps'],
    learning_rate=CONFIG['learning_rate'],
    weight_decay=0.01,
    warmup_steps=CONFIG['warmup_steps'],
    logging_steps=10,
    save_steps=200,
    eval_steps=200,
    save_total_limit=2,
    fp16=True,
    dataloader_num_workers=2,
    # Keep every column: the preprocessed keys are not all forward() arguments
    # the Trainer would recognise through the PEFT wrapper.
    remove_unused_columns=False,
    report_to="none",
    load_best_model_at_end=True,
    **{eval_strategy_key: "steps"},
)

# Initialize trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_data,
    eval_dataset=val_data,
)

print("✓ Trainer initialized")

In [None]:
# Announce the run with a banner, then hand control to the Trainer.
training_rule = "=" * 80
print("\n" + training_rule)
print(f"STARTING TRAINING: {CONFIG['model_type'].upper()}")
print(training_rule + "\n")

trainer.train()

print("\n✓ Training complete!")

In [None]:
# Write the trained model and its processor to the same output directory.
save_dir = CONFIG['output_dir']
trainer.save_model(save_dir)
processor.save_pretrained(save_dir)

print(f"✓ Model saved to {save_dir}")

In [None]:
# Test inference on a sample
def test_inference(model, processor, example):
    """Generate the model's answer for a single example.

    Args:
        model: Model exposing eval(), generate() and a .device attribute.
        processor: Processor matching the model (with a .tokenizer for Donut).
        example: Mapping with 'image' (URL string or PIL image) and
            'question' or 'query'.

    Returns:
        The predicted answer as a string.

    Raises:
        requests.HTTPError: if the image URL responds with an error status.
    """
    model.eval()

    # Load image
    if isinstance(example['image'], str):
        response = requests.get(example['image'], timeout=60)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
    else:
        image = example['image']
    # Convert in both branches: in-memory images may be RGBA, palette or grayscale.
    image = image.convert('RGB')

    question = example.get('question', example.get('query', ''))

    if CONFIG['model_type'] == 'donut':
        tokenizer = processor.tokenizer
        prompt = f"<s_docvqa><s_question>{question}</s_question><s_answer>"
        pixel_values = processor(image, return_tensors="pt").pixel_values.to(model.device)
        decoder_input_ids = tokenizer(
            prompt,
            add_special_tokens=False,
            return_tensors="pt"
        ).input_ids.to(model.device)

        with torch.no_grad():
            # pixel_values is passed by keyword because PEFT's seq2seq wrapper
            # only accepts keyword arguments in generate(). early_stopping is
            # omitted: it only affects beam search, which is not used here.
            outputs = model.generate(
                pixel_values=pixel_values,
                decoder_input_ids=decoder_input_ids,
                max_length=512,
            )

        answer = processor.batch_decode(outputs)[0]
        # batch_decode keeps special tokens; drop eos/pad text so it cannot
        # leak into the answer when no closing tag was generated.
        for special_token in (tokenizer.eos_token, tokenizer.pad_token):
            if special_token:
                answer = answer.replace(special_token, "")
        answer = answer.split("<s_answer>")[-1].split("</s_answer>")[0].strip()

    else:
        inputs = processor(
            images=image,
            text=question,
            return_tensors="pt"
        ).to(model.device)

        with torch.no_grad():
            outputs = model.generate(**inputs, max_length=128)

        answer = processor.decode(outputs[0], skip_special_tokens=True)

    return answer

# Smoke-test the model on up to five held-out examples and show
# prediction vs. ground truth with a case-insensitive match flag.
print("Testing inference on sample data...\n")
sample_rule = '=' * 80
for sample_idx in range(min(5, len(test_dataset))):
    example = test_dataset[sample_idx]
    prediction = test_inference(model, processor, example)
    ground_truth = example.get('answer', example.get('response', ''))
    question_text = example.get('question', example.get('query', ''))

    print(sample_rule)
    print(f"Sample {sample_idx + 1}")
    print(f"Question: {question_text}")
    print(f"Predicted: {prediction}")
    print(f"Ground Truth: {ground_truth}")
    print(f"Match: {prediction.strip().lower() == ground_truth.strip().lower()}")
    print()

In [None]:
# Comprehensive evaluation
def _token_f1(prediction, reference):
    """Set-based token F1 of two strings; None when the reference has no tokens."""
    pred_tokens = set(prediction.lower().split())
    ref_tokens = set(reference.lower().split())

    if not ref_tokens:
        return None

    common = pred_tokens & ref_tokens
    if not common:
        return 0

    # A non-empty overlap means both token sets are non-empty.
    precision = len(common) / len(pred_tokens)
    recall = len(common) / len(ref_tokens)
    return 2 * (precision * recall) / (precision + recall)


def evaluate_model(model, processor, test_data, num_samples=100):
    """Evaluate the model on the first rows of the test set.

    Args:
        model, processor: Passed through to test_inference.
        test_data: Dataset supporting len() and select().
        num_samples: Maximum number of rows to evaluate.

    Returns:
        (results, predictions, references). `results` holds the model type,
        the number of examples scored, the case-insensitive exact-match rate
        and the mean set-based token F1 (examples with an empty reference are
        left out of the F1 average).
    """
    predictions = []
    references = []
    failures = 0

    # Report the number actually evaluated, which may be below num_samples.
    sample_count = min(num_samples, len(test_data))
    print(f"Evaluating on {sample_count} samples...\n")

    for example in tqdm(test_data.select(range(sample_count))):
        try:
            pred = test_inference(model, processor, example)
            ref = example.get('answer', example.get('response', ''))
        except Exception as error:
            # Keep going on a bad sample, but say so: a bare `except` hid
            # every failure and also swallowed kernel interrupts.
            failures += 1
            print(f"Skipping sample ({type(error).__name__}: {error})")
            continue

        predictions.append(pred)
        references.append(ref)

    if failures:
        print(f"{failures} of {sample_count} samples failed and were skipped")

    # Calculate metrics
    exact_matches = sum(
        p.strip().lower() == r.strip().lower()
        for p, r in zip(predictions, references)
    )

    accuracy = exact_matches / len(predictions) if predictions else 0

    # F1 score
    f1_scores = [
        score
        for score in (_token_f1(p, r) for p, r in zip(predictions, references))
        if score is not None
    ]

    avg_f1 = np.mean(f1_scores) if f1_scores else 0

    results = {
        'model': CONFIG['model_type'],
        'num_samples': len(predictions),
        'exact_match': accuracy,
        'f1_score': avg_f1,
    }

    return results, predictions, references

# Score up to 100 test examples and print a summary report.
results, preds, refs = evaluate_model(model, processor, test_dataset, num_samples=100)

report_rule = "=" * 80
report_lines = [
    "\n" + report_rule,
    "EVALUATION RESULTS",
    report_rule,
    f"Model: {results['model']}",
    f"Samples: {results['num_samples']}",
    f"Exact Match: {results['exact_match']:.2%}",
    f"F1 Score: {results['f1_score']:.4f}",
    report_rule,
]
for report_line in report_lines:
    print(report_line)

In [None]:
# Save evaluation results
# Make sure the target directory exists, so this cell also works when the
# model-saving cell has not been run.
os.makedirs(CONFIG['output_dir'], exist_ok=True)

results_df = pd.DataFrame([results])
results_df.to_csv(f"{CONFIG['output_dir']}/evaluation_results.csv", index=False)

# Save predictions: one record per evaluated example, with a
# case-insensitive exact-match flag
predictions_data = [
    {
        'prediction': p,
        'reference': r,
        'correct': p.strip().lower() == r.strip().lower()
    }
    for p, r in zip(preds, refs)
]

# An explicit encoding keeps the file independent of the platform default.
with open(f"{CONFIG['output_dir']}/predictions.json", 'w', encoding='utf-8') as f:
    json.dump(predictions_data, f, indent=2)

print(f"✓ Results saved to {CONFIG['output_dir']}/")

In [None]:
# Optional: candidate values for a LoRA / learning-rate sweep.
HYPERPARAMETER_GRID = dict(
    lora_r=[8, 16, 32],
    lora_alpha=[16, 32, 64],
    learning_rate=[3e-5, 5e-5, 1e-4],
)

# Grid-search skeleton, left disabled because a full sweep is time-consuming.
# Uncomment and fill in the training/evaluation steps to use it:
#
# experiment_results = []
# for r in HYPERPARAMETER_GRID['lora_r']:
#     for alpha in HYPERPARAMETER_GRID['lora_alpha']:
#         for lr in HYPERPARAMETER_GRID['learning_rate']:
#             print(f"\nExperiment: r={r}, alpha={alpha}, lr={lr}")
#             # Run training and evaluation
#             # Store results
#             pass

In [None]:
# Option 1: Download as ZIP
# Build the archive with the standard library instead of shelling out to
# `zip`, so the cell does not depend on an external binary.
import shutil

export_dir = os.path.normpath(CONFIG['output_dir'])
archive_path = shutil.make_archive(
    'finance-vqa-model',
    'zip',
    root_dir=os.path.dirname(export_dir) or '.',
    base_dir=os.path.basename(export_dir),  # keeps the folder name inside the archive
)

# google.colab only exists on Colab; elsewhere just report where the archive is.
try:
    from google.colab import files
except ImportError:
    print(f"Not running in Colab; archive written to {archive_path}")
else:
    files.download(archive_path)

# Option 2: Save to Google Drive
# from google.colab import drive
# drive.mount('/content/drive')
# !cp -r {CONFIG['output_dir']} /content/drive/MyDrive/

In [None]:
# Run test_model.py as a separate Python process.
# NOTE(review): no cell shown in this notebook creates test_model.py — confirm
# it is present in the working directory before running this cell.
!python test_model.py