# Financial Document VLM Fine-tuning Module
## Qwen2-VL 7B Fine-tuning for Document Understanding

This notebook provides a comprehensive pipeline for:
1. Processing PDF financial documents
2. Loading and preparing the Qwen2-VL 7B model
3. Data preprocessing and validation
4. Fine-tuning with LoRA/QLoRA
5. Model evaluation and saving

Designed for use on Runpod or other GPU-enabled environments.

## 1. Installation and Imports

In [None]:
# Install required packages (IPython shell magics; -q keeps pip output quiet).
# NOTE(review): no versions are pinned, so each run installs whatever is
# current. pdf2image presumably also needs the Poppler system package on the
# host — confirm on the target image.
!pip install -q torch torchvision transformers accelerate bitsandbytes
!pip install -q peft datasets pillow pdf2image pypdf2
!pip install -q qwen-vl-utils evaluate scikit-learn
!pip install -q sentencepiece protobuf

In [None]:
import os
import json
import torch
import numpy as np
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# PDF and Image Processing
from pdf2image import convert_from_path
from PIL import Image
import PyPDF2

# Model and Training
from transformers import (
    Qwen2VLForConditionalGeneration,
    AutoProcessor,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    BitsAndBytesConfig
)
from peft import (
    LoraConfig,
    get_peft_model,
    prepare_model_for_kbit_training,
    TaskType
)
from torch.utils.data import Dataset, DataLoader
from datasets import Dataset as HFDataset

# Evaluation
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import evaluate

# Report the runtime environment so GPU problems surface before a model is loaded.
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # Only device 0 is inspected; total_memory is divided by 1e9 (decimal GB).
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

## 2. Configuration and Data Classes

In [None]:
@dataclass
class FineTuningConfig:
    """Settings shared by every stage of the fine-tuning pipeline.

    All fields have defaults, so ``FineTuningConfig()`` is a usable starting
    point. ``lora_target_modules`` may be left as ``None``; it is then filled
    with the attention and MLP projection names in ``__post_init__``.

    Raises:
        ValueError: If quantization is enabled with both ``load_in_4bit`` and
            ``load_in_8bit`` set, if a split fraction is outside ``[0, 1]``,
            or if ``train_split + eval_split`` exceeds 1.
    """

    # Model configuration
    model_name: str = "Qwen/Qwen2-VL-7B-Instruct"
    use_quantization: bool = True
    load_in_4bit: bool = True
    load_in_8bit: bool = False

    # LoRA configuration
    lora_r: int = 16
    lora_alpha: int = 32
    lora_dropout: float = 0.05
    # None means "use the default projection layers" (see __post_init__)
    lora_target_modules: Optional[List[str]] = None

    # Training configuration
    output_dir: str = "./finetuned_qwen2vl"
    num_train_epochs: int = 3
    per_device_train_batch_size: int = 1
    per_device_eval_batch_size: int = 1
    gradient_accumulation_steps: int = 4
    learning_rate: float = 2e-4
    max_grad_norm: float = 0.3
    warmup_ratio: float = 0.03
    lr_scheduler_type: str = "cosine"

    # Image processing
    max_image_size: Tuple[int, int] = (1024, 1024)
    dpi: int = 200

    # Data configuration
    train_split: float = 0.8
    eval_split: float = 0.1
    test_split: float = 0.1
    max_length: int = 512

    # Logging and checkpointing
    logging_steps: int = 10
    save_steps: int = 100
    eval_steps: int = 100
    save_total_limit: int = 3

    def __post_init__(self):
        """Fill in derived defaults and reject contradictory settings early."""
        if self.lora_target_modules is None:
            # Target attention and MLP layers for Qwen2-VL
            self.lora_target_modules = [
                "q_proj", "k_proj", "v_proj", "o_proj",
                "gate_proj", "up_proj", "down_proj"
            ]

        # The two bit widths are alternatives; asking for both is a mistake
        # that would otherwise only surface while the model is being loaded.
        if self.use_quantization and self.load_in_4bit and self.load_in_8bit:
            raise ValueError(
                "load_in_4bit and load_in_8bit are mutually exclusive; enable only one"
            )

        for split_name in ("train_split", "eval_split", "test_split"):
            fraction = getattr(self, split_name)
            if not 0.0 <= fraction <= 1.0:
                raise ValueError(f"{split_name} must be within [0, 1], got {fraction}")

        # Train and eval fractions are used as consecutive slices of the data,
        # so together they cannot exceed the whole (small float slack allowed).
        if self.train_split + self.eval_split > 1.0 + 1e-9:
            raise ValueError("train_split + eval_split must not exceed 1.0")

# Build the shared default configuration once and echo it into the notebook log
config = FineTuningConfig()
print("Configuration initialized:")
print(json.dumps(vars(config), indent=2, default=str))

## 3. PDF Processing and Image Extraction

