In [1]:
# ============================================
# Quantum-Entropy Synergy & ML Baselines
# Data: merged_ovars_plco.csv (label: is_case 0/1)
# Outputs: CSVs + printed slide-ready summary
# ============================================
import numpy as np
import pandas as pd
from numpy.linalg import eigh
from itertools import combinations
from typing import Dict, List, Tuple

from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.inspection import permutation_importance
import warnings
from sklearn.exceptions import ConvergenceWarning
warnings.filterwarnings("ignore", category=ConvergenceWarning)

# -----------------------------
# Config
# -----------------------------
DATA_PATH = r"D:\Lakshmi Kumari\merged_oc_ovars.csv"  
LABEL_COL = "is_case"
PANEL_PREFIXES = {
    "Panel_A": "panela_",
    "Panel_B": "panelb_",
    "Panel_C": "panelc_",
    "Panel_D": "paneld_",
    "Panel_E": "panele_",
}
BOOTSTRAP_B = 500   # set to 1000 if you want tighter CIs and have time
RANDOM_SEED = 7

# -----------------------------
# Helpers
# -----------------------------
def vne_entropy(X: np.ndarray, eps: float = 1e-12) -> float:
    """Von Neumann entropy of covariance-derived density matrix."""
    if X.ndim == 1:
        X = X.reshape(-1, 1)
    # center and simple mean-impute to keep covariance defined
    X = X - np.nanmean(X, axis=0, keepdims=True)
    col_means = np.nanmean(X, axis=0)
    inds = np.where(np.isnan(X))
    X[inds] = np.take(col_means, inds[1])
    C = np.cov(X, rowvar=False)
    tr = np.trace(C)
    if tr <= eps:
        return 0.0
    rho = C / tr
    vals = eigh(rho, UPLO='U')[0]
    vals = vals[vals > eps]
    return float(-(vals * np.log2(vals)).sum())

def synergy_ratio(SA: float, SB: float, S_union: float, eps: float = 1e-12) -> float:
    return S_union / (SA + SB + eps)

def bootstrap_ci(values: np.ndarray, alpha: float = 0.05) -> Tuple[float, float]:
    lo = np.quantile(values, alpha/2)
    hi = np.quantile(values, 1 - alpha/2)
    return float(lo), float(hi)

def bootstrap_synergy(XA: np.ndarray, XB: np.ndarray, B: int = 500, seed: int = 42) -> Tuple[float, Tuple[float,float]]:
    rng = np.random.default_rng(seed)
    n = XA.shape[0]
    s_vals = np.empty(B, dtype=float)
    for i in range(B):
        idx = rng.integers(0, n, n)
        SA = vne_entropy(XA[idx])
        SB = vne_entropy(XB[idx])
        S_union = vne_entropy(np.hstack([XA[idx], XB[idx]]))
        s_vals[i] = synergy_ratio(SA, SB, S_union)
    mean_s = float(np.mean(s_vals))
    ci = bootstrap_ci(s_vals, alpha=0.05)
    return mean_s, ci

def prefix_groups(df: pd.DataFrame, prefixes: Dict[str, str]) -> Dict[str, List[str]]:
    groups = {}
    for gname, pref in prefixes.items():
        cols = [c for c in df.columns if str(c).startswith(pref)]
        groups[gname] = cols
    all_cols = sorted({c for cols in groups.values() for c in cols})
    groups["All"] = all_cols
    return groups

def extract_arrays(df: pd.DataFrame, groups: Dict[str, List[str]]) -> Dict[str, np.ndarray]:
    return {g: df[cols].to_numpy() for g, cols in groups.items() if len(cols) > 0}

def entropy_by_group(group_arrays: Dict[str, np.ndarray], all_group_name: str = "All") -> pd.DataFrame:
    rows = []
    S_all = vne_entropy(group_arrays[all_group_name]) if all_group_name in group_arrays else None
    for g, X in group_arrays.items():
        Sg = vne_entropy(X)
        rows.append((g, Sg))
    ent_df = pd.DataFrame(rows, columns=["Group", "vNE"])
    denom = S_all if (S_all is not None and S_all > 0) else (ent_df["vNE"].max() if ent_df["vNE"].max() > 0 else 1.0)
    ent_df["vNE_norm"] = ent_df["vNE"] / denom
    return ent_df.sort_values("vNE_norm", ascending=False).reset_index(drop=True)

def synergy_scan(group_arrays: Dict[str, np.ndarray],
                 exclude_near_zero: bool = True,
                 threshold: float = 0.01,
                 B: int = 500) -> pd.DataFrame:
    raw_ent = {g: vne_entropy(X) for g, X in group_arrays.items()}
    valid = {g: X for g, X in group_arrays.items() if (not exclude_near_zero) or (raw_ent[g] >= threshold)}
    rows = []
    keys = [k for k in valid.keys() if k != "All"]  # avoid pairing "All" with everything
    for A, Bname in combinations(keys, 2):
        XA, XB = valid[A], valid[Bname]
        SA, SB = raw_ent[A], raw_ent[Bname]
        S_union = vne_entropy(np.hstack([XA, XB]))
        Sratio = synergy_ratio(SA, SB, S_union)
        mean_s, ci = bootstrap_synergy(XA, XB, B=B, seed=RANDOM_SEED)
        rows.append((A, Bname, SA, SB, S_union, Sratio, mean_s, ci[0], ci[1]))
    res = pd.DataFrame(rows, columns=["G_A", "G_B", "S_A", "S_B", "S_AuB", "S_ratio", "S_boot_mean", "CI_low", "CI_high"])
    return res.sort_values("S_boot_mean", ascending=False).reset_index(drop=True)

def cv_auc_baseline(X: np.ndarray, y: np.ndarray, model: str = "gbm", seed: int = 7) -> float:
    if model == "gbm":
        clf = GradientBoostingClassifier(random_state=seed)
    elif model == "mlp":
        clf = MLPClassifier(hidden_layer_sizes=(64, 32),
              solver="adam",
              learning_rate_init=1e-3,
              alpha=1e-3,           # a bit more regularization helps converge
              early_stopping=True,  # splits off validation fold internally
              n_iter_no_change=15,
              tol=1e-4,
              max_iter=1000,
              random_state=7)

    else:
        raise ValueError("model must be 'gbm' or 'mlp'")
    pipe = Pipeline([
        ("imp", SimpleImputer(strategy="median")),
        ("sc", StandardScaler(with_mean=True, with_std=True)),
        ("clf", clf),
    ])
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
    auc = cross_val_score(pipe, X, y, cv=cv, scoring="roc_auc").mean()
    return float(auc)

def auc_for_panels(df: pd.DataFrame, y: np.ndarray, groups: Dict[str, List[str]]) -> pd.DataFrame:
    rows = []
    for g in ["Panel_A", "Panel_B", "Panel_C", "Panel_D", "Panel_E", "All"]:
        if g in groups and len(groups[g]) > 0:
            Xg = df[groups[g]].to_numpy()
            auc_gbm = cv_auc_baseline(Xg, y, model="gbm", seed=RANDOM_SEED)
            auc_mlp = cv_auc_baseline(Xg, y, model="mlp", seed=RANDOM_SEED)
            rows.append((g, auc_gbm, auc_mlp, len(groups[g])))

    if all(k in groups and len(groups[k]) > 0 for k in ["Panel_A", "Panel_C"]):
        cols = groups["Panel_A"] + groups["Panel_C"]
        Xac = df[cols].to_numpy()
        rows.append(("Panel_A∪Panel_C",
                     cv_auc_baseline(Xac, y, "gbm", RANDOM_SEED),
                     cv_auc_baseline(Xac, y, "mlp", RANDOM_SEED),
                     len(cols)))
    if all(k in groups and len(groups[k]) > 0 for k in ["All", "Panel_C"]):
        cols = sorted(set(groups["All"] + groups["Panel_C"]))
        Xallc = df[cols].to_numpy()
        rows.append(("All∪Panel_C",
                     cv_auc_baseline(Xallc, y, "gbm", RANDOM_SEED),
                     cv_auc_baseline(Xallc, y, "mlp", RANDOM_SEED),
                     len(cols)))

    auc_df = pd.DataFrame(rows, columns=["Group", "AUC_GBM_5CV", "AUC_MLP_5CV", "Num_Features"])
    return auc_df.sort_values("AUC_GBM_5CV", ascending=False).reset_index(drop=True)

LEAKAGE_KEYWORDS = ["exit", "death", "event", "status", "eligible", "follow", "survival", "days"]

def leakage_report(df: pd.DataFrame, label_col: str) -> pd.DataFrame:
    cols = [c for c in df.columns if c != label_col]
    flags = []
    for c in cols:
        low = str(c).lower()
        suspicious = any(k in low for k in LEAKAGE_KEYWORDS)
        flags.append((c, suspicious))
    return pd.DataFrame(flags, columns=["feature", "possible_leak"]).sort_values("possible_leak", ascending=False)

