In [1]:
from pathlib import Path

# --- INPUTS (point to your BIG files) ---
# NOTE(review): both input paths are absolute and machine-specific; anyone else
# running this notebook has to edit them before the first cell will work.
# STATE is written as a literal into the `state` column of the fact table.
STATE = "GA"
RATES_PARQ_BIG = Path(r"C:\Users\ChristopherCato\OneDrive - clarity-dx.com\code\bph\mrf-etl\data\raw\rates_uhc.parquet")
PROV_PARQ_BIG  = Path(r"C:\Users\ChristopherCato\OneDrive - clarity-dx.com\code\bph\mrf-etl\prod_etl\data\input\prov_uhc.parquet")

# --- TEST SAMPLING ---
# NOTE(review): TEST_MODE is not read by any of the cells visible in this
# notebook; the samples are always built.
TEST_MODE = True
# Requested sample sizes (rows) passed to copy_sample for each input.
SAMPLE_ROWS_RATES = 100_000   # ↓ adjust down if still heavy (e.g., 20_000)
SAMPLE_ROWS_PROV  = 50_000

# Optional: restrict to a month or simple filter (speeds up a lot if column exists)
# e.g., WHERE negotiated_rate IS NOT NULL
# These strings are pasted after the WHERE keyword of the sampling query; an
# empty string means "no filter".
RATES_WHERE = "negotiated_rate IS NOT NULL"
PROV_WHERE  = ""  # often okay empty

# Optional: restrict column set even more for the test
# Columns that are absent from a file are skipped when sampling and added back
# as all-null columns when the sample is read.
RATES_COLS = [
    "last_updated_on","reporting_entity_name","version",
    "billing_class","billing_code_type","billing_code",
    "service_codes","negotiated_type","negotiation_arrangement",
    "negotiated_rate","expiration_date","description","name",
    "provider_reference_id","provider_group_id","provider_group_id_raw"
]
PROV_COLS = [
    "last_updated_on","reporting_entity_name","version",
    "provider_group_id","provider_reference_id",
    "npi","tin_type","tin_value"
]

# --- OUTPUTS (test sandbox) ---
# Relative to the kernel's working directory; created on import of this cell.
TEST_ROOT = Path("core/test_sandbox")
TEST_ROOT.mkdir(parents=True, exist_ok=True)
RATES_SAMPLE = TEST_ROOT / "rates_sample.parquet"
PROV_SAMPLE  = TEST_ROOT / "prov_sample.parquet"

# --- PIPELINE SPEED TOGGLES ---
FAST_SKIP_POS = True      # True = skip parsing service_codes/pos_set (huge win)
FAST_SKIP_XREFS = False   # True = skip building NPI/TIN xrefs (if providers file is huge)


cell 2

In [3]:
import duckdb, os
from pathlib import Path

# Fail fast, naming the missing path, before any DuckDB work is attempted.
if not RATES_PARQ_BIG.exists():
    raise FileNotFoundError(RATES_PARQ_BIG)
if not PROV_PARQ_BIG.exists():
    raise FileNotFoundError(PROV_PARQ_BIG)

# Module-level DuckDB connection (default arguments). copy_sample() in this
# cell reads this global rather than taking a connection parameter, and the
# connection is closed at the end of the cell.
con = duckdb.connect()

def list_columns_for_query(con, path: Path) -> list[str]:
    """Return the column names of the Parquet file at ``path``.

    Args:
        con: Connection object exposing ``execute(sql).fetchdf()``.
        path: Location of the Parquet file to inspect.

    Returns:
        The values of the ``column_name`` field of the ``DESCRIBE`` result,
        as a list.
    """
    # Double single quotes so the path is a valid SQL string literal.
    escaped = str(path).replace("'", "''")
    describe_sql = f"DESCRIBE SELECT * FROM read_parquet('{escaped}')"
    described = con.execute(describe_sql).fetchdf()
    return described["column_name"].tolist()