In [None]:
class PDFProcessor:
    """Render PDF pages to PIL images, optionally saving each page as a PNG.

    Args:
        dpi: Resolution handed to ``convert_from_path`` when rasterising pages.
        max_image_size: ``(width, height)`` bound handed to ``Image.thumbnail``,
            which shrinks pages in place while keeping their aspect ratio.
    """

    def __init__(self, dpi: int = 200, max_image_size: Tuple[int, int] = (1024, 1024)):
        self.dpi = dpi
        self.max_image_size = max_image_size

    def validate_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Check that *pdf_path* opens as a PDF and collect basic facts about it.

        Failures are reported through the return value rather than raised.

        Returns:
            ``{'valid': True, 'num_pages', 'metadata', 'file_size'}`` on
            success, otherwise ``{'valid': False, 'error': <message>}``.
        """
        try:
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                return {
                    'valid': True,
                    'num_pages': len(pdf_reader.pages),
                    'metadata': pdf_reader.metadata,
                    'file_size': os.path.getsize(pdf_path)
                }
        except Exception as e:
            # Deliberately broad: any failure to open or parse the file simply
            # means the document is not usable.
            return {
                'valid': False,
                'error': str(e)
            }

    def pdf_to_images(self, pdf_path: str, output_dir: Optional[str] = None) -> "List[Image.Image]":
        """Convert every page of *pdf_path* to a size-bounded image.

        Args:
            pdf_path: PDF file to render.
            output_dir: When given, each page is also written there as
                ``page_<n>.png`` (1-based); the directory is created if needed.

        Returns:
            The page images in document order.

        Raises:
            ValueError: If ``validate_pdf`` rejects the file.
            Exception: Any rendering or saving error is logged and re-raised.
        """
        try:
            # Validate PDF first
            validation = self.validate_pdf(pdf_path)
            if not validation['valid']:
                raise ValueError(f"Invalid PDF: {validation['error']}")

            images = convert_from_path(
                pdf_path,
                dpi=self.dpi,
                fmt='png'
            )

            # Create the target directory once instead of once per page
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

            for page_number, img in enumerate(images, start=1):
                # thumbnail() resizes in place and never enlarges the image
                img.thumbnail(self.max_image_size, Image.Resampling.LANCZOS)
                if output_dir:
                    img.save(os.path.join(output_dir, f"page_{page_number}.png"), 'PNG')

            print(f"✓ Converted {len(images)} pages from {pdf_path}")
            return images

        except Exception as e:
            print(f"✗ Error processing PDF {pdf_path}: {e}")
            raise

    def batch_process_pdfs(self, pdf_paths: List[str], output_base_dir: str = "./processed_pdfs") -> "Dict[str, List[Image.Image]]":
        """Convert several PDFs, continuing past files that fail.

        Results are keyed by the file name without extension, and pages are
        written to ``<output_base_dir>/<key>``. If two inputs share a name
        (for example the same file name in different folders), later ones get
        a numeric suffix (``name_2``, ``name_3``, ...) so neither the results
        nor the saved pages overwrite each other.

        Returns:
            Mapping of key to page images; a PDF that failed maps to ``[]``.
        """
        results = {}

        for pdf_path in pdf_paths:
            stem = Path(pdf_path).stem

            # Disambiguate only on collision so unique names keep their key
            pdf_name = stem
            duplicate_index = 2
            while pdf_name in results:
                pdf_name = f"{stem}_{duplicate_index}"
                duplicate_index += 1

            output_dir = os.path.join(output_base_dir, pdf_name)

            try:
                results[pdf_name] = self.pdf_to_images(pdf_path, output_dir)
            except Exception as e:
                # Best effort: record the failure and move on to the next file
                print(f"Failed to process {pdf_path}: {e}")
                results[pdf_name] = []

        return results

# Instantiate the PDF processor from the shared config (no PDF is processed here)
pdf_processor = PDFProcessor(dpi=config.dpi, max_image_size=config.max_image_size)
print("PDF Processor initialized successfully")

## 4. Data Preprocessing and Validation

In [None]:
class FinancialDocumentDataset:
    """In-memory collection of annotated document images for field extraction.

    Each sample is a dict with the keys ``image``, ``annotations``,
    ``metadata`` and ``validation_errors``. Annotations are checked against an
    optional schema of the form ``{"fields": [{"name", "type", "required"}]}``.
    """

    def __init__(self, config: "FineTuningConfig"):
        self.config = config
        self.data = []
        self.key_fields_schema = None

    def load_key_fields_schema(self, schema_path: str) -> Dict:
        """Load the key-field schema from a JSON file and remember it.

        Raises:
            Exception: Any read or parse error is logged and re-raised.
        """
        try:
            with open(schema_path, 'r', encoding='utf-8') as f:
                self.key_fields_schema = json.load(f)

            print(f"✓ Loaded key fields schema with {len(self.key_fields_schema.get('fields', []))} fields")
            return self.key_fields_schema
        except Exception as e:
            print(f"✗ Error loading schema: {e}")
            raise

    def validate_annotations(self, annotations: Dict) -> Tuple[bool, List[str]]:
        """Check *annotations* against the loaded schema.

        Returns:
            ``(is_valid, errors)``. Missing required fields are listed first,
            followed by fields declared as ``number`` whose value is neither
            numeric nor convertible with ``float()``. Without a schema the
            result is ``(False, ["No schema loaded"])``.
        """
        errors = []

        if not self.key_fields_schema:
            errors.append("No schema loaded")
            return False, errors

        field_defs = self.key_fields_schema.get('fields', [])

        # Check required fields
        for field_def in field_defs:
            if field_def.get('required', False) and field_def['name'] not in annotations:
                errors.append(f"Missing required field: {field_def['name']}")

        # Check field types
        for field_def in field_defs:
            field_name = field_def['name']
            if field_name not in annotations:
                continue
            if field_def.get('type', 'string') != 'number':
                continue

            value = annotations[field_name]
            if isinstance(value, (int, float)):
                continue
            try:
                float(value)
            except (TypeError, ValueError):
                # Only conversion failures count; anything else (for example
                # KeyboardInterrupt) must not be swallowed here.
                errors.append(f"Field {field_name} should be numeric")

        return len(errors) == 0, errors

    def add_sample(self, image: "Image.Image", annotations: Dict, metadata: Optional[Dict] = None):
        """Append one sample; validation problems are recorded, not fatal."""
        is_valid, errors = self.validate_annotations(annotations)
        if not is_valid:
            print(f"⚠ Validation warnings: {errors}")

        self.data.append({
            'image': image,
            'annotations': annotations,
            'metadata': metadata or {},
            'validation_errors': errors if not is_valid else []
        })

    def load_from_directory(self, data_dir: str, annotations_file: str):
        """Load images listed in *annotations_file* from *data_dir*.

        The annotations file is a JSON list whose items carry ``image_file``
        (path relative to *data_dir*), ``fields`` and optionally ``source``.
        Entries whose image is missing are reported and skipped.

        Raises:
            Exception: Any read, parse or image error is logged and re-raised.
        """
        try:
            with open(annotations_file, 'r', encoding='utf-8') as f:
                annotations_data = json.load(f)

            for item in annotations_data:
                image_path = os.path.join(data_dir, item['image_file'])
                if not os.path.exists(image_path):
                    print(f"⚠ Image not found: {image_path}")
                    continue

                # convert() returns an independent in-memory copy, so the file
                # handle can be released as soon as the block exits.
                with Image.open(image_path) as raw_image:
                    image = raw_image.convert('RGB')

                self.add_sample(
                    image=image,
                    annotations=item['fields'],
                    metadata={'source': item.get('source', 'unknown')}
                )

            print(f"✓ Loaded {len(self.data)} samples from {data_dir}")

        except Exception as e:
            print(f"✗ Error loading data: {e}")
            raise

    def create_prompt(self, key_fields: List[str]) -> str:
        """Build the extraction instruction listing *key_fields*."""
        fields_str = ", ".join(key_fields)
        return (
            f"Extract the following fields from this financial document: {fields_str}\n"
            "Return the information in JSON format with the exact field names as keys.\n"
            "If a field is not found, use null as the value."
        )

    def split_dataset(self, seed: Optional[int] = None) -> Tuple[List, List, List]:
        """Shuffle the samples and split them into train, eval and test lists.

        Args:
            seed: When given, the shuffle is reproducible for that seed. When
                omitted, the global ``random`` state is used.

        Returns:
            ``(train, eval, test)``. Train and eval sizes are the configured
            fractions rounded down; test receives everything that is left.
            The order of ``self.data`` itself is not modified.
        """
        import random

        total_samples = len(self.data)
        train_size = int(total_samples * self.config.train_split)
        eval_size = int(total_samples * self.config.eval_split)

        # Shuffle a copy so repeated calls and statistics see stable data
        shuffled = list(self.data)
        if seed is None:
            random.shuffle(shuffled)
        else:
            random.Random(seed).shuffle(shuffled)

        train_data = shuffled[:train_size]
        eval_data = shuffled[train_size:train_size + eval_size]
        test_data = shuffled[train_size + eval_size:]

        print(f"✓ Dataset split: {len(train_data)} train, {len(eval_data)} eval, {len(test_data)} test")
        return train_data, eval_data, test_data

    def get_statistics(self) -> Dict:
        """Summarise sample count, samples with validation errors and field names."""
        unique_fields = set()
        for sample in self.data:
            unique_fields.update(sample['annotations'].keys())

        return {
            'total_samples': len(self.data),
            'samples_with_errors': sum(1 for s in self.data if s['validation_errors']),
            'unique_fields': list(unique_fields)
        }

# Cell marker only: the class above is defined here; no dataset instance is created
print("Dataset handler initialized successfully")

## 5. Model Loading with Quantization

In [None]:
class Qwen2VLModelLoader:
    """Load the Qwen2-VL model, processor and tokenizer and attach LoRA adapters."""

    def __init__(self, config: "FineTuningConfig"):
        self.config = config
        self.model = None
        self.processor = None
        self.tokenizer = None

    def _quantization_mode(self) -> Optional[str]:
        """Return ``'4-bit'``, ``'8-bit'`` or ``None`` (quantization disabled).

        Raises:
            ValueError: If quantization is enabled but the two bit-width flags
                do not select exactly one mode.
        """
        if not self.config.use_quantization:
            return None

        want_4bit = bool(self.config.load_in_4bit)
        want_8bit = bool(self.config.load_in_8bit)
        if want_4bit == want_8bit:
            raise ValueError(
                "use_quantization=True requires exactly one of load_in_4bit / load_in_8bit"
            )
        return '4-bit' if want_4bit else '8-bit'

    def load_model(self) -> Tuple[Any, Any, Any]:
        """Load model, processor and tokenizer according to the config.

        With quantization enabled the weights are loaded through bitsandbytes
        in float16 and prepared for k-bit training; otherwise they are loaded
        in float32.

        Returns:
            ``(model, processor, tokenizer)``, also stored on the instance.

        Raises:
            ValueError: If the quantization flags are contradictory.
            Exception: Any loading error is logged and re-raised.
        """
        print(f"Loading {self.config.model_name}...")

        try:
            # Resolve the mode first so a bad config fails before any download
            mode = self._quantization_mode()

            quantization_config = None
            if mode == '4-bit':
                quantization_config = BitsAndBytesConfig(
                    load_in_4bit=True,
                    bnb_4bit_compute_dtype=torch.float16,
                    bnb_4bit_quant_type="nf4",
                    bnb_4bit_use_double_quant=True,
                )
            elif mode == '8-bit':
                # The bnb_4bit_* options only apply to 4-bit loading
                quantization_config = BitsAndBytesConfig(load_in_8bit=True)

            if mode:
                print(f"✓ Quantization enabled: {mode}")

            # NOTE(review): trust_remote_code=True lets the Hub repository run
            # its own Python code. The model class used below ships with
            # transformers, so this is presumably unnecessary for the default
            # model_name — confirm before pointing at untrusted repositories.
            self.processor = AutoProcessor.from_pretrained(
                self.config.model_name,
                trust_remote_code=True
            )
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.config.model_name,
                trust_remote_code=True
            )
            print("✓ Processor and tokenizer loaded")

            self.model = Qwen2VLForConditionalGeneration.from_pretrained(
                self.config.model_name,
                quantization_config=quantization_config,
                device_map="auto",
                trust_remote_code=True,
                torch_dtype=torch.float16 if quantization_config else torch.float32,
            )
            print("✓ Model loaded successfully")

            # Prepare for k-bit training if quantized
            if quantization_config:
                self.model = prepare_model_for_kbit_training(self.model)
                print("✓ Model prepared for k-bit training")

            # Print model memory footprint
            if torch.cuda.is_available():
                memory_allocated = torch.cuda.memory_allocated() / 1e9
                print(f"✓ GPU memory allocated: {memory_allocated:.2f} GB")

            return self.model, self.processor, self.tokenizer

        except Exception as e:
            print(f"✗ Error loading model: {e}")
            raise

    def apply_lora(self) -> Any:
        """Wrap the loaded model with LoRA adapters built from the config.

        Returns:
            The PEFT-wrapped model, also stored on ``self.model``.

        Raises:
            ValueError: If ``load_model()`` has not been called yet.
        """
        if self.model is None:
            raise ValueError("Model not loaded. Call load_model() first.")

        print("Applying LoRA configuration...")

        lora_config = LoraConfig(
            r=self.config.lora_r,
            lora_alpha=self.config.lora_alpha,
            target_modules=self.config.lora_target_modules,
            lora_dropout=self.config.lora_dropout,
            bias="none",
            task_type=TaskType.CAUSAL_LM,
        )

        self.model = get_peft_model(self.model, lora_config)

        # Print trainable parameters
        trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
        total_params = sum(p.numel() for p in self.model.parameters())
        print(f"✓ LoRA applied: {trainable_params:,} trainable parameters ({100 * trainable_params / total_params:.2f}%)")

        return self.model

    def get_model_info(self) -> Dict:
        """Describe the loaded model, or return ``{'status': 'not_loaded'}``."""
        if self.model is None:
            return {'status': 'not_loaded'}

        # Device and dtype are read from the first parameter only
        first_param = next(self.model.parameters())
        info = {
            'model_name': self.config.model_name,
            'quantized': self.config.use_quantization,
            'device': str(first_param.device),
            'dtype': str(first_param.dtype),
        }

        if torch.cuda.is_available():
            info['gpu_memory_allocated'] = f"{torch.cuda.memory_allocated() / 1e9:.2f} GB"
            info['gpu_memory_reserved'] = f"{torch.cuda.memory_reserved() / 1e9:.2f} GB"

        return info

# Cell marker: confirms the class definition above executed
print("Model loader class defined successfully")

## 6. Custom Dataset Class for Training

In [None]:
class VLMFinancialDataset(Dataset):
    """PyTorch dataset that turns annotated document images into chat samples.

    Each item is a two-turn conversation: the user turn holds the image and
    the extraction prompt, the assistant turn holds the annotations as JSON.

    Args:
        data: Samples with at least the keys ``image`` and ``annotations``.
        processor: Qwen2-VL processor (chat template, image and text handling).
        tokenizer: Tokenizer matching the processor; used to count text tokens.
        key_fields: Field names listed in the prompt.
        max_length: Upper bound on the number of *answer* tokens kept. The
            prompt, including the image placeholder tokens, is never
            truncated: cutting placeholders would leave fewer image tokens
            than image features and the model would reject the sample.
    """

    def __init__(self, data: List[Dict], processor: Any, tokenizer: Any, key_fields: List[str], max_length: int = 512):
        self.data = data
        self.processor = processor
        self.tokenizer = tokenizer
        self.key_fields = key_fields
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def create_prompt(self) -> str:
        """Build the extraction instruction listing the key fields."""
        fields_str = ", ".join(self.key_fields)
        return (
            f"Extract the following fields from this financial document: {fields_str}\n"
            "Return the information in JSON format."
        )

    def _count_text_tokens(self, text: str) -> int:
        """Token count of *text* before image placeholders are expanded."""
        return len(self.tokenizer(text)["input_ids"])

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """Return one unpadded training example.

        Returns:
            ``input_ids``, ``attention_mask`` and ``labels`` as 1-D tensors,
            plus ``pixel_values`` and ``image_grid_thw`` when the processor
            provides them. Labels are ``-100`` everywhere except on the
            assistant answer, so the loss covers only the answer.

        Note:
            Items are not padded to a common length, so batch sizes above 1
            need a collate function that pads them.
        """
        sample = self.data[idx]
        image = sample['image']

        # Create target (ground truth)
        target = json.dumps(sample['annotations'], ensure_ascii=False)

        user_turn = {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": self.create_prompt()}
            ]
        }
        assistant_turn = {
            "role": "assistant",
            "content": [{"type": "text", "text": target}]
        }

        # The prompt-only rendering is the prefix of the full conversation
        # that ends right where the assistant answer begins.
        prompt_text = self.processor.apply_chat_template(
            [user_turn], tokenize=False, add_generation_prompt=True
        )
        full_text = self.processor.apply_chat_template(
            [user_turn, assistant_turn], tokenize=False, add_generation_prompt=False
        )

        # No truncation here: the processor expands the image placeholder into
        # many tokens, and truncating would cut into them (and the answer).
        inputs = self.processor(
            text=[full_text],
            images=[image],
            padding=True,
            return_tensors="pt"
        )

        input_ids = inputs["input_ids"].squeeze(0)
        attention_mask = inputs["attention_mask"].squeeze(0)

        # Both texts contain the same image placeholder, so the difference of
        # their raw token counts is the length of the answer part.
        answer_len = max(
            self._count_text_tokens(full_text) - self._count_text_tokens(prompt_text), 0
        )
        prompt_len = max(input_ids.shape[0] - answer_len, 0)

        # Apply the length budget to the answer only
        keep = prompt_len + min(answer_len, self.max_length)
        input_ids = input_ids[:keep]
        attention_mask = attention_mask[:keep]

        # Supervise only the answer; -100 is ignored by the loss
        labels = input_ids.clone()
        labels[:prompt_len] = -100
        labels[attention_mask == 0] = -100

        item = {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }
        if "pixel_values" in inputs:
            # presumably (num_patches, patch_dim) for Qwen2-VL — it has no
            # batch axis to squeeze; TODO confirm against the processor docs
            item["pixel_values"] = inputs["pixel_values"]
        if "image_grid_thw" in inputs:
            # One image per sample: drop the leading image axis so that
            # stacking a batch yields one grid row per image.
            item["image_grid_thw"] = inputs["image_grid_thw"].squeeze(0)
        return item

# Cell marker: confirms the class definition above executed
print("Custom dataset class defined successfully")

## 7. Training Pipeline

In [None]:
class FineTuningPipeline:
    """Orchestrate model loading, data preparation, training and saving.

    Typical order of calls: ``setup`` -> ``load_data`` -> ``prepare_datasets``
    -> ``train`` -> ``save_model``.
    """

    def __init__(self, config: "FineTuningConfig"):
        self.config = config
        self.model_loader = Qwen2VLModelLoader(config)
        self.dataset = FinancialDocumentDataset(config)
        self.model = None
        self.processor = None
        self.tokenizer = None
        self.trainer = None

    def setup(self, schema_path: str):
        """Load the model, attach LoRA adapters and load the field schema."""
        print("=" * 60)
        print("STEP 1: Loading Model")
        print("=" * 60)

        # Load model
        self.model, self.processor, self.tokenizer = self.model_loader.load_model()

        # Apply LoRA
        self.model = self.model_loader.apply_lora()

        print("\n" + "=" * 60)
        print("STEP 2: Loading Schema")
        print("=" * 60)

        # Load schema
        self.dataset.load_key_fields_schema(schema_path)

        print("\n✓ Setup complete!\n")

    def load_data(self, data_dir: str, annotations_file: str):
        """Load annotated images into the dataset and print its statistics."""
        print("=" * 60)
        print("STEP 3: Loading Training Data")
        print("=" * 60)

        self.dataset.load_from_directory(data_dir, annotations_file)

        # Print statistics
        stats = self.dataset.get_statistics()
        print(f"\nDataset Statistics:")
        print(f"  Total samples: {stats['total_samples']}")
        print(f"  Samples with validation errors: {stats['samples_with_errors']}")
        print(f"  Unique fields: {len(stats['unique_fields'])}")

    def prepare_datasets(self) -> "Tuple[Dataset, Dataset, Dataset]":
        """Split the loaded samples and wrap each part for training.

        Returns:
            ``(train_dataset, eval_dataset, test_dataset)``.

        Raises:
            ValueError: If no schema has been loaded (``setup`` not called).
        """
        print("\n" + "=" * 60)
        print("STEP 4: Preparing Datasets")
        print("=" * 60)

        schema = self.dataset.key_fields_schema
        if schema is None:
            # Fail with a clear message instead of an AttributeError on None
            raise ValueError("No key fields schema loaded. Call setup() first.")

        # Split data
        train_data, eval_data, test_data = self.dataset.split_dataset()

        # Get key fields
        key_fields = [f['name'] for f in schema.get('fields', [])]

        # Create PyTorch datasets
        train_dataset, eval_dataset, test_dataset = (
            VLMFinancialDataset(
                part, self.processor, self.tokenizer, key_fields, self.config.max_length
            )
            for part in (train_data, eval_data, test_data)
        )

        print(f"✓ Datasets prepared")
        return train_dataset, eval_dataset, test_dataset

    def train(self, train_dataset: "Dataset", eval_dataset: "Dataset"):
        """Run training with the Hugging Face ``Trainer``.

        Periodic evaluation and best-checkpoint reloading are only enabled
        when *eval_dataset* contains samples; small datasets can produce an
        empty evaluation split, which would otherwise leave the trainer
        without a metric to select the best checkpoint.
        """
        import inspect

        print("\n" + "=" * 60)
        print("STEP 5: Training")
        print("=" * 60)

        has_eval = eval_dataset is not None and len(eval_dataset) > 0
        if not has_eval:
            print("⚠ Evaluation set is empty; training without periodic evaluation")

        # The evaluation-strategy argument was renamed across transformers
        # releases; use whichever name the installed version accepts.
        args_params = inspect.signature(TrainingArguments.__init__).parameters
        strategy_key = "eval_strategy" if "eval_strategy" in args_params else "evaluation_strategy"

        training_kwargs = dict(
            output_dir=self.config.output_dir,
            num_train_epochs=self.config.num_train_epochs,
            per_device_train_batch_size=self.config.per_device_train_batch_size,
            per_device_eval_batch_size=self.config.per_device_eval_batch_size,
            gradient_accumulation_steps=self.config.gradient_accumulation_steps,
            learning_rate=self.config.learning_rate,
            max_grad_norm=self.config.max_grad_norm,
            warmup_ratio=self.config.warmup_ratio,
            lr_scheduler_type=self.config.lr_scheduler_type,
            logging_steps=self.config.logging_steps,
            save_steps=self.config.save_steps,
            eval_steps=self.config.eval_steps,
            save_total_limit=self.config.save_total_limit,
            save_strategy="steps",
            load_best_model_at_end=has_eval,
            push_to_hub=False,
            report_to=["tensorboard"],
            fp16=torch.cuda.is_available(),
            optim="paged_adamw_8bit" if self.config.use_quantization else "adamw_torch",
            remove_unused_columns=False,
        )
        training_kwargs[strategy_key] = "steps" if has_eval else "no"
        training_args = TrainingArguments(**training_kwargs)

        # Likewise, newer Trainer versions take the tokenizer as
        # ``processing_class`` instead of ``tokenizer``.
        trainer_params = inspect.signature(Trainer.__init__).parameters
        tokenizer_key = "processing_class" if "processing_class" in trainer_params else "tokenizer"

        # Initialize trainer
        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset if has_eval else None,
            **{tokenizer_key: self.tokenizer},
        )

        print(f"\nStarting training...")
        print(f"  Epochs: {self.config.num_train_epochs}")
        print(f"  Batch size: {self.config.per_device_train_batch_size}")
        print(f"  Gradient accumulation: {self.config.gradient_accumulation_steps}")
        print(f"  Effective batch size: {self.config.per_device_train_batch_size * self.config.gradient_accumulation_steps}")
        print(f"  Learning rate: {self.config.learning_rate}\n")

        # Train
        self.trainer.train()

        print("\n✓ Training complete!")

    def save_model(self, save_path: Optional[str] = None):
        """Save model, processor, tokenizer and the training config.

        Args:
            save_path: Target directory; defaults to ``config.output_dir``.
        """
        save_path = save_path or self.config.output_dir

        print(f"\nSaving model to {save_path}...")

        # Make sure the directory exists before anything is written into it
        os.makedirs(save_path, exist_ok=True)

        # Save the model
        self.model.save_pretrained(save_path)
        self.processor.save_pretrained(save_path)
        self.tokenizer.save_pretrained(save_path)

        # Save config
        config_path = os.path.join(save_path, "training_config.json")
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(self.config.__dict__, f, indent=2, default=str)

        print(f"✓ Model saved to {save_path}")

    def load_finetuned_model(self, model_path: str):
        """Load the base model and attach adapters saved at *model_path*.

        Returns:
            ``(model, processor, tokenizer)``, also stored on the instance.
        """
        print(f"Loading fine-tuned model from {model_path}...")

        from peft import PeftModel

        # Load base model first
        base_model, processor, tokenizer = self.model_loader.load_model()

        # Load PEFT model
        self.model = PeftModel.from_pretrained(base_model, model_path)
        self.processor = processor
        self.tokenizer = tokenizer

        print("✓ Fine-tuned model loaded")
        return self.model, self.processor, self.tokenizer

# Cell marker: confirms the class definition above executed
print("Training pipeline class defined successfully")

## 8. Evaluation and Testing

In [None]:
class ModelEvaluator:
    """Run the fine-tuned model on images and score extracted fields.

    The model is switched to eval mode on construction.
    """

    def __init__(self, model, processor, tokenizer):
        self.model = model
        self.processor = processor
        self.tokenizer = tokenizer
        self.model.eval()

    @staticmethod
    def _extract_json(generated_text: str) -> Dict:
        """Parse the first-to-last brace span of *generated_text* as JSON.

        Returns the parsed object, or a dict with ``error`` and ``raw_text``
        when no brace span exists or it is not valid JSON.
        """
        import re

        json_match = re.search(r'\{.*\}', generated_text, re.DOTALL)
        if not json_match:
            return {"error": "No JSON found in response", "raw_text": generated_text}
        try:
            return json.loads(json_match.group())
        except json.JSONDecodeError:
            return {"error": "Invalid JSON", "raw_text": generated_text}

    def predict(self, image: "Image.Image", key_fields: List[str]) -> Dict:
        """Extract *key_fields* from a single image.

        Returns:
            The parsed JSON answer, or a dict with ``error`` and ``raw_text``
            when the answer cannot be parsed.
        """
        # Create prompt
        fields_str = ", ".join(key_fields)
        prompt = (
            f"Extract the following fields from this financial document: {fields_str}\n"
            "Return the information in JSON format."
        )

        # Prepare messages
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image},
                    {"type": "text", "text": prompt}
                ]
            }
        ]

        # Process
        text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = self.processor(
            text=[text],
            images=[image],
            padding=True,
            return_tensors="pt"
        ).to(self.model.device)

        # Generate
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=512,
                do_sample=False,
            )

        # generate() returns the prompt tokens followed by the new tokens;
        # decode only the new ones so the prompt text cannot leak into the
        # JSON search or into raw_text.
        prompt_len = inputs["input_ids"].shape[1]
        generated_text = self.processor.batch_decode(
            outputs[:, prompt_len:],
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True
        )[0]

        return self._extract_json(generated_text)

    def evaluate_dataset(self, test_dataset: "Dataset", key_fields: List[str]) -> Dict:
        """Score predictions on every sample of *test_dataset*.

        A field counts as correct when it is present in both prediction and
        ground truth and the values match after ``str()``, stripping and
        lower-casing. Accuracy is the share of all samples with a correct
        value, so a field missing on either side counts against it.

        Args:
            test_dataset: Object exposing ``len()`` and a ``data`` list of
                samples with the keys ``image`` and ``annotations``.
            key_fields: Field names to score.

        Returns:
            Dict with ``total_samples``, ``overall_accuracy`` (mean of the
            per-field accuracies), ``field_accuracy`` and
            ``detailed_results``. An empty dataset yields zero accuracies.
        """
        print("\n" + "=" * 60)
        print("EVALUATION")
        print("=" * 60)

        results = []
        correct_fields = {field: 0 for field in key_fields}
        total_samples = len(test_dataset)

        for idx in range(total_samples):
            sample = test_dataset.data[idx]
            ground_truth = sample['annotations']

            # Predict
            prediction = self.predict(sample['image'], key_fields)

            # Compare
            sample_result = {
                'ground_truth': ground_truth,
                'prediction': prediction,
                'correct_fields': {}
            }

            for field in key_fields:
                if field not in prediction or field not in ground_truth:
                    continue

                # Normalize for comparison
                pred_val = str(prediction[field]).strip().lower()
                true_val = str(ground_truth[field]).strip().lower()

                is_correct = pred_val == true_val
                sample_result['correct_fields'][field] = is_correct
                if is_correct:
                    correct_fields[field] += 1

            results.append(sample_result)

            print(f"  Evaluated sample {idx + 1}/{total_samples}", end='\r')

        # Calculate metrics; an empty test split must not divide by zero
        field_accuracy = {
            field: (correct / total_samples if total_samples else 0.0)
            for field, correct in correct_fields.items()
        }
        overall_accuracy = sum(field_accuracy.values()) / len(field_accuracy) if field_accuracy else 0

        evaluation_results = {
            'total_samples': total_samples,
            'overall_accuracy': overall_accuracy,
            'field_accuracy': field_accuracy,
            'detailed_results': results
        }

        # Print summary
        print("\n\nEvaluation Results:")
        print(f"  Total samples: {total_samples}")
        print(f"  Overall accuracy: {overall_accuracy:.2%}")
        print(f"\n  Per-field accuracy:")
        for field, acc in field_accuracy.items():
            print(f"    {field}: {acc:.2%}")

        return evaluation_results

    def save_evaluation_results(self, results: Dict, output_path: str):
        """Write *results* to *output_path* as indented JSON.

        Values that are not JSON-serialisable are written via ``str()``.
        """
        with open(output_path, 'w') as f:
            json.dump(results, f, indent=2, default=str)
        print(f"\n✓ Evaluation results saved to {output_path}")

# Cell marker: confirms the class definition above executed
print("Evaluator class defined successfully")

## 9. Example Usage Pipeline

In [None]:
# Example: Complete end-to-end pipeline

def run_complete_pipeline(
    pdf_files: List[str],
    schema_path: str,
    annotations_file: str,
    output_dir: str = "./finetuned_qwen2vl"
):
    """
    Complete pipeline for fine-tuning Qwen2-VL on financial documents

    Renders the PDFs to page images, loads the model and schema, trains,
    saves the adapters and evaluates on the held-out test split.

    Args:
        pdf_files: List of PDF file paths
        schema_path: Path to JSON schema file with key fields
        annotations_file: Path to annotations JSON file; its ``image_file``
            entries are resolved relative to the rendered-pages directory
        output_dir: Directory to save fine-tuned model

    Returns:
        ``(pipeline, eval_results)``: the pipeline holding the trained model
        and the dict produced by ``ModelEvaluator.evaluate_dataset``.
    """
    from dataclasses import replace

    # Work on a copy so this run does not silently change the module-level
    # ``config`` that other cells keep using.
    run_config = replace(config, output_dir=output_dir)

    # Single source of truth for where pages are written and later read
    data_dir = "./processed_pdfs"

    # Step 1: Process PDFs
    print("\n" + "=" * 60)
    print("PDF PROCESSING")
    print("=" * 60)

    pdf_processor = PDFProcessor(dpi=run_config.dpi, max_image_size=run_config.max_image_size)
    pdf_processor.batch_process_pdfs(pdf_files, data_dir)

    # Step 2: Initialize pipeline
    pipeline = FineTuningPipeline(run_config)
    pipeline.setup(schema_path)

    # Step 3: Load data
    pipeline.load_data(data_dir, annotations_file)

    # Step 4: Prepare datasets
    train_dataset, eval_dataset, test_dataset = pipeline.prepare_datasets()

    # Step 5: Train
    pipeline.train(train_dataset, eval_dataset)

    # Step 6: Save model
    pipeline.save_model()

    # Step 7: Evaluate (evaluate_dataset prints its own section banner)
    evaluator = ModelEvaluator(pipeline.model, pipeline.processor, pipeline.tokenizer)
    key_fields = [f['name'] for f in pipeline.dataset.key_fields_schema.get('fields', [])]
    eval_results = evaluator.evaluate_dataset(test_dataset, key_fields)

    # Save evaluation results
    os.makedirs(output_dir, exist_ok=True)
    eval_output_path = os.path.join(output_dir, "evaluation_results.json")
    evaluator.save_evaluation_results(eval_results, eval_output_path)

    print("\n" + "=" * 60)
    print("PIPELINE COMPLETE!")
    print("=" * 60)
    print(f"\nModel saved to: {output_dir}")
    print(f"Evaluation results: {eval_output_path}")
    print(f"Overall accuracy: {eval_results['overall_accuracy']:.2%}")

    return pipeline, eval_results

# Cell marker: confirms the function definition above executed
print("\n✓ Complete pipeline function defined")

## 10. Usage Examples

In [None]:
# Example 1: Create a sample schema file
# Each row is (field name, value type, whether the field is required).
sample_schema = {
    "fields": [
        {"name": field_name, "type": field_type, "required": is_required}
        for field_name, field_type, is_required in (
            ("invoice_number", "string", True),
            ("date", "string", True),
            ("total_amount", "number", True),
            ("vendor_name", "string", True),
            ("tax_amount", "number", False),
            ("currency", "string", False),
        )
    ]
}

# Save sample schema
with open("sample_schema.json", "w") as f:
    f.write(json.dumps(sample_schema, indent=2))

print("Sample schema created: sample_schema.json")
print(json.dumps(sample_schema, indent=2))

In [None]:
# Example 2: Create sample annotations file format
# One row per annotated page: image path (relative to the image directory),
# invoice number, date, total, vendor and tax; source and currency are shared.
sample_annotations = [
    {
        "image_file": image_file,
        "source": "invoice_dataset",
        "fields": {
            "invoice_number": invoice_number,
            "date": invoice_date,
            "total_amount": total_amount,
            "vendor_name": vendor_name,
            "tax_amount": tax_amount,
            "currency": "USD",
        },
    }
    for image_file, invoice_number, invoice_date, total_amount, vendor_name, tax_amount in (
        ("document1/page_1.png", "INV-2024-001", "2024-01-15", 1234.56, "ACME Corporation", 123.45),
        ("document2/page_1.png", "INV-2024-002", "2024-01-16", 5678.90, "Tech Solutions Inc", 567.89),
    )
]

# Save sample annotations
with open("sample_annotations.json", "w") as f:
    f.write(json.dumps(sample_annotations, indent=2))

print("Sample annotations created: sample_annotations.json")
print(json.dumps(sample_annotations[:1], indent=2))

## 11. Quick Test - Single Prediction

In [None]:
# Example: Quick test on a single image
# Uncomment and modify paths to test
# The example is kept inside a bare string literal, so running this cell
# executes nothing; remove the surrounding triple quotes to enable it.

"""
# Load model
pipeline = FineTuningPipeline(config)
pipeline.load_finetuned_model("./finetuned_qwen2vl")