def permutation_importance_report(df: pd.DataFrame, y: np.ndarray, cols: List[str], seed: int = 7) -> pd.DataFrame:
    X = df[cols].to_numpy()
    pipe = Pipeline([
        ("imp", SimpleImputer(strategy="median")),
        ("sc", StandardScaler(with_mean=True, with_std=True)),
        ("clf", GradientBoostingClassifier(random_state=seed)),
    ])
    pipe.fit(X, y)
    r = permutation_importance(pipe, X, y, n_repeats=10, random_state=seed, scoring="roc_auc")
    df_imp = pd.DataFrame({"feature": cols, "importance": r.importances_mean})
    return df_imp.sort_values("importance", ascending=False).reset_index(drop=True)

# -----------------------------
# Load data
# -----------------------------
df = pd.read_csv(DATA_PATH)
assert LABEL_COL in df.columns, f"Label column '{LABEL_COL}' not found."
y = df[LABEL_COL].astype(int).to_numpy()

# Build groups by prefix and arrays
groups = prefix_groups(df, PANEL_PREFIXES)
group_arrays = extract_arrays(df, groups)

# 1) Entropy landscape
entropy_df = entropy_by_group(group_arrays, all_group_name="All")
entropy_df.to_csv("entropy_by_group.csv", index=False)
print("\n=== Entropy by Group (vNE; normalized to 'All' if present) ===")
print(entropy_df.round(4).to_string(index=False))

# 2) Synergy scan with bootstrapped CI
synergy_df = synergy_scan(group_arrays, exclude_near_zero=True, threshold=0.01, B=BOOTSTRAP_B)
synergy_df.to_csv("synergy_pairs.csv", index=False)
print("\n=== Pairwise Synergy (bootstrapped) ===")
print(synergy_df.round(4).head(20).to_string(index=False))

# 3) AUC comparisons (GBM & MLP)
auc_df = auc_for_panels(df, y, groups)
auc_df.to_csv("auc_comparison.csv", index=False)
print("\n=== AUC Comparison (5-fold CV) ===")
print(auc_df.round(4).to_string(index=False))

# 4) Permutation importance for A∪C (if present)
if all(k in groups and len(groups[k])>0 for k in ["Panel_A", "Panel_C"]):
    ac_cols = groups["Panel_A"] + groups["Panel_C"]
    imp_ac_df = permutation_importance_report(df, y, ac_cols, seed=RANDOM_SEED)
    imp_ac_df.to_csv("feature_importance_A_C.csv", index=False)
    print("\n=== Permutation Importance (Panel A ∪ C) top 25 ===")
    print(imp_ac_df.head(25).round(6).to_string(index=False))

# 5) Leakage keyword scan
leak_df = leakage_report(df, LABEL_COL)
leak_df.to_csv("leakage_report.csv", index=False)
print("\n=== Possible Leakage Features (keyword scan; top 50) ===")
print(leak_df.head(50).to_string(index=False))

# 6) Slide-ready summary
summary_rows = []
if not synergy_df.empty:
    top = synergy_df.iloc[0]
    summary_rows.append({
        "Best Synergy Pair": f"{top['G_A']} + {top['G_B']}",
        "Mean Synergy": round(top["S_boot_mean"], 4),
        "95% CI": f"[{round(top['CI_low'],4)}, {round(top['CI_high'],4)}]"
    })

def _get_auc(group_name, col):
    if group_name in set(auc_df["Group"]):
        return float(auc_df.loc[auc_df["Group"]==group_name, col].iloc[0])
    return np.nan

auc_ac_gbm = _get_auc("Panel_A∪Panel_C", "AUC_GBM_5CV")
auc_c_gbm   = _get_auc("Panel_C", "AUC_GBM_5CV")
auc_all_gbm = _get_auc("All", "AUC_GBM_5CV")
auc_allc_gbm= _get_auc("All∪Panel_C", "AUC_GBM_5CV")

summary_rows.append({
    "Delta AUC (GBM): A∪C vs C": None if np.isnan(auc_ac_gbm) or np.isnan(auc_c_gbm) else round(auc_ac_gbm - auc_c_gbm, 4),
    "Delta AUC (GBM): All∪C vs All": None if np.isnan(auc_allc_gbm) or np.isnan(auc_all_gbm) else round(auc_allc_gbm - auc_all_gbm, 4),
    "AUC (GBM) All": None if np.isnan(auc_all_gbm) else round(auc_all_gbm, 4)
})
summary_df = pd.DataFrame(summary_rows)
summary_df.to_csv("summary_slide_ready.csv", index=False)
print("\n=== Slide-Ready Summary ===")
print(summary_df.to_string(index=False))

print("\nSaved files in working directory:")
print(" - entropy_by_group.csv")
print(" - synergy_pairs.csv")
print(" - auc_comparison.csv")
print(" - feature_importance_A_C.csv (if A,C exist)")
print(" - leakage_report.csv")



=== Entropy by Group (vNE; normalized to 'All' if present) ===
  Group    vNE  vNE_norm
Panel_E 1.8892    2.4042
Panel_D 1.2590    1.6022
Panel_B 1.0712    1.3632
    All 0.7858    1.0000
Panel_C 0.0045    0.0058
Panel_A 0.0004    0.0006

=== Pairwise Synergy (bootstrapped) ===
    G_A     G_B    S_A    S_B  S_AuB  S_ratio  S_boot_mean  CI_low  CI_high
Panel_B Panel_E 1.0712 1.8892 1.9131   0.6462       0.6576  0.5803   0.7724
Panel_D Panel_E 1.2590 1.8892 1.8902   0.6004       0.5988  0.5581   0.6313
Panel_B Panel_D 1.0712 1.2590 1.2376   0.5311       0.5226  0.4002   0.6233

=== AUC Comparison (5-fold CV) ===
          Group  AUC_GBM_5CV  AUC_MLP_5CV  Num_Features
            All       0.7068       0.6316            36
    All∪Panel_C       0.7068       0.6316            36
        Panel_B       0.7067       0.6659             7
Panel_A∪Panel_C       0.7053       0.5403            14
        Panel_C       0.6881       0.5482             8
        Panel_A       0.6735       0.5305   

In [2]:
# ============================================================
# PLCO Console Report: Quantum Entropy + Group Summary + AUC
# Reproduces the "Analyzing PLCO dataset" style printout
# ============================================================

import re
import numpy as np
import pandas as pd
from numpy.linalg import eigh
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score

# ---------- CONFIG ----------
FILE =  r"D:\Lakshmi Kumari\merged_oc_ovars.csv"      # change path if needed
LABEL_COL = "is_case"               # 0/1
DEC = 4                             # print precision for vNE

# Panel prefixes (exactly like your dataset)
PANEL_PREFIX = {
    "Panel_A": "panela_",
    "Panel_B": "panelb_",
    "Panel_C": "panelc_",
    "Panel_D": "paneld_",
    "Panel_E": "panele_",
}

# Keyword buckets (edit if your column names differ)
KEYWORDS = {
    # classical reproductive markers / gyneco history
    "Reproductive": [
        "menarche", "menopause", "post_menopausal", "parity",
        "preg", "hyst", "bcontr", "ovariesr", "horm_stat", "horm_f"
    ],
    # lifestyle / anthropometrics
    "Lifestyle": [
        "bmi", "height", "weight", "smok", "cig", "alcohol", "pack_years", "race", "hispanic"
    ],
    # family history
    "Family_History": [
        "fam_", "family", "mother", "sister", "relative", "polyps_f", "arthrit_f", "osteopor_f", "divertic_f", "gallblad_f"
    ],
}

# Clinical_Lifestyle = a broader join of Reproductive + Lifestyle (+ a few extras)
CLINICAL_EXTRAS = [
    "eligible", "sqx", "dhq", "bq_age", "preg_f", "horm_", "menstrs_stat_type"
]
# ---------------------------

def vne_entropy(X: np.ndarray, eps: float = 1e-12) -> float:
    """Von Neumann entropy of covariance-derived density matrix."""
    if X.ndim == 1:
        X = X.reshape(-1, 1)
    # center + mean-impute to stabilize covariance
    X = X - np.nanmean(X, axis=0, keepdims=True)
    col_means = np.nanmean(X, axis=0)
    inds = np.where(np.isnan(X))
    if inds[0].size:
        X[inds] = np.take(col_means, inds[1])
    C = np.cov(X, rowvar=False)
    tr = np.trace(C)
    if tr <= eps:
        return 0.0
    rho = C / tr
    vals = eigh(rho, UPLO="U")[0]
    vals = vals[vals > eps]
    return float(-(vals * np.log2(vals)).sum())