def build_select_list(avail_cols: list[str], desired_cols: list[str]) -> str:
    """Build the SQL select list for the desired columns that actually exist.

    Args:
        avail_cols: Column names present in the source relation.
        desired_cols: Column names the caller would like, in output order.

    Returns:
        A comma-separated list of double-quoted identifiers, in the order of
        ``desired_cols``, or ``"*"`` when none of the desired columns exist.
    """
    available = set(avail_cols)  # O(1) membership instead of scanning the list
    picked = [name for name in desired_cols if name in available]
    if not picked:
        return "*"  # fallback if none of the desired columns exist
    # A double quote inside an identifier must be doubled, otherwise the
    # generated SQL is malformed for such column names.
    return ", ".join('"' + name.replace('"', '""') + '"' for name in picked)

def can_apply_where(avail_cols: list[str], where_expr: str) -> bool:
    """Decide whether ``where_expr`` may be applied to a relation.

    This is a deliberately simple guard: a blank expression is always fine,
    and any non-blank expression is accepted only when the relation has a
    ``negotiated_rate`` column (the expression text itself is not parsed).

    Args:
        avail_cols: Column names present in the relation.
        where_expr: Filter text that would follow the WHERE keyword.

    Returns:
        True when the filter can be applied, otherwise False.
    """
    if where_expr.strip() == "":
        return True
    # Extend this set if more complex WHERE expressions are introduced.
    required = {"negotiated_rate"}
    return required <= set(avail_cols)

def copy_sample(in_path: Path, out_path: Path, desired_cols: list[str], where_expr: str, sample_rows: int):
    """Write a filtered random sample of a Parquet file to ``out_path``.

    Uses the module-level DuckDB connection ``con``.

    Args:
        in_path: Source Parquet file.
        out_path: Destination Parquet file (ZSTD-compressed).
        desired_cols: Columns to keep; ones missing from the source are
            skipped, and ``*`` is used when none of them exist.
        where_expr: Filter text placed after WHERE. It is dropped when blank
            or when ``can_apply_where`` rejects it for this file.
        sample_rows: Maximum number of rows to keep (reservoir sample).
    """
    path_sql = str(in_path).replace("'", "''")
    out_sql = str(out_path).replace("'", "''")

    avail = list_columns_for_query(con, in_path)
    sel = build_select_list(avail, desired_cols)
    where_sql = f"WHERE {where_expr}" if (where_expr.strip() and can_apply_where(avail, where_expr)) else ""

    # DuckDB applies a SAMPLE clause right after the FROM clause, i.e. before
    # a WHERE in the same SELECT. Filtering inside a subquery and sampling the
    # subquery's result makes the sample come from the filtered rows, so the
    # filter can no longer shrink the output below `sample_rows`.
    con.execute(f"""
      COPY (
        SELECT *
        FROM (
          SELECT {sel}
          FROM read_parquet('{path_sql}')
          {where_sql}
        ) AS filtered
        USING SAMPLE reservoir({int(sample_rows)} ROWS)
      ) TO '{out_sql}' (FORMAT PARQUET, COMPRESSION ZSTD);
    """)

# ---- Build samples safely ----
# One sampled Parquet file per input, written into the test sandbox.
# NOTE(review): TEST_MODE is not consulted here, so the samples are rebuilt on
# every run; no REPEATABLE seed is passed to the sampler, so presumably the
# sampled rows differ between runs - confirm if reproducibility matters.
copy_sample(RATES_PARQ_BIG, RATES_SAMPLE, RATES_COLS, RATES_WHERE, SAMPLE_ROWS_RATES)
copy_sample(PROV_PARQ_BIG,  PROV_SAMPLE,  PROV_COLS,  PROV_WHERE,  SAMPLE_ROWS_PROV)

print("Wrote samples:")
print("  ", RATES_SAMPLE)
print("  ", PROV_SAMPLE)
# Closes the connection opened earlier in this cell; later cells open their own.
con.close()


Wrote samples:
   core\test_sandbox\rates_sample.parquet
   core\test_sandbox\prov_sample.parquet


cell 3

In [4]:
import re, hashlib, json
import polars as pl
import duckdb
from datetime import datetime

def md5(s: str) -> str:
    """Return the hexadecimal MD5 digest of ``s`` encoded as UTF-8."""
    hasher = hashlib.md5()
    hasher.update(s.encode("utf-8"))
    return hasher.hexdigest()

def slugify(s: str) -> str:
    """Turn free text into a lowercase, dash-separated slug.

    ``None`` yields an empty string. Every run of characters outside
    ``a-z0-9`` (after lowercasing) becomes a single dash, and leading and
    trailing dashes are removed.
    """
    if s is None:
        return ""
    lowered = s.lower()
    dashed = re.sub(r"[^a-z0-9]+", "-", lowered)
    trimmed = dashed.strip("-")
    return re.sub(r"-+", "-", trimmed)

