<a href="https://colab.research.google.com/github/adyasha95/PredictionTendinopathy/blob/main/predictioninjury.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import files
uploaded = files.upload()
from google.colab import drive
drive.mount("/content/drive")  # authorize

Saving Final Dastaset Oct25.xlsx to Final Dastaset Oct25 (1).xlsx


In [None]:
!pip install imbalanced-learn xgboost lightgbm catboost openpyxl

In [None]:
# =========================
# Tendinopathy — NeuroMiner-style Nested CV (Leakage-Safe)
# =========================
import os, json, time, warnings, numpy as np, pandas as pd
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
warnings.filterwarnings("ignore")

# --- sklearn / imblearn
from sklearn.model_selection import StratifiedKFold, RandomizedSearchCV
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, RobustScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    roc_auc_score, average_precision_score, recall_score, confusion_matrix
)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.base import BaseEstimator, TransformerMixin, clone

from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.ensemble import BalancedRandomForestClassifier, EasyEnsembleClassifier

# --- boosters (optional)
try:
    from xgboost import XGBClassifier
except Exception:
    XGBClassifier = None
try:
    from lightgbm import LGBMClassifier
except Exception:
    LGBMClassifier = None
try:
    from catboost import CatBoostClassifier
except Exception:
    CatBoostClassifier = None

# =========================
# Config (reduced folds for Oct 20 brief)
# =========================
@dataclass
class Config:
    DATA_PATH: str = "/content/Final Dastaset Oct25.xlsx"     # <-- set for Colab
    TARGET_NAME_PREFERRED: str = "Tendinopathy Injury"
    TARGET_EXCEL_LETTER: str = "BI"
    ORDINAL_RANGES: Tuple[str, ...] = ("AH:AO", "AQ:BG")
    RANDOM_STATE: int = 42
    OUTER_FOLDS: int = 2     # reduced for quick report; set 5 for final
    INNER_FOLDS: int = 3     # reduced for quick report; set 5 for final
    N_TRIALS: int = 5        # modest tuning for speed; increase later
    RESULTS_DIR: str = "./results_tendinopathy_nm"
    NUISANCE_COVARS: Tuple[str, ...] = ("Age",)  # example covariate(s) to regress out if present
    MISSING_THRESHOLD: float = 0.8  # Drop columns with more than this % missing

CFG = Config()
os.makedirs(CFG.RESULTS_DIR, exist_ok=True)


# =========================
# Utilities
# =========================
def excel_letter_to_index(letter: str) -> int:
    letter = letter.strip().upper()
    n = 0
    for ch in letter:
        n = n * 26 + (ord(ch) - 64)
    return n - 1

def expand_excel_range_to_names(df: pd.DataFrame, excel_range: str) -> List[str]:
    a, b = [s.strip().upper() for s in excel_range.split(":")]
    i1, i2 = excel_letter_to_index(a), excel_letter_to_index(b)
    lo, hi = min(i1, i2), max(i1, i2)
    cols = df.columns.tolist()
    hi = min(hi, len(cols) - 1); lo = max(lo, 0)
    return cols[lo:hi+1]

def binarize_yes_no(series: pd.Series) -> np.ndarray:
    # Handle potential NaNs or other values before mapping
    return series.astype(str).str.strip().str.lower().map({"yes":1,"no":0}).fillna(-1).astype(int).values # Map NaNs to -1 or another indicator

def specificity_score(y_true, y_pred):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return tn/(tn+fp) if (tn+fp)>0 else 0.0

def compute_metrics(y_true, y_prob, thr=0.5) -> Dict[str,float]:
    y_pred = (y_prob >= thr).astype(int)
    sens = recall_score(y_true, y_pred)
    spec = specificity_score(y_true, y_pred)
    bac  = 0.5*(sens+spec)
    roc  = roc_auc_score(y_true, y_prob) if len(np.unique(y_true))>1 else np.nan
    pr   = average_precision_score(y_true, y_prob) if len(np.unique(y_true))>1 else np.nan
    return {"Sensitivity": sens, "Specificity": spec, "BAC": bac, "ROC_AUC": roc, "PR_AUC": pr}


# =========================
# Covariate Residualizer (NM-style correction inside folds)
# Regresses numeric features on nuisance covariates using only training data,
# then replaces features by residuals on both train and test in a fold.
# =========================
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LinearRegression

class ResidualizeCovariates(BaseEstimator, TransformerMixin):
    """
    Residualizes numeric feature columns with respect to given covariates.

    NaN-safe:
      - During FIT: for each feature col, fit LinearRegression on rows where
        both covariates and that feature are finite.
      - Store covariate medians from the TRAINING rows to impute covariates
        at TRANSFORM time for prediction only.
      - During TRANSFORM: subtract prediction ONLY where the original feature
        is finite; leave NaNs as NaN so downstream imputers can handle them.
    """
    def __init__(self, covariate_cols):
        self.covariate_cols = covariate_cols
        self.models_ = {}            # col -> LinearRegression
        self.cov_medians_ = None     # medians for covariate imputation at predict time
        self.feature_cols_ = None
        self.covars_present_ = None

    def fit(self, X, y=None):
        if not hasattr(X, "columns"):
            # If X isn't a DataFrame, skip residualization
            self.feature_cols_ = [f"f{i}" for i in range(X.shape[1])]
            self.covars_present_ = []
            return self

        self.feature_cols_ = list(X.columns)
        self.covars_present_ = [c for c in (self.covariate_cols or []) if c in X.columns]
        if len(self.covars_present_) == 0:
            return self

        # covariate matrix
        C = X[self.covars_present_].to_numpy()
        # compute covariate medians on rows that are fully finite in C
        C_finite_mask = np.isfinite(C).all(axis=1)
        if C_finite_mask.any():
            self.cov_medians_ = np.nanmedian(C[C_finite_mask], axis=0)
        else:
            # fallback: column-wise median ignoring NaNs
            self.cov_medians_ = np.nanmedian(C, axis=0)

        # Fit a model per numeric feature where possible
        for col in self.feature_cols_:
            if col in self.covars_present_:
                continue
            xi = X[col].to_numpy()
            # only numeric columns
            if not np.issubdtype(np.asarray(xi).dtype, np.number):
                continue

            # mask: rows with finite covariates AND finite xi
            mask = np.isfinite(xi) & np.isfinite(C).all(axis=1)
            if mask.sum() < 5:
                # too few rows to fit reliably; skip this feature
                continue

            lr = LinearRegression()
            lr.fit(C[mask], xi[mask])
            self.models_[col] = lr

        return self

    def transform(self, X):
        if not hasattr(X, "copy") or len(self.models_) == 0:
            return X

        Xr = X.copy()
        # Build covariate matrix and impute NaNs in covariates with stored medians for prediction
        if len(self.covars_present_) > 0:
            C = Xr[self.covars_present_].to_numpy()
            C_imp = C.copy()
            if self.cov_medians_ is None:
                # if somehow unset, compute simple medians per column
                med = np.nanmedian(C_imp, axis=0)
            else:
                med = self.cov_medians_
            # fill NaNs in covariates for prediction only
            nan_mask = ~np.isfinite(C_imp)
            if nan_mask.any():
                # broadcast medians to NaN positions
                C_imp[nan_mask] = np.take(med, np.where(nan_mask)[1])

            # Apply per-feature residualization
            for col, lr in self.models_.items():
                if col in Xr.columns:
                    xi = Xr[col].to_numpy()
                    pred = lr.predict(C_imp)
                    # subtract only where xi is finite; preserve NaNs to be imputed later
                    finite_mask = np.isfinite(xi)
                    xi_resid = xi.copy()
                    xi_resid[finite_mask] = xi[finite_mask] - pred[finite_mask]
                    Xr[col] = xi_resid
        return Xr
