In [1]:
# =========================
# The purpose of this script is to find statistical associations between features
# and a binary target (readmission within 30 days) using GLMs (logistic regression)
# =========================
# Builds univariate & multivariable GLMs (logistic), fixes encoding/feature names,
# handles mixed dtypes, sanitizes column names for Patsy,
# and writes three Swiss-Excel-friendly outputs:
#  - reports/clean_univariate_results.csv
#  - reports/clean_multivariable_results.csv
#  - reports/final_feature_summary_no_shap.csv

import pandas as pd
import numpy as np
import re
import unicodedata
from pathlib import Path
import statsmodels.api as sm
import statsmodels.formula.api as smf

# ---------- CONFIG ----------
DATA_PATH = "../data/processed/diabetes_cleaned_data.csv"
TARGET = "readmitted_binary"  # must be 0/1
TOP_FEATURES = None           # if None → use all except IDs
RARE_THRESH = 0.01            # collapse rare categories (<1%)
SAMPLE_N = 100000              # stratified sample for speed; set None for all rows

# ---------- Helpers ----------
def read_csv_safely(path, expected_sep=None):
    try:
        return pd.read_csv(path, sep=expected_sep or None, engine="python",
                           encoding="utf-8", on_bad_lines="skip")
    except UnicodeDecodeError:
        return pd.read_csv(path, sep=expected_sep or None, engine="python",
                           encoding="latin-1", on_bad_lines="skip")

def strip_weird_chars(text):
    if not isinstance(text, str):
        return text
    text = ''.join(ch for ch in text if ch.isprintable())
    text = unicodedata.normalize('NFKC', text)
    return text

def clean_columns(df):
    df = df.copy()
    df.columns = [strip_weird_chars(str(c)).strip() for c in df.columns]
    return df

def save_swiss_excel(df, path):
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(path, sep=';', decimal=',', index=False)
    return path

# ---------- Load & prepare ----------
df = read_csv_safely(DATA_PATH)
df = clean_columns(df)
TOP_FEATURES = None  # <- put your list here, e.g. ["number_inpatient","insulin","age",...]
if TOP_FEATURES is None:
    candidates = [
        # demographics & admin
        "age","gender","race","admission_type_id","admission_source_id","discharge_disposition_id","payer_code",
        # utilization
        "number_inpatient","number_outpatient","number_emergency",
        # current stay & treatment
        "time_in_hospital","num_lab_procedures","num_procedures","num_medications",
        "insulin","change","diabetesMed",
        # diabetes specifics
        "A1Cresult","max_glu_serum","diab_type","diab_control",
        "diab_complication_binary","diab_complication_categories", "diag_1","diag_2","diag_3","num_diagnoses", 
        "diag_1_category","diag_2_category","diag_3_category"
    ]
    TOP_FEATURES = [c for c in candidates if c in df.columns]

use_cols = [TARGET] + TOP_FEATURES
data = df[use_cols].dropna().copy()
data[TARGET] = data[TARGET].astype(int)

# ---------- Identify categorical columns ----------
LIKELY_CATS = {
    'race', 'gender', 'age',
    'admission_type_id', 'discharge_disposition_id', 'admission_source_id',
    'payer_code', 'medical_specialty',
    'A1Cresult', 'max_glu_serum',
    'change', 'diabetesMed', 'insulin',
    'diag_1', 'diag_2', 'diag_3',
    'diab_type', 'diab_control',
    'diab_complication_binary', 'diab_complication_categories'
}

for c in TOP_FEATURES:
    if c in LIKELY_CATS or data[c].dtype == object:
        data[c] = data[c].astype("category")

cat_cols = [c for c in TOP_FEATURES if str(data[c].dtype).startswith("category")]
num_cols = [c for c in TOP_FEATURES if c not in cat_cols]

# ---------- Collapse rare categorical levels safely ----------
for c in cat_cols:
    data[c] = data[c].astype("string")
    vc = data[c].value_counts(normalize=True, dropna=False)
    rare = set(vc[vc < RARE_THRESH].index)
    if rare:
        data.loc[data[c].isin(rare), c] = "Other"
    data[c] = data[c].astype("category")

