In [None]:
from __future__ import annotations
from pathlib import Path
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy import stats
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier, plot_tree, export_text
from sklearn.metrics import (
    accuracy_score, roc_auc_score, roc_curve, RocCurveDisplay,
    confusion_matrix, classification_report
)
import statsmodels.api as sm
from statsmodels.formula.api import logit


# File paths
REPO_ROOT = Path(__file__).resolve().parents[1] if "__file__" in globals() else Path(".").resolve()
DATA_DIR  = REPO_ROOT / "data"
DEMO_DIR  = DATA_DIR / "demo"
OUT_FIG   = REPO_ROOT / "outputs" / "figures"
OUT_TAB   = REPO_ROOT / "outputs" / "tables"
OUT_MOD   = REPO_ROOT / "outputs" / "models"
for d in (OUT_FIG, OUT_TAB, OUT_MOD):
    d.mkdir(parents=True, exist_ok=True)


# Brand colors
MA_RED  = "#E31837"
MA_GRAY = "#717073"
MA_BLACK = "#231F20"
MA_TEAL  = "#00728F"


# Helpers
def to01(x):
    if pd.isna(x): return np.nan
    s = str(x).strip().lower()
    if s in {"1","yes","y","true","t"}: return 1
    if s in {"0","no","n","false","f"}: return 0
    try:
        return 1 if float(s) != 0.0 else 0
    except Exception:
        return np.nan

def safe_read_csv(filename: str, demo_name: str | None = None, required: bool = True) -> pd.DataFrame:
    """Read from data/, else data/demo/ if provided."""
    real_p = DATA_DIR / filename
    if real_p.exists():
        return pd.read_csv(real_p, low_memory=False)
    if demo_name:
        demo_p = DEMO_DIR / demo_name
        if demo_p.exists():
            print(f"[info] Using demo file for {filename}: {demo_p.name}")
            return pd.read_csv(demo_p, low_memory=False)
    if required:
        raise FileNotFoundError(f"Missing required data file: {filename}")
    print(f"[warn] Optional data missing (and no demo): {filename}")
    return pd.DataFrame()

def nonempty_series(x):
    return x.astype(str).str.strip().replace({"nan":"", "<na>":""})

SPORT_MAP = {
    0: "None", 1: "Basketball", 2: "Cross Country", 3: "Football",
    4: "Soccer", 5: "Track & Field", 6: "Volleyball",
    7: "Other (College Participation)", 8: "Alpine Ski", 9: "Lacrosse",
    10: "Other (No College Participation)"
}


# Load (demo-safe)
ma_bench = safe_read_csv("FOR ANALYSIS - MA Benchmark.csv",  "synthetic_benchmark.csv", required=True)
ma_pred  = safe_read_csv("FOR ANALYSIS - MA Predictive.csv", "synthetic_predictive.csv", required=True)
ncaa     = safe_read_csv("FOR ANALYSIS - NCAA.csv",          "synthetic_ncaa.csv", required=False)
nfhs     = safe_read_csv("FOR ANALYSIS - NFHS.csv",          "synthetic_nfhs.csv", required=False)

In [None]:
# RQ1 — Overall chi-square vs NCAA (6.3%)
def filter_cohort_no_ma_grad_nan(df, grad_col="MA_Graduate"):
    if grad_col not in df.columns:
        return df.copy()
    grad = df[grad_col].map(to01)
    return df[grad.notna()].copy()

def add_college_athlete_flag(frame):
    t = frame.copy()
    t["college_athlete"] = t["College_Athletics_Participant"].map(to01)
    if "College_NCAA_Division" in t.columns:
        has_div = nonempty_series(t["College_NCAA_Division"]) != ""
        m = t["college_athlete"].isna()
        t.loc[m, "college_athlete"] = has_div.loc[m].astype(int)
    t["college_athlete"] = t["college_athlete"].fillna(0).astype(int)
    return t

bench = add_college_athlete_flag(filter_cohort_no_ma_grad_nan(ma_bench, "MA_Graduate"))
obs_adv   = int(bench["college_athlete"].sum())
obs_total = int(len(bench))
obs_non   = obs_total - obs_adv