def columns_by_prefix(df: pd.DataFrame, prefix: str):
    return [c for c in df.columns if str(c).startswith(prefix)]

def columns_by_keywords(df: pd.DataFrame, words):
    cols = []
    for c in df.columns:
        s = str(c).lower()
        if any(w in s for w in words):
            cols.append(c)
    return sorted(list(set(cols)))

def make_groups(df: pd.DataFrame):
    groups = {}
    # Panels by prefix
    for g, pref in PANEL_PREFIX.items():
        groups[g] = columns_by_prefix(df, pref)

    # All_Biomarkers = union of all panel columns
    biomarker_cols = sorted({c for glist in groups.values() for c in glist})
    groups["All_Biomarkers"] = biomarker_cols

    # Reproductive / Lifestyle / Family_History by keywords (excluding panel columns)
    remaining_cols = [c for c in df.columns if c not in biomarker_cols + [LABEL_COL]]

    groups["Reproductive"] = [c for c in remaining_cols if any(k in c.lower() for k in KEYWORDS["Reproductive"])]
    groups["Lifestyle"] = [c for c in remaining_cols if any(k in c.lower() for k in KEYWORDS["Lifestyle"])]
    groups["Family_History"] = [c for c in remaining_cols if any(k in c.lower() for k in KEYWORDS["Family_History"])]

    # Clinical_Lifestyle = Lifestyle ∪ Reproductive plus a few extra clinical admin bits
    clin = set(groups["Lifestyle"]) | set(groups["Reproductive"])
    clin |= set([c for c in remaining_cols if any(k in c.lower() for k in CLINICAL_EXTRAS)])
    groups["Clinical_Lifestyle"] = sorted(list(clin))

    return groups

def entropy_report(df: pd.DataFrame, groups: dict):
    # compute vNE for each group, normalized to All_Biomarkers
    vne = {}
    for g, cols in groups.items():
        if len(cols) == 0:
            vne[g] = np.nan
        else:
            vne[g] = vne_entropy(df[cols].to_numpy())
    base = vne.get("All_Biomarkers", np.nan)
    norm = {g: (v/base if (base and base > 0) else np.nan) for g, v in vne.items()}

    order = ["Panel_A","Panel_B","Panel_C","Panel_D","Panel_E",
             "Reproductive","Lifestyle","Family_History","All_Biomarkers","Clinical_Lifestyle"]

    lines = []
    for g in order:
        if g in vne and not np.isnan(vne[g]):
            lines.append((g, round(vne[g], DEC), round(norm[g], DEC)))
    return lines

def quick_auc(df: pd.DataFrame, Xcols, y, seed=7):
    clf = GradientBoostingClassifier(random_state=seed)
    pipe = Pipeline([
        ("imp", SimpleImputer(strategy="median")),
        ("sc", StandardScaler(with_mean=True, with_std=True)),
        ("clf", clf),
    ])
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
    scores = cross_val_score(pipe, df[Xcols], y, cv=cv, scoring="roc_auc")
    return float(scores.mean()), float(scores.std())

# =========================
# Main pretty print report
# =========================
df = pd.read_csv(FILE)
print("\nPLCO")
print("================================")
print("Analyzing PLCO dataset")
print("================================")

# Basic stats
n = len(df)
assert LABEL_COL in df.columns, f"Label '{LABEL_COL}' not found."
y = df[LABEL_COL].astype(int)
pos = int((y == 1).sum())
neg = n - pos
bal = pos / max(1, n)

print(f"✅ Loaded PLCO: {n} samples, {df.drop(columns=[LABEL_COL]).shape[1]} features, target balance = {bal:.2f}")
print(f"Loaded {n} rows. Target distribution:")
print(y.value_counts().rename_axis('target').to_frame('count'))
print()

# Grouping
groups = make_groups(df)
group_sizes = {k: len(v) for k, v in groups.items()}
print("Feature groups:", group_sizes)
print()

# Entropy block
print("Quantum Entropy Results:")
ent_lines = entropy_report(df, groups)
for g, v, vn in ent_lines:
    print(f"- {g}: {vn:.4f}")
print()

# AUC sanity (use All_Biomarkers to reflect the “Synergy Metrics” line style)
if len(groups["All_Biomarkers"]) > 0:
    auc_mean, auc_std = quick_auc(df, groups["All_Biomarkers"], y)
    print("Synergy Metrics:")
    print(f"AUC-ROC: {auc_mean:.3f} ± {auc_std:.3f}")
else:
    print("Synergy Metrics:")
    print("AUC-ROC: n/a (no biomarker columns found)")



PLCO
Analyzing PLCO dataset
✅ Loaded PLCO: 1101 samples, 177 features, target balance = 0.11
Loaded 1101 rows. Target distribution:
        count
target       
0         982
1         119

Feature groups: {'Panel_A': 6, 'Panel_B': 7, 'Panel_C': 8, 'Panel_D': 7, 'Panel_E': 8, 'All_Biomarkers': 36, 'Reproductive': 12, 'Lifestyle': 18, 'Family_History': 6, 'Clinical_Lifestyle': 44}

Quantum Entropy Results:
- Panel_A: 0.0006
- Panel_B: 1.3632
- Panel_C: 0.0058
- Panel_D: 1.6022
- Panel_E: 2.4042
- Reproductive: 3.0290
- Lifestyle: 1.9183
- Family_History: 1.2661
- All_Biomarkers: 1.0000
- Clinical_Lifestyle: 0.8189

Synergy Metrics:
AUC-ROC: 0.707 ± 0.073


In [3]:
# ─────────────────────────────────────────────────────────────────────────────
# Biomarker Metrics Logger (Notebook Version)
# - Paste this cell once near the top of your notebook.
# - Call `compute_basic_metrics(...)` for each run,
#   then `logger.log_panel(...)` or `logger.log_pair(...)`.
# - Finally call `logger.export_all()` to get an Excel workbook + PNG figures.
# ─────────────────────────────────────────────────────────────────────────────

from dataclasses import dataclass, field
from typing import Optional, Tuple, Dict, Any, List
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Optional: sklearn metrics (falls back gracefully if missing)
try:
    from sklearn.metrics import roc_auc_score, accuracy_score, f1_score, brier_score_loss
except Exception:
    roc_auc_score = None
    accuracy_score = None
    f1_score = None
    brier_score_loss = None

# --------------------------
# Metric helpers
# --------------------------

def expected_calibration_error(y_true: np.ndarray, y_prob: np.ndarray, n_bins: int = 10) -> float:
    """
    ECE with equal-width bins over [0,1].
    y_true: shape [N] in {0,1}
    y_prob: shape [N] in [0,1]
    """
    y_true = np.asarray(y_true).astype(int)
    y_prob = np.clip(np.asarray(y_prob), 0.0, 1.0)

    edges = np.linspace(0.0, 1.0, n_bins + 1)
    ece = 0.0
    N = len(y_true)

    for i in range(n_bins):
        lo, hi = edges[i], edges[i+1]
        mask = (y_prob >= lo) & (y_prob < hi) if i < n_bins - 1 else (y_prob >= lo) & (y_prob <= hi)
        if not np.any(mask):
            continue
        conf = y_prob[mask].mean()
        acc = y_true[mask].mean()
        ece += (mask.sum() / N) * abs(acc - conf)
    return float(ece)


def compute_basic_metrics(
    y_true: np.ndarray,
    y_prob: np.ndarray,
    y_pred: Optional[np.ndarray] = None,
    n_bins: int = 10,
    positive_label: int = 1,
) -> Dict[str, Any]:
    """
    Compute AUROC, Accuracy, Macro-F1, Brier, and ECE for binary classification.
    - y_true: 0/1 labels
    - y_prob: predicted probability for the positive class
    - y_pred: optional; if None, uses threshold 0.5
    """
    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob)
    if y_pred is None:
        y_pred = (y_prob >= 0.5).astype(int)
    else:
        y_pred = np.asarray(y_pred)

    # Defaults (no sklearn)
    auroc = np.nan
    acc = float((y_pred == y_true).mean())
    macro_f1 = np.nan
    brier = float(np.mean((y_prob - (y_true == positive_label).astype(float))**2))
    ece = expected_calibration_error((y_true == positive_label).astype(int), y_prob, n_bins=n_bins)

    # sklearn-enabled (if available)
    if roc_auc_score is not None:
        try:
            auroc = float(roc_auc_score((y_true == positive_label).astype(int), y_prob))
        except Exception:
            pass
    if f1_score is not None:
        try:
            macro_f1 = float(f1_score(y_true, y_pred, average="macro"))
        except Exception:
            pass
    if brier_score_loss is not None:
        try:
            brier = float(brier_score_loss((y_true == positive_label).astype(int), y_prob))
        except Exception:
            pass

    # rounded outputs
    def r(x): 
        return (None if x is None else (np.nan if isinstance(x, float) and np.isnan(x) else round(float(x), 6)))

    return dict(
        auroc=r(auroc),
        accuracy=r(acc),
        macro_f1=r(macro_f1),
        brier=r(brier),
        ece=r(ece),
    )


