
# HRRP ETL — Cleaned Datasets & Tableau Summaries

This notebook ingests CMS HRRP hospital-level files (FY2021–FY2025), normalizes schemas, flags suppressed rows, and produces Tableau-ready summary tables:

**Outputs (CSV, in `../data/processed/`):**
- `state_summary.csv` — FY × State × Condition averages (+ YoY delta)
- `national_summary.csv` — FY × Condition national average (hospital-weighted)
- `national_summary_stateavg.csv` — FY × Condition (mean of state averages)
- `hospital_summary.csv` — FY × Hospital × Condition records (ERR + counts)
- `suppression_by_fy_measure.csv` — FY × Condition suppression % (for KPI)
- `suppression_summary.csv` — FY suppression overview

**Assumptions:**
- One CSV per FY in `../data/raw/`
- FY is parsed from filename, e.g., `fy_2024_...csv`


---

### Background
The Hospital Readmissions Reduction Program (HRRP) is a Medicare program that reduces payments to hospitals with excess readmissions. The publicly released HRRP files suppress some values for privacy, and their schemas vary across fiscal years. This ETL cleans and harmonizes the files into consistent, Tableau-ready datasets that support year-over-year and state-level comparisons.

## 1) Imports, paths, and helpers
Utility functions for column normalization and FY parsing.

In [1]:

from pathlib import Path
import re
import warnings
import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=UserWarning)
pd.set_option("display.max_rows", 50)
pd.set_option("display.width", 120)

RAW = Path("../data/raw")
PROC = Path("../data/processed")
PROC.mkdir(parents=True, exist_ok=True)

# helper: normalize column names to lower_snake
def norm_cols(cols):
    def f(c):
        c = c.strip()
        c = c.replace("-", " ").replace("/", " ")
        c = re.sub(r"\s+", "_", c.lower())
        return c
    return [f(c) for c in cols]

# helper: parse FY from filename (e.g., "fy_2024_hospital_...csv")
def parse_fy_from_name(p: Path) -> int:
    m = re.search(r"fy[_\- ]?(\d{4})", p.name, flags=re.I)
    if not m:
        raise ValueError(f"Could not parse FY from filename: {p.name}")
    return int(m.group(1))

# canonical column names we want
CANON = [
    "facility_name", "facility_id", "state", "measure_name",
    "number_of_discharges", "number_of_readmissions",
    "predicted_readmission_rate", "expected_readmission_rate",
    "excess_readmission_ratio", "start_date", "end_date", "footnote"
]

# mapping of known variants → canonical
# (keys are matched after norm_cols, so they never contain spaces)
VARIANT_MAP = {
    "facility_id": "facility_id",
    "facility_name": "facility_name",
    "facilityid": "facility_id",
    "state": "state",
    "measure_name": "measure_name",
    "measurename": "measure_name",

    "number_of_discharges": "number_of_discharges",
    "numberofdischarges": "number_of_discharges",

    "number_of_readmissions": "number_of_readmissions",
    "numberofreadmissions": "number_of_readmissions",

    "predicted_readmission_rate": "predicted_readmission_rate",
    "predictedreadmissionrate": "predicted_readmission_rate",

    "expected_readmission_rate": "expected_readmission_rate",
    "expectedreadmissionrate": "expected_readmission_rate",

    "excess_readmission_ratio": "excess_readmission_ratio",
    "excessreadmissionratio": "excess_readmission_ratio",

    "start_date": "start_date",
    "end_date": "end_date",
    "footnote": "footnote",
}
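
As a quick illustration of the two helpers, the sketch below applies them to a few made-up headers and to one of the raw filenames. The helper bodies are repeated in condensed form so the snippet runs on its own; `sample_headers` is a hypothetical list, not taken from the CMS files.

```python
import re
from pathlib import Path

def norm_cols(cols):
    # same steps as the helper above: trim, turn "-" and "/" into spaces, lower_snake
    return [
        re.sub(r"\s+", "_", c.strip().replace("-", " ").replace("/", " ").lower())
        for c in cols
    ]

def parse_fy_from_name(p: Path) -> int:
    # case-insensitive match on "fy" followed by an optional separator and 4 digits
    m = re.search(r"fy[_\- ]?(\d{4})", p.name, flags=re.I)
    if not m:
        raise ValueError(f"Could not parse FY from filename: {p.name}")
    return int(m.group(1))

# hypothetical headers with stray whitespace and a hyphen
sample_headers = [" Facility Name ", "Excess Readmission Ratio", "Start-Date"]
normalized = norm_cols(sample_headers)

# filename casing does not matter because of re.I
fy = parse_fy_from_name(Path("FY_2025_Hospital_Readmissions_Reduction_Program_Hospital.csv"))

print(normalized)
print(fy)
```

Because the renaming step looks up headers only after `norm_cols` has run, any new variant added to `VARIANT_MAP` should be written in its normalized form.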


## 2) Load all raw CSVs and normalize schema
Reads every CSV in `../data/raw/`, harmonizes headers, coerces types, and attaches FY.

In [2]:

# sort by parsed FY so the order does not depend on filename casing or platform
raw_files = sorted(RAW.glob("*.csv"), key=parse_fy_from_name)
assert raw_files, f"No CSV files found in {RAW.resolve()}"

norm_frames = []

for p in raw_files:
    fy = parse_fy_from_name(p)

    # read as string to avoid parser surprises, then coerce
    df = pd.read_csv(p, dtype=str)
    df.columns = norm_cols(df.columns)

    # rename to canonical
    df = df.rename(columns={c: VARIANT_MAP.get(c, c) for c in df.columns})

    # ensure canonical columns exist
    for col in CANON:
        if col not in df.columns:
            df[col] = np.nan

    # numeric conversions
    for c in ["number_of_discharges","number_of_readmissions",
              "predicted_readmission_rate","expected_readmission_rate",
              "excess_readmission_ratio"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # dates
    for c in ["start_date","end_date"]:
        df[c] = pd.to_datetime(df[c], errors="coerce")

    # id/string
    # note: numeric coercion drops any leading zeros and turns non-numeric IDs into <NA>
    df["facility_id"] = pd.to_numeric(df["facility_id"], errors="coerce").astype("Int64")
    for c in ["facility_name","state","measure_name","footnote"]:
        df[c] = df[c].astype("string")

    # set FY from filename
    df["FY"] = fy

    # select/order
    df = df.loc[:, CANON + ["FY"]]
    norm_frames.append(df)

len(norm_frames), [f"{parse_fy_from_name(p)}:{p.name}" for p in raw_files]


(5,
 ['2021:fy_2021_hospital_readmissions_reduction_program_hospital.csv',
  '2022:fy_2022_hospital_readmissions_reduction_program_hospital.csv',
  '2023:fy_2023_hospital_readmissions_reduction_program_hospital.csv',
  '2024:fy_2024_hospital_readmissions_reduction_program_hospital.csv',
  '2025:FY_2025_Hospital_Readmissions_Reduction_Program_Hospital.csv'])
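
The "read as string, then coerce" pattern in the loop can be seen on a tiny in-memory file. This is a minimal sketch: the two rows and the placeholder strings (`Too Few to Report`, `N/A`) are assumptions for illustration, not values copied from the raw files.

```python
import io
import pandas as pd

# hypothetical two-row extract with text placeholders in numeric columns
csv_text = (
    "Facility ID,Number of Readmissions,Excess Readmission Ratio\n"
    "010001,Too Few to Report,N/A\n"
    "010005,42,1.0123\n"
)

# dtype=str stops pandas from guessing column types
df = pd.read_csv(io.StringIO(csv_text), dtype=str)

# IDs stay exactly as written while they are still text
ids_as_text = df["Facility ID"].tolist()

# errors="coerce" turns anything non-numeric into NaN instead of raising
readmissions = pd.to_numeric(df["Number of Readmissions"], errors="coerce")
err = pd.to_numeric(df["Excess Readmission Ratio"], errors="coerce")

print(ids_as_text)
print(readmissions.isna().tolist(), err.isna().tolist())
```

Coercing explicitly keeps the handling of placeholder text in one visible place, which is what lets the later suppression flag rely on plain `isna()` checks.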

## 3) Quick schema & missingness spot-check
Prints the share of missing values in key fields for each FY.

⚖️ **Interpretation**: Suppressed values are expected for hospitals with low discharge counts, because CMS masks them to protect patient privacy. Missingness here should therefore track the suppression rules rather than signal an ingestion error.

In [3]:

for df in norm_frames:
    fy = df["FY"].iloc[0]
    print(f"\n=== FY {fy} ===")
    print(df[["excess_readmission_ratio","number_of_discharges","number_of_readmissions"]].isna().mean().round(3))



=== FY 2021 ===
excess_readmission_ratio    0.278
number_of_discharges        0.415
number_of_readmissions      0.422
dtype: float64

=== FY 2022 ===
excess_readmission_ratio    0.297
number_of_discharges        0.460
number_of_readmissions      0.468
dtype: float64

=== FY 2023 ===
excess_readmission_ratio    0.320
number_of_discharges        0.508
number_of_readmissions      0.519
dtype: float64

=== FY 2024 ===
excess_readmission_ratio    0.357
number_of_discharges        0.569
number_of_readmissions      0.580
dtype: float64

=== FY 2025 ===
excess_readmission_ratio    0.356
number_of_discharges        0.549
number_of_readmissions      0.561
dtype: float64


## 4) Flag suppression and split kept vs suppressed
A row is flagged as suppressed if any key metric (ERR, predicted rate, expected rate) or either count (discharges, readmissions) is missing. Suppressed rows keep only the identifiers, FY, `measure_name`, the reporting dates, and the flag.

💡 CMS suppresses small cell counts (e.g., very few readmissions) to preserve anonymity. Flagged rows are split out of the analysis set and summarized separately, so suppression levels can be reported as a KPI.

In [4]:

kept_frames, supp_frames = [], []

for df in norm_frames:
    metric_null = df[["excess_readmission_ratio",
                      "predicted_readmission_rate",
                      "expected_readmission_rate"]].isna().any(axis=1)
    count_null  = df[["number_of_discharges","number_of_readmissions"]].isna().any(axis=1)
    df["suppressed"] = metric_null | count_null

    cols_keep = [
        "FY","facility_id","facility_name","state","measure_name",
        "excess_readmission_ratio","predicted_readmission_rate","expected_readmission_rate",
        "number_of_discharges","number_of_readmissions","start_date","end_date"
    ]
    cols_supp = [
        "FY","facility_id","facility_name","state","measure_name","start_date","end_date","suppressed"
    ]

    kept_frames.append(df.loc[~df["suppressed"], cols_keep].copy())
    supp_frames.append(df.loc[df["suppressed"],  cols_supp].copy())

kept_all = pd.concat(kept_frames, ignore_index=True)
supp_all = pd.concat(supp_frames, ignore_index=True)

# safety checks
assert "measure_name" in kept_all.columns and "measure_name" in supp_all.columns
kept_all.shape, supp_all.shape


((46376, 12), (48142, 8))
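
One consequence of the "any missing" rule is worth seeing on a small case: the spot-check in section 3 shows the count columns missing more often than ERR, so some rows with a valid ERR still land in the suppressed set. The sketch below uses three made-up hospitals to show this.

```python
import numpy as np
import pandas as pd

# toy rows: complete, ERR present but readmissions masked, fully masked
toy = pd.DataFrame({
    "facility_id": [1, 2, 3],
    "excess_readmission_ratio": [1.02, 0.98, np.nan],
    "predicted_readmission_rate": [15.1, 14.2, np.nan],
    "expected_readmission_rate": [14.8, 14.5, np.nan],
    "number_of_discharges": [200.0, 60.0, np.nan],
    "number_of_readmissions": [30.0, np.nan, np.nan],
})

metric_cols = ["excess_readmission_ratio", "predicted_readmission_rate",
               "expected_readmission_rate"]
count_cols = ["number_of_discharges", "number_of_readmissions"]

# same rule as the notebook cell: any missing metric or any missing count
metric_null = toy[metric_cols].isna().any(axis=1)
count_null = toy[count_cols].isna().any(axis=1)
toy["suppressed"] = metric_null | count_null

# rows that carry an ERR but are still flagged
has_err_but_suppressed = toy["suppressed"] & toy["excess_readmission_ratio"].notna()

print(toy[["facility_id", "suppressed"]])
print(has_err_but_suppressed.tolist())
```

Hospital 2 is the case to keep in mind when reading the averages: its ERR exists but does not contribute to `kept_all`.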

## 5) Final cleanups
Coerce types and trim strings to ensure consistent grouping.

In [5]:

for c in ["number_of_discharges","number_of_readmissions"]:
    kept_all[c] = pd.to_numeric(kept_all[c], errors="coerce")

kept_all["FY"] = kept_all["FY"].astype(int)
supp_all["FY"] = supp_all["FY"].astype(int)

for c in ["facility_name","state","measure_name"]:
    kept_all[c] = kept_all[c].str.strip()
    supp_all[c] = supp_all[c].str.strip()
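
To see why trimming matters for grouping, here is a small sketch with made-up state codes that differ only by whitespace:

```python
import pandas as pd

# toy frame: the same state written three ways
toy = pd.DataFrame({
    "state": pd.array(["TX", "TX ", " TX"], dtype="string"),
    "excess_readmission_ratio": [1.00, 1.02, 0.98],
})

# before trimming, each spelling forms its own group
groups_before = toy.groupby("state").ngroups

toy["state"] = toy["state"].str.strip()

# after trimming, all three rows fall into one group
groups_after = toy.groupby("state").ngroups

print(groups_before, groups_after)
```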



## 6) Build Tableau-ready summaries

- **state_summary.csv** — FY × state × condition, with avg ERR, hospital counts, and YoY delta.  
- **national_summary.csv** — FY × condition hospital-weighted national average.  
- **national_summary_stateavg.csv** — FY × condition mean of state averages.  
- **hospital_summary.csv** — Row-level hospital records (ERR + counts).  
- **suppression_by_fy_measure.csv** — FY × condition suppression % (for KPI).  
- **suppression_summary.csv** — FY overall suppression overview.
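
The two national files can give different numbers for the same FY and condition, because one averages over hospitals and the other over states. A minimal sketch with two made-up states shows the gap:

```python
import pandas as pd

# toy data: state A has three hospitals, state B has one
toy = pd.DataFrame({
    "state": ["A", "A", "A", "B"],
    "excess_readmission_ratio": [1.00, 1.00, 1.00, 1.20],
})

# hospital-weighted: every hospital row counts once
hospital_weighted = toy["excess_readmission_ratio"].mean()

# mean of state averages: every state counts once, regardless of size
state_avg = toy.groupby("state")["excess_readmission_ratio"].mean()
mean_of_state_avgs = state_avg.mean()

print(round(hospital_weighted, 3), round(mean_of_state_avgs, 3))
```

The hospital-weighted figure is pulled toward states with many reporting hospitals, while the mean of state averages gives small states the same influence as large ones.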


In [6]:

# State summary
state_summary = (
    kept_all.groupby(["FY","state","measure_name"], dropna=False)
            .agg(
                avg_err=("excess_readmission_ratio","mean"),
                hospitals_reporting=("facility_id","nunique"),
                total_rows=("facility_id","size")
            )
            .reset_index()
            .sort_values(["FY","measure_name","state"])
)
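# YoY delta: relies on the FY-ascending sort above; diff() compares each row with the
# previous available FY in the same state × measure, which may not be the adjacent year
state_summary["delta_prev"] = (
    state_summary.groupby(["state","measure_name"], dropna=False)["avg_err"].diff()
)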

# National (hospital-weighted)
national_summary = (
    kept_all.groupby(["FY","measure_name"], dropna=False)
            .agg(
                national_avg_err=("excess_readmission_ratio","mean"),
                hospitals_reporting=("facility_id","nunique")
            )
            .reset_index()
            .sort_values(["FY","measure_name"])
)

# National (mean of state averages)
national_summary_state = (
    state_summary.groupby(["FY","measure_name"], dropna=False)
                 .agg(
                     national_avg_err_state=("avg_err","mean"),
                     hospitals_reporting_sum=("hospitals_reporting","sum")
                 )
                 .reset_index()
                 .sort_values(["FY","measure_name"])
)

# Hospital summary: row-level records plus a dense rank by ERR within FY × measure
# (rank 1 = lowest ERR)
hospital_summary = kept_all.loc[:, [
    "FY","facility_id","facility_name","state","measure_name",
    "excess_readmission_ratio","number_of_discharges","number_of_readmissions"
]].copy()
hospital_summary = hospital_summary.rename(columns={"excess_readmission_ratio": "err"})
hospital_summary["rank_by_err"] = (
    hospital_summary.groupby(["FY","measure_name"], dropna=False)["err"]
                    .rank(method="dense", ascending=True)
)

# Suppression by FY × measure
supp_by = (
    supp_all.groupby(["FY","measure_name"], dropna=False)
            .size().reset_index(name="suppressed_rows")
)
kept_by = (
    kept_all.groupby(["FY","measure_name"], dropna=False)
            .size().reset_index(name="kept_rows")
)
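suppression_by_fy_measure = (
    supp_by.merge(kept_by, on=["FY","measure_name"], how="outer")
           # fill only the count columns so the key columns are left untouched
           .fillna({"suppressed_rows": 0, "kept_rows": 0})
           .astype({"suppressed_rows":"int","kept_rows":"int"})
           .sort_values(["FY","measure_name"])
)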
suppression_by_fy_measure["total_rows"] = (
    suppression_by_fy_measure["suppressed_rows"] + suppression_by_fy_measure["kept_rows"]
)
suppression_by_fy_measure["pct_suppressed"] = np.where(
    suppression_by_fy_measure["total_rows"] > 0,
    suppression_by_fy_measure["suppressed_rows"] / suppression_by_fy_measure["total_rows"],
    np.nan
)

# FY-level suppression overview
suppression_summary = (
    suppression_by_fy_measure.groupby("FY", dropna=False)
                             .agg(
                                 suppressed_rows=("suppressed_rows","sum"),
                                 total_rows=("total_rows","sum")
                             )
                             .reset_index()
)
suppression_summary["pct_suppressed"] = np.where(
    suppression_summary["total_rows"] > 0,
    suppression_summary["suppressed_rows"] / suppression_summary["total_rows"],
    np.nan
)

state_summary.shape, national_summary.shape, national_summary_state.shape, hospital_summary.shape, suppression_by_fy_measure.shape, suppression_summary.shape


((1496, 7), (30, 4), (30, 4), (46376, 9), (30, 6), (5, 4))
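
The YoY delta in `state_summary` comes from a grouped `diff()`, which compares each row with the previous row in its group rather than with the previous calendar year. The sketch below uses made-up averages, with one state deliberately missing a year, to show both the normal case and the gap case.

```python
import pandas as pd

# toy state-level averages; "CA" has no FY 2022 row (hypothetical gap)
toy = pd.DataFrame({
    "FY": [2022, 2021, 2023, 2021, 2023],
    "state": ["TX", "TX", "TX", "CA", "CA"],
    "measure_name": ["READM-30-HF-HRRP"] * 5,
    "avg_err": [1.01, 1.00, 1.03, 0.99, 1.02],
})

# same ordering as the notebook: FY first, so rows inside each group are FY-ascending
toy = toy.sort_values(["FY", "measure_name", "state"])

# difference from the previous available FY within each state × measure
toy["delta_prev"] = toy.groupby(["state", "measure_name"])["avg_err"].diff()

deltas = toy.set_index(["state", "FY"])["delta_prev"].sort_index()
print(deltas)
```

In this toy case the CA value for 2023 is a two-year change, so a Tableau view that labels `delta_prev` as year-over-year may want to filter or annotate rows where the prior FY is absent.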

## 7) Save CSVs
Writes all outputs to `../data/processed/`.

In [7]:

state_summary.to_csv(PROC / "state_summary.csv", index=False)
national_summary.to_csv(PROC / "national_summary.csv", index=False)
national_summary_state.to_csv(PROC / "national_summary_stateavg.csv", index=False)
hospital_summary.to_csv(PROC / "hospital_summary.csv", index=False)
suppression_by_fy_measure.to_csv(PROC / "suppression_by_fy_measure.csv", index=False)
suppression_summary.to_csv(PROC / "suppression_summary.csv", index=False)

print("Saved:")
for f in [
    "state_summary.csv",
    "national_summary.csv",
    "national_summary_stateavg.csv",
    "hospital_summary.csv",
    "suppression_by_fy_measure.csv",
    "suppression_summary.csv",
]:
    print("  -", (PROC / f).resolve())


Saved:
  - C:\Users\derek\projects\HRRP-Analysis\data\processed\state_summary.csv
  - C:\Users\derek\projects\HRRP-Analysis\data\processed\national_summary.csv
  - C:\Users\derek\projects\HRRP-Analysis\data\processed\national_summary_stateavg.csv
  - C:\Users\derek\projects\HRRP-Analysis\data\processed\hospital_summary.csv
  - C:\Users\derek\projects\HRRP-Analysis\data\processed\suppression_by_fy_measure.csv
  - C:\Users\derek\projects\HRRP-Analysis\data\processed\suppression_summary.csv
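
If a quick check of the written files is wanted, one option is to read a CSV back and compare its shape and columns with the frame that produced it. This is only a sketch on a toy frame written to a temporary folder; the values are made up.

```python
import tempfile
from pathlib import Path

import pandas as pd

# toy stand-in for one of the summary frames
toy = pd.DataFrame({
    "FY": [2024, 2025],
    "measure_name": ["READM-30-HF-HRRP", "READM-30-HF-HRRP"],
    "national_avg_err": [1.01, 1.02],
})

with tempfile.TemporaryDirectory() as tmp:
    out_path = Path(tmp) / "national_summary.csv"
    # index=False keeps the row index out of the file
    toy.to_csv(out_path, index=False)
    reloaded = pd.read_csv(out_path)

same_shape = reloaded.shape == toy.shape
same_columns = list(reloaded.columns) == list(toy.columns)

print(same_shape, same_columns)
```

Writing with `index=False`, as the cell above does, is what keeps an extra unnamed index column from showing up when the file is loaded elsewhere.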


## 8) Sanity checks (optional)
Validates keys and prints simple ranges to spot obvious issues.

In [8]:

# Keys present
assert all(c in suppression_by_fy_measure.columns for c in ["FY","measure_name","pct_suppressed"])
assert all(c in national_summary.columns for c in ["FY","measure_name","national_avg_err"])

# Ranges
print("\nERR ranges by condition (national, hospital-weighted):")
print(
    national_summary
      .groupby("measure_name")["national_avg_err"]
      .agg(["min","max"])
      .round(3)
)

# FY coverage
print("\nFYs present in outputs:")
print("state_summary FYs             :", sorted(state_summary.FY.unique()))
print("national_summary FYs          :", sorted(national_summary.FY.unique()))
print("suppression_by_fy_measure FYs :", sorted(suppression_by_fy_measure.FY.unique()))



ERR ranges by condition (national, hospital-weighted):
                          min    max
measure_name                        
READM-30-AMI-HRRP       1.006  1.012
READM-30-CABG-HRRP      1.023  1.044
READM-30-COPD-HRRP      1.004  1.014
READM-30-HF-HRRP        1.004  1.004
READM-30-HIP-KNEE-HRRP  1.023  1.040
READM-30-PN-HRRP        1.004  1.007

FYs present in outputs:
state_summary FYs             : [2021, 2022, 2023, 2024, 2025]
national_summary FYs          : [2021, 2022, 2023, 2024, 2025]
suppression_by_fy_measure FYs : [2021, 2022, 2023, 2024, 2025]
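
Two further checks that might be worth adding for the suppression table are that `pct_suppressed` stays within [0, 1] and that each FY × measure key appears only once. A minimal sketch on made-up counts:

```python
import pandas as pd

# toy stand-in for suppression_by_fy_measure
toy = pd.DataFrame({
    "FY": [2024, 2024],
    "measure_name": ["READM-30-AMI-HRRP", "READM-30-HF-HRRP"],
    "suppressed_rows": [60, 25],
    "kept_rows": [40, 75],
})
toy["total_rows"] = toy["suppressed_rows"] + toy["kept_rows"]
toy["pct_suppressed"] = toy["suppressed_rows"] / toy["total_rows"]

# a share must lie between 0 and 1 inclusive
pct_in_range = bool(toy["pct_suppressed"].between(0, 1).all())

# each FY × measure combination should appear exactly once
keys_unique = not toy.duplicated(["FY", "measure_name"]).any()

print(pct_in_range, keys_unique)
```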
