# Build data (Python)

This notebook reproduces the **data build pipeline** from the R version, but using **Python only**.
It reads the raw Medicare Advantage files (contract/enrollment and service area), tidies them, and saves:

- `data/output/plan_data.csv`
- `data/output/service_area.csv`

> Paths below match the R notebook (relative to this notebookâ€™s location). If your raw data live elsewhere, update the `DATA_ROOT` paths.


## Preliminaries: imports and paths

In [1]:
# Core
from pathlib import Path
import re

import numpy as np
import pandas as pd

# Display (optional)
pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)

# --- Paths (edit if needed) ---
# The R notebook uses paths like "../ma-data/ma/enrollment/Extracted Data/..."
DATA_ROOT = Path("..") / "ma-data" / "ma"
ENROLL_DIR = DATA_ROOT / "enrollment" / "Extracted Data"
SERVICE_AREA_DIR = DATA_ROOT / "service-area" / "Extracted Data"

OUT_DIR = Path("data") / "output"
OUT_DIR.mkdir(parents=True, exist_ok=True)

YEAR = 2018
MONTHS = [f"{m:02d}" for m in range(1, 13)]

## Read enrollment and contract data

In [2]:
def _clean_colnames(cols):
    # Standardize column names: lower, strip, replace spaces/punct with underscores
    out = []
    for c in cols:
        c2 = re.sub(r"[^0-9a-zA-Z]+", "_", str(c).strip()).strip("_").lower()
        out.append(c2)
    return out


def read_contract(path: Path) -> pd.DataFrame:
    """Read a single monthly Contract Info file.

    The R notebook reads with skip=1 and supplies names; here we do the same but
    also fall back gracefully if files include headers.
    """
    expected = [
        "contractid","planid","org_type","plan_type","partd","snp","eghp",
        "org_name","org_marketing_name","plan_name","parent_org","contract_date"
    ]

    # Try: skip first line, no header (common in CMS extracts)
    try:
        df = pd.read_csv(path, skiprows=1, header=None, dtype=str, encoding="latin1")
        if df.shape[1] == len(expected):
            df.columns = expected
        else:
            # If column count doesn't match, try reading with header inferred
            df = pd.read_csv(path, skiprows=1, dtype=str, encoding="latin1")
            df.columns = _clean_colnames(df.columns)
    except Exception:
        # Last resort: default read
        df = pd.read_csv(path, dtype=str)
        df.columns = _clean_colnames(df.columns)

    # Coerce planid to numeric where possible (to match R col_double intent)
    if "planid" in df.columns:
        df["planid"] = pd.to_numeric(df["planid"], errors="coerce")

    return df


def read_enroll(path: Path) -> pd.DataFrame:
    """Read a single monthly Enrollment Info file.

    The exact layout can differ across extracts; this function:
    - reads with skiprows=1 by default,
    - standardizes names,
    - ensures `contractid` and `planid` exist.
    """
    try:
        df = pd.read_csv(path, skiprows=1, dtype=str, encoding="latin1")
    except Exception:
        df = pd.read_csv(path, dtype=str, encoding="latin1")

    df.columns = _clean_colnames(df.columns)

    # Harmonize common variants
    ren = {
        "contract_id": "contractid",
        "contract": "contractid",
        "plan_id": "planid",
        "plan": "planid",
        "county_name": "county",
        "state_abbr": "state",
        "fips_state_county_code": "fips",
    }
    df = df.rename(columns={k: v for k, v in ren.items() if k in df.columns})

    # Type fixes
    if "planid" in df.columns:
        df["planid"] = pd.to_numeric(df["planid"], errors="coerce")
    if "fips" in df.columns:
        df["fips"] = pd.to_numeric(df["fips"], errors="coerce")

    return df


