# Mortgage-Backed Securities PDF Extraction: Approach Comparison

This notebook compares three extraction approaches for extracting structured data from MBS trustee report PDFs:

1. **Approach 1**: Gemini Vision Direct Extraction
2. **Approach 2**: PDF Text Parsing + Gemini Pro
3. **Approach 3**: Hybrid Smart Extraction

We'll evaluate each approach on:
- Accuracy (successful extraction rate, field-level accuracy)
- Speed (time per document)
- Cost (API token usage and estimated cost)
- Quality (table handling, multi-page support)

## Setup

In [None]:
import os
import json
import time
from pathlib import Path
from datetime import datetime

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm

# Set up plotting style
sns.set_theme(style="whitegrid")
plt.rcParams['figure.figsize'] = [12, 6]

# Import our extraction modules
import sys
sys.path.insert(0, '.')

from utils.gemini_vision import GeminiVisionExtractor
from utils.pdf_text_extraction import PDFTextExtractor
from utils.validators import DataValidator
from utils.output_formatter import ExcelFormatter

In [None]:
# Configuration
API_KEY = os.getenv("GOOGLE_API_KEY")  # Or set directly: API_KEY = "your-api-key"

if not API_KEY:
    raise ValueError("Please set GOOGLE_API_KEY environment variable or set API_KEY directly")

# Paths
PDF_DIR = Path("data/pdfs")
OUTPUT_DIR = Path("data/output")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Get sample PDFs (5-10 for comparison)
all_pdfs = sorted(PDF_DIR.glob("*.pdf"))
SAMPLE_SIZE = min(10, len(all_pdfs))
sample_pdfs = all_pdfs[:SAMPLE_SIZE]

print(f"Total PDFs available: {len(all_pdfs)}")
print(f"Sample size for comparison: {SAMPLE_SIZE}")
print(f"Sample PDFs: {[p.name for p in sample_pdfs]}")

## 1. Run Extraction Experiments

### Approach 1: Gemini Vision Direct Extraction

In [None]:
# Initialize extractors
vision_extractor = GeminiVisionExtractor(api_key=API_KEY, model_name="gemini-2.0-flash-exp")
validator = DataValidator()

# Run Approach 1
approach1_results = []

print("Running Approach 1: Gemini Vision Direct Extraction")
print("=" * 50)

for pdf_path in tqdm(sample_pdfs, desc="Approach 1"):
    result = vision_extractor.extract(pdf_path)
    
    # Add validation
    if result["success"]:
        is_valid, validated_doc = validator.validate_document(result["data"], pdf_path.name)
        result["validation"] = validator.get_validation_summary()
        result["validation_passed"] = is_valid
    else:
        result["validation_passed"] = False
    
    approach1_results.append(result)
    
    # Print progress
    status = "OK" if result["success"] else f"FAIL: {result['error'][:50]}"
    print(f"  {pdf_path.name}: {status} ({result['processing_time']:.2f}s)")

### Approach 2: PDF Text Parsing + Gemini Pro

In [None]:
# Initialize text extractor
text_extractor = PDFTextExtractor(api_key=API_KEY, model_name="gemini-1.5-pro")

# Run Approach 2
approach2_results = []

print("Running Approach 2: PDF Text Parsing + Gemini Pro")
print("=" * 50)

for pdf_path in tqdm(sample_pdfs, desc="Approach 2"):
    result = text_extractor.extract(pdf_path, text_method="pdfplumber")
    
    # Add validation
    if result["success"]:
        is_valid, validated_doc = validator.validate_document(result["data"], pdf_path.name)
        result["validation"] = validator.get_validation_summary()
        result["validation_passed"] = is_valid
    else:
        result["validation_passed"] = False
    
    approach2_results.append(result)
    
    status = "OK" if result["success"] else f"FAIL: {result['error'][:50]}"
    print(f"  {pdf_path.name}: {status} ({result['processing_time']:.2f}s)")

### Approach 3: Hybrid Smart Extraction

In [None]:
def assess_text_quality(text: str, metadata: dict) -> bool:
    """Assess if extracted text is good enough for text-based extraction."""
    if not text or len(text) < 500:
        return False
    
    has_tables = metadata.get("tables_found", 0) > 0
    delinquency_keywords = ["current", "delinquent", "30-59", "60-89", "90+", "foreclosure"]
    keyword_count = sum(1 for kw in delinquency_keywords if kw.lower() in text.lower())
    has_balance_data = "$" in text or "balance" in text.lower()
    
    quality_score = 0
    if has_tables:
        quality_score += 2
    if keyword_count >= 3:
        quality_score += 2
    if has_balance_data:
        quality_score += 1
    if len(text) > 2000:
        quality_score += 1
    
    return quality_score >= 4