def _co(x): return "" if x is None else str(x)

# When set to a non-empty string, every payer name maps to this slug.
PAYER_SLUG_OVERRIDE = None
def payer_slug_from_name(name: str) -> str:
    """Return the payer slug for ``name``.

    The module-level ``PAYER_SLUG_OVERRIDE`` wins when it is truthy;
    otherwise the slug is ``slugify`` of the name (``None``/empty -> ``""``).
    """
    if PAYER_SLUG_OVERRIDE:
        return PAYER_SLUG_OVERRIDE
    return slugify(name or "")

def normalize_yymm(date_str: str | None) -> str:
    if not date_str: return ""
    for fmt in ("%Y-%m-%d","%Y/%m/%d","%Y-%m","%Y/%m","%Y%m%d","%Y%m"):
        try:
            dt = datetime.strptime(date_str[:len(re.sub(r'[^Ymd]', '', fmt))], fmt)
            return dt.strftime("%Y-%m")
        except Exception:
            pass
    m = re.search(r"(20\d{2})[-/](0[1-9]|1[0-2])", date_str)
    return f"{m.group(1)}-{m.group(2)}" if m else ""

def normalize_service_codes(svc) -> list[str]:
    """Normalize a service-codes value to a sorted list of unique strings.

    Accepted inputs:
      * ``None`` -> ``[]``
      * list / tuple / set / frozenset of values
      * objects exposing ``to_list()`` or ``tolist()`` (Series- or array-like
        cells), which are converted to a plain list first
      * a string holding a JSON array (``'["11","22"]'``)
      * any other value, stringified and split on ``;``, ``,``, ``|`` or
        whitespace

    ``None`` members and blank tokens are dropped; surrounding whitespace is
    stripped from each token.
    """
    if svc is None:
        return []

    # Series/array-like cells are not list/tuple instances; without this step
    # they would be stringified and split into meaningless tokens.
    if not isinstance(svc, (str, bytes)):
        for attr in ("to_list", "tolist"):
            convert = getattr(svc, attr, None)
            if callable(convert):
                svc = convert()
                break

    if isinstance(svc, (list, tuple, set, frozenset)):
        vals = ["" if v is None else str(v) for v in svc]
    else:
        s = str(svc)
        vals = None
        if s.startswith("[") and s.endswith("]"):
            # Looks like a JSON array; fall back to delimiter splitting when
            # it is not valid JSON or does not decode to a list.
            try:
                parsed = json.loads(s)
            except (ValueError, RecursionError):
                parsed = None
            if isinstance(parsed, list):
                vals = ["" if v is None else str(v) for v in parsed]
        if vals is None:
            vals = re.split(r"[;,|\s]+", s)

    cleaned = [str(v).strip() for v in vals if str(v).strip()]
    return sorted(set(cleaned))

def pos_set_id_from_members(members) -> str:
    """Derive the POS-set identifier for a collection of members.

    ``None`` and empty collections map to ``md5("none")``. A value without a
    length is treated as a single member (its ``str()``). Otherwise the id is
    the MD5 of the members joined with ``|`` in the order given, with ``None``
    members rendered as empty strings.
    """
    if members is None:
        return md5("none")
    try:
        count = len(members)
    except Exception:
        # Unsized value: treat it as a one-element collection.
        members, count = [str(members)], 1
    if not count:
        return md5("none")
    return md5("|".join("" if item is None else str(item) for item in members))

def pg_uid_from_parts(payer_slug: str, version: str | None, pgid: str | None, pref: str | None) -> str:
    """Return the provider-group UID for the given identifying parts.

    The UID is the MD5 of ``payer_slug|version|pgid|pref``, where each part
    is passed through ``_co`` (so ``None`` contributes an empty string).
    """
    fields = (payer_slug, version, pgid, pref)
    return md5("|".join(_co(field) for field in fields))

