In [None]:
import os
import numpy as np
import pandas as pd

# ======================
# ==== 配置（可改） ====
# ======================
# Run configuration for the leave-one-out (LOO) script below; edit as needed.
CONFIG = {
    # Directory holding both input workbooks; the two output workbooks are
    # written here as well.
    "base_dir": r"C:/Users/81005/Desktop/CYH/3-PFAS/3.2.4路径分析/LOO",
    # Workbook with the baseline per-PFAS pathway probabilities.
    "base_file": "probability_base.xlsx",
    # Workbook with the evidence sheet and the optional "weights" sheet.
    "inputs_file": "inputs_core.xlsx",

    # Accepted header names for the PFAS identifier column, in priority order.
    "pfas_col_candidates": ["PFAS_name", "PFAS名称", "PFAS", "Name"],

    # Accepted header names for each pathway's probability column, in
    # priority order. The keys are the canonical pathway names used
    # throughout the script.
    "prob_col_candidates": {
        "PI3K-AKT": [
            # Several dash/spacing spellings of the same header.
            "P (PI3K – AKT)",
            "P(PI3K – AKT)",
            "P (PI3K- AKT)", "P (PI3K - AKT)", "P(PI3K-AKT)", "P (PI3K-AKT)",
            "到达路径PI3K–AKT的概率", "到达路径PI3K- AKT的概率", "到达路径PI3K-AKT的概率"
        ],
        "PPAR": [
            "P (PPAR)", "P(PPAR)",
            "P(PPAR signaling)", "P (PPAR signaling)",
            "到达路径PPAR signaling的概率", "到达路径PPAR的概率"
        ],
        "LPS": [
            "P (LPS)", "P(LPS)", "到达路径LPS的概率"
        ],
        "ROS": [
            "P (ROS)", "P(ROS)", "到达路径ROS的概率"
        ],
    },

    # Sheet of inputs_file that holds the pathway-level evidence table.
    "evidence_sheet": "evidence",

    # Fallback used when the "weights" sheet cannot be read. The three
    # weight_* entries weight the FDR / Count / Hub evidence values; "floor"
    # is the lower bound applied to the rescaled probabilities.
    "default_weights": {"weight_FDR": 1.0, "weight_Count": 1.0, "weight_Hub": 1.0, "floor": 0.02},

    # Output workbook names (created inside base_dir).
    "out_summary": "LOO_summary.xlsx",
    "out_deltas": "LOO_deltas.xlsx",

    # Number of decimals kept in the written tables.
    "round_digits": 6,
}

# ======================
# ==== 工具函数 ========
# ======================
def _find_first_col(df: pd.DataFrame, cand_list):
    """Return the first column of ``df`` that matches one of ``cand_list``.

    Candidates are tried in order. For each candidate the following spellings
    are checked against ``df.columns`` before moving on to the next one:

    1. the candidate as given;
    2. the candidate with surrounding whitespace stripped;
    3. the candidate with en dashes replaced by ASCII hyphens;
    4. the candidate with ASCII hyphens replaced by en dashes;
    5. the candidate with all spaces removed, compared against the column
       names with all spaces removed.

    Args:
        df: Table whose header is searched.
        cand_list: Candidate column names (strings), in priority order.

    Returns:
        The matching column label as it appears in ``df.columns``.

    Raises:
        ValueError: If no candidate matches any column.
    """
    columns = df.columns

    # Space-insensitive lookup, built once because it does not depend on the
    # candidate. Only string headers take part: a header can also be a number
    # or a date, which has no ``.replace`` and can never equal a string
    # candidate anyway.
    no_space_cols = {
        col.replace(" ", ""): col for col in columns if isinstance(col, str)
    }

    for cand in cand_list:
        spellings = (
            cand,
            cand.strip(),
            cand.replace("–", "-"),
            cand.replace("-", "–"),
        )
        for spelling in spellings:
            if spelling in columns:
                return spelling

        squeezed = cand.replace(" ", "")
        if squeezed in no_space_cols:
            return no_space_cols[squeezed]

    raise ValueError(f"在表头中找不到这些列名中的任意一个：{cand_list}\n表头：{list(df.columns)}")