# Initialize extractors for hybrid approach
hybrid_text_extractor = PDFTextExtractor(api_key=API_KEY, model_name="gemini-1.5-flash")

# Run Approach 3
approach3_results = []

print("Running Approach 3: Hybrid Smart Extraction")
print("=" * 50)

for pdf_path in tqdm(sample_pdfs, desc="Approach 3"):
    start_time = time.time()
    
    # Assess text quality first
    try:
        text, text_metadata = hybrid_text_extractor.extract_text(pdf_path, method="pdfplumber")
        use_text_method = assess_text_quality(text, text_metadata)
    except Exception:
        use_text_method = False
    
    # Choose method based on quality
    if use_text_method:
        result = hybrid_text_extractor.extract(pdf_path, text_method="pdfplumber")
        result["hybrid_method"] = "text"
    else:
        result = vision_extractor.extract(pdf_path)
        result["hybrid_method"] = "vision"
    
    # Add validation
    if result["success"]:
        is_valid, validated_doc = validator.validate_document(result["data"], pdf_path.name)
        result["validation"] = validator.get_validation_summary()
        result["validation_passed"] = is_valid
    else:
        result["validation_passed"] = False
    
    approach3_results.append(result)
    
    method = result.get("hybrid_method", "unknown")
    status = "OK" if result["success"] else f"FAIL"
    print(f"  {pdf_path.name}: {status} [{method}] ({result['processing_time']:.2f}s)")

## 2. Quantitative Comparison

### 2.1 Accuracy Metrics

In [None]:
def calculate_metrics(results, approach_name):
    """Calculate metrics for a set of results."""
    total = len(results)
    successful = sum(1 for r in results if r["success"])
    validated = sum(1 for r in results if r.get("validation_passed", False))
    
    # Calculate field-level accuracy for successful extractions
    field_scores = {"series_name": 0, "report_date": 0, "beginning_balance": 0, 
                   "ending_balance": 0, "delinquency": 0}
    
    for r in results:
        if r["success"] and r.get("data"):
            data = r["data"]
            if data.get("series_name"):
                field_scores["series_name"] += 1
            if data.get("report_date"):
                field_scores["report_date"] += 1
            if data.get("beginning_balance") is not None:
                field_scores["beginning_balance"] += 1
            if data.get("ending_balance") is not None:
                field_scores["ending_balance"] += 1
            if data.get("delinquency") and len(data.get("delinquency", [])) > 0:
                field_scores["delinquency"] += 1
    
    # Calculate processing times
    times = [r["processing_time"] for r in results]
    
    return {
        "approach": approach_name,
        "total_documents": total,
        "successful_extractions": successful,
        "success_rate": successful / total * 100 if total > 0 else 0,
        "validation_passed": validated,
        "validation_rate": validated / total * 100 if total > 0 else 0,
        "field_accuracy": {k: v / successful * 100 if successful > 0 else 0 for k, v in field_scores.items()},
        "avg_time": np.mean(times),
        "min_time": np.min(times),
        "max_time": np.max(times),
        "total_time": np.sum(times),
    }

# Calculate metrics for each approach
metrics_1 = calculate_metrics(approach1_results, "Approach 1: Gemini Vision")
metrics_2 = calculate_metrics(approach2_results, "Approach 2: Text + Gemini Pro")
metrics_3 = calculate_metrics(approach3_results, "Approach 3: Hybrid")

all_metrics = [metrics_1, metrics_2, metrics_3]

In [None]:
# Create comparison DataFrame
comparison_data = []
for m in all_metrics:
    comparison_data.append({
        "Approach": m["approach"],
        "Success Rate (%)": f"{m['success_rate']:.1f}",
        "Validation Rate (%)": f"{m['validation_rate']:.1f}",
        "Avg Time (s)": f"{m['avg_time']:.2f}",
        "Total Time (s)": f"{m['total_time']:.2f}",
    })

comparison_df = pd.DataFrame(comparison_data)
print("\n" + "=" * 80)
print("ACCURACY AND SPEED COMPARISON")
print("=" * 80)
display(comparison_df)

In [None]:
# Field-level accuracy comparison
field_data = []
for m in all_metrics:
    for field, accuracy in m["field_accuracy"].items():
        field_data.append({
            "Approach": m["approach"].split(":")[0],
            "Field": field,
            "Accuracy (%)": accuracy
        })

field_df = pd.DataFrame(field_data)
print("\n" + "=" * 80)
print("FIELD-LEVEL ACCURACY")
print("=" * 80)