def fact_uid_from_struct(s: dict) -> str:
    """Return the fact-row UID for one row given as a mapping.

    The UID is the MD5 of thirteen ``|``-joined fields. All but the rate are
    read with ``s.get`` and passed through ``_co``; the negotiated rate is
    rendered with four decimals, or as an empty string when it is missing or
    cannot be converted to ``float``.
    """
    rate = s.get("negotiated_rate")
    if rate is None:
        rate_str = ""
    else:
        try:
            rate_str = f"{float(rate):.4f}"
        except Exception:
            rate_str = ""

    # Field order is part of the hash input and must not change.
    leading_keys = (
        "state",
        "year_month",
        "payer_slug",
        "billing_class",
        "code_type",
        "code",
        "pg_uid",
        "pos_set_id",
        "negotiated_type",
        "negotiation_arrangement",
        "expiration_date",
    )
    parts = [_co(s.get(key)) for key in leading_keys]
    parts.append(rate_str)
    parts.append(_co(s.get("provider_group_id_raw")))
    return md5("|".join(parts))

def read_parquet_safely(path: Path, desired_cols: list[str]) -> pl.DataFrame:
    """Read the desired columns of a Parquet file, tolerating missing ones.

    Args:
        path: Parquet file to read.
        desired_cols: Columns the caller expects.

    Returns:
        A DataFrame holding the desired columns that exist in the file,
        followed by an all-null column for each desired column that does not.
    """
    lf = pl.scan_parquet(str(path))
    # Accessing `LazyFrame.columns` emits a PerformanceWarning on recent
    # polars; `collect_schema()` is the explicit way to resolve the schema.
    # Older polars versions without it keep using `.columns`.
    if hasattr(lf, "collect_schema"):
        avail = set(lf.collect_schema().names())
    else:
        avail = set(lf.columns)
    use_cols = [c for c in desired_cols if c in avail]
    df = lf.select(use_cols).collect()
    missing = [c for c in desired_cols if c not in avail]
    if missing:
        # Pad absent columns with nulls so downstream code can rely on them.
        df = df.with_columns([pl.lit(None).alias(c) for c in missing])
    return df


cell 4

In [5]:
# Load the two sampled Parquet files into memory; any desired column that is
# absent from a sample comes back as an all-null column.
rates = read_parquet_safely(RATES_SAMPLE, RATES_COLS)
prov  = read_parquet_safely(PROV_SAMPLE,  PROV_COLS)
# Quick shape check of what was loaded.
print("rates_sample rows:", rates.height, "cols:", len(rates.columns))
print("prov_sample  rows:", prov.height, "cols:", len(prov.columns))


  avail = set(lf.columns)


rates_sample rows: 100000 cols: 16
prov_sample  rows: 17855 cols: 8


cell 5

In [6]:
# payer + year_month
# `base` is the rates sample plus derived columns. First null-fill the three
# source columns into helper columns (ren / ver / luo), then derive
# payer_slug from the entity name and year_month from last_updated_on.
# NOTE(review): the "ver" helper is not referenced again in this cell - the
# provider-group UID below reads the raw "version" column instead.
base = (
    rates
    .with_columns([
        pl.col("reporting_entity_name").fill_null("").alias("ren"),
        pl.col("version").fill_null("").alias("ver"),
        pl.col("last_updated_on").fill_null("").alias("luo"),
    ])
    .with_columns([
        pl.col("ren").map_elements(payer_slug_from_name, return_dtype=pl.Utf8).alias("payer_slug"),
        pl.col("luo").map_elements(normalize_yymm, return_dtype=pl.Utf8).alias("year_month"),
    ])
)

# POS set (fast toggle)
# Fast path: every row gets an empty member list and the id md5("none"),
# which is the same id pos_set_id_from_members returns for an empty set.
# Slow path: parse service_codes per row, then hash the member list.
# NOTE(review): normalize_service_codes special-cases only list/tuple inputs;
# confirm what Python type map_elements hands over for this column's dtype.
if FAST_SKIP_POS or "service_codes" not in base.columns:
    base = base.with_columns([
        pl.lit([]).alias("pos_members"),
        pl.lit(md5("none")).alias("pos_set_id"),
    ])
else:
    base = (
        base
        .with_columns(
            pl.col("service_codes").map_elements(normalize_service_codes, return_dtype=pl.List(pl.Utf8)).alias("pos_members")
        )
        .with_columns(
            pl.col("pos_members").map_elements(pos_set_id_from_members, return_dtype=pl.Utf8).alias("pos_set_id")
        )
    )