def _read_base_probs(base_path: str, cfg: dict):
    """Load the baseline probability workbook and locate its key columns.

    Args:
        base_path: Path of the Excel workbook to read.
        cfg: Configuration mapping providing ``"pfas_col_candidates"`` and
            ``"prob_col_candidates"``.

    Returns:
        A ``(table, located)`` pair. ``table`` is the loaded sheet with the
        PFAS identifier column renamed to ``"__PFAS__"``; ``located`` maps
        each pathway key of ``cfg["prob_col_candidates"]`` to the header found
        for it in ``table``.

    Raises:
        ValueError: Propagated from ``_find_first_col`` when a required
            column cannot be found.
    """
    table = pd.read_excel(base_path)

    # Give the identifier column a fixed name so later code need not know
    # which spelling the workbook used.
    id_col = _find_first_col(table, cfg["pfas_col_candidates"])
    table = table.rename(columns={id_col: "__PFAS__"})

    located = {
        path_key: _find_first_col(table, candidates)
        for path_key, candidates in cfg["prob_col_candidates"].items()
    }
    return table, located

def _read_weights_or_default(inputs_path: str, default_w: dict):
    """Read the evidence weights from the ``weights`` sheet, else use defaults.

    Only the first data row of the sheet is used. For each of
    ``weight_FDR``, ``weight_Count``, ``weight_Hub`` and ``floor`` the cell
    value is taken when the column exists and the cell is not blank;
    otherwise the value from ``default_w`` is kept. All four values are
    converted to ``float``.

    Reading is best-effort: if the workbook or sheet cannot be read, the
    sheet has no rows, or a value cannot be converted, a ``RuntimeWarning`` is
    issued and a copy of ``default_w`` is returned unchanged.

    Args:
        inputs_path: Path of the Excel workbook that may contain the sheet.
        default_w: Default values; never mutated.

    Returns:
        A new dict with the effective weights and floor.
    """
    import warnings  # local import keeps this block self-contained

    out = default_w.copy()
    try:
        wdf = pd.read_excel(inputs_path, sheet_name="weights")
        row = wdf.iloc[0].to_dict()
        for key in ("weight_FDR", "weight_Count", "weight_Hub", "floor"):
            value = row.get(key)
            # A blank cell is read as NaN; float(NaN) would silently turn
            # every downstream result into NaN, so treat it as "not given".
            if value is None or pd.isna(value):
                value = out[key]
            out[key] = float(value)
    except Exception as exc:
        # Deliberately broad: the sheet is optional and any failure means
        # "use the defaults" — but say so instead of failing silently.
        warnings.warn(
            f"无法从 {inputs_path} 读取 weights 表（{exc!r}），改用默认权重。",
            RuntimeWarning,
            stacklevel=2,
        )
        return default_w.copy()
    return out

# Normalised spelling -> canonical pathway name.
_PATH_ALIASES = {
    "PI3K": "PI3K-AKT",
    "PI3K-AKT": "PI3K-AKT",
    "PPAR": "PPAR",
    "PPAR SIGNALING": "PPAR",
    "LPS": "LPS",
    "ROS": "ROS",
}


def _canon_path_name(x: str) -> str:
    """Map a pathway label to its canonical name.

    The label is stripped, upper-cased and en dashes become ASCII hyphens.
    For the lookup only, other dash glyphs are also unified, spaces around
    hyphens are dropped and runs of whitespace are collapsed, so spellings
    such as ``"pi3k – akt"``, ``"PI3K- AKT"`` or ``"PPAR  signaling"`` are
    recognised.

    Args:
        x: Pathway label; non-strings are converted with ``str``.

    Returns:
        One of ``"PI3K-AKT"``, ``"PPAR"``, ``"LPS"``, ``"ROS"`` when the label
        is recognised; otherwise the stripped, upper-cased label with en
        dashes replaced by hyphens.
    """
    if not isinstance(x, str):
        x = str(x)
    s = x.strip().upper().replace("–", "-")

    # Build a more forgiving lookup key without touching ``s``, which is what
    # unknown labels are returned as.
    key = s
    for dash in ("\u2014", "\u2212", "\u2010", "\u2011"):  # em dash, minus, hyphens
        key = key.replace(dash, "-")
    key = "-".join(part.strip() for part in key.split("-"))
    key = " ".join(key.split())

    return _PATH_ALIASES.get(key, s)