NCAA_BASE = 0.063
exp_adv = obs_total * NCAA_BASE
exp_non = obs_total * (1 - NCAA_BASE)
chi2, p = stats.chisquare(f_obs=[obs_adv, obs_non], f_exp=[exp_adv, exp_non])

(pd.DataFrame({
    "Advanced":[obs_adv], "Not_Advanced":[obs_non], "N":[obs_total],
    "Expected_Adv":[round(exp_adv,2)], "Expected_Not":[round(exp_non,2)],
    "Chi2":[round(chi2,2)], "p_value":[p]
})
 .to_csv(OUT_TAB / "rq1_overall_chi_square.csv", index=False))

# Figure: Observed vs Expected
fig, ax = plt.subplots(figsize=(7.5,5.5))
df_plot = pd.DataFrame({
    "Observed (MA)": [obs_adv, obs_non],
    "Expected (NCAA)": [exp_adv, exp_non]
}, index=["Advanced","Not Advanced"])
df_plot.plot(kind="bar", ax=ax, color=[MA_RED, MA_GRAY], width=0.75, edgecolor="black")
ax.set_title("Observed vs Expected Advancement (Overall)", fontweight="bold")
ax.set_ylabel("Count"); ax.set_xlabel("")
ax.legend(loc="upper left"); ax.grid(axis="y", linestyle="--", alpha=0.4)
for pch in ax.patches:
    h = pch.get_height()
    if np.isfinite(h):
        ax.annotate(f"{h:,.0f}", (pch.get_x()+pch.get_width()/2, h),
                    ha="center", va="bottom", fontsize=9, xytext=(0,3), textcoords="offset points")
fig.tight_layout()
fig.savefig(OUT_FIG / "rq1_overall_observed_vs_expected.png", dpi=150)
plt.close(fig)

In [None]:
# RQ1 — Per-sport + z-tests
def build_hs_participation(ma_pred_df: pd.DataFrame, id_col="Identifier") -> pd.DataFrame:
    df = filter_cohort_no_ma_grad_nan(ma_pred_df, "MA_Graduate")
    sport_cols = [c for c in df.columns if "sport_code" in c.lower() and "college" not in c.lower()]
    long = df[[id_col, *sport_cols]].melt(id_vars=[id_col], value_vars=sport_cols,
                                          var_name="season", value_name="Sport_Code")
    long["Sport_Code"] = pd.to_numeric(long["Sport_Code"], errors="coerce")
    long["Sport"] = long["Sport_Code"].map(SPORT_MAP)
    long = long[long["Sport"].notna() & (long["Sport"] != "None")]
    hs_counts = (long.drop_duplicates([id_col,"Sport"])
                      .groupby("Sport", as_index=False)
                      .agg(MA_HS_Participants=(id_col,"nunique")))
    return hs_counts

def build_college_advancements(ma_pred_df: pd.DataFrame, id_col="Identifier") -> pd.DataFrame:
    df = add_college_athlete_flag(filter_cohort_no_ma_grad_nan(ma_pred_df, "MA_Graduate"))
    df["Sport_Code"] = pd.to_numeric(df.get("College_Sport_Code_1"), errors="coerce")
    df["Sport"] = df["Sport_Code"].map(SPORT_MAP)
    adv = (df[(df["college_athlete"] == 1) & df["Sport"].notna() & (df["Sport"] != "None")]
             [[id_col,"Sport"]]
             .drop_duplicates())
    return adv.groupby("Sport", as_index=False).agg(Advanced=(id_col,"nunique"))