# Provider group UID
# Row-wise hash of payer_slug | version | provider_group_id | provider_reference_id.
base = base.with_columns(
    pl.struct(["payer_slug","version","provider_group_id","provider_reference_id"])
      .map_elements(lambda s: pg_uid_from_parts(s["payer_slug"], s["version"], s["provider_group_id"], s["provider_reference_id"]),
                    return_dtype=pl.Utf8)
      .alias("pg_uid")
)

# ---- dims/xrefs candidates ----
# Each candidate frame is de-duplicated on the FULL row (`.unique()` with no
# subset), so it can still hold several rows for one key when a non-key
# column differs (e.g. two descriptions for the same code_type/code).
dim_code_new = (
    base.select([
        pl.col("billing_code_type").alias("code_type"),
        pl.col("billing_code").cast(pl.Utf8).alias("code"),
        pl.col("description").alias("code_description"),
        pl.col("name").alias("code_name"),
    ]).drop_nulls(subset=["code_type","code"]).unique()
)

dim_payer_new = (
    base.select([
        pl.col("payer_slug"),
        pl.col("reporting_entity_name").alias("reporting_entity_name"),
        pl.col("version").alias("version"),
    ]).drop_nulls(subset=["payer_slug"]).unique()
)

# provider_group_id_raw is the first non-null of provider_group_id and
# provider_reference_id.
dim_pg_new = (
    base.select([
        pl.col("pg_uid"),
        pl.col("payer_slug"),
        pl.coalesce([pl.col("provider_group_id"), pl.col("provider_reference_id")]).alias("provider_group_id_raw"),
        pl.col("version"),
    ]).drop_nulls(subset=["pg_uid"]).unique()
)

dim_pos_new = base.select(["pos_set_id","pos_members"]).drop_nulls(subset=["pos_set_id"]).unique()

# XREFs (optional skip if too heavy)
# NOTE(review): the skip branch builds empty frames without explicit dtypes;
# verify they stay compatible with files written by the non-skip branch.
if FAST_SKIP_XREFS:
    xref_pg_npi_new = pl.DataFrame({"pg_uid": [], "npi": []})
    xref_pg_tin_new = pl.DataFrame({"pg_uid": [], "tin_type": [], "tin_value": []})
else:
    # Derive payer_slug and pg_uid for provider rows with the same helper
    # functions and the same four input columns used for the rates above.
    prov_aug = (
        prov
          .with_columns([
              pl.col("reporting_entity_name").fill_null("").alias("ren"),
              pl.col("version").fill_null("").alias("ver"),
          ])
          .with_columns(pl.col("ren").map_elements(payer_slug_from_name, return_dtype=pl.Utf8).alias("payer_slug"))
          .with_columns(
              pl.struct(["payer_slug","version","provider_group_id","provider_reference_id"])
                .map_elements(lambda s: pg_uid_from_parts(s["payer_slug"], s["version"], s["provider_group_id"], s["provider_reference_id"]),
                              return_dtype=pl.Utf8)
                .alias("pg_uid")
          )
    )
    # Rows lacking the group id or the member value are dropped; a null
    # tin_type alone does not drop a TIN row.
    xref_pg_npi_new = prov_aug.select(["pg_uid","npi"]).drop_nulls(subset=["pg_uid","npi"]).unique()
    xref_pg_tin_new = prov_aug.select(["pg_uid","tin_type","tin_value"]).drop_nulls(subset=["pg_uid","tin_value"]).unique()

print("base rows:", base.height)
print("dim_code_new:", dim_code_new.height, " dim_pg_new:", dim_pg_new.height, " dim_pos_new:", dim_pos_new.height)


base rows: 100000
dim_code_new: 3678  dim_pg_new: 245  dim_pos_new: 1


cell 6

In [7]:
from pathlib import Path
import os

# Sandbox sub-directories for dimension tables, cross-references and the
# gold fact table; all are created up front under TEST_ROOT.
TEST_DIM_DIR  = TEST_ROOT / "dims"
TEST_XREF_DIR = TEST_ROOT / "xrefs"
TEST_GOLD_DIR = TEST_ROOT / "gold"
for p in (TEST_DIM_DIR, TEST_XREF_DIR, TEST_GOLD_DIR):
    p.mkdir(parents=True, exist_ok=True)