# =========================
# Load data & declare feature sets
# =========================
assert os.path.exists(CFG.DATA_PATH), f"File not found: {CFG.DATA_PATH}"
df = pd.read_excel(CFG.DATA_PATH)

# pick target by name or by Excel letter (BI)
if CFG.TARGET_NAME_PREFERRED in df.columns:
    target_col = CFG.TARGET_NAME_PREFERRED
else:
    target_col = df.columns[excel_letter_to_index(CFG.TARGET_EXCEL_LETTER)]

# Handle potential NaNs in the target column before binarization
df = df.dropna(subset=[target_col]).copy()

# Drop leakage / metadata columns (injury description etc.)
drop_cols = [
    'Date of Injury', 'Location', 'Affected tendon', 'Leg./arm Injured',
    'Injury severity', 'If other, please write',
    'No. of partial games participation before injury',
    'No. of training sessions before injury',
    'No. of full games participation before injury', # This column has many NaNs, drop it.
    'Code'
]
df = df.drop(columns=[c for c in drop_cols if c in df.columns], errors='ignore')

# Drop columns with excessive missing values
missing_counts = df.isnull().sum()
cols_to_drop_missing = missing_counts[missing_counts / len(df) > CFG.MISSING_THRESHOLD].index.tolist()
df = df.drop(columns=cols_to_drop_missing, errors='ignore')


# Ordinals by Excel ranges (as requested)
ordinal_cols = []
for r in CFG.ORDINAL_RANGES:
    ordinal_cols += [c for c in expand_excel_range_to_names(df, r) if c != target_col and c in df.columns]
ordinal_cols = list(dict.fromkeys(ordinal_cols))

y = binarize_yes_no(df[target_col])
X = df.drop(columns=[target_col]).copy()

num_cols  = [c for c in X.select_dtypes(include=[np.number]).columns if c not in ordinal_cols]
cat_cols  = [c for c in X.select_dtypes(exclude=[np.number]).columns if c not in ordinal_cols]
ord_num   = [c for c in ordinal_cols if c in X.columns and pd.api.types.is_numeric_dtype(X[c])]
ord_cat   = [c for c in ordinal_cols if c in X.columns and not pd.api.types.is_numeric_dtype(X[c])]


# =========================
# Build preprocessing (NOT FIT HERE) — will be cloned & fit INSIDE CV ONLY
# Order: Residualize (raw df) -> ColumnTransformer(Impute/Encode/Scale)
# =========================
# =========================================================
# 🧠 CombinedPreprocessor Fix
# =========================================================
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, RobustScaler
from sklearn.impute import SimpleImputer

class CombinedPreprocessor(BaseEstimator, TransformerMixin):
    """
    Single transformer that does:
      ResidualizeCovariates (on pandas DataFrame)
      -> ColumnTransformer (Impute/Encode/Scale)

    IMPORTANT for sklearn.clone:
    - __init__ stores params EXACTLY as received (no casting to list, no copies).
    - All derived objects are created in fit().
    """
    def __init__(self, num_cols, cat_cols, ord_num, ord_cat, covariates):
        # Store raw params unchanged for sklearn.clone compatibility
        self.num_cols = num_cols
        self.cat_cols = cat_cols
        self.ord_num = ord_num
        self.ord_cat = ord_cat
        self.covariates = covariates

        # Will be built in fit()
        self._resid = None
        self._ct = None
        self.feature_names_out_ = None

    def fit(self, X, y=None):
        # Build residualizer fresh in fit (train fold only)
        self._resid = ResidualizeCovariates(list(self.covariates) if self.covariates is not None else [])
        Xr = self._resid.fit_transform(X)

        # Build ColumnTransformer fresh in fit, using COPIES of column lists
        ohe_kwargs = dict(handle_unknown="ignore")
        try:
            # sklearn >=1.2
            ohe_kwargs["sparse_output"] = False
        except Exception:
            # older versions ignore this key; we'll set sparse=False instead below
            pass

        # Back-compat for pre-1.2
        def _ohe():
            try:
                return OneHotEncoder(**ohe_kwargs)
            except TypeError:
                return OneHotEncoder(handle_unknown="ignore", sparse=False)

        num_block = Pipeline([
            ("impute", SimpleImputer(strategy="median")),
            ("scale",  RobustScaler(with_centering=True))
        ])
        cat_block = Pipeline([
            ("impute", SimpleImputer(strategy="most_frequent")),
            ("ohe",    _ohe())
        ])
        ord_num_block = Pipeline([
            ("impute", SimpleImputer(strategy="most_frequent")),
        ])
        ord_cat_block = Pipeline([
            ("impute", SimpleImputer(strategy="most_frequent")),
            ("ordenc", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1))
        ])

        self._ct = ColumnTransformer(
            transformers=[
                ("num",     num_block,  list(self.num_cols) if self.num_cols is not None else []),
                ("cat",     cat_block,  list(self.cat_cols) if self.cat_cols is not None else []),
                ("ord_num", ord_num_block, list(self.ord_num) if self.ord_num is not None else []),
                ("ord_cat", ord_cat_block, list(self.ord_cat) if self.ord_cat is not None else []),
            ],
            remainder="drop",
            verbose_feature_names_out=False
        )

        Xt = self._ct.fit_transform(Xr)

        # Cache feature names
        try:
            self.feature_names_out_ = self._ct.get_feature_names_out()
        except Exception:
            self.feature_names_out_ = np.array([f"f{i}" for i in range(Xt.shape[1])])

        return self

    def transform(self, X):
        Xr = self._resid.transform(X) if self._resid is not None else X
        Xt = self._ct.transform(Xr)    if self._ct    is not None else Xr
        return Xt

    def get_feature_names_out(self):
        return self.feature_names_out_