def load_ncaa_rates_clean(ncaa_df: pd.DataFrame, combine_genders=True) -> pd.DataFrame:
    if ncaa_df.empty:
        return pd.DataFrame(columns=["Sport","NCAA_Rate"])
    rates = ncaa_df.copy()
    # Expect columns: Sport_Code, HS_Participants_2022_23, Pct_HS_to_NCAA_Overall
    rates["Sport_Code"] = pd.to_numeric(rates["Sport_Code"], errors="coerce")
    rates["HS_Participants_2022_23"] = (
        rates["HS_Participants_2022_23"].astype(str).str.replace(",","", regex=False)
    ).astype(float)
    col = "Pct_HS_to_NCAA_Overall"
    rates[col] = (rates[col].astype(str).str.replace("%","", regex=False)
                  .str.replace(",","", regex=False)).astype(float) / 100.0
    if combine_genders and "Gender" in rates.columns:
        tmp = rates.dropna(subset=["Sport_Code","HS_Participants_2022_23",col]).copy()
        tmp["w"] = tmp["HS_Participants_2022_23"] * tmp[col]
        agg = (tmp.groupby("Sport_Code", as_index=False)
                  .agg(w_sum=("w","sum"), hs_sum=("HS_Participants_2022_23","sum")))
        agg["NCAA_Rate"] = np.where(agg["hs_sum"]>0, agg["w_sum"]/agg["hs_sum"], np.nan)
        out = agg[["Sport_Code","NCAA_Rate"]]
    else:
        out = rates.rename(columns={col:"NCAA_Rate"})[["Sport_Code","NCAA_Rate"]]
    out["Sport"] = out["Sport_Code"].map(SPORT_MAP)
    return out.dropna(subset=["Sport","NCAA_Rate"])[["Sport","NCAA_Rate"]]

hs_counts = build_hs_participation(ma_pred)
adv_counts = build_college_advancements(ma_pred)
ncaa_rates = load_ncaa_rates_clean(ncaa, combine_genders=True)

sport_df = (hs_counts.merge(adv_counts, on="Sport", how="left")
                     .merge(ncaa_rates, on="Sport", how="left"))
sport_df["Advanced"] = sport_df["Advanced"].fillna(0).astype(int)
sport_df = sport_df.dropna(subset=["NCAA_Rate"])
sport_df["MA_Rate"] = np.where(sport_df["MA_HS_Participants"]>0,
                               sport_df["Advanced"] / sport_df["MA_HS_Participants"], np.nan)

# Per-sport z-test table
rows = []
for _, r in sport_df.iterrows():
    n = int(r["MA_HS_Participants"]); x = int(r["Advanced"]); p0 = float(r["NCAA_Rate"])
    if n <= 0 or not (0 < p0 < 1): continue
    phat = x / n
    se = np.sqrt(p0 * (1 - p0) / n)
    z = (phat - p0) / se
    pval = 2 * (1 - stats.norm.cdf(abs(z)))
    rows.append({
        "Sport": r["Sport"],
        "MA_HS_Participants": n,
        "Advanced": x,
        "MA_Rate": phat,
        "NCAA_Rate": p0,
        "z": z, "p_value": pval,
        "Significant_(p<.05)": "Yes" if pval < 0.05 else "No"
    })

z_table = pd.DataFrame(rows).sort_values("p_value") if rows else pd.DataFrame()
if not z_table.empty:
    z_out = z_table.copy()
    z_out["MA_Rate"]   = (z_out["MA_Rate"]*100).round(1)
    z_out["NCAA_Rate"] = (z_out["NCAA_Rate"]*100).round(1)
    z_out.to_csv(OUT_TAB / "rq1_per_sport_ztests.csv", index=False)

# Basketball + football bar charts
def plot_sport_rate_bar(sport_name: str, out_png: str):
    row = sport_df.loc[sport_df["Sport"] == sport_name]
    if row.empty: 
        print(f"[warn] No data for sport: {sport_name}"); 
        return
    ma_rate = float(row["MA_Rate"].iloc[0]) * 100.0
    ncaa_rt = float(row["NCAA_Rate"].iloc[0]) * 100.0
    plt.figure(figsize=(5.5,5))
    plt.bar(["Minnehaha Academy","NCAA Average"], [ma_rate, ncaa_rt],
            color=[MA_RED, MA_GRAY], edgecolor="black")
    plt.ylabel("Advancement Rate (%)")
    plt.title(f"{sport_name}: MA vs NCAA")
    plt.ylim(0, max(ma_rate, ncaa_rt)*1.25 if np.isfinite(ma_rate) and np.isfinite(ncaa_rt) else 20)
    for i, v in enumerate([ma_rate, ncaa_rt]):
        if np.isfinite(v):
            plt.text(i, v+0.5, f"{v:.1f}%", ha="center", va="bottom", fontsize=9)
    plt.tight_layout()
    plt.savefig(OUT_FIG / out_png, dpi=150); plt.close()