# --------------------------
# Logger
# --------------------------

@dataclass
class MetricsLogger:
    out_dir: str = "results_metrics"
    panels_rows: List[Dict[str, Any]] = field(default_factory=list)
    pairs_rows: List[Dict[str, Any]] = field(default_factory=list)

    def log_panel(
        self,
        panel_name: str,
        n_samples: int,
        auroc: Optional[float] = None,
        accuracy: Optional[float] = None,
        macro_f1: Optional[float] = None,
        brier: Optional[float] = None,
        ece: Optional[float] = None,
        vne_mean: Optional[float] = None,
        vne_lo: Optional[float] = None,
        vne_hi: Optional[float] = None,
        extra: Optional[Dict[str, Any]] = None,
    ) -> None:
        row = dict(
            panel=panel_name,
            auroc=auroc,
            accuracy=accuracy,
            macro_f1=macro_f1,
            brier=brier,
            ece=ece,
            vne_mean=vne_mean,
            vne_lo=vne_lo,
            vne_hi=vne_hi,
            n_samples=int(n_samples),
        )
        if extra: row.update(extra)
        self.panels_rows.append(row)

    def log_pair(
        self,
        pair_label: str,
        n_samples: int,
        auroc: Optional[float] = None,
        accuracy: Optional[float] = None,
        macro_f1: Optional[float] = None,
        brier: Optional[float] = None,
        ece: Optional[float] = None,
        vne_mean: Optional[float] = None,
        vne_lo: Optional[float] = None,
        vne_hi: Optional[float] = None,
        extra: Optional[Dict[str, Any]] = None,
    ) -> None:
        row = dict(
            panel_pair=pair_label,
            auroc=auroc,
            accuracy=accuracy,
            macro_f1=macro_f1,
            brier=brier,
            ece=ece,
            vne_mean=vne_mean,
            vne_lo=vne_lo,
            vne_hi=vne_hi,
            n_samples=int(n_samples),
        )
        if extra: row.update(extra)
        self.pairs_rows.append(row)

    @staticmethod
    def _add_ranks(df: pd.DataFrame, keys: Tuple[str, ...] = ("auroc","macro_f1","brier","ece","vne_mean")) -> pd.DataFrame:
        """Create metric-wise ranks and an overall composite rank (sum of ranks)."""
        if df.empty:
            return df
        df = df.copy()
        lower_better = {"brier","ece"}
        for k in keys:
            if k not in df.columns:
                df[k] = np.nan
            asc = (k in lower_better)  # lower is better for brier/ece
            df[f"rank_{k}"] = df[k].rank(ascending=asc, method="min")
        rank_cols = [c for c in df.columns if c.startswith("rank_")]
        df["rank_sum"] = df[rank_cols].sum(axis=1)
        df["rank_overall"] = df["rank_sum"].rank(ascending=True, method="min")
        return df

    def export_all(self) -> Tuple[str, Dict[str, str]]:
        """
        Writes:
          - Excel workbook with Panels_Summary, Pairs_Summary, Top5 sheets
          - Figures (PNG): per-panel AUROC, per-panel vNE±CI, per-pair Composite Rank,
                           Top-10 AUROC pairs, Top-10 ECE pairs
        Returns: (excel_path, dict_of_figure_paths)
        """
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        out_dir = Path(self.out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)

        # Build dataframes
        df_panels = pd.DataFrame(self.panels_rows) if self.panels_rows else pd.DataFrame(columns=[
            "panel","auroc","accuracy","macro_f1","brier","ece","vne_mean","vne_lo","vne_hi","n_samples"
        ])
        df_pairs  = pd.DataFrame(self.pairs_rows) if self.pairs_rows else pd.DataFrame(columns=[
            "panel_pair","auroc","accuracy","macro_f1","brier","ece","vne_mean","vne_lo","vne_hi","n_samples"
        ])

        df_panels_ranked = self._add_ranks(df_panels)
        df_pairs_ranked  = self._add_ranks(df_pairs)

        # Excel
        excel_path = str(out_dir / f"biomarker_metrics_{ts}.xlsx")
        with pd.ExcelWriter(excel_path, engine="xlsxwriter") as writer:
            df_panels_ranked.sort_values("rank_overall", na_position="last").to_excel(writer, index=False, sheet_name="Panels_Summary")
            df_pairs_ranked.sort_values("rank_overall", na_position="last").to_excel(writer, index=False, sheet_name="Pairs_Summary")
            if not df_panels_ranked.empty:
                df_panels_ranked.sort_values("rank_overall").head(5).to_excel(writer, index=False, sheet_name="Top5_Panels")
            if not df_pairs_ranked.empty:
                df_pairs_ranked.sort_values("rank_overall").head(5).to_excel(writer, index=False, sheet_name="Top5_Pairs")

        # Figures (one per plot; no seaborn, no explicit colors)
        figs = {}
        figs_dir = out_dir / f"figs_{ts}"
        figs_dir.mkdir(parents=True, exist_ok=True)

        if not df_panels_ranked.empty:
            try:
                plt.figure()
                order = df_panels_ranked.sort_values("auroc", ascending=False)
                plt.bar(order["panel"], order["auroc"])
                plt.xticks(rotation=45, ha="right")
                plt.ylabel("AUROC")
                plt.title("Per-Panel AUROC")
                plt.tight_layout()
                p = str(figs_dir / "auroc_by_panel.png")
                plt.savefig(p, dpi=200); plt.close()
                figs["auroc_by_panel"] = p
            except Exception:
                pass

            try:
                plt.figure()
                idx = np.arange(len(df_panels_ranked))
                vmean = pd.to_numeric(df_panels_ranked["vne_mean"], errors="coerce")
                vlo = pd.to_numeric(df_panels_ranked["vne_lo"], errors="coerce")
                vhi = pd.to_numeric(df_panels_ranked["vne_hi"], errors="coerce")
                yerr = np.vstack([vmean - vlo, vhi - vmean])
                yerr = np.nan_to_num(yerr, nan=0.0)
                plt.errorbar(idx, vmean, yerr=yerr, fmt='o')
                plt.xticks(idx, df_panels_ranked["panel"], rotation=45, ha="right")
                plt.ylabel("vNE (mean ± CI)")
                plt.title("Per-Panel von Neumann Entropy")
                plt.tight_layout()
                p = str(figs_dir / "vne_by_panel.png")
                plt.savefig(p, dpi=200); plt.close()
                figs["vne_by_panel"] = p
            except Exception:
                pass

        if not df_pairs_ranked.empty:
            try:
                plt.figure()
                orderp = df_pairs_ranked.sort_values("rank_overall")
                plt.bar(orderp["panel_pair"], orderp["rank_overall"])
                plt.xticks(rotation=45, ha="right")
                plt.ylabel("Composite Rank (lower is better)")
                plt.title("Per-Pair Composite Rank")
                plt.tight_layout()
                p = str(figs_dir / "composite_rank_by_pair.png")
                plt.savefig(p, dpi=200); plt.close()
                figs["composite_rank_by_pair"] = p
            except Exception:
                pass

            try:
                plt.figure()
                orderp10 = df_pairs_ranked.sort_values("auroc", ascending=False).head(10)
                plt.bar(orderp10["panel_pair"], orderp10["auroc"])
                plt.xticks(rotation=45, ha="right")
                plt.ylabel("AUROC")
                plt.title("Top 10 Pairs by AUROC")
                plt.tight_layout()
                p = str(figs_dir / "auroc_top10_pairs.png")
                plt.savefig(p, dpi=200); plt.close()
                figs["auroc_top10_pairs"] = p
            except Exception:
                pass

            try:
                plt.figure()
                orderp10_ece = df_pairs_ranked.sort_values("ece", ascending=True).head(10)
                plt.bar(orderp10_ece["panel_pair"], orderp10_ece["ece"])
                plt.xticks(rotation=45, ha="right")
                plt.ylabel("ECE")
                plt.title("Top 10 Pairs by Calibration (ECE)")
                plt.tight_layout()
                p = str(figs_dir / "ece_top10_pairs.png")
                plt.savefig(p, dpi=200); plt.close()
                figs["ece_top10_pairs"] = p
            except Exception:
                pass

        print("✅ Export complete")
        print("Excel:", excel_path)
        print("Figures:", figs)
        return excel_path, figs


