In [2]:
# Imports used throughout the notebook.

from __future__ import annotations

import numpy as np
import pandas as pd
from pathlib import Path

In [None]:
# Directory containing the extracted monthly CSV files; load_month builds the
# contract-info and enrollment-info file paths relative to this folder.
BASE_DIR = Path("data/input/ma-data/enrollment/Extracted Data")


def monthlist_for_year(y: int) -> list[str]:
    """Return the two-digit month codes to load for year ``y``.

    Parameters
    ----------
    y : int
        Calendar year being processed. Currently every year yields the same
        list; the parameter is kept so callers do not change.

    Returns
    -------
    list[str]
        ``["01", "02", ..., "12"]``.
    """
    # The previous version wrote the bounds as ``range(01, 12)``: the leading
    # zero is a syntax error in Python 3 and the exclusive stop of 12 dropped
    # December. ``range(1, 13)`` covers the full calendar year.
    # NOTE(review): 2018 used to be special-cased, but with the same range as
    # every other year. Confirm whether 2018 really needs a shorter month list.
    return [f"{month:02d}" for month in range(1, 13)]

In [None]:
# Column names that read_contract assigns to the contract-info CSV, in order.
CONTRACT_COLUMNS = [
    "contractid",
    "planid",
    "org_type",
    "plan_type",
    "partd",
    "snp",
    "eghp",
    "org_name",
    "org_marketing_name",
    "plan_name",
    "parent_org",
    "contract_date",
]

# Parsing dtypes for those columns: the plan id is numeric, everything else
# is read as a pandas string column.
CONTRACT_DTYPES = {
    name: ("float64" if name == "planid" else "string")
    for name in CONTRACT_COLUMNS
}

In [None]:
# Column names that read_enroll assigns to the enrollment-info CSV, in order.
ENROLL_COLUMNS = [
    "contractid",
    "planid",
    "ssa",
    "fips",
    "state",
    "county",
    "enrollment",
]

# Parsing dtypes for those columns: identifiers/labels are strings, the
# remaining columns are read as floats.
ENROLL_DTYPES = {
    name: ("string" if name in {"contractid", "state", "county"} else "float64")
    for name in ENROLL_COLUMNS
}


def read_contract(path: Path) -> pd.DataFrame:
    """Load one contract-info CSV into a DataFrame.

    The first line of the file is skipped and the columns are named from
    ``CONTRACT_COLUMNS`` and typed from ``CONTRACT_DTYPES``.

    Parameters
    ----------
    path : Path
        Location of the CSV file to read.

    Returns
    -------
    pd.DataFrame
        One row per line of the file after the skipped first line.
    """
    csv_options = dict(
        skiprows=1,       # discard the file's own first line
        header=None,      # ...and supply our own column names instead
        names=CONTRACT_COLUMNS,
        dtype=CONTRACT_DTYPES,
        low_memory=False,
    )
    return pd.read_csv(path, **csv_options)


def read_enroll(path: Path) -> pd.DataFrame:
    """Load one enrollment-info CSV into a DataFrame.

    The first line of the file is skipped, columns are named from
    ``ENROLL_COLUMNS`` and typed from ``ENROLL_DTYPES``, and cells containing
    ``*`` are parsed as missing values.

    Parameters
    ----------
    path : Path
        Location of the CSV file to read.

    Returns
    -------
    pd.DataFrame
        One row per line of the file after the skipped first line.
    """
    csv_options = dict(
        skiprows=1,       # discard the file's own first line
        header=None,      # ...and supply our own column names instead
        names=ENROLL_COLUMNS,
        dtype=ENROLL_DTYPES,
        na_values=["*"],  # "*" cells become NaN
        low_memory=False,
    )
    return pd.read_csv(path, **csv_options)