# Pivot for better display
field_pivot = field_df.pivot(index="Field", columns="Approach", values="Accuracy (%)")
display(field_pivot.round(1))

### 2.2 Error Analysis

In [None]:
def analyze_errors(results, approach_name):
    """Analyze errors for a set of results."""
    errors = []
    warnings = []
    
    for r in results:
        if not r["success"]:
            errors.append({"file": r["filename"], "error": r["error"]})
        elif r.get("validation"):
            if r["validation"].get("errors"):
                for e in r["validation"]["errors"]:
                    errors.append({"file": r["filename"], "error": e})
            if r["validation"].get("warnings"):
                for w in r["validation"]["warnings"]:
                    warnings.append({"file": r["filename"], "warning": w})
    
    return {"approach": approach_name, "errors": errors, "warnings": warnings}

errors_1 = analyze_errors(approach1_results, "Approach 1")
errors_2 = analyze_errors(approach2_results, "Approach 2")
errors_3 = analyze_errors(approach3_results, "Approach 3")

print("\n" + "=" * 80)
print("ERROR ANALYSIS")
print("=" * 80)

for e in [errors_1, errors_2, errors_3]:
    print(f"\n{e['approach']}:")
    print(f"  Errors: {len(e['errors'])}")
    print(f"  Warnings: {len(e['warnings'])}")
    if e['errors']:
        print("  Error details:")
        for err in e['errors'][:5]:  # Show first 5
            print(f"    - {err['file']}: {err['error'][:80]}")

### 2.3 Cost Estimation

In [None]:
# Estimate costs (rough approximations)
NUM_TOTAL_DOCS = 50
AVG_PAGES_PER_DOC = 10  # Estimate - adjust based on actual PDFs
AVG_TEXT_LENGTH = 15000  # Estimate - adjust based on actual extraction

cost_1 = vision_extractor.estimate_cost(AVG_PAGES_PER_DOC, NUM_TOTAL_DOCS)
cost_2 = text_extractor.estimate_cost(AVG_TEXT_LENGTH, NUM_TOTAL_DOCS)

# Hybrid cost estimation (assume 60% text, 40% vision based on quality assessment)
hybrid_text_ratio = 0.6
cost_3 = {
    "estimated_total_cost": (
        cost_2["estimated_total_cost"] * hybrid_text_ratio + 
        cost_1["estimated_total_cost"] * (1 - hybrid_text_ratio)
    ),
    "cost_per_document": (
        cost_2["cost_per_document"] * hybrid_text_ratio + 
        cost_1["cost_per_document"] * (1 - hybrid_text_ratio)
    ),
}

print("\n" + "=" * 80)
print("COST ESTIMATION (for 50 documents)")
print("=" * 80)

cost_data = [
    {"Approach": "Approach 1: Gemini Vision", 
     "Est. Total Cost": f"${cost_1['estimated_total_cost']:.4f}",
     "Cost per Doc": f"${cost_1['cost_per_document']:.6f}"},
    {"Approach": "Approach 2: Text + Pro", 
     "Est. Total Cost": f"${cost_2['estimated_total_cost']:.4f}",
     "Cost per Doc": f"${cost_2['cost_per_document']:.6f}"},
    {"Approach": "Approach 3: Hybrid", 
     "Est. Total Cost": f"${cost_3['estimated_total_cost']:.4f}",
     "Cost per Doc": f"${cost_3['cost_per_document']:.6f}"},
]

cost_df = pd.DataFrame(cost_data)
display(cost_df)

print("\nNote: These are rough estimates. Actual costs may vary based on document complexity.")

## 3. Visualizations

In [None]:
# Create visualization data
approaches = ["Approach 1\n(Vision)", "Approach 2\n(Text+Pro)", "Approach 3\n(Hybrid)"]
success_rates = [metrics_1["success_rate"], metrics_2["success_rate"], metrics_3["success_rate"]]
validation_rates = [metrics_1["validation_rate"], metrics_2["validation_rate"], metrics_3["validation_rate"]]
avg_times = [metrics_1["avg_time"], metrics_2["avg_time"], metrics_3["avg_time"]]
costs = [cost_1["cost_per_document"] * 1000, cost_2["cost_per_document"] * 1000, cost_3["cost_per_document"] * 1000]  # in millicents

In [None]:
# Figure 1: Success and Validation Rates
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Success Rate
colors = sns.color_palette("husl", 3)
bars1 = axes[0].bar(approaches, success_rates, color=colors)
axes[0].set_ylabel("Success Rate (%)")
axes[0].set_title("Extraction Success Rate")
axes[0].set_ylim(0, 105)
for bar, rate in zip(bars1, success_rates):
    axes[0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1, 
                f'{rate:.1f}%', ha='center', va='bottom', fontsize=10)