In [4]:
# ─────────────────────────────────────────────────────────────────────────────
# PLCO Biomarker Metrics: Panels + Pairs (Clean Rerun, No Dummy Numbers)
# - Uses out-of-fold probabilities for metrics (AUROC/Acc/F1/Brier/ECE)
# - Computes vNE + bootstrap CI; normalizes vNE by All_Biomarkers
# - Exports timestamped Excel + PNG figures (Matplotlib; no seaborn/colors)
# ─────────────────────────────────────────────────────────────────────────────
import re
import os
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
from numpy.linalg import eigh

from sklearn.model_selection import StratifiedKFold, cross_val_predict
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, f1_score, accuracy_score, brier_score_loss

import matplotlib.pyplot as plt

# ---------- CONFIG ----------
FILE =  r"D:\Lakshmi Kumari\merged_oc_ovars.csv"      # change path if needed
LABEL_COL = "is_case"               # 0/1
DEC = 4                             # print precision for vNE in console
N_SPLITS = 5                        # CV splits
SEED = 7                            # reproducibility
N_BOOT_VNE = 200                    # bootstrap iterations for vNE CI (set 0 to skip)
OUT_DIR = "OC_Quantum_results"         # output folder
# ---------------------------

# Panel prefixes (exactly like your dataset)
PANEL_PREFIX = {
    "Panel_A": "panela_",
    "Panel_B": "panelb_",
    "Panel_C": "panelc_",
    "Panel_D": "paneld_",
    "Panel_E": "panele_",
}

# Keyword buckets (edit if your column names differ)
KEYWORDS = {
    "Reproductive": [
        "menarche", "menopause", "post_menopausal", "parity",
        "preg", "hyst", "bcontr", "ovariesr", "horm_stat", "horm_f"
    ],
    "Lifestyle": [
        "bmi", "height", "weight", "smok", "cig", "alcohol", "pack_years", "race", "hispanic"
    ],
    "Family_History": [
        "fam_", "family", "mother", "sister", "relative",
        "polyps_f", "arthrit_f", "osteopor_f", "divertic_f", "gallblad_f"
    ],
}

# Clinical_Lifestyle = Lifestyle ∪ Reproductive plus a few extras
CLINICAL_EXTRAS = [
    "eligible", "sqx", "dhq", "bq_age", "preg_f", "horm_", "menstrs_stat_type"
]

# ─────────────────────────────────────────────────────────────────────────────
# Helpers: vNE, grouping, metrics
# ─────────────────────────────────────────────────────────────────────────────
def vne_entropy(X: np.ndarray, eps: float = 1e-12) -> float:
    """Von Neumann entropy of covariance-derived density matrix."""
    if X.ndim == 1:
        X = X.reshape(-1, 1)
    X = X - np.nanmean(X, axis=0, keepdims=True)
    col_means = np.nanmean(X, axis=0)
    inds = np.where(np.isnan(X))
    if inds[0].size:
        X[inds] = np.take(col_means, inds[1])
    C = np.cov(X, rowvar=False)
    tr = np.trace(C)
    if tr <= eps:
        return 0.0
    rho = C / tr
    vals = eigh(rho, UPLO="U")[0]
    vals = vals[vals > eps]
    return float(-(vals * np.log2(vals)).sum())

def columns_by_prefix(df: pd.DataFrame, prefix: str):
    return [c for c in df.columns if str(c).startswith(prefix)]

def make_groups(df: pd.DataFrame):
    groups = {}
    # Panels by prefix
    for g, pref in PANEL_PREFIX.items():
        groups[g] = columns_by_prefix(df, pref)

    # All_Biomarkers = union of all panel columns
    biomarker_cols = sorted({c for glist in groups.values() for c in glist})
    groups["All_Biomarkers"] = biomarker_cols

    # Reproductive / Lifestyle / Family_History by keywords (excluding panel columns)
    remaining_cols = [c for c in df.columns if c not in biomarker_cols + [LABEL_COL]]

    groups["Reproductive"] = [c for c in remaining_cols if any(k in c.lower() for k in KEYWORDS["Reproductive"])]
    groups["Lifestyle"] = [c for c in remaining_cols if any(k in c.lower() for k in KEYWORDS["Lifestyle"])]
    groups["Family_History"] = [c for c in remaining_cols if any(k in c.lower() for k in KEYWORDS["Family_History"])]

    # Clinical_Lifestyle
    clin = set(groups["Lifestyle"]) | set(groups["Reproductive"])
    clin |= set([c for c in remaining_cols if any(k in c.lower() for k in CLINICAL_EXTRAS)])
    groups["Clinical_Lifestyle"] = sorted(list(clin))

    return groups

def expected_calibration_error(y_true: np.ndarray, y_prob: np.ndarray, n_bins: int = 10) -> float:
    y_true = np.asarray(y_true).astype(int)
    y_prob = np.clip(np.asarray(y_prob), 0.0, 1.0)
    edges = np.linspace(0.0, 1.0, n_bins + 1)
    ece = 0.0
    N = len(y_true)
    for i in range(n_bins):
        lo, hi = edges[i], edges[i+1]
        mask = (y_prob >= lo) & (y_prob < hi) if i < n_bins - 1 else (y_prob >= lo) & (y_prob <= hi)
        if not np.any(mask):
            continue
        conf = y_prob[mask].mean()
        acc = y_true[mask].mean()
        ece += (mask.sum() / N) * abs(acc - conf)
    return float(ece)

def oof_metrics(df: pd.DataFrame, Xcols, y, n_splits=5, seed=7):
    """Out-of-fold probabilities -> AUROC, Acc, Macro-F1, Brier, ECE."""
    if len(Xcols) == 0:
        return dict(auroc=np.nan, accuracy=np.nan, macro_f1=np.nan, brier=np.nan, ece=np.nan, n=len(y))
    pipe = Pipeline([
        ("imp", SimpleImputer(strategy="median")),
        ("sc", StandardScaler(with_mean=True, with_std=True)),
        ("clf", GradientBoostingClassifier(random_state=seed)),
    ])
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed)
    # Get OOF probabilities for positive class
    prob = cross_val_predict(pipe, df[Xcols], y, cv=skf, method="predict_proba")[:, 1]
    pred = (prob >= 0.5).astype(int)
    return dict(
        auroc=float(roc_auc_score(y, prob)),
        accuracy=float(accuracy_score(y, pred)),
        macro_f1=float(f1_score(y, pred, average="macro")),
        brier=float(brier_score_loss(y, prob)),
        ece=float(expected_calibration_error(y, prob, n_bins=10)),
        n=len(y),
    )

def vne_with_ci(df: pd.DataFrame, cols, n_boot=200, seed=7):
    """Point vNE + bootstrap CI (percentiles)."""
    if len(cols) == 0:
        return np.nan, np.nan, np.nan
    X = df[cols].to_numpy()
    v = vne_entropy(X)
    if n_boot <= 0:
        return v, np.nan, np.nan
    rng = np.random.default_rng(seed)
    N = X.shape[0]
    boots = []
    for _ in range(n_boot):
        idx = rng.integers(0, N, N)
        boots.append(vne_entropy(X[idx]))
    lo, hi = np.percentile(boots, [2.5, 97.5])
    return float(v), float(lo), float(hi)

# ─────────────────────────────────────────────────────────────────────────────
# Load data
# ─────────────────────────────────────────────────────────────────────────────
df = pd.read_csv(FILE)
print("\nPLCO")
print("================================")
print("Analyzing PLCO dataset")
print("================================")

# Basic stats
n = len(df)
assert LABEL_COL in df.columns, f"Label '{LABEL_COL}' not found."
y = df[LABEL_COL].astype(int).to_numpy()
pos = int((y == 1).sum()); neg = n - pos
bal = pos / max(1, n)

print(f"✅ Loaded PLCO: {n} samples, {df.drop(columns=[LABEL_COL]).shape[1]} features, target balance = {bal:.2f}")
print(f"Loaded {n} rows. Target distribution:")
print(pd.Series(y).value_counts().rename_axis('target').to_frame('count'))
print()

# Grouping
groups = make_groups(df)
group_sizes = {k: len(v) for k, v in groups.items()}
print("Feature groups:", group_sizes)
print()

# ─────────────────────────────────────────────────────────────────────────────
# Compute vNE (normalized) + metrics for each group; and for all panel PAIRS
# ─────────────────────────────────────────────────────────────────────────────
# vNE normalization base
vne_all, vne_all_lo, vne_all_hi = vne_with_ci(df, groups["All_Biomarkers"], n_boot=N_BOOT_VNE, seed=SEED)
def vne_norm(v):
    return (v / vne_all) if (isinstance(v, (int, float)) and vne_all and vne_all > 0) else np.nan

