# Step 1: Data Ingestion Benchmark

**Research Question:** How does ingesting 2.9 GB of real Medicaid claims data into a traditional multi-database stack (PostgreSQL + DuckDB + Neo4j) compare with ingesting it into AXYM's unified platform?

## What We're Measuring

| Metric | Description |
|--------|-------------|
| Wall clock time | End-to-end ingestion time per system |
| CPU utilization | User + system CPU seconds consumed |
| Peak memory | Maximum RSS during ingestion |
| Disk footprint | Storage consumed by each database |
| Operational complexity | Scripts, languages, containers, ETL pipelines required |
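
As a rough illustration of how the first four metrics could be collected, here is a minimal standard-library sketch. It is not the code in `lib.metrics`; the names `measure` and `dir_size_mb` are hypothetical, it assumes a Unix-like host (the `resource` module is unavailable on Windows), and it only observes the calling Python process, not a database server running in a container.

```python
import os
import resource
import sys
import time
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def measure():
    """Collect wall time, CPU seconds, and peak RSS around the enclosed block.

    Hypothetical helper: covers the current process only.
    """
    stats = {}
    wall_start = time.perf_counter()
    cpu_start = os.times()
    try:
        yield stats
    finally:
        cpu_end = os.times()
        stats["wall_time_seconds"] = time.perf_counter() - wall_start
        stats["cpu_user_seconds"] = cpu_end.user - cpu_start.user
        stats["cpu_system_seconds"] = cpu_end.system - cpu_start.system
        # ru_maxrss is the process-lifetime peak: KiB on Linux, bytes on macOS.
        peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
        stats["peak_memory_mb"] = peak / divisor


def dir_size_mb(path):
    """Sum the sizes of all regular files under `path`, in MiB."""
    total = sum(p.stat().st_size for p in Path(path).rglob("*") if p.is_file())
    return total / (1024 * 1024)
```

Wrapping an ingestion call in `with measure() as stats:` fills `stats` once the block exits; `dir_size_mb` can then be pointed at a database's data directory to estimate its disk footprint.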

## Dataset

**CMS Medicaid Provider Utilization & Spending** — aggregated provider-level claims from HHS/CMS.
- ~89M rows of provider billing, procedure codes, beneficiary counts, and payment amounts
- Source: Parquet format (2.9 GB)
- Columns: `Billing_Provider_NPI`, `Servicing_Provider_NPI`, `HCPCS_Code`, `Claim_From_Month`, `Total_Unique_Beneficiaries`, `Total_Claims`, `Total_Paid`

In [None]:
import sys
sys.path.insert(0, '..')

import platform
import json
from pathlib import Path

import pandas as pd
import pyarrow.parquet as pq
import psutil
import matplotlib.pyplot as plt

from config.settings import PARQUET_PATH, RESULTS_DIR
from lib.metrics import BenchmarkResult
from lib.report import (
    comparison_table,
    bar_chart_wall_time,
    bar_chart_disk_footprint,
    stacked_bar_traditional_vs_axym,
    complexity_summary,
)

%matplotlib inline

## 1. Environment

In [None]:
print(f"Platform:  {platform.platform()}")
print(f"Python:    {platform.python_version()}")
print(f"CPU:       {platform.processor() or platform.machine()} ({psutil.cpu_count(logical=False)} cores, {psutil.cpu_count()} threads)")
print(f"RAM:       {psutil.virtual_memory().total / (1024**3):.1f} GB")
print(f"Disk free: {psutil.disk_usage('/').free / (1024**3):.1f} GB")

## 2. Data Exploration

In [None]:
if PARQUET_PATH.exists():
    pf = pq.ParquetFile(PARQUET_PATH)
    print(f"File:    {PARQUET_PATH.name}")
    print(f"Size:    {PARQUET_PATH.stat().st_size / (1024**3):.2f} GB")
    print(f"Rows:    {pf.metadata.num_rows:,}")
    print(f"Columns: {pf.metadata.num_columns}")
    print(f"Row groups: {pf.metadata.num_row_groups}")
    print()
    print("Schema:")
    print(pf.schema_arrow)
else:
    print(f"Parquet file not found at {PARQUET_PATH}")
    print("Run `make setup` to download the data.")

In [None]:
if PARQUET_PATH.exists():
    # Read a small sample for exploration
    sample = pf.read_row_group(0).to_pandas().head(10)
    display(sample)
    print(f"\nDtypes:")
    print(sample.dtypes)

In [None]:
if PARQUET_PATH.exists():
    # Cardinality of key columns
    sample_large = pf.read_row_group(0).to_pandas()
    print("Cardinality (first row group):")
    for col in ["Billing_Provider_NPI", "Servicing_Provider_NPI", "HCPCS_Code"]:
        print(f"  {col}: {sample_large[col].nunique():,} unique values")
    print(f"  Claim_From_Month range: {sample_large['Claim_From_Month'].min()} to {sample_large['Claim_From_Month'].max()}")

## 3. Traditional Stack — Individual Ingestions

Results are loaded from JSON files produced by `make benchmark`. Each ingestion script measures wall time, CPU, memory, disk, and row count.
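
For readers without the repository at hand, the shape of such a result record can be sketched as below. This is an assumption, not the actual `lib.metrics.BenchmarkResult`: the field names are taken from the attributes used in this notebook, while the class name `BenchmarkResultSketch`, the `system` field, the defaults, and the `save` method are hypothetical.

```python
import json
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Optional


@dataclass
class BenchmarkResultSketch:
    """Hypothetical stand-in for the result record stored as JSON."""

    system: str  # hypothetical label, e.g. "postgres"
    row_count: int
    wall_time_seconds: float
    cpu_user_seconds: float = 0.0
    cpu_system_seconds: float = 0.0
    peak_memory_mb: float = 0.0
    disk_mb: float = 0.0
    error: Optional[str] = None
    metadata: dict = field(default_factory=dict)

    @property
    def rows_per_second(self) -> float:
        # Throughput is derived, so it is not stored in the JSON file.
        if self.wall_time_seconds <= 0:
            return 0.0
        return self.row_count / self.wall_time_seconds

    def save(self, path) -> None:
        Path(path).write_text(json.dumps(asdict(self), indent=2))

    @classmethod
    def load(cls, path) -> "BenchmarkResultSketch":
        return cls(**json.loads(Path(path).read_text()))
```

Keeping throughput as a derived property means it cannot drift from the stored row count and wall time.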

In [None]:
def load_result(name: str) -> BenchmarkResult | None:
    """Load a benchmark result from JSON, or return None."""
    path = RESULTS_DIR / f"ingest_{name}.json"
    if path.exists():
        return BenchmarkResult.load(path)
    print(f"Result not found: {path}")
    print("Run `make benchmark` to generate results.")
    return None

pg_result = load_result("postgres")
duck_result = load_result("duckdb")
neo4j_result = load_result("neo4j")
axym_result = load_result("axym")

### 3a. PostgreSQL

Ingestion method: Read Parquet in batches via PyArrow → convert to CSV in memory → `COPY` protocol via psycopg3. Indexes on NPI, HCPCS, and date columns are built during ingestion (included in timing).
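
The batch → CSV → `COPY` flow described above can be sketched roughly as follows. This is not the repository's ingestion script: the helper names are hypothetical, the table name is assumed to be trusted input, and index creation is omitted. It relies on `ParquetFile.iter_batches` from PyArrow and on the `cursor.copy()` context manager from psycopg 3; both libraries are used only through the cursor argument or a lazy import, so the CSV helper itself needs nothing beyond the standard library.

```python
import csv
import io


def rows_to_csv_bytes(rows):
    """Serialize row tuples to CSV bytes; None becomes an empty (NULL) field."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerows(rows)
    return buf.getvalue().encode("utf-8")


def parquet_row_batches(path, batch_size=100_000):
    """Yield lists of row tuples, one list per Parquet record batch."""
    import pyarrow.parquet as pq  # lazy import: only needed for real files

    for batch in pq.ParquetFile(path).iter_batches(batch_size=batch_size):
        columns = [column.to_pylist() for column in batch.columns]
        yield list(zip(*columns))


def copy_batches(cursor, table, batches):
    """Stream batches into `table` through one COPY ... FROM STDIN.

    `cursor` is expected to be a psycopg 3 cursor; `table` must be a
    trusted identifier because it is interpolated into the SQL text.
    """
    total = 0
    with cursor.copy(f"COPY {table} FROM STDIN (FORMAT csv)") as copy:
        for rows in batches:
            copy.write(rows_to_csv_bytes(rows))
            total += len(rows)
    return total
```

With a live connection this would be called as `copy_batches(cur, "claims", parquet_row_batches(PARQUET_PATH))`, where `claims` is a hypothetical table name. Holding only one batch in memory at a time bounds the peak RSS regardless of file size.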

In [None]:
if pg_result:
    print(f"PostgreSQL Ingestion")
    print(f"  Rows:      {pg_result.row_count:,}")
    print(f"  Wall time: {pg_result.wall_time_seconds:.1f}s")
    print(f"  CPU:       {pg_result.cpu_user_seconds:.1f}s user, {pg_result.cpu_system_seconds:.1f}s sys")
    print(f"  Memory:    {pg_result.peak_memory_mb:,.0f} MB peak")
    print(f"  Disk:      {pg_result.disk_mb:,.0f} MB")
    print(f"  Throughput:{pg_result.rows_per_second:,.0f} rows/sec")
    if pg_result.error:
        print(f"  ERROR: {pg_result.error}")

### 3b. DuckDB

Ingestion method: Single SQL statement — `CREATE TABLE AS SELECT * FROM read_parquet(...)`. DuckDB reads Parquet natively with zero ETL.
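
A minimal sketch of that single-statement load, using the DuckDB Python client, might look like this. The function names and the table name are hypothetical, and the table name is assumed to be a trusted identifier; the benchmark's own script may differ.

```python
def ctas_from_parquet_sql(table: str, parquet_path: str) -> str:
    """Build the CREATE TABLE AS statement for a Parquet file."""
    escaped = parquet_path.replace("'", "''")  # escape quotes in the path literal
    return f"CREATE TABLE {table} AS SELECT * FROM read_parquet('{escaped}')"


def ingest_duckdb(db_path: str, table: str, parquet_path: str) -> int:
    """Load the Parquet file into a DuckDB database file; return the row count."""
    import duckdb  # lazy import so the SQL builder works without DuckDB installed

    con = duckdb.connect(db_path)
    try:
        con.execute(ctas_from_parquet_sql(table, parquet_path))
        return con.execute(f"SELECT count(*) FROM {table}").fetchone()[0]
    finally:
        con.close()
```

Because DuckDB runs in-process, there is no server or container to start, which is why it does not appear in the container count later in this notebook.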

In [None]:
if duck_result:
    print(f"DuckDB Ingestion")
    print(f"  Rows:      {duck_result.row_count:,}")
    print(f"  Wall time: {duck_result.wall_time_seconds:.1f}s")
    print(f"  CPU:       {duck_result.cpu_user_seconds:.1f}s user, {duck_result.cpu_system_seconds:.1f}s sys")
    print(f"  Memory:    {duck_result.peak_memory_mb:,.0f} MB peak")
    print(f"  Disk:      {duck_result.disk_mb:,.0f} MB")
    print(f"  Throughput:{duck_result.rows_per_second:,.0f} rows/sec")
    if duck_result.error:
        print(f"  ERROR: {duck_result.error}")

### 3c. Neo4j

Ingestion method: Multi-phase ETL pipeline:
1. Extract distinct Provider nodes (union of billing + servicing NPIs)
2. Extract distinct Procedure nodes (unique HCPCS codes)
3. Build relationship CSVs (BILLED_FOR, REFERRED_TO edges)
4. Copy CSVs to Neo4j import volume
5. `LOAD CSV` with periodic commits

The ETL overhead — transforming tabular data into graph structure — is a key finding about impedance mismatch.
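
Phases 1 to 3 can be illustrated on a few synthetic rows. The sketch below is not the benchmark's ETL script, and the edge semantics are an assumption: it treats `BILLED_FOR` as (billing provider → procedure code) and `REFERRED_TO` as (billing provider → servicing provider) whenever the two NPIs differ. The NPIs shown are made up.

```python
import csv
import io


def extract_graph(rows):
    """Derive distinct nodes and edges from tabular claim rows."""
    providers, procedures = set(), set()
    billed_for, referred_to = set(), set()
    for row in rows:
        billing = row["Billing_Provider_NPI"]
        servicing = row["Servicing_Provider_NPI"]
        code = row["HCPCS_Code"]
        providers.update((billing, servicing))  # union of both NPI columns
        procedures.add(code)
        billed_for.add((billing, code))
        if billing != servicing:  # assumed meaning of REFERRED_TO
            referred_to.add((billing, servicing))
    return {
        "providers": sorted(providers),
        "procedures": sorted(procedures),
        "billed_for": sorted(billed_for),
        "referred_to": sorted(referred_to),
    }


def to_csv(header, records):
    """Render records (scalars or tuples) as CSV text with a header row."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(header)
    for record in records:
        writer.writerow(record if isinstance(record, tuple) else (record,))
    return buf.getvalue()


# Synthetic example rows (not real providers).
rows = [
    {"Billing_Provider_NPI": "1000000001", "Servicing_Provider_NPI": "1000000002", "HCPCS_Code": "99213"},
    {"Billing_Provider_NPI": "1000000001", "Servicing_Provider_NPI": "1000000001", "HCPCS_Code": "99213"},
    {"Billing_Provider_NPI": "1000000003", "Servicing_Provider_NPI": "1000000002", "HCPCS_Code": "T1019"},
]
graph = extract_graph(rows)
```

Three input rows collapse into three Provider nodes, two Procedure nodes, and two edges of each type. At ~89M rows the same deduplication has to run over the full dataset before Neo4j sees any data, which is where the extra wall time comes from.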

In [None]:
if neo4j_result:
    print(f"Neo4j Ingestion")
    print(f"  Nodes+Rels: {neo4j_result.row_count:,}")
    print(f"  Wall time:  {neo4j_result.wall_time_seconds:.1f}s")
    print(f"  CPU:        {neo4j_result.cpu_user_seconds:.1f}s user, {neo4j_result.cpu_system_seconds:.1f}s sys")
    print(f"  Memory:     {neo4j_result.peak_memory_mb:,.0f} MB peak")
    print(f"  Disk:       {neo4j_result.disk_mb:,.0f} MB")
    if neo4j_result.error:
        print(f"  ERROR: {neo4j_result.error}")
    
    if neo4j_result.metadata.get("timings"):
        print(f"\n  ETL Phase Breakdown:")
        for phase, secs in neo4j_result.metadata["timings"].items():
            print(f"    {phase}: {secs:.1f}s")
    
    if neo4j_result.metadata.get("counts"):
        print(f"\n  Graph Counts:")
        for k, v in neo4j_result.metadata["counts"].items():
            print(f"    {k}: {v:,}")

## 4. Traditional Stack — Aggregate

To run the "traditional" Medicaid fraud detection pipeline, you need **all three** databases. The combined cost is the sum of individual ingestion times, disk footprints, and operational complexity.

In [None]:
traditional = [r for r in [pg_result, duck_result, neo4j_result] if r and not r.error]

if traditional:
    total_wall = sum(r.wall_time_seconds for r in traditional)
    total_disk = sum(r.disk_mb for r in traditional)
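    print("Traditional Stack Total:")
    print(f"  Combined wall time: {total_wall:.1f}s")
    print(f"  Combined disk:      {total_disk:,.0f} MB")
    print(f"  Systems:            {len(traditional)} of 3")
    if len(traditional) < 3:
        # Totals only cover systems that finished without error.
        print("  WARNING: partial totals; at least one ingestion is missing or failed")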
    print(f"  Scripts required:   3 (each with different driver/protocol)")
    print(f"  Query languages:    2 (SQL + Cypher)")
    print(f"  Docker containers:  2 (PostgreSQL + Neo4j)")
    print(f"  ETL pipelines:      1 (tabular → graph for Neo4j)")
else:
    print("No results available. Run `make benchmark` first.")

## 5. AXYM

AXYM aims to replace this entire stack with a single unified platform. One ingestion command, one query language, zero ETL.

In [None]:
if axym_result and not axym_result.error:
    print("AXYM Ingestion")
    print(f"  Rows:      {axym_result.row_count:,}")
    print(f"  Wall time: {axym_result.wall_time_seconds:.1f}s")
    print(f"  Disk:      {axym_result.disk_mb:,.0f} MB")
    print(f"  Throughput:{axym_result.rows_per_second:,.0f} rows/sec")
elif axym_result:
    # A result file exists but the run failed: report the error rather than
    # claiming the CLI is unavailable.
    print(f"AXYM ingestion failed: {axym_result.error}")
else:
    print("AXYM CLI is not yet available.")
    print("Results will be added once the CLI ships.")
    print("")
    print("Expected interface:")
    print("  axym ingest medicaid-provider-spending.parquet")
    print("  → Automatically creates tables, indexes, graph structure, and embeddings")
    print("  → Single command, zero ETL, zero configuration")

## 6. Side-by-Side Comparison

In [None]:
all_results = [r for r in [pg_result, duck_result, neo4j_result] if r and not r.error]

if all_results:
    display(comparison_table(all_results))

In [None]:
if all_results:
    bar_chart_wall_time(all_results)
    plt.show()

In [None]:
if all_results:
    bar_chart_disk_footprint(all_results)
    plt.show()

In [None]:
if traditional:
    stacked_bar_traditional_vs_axym(traditional, axym_result)
    plt.show()

In [None]:
if traditional:
    display(complexity_summary(traditional))

## 7. Key Findings

### What the numbers show

1. **DuckDB is fastest for pure tabular ingestion**: native Parquet support means no conversion step. A single SQL statement loads the entire dataset.

2. **PostgreSQL adds conversion overhead**: Parquet → CSV-in-memory → COPY protocol. This is the realistic scenario: stock PostgreSQL has no built-in Parquet reader, so without a third-party extension a conversion step is required.

3. **Neo4j has the highest cost**: not because the database is slow, but because tabular data must be **transformed** into a graph structure before loading. The ETL phases (extract nodes, build edges, write CSVs) add significant wall time.

4. **The traditional stack requires all three**: a fraud detection pipeline needs relational queries (PostgreSQL), analytical queries (DuckDB), and graph traversal (Neo4j), so the combined cost is the sum of the three ingestions.

### Implications for Steps 2–6

- **Step 2 (Querying)**: Two query languages (SQL in two dialects, plus Cypher) and three client APIs to maintain
- **Step 3 (Graph Traversal)**: Only Neo4j handles this — the ETL tax is paid at ingestion
- **Step 4 (Embeddings)**: Requires yet another system (pgvector or standalone)
- **Step 5 (RAG)**: Orchestrating across all systems for a single query
- **Step 6 (Full Pipeline)**: The operational complexity multiplies at each step

## 8. Reproducibility

### Data
- Source: CMS Medicaid Provider Utilization & Spending
- Format: Parquet (2.9 GB)
- SHA256: `a998e5ae11a391f1eb0d8464b3866a3ee7fe18aa13e56d411c50e72e3a0e35c7`
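
To confirm that a downloaded file matches the checksum above, a short standard-library check along these lines should suffice. The function name `sha256_of_file` is hypothetical; the file is read in chunks so the 2.9 GB download never has to fit in memory.

```python
import hashlib


def sha256_of_file(path, chunk_size=1 << 20):
    """Return the hex SHA-256 digest of a file, reading it in 1 MiB chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Comparing `sha256_of_file(PARQUET_PATH)` with the value listed above before running `make benchmark` rules out a truncated or altered download.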

### Reproduce
```bash
git clone <repo-url> axym-research && cd axym-research
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
make setup      # ~10 min (2.9 GB download)
make benchmark  # ~30-60 min depending on hardware
make notebook   # View results
```

### Methodology
- Cold start: tables truncated / databases deleted between runs
- Docker memory limits: 4 GB per container
- Ingestion timing includes index creation (fair comparison)
- Neo4j ETL phases timed individually
- Results persisted as JSON in `results/` for inspection without re-running