# One Parquet file per output table.
DIM_CODE_FILE = TEST_DIM_DIR / "dim_code.parquet"
DIM_PAYER_FILE = TEST_DIM_DIR / "dim_payer.parquet"
DIM_PG_FILE = TEST_DIM_DIR / "dim_provider_group.parquet"
DIM_POS_FILE = TEST_DIM_DIR / "dim_pos_set.parquet"
XREF_PG_NPI = TEST_XREF_DIR / "xref_pg_member_npi.parquet"
XREF_PG_TIN = TEST_XREF_DIR / "xref_pg_member_tin.parquet"
GOLD_FACT_FILE = TEST_GOLD_DIR / "fact_rate.parquet"

def append_unique_parquet(df_new: pl.DataFrame, path: Path, keys: list[str]):
    """Append rows of ``df_new`` whose key is not yet present in ``path``.

    The target file ends up with at most one row per key from this batch:
    rows whose key already exists in the file are skipped, and when the batch
    itself contains several rows for one key only the first is kept. The file
    is rebuilt into a temporary sibling and swapped in with ``os.replace``.

    Args:
        df_new: Candidate rows.
        path: Target Parquet file; created (with parents) when missing.
        keys: Column names that identify a row.

    Returns:
        None. Nothing is written when there are no new keys.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    # The anti-join below only guards against keys already in the file; the
    # batch itself may carry several rows per key (callers de-duplicate on
    # whole rows), so collapse it to one row per key first.
    df_new = df_new.unique(subset=keys, keep="first", maintain_order=True)

    target_exists = path.exists()
    if target_exists:
        old_keys = pl.read_parquet(path, columns=keys).unique()
        to_add = df_new.join(old_keys, on=keys, how="anti")
    else:
        to_add = df_new
    if to_add.is_empty():
        return

    def _sql_path(p: Path) -> str:
        # Double single quotes so the path is a valid SQL string literal.
        return str(p).replace("'", "''")

    tmp_new = path.with_suffix(".new.parquet")
    tmp_out = path.with_suffix(".next.parquet")
    try:
        to_add.write_parquet(tmp_new, compression="zstd")
        con = duckdb.connect()
        try:
            if target_exists:
                con.execute(f"""
                  COPY (
                    SELECT * FROM read_parquet('{_sql_path(path)}')
                    UNION ALL
                    SELECT * FROM read_parquet('{_sql_path(tmp_new)}')
                  ) TO '{_sql_path(tmp_out)}' (FORMAT PARQUET, COMPRESSION ZSTD);
                """)
            else:
                con.execute(f"""
                  COPY (SELECT * FROM read_parquet('{_sql_path(tmp_new)}'))
                  TO '{_sql_path(tmp_out)}' (FORMAT PARQUET, COMPRESSION ZSTD);
                """)
        finally:
            con.close()  # release the connection even when the COPY fails
        os.replace(tmp_out, path)
    finally:
        # Remove temporaries on success and on failure alike.
        for leftover in (tmp_new, tmp_out):
            if leftover.exists():
                leftover.unlink()

# Append each candidate frame to its sandbox file, skipping rows whose key
# columns already exist in that file.
# NOTE(review): tin_type is not part of the xref_pg_tin key, so on later runs
# a row that differs from a stored one only in tin_type counts as present.
append_unique_parquet(dim_code_new, DIM_CODE_FILE, keys=["code_type","code"])
append_unique_parquet(dim_payer_new, DIM_PAYER_FILE, keys=["payer_slug"])
append_unique_parquet(dim_pg_new,   DIM_PG_FILE,    keys=["pg_uid"])
append_unique_parquet(dim_pos_new,  DIM_POS_FILE,   keys=["pos_set_id"])
append_unique_parquet(xref_pg_npi_new, XREF_PG_NPI, keys=["pg_uid","npi"])
append_unique_parquet(xref_pg_tin_new, XREF_PG_TIN, keys=["pg_uid","tin_value"])

print("Test dims/xrefs written.")


Test dims/xrefs written.


cell 7

In [8]:
# Build fact_new with 'state' in key
# Steps:
#   1. add the constant STATE as a `state` column;
#   2. project/rename the fact columns (billing_code_type -> code_type,
#      billing_code -> code as text, negotiated_rate as Float64,
#      provider_group_id_raw = first non-null of provider_group_id and
#      provider_reference_id);
#   3. hash the listed key fields row by row into `fact_uid`;
#   4. reorder the columns and drop exact duplicate rows.
# reporting_entity_name is carried along but is not part of the fact_uid hash.
fact_new = (
    base
      .with_columns(pl.lit(STATE).alias("state"))
      .select(
          "state",
          pl.col("year_month"),
          pl.col("payer_slug"),
          pl.col("billing_class"),
          pl.col("billing_code_type").alias("code_type"),
          pl.col("billing_code").cast(pl.Utf8).alias("code"),
          pl.col("pg_uid"),
          pl.col("pos_set_id"),
          pl.col("negotiated_type"),
          pl.col("negotiation_arrangement"),
          pl.col("negotiated_rate").cast(pl.Float64).alias("negotiated_rate"),
          pl.col("expiration_date"),
          pl.coalesce([pl.col("provider_group_id"), pl.col("provider_reference_id")]).alias("provider_group_id_raw"),
          pl.col("reporting_entity_name"),
      )
      .with_columns(
          pl.struct([
              "state","year_month","payer_slug","billing_class","code_type","code",
              "pg_uid","pos_set_id","negotiated_type","negotiation_arrangement",
              "expiration_date","negotiated_rate","provider_group_id_raw"
          ]).map_elements(fact_uid_from_struct, return_dtype=pl.Utf8).alias("fact_uid")
      )
      .select(
          "fact_uid","state","year_month","payer_slug","billing_class","code_type","code",
          "pg_uid","pos_set_id","negotiated_type","negotiation_arrangement",
          "negotiated_rate","expiration_date","provider_group_id_raw","reporting_entity_name"
      )
      .unique()
)

print("fact_new rows:", fact_new.height)
# Last expression of the cell: rendered as a preview of the first three rows.
fact_new.head(3)


fact_new rows: 78514


fact_uid,state,year_month,payer_slug,billing_class,code_type,code,pg_uid,pos_set_id,negotiated_type,negotiation_arrangement,negotiated_rate,expiration_date,provider_group_id_raw,reporting_entity_name
str,str,str,str,str,str,str,str,str,str,str,f64,str,i64,str
"""6386f61dc8bbd95b8451e41f76a055…","""GA""","""2025-08""","""unitedhealthcare-of-georgia-in…","""professional""","""CPT""","""19101""","""a4384c5b4444d255c1d13fac116781…","""334c4a4c42fdb79d7ebc3e73b517e6…","""negotiated""","""ffs""",356.14,"""9999-12-31""",491,"""UnitedHealthcare of Georgia In…"
"""5e4f1c32d4513965485c46b82bdc35…","""GA""","""2025-08""","""unitedhealthcare-of-georgia-in…","""professional""","""CPT""","""75891""","""6fafb8326ef12759167a3d744e8b00…","""334c4a4c42fdb79d7ebc3e73b517e6…","""negotiated""","""ffs""",390.42,"""9999-12-31""",177,"""UnitedHealthcare of Georgia In…"
"""674af5f5954f8d5775b0349b290588…","""GA""","""2025-08""","""unitedhealthcare-of-georgia-in…","""institutional""","""CPT""","""26706""","""d860297eb9da59e1cadd7e847271f1…","""334c4a4c42fdb79d7ebc3e73b517e6…","""negotiated""","""ffs""",997.0,"""9999-12-31""",290,"""UnitedHealthcare of Georgia In…"