# --- rebuild helper to instantiate the preprocessor
def build_preprocessor() -> CombinedPreprocessor:
    return CombinedPreprocessor(
        num_cols=num_cols,      # use the variables you already defined above
        cat_cols=cat_cols,
        ord_num=ord_num,
        ord_cat=ord_cat,
        covariates=list(CFG.NUISANCE_COVARS)
    )


# =========================
# Model specs & imbalance strategies (all inside inner CV)
# =========================
def make_model_specs() -> List[Tuple[str, BaseEstimator, Dict, str]]:
    specs = []

    # Logistic Regression (cost-sensitive via class_weight)
    lr = LogisticRegression(max_iter=2000, solver="saga", penalty="l2", random_state=CFG.RANDOM_STATE)
    lr_grid = {"clf__C": np.logspace(-3, 2, 12)}
    specs += [("LogReg", lr, lr_grid, "cost_sensitive"),
              ("LogReg_SMOTE", clone(lr), lr_grid, "smote"),
              ("LogReg_Under", clone(lr), lr_grid, "undersample")]

    # Random Forest (with cost-sensitive and sampling; BalancedRF is separate)
    rf = RandomForestClassifier(n_estimators=400, random_state=CFG.RANDOM_STATE, n_jobs=-1)
    rf_grid = {
        "clf__n_estimators": [200, 400, 600],
        "clf__max_depth": [None, 4, 6, 10],
        "clf__min_samples_split": [2, 5, 10],
        "clf__min_samples_leaf": [1, 2, 4]
    }
    specs += [("RF", rf, rf_grid, "cost_sensitive"),
              ("RF_SMOTE", clone(rf), rf_grid, "smote"),
              ("RF_Under", clone(rf), rf_grid, "undersample")]

    # Balanced Random Forest (ensemble balancing)
    brf = BalancedRandomForestClassifier(n_estimators=400, random_state=CFG.RANDOM_STATE, n_jobs=-1)
    brf_grid = {"clf__n_estimators": [200, 400, 600], "clf__max_depth": [None, 4, 6, 10]}
    specs += [("BalancedRF", brf, brf_grid, "balanced_rf")]

    # EasyEnsemble (ensemble balancing via under-sampled AdaBoost)
    eec = EasyEnsembleClassifier(
        n_estimators=10, random_state=CFG.RANDOM_STATE, n_jobs=-1
    )
    eec_grid = {"clf__n_estimators": [6, 10, 14]}
    specs += [("EasyEnsemble", eec, eec_grid, "ensemble_balance")]  # imbalance handled internally

    # XGBoost (include scale_pos_weight as proxy for cost-sensitivity)
    if XGBClassifier is not None:
        xgb = XGBClassifier(
            objective="binary:logistic", eval_metric="auc",
            random_state=CFG.RANDOM_STATE, tree_method="hist",
            n_estimators=400, n_jobs=-1
        )
        xgb_grid = {
            "clf__max_depth": [3, 4, 5, 6],
            "clf__learning_rate": np.linspace(0.01, 0.2, 8),
            "clf__subsample": [0.7, 0.85, 1.0],
            "clf__colsample_bytree": [0.7, 0.85, 1.0],
            "clf__min_child_weight": [1, 5, 10],
            "clf__scale_pos_weight": [1, 2, 3, 4],  # simple grid; fold-specific ratio approximated
        }
        specs += [("XGB", xgb, xgb_grid, "cost_sensitive"),
                  ("XGB_SMOTE", clone(xgb), xgb_grid, "smote"),
                  ("XGB_Under", clone(xgb), xgb_grid, "undersample")]

    # LightGBM
    if LGBMClassifier is not None:
        lgb = LGBMClassifier(objective="binary", random_state=CFG.RANDOM_STATE, n_estimators=600, n_jobs=-1)
        lgb_grid = {
            "clf__num_leaves": [15, 31, 63],
            "clf__max_depth": [-1, 4, 6, 8],
            "clf__learning_rate": np.linspace(0.01, 0.2, 8),
            "clf__subsample": [0.7, 0.85, 1.0],
            "clf__colsample_bytree": [0.7, 0.85, 1.0],
            "clf__min_child_samples": [5, 10, 20],
            "clf__class_weight": [None, "balanced"]
        }
        specs += [("LGBM", lgb, lgb_grid, "cost_sensitive"),
                  ("LGBM_SMOTE", clone(lgb), lgb_grid, "smote"),
                  ("LGBM_Under", clone(lgb), lgb_grid, "undersample")]

    # CatBoost (works fine on dense arrays produced by CT)
    if CatBoostClassifier is not None:
        cb = CatBoostClassifier(
            loss_function="Logloss",
            eval_metric="AUC",
            random_seed=CFG.RANDOM_STATE,
            verbose=False,
            iterations=600
        )
        cb_grid = {
            "clf__depth": [4, 6, 8],
            "clf__learning_rate": np.linspace(0.01, 0.2, 6),
            "clf__l2_leaf_reg": [1, 3, 5, 7, 9]
        }
        specs += [("CatBoost", cb, cb_grid, "cost_sensitive")]  # CB lacks class_weight for dense; rely on params
    return specs


# =========================
# Build full pipeline for a given imbalance mode
# IMPORTANT: All steps are inside the pipeline -> fit on training folds only.
# =========================
def build_pipeline(imbalance_mode: str, estimator) -> ImbPipeline:
    preproc = build_preprocessor()
    steps = [("prep", preproc)]

    if imbalance_mode == "cost_sensitive":
        if hasattr(estimator, "class_weight"):
            estimator = clone(estimator).set_params(class_weight="balanced")
        steps.append(("clf", estimator))

    elif imbalance_mode == "smote":
        steps += [("smote", SMOTE(random_state=CFG.RANDOM_STATE)), ("clf", estimator)]

    elif imbalance_mode == "undersample":
        steps += [("under", RandomUnderSampler(random_state=CFG.RANDOM_STATE)), ("clf", estimator)]

    elif imbalance_mode == "balanced_rf":
        steps.append(("clf", estimator))  # BRF handles rebalancing internally

    elif imbalance_mode == "ensemble_balance":   # EasyEnsemble
        steps.append(("clf", estimator))          # internal under-sampled ensembles

    else:
        steps.append(("clf", estimator))          # fallback

    return ImbPipeline(steps)