# Processing Time
bars2 = axes[1].bar(approaches, avg_times, color=colors)
axes[1].set_ylabel("Average Time (seconds)")
axes[1].set_title("Average Processing Time per Document")
for bar, t in zip(bars2, avg_times):
    axes[1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1, 
                f'{t:.2f}s', ha='center', va='bottom', fontsize=10)

# Cost
bars3 = axes[2].bar(approaches, costs, color=colors)
axes[2].set_ylabel("Cost per Document (millicents)")
axes[2].set_title("Estimated Cost per Document")
for bar, c in zip(bars3, costs):
    axes[2].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001, 
                f'{c:.3f}', ha='center', va='bottom', fontsize=10)

plt.tight_layout()
plt.savefig(OUTPUT_DIR / "comparison_charts.png", dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Figure 2: Field-level Accuracy Heatmap
fig, ax = plt.subplots(figsize=(10, 6))

field_matrix = field_pivot.values
sns.heatmap(field_pivot, annot=True, fmt='.1f', cmap='RdYlGn', 
            vmin=0, vmax=100, ax=ax, cbar_kws={'label': 'Accuracy (%)'})
ax.set_title("Field-Level Extraction Accuracy by Approach")
ax.set_xlabel("")
ax.set_ylabel("Field")

plt.tight_layout()
plt.savefig(OUTPUT_DIR / "field_accuracy_heatmap.png", dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Figure 3: Processing Time Distribution
fig, ax = plt.subplots(figsize=(10, 6))

time_data = {
    "Approach 1 (Vision)": [r["processing_time"] for r in approach1_results],
    "Approach 2 (Text+Pro)": [r["processing_time"] for r in approach2_results],
    "Approach 3 (Hybrid)": [r["processing_time"] for r in approach3_results],
}

time_df = pd.DataFrame(time_data)
time_df.boxplot(ax=ax)
ax.set_ylabel("Processing Time (seconds)")
ax.set_title("Processing Time Distribution by Approach")

plt.tight_layout()
plt.savefig(OUTPUT_DIR / "time_distribution.png", dpi=150, bbox_inches='tight')
plt.show()

## 4. Qualitative Analysis

In [None]:
# Analyze text extraction quality for Approach 2
print("=" * 80)
print("TEXT EXTRACTION QUALITY ANALYSIS (Approach 2)")
print("=" * 80)

for r in approach2_results:
    if r.get("text_extraction_metadata"):
        meta = r["text_extraction_metadata"]
        print(f"\n{r['filename']}:")
        print(f"  Pages: {meta.get('pages', 'N/A')}")
        print(f"  Tables found: {meta.get('tables_found', 'N/A')}")
        print(f"  Method: {meta.get('method', 'N/A')}")
        print(f"  Extraction success: {r['success']}")

In [None]:
# Analyze hybrid method distribution
if approach3_results:
    print("\n" + "=" * 80)
    print("HYBRID APPROACH METHOD DISTRIBUTION")
    print("=" * 80)
    
    text_count = sum(1 for r in approach3_results if r.get("hybrid_method") == "text")
    vision_count = sum(1 for r in approach3_results if r.get("hybrid_method") == "vision")
    
    print(f"\nText-based extraction: {text_count} ({text_count/len(approach3_results)*100:.1f}%)")
    print(f"Vision-based extraction: {vision_count} ({vision_count/len(approach3_results)*100:.1f}%)")
    
    # Create pie chart
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.pie([text_count, vision_count], labels=['Text Method', 'Vision Method'], 
           autopct='%1.1f%%', colors=sns.color_palette("husl", 2))
    ax.set_title("Approach 3: Method Distribution")
    plt.savefig(OUTPUT_DIR / "hybrid_distribution.png", dpi=150, bbox_inches='tight')
    plt.show()

## 5. Sample Data Inspection

In [None]:
# Show sample extracted data from each approach
print("=" * 80)
print("SAMPLE EXTRACTED DATA COMPARISON")
print("=" * 80)

# Get first successful extraction from each approach
for approach_name, results in [("Approach 1", approach1_results), 
                                ("Approach 2", approach2_results),
                                ("Approach 3", approach3_results)]:
    print(f"\n{approach_name}:")
    print("-" * 40)
    
    successful = [r for r in results if r["success"]]
    if successful:
        sample = successful[0]
        data = sample["data"]
        print(f"  Filename: {data.get('filename')}")
        print(f"  Series: {data.get('series_name')}")
        print(f"  Date: {data.get('report_date')}")
        print(f"  Beginning Balance: ${data.get('beginning_balance', 0):,.2f}")
        print(f"  Ending Balance: ${data.get('ending_balance', 0):,.2f}")
        print(f"  Delinquency Categories: {len(data.get('delinquency', []))}")
        if data.get('delinquency'):
            for d in data['delinquency'][:3]:
                print(f"    - {d.get('category')}: {d.get('count')} loans, ${d.get('balance', 0):,.2f}")
    else:
        print("  No successful extractions")

## 6. Final Recommendation

In [None]:
print("=" * 80)
print("FINAL RECOMMENDATION")
print("=" * 80)

# Score each approach (simple weighted scoring)
scores = {}
for m in all_metrics:
    # Weights: accuracy 40%, speed 30%, cost 30%
    accuracy_score = m["success_rate"] * 0.4
    
    # Normalize speed (lower is better, max 100)
    max_time = max(metrics_1["avg_time"], metrics_2["avg_time"], metrics_3["avg_time"])
    speed_score = (1 - m["avg_time"] / max_time) * 100 * 0.3 if max_time > 0 else 30
    
    # Normalize cost (lower is better, max 100)
    approach_num = int(m["approach"].split()[1][0])
    cost_val = [cost_1, cost_2, cost_3][approach_num - 1]["cost_per_document"]
    max_cost = max(cost_1["cost_per_document"], cost_2["cost_per_document"], cost_3["cost_per_document"])
    cost_score = (1 - cost_val / max_cost) * 100 * 0.3 if max_cost > 0 else 30
    
    total_score = accuracy_score + speed_score + cost_score
    scores[m["approach"]] = {
        "total": total_score,
        "accuracy_component": accuracy_score,
        "speed_component": speed_score,
        "cost_component": cost_score,
    }

# Find best approach
best_approach = max(scores.items(), key=lambda x: x[1]["total"])

print("\nWeighted Scoring (Accuracy: 40%, Speed: 30%, Cost: 30%):")
print("-" * 60)
for approach, score in sorted(scores.items(), key=lambda x: -x[1]["total"]):
    print(f"\n{approach}:")
    print(f"  Total Score: {score['total']:.1f}")
    print(f"  - Accuracy: {score['accuracy_component']:.1f}")
    print(f"  - Speed: {score['speed_component']:.1f}")
    print(f"  - Cost: {score['cost_component']:.1f}")

print(f"\n" + "=" * 60)
print(f"RECOMMENDED: {best_approach[0]}")
print(f"Total Score: {best_approach[1]['total']:.1f}")
print("=" * 60)

In [None]:
# Generate qualitative recommendation summary
print("\nQUALITATIVE ANALYSIS SUMMARY:")
print("-" * 60)
print("""
Approach 1 (Gemini Vision):
  - Pros: Best for complex table structures, handles scanned PDFs
  - Cons: Higher cost, slower processing
  - Best for: Documents with complex layouts or poor text extraction

Approach 2 (Text + Gemini Pro):
  - Pros: Lower cost, faster when text extraction works well
  - Cons: May struggle with complex tables, depends on PDF quality
  - Best for: Clean, text-based PDFs with simple layouts

Approach 3 (Hybrid):
  - Pros: Balances cost and accuracy, adapts to document quality
  - Cons: Additional complexity, two-step assessment
  - Best for: Mixed document quality in batch processing
""")

print("\nFINAL RECOMMENDATION FOR FULL EXTRACTION:")
print("-" * 60)
print(f"Based on this analysis, use {best_approach[0]} for processing all 50 documents.")
print("\nReasoning:")
print("- Review the success rates and field accuracy above")
print("- Consider the cost-accuracy tradeoff for your specific use case")
print("- If accuracy is paramount, prioritize Approach 1 (Vision)")
print("- If cost efficiency matters more, consider Approach 2 or 3")

## 7. Save Results

In [None]:
# Save all results to JSON
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

all_results = {
    "timestamp": timestamp,
    "sample_size": SAMPLE_SIZE,
    "approach_1_results": approach1_results,
    "approach_2_results": approach2_results,
    "approach_3_results": approach3_results,
    "metrics": {
        "approach_1": metrics_1,
        "approach_2": metrics_2,
        "approach_3": metrics_3,
    },
    "recommendation": best_approach[0],
}

with open(OUTPUT_DIR / f"comparison_results_{timestamp}.json", "w") as f:
    json.dump(all_results, f, indent=2, default=str)

print(f"Results saved to: {OUTPUT_DIR / f'comparison_results_{timestamp}.json'}")
print(f"Charts saved to: {OUTPUT_DIR}")