for sp, fn in [("Basketball","rq1_bar_basketball.png"),
               ("Football","rq1_bar_football.png")]:
    plot_sport_rate_bar(sp, fn)


In [None]:
# RQ2 — Logistic regression (odds ratios) + Decision Tree
def build_rq2_frame(ma_pred_df: pd.DataFrame) -> pd.DataFrame:
    df = ma_pred_df.copy()
    # target with NCAA division fallback
    df["college_athlete"] = df["College_Athletics_Participant"].map(to01)
    if "College_NCAA_Division" in df.columns:
        has_div = nonempty_series(df["College_NCAA_Division"]) != ""
        m = df["college_athlete"].isna()
        df.loc[m, "college_athlete"] = has_div.loc[m].astype(int)

    df["gpa"]          = pd.to_numeric(df.get("Cumulative_GPA"), errors="coerce")
    df["sports_count"] = pd.to_numeric(df.get("Total_Count_of_HS_Sports_All_Years"), errors="coerce")
    df["fine_arts"]    = (pd.to_numeric(df.get("Years_As_Fine_Arts"), errors="coerce").fillna(0) > 0).astype(int)
    df["ma_graduate"]  = df.get("MA_Graduate").map(to01)
    df["fin_aid"]      = df.get("MA_Financial_Aid").map(to01)
    df["free_lunch"]   = df.get("Federal_Free_Lunch").map(to01)

    days_enrolled = pd.to_numeric(df.get("Total_Days_Enrolled_MA_All_Years"), errors="coerce")
    tardies       = pd.to_numeric(df.get("Total_Number_Tardies_All_Years"), errors="coerce")
    abs_pct       = pd.to_numeric(df.get("Percentage_of_Absences_All_Years"), errors="coerce")
    df["abs_percent"] = np.where(abs_pct > 1, abs_pct/100.0, abs_pct)
    df["tardy_rate"]  = np.where(days_enrolled.fillna(0)>0, tardies / days_enrolled, np.nan)

    df["gender"] = df.get("Gender").astype("object")
    df["region"] = df.get("Region_City_Based").astype("object")
    df["race"]   = df.get("Reported_Federal_Race").astype("object")

    cols = ["college_athlete","gpa","sports_count","fine_arts","ma_graduate",
            "abs_percent","tardy_rate","fin_aid","free_lunch",
            "gender","region","race"]
    dfm = df[cols].copy()
    dfm = dfm.dropna(subset=["college_athlete","gpa","sports_count","abs_percent","tardy_rate"])
    dfm["college_athlete"] = dfm["college_athlete"].astype(int)
    return dfm

rq2 = build_rq2_frame(ma_pred)

# Split
train_df, test_df = train_test_split(rq2, test_size=0.20, random_state=123, stratify=rq2["college_athlete"])

# --- Statsmodels logit for ORs ---
num_cols = ["gpa","sports_count","fine_arts","ma_graduate","abs_percent","tardy_rate","fin_aid","free_lunch"]
cat_cols = ["gender","region","race"]

ohe = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
ct  = ColumnTransformer([("num","passthrough", num_cols), ("cat", ohe, cat_cols)], remainder="drop")

Xtr = ct.fit_transform(train_df.drop(columns=["college_athlete"]))
Xte = ct.transform(test_df.drop(columns=["college_athlete"]))
ytr = train_df["college_athlete"].values
yte = test_df["college_athlete"].values

Xtr_sm = sm.add_constant(Xtr)
Xte_sm = sm.add_constant(Xte)

logit_model = sm.Logit(ytr, Xtr_sm, missing="raise")
logit_res   = logit_model.fit(disp=False)

# OR table
feature_names = num_cols + list(ct.named_transformers_["cat"].get_feature_names_out(cat_cols))
or_df = pd.DataFrame({
    "term": ["Intercept"] + feature_names,
    "odds_ratio": np.exp(logit_res.params),
    "ci_low": np.exp(logit_res.conf_int()[0]),
    "ci_high": np.exp(logit_res.conf_int()[1]),
    "p_value": logit_res.pvalues
})
or_df.to_csv(OUT_TAB / "rq2_logit_odds_ratios.csv", index=False)