# =========================
# Nested CV (outer evaluation / inner tuning) — NeuroMiner-style
# =========================
def run_nested_cv(X: pd.DataFrame, y: np.ndarray) -> Tuple[pd.DataFrame, pd.DataFrame]:
    outer = StratifiedKFold(n_splits=CFG.OUTER_FOLDS, shuffle=True, random_state=CFG.RANDOM_STATE)
    rows = []
    specs = make_model_specs()

    for model_name, est, grid, imb_mode in specs:
        print(f"\n=== {model_name} | {imb_mode} ===")
        ofold = 0
        for tr, te in outer.split(X, y):
            ofold += 1
            X_tr, X_te = X.iloc[tr].copy(), X.iloc[te].copy()
            y_tr, y_te = y[tr], y[te]

            pipe = build_pipeline(imb_mode, est)
            inner = StratifiedKFold(n_splits=CFG.INNER_FOLDS, shuffle=True, random_state=CFG.RANDOM_STATE)

            # Hyperparameter search **inside** inner CV (no peek at outer test) -> NM-compliant
            search = RandomizedSearchCV(
                estimator=pipe,
                param_distributions=grid,
                n_iter=CFG.N_TRIALS,
                scoring="roc_auc",
                cv=inner,
                n_jobs=-1,
                refit=True,
                random_state=CFG.RANDOM_STATE,
                verbose=0
            )
            t0 = time.time()
            search.fit(X_tr, y_tr)
            t_elapsed = round(time.time() - t0, 2)

            best = search.best_estimator_
            if hasattr(best, "predict_proba"):
                y_prob = best.predict_proba(X_te)[:,1]
            else:
                y_prob = best.decision_function(X_te)

            metrics = compute_metrics(y_te, y_prob, thr=0.5)
            rows.append({
                "Model": model_name, "Imbalance": imb_mode, "OuterFold": ofold,
                "BestParams": json.dumps(search.best_params_), "SearchTimeSec": t_elapsed, **metrics
            })

    fold_df = pd.DataFrame(rows)
    summary = (fold_df
               .groupby(["Model","Imbalance"], as_index=False)
               .agg(N=("OuterFold","count"),
                    Sensitivity_mean=("Sensitivity","mean"), Sensitivity_sd=("Sensitivity","std"),
                    Specificity_mean=("Specificity","mean"), Specificity_sd=("Specificity","std"),
                    BAC_mean=("BAC","mean"), BAC_sd=("BAC","std"),
                    ROC_AUC_mean=("ROC_AUC","mean"), ROC_AUC_sd=("ROC_AUC","std"),
                    PR_AUC_mean=("PR_AUC","mean"), PR_AUC_sd=("PR_AUC","std"),
                    SearchTimeSec_sum=("SearchTimeSec","sum")))
    return fold_df, summary


# =========================
# Run (reduced folds for Oct 20 brief)
# =========================
fold_df, summary_df = run_nested_cv(X, y)

out1 = os.path.join(CFG.RESULTS_DIR, "nm_fold_metrics.csv")
out2 = os.path.join(CFG.RESULTS_DIR, "nm_summary_by_model.csv")
fold_df.to_csv(out1, index=False); summary_df.to_csv(out2, index=False)

print("\n[DONE] Nested CV complete (NM-style).")
print(f"Saved -> {out1}\n      -> {out2}")

# quick preview of top performers by PR-AUC then BAC (for the brief)
preview = summary_df.assign(_rk=summary_df["PR_AUC_mean"].fillna(-1) + 1e-3*summary_df["BAC_mean"].fillna(-1)) \
                    .sort_values("_rk", ascending=False) \
                    .drop(columns=["_rk"])
display(preview.head(10))

Unnamed: 0,Model,Imbalance,N,Sensitivity_mean,Sensitivity_sd,Specificity_mean,Specificity_sd,BAC_mean,BAC_sd,ROC_AUC_mean,ROC_AUC_sd,PR_AUC_mean,PR_AUC_sd,SearchTimeSec_sum
12,XGB_SMOTE,smote,2,0.375,0.0,0.85,0.141421,0.6125,0.070711,0.665625,0.07513,0.574094,0.099239,4.04
11,XGB,cost_sensitive,2,0.375,0.176777,0.9,0.070711,0.6375,0.053033,0.721875,0.030936,0.567401,0.030195,5.49
6,LogReg_SMOTE,smote,2,0.5,0.176777,0.8,0.070711,0.65,0.053033,0.625,0.044194,0.548468,0.077877,4.73
8,RF,cost_sensitive,2,0.0,0.0,0.975,0.035355,0.4875,0.017678,0.7375,0.053033,0.540357,0.006501,30.33
5,LogReg,cost_sensitive,2,0.4375,0.265165,0.8,0.070711,0.61875,0.097227,0.6,0.070711,0.512928,0.11551,4.23
9,RF_SMOTE,smote,2,0.0,0.0,0.975,0.035355,0.4875,0.017678,0.709375,0.013258,0.504167,0.048614,14.83
0,BalancedRF,balanced_rf,2,0.3125,0.088388,0.9,0.0,0.60625,0.044194,0.723437,0.050823,0.496487,0.0044,34.56
1,EasyEnsemble,ensemble_balance,2,0.375,0.176777,0.8,0.070711,0.5875,0.123744,0.609375,0.110485,0.477195,0.220426,17.98
2,LGBM,cost_sensitive,2,0.125,0.0,0.975,0.035355,0.55,0.017678,0.59375,0.088388,0.42606,0.026332,11.69
4,LGBM_Under,undersample,2,0.625,0.53033,0.45,0.636396,0.5375,0.053033,0.58125,0.114905,0.421296,0.191741,6.85


In [None]:
# =========================
# NeuroMiner-compliant nested CV with permutation tests
# =========================
import os, json, time, warnings, numpy as np, pandas as pd
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
warnings.filterwarnings("ignore")

# sklearn / imblearn
from sklearn.model_selection import StratifiedKFold, RandomizedSearchCV
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, RobustScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import roc_auc_score, average_precision_score, recall_score, confusion_matrix
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.utils import check_random_state

from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.ensemble import BalancedRandomForestClassifier, EasyEnsembleClassifier

