# Contract-Driven Data Pipelines

Extract structured data from heterogeneous documents using declarative YAML contracts.

One schema. Many providers. Full async.

## Design

A **data contract** (YAML) declares everything the pipeline needs:
- Which LLM to use
- How to classify tables (keyword matching)
- What output schema to produce (columns, types, aliases, formats)
- How to enrich records (constants, metadata from filenames/titles)

The pipeline code is contract-agnostic: swap the contract, not the code.
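
To make the contract shape concrete, here is a minimal sketch of what a loaded contract might look like. It is written as the Python dict that `yaml.safe_load()` would return, so it runs without PyYAML. The key names (`provider`, `model`, `report_date_pattern`, `categories`, `outputs`, `schema`, `columns`, `source`) follow the pipeline code in this notebook; every value, and the helper `split_columns`, is hypothetical.

```python
# Hypothetical contract, written as the dict that yaml.safe_load() would return.
# Key names follow the pipeline code in this notebook; all values are made up.
contract = {
    "provider": "example_provider",
    "model": "openai/gpt-4o",
    "report_date_pattern": r"^(\d{4}-\d{2}-\d{2})",
    "categories": {
        "harvest": {"keywords": ["harvest", "yield"]},
    },
    "outputs": {
        "harvest": {
            "category": "harvest",
            "schema": {
                "description": "Harvest progress by region",
                "columns": [
                    {"name": "region", "type": "string", "aliases": ["Region", "Area"]},
                    {"name": "value", "type": "float", "aliases": ["Harvested"]},
                    {"name": "report_date", "source": "report_date"},
                    {"name": "unit", "source": "constant", "value": "kt"},
                ],
            },
        },
    },
}


def split_columns(columns):
    """Separate columns the LLM must extract from columns filled in by enrichment."""
    llm_cols = [c["name"] for c in columns if "source" not in c]
    enriched = [c["name"] for c in columns if "source" in c]
    return llm_cols, enriched


llm_cols, enriched = split_columns(contract["outputs"]["harvest"]["schema"]["columns"])
print("LLM columns:     ", llm_cols)
print("Enriched columns:", enriched)
```

Columns that carry a `source` key are never sent to the model; they are filled in afterwards from the filename, the table title, or a constant.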

**Three use cases** demonstrate the same pipeline across different domains:

| # | Use Case | Format | Documents | Challenge |
|---|----------|--------|-----------|----------|
| 1 | Russian agricultural reports | DOCX | 2 weekly reports | Multi-category extraction, dynamic pivot years |
| 2 | Australian shipping stems | PDF | 6 providers | One canonical model, 6 different layouts, full concurrency |
| 3 | ACEA car registrations | PDF | 1 press release | Pivoted table → flat records, vision-enabled |

In [None]:
# Setup
import asyncio
import re
import time
import yaml
from pathlib import Path

import nest_asyncio
import pandas as pd

from pdf_ocr import (
    CanonicalSchema, ColumnDef,
    classify_tables, classify_docx_tables,
    compress_spatial_text, compress_spatial_text_structured,
    compress_docx_tables, extract_pivot_values,
    interpret_tables_async,
    to_pandas, to_records, to_parquet,
)
# Direct async imports — avoid sync wrappers that spawn extra threads
from pdf_ocr.interpret import _interpret_pages_batched_async, _split_pages

nest_asyncio.apply()

print("Setup complete.")

In [None]:
# Display helpers
import base64, html as html_mod, shutil, subprocess, tempfile
from IPython.display import display, HTML


def render_document_page(path, page=0, dpi=150):
    """Render a document page as a base64 PNG. Supports PDF and DOCX."""
    import fitz

    path = str(path)
    if path.lower().endswith((".docx", ".doc")):
        soffice = shutil.which("soffice") or "/Applications/LibreOffice.app/Contents/MacOS/soffice"
        with tempfile.TemporaryDirectory() as tmpdir:
            subprocess.run(
                [soffice, "--headless", "--convert-to", "pdf", "--outdir", tmpdir, path],
                capture_output=True, check=True,
            )
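            pdf_path = next(Path(tmpdir).glob("*.pdf"))
            # Load the PDF into memory so the document stays valid
            # after the temporary directory is deleted.
            doc = fitz.open(stream=pdf_path.read_bytes(), filetype="pdf")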
    else:
        doc = fitz.open(path)

    page_count = len(doc)
    pix = doc[page].get_pixmap(dpi=dpi)
    b64 = base64.b64encode(pix.tobytes("png")).decode()
    doc.close()
    return b64, page_count


def side_by_side_display(*, image_b64=None, compressed_text=None, dataframe=None, max_height=500):
    """Display up to 3 panels side-by-side: document image | compressed text | DataFrame."""
    panels = []
    style = f"overflow-y:auto; max-height:{max_height}px; border:1px solid #ddd; padding:6px; flex:1"
    if image_b64:
        panels.append(f'<div style="{style}"><img src="data:image/png;base64,{image_b64}" style="width:100%"></div>')
    if compressed_text:
        escaped = html_mod.escape(compressed_text)
        panels.append(f'<div style="{style}"><pre style="font-size:11px; margin:0; white-space:pre">{escaped}</pre></div>')
    if dataframe is not None:
        df_html = dataframe.to_html(index=False, max_rows=30)
        panels.append(f'<div style="{style}; font-size:11px">{df_html}</div>')
    display(HTML(f'<div style="display:flex; gap:8px; align-items:flex-start">{" ".join(panels)}</div>'))


print("Display helpers ready.")

---
## Pipeline Architecture

```
contract (YAML) → prepare() → fetch() → transform_async() → save()
```

Four levels of concurrency via `asyncio.gather()` — all using native BAML async (no sync wrappers, no extra threads):

| Level | Scope | Pattern |
|---|---|---|
| **Document-level** | Process N documents simultaneously | `asyncio.gather(*[transform_async(ctx, dc) for dc in doc_ctxs])` |
| **Category-level** | Multiple outputs per document (e.g. harvest + planting) | `asyncio.gather(*[_transform_output_async(...) for out in outputs])` |
| **Table/Page-level** | DOCX: N tables concurrent; PDF: N pages concurrent | `interpret_tables_async()` / `_interpret_pages_batched_async()` |
| **Batch-level** | Step 2 mapping in chunks of 20 rows | Built into `_interpret_pages_batched_async()` |
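
The two outer levels of the table can be sketched with the standard library alone. This is an illustration of nested `asyncio.gather()` calls, not the pipeline's own code: `interpret_category`, `transform_document`, and `run_all` are hypothetical names, and `asyncio.sleep()` stands in for a model call.

```python
import asyncio
import time


async def interpret_category(doc, category, delay=0.05):
    """Stand-in for one LLM call; sleeps instead of calling a model."""
    await asyncio.sleep(delay)
    return category, f"{doc}:{category}"


async def transform_document(doc, categories):
    """Category-level concurrency: all categories of one document at once."""
    pairs = await asyncio.gather(*[interpret_category(doc, c) for c in categories])
    return dict(pairs)


async def run_all(docs, categories):
    """Document-level concurrency: all documents at once."""
    return await asyncio.gather(*[transform_document(d, categories) for d in docs])


t0 = time.perf_counter()
results = asyncio.run(run_all(["june.docx", "july.docx"], ["harvest", "planting"]))
elapsed = time.perf_counter() - t0

# The four simulated calls overlap, so wall-clock time is close to one call, not four.
print(results)
print(f"elapsed: {elapsed:.2f}s")
```

`asyncio.gather()` returns results in the order the awaitables were passed, which is why later cells can safely `zip()` inputs with results. Inside a notebook with a running event loop, `await run_all(...)` replaces `asyncio.run(...)`.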

In [None]:
# ── Generic Pipeline Functions ────────────────────────────────────────────────
# These functions are contract-agnostic. Swap the YAML contract and re-run.


def prepare(contract_path):
    """Load YAML contract → parse categories, schemas, enrichment rules."""
    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    ctx = {"contract": contract, "model": contract.get("model", "openai/gpt-4o")}

    # Categories for table classification
    ctx["categories"] = {
        name: cat["keywords"]
        for name, cat in contract.get("categories", {}).items()
    }

    # Build CanonicalSchema + enrichment rules per output
    ctx["schemas"] = {}
    ctx["enrichment"] = {}
    ctx["output_specs"] = {}
    for out_name, spec in contract.get("outputs", {}).items():
        llm_cols = []
        enrich = {}
        for col in spec["schema"]["columns"]:
            if "source" in col:
                enrich[col["name"]] = col
            else:
                llm_cols.append(ColumnDef(
                    name=col["name"],
                    type=col.get("type", "string"),
                    description=col.get("description", ""),
                    aliases=col.get("aliases", []),
                    format=col.get("format"),
                ))
        ctx["schemas"][out_name] = CanonicalSchema(
            description=spec["schema"].get("description", ""),
            columns=llm_cols,
        )
        ctx["enrichment"][out_name] = enrich
        ctx["output_specs"][out_name] = spec

    print(f"  Contract: {contract['provider']}")
    print(f"  Model: {ctx['model']}")
    print(f"  Outputs: {list(ctx['schemas'].keys())}")
    return ctx


def fetch(ctx, doc_path):
    """Auto-detect format, classify tables, compress to pipe-table markdown."""
    doc_path = str(doc_path)
    doc_ctx = {"doc_path": doc_path, "compressed_by_category": {}}

    # Extract report_date from filename if pattern is defined
    pattern = ctx["contract"].get("report_date_pattern")
    if pattern:
        m = re.search(pattern, Path(doc_path).name)
        doc_ctx["report_date"] = m.group(1) if m else ""
    else:
        doc_ctx["report_date"] = ""

    is_docx = doc_path.lower().endswith((".docx", ".doc"))

    if is_docx:
        classes = classify_docx_tables(doc_path, ctx["categories"])
        for out_name, spec in ctx["output_specs"].items():
            cat = spec["category"]
            indices = [c["index"] for c in classes if c["category"] == cat]
            if indices:
                doc_ctx["compressed_by_category"][out_name] = compress_docx_tables(
                    doc_path, table_indices=indices
                )
    else:
        # PDF: compress → structured tables → classify
        compressed_text = compress_spatial_text(doc_path, refine_headers=True)
        structured = compress_spatial_text_structured(doc_path)
        tuples = [t.to_compressed() for t in structured]
        classes = classify_tables(tuples, ctx["categories"]) if tuples else []

        for out_name, spec in ctx["output_specs"].items():
            cat = spec["category"]
            matched = [c for c in classes if c["category"] == cat]
            if matched:
                # For PDF, store the full compressed text
                doc_ctx["compressed_by_category"][out_name] = compressed_text
                doc_ctx["pdf_path"] = doc_path

    cats_found = list(doc_ctx["compressed_by_category"].keys())
    print(f"  {Path(doc_path).name[:50]}: categories={cats_found}")
    return doc_ctx


def _apply_enrichment(df, enrichment, doc_ctx, meta=None):
    """Apply enrichment columns (title, report_date, constant) to a DataFrame."""
    for col_name, spec in enrichment.items():
        src = spec["source"]
        if src == "title" and meta:
            df[col_name] = meta.get("title") or "Unknown"
        elif src == "report_date":
            val = doc_ctx.get("report_date", "")
            if "suffix" in spec:
                val += spec["suffix"]
            df[col_name] = val
        elif src == "constant":
            df[col_name] = spec["value"]
    return df


def _apply_formatting(df, col_specs):
    """Apply column-level format and filter transformations."""
    for col_spec in col_specs:
        cn = col_spec["name"]
        # Format
        fmt = col_spec.get("format")
        if fmt and cn in df.columns:
            if fmt == "lowercase":
                df[cn] = df[cn].astype(str).str.lower()
            elif fmt == "uppercase":
                df[cn] = df[cn].astype(str).str.upper()
            elif fmt == "titlecase":
                df[cn] = df[cn].astype(str).str.title()
        # Filter
        filt = col_spec.get("filter")
        if filt and filt != "all" and cn in df.columns:
            if filt == "latest":
                df = df[df[cn] == df[cn].max()]
            elif filt == "earliest":
                df = df[df[cn] == df[cn].min()]
    return df


async def _transform_output_async(ctx, doc_ctx, out_name, data):
    """Async: interpret one output category. Returns (out_name, DataFrame)."""
    schema = ctx["schemas"][out_name]
    spec = ctx["output_specs"][out_name]
    doc_path = doc_ctx["doc_path"]
    is_docx = doc_path.lower().endswith((".docx", ".doc"))

    # Resolve dynamic pivot aliases
    for col in schema.columns:
        col_spec = next(
            (c for c in spec["schema"]["columns"] if c["name"] == col.name), None
        )
        if col_spec and col_spec.get("dynamic_aliases") == "pivot":
            if is_docx and isinstance(data, list):
                pivot_vals = extract_pivot_values(data[0][0])
                col.aliases = pivot_vals[-2:]

    if is_docx and isinstance(data, list):
        # DOCX: multiple independent tables → native BAML async
        texts = [md for md, _ in data]
        mapped_list = await interpret_tables_async(texts, schema, model=ctx["model"])
        frames = []
        for (md, meta), mapped in zip(data, mapped_list):
            df = to_pandas(mapped, schema)
            df = _apply_enrichment(df, ctx["enrichment"][out_name], doc_ctx, meta)
            frames.append(df)
        df_out = pd.concat(frames, ignore_index=True)
    else:
        # PDF: split pages, call BAML async directly (no sync wrapper, no extra threads)
        pages = _split_pages(data, "\f")
        result = await _interpret_pages_batched_async(
            pages, schema, model=ctx["model"],
        )
        df_out = to_pandas(result, schema)
        df_out = _apply_enrichment(df_out, ctx["enrichment"][out_name], doc_ctx)

    df_out = _apply_formatting(df_out, spec["schema"]["columns"])

    # Reorder columns to match contract
    col_order = [c["name"] for c in spec["schema"]["columns"] if c["name"] in df_out.columns]
    return out_name, df_out[col_order]


async def transform_async(ctx, doc_ctx):
    """Async: interpret ALL output categories concurrently."""
    # Launch all categories in parallel (e.g. harvest + planting at the same time)
    tasks = [
        _transform_output_async(ctx, doc_ctx, out_name, data)
        for out_name, data in doc_ctx["compressed_by_category"].items()
    ]
    results = await asyncio.gather(*tasks)
    doc_ctx["dataframes"] = dict(results)
    return doc_ctx


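def save(results, output_dir="outputs"):
    """Merge DataFrames across documents, write each output to Parquet.

    Returns (merged, paths): merged maps each output name to the list of
    per-document DataFrames; paths maps it to the written Parquet file.
    """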
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    merged = {}

    for doc_ctx in results:
        for out_name, df in doc_ctx.get("dataframes", {}).items():
            if out_name not in merged:
                merged[out_name] = []
            merged[out_name].append(df)

    paths = {}
    for out_name, frames in merged.items():
        df = pd.concat(frames, ignore_index=True)
        path = output_dir / f"{out_name}.parquet"
        df.to_parquet(path, index=False)
        paths[out_name] = path
        print(f"  {out_name}: {path} ({len(df)} rows)")

    return merged, paths


async def run_pipeline_async(contract_path, doc_paths, output_dir="outputs"):
    """Orchestrate full pipeline with document-level concurrency."""
    t0 = time.perf_counter()

    print("PREPARE")
    ctx = prepare(contract_path)

    print("\nFETCH")
    doc_ctxs = [fetch(ctx, p) for p in doc_paths]

    print("\nTRANSFORM (async)")
    results = await asyncio.gather(
        *[transform_async(ctx, dc) for dc in doc_ctxs]
    )

    print("\nSAVE")
    merged, paths = save(results, output_dir)

    elapsed = time.perf_counter() - t0
    print(f"\nDone in {elapsed:.1f}s")
    return results, merged, paths, elapsed


print("Pipeline functions defined: prepare -> fetch -> transform_async -> save")

---
## Use Case 1: Russian Agricultural DOCX Reports

Multi-category extraction from Russian Ministry of Agriculture weekly grain reports.
Each DOCX contains harvest, planting, and export tables. The contract classifies
tables by keywords and extracts crop names from table titles, report dates from
filenames, and year values from dynamic pivot headers.
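
As a rough illustration of two of these steps, the sketch below classifies a table title by keyword and pulls the report date out of a filename. The keyword lists, the regex, and both helper functions are assumptions made for this example; the real values live in `contracts/ru_ag_ministry.yaml`, and `classify_docx_tables` may match differently.

```python
import re

# Hypothetical keyword lists; the real ones live in the YAML contract.
CATEGORIES = {
    "harvest": ["harvest", "threshed"],
    "planting": ["planting", "sowing"],
    "export": ["export", "shipment"],
}

# Hypothetical pattern; the pipeline reads it from `report_date_pattern`.
REPORT_DATE_PATTERN = r"^(\d{4}-\d{2}-\d{2})"


def classify_title(title, categories):
    """Return the first category with a keyword in the title, else None."""
    lowered = title.lower()
    for name, keywords in categories.items():
        if any(kw in lowered for kw in keywords):
            return name
    return None


def report_date_from_filename(filename, pattern):
    """Return the first capture group of `pattern`, or "" when it does not match."""
    m = re.search(pattern, filename)
    return m.group(1) if m else ""


filename = "2025-06-24_11-58-45.Russian weekly grain EOW June 20-21 2025-1.docx"
category = classify_title("Spring sowing progress, wheat", CATEGORIES)
report_date = report_date_from_filename(filename, REPORT_DATE_PATTERN)
print(category)     # planting
print(report_date)  # 2025-06-24
```

The pattern needs exactly one capture group because `fetch()` reads `m.group(1)`.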

In [None]:
# Load the Russian agricultural contract
ru_ctx = prepare("contracts/ru_ag_ministry.yaml")

print("\nSchema summary:")
for name, schema in ru_ctx["schemas"].items():
    cols = [c.name for c in schema.columns]
    enrich = list(ru_ctx["enrichment"][name].keys())
    print(f"  {name}: LLM cols={cols}, enriched={enrich}")

In [None]:
# Run pipeline on the June DOCX
june_path = "inputs/docx/input/2025-06-24_11-58-45.Russian weekly grain EOW June 20-21 2025-1.docx"

results_june, merged_june, _, elapsed_june = await run_pipeline_async(
    "contracts/ru_ag_ministry.yaml", [june_path], output_dir="outputs/ru_june"
)

In [None]:
# Side-by-side: DOCX page | pipe-table | DataFrame (first available category)
try:
    img_b64, _ = render_document_page(june_path, page=0, dpi=120)
except Exception:
    img_b64 = None  # LibreOffice not available

# Get the first available category from this document
june_doc_ctx = results_june[0]
first_cat = next(iter(june_doc_ctx["compressed_by_category"]), None)

if first_cat:
    cat_data = june_doc_ctx["compressed_by_category"][first_cat]
    sample_md = cat_data[0][0]
    df_display = june_doc_ctx["dataframes"][first_cat]
    print(f"Showing category: {first_cat} ({len(df_display)} rows)")
    side_by_side_display(image_b64=img_b64, compressed_text=sample_md, dataframe=df_display)
else:
    print("No tables classified for this document.")

In [None]:
# Display output DataFrames
for name, frames in merged_june.items():
    df = frames[0] if isinstance(frames, list) else frames
    print(f"=== {name.upper()} ({len(df)} rows) ===")
    display(df.head(10))
    print()

In [None]:
# Run pipeline on the July DOCX — same contract, different document
july_path = "inputs/docx/input/2025-07-17_10-16-25.Russian weekly grain EOW July 11-12 2025-1.docx"

results_july, merged_july, _, elapsed_july = await run_pipeline_async(
    "contracts/ru_ag_ministry.yaml", [july_path], output_dir="outputs/ru_july"
)

print(f"\nJune: {elapsed_june:.1f}s, July: {elapsed_july:.1f}s")
for name in merged_july:
    df = merged_july[name][0] if isinstance(merged_july[name], list) else merged_july[name]
    print(f"  {name}: {len(df)} rows")

---
## Use Case 2: Australian Shipping Stems — XXL

**One canonical model, six providers, full concurrency.**

Six shipping stem PDFs from six different providers, each expressing the same semantic
data (vessel name, port, commodity, tonnage, ETA) with completely different layouts,
column names, and formatting. A single canonical schema with rich aliases normalizes
them all into one unified DataFrame.
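
The role aliases play can be shown with a deterministic lookup. In the pipeline itself the mapping is done by the LLM, guided by the aliases in the schema, so the sketch below is a swapped-in simplification and not the pipeline's method. The alias lists, helper names, and sample rows are all made up.

```python
# Hypothetical aliases; the real lists come from the contract's column definitions.
ALIASES = {
    "vessel_name": ["vessel", "ship name", "vessel name"],
    "port": ["port", "load port", "terminal"],
    "tonnage": ["tonnes", "quantity (mt)", "volume"],
}


def build_lookup(aliases):
    """Map every lower-cased alias (and the canonical name itself) to its canonical column."""
    lookup = {}
    for canonical, names in aliases.items():
        lookup[canonical] = canonical
        for name in names:
            lookup[name.lower()] = canonical
    return lookup


def normalize_row(row, lookup):
    """Rename known headers to canonical names; drop headers with no alias."""
    out = {}
    for header, value in row.items():
        canonical = lookup.get(header.strip().lower())
        if canonical is not None:
            out[canonical] = value
    return out


lookup = build_lookup(ALIASES)
# Made-up rows from two providers that label the same fields differently.
provider_a = {"Ship Name": "MV Example", "Load Port": "Newcastle", "Tonnes": 30000}
provider_b = {"Vessel": "MV Sample", "Terminal": "Kwinana", "Quantity (MT)": 55000, "Agent": "n/a"}

rows = [normalize_row(r, lookup) for r in (provider_a, provider_b)]
print(rows)
```

Both rows come out with the same three keys, which is the property the unified DataFrame depends on.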

In [None]:
# Load the shipping stem contract
ship_ctx = prepare("contracts/au_shipping_stem.yaml")

print("\nSchema columns with aliases:")
for col in ship_ctx["schemas"]["vessels"].columns:
    print(f"  {col.name:15s} {col.type:6s} aliases={col.aliases}")

In [None]:
# List all 6 PDFs
shipping_pdfs = {
    "Newcastle":  "inputs/2857439.pdf",
    "Bunge":      "inputs/Bunge_loadingstatement_2025-09-25.pdf",
    "CBH":        "inputs/CBH Shipping Stem 26092025.pdf",
    "GrainCorp":  "inputs/shipping-stem-2025-11-13.pdf",
    "Riordan":    "inputs/shipping_stem-accc-30092025-1.pdf",
    "Queensland": "inputs/document (1).pdf",
}

import fitz
print(f"{'Provider':<14s} {'Filename':<50s} {'Pages':>5s}")
print("-" * 72)
for provider, path in shipping_pdfs.items():
    doc = fitz.open(path)
    pages = len(doc)
    doc.close()
    print(f"{provider:<14s} {Path(path).name:<50s} {pages:>5d}")

In [None]:
# Run pipeline on ALL 6 PDFs concurrently
results_ship, merged_ship, paths_ship, elapsed_ship = await run_pipeline_async(
    "contracts/au_shipping_stem.yaml",
    list(shipping_pdfs.values()),
    output_dir="outputs/shipping",
)

In [None]:
# Per-document summary
print(f"{'Provider':<14s} {'Records':>8s}")
print("-" * 24)
total = 0
for (provider, _), doc_ctx in zip(shipping_pdfs.items(), results_ship):
    n = sum(len(df) for df in doc_ctx.get("dataframes", {}).values())
    total += n
    print(f"{provider:<14s} {n:>8d}")
print("-" * 24)
print(f"{'TOTAL':<14s} {total:>8d}")
print(f"\nWall-clock time: {elapsed_ship:.1f}s")

In [None]:
# Side-by-side gallery: first page of the first three providers
for provider, path in list(shipping_pdfs.items())[:3]:
    img_b64, _ = render_document_page(path, page=0, dpi=100)
    compressed = compress_spatial_text(path, refine_headers=False)
    # Truncate compressed text for display
    lines = compressed.splitlines()[:25]
    truncated = "\n".join(lines) + "\n..."
    print(f"\n--- {provider} ---")
    side_by_side_display(image_b64=img_b64, compressed_text=truncated, max_height=350)

In [None]:
# Unified DataFrame — all providers in one schema
df_vessel_frames = merged_ship["vessels"]
df_all = pd.concat(df_vessel_frames, ignore_index=True) if isinstance(df_vessel_frames, list) else df_vessel_frames
print(f"Unified DataFrame: {df_all.shape}")
display(df_all.head(20))

In [None]:
# Distribution by provider (inferred from source document)
# Add source_provider from doc_ctx ordering
provider_frames = []
for (provider, _), doc_ctx in zip(shipping_pdfs.items(), results_ship):
    for df in doc_ctx.get("dataframes", {}).values():
        dfp = df.copy()
        dfp["source_provider"] = provider
        provider_frames.append(dfp)

if provider_frames:
    df_with_provider = pd.concat(provider_frames, ignore_index=True)
    print("Records by provider:")
    print(df_with_provider.groupby("source_provider").size().to_string())

---
## Use Case 3: ACEA Car Registrations

**Normalization: pivoted table to flat records.**

The ACEA press release PDF contains a dense pivoted table: 28 countries x 7 power types
x 3 metrics. The pipeline unpivots this into flat records — a 23-column wide table
becomes a 4-column long DataFrame. Vision-enabled interpretation handles the dense
hierarchical headers.
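
The wide-to-long reshaping can be sketched in plain Python. The pipeline delegates this step to the LLM, so the code below only illustrates the target shape; the helper names and all numbers are made up, and only the column names `country`, `car_motorization`, and `new_car_registration` come from this notebook.

```python
# Made-up wide rows: one row per country, one column per power type.
wide_rows = [
    {"country": "Austria", "Battery electric": 100, "Petrol": 250},
    {"country": "Belgium", "Battery electric": 300, "Petrol": 400},
]


def unpivot(rows, id_col, var_name, value_name):
    """Turn each non-id column of each wide row into its own long record."""
    long_rows = []
    for row in rows:
        for column, value in row.items():
            if column == id_col:
                continue
            long_rows.append({id_col: row[id_col], var_name: column, value_name: value})
    return long_rows


def pivot(long_rows, id_col, var_name, value_name):
    """Inverse of unpivot: rebuild one wide row per id value."""
    wide = {}
    for rec in long_rows:
        wide.setdefault(rec[id_col], {id_col: rec[id_col]})[rec[var_name]] = rec[value_name]
    return list(wide.values())


long_rows = unpivot(wide_rows, "country", "car_motorization", "new_car_registration")
for rec in long_rows:
    print(rec)

# Round trip: pivoting the long records back reproduces the wide rows.
assert pivot(long_rows, "country", "car_motorization", "new_car_registration") == wide_rows
```

With pandas, `DataFrame.melt` performs the same unpivot on a DataFrame.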

In [None]:
# Load the ACEA contract
acea_ctx = prepare("contracts/acea_car_registrations.yaml")

print("\nSchema:")
for col in acea_ctx["schemas"]["registrations_by_market"].columns:
    print(f"  {col.name:25s} {col.type:6s} {col.description}")

In [None]:
# Run pipeline on ACEA press release PDF (vision-enabled)
acea_pdf = "inputs/Press_release_car_registrations_December_2025.pdf"

results_acea, merged_acea, _, elapsed_acea = await run_pipeline_async(
    "contracts/acea_car_registrations.yaml",
    [acea_pdf],
    output_dir="outputs/acea",
)

In [None]:
# Side-by-side: PDF | compressed pivot table | unpivoted DataFrame
img_b64, _ = render_document_page(acea_pdf, page=0, dpi=120)
acea_compressed = compress_spatial_text(acea_pdf, refine_headers=False)
acea_df = list(merged_acea.values())[0]
if isinstance(acea_df, list):
    acea_df = acea_df[0]

# Show first page of compressed text
first_page = acea_compressed.split("\f")[0]
side_by_side_display(image_b64=img_b64, compressed_text=first_page, dataframe=acea_df)

In [None]:
# Display unpivoted DataFrame
print(f"Shape: {acea_df.shape} (from wide pivoted table to long flat records)")
display(acea_df.head(20))

In [None]:
# Round-trip verification: pivot back to wide format
if "country" in acea_df.columns and "car_motorization" in acea_df.columns:
    try:
        pivot = acea_df.pivot_table(
            index="country",
            columns="car_motorization",
            values="new_car_registration",
            aggfunc="sum",
        )
        print(f"Pivoted back: {pivot.shape} (countries x power types)")
        display(pivot.head(10))
    except Exception as e:
        print(f"Pivot failed: {e}")
else:
    print("Columns not available for pivot")

---
## Summary

| Use Case | Format | Documents | Elapsed | Key Feature |
|----------|--------|-----------|---------|-------------|
| Russian Agriculture | DOCX | 2 reports | see above | Multi-category, pivot years, enrichment |
| Australian Shipping | PDF | 6 providers | see above | One schema, 6 layouts, full concurrency |
| ACEA Registrations | PDF | 1 press release | see above | Pivoted → flat normalization |

In [None]:
# Summary statistics
total_docs = 2 + 6 + 1  # June + July + 6 shipping + 1 ACEA
total_records = 0
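for m in [merged_june, merged_july, merged_ship, merged_acea]:
    for frames in m.values():
        # save() returns a list of per-document DataFrames for each output,
        # so count every frame rather than only the first.
        total_records += sum(len(df) for df in frames)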

total_time = elapsed_june + elapsed_july + elapsed_ship + elapsed_acea

print(f"Total documents processed: {total_docs}")
print(f"Total records extracted:   {total_records:,}")
print(f"Total wall-clock time:     {total_time:.1f}s")
print(f"Throughput:                {total_records / total_time:.0f} records/s")