# 01 — Peer Grouping Design (Core)

Decision Supported:
D-PI-1 — Provider Review Entry

Decision Actor:
Program Integrity (PI)

Decision Owner:
Investigations Lead

Why This Notebook Exists:
Provider behavior must be evaluated relative to comparable providers.
Peer grouping defines what "comparable" means and prevents false anomaly signals.

What Decision Changes Because Of This Work:
Provider review prioritization will rely on peer-adjusted comparisons rather than raw totals.

What This Notebook Does NOT Do:
- Does not flag providers
- Does not produce anomaly scores
- Does not produce operational recommendations
- Does not perform modeling


In [1]:
# CELL-ID: DPI1-01-LOAD-01
# PURPOSE: Load interim parquet dataset for peer grouping diagnostics
# INPUTS: data/interim/partb_provider_service.parquet/*
# OUTPUTS: Arrow dataset handle + row count estimate + column list
# DECISION-LINK: D-PI-1 Provider Review Entry

from pathlib import Path
import pyarrow.dataset as ds

PROJECT_ROOT = Path(r"C:\Users\billm\Projects\Medicare\medicare-program-integrity")
INTERIM_PARQUET_DIR = PROJECT_ROOT / "data" / "interim" / "partb_provider_service.parquet"

assert INTERIM_PARQUET_DIR.exists(), f"Missing interim parquet dir: {INTERIM_PARQUET_DIR}"

dataset = ds.dataset(INTERIM_PARQUET_DIR, format="parquet")

print("Parquet location:", INTERIM_PARQUET_DIR)
print("Schema:")
print(dataset.schema)

# Row count (fast approximate via fragment metadata if available; otherwise prints fragment count)
fragments = list(dataset.get_fragments())
print("Fragments:", len(fragments))


Parquet location: C:\Users\billm\Projects\Medicare\medicare-program-integrity\data\interim\partb_provider_service.parquet
Schema:
Rndrng_NPI: string
Rndrng_Prvdr_Last_Org_Name: string
Rndrng_Prvdr_First_Name: string
Rndrng_Prvdr_MI: string
Rndrng_Prvdr_Crdntls: string
Rndrng_Prvdr_Ent_Cd: string
Rndrng_Prvdr_St1: string
Rndrng_Prvdr_St2: string
Rndrng_Prvdr_City: string
Rndrng_Prvdr_State_Abrvtn: string
Rndrng_Prvdr_State_FIPS: string
Rndrng_Prvdr_Zip5: string
Rndrng_Prvdr_RUCA: string
Rndrng_Prvdr_RUCA_Desc: string
Rndrng_Prvdr_Cntry: string
Rndrng_Prvdr_Type: string
Rndrng_Prvdr_Mdcr_Prtcptg_Ind: string
HCPCS_Cd: string
HCPCS_Desc: string
HCPCS_Drug_Ind: string
Place_Of_Srvc: string
Tot_Benes: int64
Tot_Srvcs: double
Tot_Bene_Day_Srvcs: double
Avg_Sbmtd_Chrg: double
Avg_Mdcr_Alowd_Amt: double
Avg_Mdcr_Pymt_Amt: double
Avg_Mdcr_Stdzd_Amt: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 3852
Fragments: 39


In [4]:
# CELL-ID: DPI1-01-DIAG-01
# PURPOSE: Compute peer group size diagnostics for candidate peer definitions (memory-safe)
# INPUTS: interim parquet dataset
# OUTPUTS: unique provider types + peer group size distribution summary
# DECISION-LINK: D-PI-1 Provider Review Entry

import pyarrow.compute as pc

# Candidate peer definition (v1)
peer_keys = ["Rndrng_Prvdr_Type", "Rndrng_Prvdr_State_Abrvtn", "Place_Of_Srvc"]

# Build a lightweight table with only the peer keys and NPI
cols = peer_keys + ["Rndrng_NPI"]
tbl = dataset.to_table(columns=cols)

# Group size = distinct NPIs per peer group
# Use Arrow group_by then count distinct NPI
grouped = tbl.group_by(peer_keys).aggregate([("Rndrng_NPI", "count_distinct")])
grouped = grouped.rename_columns(peer_keys + ["peer_npi_count"])

# Basic summaries
peer_counts = grouped.column("peer_npi_count")

unique_types = pc.count_distinct(tbl.column("Rndrng_Prvdr_Type")).as_py()
num_groups = grouped.num_rows
min_sz = pc.min(peer_counts).as_py()
# Quantiles return an Array; take first element
p10 = pc.quantile(peer_counts, q=0.10, interpolation="linear")[0].as_py()
median = pc.quantile(peer_counts, q=0.50, interpolation="linear")[0].as_py()


print(f"Unique Rndrng_Prvdr_Type: {unique_types}")
print(f"Peer groups (Type x State x POS): {num_groups:,}")
print(f"Peer group size (distinct NPI) min: {min_sz}")
print(f"Peer group size p10: {p10}")
print(f"Peer group size median: {median}")

# Optional: show the smallest groups for inspection
smallest = grouped.sort_by([("peer_npi_count", "ascending")]).slice(20).to_pandas()
display(smallest)