# Boosters (conditional)
try:
    from xgboost import XGBClassifier
except Exception:
    XGBClassifier = None
try:
    from lightgbm import LGBMClassifier
except Exception:
    LGBMClassifier = None
try:
    from catboost import CatBoostClassifier
except Exception:
    CatBoostClassifier = None

# Tabular deep models (conditional)
try:
    from pytorch_tabnet.tab_model import TabNetClassifier
except Exception:
    TabNetClassifier = None

# A minimal TabTransformer via pytorch-widedeep (optional)
try:
    from pytorch_widedeep.models import TabTransformer
    from pytorch_widedeep import Trainer, TabularDataset
    TABTRANSFORMER_AVAILABLE = True
except Exception:
    TABTRANSFORMER_AVAILABLE = False

# -------------------------
# Config
# -------------------------
@dataclass
class Config:
    DATA_PATH: str = "/content/Final Dastaset Oct25.xlsx"
    TARGET_NAME_PREFERRED: str = "Tendinopathy Injury"
    TARGET_EXCEL_LETTER: str = "BI"
    ORDINAL_RANGES: Tuple[str, ...] = ("AH:AO", "AQ:BG")
    RANDOM_STATE: int = 42
    OUTER_FOLDS: int = 2      # use 5 for final
    INNER_FOLDS: int = 3      # use 5 for final
    N_TRIALS: int = 5         # increase for final
    RESULTS_DIR: str = "./results_nm_full"
    NUISANCE_COVARS: Tuple[str, ...] = ("Age",)
    MISSING_THRESHOLD: float = 0.8
    N_PERMUTATIONS: int = 50  # raise for final report if feasible

CFG = Config()
os.makedirs(CFG.RESULTS_DIR, exist_ok=True)

# -------------------------
# Utilities / helpers
# -------------------------
def excel_letter_to_index(letter: str) -> int:
    letter = letter.strip().upper()
    n=0
    for ch in letter:
        n = n*26 + (ord(ch)-64)
    return n-1

def expand_excel_range_to_names(df: pd.DataFrame, excel_range: str) -> List[str]:
    a,b = [s.strip().upper() for s in excel_range.split(":")]
    i1,i2 = excel_letter_to_index(a), excel_letter_to_index(b)
    lo,hi = min(i1,i2), max(i1,i2)
    cols = df.columns.tolist()
    hi = min(hi, len(cols)-1); lo=max(lo,0)
    return cols[lo:hi+1]

def binarize_yes_no(series: pd.Series) -> np.ndarray:
    mapped = series.astype(str).str.strip().str.lower().map({"yes":1,"no":0})
    keep = mapped.isin([0,1])
    return mapped[keep].astype(int).values, keep.values

def specificity_score(y_true, y_pred):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    return tn/(tn+fp) if (tn+fp)>0 else 0.0

def compute_metrics(y_true, y_prob, thr=0.5) -> Dict[str,float]:
    y_pred = (y_prob >= thr).astype(int)
    sens = recall_score(y_true, y_pred)
    spec = specificity_score(y_true, y_pred)
    bac  = 0.5*(sens+spec)
    roc  = roc_auc_score(y_true, y_prob) if len(np.unique(y_true))>1 else np.nan
    pr   = average_precision_score(y_true, y_prob) if len(np.unique(y_true))>1 else np.nan
    return {"Sensitivity": sens, "Specificity": spec, "BAC": bac, "ROC_AUC": roc, "PR_AUC": pr}

# -------------------------
# Residualizer (NaN-safe)
# -------------------------
from sklearn.linear_model import LinearRegression

class ResidualizeCovariates(BaseEstimator, TransformerMixin):
    def __init__(self, covariate_cols):
        self.covariate_cols = covariate_cols
        self.models_ = {}
        self.cov_medians_ = None
        self.feature_cols_ = None
        self.covars_present_ = None

    def fit(self, X, y=None):
        if not hasattr(X, "columns"):
            self.feature_cols_ = [f"f{i}" for i in range(X.shape[1])]
            self.covars_present_ = []
            return self
        self.feature_cols_ = list(X.columns)
        self.covars_present_ = [c for c in (self.covariate_cols or []) if c in X.columns]
        if not self.covars_present_:
            return self

        C = X[self.covars_present_].to_numpy()
        C_finite = np.isfinite(C).all(axis=1)
        self.cov_medians_ = np.nanmedian(C[C_finite], axis=0) if C_finite.any() else np.nanmedian(C, axis=0)

        for col in self.feature_cols_:
            if col in self.covars_present_:
                continue
            xi = X[col].to_numpy()
            if not np.issubdtype(np.asarray(xi).dtype, np.number):
                continue
            mask = np.isfinite(xi) & np.isfinite(C).all(axis=1)
            if mask.sum() < 5:
                continue
            lr = LinearRegression()
            lr.fit(C[mask], xi[mask])
            self.models_[col] = lr
        return self

    def transform(self, X):
        if not hasattr(X, "copy") or not self.models_:
            return X
        Xr = X.copy()
        if self.covars_present_:
            C = Xr[self.covars_present_].to_numpy()
            C_imp = C.copy()
            med = self.cov_medians_ if self.cov_medians_ is not None else np.nanmedian(C_imp, axis=0)
            nan_mask = ~np.isfinite(C_imp)
            if nan_mask.any():
                C_imp[nan_mask] = np.take(med, np.where(nan_mask)[1])
            for col, lr in self.models_.items():
                if col in Xr.columns:
                    xi = Xr[col].to_numpy()
                    pred = lr.predict(C_imp)
                    finite = np.isfinite(xi)
                    xi_resid = xi.copy()
                    xi_resid[finite] = xi[finite] - pred[finite]
                    Xr[col] = xi_resid
        return Xr