def load_month(m: str, y: int) -> pd.DataFrame:
    c_path = ENROLL_DIR / f"CPSC_Contract_Info_{y}_{m}.csv"
    e_path = ENROLL_DIR / f"CPSC_Enrollment_Info_{y}_{m}.csv"

    contract = read_contract(c_path)

    # Match R: distinct(contractid, planid, .keep_all = TRUE)
    if {"contractid", "planid"}.issubset(contract.columns):
        contract = contract.drop_duplicates(subset=["contractid", "planid"], keep="first")

    enroll = read_enroll(e_path)

    # Match R: left_join by contractid, planid where possible
    if {"contractid", "planid"}.issubset(enroll.columns) and {"contractid", "planid"}.issubset(contract.columns):
        df = contract.merge(enroll, on=["contractid", "planid"], how="left", suffixes=("", "_enroll"))
    else:
        # If keys are missing, fall back to a concat that at least preserves contract rows
        df = contract.copy()

    df["month"] = int(m)
    df["year"] = int(y)
    return df

In [3]:
# Read all months, then tidy once (mirrors the R notebook structure)
plan_year = pd.concat([load_month(m, YEAR) for m in MONTHS], ignore_index=True)

# Ensure stable order before fills (as in R: arrange(contractid, planid, state, county, month))
sort_cols = [c for c in ["contractid", "planid", "state", "county", "month"] if c in plan_year.columns]
plan_year = plan_year.sort_values(sort_cols).reset_index(drop=True)

# Fill missing FIPS within (state, county), down+up like tidyr::fill(.direction="downup")
if {"state", "county", "fips"}.issubset(plan_year.columns):
    plan_year["fips"] = (
        plan_year.groupby(["state", "county"], dropna=False)["fips"]
        .apply(lambda s: s.ffill().bfill())
        .reset_index(level=[0,1], drop=True)
    )

# Fill plan descriptors within (contractid, planid)
fill_cols_planid = [c for c in ["plan_type", "partd", "snp", "eghp", "plan_name"] if c in plan_year.columns]
if fill_cols_planid and {"contractid", "planid"}.issubset(plan_year.columns):
    plan_year[fill_cols_planid] = (
        plan_year.groupby(["contractid", "planid"], dropna=False)[fill_cols_planid]
        .apply(lambda df: df.ffill().bfill())
        .reset_index(level=[0,1], drop=True)
    )

# Fill organization descriptors within contractid
fill_cols_contract = [c for c in ["org_name", "org_marketing_name", "org_type", "parent_org", "contract_date"] if c in plan_year.columns]
if fill_cols_contract and "contractid" in plan_year.columns:
    plan_year[fill_cols_contract] = (
        plan_year.groupby(["contractid"], dropna=False)[fill_cols_contract]
        .apply(lambda df: df.ffill().bfill())
        .reset_index(level=0, drop=True)
    )

plan_year.head()

  .apply(lambda df: df.ffill().bfill())
  .apply(lambda df: df.ffill().bfill())


Unnamed: 0,contractid,planid,org_type,plan_type,partd,snp,eghp,org_name,org_marketing_name,plan_name,parent_org,contract_date,month,year
0,90091,,HCPP - 1833 Cost,HCPP - 1833 Cost,No,No,No,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,United Mine Workers of America Health & Retire...,,UMWA Health and Retirement Funds,02/01/1974 0:00:00,1,2018
1,90091,,HCPP - 1833 Cost,HCPP - 1833 Cost,No,No,No,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,United Mine Workers of America Health & Retire...,,UMWA Health and Retirement Funds,02/01/1974 0:00:00,2,2018
2,90091,,HCPP - 1833 Cost,HCPP - 1833 Cost,No,No,No,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,United Mine Workers of America Health & Retire...,,UMWA Health and Retirement Funds,02/01/1974 0:00:00,3,2018
3,90091,,HCPP - 1833 Cost,HCPP - 1833 Cost,No,No,No,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,United Mine Workers of America Health & Retire...,,UMWA Health and Retirement Funds,02/01/1974 0:00:00,4,2018
4,90091,,HCPP - 1833 Cost,HCPP - 1833 Cost,No,No,No,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,United Mine Workers of America Health & Retire...,,UMWA Health and Retirement Funds,02/01/1974 0:00:00,5,2018