cell 8

In [9]:
def upsert_fact_single(fact_batch: pl.DataFrame, out_file: Path):
    """Insert fact rows into a single Parquet file, skipping known ``fact_uid``s.

    Existing rows are never modified: a batch row is added only when its
    ``fact_uid`` is not already in ``out_file``. When the batch contains the
    same ``fact_uid`` more than once, only the first such row is kept. The
    file is always built as a temporary sibling and then swapped in with
    ``os.replace``, so a failed run does not leave a partial ``out_file``.

    Args:
        fact_batch: Rows to insert; must contain every required fact column.
        out_file: Target Parquet file; created (with parents) when missing.

    Raises:
        ValueError: If ``fact_batch`` lacks any required column.
    """
    if fact_batch.is_empty():
        print("No fact rows in batch.")
        return
    need_cols = {
        "fact_uid","state","year_month","payer_slug","billing_class","code_type","code",
        "pg_uid","pos_set_id","negotiated_type","negotiation_arrangement",
        "negotiated_rate","expiration_date","provider_group_id_raw","reporting_entity_name"
    }
    missing = need_cols - set(fact_batch.columns)
    if missing:
        raise ValueError(f"fact_batch missing columns: {sorted(missing)}")
    out_file.parent.mkdir(parents=True, exist_ok=True)

    def _sql_path(p: Path) -> str:
        # Double single quotes so the path is a valid SQL string literal.
        return str(p).replace("'", "''")

    stage_path = out_file.with_suffix(".stage.parquet")
    next_path = out_file.with_suffix(".next.parquet")

    # Columns are written in sorted order so the stage file and the stored
    # file line up positionally for the INSERT below. The anti-join only
    # compares the stage against stored rows, so repeated fact_uids inside
    # the batch are collapsed here.
    staged = (
        fact_batch
        .select(sorted(need_cols))
        .unique(subset=["fact_uid"], keep="first", maintain_order=True)
    )

    creating = not out_file.exists()
    try:
        staged.write_parquet(stage_path, compression="zstd")
        con = duckdb.connect()
        try:
            if creating:
                con.execute(f"""
                  COPY (SELECT * FROM read_parquet('{_sql_path(stage_path)}'))
                  TO '{_sql_path(next_path)}' (FORMAT PARQUET, COMPRESSION ZSTD);
                """)
            else:
                con.execute(f"""
                  CREATE OR REPLACE TABLE _all   AS SELECT * FROM read_parquet('{_sql_path(out_file)}');
                  CREATE OR REPLACE TABLE _stage AS SELECT * FROM read_parquet('{_sql_path(stage_path)}');
                  INSERT INTO _all
                  SELECT s.* FROM _stage s
                  LEFT JOIN _all a ON a.fact_uid = s.fact_uid
                  WHERE a.fact_uid IS NULL;
                  COPY (SELECT * FROM _all) TO '{_sql_path(next_path)}' (FORMAT PARQUET, COMPRESSION ZSTD);
                """)
        finally:
            con.close()  # release the connection even when the SQL fails
        os.replace(next_path, out_file)
    finally:
        # Remove temporaries on success and on failure alike.
        for leftover in (stage_path, next_path):
            if leftover.exists():
                leftover.unlink()

    if creating:
        print(f"Created {out_file} with {staged.height} rows.")
    else:
        print(f"Upsert complete into {out_file}.")