# -------------------------
# Combined Preprocessor (single transformer for imblearn.Pipeline)
# -------------------------
class CombinedPreprocessor(BaseEstimator, TransformerMixin):
    def __init__(self, num_cols, cat_cols, ord_num, ord_cat, covariates):
        self.num_cols = num_cols
        self.cat_cols = cat_cols
        self.ord_num = ord_num
        self.ord_cat = ord_cat
        self.covariates = covariates
        self._resid = None
        self._ct = None
        self.feature_names_out_ = None

    def fit(self, X, y=None):
        self._resid = ResidualizeCovariates(list(self.covariates) if self.covariates is not None else [])
        Xr = self._resid.fit_transform(X)

        def _ohe():
            try:
                return OneHotEncoder(handle_unknown="ignore", sparse_output=False)
            except TypeError:
                return OneHotEncoder(handle_unknown="ignore", sparse=False)

        num_block = Pipeline([
            ("impute", SimpleImputer(strategy="median")),
            ("scale",  RobustScaler(with_centering=True)),
        ])
        cat_block = Pipeline([
            ("impute", SimpleImputer(strategy="most_frequent")),
            ("ohe", _ohe())
        ])
        ord_num_block = Pipeline([
            ("impute", SimpleImputer(strategy="most_frequent")),
        ])
        ord_cat_block = Pipeline([
            ("impute", SimpleImputer(strategy="most_frequent")),
            ("ordenc", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1))
        ])

        self._ct = ColumnTransformer(
            transformers=[
                ("num",     num_block,  list(self.num_cols) if self.num_cols is not None else []),
                ("cat",     cat_block,  list(self.cat_cols) if self.cat_cols is not None else []),
                ("ord_num", ord_num_block, list(self.ord_num) if self.ord_num is not None else []),
                ("ord_cat", ord_cat_block, list(self.ord_cat) if self.ord_cat is not None else []),
            ],
            remainder="drop",
            verbose_feature_names_out=False
        )
        Xt = self._ct.fit_transform(Xr)
        try:
            self.feature_names_out_ = self._ct.get_feature_names_out()
        except Exception:
            self.feature_names_out_ = np.array([f"f{i}" for i in range(Xt.shape[1])])
        return self

    def transform(self, X):
        Xr = self._resid.transform(X) if self._resid is not None else X
        Xt = self._ct.transform(Xr) if self._ct is not None else Xr
        return Xt

    def get_feature_names_out(self):
        return self.feature_names_out_

# -------------------------
# Load data
# -------------------------
assert os.path.exists(CFG.DATA_PATH), f"File not found: {CFG.DATA_PATH}"
df = pd.read_excel(CFG.DATA_PATH)

# Target column
if CFG.TARGET_NAME_PREFERRED in df.columns:
    target_col = CFG.TARGET_NAME_PREFERRED
else:
    target_col = df.columns[excel_letter_to_index(CFG.TARGET_EXCEL_LETTER)]

# Drop rows with missing target
df = df.dropna(subset=[target_col]).copy()

# Drop potential leakage/meta
drop_cols = [
    'Date of Injury','Location','Affected tendon','Leg./arm Injured',
    'Injury severity','If other, please write',
    'No. of partial games participation before injury',
    'No. of training sessions before injury',
    'No. of full games participation before injury',
    'Code'
]
df = df.drop(columns=[c for c in drop_cols if c in df.columns], errors='ignore')

# Drop columns with too much missingness
miss = df.isnull().sum()/len(df)
df = df.drop(columns=miss[miss>CFG.MISSING_THRESHOLD].index.tolist(), errors='ignore')

# Ordinals from Excel ranges
ordinal_cols = []
for r in CFG.ORDINAL_RANGES:
    ordinal_cols += [c for c in expand_excel_range_to_names(df, r) if c != target_col and c in df.columns]
ordinal_cols = list(dict.fromkeys(ordinal_cols))

# y and X
y_full, keep_mask = binarize_yes_no(df[target_col])
X_full = df.loc[keep_mask, :].drop(columns=[target_col]).copy()

num_cols  = [c for c in X_full.select_dtypes(include=[np.number]).columns if c not in ordinal_cols]
cat_cols  = [c for c in X_full.select_dtypes(exclude=[np.number]).columns if c not in ordinal_cols]
ord_num   = [c for c in ordinal_cols if c in X_full.columns and pd.api.types.is_numeric_dtype(X_full[c])]
ord_cat   = [c for c in ordinal_cols if c in X_full.columns and not pd.api.types.is_numeric_dtype(X_full[c])]

# -------------------------
# Preprocessor factory (no fitting here)
# -------------------------
def build_preprocessor() -> CombinedPreprocessor:
    return CombinedPreprocessor(
        num_cols=num_cols, cat_cols=cat_cols, ord_num=ord_num, ord_cat=ord_cat,
        covariates=list(CFG.NUISANCE_COVARS)
    )

