In [14]:
import os
os.environ.setdefault("PYTHONUTF8", "1")

%load_ext autoreload
%autoreload 2
%matplotlib inline

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [15]:
import json
import sys
from pathlib import Path

from dotenv import load_dotenv

# repo root → makes `processing_layer` importable
sys.path.insert(0, str(Path().resolve().parent.parent))
load_dotenv(Path().resolve().parent.parent / ".env")

True

In [16]:
import fiftyone as fo
from fiftyone.utils.huggingface import load_from_hub

N_SAMPLES = 1  # adjust as needed

print(f"Loading {N_SAMPLES} samples from HuggingFace…")
dataset = load_from_hub(
    "Voxel51/high-quality-invoice-images-for-ocr",
    max_samples=N_SAMPLES,
)

annotated = [s for s in dataset if s["json_annotation"] is not None]
print(f"{len(annotated)} annotated / {len(dataset)} loaded")

Loading 1 samples from HuggingFace…
Downloading config file fiftyone.yml from Voxel51/high-quality-invoice-images-for-ocr
Loading dataset


ValueError: Dataset name 'Voxel51/high-quality-invoice-images-for-ocr' is not available

In [None]:
from processing_layer.extraction.invoice import InvoiceExtractor
from processing_layer.llm.gemini import GeminiProvider

provider = GeminiProvider()
extractor = InvoiceExtractor(provider)
print(f"Using model: {provider.model}")

In [None]:
from IPython.display import display
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

results = []

for i, sample in enumerate(annotated):
    print(f"\n{'='*70}")
    print(f"Sample {i+1}: {Path(sample.filepath).name}")
    print(f"{'='*70}")

    # --- plot invoice image ---
    img = Image.open(sample.filepath)
    fig, axes = plt.subplots(1, 1, figsize=(6, 8))
    axes.imshow(img)
    axes.axis("off")
    axes.set_title(Path(sample.filepath).name, fontsize=9)
    plt.tight_layout()
    plt.show()

    # --- call Gemini ---
    image_bytes = Path(sample.filepath).read_bytes()
    extracted = extractor.extract_from_image(image_bytes, "image/jpeg")
    gt = json.loads(sample["json_annotation"])

    results.append({"sample": Path(sample.filepath).name, "extracted": extracted, "ground_truth": gt})

    # --- side-by-side comparison ---
    print("\n--- Extracted (Gemini) ---")
    print(extracted.model_dump_json(indent=2))

    print("\n--- Ground truth ---")
    print(json.dumps(gt, indent=2))

In [None]:
# ── Field-level summary ──────────────────────────────────────────────────────
import pandas as pd

summary_rows = []
for r in results:
    ext = r["extracted"]
    gt_inv = r["ground_truth"]["invoice"]
    gt_sub = r["ground_truth"].get("subtotal", {})
    summary_rows.append({
        "file": r["sample"],
        "invoice_number": ext.invoice_number == gt_inv.get("invoice_number"),
        "invoice_date":   ext.invoice_date   == gt_inv.get("invoice_date"),
        "vendor_name":    ext.vendor_name    == gt_inv.get("seller_name"),
        "client_name":    ext.client_name    == gt_inv.get("client_name"),
        "vendor_address": ext.vendor_address == gt_inv.get("seller_address"),
        "client_address": ext.client_address == gt_inv.get("client_address"),
        "total_extracted": ext.total,
        "total_gt":        float(gt_sub.get("total", "nan") or "nan"),
        "total_match":     ext.total is not None and abs(ext.total - float(gt_sub.get("total", 0) or 0)) < 0.01,
        "n_items":         len(ext.line_items) == len(r["ground_truth"].get("items", [])),
    })

df_summary = pd.DataFrame(summary_rows)
display(df_summary)

In [None]:
# ── Per-line-item price comparison ───────────────────────────────────────────
item_rows = []
for r in results:
    ext_items = r["extracted"].line_items
    gt_items  = r["ground_truth"].get("items", [])
    for idx, (ext_item, gt_item) in enumerate(zip(ext_items, gt_items)):
        gt_total  = float(gt_item.get("total_price", "nan") or "nan")
        ext_total = ext_item.total_price
        diff      = ext_total - gt_total if (ext_total is not None and not pd.isna(gt_total)) else None
        pct_diff  = (diff / gt_total * 100) if (diff is not None and gt_total != 0) else None
        item_rows.append({
            "file":        r["sample"],
            "item_idx":    idx + 1,
            "description": ext_item.description[:55] + "…" if len(ext_item.description) > 55 else ext_item.description,
            "qty":         ext_item.quantity,
            "total_extracted": ext_total,
            "total_gt":        gt_total,
            "diff":            round(diff, 2) if diff is not None else None,
            "pct_diff":        round(pct_diff, 1) if pct_diff is not None else None,
            "exact_match":     diff is not None and abs(diff) < 0.01,
        })

df_items = pd.DataFrame(item_rows)

# highlight mismatches
def color_match(val):
    if val is True:  return "background-color: #c8f7c5"
    if val is False: return "background-color: #f7c5c5"
    return ""

display(
    df_items.style
        .applymap(color_match, subset=["exact_match"])
        .format({"pct_diff": "{:+.1f}%", "diff": "{:+.2f}"}, na_rep="—")
)