In [None]:
def fill_downup(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    """Return a copy of ``df`` with gaps in ``cols`` filled down, then up.

    Missing values in the selected columns are first replaced by the nearest
    preceding non-missing value (forward fill); anything still missing at the
    top is then replaced by the nearest following value (backward fill).
    The input frame is left untouched.

    Parameters
    ----------
    df : pd.DataFrame
        Frame to fill; row order determines the fill direction.
    cols : list[str]
        Columns to fill.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` with the selected columns filled.
    """
    result = df.copy()
    patched = result[cols].ffill()
    patched = patched.bfill()
    result[cols] = patched
    return result



def load_month(m: str, y: int) -> pd.DataFrame:
    """Load and combine the contract and enrollment files for one month.

    Parameters
    ----------
    m : str
        Month code as it appears in the file names (also converted with
        ``int`` for the ``month`` column).
    y : int
        Year as it appears in the file names.

    Returns
    -------
    pd.DataFrame
        Contract rows (first occurrence per contractid/planid) left-joined
        to the enrollment rows on contractid and planid, with ``month`` and
        ``year`` columns added.
    """
    contract_file = BASE_DIR / f"CPSC_Contract_Info_{y}_{m}.csv"
    enroll_file = BASE_DIR / f"CPSC_Enrollment_Info_{y}_{m}.csv"

    # Keep a single contract record per contract/plan pair before joining.
    plans = read_contract(contract_file)
    plans = plans.drop_duplicates(subset=["contractid", "planid"], keep="first")

    enrollment = read_enroll(enroll_file)

    # Left join: every contract/plan row survives even without enrollment.
    combined = plans.merge(enrollment, on=["contractid", "planid"], how="left")
    combined = combined.assign(month=int(m), year=y)
    return combined


def _fill_within_groups(
    frame: pd.DataFrame, keys: list[str], cols: list[str]
) -> pd.DataFrame:
    """Forward- then backward-fill ``cols`` separately inside each ``keys`` group.

    Rows whose key values are missing are kept and treated as their own group
    (``dropna=False``). Row order and index of ``frame`` are preserved.
    """
    # Select only the columns being filled before transforming: this avoids
    # running a group-wise ``apply`` over the grouping columns themselves,
    # which pandas 2.2 deprecates.
    filled = (
        frame.groupby(keys, dropna=False, sort=False)[cols]
        .transform(lambda block: block.ffill().bfill())
    )
    out = frame.copy()
    out[cols] = filled
    return out


def build_plan_year(y: int) -> pd.DataFrame:
    """Assemble the monthly plan/county data for year ``y``.

    Loads every month returned by ``monthlist_for_year(y)`` with
    ``load_month``, stacks the results, sorts them, and fills gaps in the
    descriptive columns from neighbouring rows of the same group.

    Parameters
    ----------
    y : int
        Year to load.

    Returns
    -------
    pd.DataFrame
        All monthly rows for the year, sorted by contractid, planid, state,
        county and month, with a fresh 0..n-1 index.
    """
    monthlist = monthlist_for_year(y)

    plan_year = pd.concat(
        [load_month(m, y) for m in monthlist],
        ignore_index=True,
    )

    # Stable sort so that, within each group, rows run in month order; the
    # fills below depend on this ordering.
    plan_year = plan_year.sort_values(
        ["contractid", "planid", "state", "county", "month"],
        kind="mergesort",
    )

    # (grouping keys, columns to fill) applied in sequence:
    # county-level, then plan-level, then contract-level attributes.
    fill_steps = [
        (["state", "county"], ["fips"]),
        (
            ["contractid", "planid"],
            ["plan_type", "partd", "snp", "eghp", "plan_name"],
        ),
        (
            ["contractid"],
            ["org_type", "org_name", "org_marketing_name", "parent_org"],
        ),
    ]
    for keys, cols in fill_steps:
        plan_year = _fill_within_groups(plan_year, keys, cols)

    return plan_year.reset_index(drop=True)



def collapse_to_yearly_panel(plan_year: pd.DataFrame) -> pd.DataFrame:
    """Collapse monthly rows to one row per contractid/planid/fips/year.

    For each group the enrollment column is summarised (count of non-missing
    values, mean, sample standard deviation, min, max, first and last
    non-missing value in month order) and the descriptive columns take the
    value from the group's last row in month order.

    Parameters
    ----------
    plan_year : pd.DataFrame
        Monthly data containing the key columns, ``month``, ``enrollment``
        and the descriptive columns listed in ``carried`` below.

    Returns
    -------
    pd.DataFrame
        One row per group with the key columns first, followed by the
        enrollment summaries and the carried descriptive columns. Groups
        whose keys contain missing values are kept.
    """
    keys = ["contractid", "planid", "fips", "year"]
    carried = [
        "state", "county", "org_type", "plan_type", "partd", "snp", "eghp",
        "org_name", "org_marketing_name", "plan_name", "parent_org",
        "contract_date",
    ]

    # Stable sort so "first"/"last" below mean earliest/latest month.
    ordered = plan_year.sort_values(keys + ["month"], kind="mergesort")

    def _last_row(values: pd.Series):
        # Value on the group's final row, missing or not.
        return values.iloc[-1]

    # The built-in reducers skip missing values, so an all-missing group
    # yields a count of 0 and NaN for every other statistic; "std" uses
    # ddof=1 and is NaN when fewer than two values are present.
    spec = {
        "n_nonmiss": ("enrollment", "count"),
        "avg_enrollment": ("enrollment", "mean"),
        "sd_enrollment": ("enrollment", "std"),
        "min_enrollment": ("enrollment", "min"),
        "max_enrollment": ("enrollment", "max"),
        "first_enrollment": ("enrollment", "first"),
        "last_enrollment": ("enrollment", "last"),
    }
    spec.update({col: (col, _last_row) for col in carried})

    # "year" is deliberately not aggregated: it is a grouping key and comes
    # back as a column from reset_index(). Emitting it as a summary column
    # as well made reset_index() fail with "cannot insert year, already
    # exists".
    return ordered.groupby(keys, dropna=False).agg(**spec).reset_index()


# Driver: build the monthly plan data for a single year and collapse it to
# one row per contract/plan/county/year.
if __name__ == "__main__":
    y = 2009  # year to process
    plan_year = build_plan_year(y)
    final_plans = collapse_to_yearly_panel(plan_year)
     