# Per-panel & keyword groups
rows_panels = []
for gname, cols in groups.items():
    # skip empty sets cleanly
    v, lo, hi = vne_with_ci(df, cols, n_boot=N_BOOT_VNE, seed=SEED)
    metrics = oof_metrics(df, cols, y, n_splits=N_SPLITS, seed=SEED) if gname != "All_Biomarkers" or len(cols) > 0 \
              else dict(auroc=np.nan, accuracy=np.nan, macro_f1=np.nan, brier=np.nan, ece=np.nan, n=len(y))
    rows_panels.append({
        "panel": gname,
        "auroc": metrics["auroc"],
        "accuracy": metrics["accuracy"],
        "macro_f1": metrics["macro_f1"],
        "brier": metrics["brier"],
        "ece": metrics["ece"],
        "vne_mean": v,
        "vne_lo": lo,
        "vne_hi": hi,
        "vne_norm": vne_norm(v),
        "n_samples": metrics["n"],
        "n_features": len(cols),
    })

# Panel pairs among A..E
def panel_cols(name):
    return groups.get(name, [])

core_panels = ["Panel_A","Panel_B","Panel_C","Panel_D","Panel_E"]
rows_pairs = []
for i in range(len(core_panels)):
    for j in range(i+1, len(core_panels)):
        pa, pb = core_panels[i], core_panels[j]
        label = f"{pa.split('_')[1]}+{pb.split('_')[1]}"  # "A+C" style
        cols = sorted(list(set(panel_cols(pa)) | set(panel_cols(pb))))
        v, lo, hi = vne_with_ci(df, cols, n_boot=N_BOOT_VNE, seed=SEED)
        metrics = oof_metrics(df, cols, y, n_splits=N_SPLITS, seed=SEED)
        rows_pairs.append({
            "panel_pair": label,
            "auroc": metrics["auroc"],
            "accuracy": metrics["accuracy"],
            "macro_f1": metrics["macro_f1"],
            "brier": metrics["brier"],
            "ece": metrics["ece"],
            "vne_mean": v,
            "vne_lo": lo,
            "vne_hi": hi,
            "vne_norm": vne_norm(v),
            "n_samples": metrics["n"],
            "n_features": len(cols),
        })

df_panels = pd.DataFrame(rows_panels)
df_pairs  = pd.DataFrame(rows_pairs)

# Rankers
def add_ranks(df, keys=("auroc","macro_f1","brier","ece","vne_mean")):
    df = df.copy()
    lower_better = {"brier","ece"}
    for k in keys:
        if k not in df.columns:
            df[k] = np.nan
        asc = (k in lower_better)
        df[f"rank_{k}"] = df[k].rank(ascending=asc, method="min")
    rank_cols = [c for c in df.columns if c.startswith("rank_")]
    df["rank_sum"] = df[rank_cols].sum(axis=1)
    df["rank_overall"] = df["rank_sum"].rank(ascending=True, method="min")
    return df

df_panels_ranked = add_ranks(df_panels)
df_pairs_ranked  = add_ranks(df_pairs)

best_pair = df_pairs_ranked.sort_values("rank_overall").iloc[0]["panel_pair"] if not df_pairs_ranked.empty else None

# ─────────────────────────────────────────────────────────────────────────────
# Export Excel + Figures
# ─────────────────────────────────────────────────────────────────────────────
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
excel_path = str(Path(OUT_DIR) / f"biomarker_metrics_{ts}.xlsx")

with pd.ExcelWriter(excel_path, engine="xlsxwriter") as writer:
    df_panels_ranked.sort_values("rank_overall", na_position="last").to_excel(writer, index=False, sheet_name="Panels_Summary")
    df_pairs_ranked.sort_values("rank_overall", na_position="last").to_excel(writer, index=False, sheet_name="Pairs_Summary")
    if not df_panels_ranked.empty:
        df_panels_ranked.sort_values("rank_overall").head(5).to_excel(writer, index=False, sheet_name="Top5_Panels")
    if not df_pairs_ranked.empty:
        df_pairs_ranked.sort_values("rank_overall").head(5).to_excel(writer, index=False, sheet_name="Top5_Pairs")
    # Notes
    notes = pd.DataFrame({
        "notes": [f"Generated at {datetime.now().isoformat()}",
                  f"Best pair by composite rank: {best_pair}"]
    })
    notes.to_excel(writer, index=False, sheet_name="Notes")

# Figures (no styles/colors; one chart per fig)
fig_dir = Path(OUT_DIR) / f"figs_{ts}"
fig_dir.mkdir(parents=True, exist_ok=True)
figs = {}

if not df_panels_ranked.empty:
    plt.figure()
    order = df_panels_ranked.sort_values("auroc", ascending=False)
    plt.bar(order["panel"], order["auroc"])
    plt.xticks(rotation=45, ha="right"); plt.ylabel("AUROC"); plt.title("Per-Panel AUROC")
    plt.tight_layout()
    p = str(fig_dir / "auroc_by_panel.png"); plt.savefig(p, dpi=200); plt.close()
    figs["auroc_by_panel"] = p

    plt.figure()
    idx = np.arange(len(df_panels_ranked))
    vmean = pd.to_numeric(df_panels_ranked["vne_mean"], errors="coerce")
    vlo = pd.to_numeric(df_panels_ranked["vne_lo"], errors="coerce")
    vhi = pd.to_numeric(df_panels_ranked["vne_hi"], errors="coerce")
    yerr = np.vstack([vmean - vlo, vhi - vmean]); yerr = np.nan_to_num(yerr, nan=0.0)
    plt.errorbar(idx, vmean, yerr=yerr, fmt='o')
    plt.xticks(idx, df_panels_ranked["panel"], rotation=45, ha="right")
    plt.ylabel("vNE (mean ± CI)"); plt.title("Per-Panel von Neumann Entropy")
    plt.tight_layout()
    p = str(fig_dir / "vne_by_panel.png"); plt.savefig(p, dpi=200); plt.close()
    figs["vne_by_panel"] = p

if not df_pairs_ranked.empty:
    plt.figure()
    orderp = df_pairs_ranked.sort_values("rank_overall")
    plt.bar(orderp["panel_pair"], orderp["rank_overall"])
    plt.xticks(rotation=45, ha="right"); plt.ylabel("Composite Rank (lower is better)")
    plt.title("Per-Pair Composite Rank")
    plt.tight_layout()
    p = str(fig_dir / "composite_rank_by_pair.png"); plt.savefig(p, dpi=200); plt.close()
    figs["composite_rank_by_pair"] = p

    plt.figure()
    orderp10 = df_pairs_ranked.sort_values("auroc", ascending=False).head(10)
    plt.bar(orderp10["panel_pair"], orderp10["auroc"])
    plt.xticks(rotation=45, ha="right"); plt.ylabel("AUROC"); plt.title("Top 10 Pairs by AUROC")
    plt.tight_layout()
    p = str(fig_dir / "auroc_top10_pairs.png"); plt.savefig(p, dpi=200); plt.close()
    figs["auroc_top10_pairs"] = p

    plt.figure()
    orderp10_ece = df_pairs_ranked.sort_values("ece", ascending=True).head(10)
    plt.bar(orderp10_ece["panel_pair"], orderp10_ece["ece"])
    plt.xticks(rotation=45, ha="right"); plt.ylabel("ECE"); plt.title("Top 10 Pairs by Calibration (ECE)")
    plt.tight_layout()
    p = str(fig_dir / "ece_top10_pairs.png"); plt.savefig(p, dpi=200); plt.close()
    figs["ece_top10_pairs"] = p

# ─────────────────────────────────────────────────────────────────────────────
# Console summary
# ─────────────────────────────────────────────────────────────────────────────
print("\nQuantum Entropy Results (vNE normalized to All_Biomarkers):")
for _, r in df_panels_ranked.iterrows():
    if pd.notna(r["vne_norm"]):
        print(f"- {r['panel']}: {r['vne_norm']:.{DEC}f}")

if "auroc" in df_panels_ranked.columns:
    print("\nSynergy Metrics (All_Biomarkers baseline):")
    try:
        all_row = df_panels_ranked[df_panels_ranked["panel"] == "All_Biomarkers"].iloc[0]
        print(f"AUC-ROC: {all_row['auroc']:.3f}")
    except Exception:
        print("AUC-ROC: n/a")

print("\n✅ Export complete")
print("Excel:", excel_path)
print("Figures:", figs)
print("Best pair (composite rank):", best_pair)



PLCO
Analyzing PLCO dataset
✅ Loaded PLCO: 1101 samples, 177 features, target balance = 0.11
Loaded 1101 rows. Target distribution:
        count
target       
0         982
1         119

Feature groups: {'Panel_A': 6, 'Panel_B': 7, 'Panel_C': 8, 'Panel_D': 7, 'Panel_E': 8, 'All_Biomarkers': 36, 'Reproductive': 12, 'Lifestyle': 18, 'Family_History': 6, 'Clinical_Lifestyle': 44}