# Write this batch of fact rows into the sandbox gold file (created on first run).
upsert_fact_single(fact_new, GOLD_FACT_FILE)


Created core\test_sandbox\gold\fact_rate.parquet with 78514 rows.


cell 9 sanity check

In [10]:
def count_parquet_rows(path: Path) -> int:
    """Return the number of rows in the Parquet file at ``path``.

    A missing file counts as 0 rows. Uses a short-lived DuckDB connection.
    """
    if not path.exists():
        return 0
    # Double single quotes so the path is a valid SQL string literal.
    path_sql = str(path).replace("'", "''")
    con = duckdb.connect()
    try:
        n = con.execute(f"SELECT COUNT(*) FROM read_parquet('{path_sql}')").fetchone()[0]
    finally:
        con.close()  # release the connection even when the query fails
    return int(n)

# Sanity check: report how many rows each sandbox output file now holds
# (a file that does not exist is reported as 0).
print("Row counts (test sandbox):")
print("  dim_code         :", count_parquet_rows(DIM_CODE_FILE))
print("  dim_payer        :", count_parquet_rows(DIM_PAYER_FILE))
print("  dim_provider_grp :", count_parquet_rows(DIM_PG_FILE))
print("  dim_pos_set      :", count_parquet_rows(DIM_POS_FILE))
print("  xref_pg_npi      :", count_parquet_rows(XREF_PG_NPI))
print("  xref_pg_tin      :", count_parquet_rows(XREF_PG_TIN))
print("  fact_rate (gold) :", count_parquet_rows(GOLD_FACT_FILE))


Row counts (test sandbox):
  dim_code         : 3678
  dim_payer        : 1
  dim_provider_grp : 245
  dim_pos_set      : 1
  xref_pg_npi      : 16999
  xref_pg_tin      : 9486
  fact_rate (gold) : 78514