# Load test image
test_image = Image.open("path/to/test/invoice.png")

# Define fields to extract
key_fields = ["invoice_number", "date", "total_amount", "vendor_name"]

# Create evaluator
evaluator = ModelEvaluator(pipeline.model, pipeline.processor, pipeline.tokenizer)

# Make prediction
result = evaluator.predict(test_image, key_fields)

print("\nExtracted Fields:")
print(json.dumps(result, indent=2))
"""

# No-op so the cell has an executable statement
pass

## 12. Main Execution (Uncomment to run)

To run the complete pipeline, uncomment the code below and provide your data paths.

In [None]:
# MAIN EXECUTION
# Uncomment and modify paths to run the complete pipeline
# The call is kept inside a bare string literal, so running this cell only
# prints the usage notes below; remove the triple quotes to start a run.

"""
# Define your data paths
PDF_FILES = [
    "./data/invoice1.pdf",
    "./data/invoice2.pdf",
    # Add more PDF files
]

SCHEMA_PATH = "./sample_schema.json"
ANNOTATIONS_FILE = "./sample_annotations.json"
OUTPUT_DIR = "./finetuned_qwen2vl"

# Run the complete pipeline
pipeline, eval_results = run_complete_pipeline(
    pdf_files=PDF_FILES,
    schema_path=SCHEMA_PATH,
    annotations_file=ANNOTATIONS_FILE,
    output_dir=OUTPUT_DIR
)
"""

# Usage notes shown when the notebook is run top to bottom
print("\n" + "="*60)
print("READY TO USE!")
print("="*60)
print("\nThis notebook is ready to be imported and run on Runpod.")
print("\nTo use:")
print("1. Upload this notebook to your Runpod instance")
print("2. Prepare your data (PDFs, schema, annotations)")
print("3. Update the paths in the Main Execution cell")
print("4. Run all cells")
print("\nThe fine-tuned model will be saved to the output directory.")