def _read_evidence(inputs_path: str, evidence_sheet: str):
    """Load the pathway-level evidence table.

    Header names are matched case-insensitively after stripping whitespace:
    ``path``/``term`` -> ``path``, ``fdr``/``w_fdr`` -> ``FDR``,
    ``count``/``w_count`` -> ``Count``, ``hub``/``w_hub`` -> ``Hub``.
    Pathway labels are canonicalised with ``_canon_path_name`` and only the
    rows for PI3K-AKT, PPAR, LPS and ROS are kept. Rows are not
    de-duplicated.

    Args:
        inputs_path: Path of the Excel workbook to read.
        evidence_sheet: Name of the sheet holding the evidence table.

    Returns:
        A float DataFrame indexed by canonical pathway name with the columns
        ``FDR``, ``Count`` and ``Hub``.

    Raises:
        ValueError: If a required column is missing, if no row belongs to one
            of the four pathways, or if a value cannot be converted to float.
    """
    ev = pd.read_excel(inputs_path, sheet_name=evidence_sheet)

    header_aliases = {
        "path": "path", "term": "path",
        "fdr": "FDR", "w_fdr": "FDR",
        "count": "Count", "w_count": "Count",
        "hub": "Hub", "w_hub": "Hub",
    }
    rename_map = {}
    for col in ev.columns:
        # str(): a header may be a number or a date, which has no .strip().
        target = header_aliases.get(str(col).strip().lower())
        if target is not None:
            rename_map[col] = target
    ev = ev.rename(columns=rename_map)

    required = ["path", "FDR", "Count", "Hub"]
    # Report exactly the columns that are missing, not the whole wish list.
    missing = [name for name in required if name not in ev.columns]
    if missing:
        raise ValueError(f"evidence 缺少列：{missing}；当前列：{list(ev.columns)}")

    ev["path"] = ev["path"].apply(_canon_path_name)

    ev = ev[ev["path"].isin(["PI3K-AKT", "PPAR", "LPS", "ROS"])].copy()
    if ev.empty:
        raise ValueError("evidence 表未包含 PI3K-AKT / PPAR / LPS / ROS 的行。")

    ev = ev.set_index("path")[["FDR", "Count", "Hub"]]
    ev = ev.astype(float)
    return ev

def _calc_path_score(F, C, H, wf, wc, wh):
    """Return the weighted mean of the three evidence values.

    Args:
        F, C, H: FDR, Count and Hub evidence values.
        wf, wc, wh: Weights of ``F``, ``C`` and ``H`` respectively.

    Returns:
        ``(wf*F + wc*C + wh*H) / (wf + wc + wh)``.

    Raises:
        ZeroDivisionError: If the weights are plain Python numbers summing
            to zero.
    """
    return (wf*F + wc*C + wh*H) / (wf + wc + wh)


def _calc_r_after_drop(F, C, H, drop_key, wf, wc, wh):
    """Return the ratio of the leave-one-out score to the baseline score.

    The weight named by ``drop_key`` is set to zero, the remaining weights
    are renormalised to sum to one, and the resulting score is divided by
    the baseline score from ``_calc_path_score``.

    Args:
        F, C, H: FDR, Count and Hub evidence values.
        drop_key: Which evidence to leave out: ``"FDR"``, ``"Count"`` or
            ``"Hub"``.
        wf, wc, wh: Baseline weights of ``F``, ``C`` and ``H``.

    Returns:
        ``score_loo / score_base``; ``1.0`` when the remaining weights sum
        to zero or when the baseline score is zero.

    Raises:
        ValueError: If ``drop_key`` is not one of the three evidence names.
    """
    if drop_key not in ("FDR", "Count", "Hub"):
        # Assigning to an unknown key would add a fourth entry and drop
        # nothing, quietly reporting r ≈ 1 for a misspelled key.
        raise ValueError(
            f"drop_key 必须是 'FDR'、'Count' 或 'Hub' 之一，收到：{drop_key!r}"
        )

    w = {"FDR": wf, "Count": wc, "Hub": wh}
    w[drop_key] = 0.0
    s = w["FDR"] + w["Count"] + w["Hub"]
    if s == 0:
        # Nothing left to score with.
        return 1.0

    w_norm = {k: v/s for k, v in w.items()}
    score_loo = w_norm["FDR"]*F + w_norm["Count"]*C + w_norm["Hub"]*H
    score_base = _calc_path_score(F, C, H, wf, wc, wh)
    if score_base == 0:
        # Ratio undefined.
        return 1.0
    return score_loo / score_base

