In [8]:
from pathlib import Path

import duckdb

# Path of the DuckDB database file; a relative path, so it is resolved
# against the notebook's working directory.
DB_PATH = "../db/synpuf.duckdb"
con = duckdb.connect(DB_PATH)

In [9]:
# Source column -> name of the boolean flag it is exposed as downstream.
chronic_map = {
    "SP_ALZHDMTA": "has_alzheimers",
    "SP_CHF": "has_chf",
    "SP_CHRNKIDN": "has_ckd",
    "SP_CNCR": "has_cancer",
    "SP_COPD": "has_copd",
    "SP_DEPRESSN": "has_depression",
    "SP_DIABETES": "has_diabetes",
    "SP_ISCHMCHT": "has_ihd",
    "SP_OSTEOPRS": "has_osteoporosis",
    "SP_RA_OA": "has_ra_oa",
    "SP_STRKETIA": "has_stroke",
}

# One projection per mapped column: TRUE when the source value equals 1,
# FALSE for anything else (a NULL source value also falls into the ELSE branch).
_FLAG_SQL = "CASE WHEN {}=1 THEN TRUE ELSE FALSE END AS {}"
chronic_cases = [_FLAG_SQL.format(*pair) for pair in chronic_map.items()]

# The flag projections do not depend on the year, so join them only once.
_chronic_select = ", ".join(chronic_cases)


def _beneficiary_select(year):
    """Return the SELECT statement that normalizes table `beneficiary_<year>`.

    `year` is spliced into the SQL twice: as the literal value of the `year`
    column and as the suffix of the source table name.
    """
    return f"""
    SELECT
        DESYNPUF_ID,
        {year} AS year,
        CAST(substr(CAST(BENE_BIRTH_DT AS VARCHAR), 1, 4) AS INT) AS birth_year,
        BENE_SEX_IDENT_CD AS sex,
        BENE_RACE_CD AS race,
        CASE WHEN BENE_ESRD_IND='Y' THEN TRUE ELSE FALSE END AS esrd,
        {_chronic_select}
    FROM beneficiary_{year}
    """


# One SELECT per yearly beneficiary table; they are UNION ALL-ed later on.
year_union = []
for year in ("2008", "2009", "2010"):
    year_union.append(_beneficiary_select(year))

# Stack the per-year SELECTs into a single relation and add `age`, computed
# as the reporting year minus the birth year.
_stacked_years = " UNION ALL ".join(year_union)
beneficiary_query = (
    "\n"
    "CREATE OR REPLACE TABLE beneficiary_all AS\n"
    "SELECT * ,\n"
    "       year - birth_year AS age\n"
    f"FROM ({_stacked_years})\n"
)
con.execute(beneficiary_query)

def _yearly_cost_query(target, source, date_col, amount_expr, cost_col):
    """Return the SQL that (re)creates `target` with one row per
    (DESYNPUF_ID, year), where `year` is the first four characters of
    `date_col` and `cost_col` is the sum of `amount_expr` over `source`.
    """
    return f"""
CREATE OR REPLACE TABLE {target} AS
SELECT DESYNPUF_ID,
       CAST(substr(CAST({date_col} AS VARCHAR), 1, 4) AS INT) AS year,
       SUM({amount_expr}) AS {cost_col}
FROM {source}
GROUP BY DESYNPUF_ID, year
"""


# A carrier row spreads its payments over line columns 1..13; NULL lines
# count as 0 so that one missing line does not null out the whole row total.
line_cols = ["COALESCE(LINE_NCH_PMT_AMT_%d,0)" % n for n in range(1, 14)]
sum_expr = " + ".join(line_cols)

# (target table, source table, date column, per-row amount, cost column),
# listed in the order the tables are built.
_cost_specs = [
    ("inpatient_cost", "inpatient", "CLM_FROM_DT", "COALESCE(CLM_PMT_AMT, 0)", "ip_cost"),
    ("outpatient_cost", "outpatient", "CLM_FROM_DT", "COALESCE(CLM_PMT_AMT, 0)", "op_cost"),
    ("carrier_cost", "carrier", "CLM_FROM_DT", sum_expr, "car_cost"),
    ("rx_cost", "pde", "SRVC_DT", "COALESCE(TOT_RX_CST_AMT, 0)", "rx_cost"),
]
for _spec in _cost_specs:
    con.execute(_yearly_cost_query(*_spec))

# Build the final table: one row per beneficiary-year with demographics,
# condition flags and the four cost components. The LEFT JOINs keep
# beneficiaries that have no row in a cost table; their missing costs are
# reported as 0 and therefore add nothing to total_cost.
final_normalized_query = """
CREATE OR REPLACE TABLE final_normalized AS
SELECT b.DESYNPUF_ID,
       b.year,
       b.age,
       b.sex,
       b.race,
       b.esrd,
       b.has_alzheimers, b.has_chf, b.has_ckd, b.has_cancer, b.has_copd,
       b.has_depression, b.has_diabetes, b.has_ihd, b.has_osteoporosis,
       b.has_ra_oa, b.has_stroke,
       COALESCE(i.ip_cost,0) AS ip_cost,
       COALESCE(o.op_cost,0) AS op_cost,
       COALESCE(c.car_cost,0) AS car_cost,
       COALESCE(r.rx_cost,0) AS rx_cost,
       COALESCE(i.ip_cost,0) + COALESCE(o.op_cost,0) + COALESCE(c.car_cost,0) + COALESCE(r.rx_cost,0) AS total_cost
FROM beneficiary_all b
LEFT JOIN inpatient_cost i USING (DESYNPUF_ID, year)
LEFT JOIN outpatient_cost o USING (DESYNPUF_ID, year)
LEFT JOIN carrier_cost c USING (DESYNPUF_ID, year)
LEFT JOIN rx_cost r USING (DESYNPUF_ID, year)
"""
con.execute(final_normalized_query)



<duckdb.duckdb.DuckDBPyConnection at 0x76ed681d98f0>

In [10]:

# Sanity check: display the column names and types of the table built above.
schema_query = "DESC final_normalized"
con.execute(schema_query).fetch_df()

Unnamed: 0,column_name,column_type,null,key,default,extra
0,DESYNPUF_ID,VARCHAR,YES,,,
1,year,INTEGER,YES,,,
2,age,INTEGER,YES,,,
3,sex,BIGINT,YES,,,
4,race,BIGINT,YES,,,
5,esrd,BOOLEAN,YES,,,
6,has_alzheimers,BOOLEAN,YES,,,
7,has_chf,BOOLEAN,YES,,,
8,has_ckd,BOOLEAN,YES,,,
9,has_cancer,BOOLEAN,YES,,,


In [11]:
# Export the final table to Parquet.
# DuckDB's COPY does not create missing parent directories for a single-file
# export, so create the target folder first; otherwise this cell fails on a
# fresh checkout where ../features does not exist yet.
export_path = Path("../features/final_normalized.parquet")
export_path.parent.mkdir(parents=True, exist_ok=True)

# as_posix() keeps forward slashes in the SQL literal on every platform.
con.execute(f"""
    COPY final_normalized TO '{export_path.as_posix()}' (FORMAT PARQUET)
""")

# Close the connection now that the export is done.
con.close()