In [4]:
# Collapse to one row per (contractid, planid, fips, year) using the *last* month after sorting,
# analogous to the R summarize(last(...)) pattern.

group_cols = [c for c in ["contractid", "planid", "fips", "year"] if c in plan_year.columns]
if not group_cols:
    raise ValueError("Could not find grouping columns. Check that your raw files include contractid/planid/fips/year.")

plan_year = plan_year.sort_values([*group_cols, "month"]).reset_index(drop=True)

# Choose which columns to carry forward with "last"
carry_cols = [c for c in [
    "state","county","org_type","plan_type","partd","snp","eghp",
    "org_name","org_marketing_name","plan_name","parent_org","contract_date",
] if c in plan_year.columns]

# Also carry forward any obvious enrollment measures if present
enroll_like = [c for c in plan_year.columns if any(k in c for k in ["enroll", "enrollment", "benefici", "member", "eligible"])]
carry_cols += [c for c in enroll_like if c not in carry_cols and c not in group_cols and c != "month"]

final_plans = (
    plan_year.groupby(group_cols, dropna=False, as_index=False)
    .agg({c: "last" for c in carry_cols})
)

final_plans.head()

Unnamed: 0,contractid,planid,year,org_type,plan_type,partd,snp,eghp,org_name,org_marketing_name,plan_name,parent_org,contract_date
0,90091,,2018,HCPP - 1833 Cost,HCPP - 1833 Cost,No,No,No,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,United Mine Workers of America Health & Retire...,,UMWA Health and Retirement Funds,02/01/1974 0:00:00
1,E0654,801.0,2018,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,IBT VOLUNTARY EMPLOYEE BENEFITS TRUST,TEAMStar Medicare Part D Prescription Drug Pro...,IBT Voluntary Employee Benefits Trust (Employe...,IBT Voluntary Employee Benefits Trust,01/01/2007 0:00:00
2,E3014,801.0,2018,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,PSERS HOP PROGRAM,Pennsylvania Public School Employees Retiremen...,PSERS Health Options Program (Employer PDP),Commonwealth of PA Pub Schools Retirement System,01/01/2007 0:00:00
3,E4744,801.0,2018,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,MODOT/MSHP MEDICAL AND LIFE INSURANCE PLAN,MISSOURI DEPARTMENT OF TRANSPORTATION,Missouri Department of Transportatio/ Highway ...,Missouri Highways and Transportation Commission,01/01/2007 0:00:00
4,H0022,1.0,2018,Demo,Medicare-Medicaid Plan HMO/HMOPOS,Yes,No,No,"BUCKEYE COMMUNITY HEALTH PLAN, INC.",Buckeye Health Plan - MyCare Ohio,Buckeye Health Plan - MyCare Ohio (Medicare-Me...,Centene Corporation,05/01/2014 0:00:00


In [None]:
# Save
final_plans.to_csv(OUT_DIR / "plan_data.csv", index=False)
print("Wrote:", OUT_DIR / "plan_data.csv")

## Read service area data

In [5]:
def read_service_area(path: Path) -> pd.DataFrame:
    expected = ["contractid","org_name","org_type","plan_type","partial","eghp","ssa","fips","county","state","notes"]

    # Try: skip first line, no header
    try:
        df = pd.read_csv(path, skiprows=1, header=None, dtype=str, na_values=["*"], encoding="latin1")
        if df.shape[1] == len(expected):
            df.columns = expected
        else:
            df = pd.read_csv(path, skiprows=1, dtype=str, na_values=["*"], encoding="latin1")
            df.columns = _clean_colnames(df.columns)
    except Exception:
        df = pd.read_csv(path, dtype=str, na_values=["*"])
        df.columns = _clean_colnames(df.columns)

    # Type fixes
    if "partial" in df.columns:
        # partial is logical in R
        df["partial"] = df["partial"].map(lambda x: str(x).strip().lower() in {"true","t","1","yes","y"} if pd.notna(x) else np.nan)
    for col in ["ssa", "fips"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    return df


def load_month_sa(m: str, y: int) -> pd.DataFrame:
    path = SERVICE_AREA_DIR / f"MA_Cnty_SA_{y}_{m}.csv"
    df = read_service_area(path)
    df["month"] = int(m)
    df["year"] = int(y)
    return df

In [6]:
service_year = pd.concat([load_month_sa(m, YEAR) for m in MONTHS], ignore_index=True)

# Stable order before fills
sort_cols = [c for c in ["contractid", "fips", "state", "county", "month"] if c in service_year.columns]
service_year = service_year.sort_values(sort_cols).reset_index(drop=True)

# Fill missing fips within (state, county)
if {"state", "county", "fips"}.issubset(service_year.columns):
    service_year["fips"] = (
        service_year.groupby(["state", "county"], dropna=False)["fips"]
        .apply(lambda s: s.ffill().bfill())
        .reset_index(level=[0,1], drop=True)
    )

# Fill other labels within contractid (mirrors tidyr::fill)
fill_cols_sa = [c for c in ["org_name","org_type","plan_type","partial","eghp","ssa","notes"] if c in service_year.columns]
if fill_cols_sa and "contractid" in service_year.columns:
    service_year[fill_cols_sa] = (
        service_year.groupby(["contractid"], dropna=False)[fill_cols_sa]
        .apply(lambda df: df.ffill().bfill())
        .reset_index(level=0, drop=True)
    )

service_year.head()

  .apply(lambda df: df.ffill().bfill())


Unnamed: 0,contractid,org_name,org_type,plan_type,partial,eghp,ssa,fips,county,state,notes,month,year
0,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",1,2018
1,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",2,2018
2,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",3,2018
3,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",4,2018
4,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",5,2018


In [7]:
# Collapse to one row per (contractid, fips, year) taking last month after sorting
group_cols = [c for c in ["contractid", "fips", "year"] if c in service_year.columns]
service_year = service_year.sort_values([*group_cols, "month"]).reset_index(drop=True)

carry_cols = [c for c in ["state","county","org_name","org_type","plan_type","partial","eghp","ssa","notes"] if c in service_year.columns]

final_service_area = (
    service_year.groupby(group_cols, dropna=False, as_index=False)
    .agg({c: "last" for c in carry_cols})
)

final_service_area.head()

Unnamed: 0,contractid,fips,year,state,county,org_name,org_type,plan_type,partial,eghp,ssa,notes
0,90091,,2018,,,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,"Covers the entire US, all States and Counties"
1,H0022,39023.0,2018,OH,Clark,"BUCKEYE COMMUNITY HEALTH PLAN, INC.",Demo,Medicare-Medicaid Plan HMO/HMOPOS,,,36110.0,
2,H0022,39035.0,2018,OH,Cuyahoga,"BUCKEYE COMMUNITY HEALTH PLAN, INC.",Demo,Medicare-Medicaid Plan HMO/HMOPOS,,,36170.0,
3,H0022,39051.0,2018,OH,Fulton,"BUCKEYE COMMUNITY HEALTH PLAN, INC.",Demo,Medicare-Medicaid Plan HMO/HMOPOS,,,36260.0,
4,H0022,39055.0,2018,OH,Geauga,"BUCKEYE COMMUNITY HEALTH PLAN, INC.",Demo,Medicare-Medicaid Plan HMO/HMOPOS,,,36280.0,


In [None]:
# Save
final_service_area.to_csv(OUT_DIR / "service_area.csv", index=False)
print("Wrote:", OUT_DIR / "service_area.csv")