Unique Rndrng_Prvdr_Type: 104
Peer groups (Type x State x POS): 8,420
Peer group size (distinct NPI) min: 1
Peer group size p10: 2.0
Peer group size median: 26.0


Unnamed: 0,Rndrng_Prvdr_Type,Rndrng_Prvdr_State_Abrvtn,Place_Of_Srvc,peer_npi_count
0,Medical Toxicology,MS,F,1
1,Clinic or Group Practice,MD,O,1
2,Pain Management,DC,F,1
3,Pain Management,DC,O,1
4,Dentist,MO,O,1
...,...,...,...,...
8395,Family Practice,CA,O,6326
8396,Physical Therapist in Private Practice,CA,O,6585
8397,Nurse Practitioner,OH,O,6755
8398,Nurse Practitioner,TX,O,8833


In [5]:
# CELL-ID: DPI1-01-DIAG-02
# PURPOSE: Re-test peer group size using broader definition (Provider Type x State)
# INPUTS: interim parquet dataset
# OUTPUTS: revised peer group size metrics
# DECISION-LINK: D-PI-1 Provider Review Entry

import pyarrow.compute as pc

peer_keys = ["Rndrng_Prvdr_Type", "Rndrng_Prvdr_State_Abrvtn"]

cols = peer_keys + ["Rndrng_NPI"]
tbl = dataset.to_table(columns=cols)

grouped = tbl.group_by(peer_keys).aggregate([("Rndrng_NPI", "count_distinct")])
grouped = grouped.rename_columns(peer_keys + ["peer_npi_count"])

peer_counts = grouped.column("peer_npi_count")

unique_types = pc.count_distinct(tbl.column("Rndrng_Prvdr_Type")).as_py()
num_groups = grouped.num_rows
min_sz = pc.min(peer_counts).as_py()
p10 = pc.quantile(peer_counts, q=0.10, interpolation="linear")[0].as_py()
median = pc.quantile(peer_counts, q=0.50, interpolation="linear")[0].as_py()

print(f"Unique Rndrng_Prvdr_Type: {unique_types}")
print(f"Peer groups (Type x State): {num_groups:,}")
print(f"Peer group size (distinct NPI) min: {min_sz}")
print(f"Peer group size p10: {p10}")
print(f"Peer group size median: {median}")


Unique Rndrng_Prvdr_Type: 104
Peer groups (Type x State): 4,796
Peer group size (distinct NPI) min: 1
Peer group size p10: 2.0
Peer group size median: 41.0


In [6]:
# CELL-ID: DPI1-01-DIAG-03
# PURPOSE: Test peer grouping using Provider Type only (broad stability baseline)
# INPUTS: interim parquet dataset
# OUTPUTS: peer group size metrics for Provider Type only
# DECISION-LINK: D-PI-1 Provider Review Entry

import pyarrow.compute as pc

peer_keys = ["Rndrng_Prvdr_Type"]

cols = peer_keys + ["Rndrng_NPI"]
tbl = dataset.to_table(columns=cols)

grouped = tbl.group_by(peer_keys).aggregate([("Rndrng_NPI", "count_distinct")])
grouped = grouped.rename_columns(peer_keys + ["peer_npi_count"])

peer_counts = grouped.column("peer_npi_count")

num_groups = grouped.num_rows
min_sz = pc.min(peer_counts).as_py()
p10 = pc.quantile(peer_counts, q=0.10, interpolation="linear")[0].as_py()
median = pc.quantile(peer_counts, q=0.50, interpolation="linear")[0].as_py()

print(f"Peer groups (Provider Type only): {num_groups:,}")
print(f"Peer group size min: {min_sz}")
print(f"Peer group size p10: {p10}")
print(f"Peer group size median: {median}")


Peer groups (Provider Type only): 104
Peer group size min: 1
Peer group size p10: 47.800000000000004
Peer group size median: 2584.0


In [7]:
# CELL-ID: DPI1-01-LOCK-01
# PURPOSE: Lock v1 peer grouping definition for downstream signal engine
# INPUTS: peer diagnostic results
# OUTPUTS: peer_group_contract dict used by later notebooks
# DECISION-LINK: D-PI-1 Provider Review Entry

peer_group_contract = {
    "version": "v1",
    "primary_grouping": ["Rndrng_Prvdr_Type"],
    "secondary_controls": [
        "Rndrng_Prvdr_State_Abrvtn",
        "Rndrng_Prvdr_RUCA",
        "Place_Of_Srvc"
    ],
    "minimum_provider_threshold": 30,
    "notes": [
        "Primary peer grouping uses Provider Type due to stability.",
        "Geography and service location applied as secondary analytical controls.",
        "State-level grouping caused over-fragmentation.",
        "Type-only grouping provides stable statistical baseline."
    ]
}

peer_group_contract


{'version': 'v1',
 'primary_grouping': ['Rndrng_Prvdr_Type'],
 'secondary_controls': ['Rndrng_Prvdr_State_Abrvtn',
  'Rndrng_Prvdr_RUCA',
  'Place_Of_Srvc'],
 'minimum_provider_threshold': 30,
 'notes': ['Primary peer grouping uses Provider Type due to stability.',
  'Geography and service location applied as secondary analytical controls.',
  'State-level grouping caused over-fragmentation.',
  'Type-only grouping provides stable statistical baseline.']}