# ---------- Optional sampling ----------
if SAMPLE_N and len(data) > SAMPLE_N:
    data = (data.groupby(TARGET, group_keys=False)
                .apply(lambda x: x.sample(int(SAMPLE_N * len(x)/len(data)), random_state=42))
                .reset_index(drop=True))

# ---------- Make column names Patsy-safe ----------
def make_safe(name):
    return re.sub(r'[^0-9a-zA-Z_]', '_', str(name))

orig_to_safe = {c: make_safe(c) for c in data.columns}
safe_to_orig = {v: k for k, v in orig_to_safe.items()}
data = data.rename(columns=orig_to_safe)

TARGET_SAFE = orig_to_safe[TARGET]
TOP_FEATURES_SAFE = [orig_to_safe[c] for c in TOP_FEATURES]
cat_cols_safe = [orig_to_safe[c] for c in cat_cols]
num_cols_safe = [orig_to_safe[c] for c in num_cols]

# ---------- Categorical casting for Patsy ----------
for col in cat_cols_safe:
    data[col] = data[col].astype(str)

# ---------- Build formulas ----------
def term(col): 
    return f"C({col})" if col in cat_cols_safe else col

uni_formulas = {c: f"{TARGET_SAFE} ~ {term(c)}" for c in TOP_FEATURES_SAFE}
mv_formula   = f"{TARGET_SAFE} ~ " + " + ".join(term(c) for c in TOP_FEATURES_SAFE)

# ---------- Fit multivariable GLM ----------
glm_mv = smf.glm(formula=mv_formula, data=data, family=sm.families.Binomial()).fit()

mv = glm_mv.summary2().tables[1].copy()
mv = mv.rename(columns={"Coef.":"coef","Std.Err.":"stderr","P>|z|":"p_value",
                        "[0.025":"ci_low","0.975]":"ci_high"})
mv["term"] = mv.index.astype(str)
def base_from_term(term: str) -> str:
    if not isinstance(term, str):
        return str(term)
    m = re.match(r"C\((.*?)\)", term)
    if m:
        return m.group(1)
    return re.split(r'[\[:]', term)[0]
mv["feature_safe"] = mv["term"].apply(base_from_term)
mv["feature"] = mv["feature_safe"].map(safe_to_orig).fillna(mv["feature_safe"])
mv["OR"] = np.exp(mv["coef"])
mv["OR_ci_low"] = np.exp(mv["ci_low"])
mv["OR_ci_high"] = np.exp(mv["ci_high"])
mv = mv[["feature","term","coef","OR","OR_ci_low","OR_ci_high","stderr","z","p_value"]]
mv_path = save_swiss_excel(mv, "../reports/clean_multivariable_results.csv")

# ---------- Fit univariate GLMs ----------
uni_rows = []
for safe_feat, formula in uni_formulas.items():
    try:
        m = smf.glm(formula=formula, data=data, family=sm.families.Binomial()).fit()
        t = m.summary2().tables[1].copy()
        t = t.rename(columns={"Coef.":"coef","Std.Err.":"stderr","P>|z|":"p_value",
                              "[0.025":"ci_low","0.975]":"ci_high"})
        t["term"] = t.index.astype(str)
        t["feature_safe"] = safe_feat
        t["feature"] = safe_to_orig.get(safe_feat, safe_feat)
        t["OR"] = np.exp(t["coef"])
        t["OR_ci_low"] = np.exp(t["ci_low"])
        t["OR_ci_high"] = np.exp(t["ci_high"])
        uni_rows.append(t[["feature","term","coef","OR","OR_ci_low","OR_ci_high","stderr","z","p_value"]])
    except Exception:
        uni_rows.append(pd.DataFrame([{
            "feature": safe_to_orig.get(safe_feat, safe_feat), "term": "ERROR",
            "coef": np.nan, "OR": np.nan, "OR_ci_low": np.nan, "OR_ci_high": np.nan,
            "stderr": np.nan, "z": np.nan, "p_value": np.nan
        }]))
uni = pd.concat(uni_rows, ignore_index=True)
uni_path = save_swiss_excel(uni, "../reports/clean_univariate_results.csv")