Quantum Entropy Results (vNE normalized to All_Biomarkers):
- Panel_A: 0.0006
- Panel_B: 1.3632
- Panel_C: 0.0058
- Panel_D: 1.6022
- Panel_E: 2.4042
- All_Biomarkers: 1.0000
- Reproductive: 3.0290
- Lifestyle: 1.9183
- Family_History: 1.2661
- Clinical_Lifestyle: 0.8189

Synergy Metrics (All_Biomarkers baseline):
AUC-ROC: 0.703

✅ Export complete
Excel: OC_Quantum_results\biomarker_metrics_20260207_061107.xlsx
Figures: {'auroc_by_panel': 'OC_Quantum_results\\figs_20260207_061107\\auroc_by_panel.png', 'vne_by_panel': 'OC_Quantum_results\\figs_20260207_061107\\vne_by_panel.png', 'composite_rank_by_pair': 'OC_

In [8]:
"""
Automatic ML Model Benchmarking for PLCO Biomarker Data
-------------------------------------------------------
- Loads tabular data (e.g., PLCO biomarker panels)
- Runs multiple ML models with 5-fold Stratified CV
- Computes rich set of metrics:
    * Accuracy, Balanced Accuracy
    * Precision, Recall, Specificity, F1
    * ROC-AUC, PR-AUC
    * Brier Score, Log Loss
    * MCC, Cohen's Kappa
    * ECE (Expected Calibration Error)
- Saves results to: model_comparison_metrics.csv

You can compare these baselines against your Quantum-Inspired PLCO model.
"""

import numpy as np
import pandas as pd

from sklearn.model_selection import StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import (
    accuracy_score, balanced_accuracy_score,
    precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score,
    brier_score_loss, log_loss,
    matthews_corrcoef, cohen_kappa_score,
    confusion_matrix
)

import warnings
warnings.filterwarnings("ignore")


# -------------------------------------------------
# Config: CHANGE THESE FOR YOUR DATA
# -------------------------------------------------
DATA_PATH = r"D:\Lakshmi Kumari\merged_oc_ovars.csv"   # your PLCO CSV
LABEL_COL = "is_case"                        # binary label column (0/1)
RANDOM_SEED = 7
N_SPLITS = 5


# -------------------------------------------------
# Utility: Expected Calibration Error (ECE)
# -------------------------------------------------
def compute_ece(y_true, y_prob, n_bins=10):
    """Expected Calibration Error (ECE) for binary classification."""
    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob)
    bins = np.linspace(0.0, 1.0, n_bins + 1)
    indices = np.digitize(y_prob, bins) - 1

    ece = 0.0
    for i in range(n_bins):
        mask = indices == i
        if np.sum(mask) > 0:
            avg_pred = np.mean(y_prob[mask])
            avg_true = np.mean(y_true[mask])
            ece += (np.sum(mask) / len(y_prob)) * abs(avg_pred - avg_true)
    return float(ece)


# -------------------------------------------------
# Utility: Metrics for one fold
# -------------------------------------------------
def compute_metrics_fold(y_true, y_prob, threshold=0.5):
    """Compute all evaluation metrics for a single fold."""
    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob)
    y_pred = (y_prob >= threshold).astype(int)

    # Confusion matrix
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

    # Core metrics
    acc = accuracy_score(y_true, y_pred)
    bacc = balanced_accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, zero_division=0)
    rec = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    try:
        roc = roc_auc_score(y_true, y_prob)
    except ValueError:
        roc = np.nan
    try:
        pr_auc = average_precision_score(y_true, y_prob)
    except ValueError:
        pr_auc = np.nan

    # Specificity
    specificity = tn / (tn + fp + 1e-12)

    # Calibration
    try:
        brier = brier_score_loss(y_true, y_prob)
    except ValueError:
        brier = np.nan
    try:
        ll = log_loss(y_true, y_prob)
    except ValueError:
        ll = np.nan

    # Other
    mcc = matthews_corrcoef(y_true, y_pred)
    kappa = cohen_kappa_score(y_true, y_pred)

    # ECE
    ece = compute_ece(y_true, y_prob, n_bins=10)

    return {
        "Accuracy": acc,
        "Balanced_Accuracy": bacc,
        "Precision": prec,
        "Recall": rec,
        "Specificity": specificity,
        "F1": f1,
        "ROC_AUC": roc,
        "PR_AUC": pr_auc,
        "Brier": brier,
        "LogLoss": ll,
        "MCC": mcc,
        "Kappa": kappa,
        "ECE": ece,
        "TP": tp,
        "FP": fp,
        "TN": tn,
        "FN": fn,
    }


# -------------------------------------------------
# Define Models
# -------------------------------------------------
def get_models():
    """
    Returns a dict of model_name -> (estimator, needs_scaling_flag)
    All will be wrapped in Pipeline with Imputer (+ optional Scaler).
    """
    models = {}

    # Logistic Regression (L2)
    models["LogReg_L2"] = (
        LogisticRegression(
            penalty="l2",
            solver="lbfgs",
            max_iter=500,
            class_weight=None,
            random_state=RANDOM_SEED
        ),
        True
    )

    # Logistic Regression (L1)
    models["LogReg_L1"] = (
        LogisticRegression(
            penalty="l1",
            solver="liblinear",
            max_iter=500,
            class_weight=None,
            random_state=RANDOM_SEED
        ),
        True
    )

    # Linear SVM
    models["SVM_Linear"] = (
        SVC(
            kernel="linear",
            probability=True,
            random_state=RANDOM_SEED
        ),
        True
    )

    # RBF SVM
    models["SVM_RBF"] = (
        SVC(
            kernel="rbf",
            probability=True,
            random_state=RANDOM_SEED
        ),
        True
    )

    # Random Forest
    models["RandomForest"] = (
        RandomForestClassifier(
            n_estimators=300,
            max_depth=None,
            random_state=RANDOM_SEED,
            n_jobs=-1
        ),
        False
    )

    # Gradient Boosting
    models["GradientBoosting"] = (
        GradientBoostingClassifier(
            random_state=RANDOM_SEED
        ),
        False
    )

    # Naive Bayes
    models["GaussianNB"] = (
        GaussianNB(),
        False  # scaling is not required
    )

    # MLP (simple)
    models["MLP_64_32"] = (
        MLPClassifier(
            hidden_layer_sizes=(64, 32),
            activation="relu",
            solver="adam",
            alpha=1e-3,
            learning_rate_init=1e-3,
            max_iter=500,
            early_stopping=True,
            random_state=RANDOM_SEED
        ),
        True
    )

    # Optionally add XGBoost / LightGBM / CatBoost if installed
    try:
        from xgboost import XGBClassifier
        models["XGBoost"] = (
            XGBClassifier(
                n_estimators=400,
                learning_rate=0.05,
                max_depth=4,
                subsample=0.9,
                colsample_bytree=0.9,
                eval_metric="logloss",
                random_state=RANDOM_SEED,
                n_jobs=-1
            ),
            False
        )
    except ImportError:
        print("XGBoost not installed; skipping XGBoost model.")


    try:
        from catboost import CatBoostClassifier
        models["CatBoost"] = (
            CatBoostClassifier(
                iterations=400,
                learning_rate=0.05,
                depth=4,
                verbose=False,
                random_state=RANDOM_SEED
            ),
            False
        )
    except ImportError:
        print("CatBoost not installed; skipping CatBoost model.")

    return models


# -------------------------------------------------
# Main CV Evaluation
# -------------------------------------------------
def evaluate_models_cv(X, y, n_splits=5, random_state=7):
    models = get_models()
    results = []

    cv = StratifiedKFold(
        n_splits=n_splits,
        shuffle=True,
        random_state=random_state
    )

    for model_name, (base_clf, need_scaling) in models.items():
        print(f"\n=== Evaluating model: {model_name} ===")

        # Build pipeline
        steps = [("imp", SimpleImputer(strategy="median"))]
        if need_scaling:
            steps.append(("sc", StandardScaler()))
        steps.append(("clf", base_clf))
        pipe = Pipeline(steps)

        fold_metrics = []

        for fold_idx, (train_idx, test_idx) in enumerate(cv.split(X, y), start=1):
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]

            pipe.fit(X_train, y_train)
            # predicted probabilities for positive class
            y_prob = pipe.predict_proba(X_test)[:, 1]

            m = compute_metrics_fold(y_test, y_prob, threshold=0.5)
            fold_metrics.append(m)
            print(f" Fold {fold_idx}: ROC_AUC={m['ROC_AUC']:.4f}, F1={m['F1']:.4f}, Brier={m['Brier']:.4f}")

        # Aggregate metrics over folds (mean & std)
        metrics_df = pd.DataFrame(fold_metrics)
        mean_metrics = metrics_df.mean()
        std_metrics = metrics_df.std()

        row = {"Model": model_name}
        for metric_name in mean_metrics.index:
            row[f"{metric_name}_mean"] = mean_metrics[metric_name]
            row[f"{metric_name}_std"] = std_metrics[metric_name]

        results.append(row)

    results_df = pd.DataFrame(results)
    return results_df