# ======================
# ===== 主流程 =========
# ======================
# Leave-one-out driver: for every pathway and every evidence type, rescale
# the baseline probability column by the ratio r returned by
# _calc_r_after_drop, clamp it from below at `floor`, and record the change.
base_dir = CONFIG["base_dir"]
base_path = os.path.join(base_dir, CONFIG["base_file"])
inputs_path = os.path.join(base_dir, CONFIG["inputs_file"])

df_base, prob_cols = _read_base_probs(base_path, CONFIG)
ev_tbl = _read_evidence(inputs_path, CONFIG["evidence_sheet"])
weights = _read_weights_or_default(inputs_path, CONFIG["default_weights"])
wf, wc, wh = weights["weight_FDR"], weights["weight_Count"], weights["weight_Hub"]
floor = weights["floor"]
round_digits = CONFIG["round_digits"]

paths = ["PI3K-AKT", "PPAR", "LPS", "ROS"]

summary_rows = []    # one row per (pathway, dropped evidence)
detail_sheets = {}   # scenario name -> per-PFAS detail table
skipped_paths = []   # pathways that have no row in the evidence table

for path_name in paths:
    if path_name not in ev_tbl.index:
        # Remember the omission so it can be reported instead of vanishing.
        skipped_paths.append(path_name)
        continue

    prob_col = prob_cols[path_name]
    if prob_col not in df_base.columns:
        raise ValueError(f"基线概率表找不到列：{prob_col}（路径 {path_name}）; 表头：{list(df_base.columns)}")

    F = float(ev_tbl.loc[path_name, "FDR"])
    C = float(ev_tbl.loc[path_name, "Count"])
    H = float(ev_tbl.loc[path_name, "Hub"])

    # The baseline column is the same for all three scenarios of a pathway,
    # so convert it once per pathway rather than once per scenario.
    base_vals = df_base[prob_col].astype(float).values

    for drop_key in ["FDR", "Count", "Hub"]:
        r = _calc_r_after_drop(F, C, H, drop_key, wf, wc, wh)

        # Scale the whole column by r, never going below the floor.
        new_vals = np.maximum(floor, r * base_vals)

        delta = new_vals - base_vals
        abs_delta = np.abs(delta)
        med_abs = float(np.median(abs_delta))
        max_abs = float(np.max(abs_delta))

        scen = f"{path_name}_drop{drop_key}"
        df_detail = pd.DataFrame({
            "__PFAS__": df_base["__PFAS__"],
            f"{prob_col}(base)": np.round(base_vals, round_digits),
            f"{prob_col}({scen})": np.round(new_vals, round_digits),
            f"Δp({scen})": np.round(delta, round_digits),
            f"|Δp|({scen})": np.round(abs_delta, round_digits),
        })
        detail_sheets[scen] = df_detail

        summary_rows.append({
            "Path": path_name,
            "Scenario": f"LOO-{drop_key}",
            "r_k": round(r, round_digits),
            "|Δp|_median": round(med_abs, round_digits),
            "|Δp|_max": round(max_abs, round_digits),
            "Note": "路径级证据：整列同比缩放；排序不变（τ=1）"
        })


out_summary_path = os.path.join(base_dir, CONFIG["out_summary"])
out_deltas_path = os.path.join(base_dir, CONFIG["out_deltas"])

pd.DataFrame(summary_rows).to_excel(out_summary_path, index=False)
with pd.ExcelWriter(out_deltas_path, engine="xlsxwriter") as writer:
    for name, d in detail_sheets.items():
        # Sheet names are cut to 31 characters; slicing a shorter name is a
        # no-op.
        d.to_excel(writer, index=False, sheet_name=name[:31])

print("✅ LOO 完成")
print("汇总：", out_summary_path)
print("明细：", out_deltas_path)
if skipped_paths:
    print("⚠️ 以下路径在 evidence 表中没有对应行，已跳过：", skipped_paths)