# -------------------------
# Model specs (all inside CV2)
# -------------------------
def make_model_specs() -> List[Tuple[str, BaseEstimator, Dict, str]]:
    specs: List[Tuple[str, BaseEstimator, Dict, str]] = []

    # Logistic Regression
    lr = LogisticRegression(max_iter=3000, solver="saga", penalty="l2", random_state=CFG.RANDOM_STATE)
    lr_grid = {"clf__C": np.logspace(-3, 2, 12)}
    specs += [
        ("LogReg", lr, lr_grid, "cost_sensitive"),
        ("LogReg_SMOTE", clone(lr), lr_grid, "smote"),
        ("LogReg_Under", clone(lr), lr_grid, "undersample"),
    ]

    # Random Forest
    rf = RandomForestClassifier(n_estimators=400, random_state=CFG.RANDOM_STATE, n_jobs=-1)
    rf_grid = {
        "clf__n_estimators": [200, 400, 600],
        "clf__max_depth": [None, 3, 5, 8],
        "clf__min_samples_split": [2, 5, 10],
        "clf__min_samples_leaf": [1, 2, 4],
    }
    specs += [
        ("RF", rf, rf_grid, "cost_sensitive"),
        ("RF_SMOTE", clone(rf), rf_grid, "smote"),
        ("RF_Under", clone(rf), rf_grid, "undersample"),
    ]

    # Balanced Random Forest
    brf = BalancedRandomForestClassifier(n_estimators=400, random_state=CFG.RANDOM_STATE, n_jobs=-1)
    brf_grid = {"clf__n_estimators": [200, 400, 600], "clf__max_depth": [None, 3, 5, 8]}
    specs += [("BalancedRF", brf, brf_grid, "balanced_rf")]

    # EasyEnsemble
    eec = EasyEnsembleClassifier(n_estimators=10, random_state=CFG.RANDOM_STATE, n_jobs=-1)
    eec_grid = {"clf__n_estimators": [6, 10, 14]}
    specs += [("EasyEnsemble", eec, eec_grid, "ensemble_balance")]

    # XGBoost
    if XGBClassifier is not None:
        xgb = XGBClassifier(
            objective="binary:logistic", eval_metric="auc",
            random_state=CFG.RANDOM_STATE, tree_method="hist",
            n_estimators=400, n_jobs=-1
        )
        xgb_grid = {
            "clf__max_depth": [3, 4, 5, 6],
            "clf__learning_rate": np.linspace(0.01, 0.2, 8),
            "clf__subsample": [0.7, 0.9, 1.0],
            "clf__colsample_bytree": [0.7, 0.9, 1.0],
            "clf__min_child_weight": [1, 5, 10],
            "clf__scale_pos_weight": [1, 2, 3, 4],
        }
        specs += [
            ("XGB", xgb, xgb_grid, "cost_sensitive"),
            ("XGB_SMOTE", clone(xgb), xgb_grid, "smote"),
            ("XGB_Under", clone(xgb), xgb_grid, "undersample"),
        ]

    # LightGBM
    if LGBMClassifier is not None:
        lgb = LGBMClassifier(
            objective="binary", random_state=CFG.RANDOM_STATE,
            n_estimators=600, n_jobs=-1, verbose=-1
        )
        lgb_grid = {
            "clf__num_leaves": [7, 15, 31],
            "clf__max_depth": [-1, 3, 5],
            "clf__learning_rate": [0.01, 0.05, 0.1],
            "clf__subsample": [0.7, 0.9, 1.0],
            "clf__colsample_bytree": [0.7, 0.9, 1.0],
            "clf__min_child_samples": [5, 10, 20],
            "clf__min_split_gain": [0.0, 0.01],
            "clf__class_weight": [None, "balanced"],
        }
        specs += [
            ("LGBM", lgb, lgb_grid, "cost_sensitive"),
            ("LGBM_SMOTE", clone(lgb), lgb_grid, "smote"),
            ("LGBM_Under", clone(lgb), lgb_grid, "undersample"),
        ]

    # CatBoost
    if CatBoostClassifier is not None:
        cb = CatBoostClassifier(
            loss_function="Logloss", eval_metric="AUC",
            random_seed=CFG.RANDOM_STATE, verbose=False, iterations=600
        )
        cb_grid = {
            "clf__depth": [4, 6, 8],
            "clf__learning_rate": np.linspace(0.01, 0.2, 6),
            "clf__l2_leaf_reg": [1, 3, 5, 7, 9],
        }
        specs += [("CatBoost", cb, cb_grid, "cost_sensitive")]

    # TabNet (if available)
    if TabNetClassifier is not None:
        tabnet = TabNetClassifier(seed=CFG.RANDOM_STATE, verbose=0)
        tn_grid = {
            "clf__n_d": [8, 16], "clf__n_a": [8, 16], "clf__n_steps": [3, 4],
            "clf__gamma": [1.0, 1.5], "clf__lambda_sparse": [0.0, 1e-4]
        }
        specs += [
            ("TabNet_SMOTE", tabnet, tn_grid, "smote"),
            ("TabNet_Under", clone(tabnet), tn_grid, "undersample"),
        ]

    # --------- Hybrid Stacking Meta-learner (INSIDE the function) ---------
    base_estimators = [
        ("lr", LogisticRegression(max_iter=2000, solver="saga", penalty="l2")),
        ("rf", RandomForestClassifier(n_estimators=300, random_state=CFG.RANDOM_STATE)),
    ]
    if XGBClassifier is not None:
        base_estimators.append(
            ("xgb", XGBClassifier(objective="binary:logistic", eval_metric="auc",
                                  random_state=CFG.RANDOM_STATE, tree_method="hist",
                                  n_estimators=300, n_jobs=-1))
        )
    if LGBMClassifier is not None:
        base_estimators.append(
            ("lgb", LGBMClassifier(objective="binary", random_state=CFG.RANDOM_STATE,
                                   n_estimators=400, n_jobs=-1, verbose=-1))
        )
    if len(base_estimators) >= 2:
        stack = StackingClassifier(
            estimators=base_estimators,
            final_estimator=LogisticRegression(max_iter=2000, solver="lbfgs"),
            n_jobs=-1, passthrough=False
        )
        # Small but valid grid on the meta-learner
        stack_grid = {
            "clf__final_estimator__C": [0.25, 1.0, 4.0],
            "clf__final_estimator__class_weight": [None, "balanced"],
        }
        specs += [("Stacking", stack, stack_grid, "cost_sensitive")]

    return specs

# -------------------------
# Build pipeline by imbalance mode
# -------------------------
def build_pipeline(imbalance_mode: str, estimator) -> ImbPipeline:
    preproc = build_preprocessor()
    steps = [("prep", preproc)]
    if imbalance_mode == "cost_sensitive":
        if hasattr(estimator, "class_weight"):
            estimator = clone(estimator).set_params(class_weight="balanced")
        steps.append(("clf", estimator))
    elif imbalance_mode == "smote":
        steps += [("smote", SMOTE(random_state=CFG.RANDOM_STATE)), ("clf", estimator)]
    elif imbalance_mode == "undersample":
        steps += [("under", RandomUnderSampler(random_state=CFG.RANDOM_STATE)), ("clf", estimator)]
    elif imbalance_mode in ("balanced_rf", "ensemble_balance"):
        steps.append(("clf", estimator))  # internal balancing
    else:
        steps.append(("clf", estimator))
    return ImbPipeline(steps)

# -------------------------
# INNER CV: tune inside CV1-train (returns best params + best score)
# -------------------------
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV

def inner_cv(X_tr, y_tr, model_name, estimator, grid, imb_mode, random_state):
    pipe = build_pipeline(imb_mode, estimator)
    inner = StratifiedKFold(n_splits=CFG.INNER_FOLDS, shuffle=True, random_state=random_state)

    if not grid:  # e.g., StackingClassifier with no search space
        search = GridSearchCV(
            estimator=pipe,
            param_grid={},          # single fit
            scoring="roc_auc",
            cv=inner,
            n_jobs=-1,
            refit=True,
            verbose=0
        )
    else:
        search = RandomizedSearchCV(
            estimator=pipe,
            param_distributions=grid,
            n_iter=CFG.N_TRIALS,
            scoring="roc_auc",
            cv=inner,
            n_jobs=-1,
            refit=True,
            random_state=random_state,
            verbose=0
        )

    search.fit(X_tr, y_tr)
    return search.best_params_, search.best_score_