import warnings
warnings.filterwarnings("ignore")

# -------------------------------------------------
# Run Everything
# -------------------------------------------------
if __name__ == "__main__":
    # Load data
    df = pd.read_csv(DATA_PATH)
    assert LABEL_COL in df.columns, f"Label column '{LABEL_COL}' not found."

    # ======== FIXED FEATURE SELECTION HERE ========
    feature_df = df.drop(columns=[LABEL_COL])
    numeric_cols = feature_df.select_dtypes(include=[np.number]).columns.tolist()

    X = feature_df[numeric_cols].to_numpy()
    y = df[LABEL_COL].astype(int).to_numpy()

    print(f"Loaded data: {df.shape[0]} samples")
    print(f"Total feature columns: {feature_df.shape[1]}")
    print(f"Numeric feature columns used: {len(numeric_cols)}")

    dropped_cols = sorted(set(feature_df.columns) - set(numeric_cols))
    if dropped_cols:
        print("\nDropped non-numeric columns (not used as features):")
        for c in dropped_cols:
            print(" -", c)
    # =============================================

    results_df = evaluate_models_cv(X, y, n_splits=N_SPLITS, random_state=RANDOM_SEED)

    out_path = r"D:\Lakshmi Kumari\OC_Quantum_results"
    results_df.to_csv(out_path, index=False)

    print("\nSaved model comparison metrics to:", out_path)
    print("\n=== Model Comparison (summary) ===")
    print(results_df[["Model", "ROC_AUC_mean", "PR_AUC_mean", "F1_mean",
                      "Accuracy_mean", "Balanced_Accuracy_mean",
                      "Brier_mean", "ECE_mean"]].round(4).to_string(index=False))



Loaded data: 1101 samples
Total feature columns: 177
Numeric feature columns used: 176

Dropped non-numeric columns (not used as features):
 - build

=== Evaluating model: LogReg_L2 ===
 Fold 1: ROC_AUC=0.9998, F1=0.9787, Brier=0.0043
 Fold 2: ROC_AUC=1.0000, F1=1.0000, Brier=0.0011
 Fold 3: ROC_AUC=0.9562, F1=0.9362, Brier=0.0108
 Fold 4: ROC_AUC=1.0000, F1=1.0000, Brier=0.0014
 Fold 5: ROC_AUC=0.9998, F1=0.9796, Brier=0.0041

=== Evaluating model: LogReg_L1 ===
 Fold 1: ROC_AUC=1.0000, F1=0.9787, Brier=0.0043
 Fold 2: ROC_AUC=1.0000, F1=1.0000, Brier=0.0001
 Fold 3: ROC_AUC=0.9996, F1=0.9796, Brier=0.0044
 Fold 4: ROC_AUC=1.0000, F1=0.9796, Brier=0.0016
 Fold 5: ROC_AUC=0.9998, F1=0.9583, Brier=0.0057

=== Evaluating model: SVM_Linear ===
 Fold 1: ROC_AUC=0.9989, F1=0.9787, Brier=0.0049
 Fold 2: ROC_AUC=0.9989, F1=0.9787, Brier=0.0042
 Fold 3: ROC_AUC=0.9558, F1=0.9583, Brier=0.0109
 Fold 4: ROC_AUC=0.9996, F1=0.9796, Brier=0.0040
 Fold 5: ROC_AUC=0.9996, F1=0.9583, Brier=0.0068

===

In [9]:
# ================================================================
# Quantum-Entropy PLCO Model (Clean Baseline Comparison)
# ================================================================
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score, average_precision_score, f1_score
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, ClassifierMixin
from scipy.stats import entropy
import warnings
warnings.filterwarnings("ignore")



# ---------------------------------------------------------
# Quantum-Entropy Feature Modulation
# ---------------------------------------------------------
def quantum_entropy_modulation(X):
    """
    Computes per-feature Tsallis entropy and rescales features.
    """
    mod_X = np.zeros_like(X)
    for j in range(X.shape[1]):
        col = X[:, j]
        col = np.nan_to_num(col, nan=np.nanmedian(col))
        # Tsallis entropy (q=1.2)
        hist, _ = np.histogram(col, bins=30, density=True)
        hist = hist + 1e-10
        S = (1 - np.sum(hist ** 1.2)) / 0.2
        mod_X[:, j] = col * (1 + S)  # entropy-modulated feature
    return mod_X

# ---------------------------------------------------------
# QE Model: logistic baseline + entropy modulation + calibration
# ---------------------------------------------------------
class QuantumEntropyCLF(BaseEstimator, ClassifierMixin):

    def __init__(self):
        self.scaler = StandardScaler()
        self.base = LogisticRegression(penalty="l2", max_iter=5000)
        self.calib = CalibratedClassifierCV(self.base, cv="prefit")

    def fit(self, X, y):
        Xq = quantum_entropy_modulation(X)
        Xs = self.scaler.fit_transform(Xq)
        self.base.fit(Xs, y)
        self.calib = CalibratedClassifierCV(self.base, cv=3)
        self.calib.fit(Xs, y)
        return self

    def predict_proba(self, X):
        Xq = quantum_entropy_modulation(X)
        Xs = self.scaler.transform(Xq)
        return self.calib.predict_proba(Xs)

    def predict(self, X):
        return (self.predict_proba(X)[:,1] >= 0.5).astype(int)

# ---------------------------------------------------------
# Metrics
# ---------------------------------------------------------
def ece_score(y_true, y_prob, bins=10):
    bins_edges = np.linspace(0, 1, bins + 1)
    ece = 0.0
    for i in range(bins):
        start, end = bins_edges[i], bins_edges[i+1]
        mask = (y_prob >= start) & (y_prob < end)
        if mask.sum() == 0:
            continue
        acc = y_true[mask].mean()
        conf = y_prob[mask].mean()
        ece += abs(acc - conf) * mask.mean()
    return ece

# ---------------------------------------------------------
# Cross-Validation
# ---------------------------------------------------------
results = []
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

fold = 1
for train_idx, test_idx in skf.split(X, y):

    print(f"\n=== Fold {fold} ===")
    fold += 1

    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]

    model = QuantumEntropyCLF()
    model.fit(X_train, y_train)

    prob = model.predict_proba(X_test)[:,1]
    pred = (prob >= 0.5).astype(int)

    roc = roc_auc_score(y_test, prob)
    pr  = average_precision_score(y_test, prob)
    f1  = f1_score(y_test, pred)
    brier = np.mean((prob - y_test)**2)
    ece  = ece_score(y_test, prob)

    print(f"AUC={roc:.4f}, PR-AUC={pr:.4f}, F1={f1:.4f}, Brier={brier:.4f}, ECE={ece:.4f}")

    results.append([roc, pr, f1, brier, ece])

# ---------------------------------------------------------
# Summary
# ---------------------------------------------------------
results = np.array(results)
summary = pd.DataFrame({
    "Metric": ["ROC_AUC", "PR_AUC", "F1", "Brier", "ECE"],
    "Mean": results.mean(axis=0),
    "Std": results.std(axis=0)
})

print("\n=== Quantum-Entropy Model Summary ===")
print(summary)

summary.to_csv("quantum_entropy_plco_results.csv", index=False)
print("\nSaved QE model results → quantum_entropy_plco_results.csv")



=== Fold 1 ===
AUC=1.0000, PR-AUC=1.0000, F1=0.0000, Brier=0.1084, ECE=0.1085

=== Fold 2 ===
AUC=0.9925, PR-AUC=0.9495, F1=0.0000, Brier=0.1045, ECE=0.1045

=== Fold 3 ===
AUC=0.9998, PR-AUC=0.9983, F1=0.1967, Brier=0.8909, ECE=0.8909

=== Fold 4 ===
AUC=0.9994, PR-AUC=0.9948, F1=0.0000, Brier=0.0763, ECE=0.0901

=== Fold 5 ===
AUC=1.0000, PR-AUC=1.0000, F1=0.0000, Brier=0.1090, ECE=0.1091

=== Quantum-Entropy Model Summary ===
    Metric      Mean       Std
0  ROC_AUC  0.998329  0.002926
1   PR_AUC  0.988521  0.019611
2       F1  0.039344  0.078689
3    Brier  0.257839  0.316766
4      ECE  0.260615  0.315222

Saved QE model results → quantum_entropy_plco_results.csv