# Test metrics + ROC (logit)
prob_glm = logit_res.predict(Xte_sm)
pred_glm = (prob_glm >= 0.5).astype(int)
acc_glm  = accuracy_score(yte, pred_glm)
auc_glm  = roc_auc_score(yte, prob_glm)
cm_glm   = confusion_matrix(yte, pred_glm)

pd.DataFrame([{"model":"logit","accuracy":acc_glm,"auc":auc_glm,"n_train":len(ytr),"n_test":len(yte)}]) \
  .to_csv(OUT_TAB / "rq2_logit_metrics.csv", index=False)
pd.DataFrame(cm_glm, index=["Actual 0","Actual 1"], columns=["Pred 0","Pred 1"]) \
  .to_csv(OUT_TAB / "rq2_logit_confusion_matrix.csv")

fpr, tpr, _ = roc_curve(yte, prob_glm)
plt.figure(figsize=(7,5))
plt.plot(fpr, tpr, lw=2, label=f"Logit (AUC={auc_glm:.3f})", color=MA_TEAL)
plt.plot([0,1],[0,1],"--", color="grey")
plt.xlabel("1 - Specificity"); plt.ylabel("Sensitivity")
plt.title("ROC — Logistic Regression (Test Set)")
plt.legend(loc="lower right")
plt.tight_layout(); plt.savefig(OUT_FIG / "rq2_logit_roc.png", dpi=150); plt.close()

# --- Decision Tree
preprocess = ColumnTransformer(
    transformers=[
        ("num", SimpleImputer(strategy="median"), num_cols),
        ("cat", Pipeline(steps=[
            ("imp", SimpleImputer(strategy="most_frequent")),
            ("ohe", OneHotEncoder(handle_unknown="ignore"))
        ]), cat_cols)
])
tree = DecisionTreeClassifier(random_state=123)
pipe = Pipeline([("prep", preprocess), ("clf", tree)])

grid = GridSearchCV(
    pipe,
    param_grid={"clf__max_depth":[3,4,5], "clf__min_samples_leaf":[10,15,20], "clf__class_weight":[None,"balanced"]},
    cv=5, scoring="roc_auc", n_jobs=-1
).fit(train_df.drop(columns=["college_athlete"]), ytr)

best = grid.best_estimator_
prob_tree = best.predict_proba(test_df.drop(columns=["college_athlete"]))[:,1]
auc_tree  = roc_auc_score(yte, prob_tree)
yhat_tree = (prob_tree >= 0.5).astype(int)
pd.DataFrame(confusion_matrix(yte, yhat_tree), index=["Actual 0","Actual 1"], columns=["Pred 0","Pred 1"]) \
  .to_csv(OUT_TAB / "rq2_tree_confusion_matrix.csv")
pd.DataFrame([{"model":"decision_tree","auc":auc_tree}]).to_csv(OUT_TAB / "rq2_tree_metrics.csv", index=False)

# Tree figure + text rules
prep = best.named_steps["prep"]
ohe_names = []
for name, trans, cols in prep.transformers_:
    if name == "num":
        num_names = cols
    elif name == "cat":
        ohe_names = list(trans.named_steps["ohe"].get_feature_names_out(cols))
feat_names = list(num_cols) + ohe_names

plt.figure(figsize=(18,10))
plot_tree(best.named_steps["clf"], feature_names=feat_names, class_names=["No","Yes"], filled=False)
plt.tight_layout(); plt.savefig(OUT_FIG / "rq2_decision_tree.png", dpi=150); plt.close()

rules_txt = export_text(best.named_steps["clf"], feature_names=feat_names)
(OUT_MOD / "rq2_tree_rules.txt").write_text(rules_txt)

# Save model summary
(OUT_MOD / "rq2_logit_summary.txt").write_text(str(logit_res.summary()))

print("\n[done] Outputs saved to ./outputs/{figures,tables,models}")