# -------------------------
# OUTER CV: orchestrate NM nesting & evaluation
# -------------------------
def outer_cv(X: pd.DataFrame, y: np.ndarray) -> Tuple[pd.DataFrame, pd.DataFrame]:
    rng = check_random_state(CFG.RANDOM_STATE)
    outer = StratifiedKFold(n_splits=CFG.OUTER_FOLDS, shuffle=True, random_state=CFG.RANDOM_STATE)
    rows = []
    specs = make_model_specs()

    for model_name, est, grid, imb_mode in specs:
        print(f"\n=== {model_name} | {imb_mode} ===")
        for ofold, (tr, te) in enumerate(outer.split(X, y), start=1):
            X_tr, X_te = X.iloc[tr].copy(), X.iloc[te].copy()
            y_tr, y_te = y[tr], y[te]
            t0 = time.time()
            best_params, inner_auc = inner_cv(X_tr, y_tr, model_name, est, grid, imb_mode, random_state=rng.randint(0,1_000_000))
            # Rebuild best pipeline, refit on full CV1-train
            best_pipe = build_pipeline(imb_mode, clone(est))
            if best_params:
                best_pipe.set_params(**best_params)
            best_pipe.fit(X_tr, y_tr)

            # Evaluate on CV1-test
            if hasattr(best_pipe, "predict_proba"):
                y_prob = best_pipe.predict_proba(X_te)[:,1]
            else:
                y_prob = best_pipe.decision_function(X_te)
            metrics = compute_metrics(y_te, y_prob, thr=0.5)
            elapsed = round(time.time()-t0, 2)

            rows.append({
                "Model": model_name, "Imbalance": imb_mode, "OuterFold": ofold,
                "BestParams": json.dumps(best_params), "InnerROC_AUC": inner_auc,
                "Search+RefitTimeSec": elapsed, **metrics
            })

    fold_df = pd.DataFrame(rows)
    summary = (fold_df.groupby(["Model","Imbalance"], as_index=False)
               .agg(N=("OuterFold","count"),
                    Sensitivity_mean=("Sensitivity","mean"), Sensitivity_sd=("Sensitivity","std"),
                    Specificity_mean=("Specificity","mean"), Specificity_sd=("Specificity","std"),
                    BAC_mean=("BAC","mean"), BAC_sd=("BAC","std"),
                    ROC_AUC_mean=("ROC_AUC","mean"), ROC_AUC_sd=("ROC_AUC","std"),
                    PR_AUC_mean=("PR_AUC","mean"), PR_AUC_sd=("PR_AUC","std"),
                    TimeSec_sum=("Search+RefitTimeSec","sum")))
    return fold_df, summary

# -------------------------
# Permutation testing (fully nested)
# -------------------------
def run_permutation_tests(X: pd.DataFrame, y: np.ndarray, observed_summary: pd.DataFrame,
                          n_permutations: int, random_state: int = CFG.RANDOM_STATE) -> pd.DataFrame:
    rng = check_random_state(random_state)
    # We’ll compute null distribution for BAC_mean per (Model,Imbalance)
    key_cols = ["Model","Imbalance"]
    obs = observed_summary[key_cols + ["BAC_mean"]].copy().rename(columns={"BAC_mean":"BAC_obs"})

    perm_rows = []
    for p in range(1, n_permutations+1):
        y_perm = y.copy()
        rng.shuffle(y_perm)
        fold_df_p, summary_p = outer_cv(X, y_perm)  # full nesting on permuted labels
        tmp = summary_p[key_cols + ["BAC_mean"]].copy().rename(columns={"BAC_mean":"BAC_perm"})
        tmp["perm_idx"] = p
        perm_rows.append(tmp)
        print(f"[perm {p}/{n_permutations}] done")

    perm_df = pd.concat(perm_rows, ignore_index=True)
    # p-value = (1 + #perm >= obs) / (1 + n_perm)
    merged = perm_df.merge(obs, on=key_cols, how="right")
    pvals = (merged
             .groupby(key_cols, as_index=False)
             .apply(lambda g: pd.Series({
                 "p_value": (1 + (g["BAC_perm"] >= g["BAC_obs"].iloc[0]).sum()) / (1 + n_permutations)
             }))
             .reset_index(drop=True))
    return pvals

# -------------------------
# Final training (optional): refit best on full data
# -------------------------
def train_final_model(X: pd.DataFrame, y: np.ndarray, top_model: Tuple[str,str]) -> ImbPipeline:
    """top_model = (Model, Imbalance) from observed summary ranking."""
    # pick estimator & grid, but we won't do search here—use best params from outer CV if desired
    specs_map = { (m,imb):(est,grid) for (m,est,grid,imb) in make_model_specs() }
    assert top_model in specs_map, f"Unknown model: {top_model}"
    est, grid = specs_map[top_model]
    # Simple approach: one more inner search on all data (quick) then refit full
    best_params, _ = inner_cv(X, y, top_model[0], est, grid, top_model[1], random_state=CFG.RANDOM_STATE+777)
    pipe = build_pipeline(top_model[1], clone(est))
    if best_params:
        pipe.set_params(**best_params)
    pipe.fit(X, y)
    return pipe

# =========================
# RUN (Oct-20 brief settings)
# =========================
fold_df, summary_df = outer_cv(X_full, y_full)
CFG.RESULTS_DIR = "/content/drive/MyDrive/tendinopathy/results_nm_full"
import os; os.makedirs(CFG.RESULTS_DIR, exist_ok=True)
fold_path = os.path.join(CFG.RESULTS_DIR, "fold_metrics.csv")
summ_path = os.path.join(CFG.RESULTS_DIR, "summary_by_model.csv")
fold_df.to_csv(fold_path, index=False)
summary_df.to_csv(summ_path, index=False)
print(f"[DONE] Saved fold metrics -> {fold_path}")
print(f"[DONE] Saved summary     -> {summ_path}")

# point outputs to Drive

# (Optional for brief) Permutation tests; consider lowering for quick check
pval_df = run_permutation_tests(X_full, y_full, summary_df, n_permutations=CFG.N_PERMUTATIONS)
pval_path = os.path.join(CFG.RESULTS_DIR, "permutation_pvalues.csv")
pval_df.to_csv(pval_path, index=False); print(f"[DONE] Saved permutation p-values -> {pval_path}")

50

In [None]:
# 1) Mount Drive if you want outputs persisted
from google.colab import drive
drive.mount("/content/drive")

# 2) Point EDA output to the same results dir you’ve been using
RESULTS_DIR = "/content/drive/MyDrive/tendinopathy/results_nm_full"  # same as CFG.RESULTS_DIR
import os; os.makedirs(RESULTS_DIR, exist_ok=True)

# 3) Import and run
from eda_quality_report import run_eda
# assuming your loaded DataFrame is `df` and target column is `target_col`
run_eda(df, target_col=target_col, results_dir=RESULTS_DIR)