In [1]:
def get_performance(A_lit, A_loc, B_lit, prevalence, *, n_mc=4000, seed=123):
    """Print the estimated local F1, AUROC and AUPRC of model B.

    F1 and AUROC are each transferred with ``estimate_B_local``; the two
    transferred (mean, std) pairs are then passed to
    ``estimate_auprc_from_f1_auroc`` to obtain an AUPRC estimate.

    Args:
        A_lit: Mapping with keys ``"f1"`` and ``"auroc"``; each value is a
            ``(mean, std)`` pair for model A as reported in the literature.
        A_loc: Same layout, model A measured locally.
        B_lit: Same layout, model B as reported in the literature.
        prevalence: Positive rate of the test set, forwarded unchanged to
            ``estimate_auprc_from_f1_auroc``.
        n_mc: Number of Monte Carlo samples for the AUPRC estimate.
        seed: Random seed for the AUPRC estimate.

    Returns:
        None. The three estimates are printed as percentages (mean ± std).
    """
    transferred = {}
    for metric in ("f1", "auroc"):
        transferred[metric] = estimate_B_local(
            mean_A_lit=A_lit[metric][0], std_A_lit=A_lit[metric][1],
            mean_B_lit=B_lit[metric][0], std_B_lit=B_lit[metric][1],
            mean_A_loc=A_loc[metric][0], std_A_loc=A_loc[metric][1],
        )
    mean_B_loc_f1, std_B_loc_f1 = transferred["f1"]
    mean_B_loc_auroc, std_B_loc_auroc = transferred["auroc"]

    auprc_mean2, auprc_std2 = estimate_auprc_from_f1_auroc(
        f1_mean=mean_B_loc_f1, f1_std=std_B_loc_f1,
        auc_mean=mean_B_loc_auroc, auc_std=std_B_loc_auroc,
        prevalence=prevalence,  # positive rate of your test set
        n_mc=n_mc,
        seed=seed,
    )

    print(f"推算本地 B f1: {mean_B_loc_f1 * 100:.2f} ± {std_B_loc_f1 * 100:.2f}")
    print(f"推算本地 B AUROC: {mean_B_loc_auroc * 100:.2f} ± {std_B_loc_auroc * 100:.2f}")
    print(f"推算本地 B AUPRC: {auprc_mean2 * 100:.2f} ± {auprc_std2 * 100:.2f}")

In [2]:
# Positive rate of the test set; passed to get_performance() as `prevalence`.
prevalence = 0.4015
# Each dict maps a metric name to a (mean, std) pair, which is how
# get_performance() indexes them ([metric][0] is the mean, [metric][1] the std).
# A_lit / B_lit: models A and B as reported in the literature; A_loc: model A measured locally.
A_lit = {"f1": (0.6945, 0.002), "auroc": (0.7837, 0.003)}
A_loc = {"f1": (0.6925, 0.0102), "auroc": (0.7807, 0.0049)}
B_lit = {"f1": (0.6096, 0.008), "auroc": (0.6762, 0.005)}

In [3]:
# Requires estimate_B_local and estimate_auprc_from_f1_auroc to be defined already.
# Both are defined in later cells, so run those cells first (otherwise: NameError).
get_performance(A_lit, A_loc, B_lit, prevalence)

NameError: name 'estimate_B_local' is not defined

In [None]:
import math

def logit(x):
    """Return log(x / (1 - x)) after clamping x into [1e-8, 1 - 1e-8]."""
    floor = 1e-8
    ceiling = 1 - 1e-8
    clamped = floor if x < floor else x
    if clamped > ceiling:
        clamped = ceiling
    return math.log(clamped / (1 - clamped))

def inv_logit(u):
    """Return the logistic sigmoid 1 / (1 + exp(-u)).

    The two branches are algebraically identical. Each one only ever
    exponentiates a non-positive number, so a large-magnitude negative ``u``
    yields 0.0 instead of raising OverflowError from ``math.exp``.
    """
    if u >= 0:
        return 1 / (1 + math.exp(-u))
    # u < 0: exp(u) is in (0, 1), so it cannot overflow.
    z = math.exp(u)
    return z / (1 + z)

def to_logit(mean, std):
    """Map a (mean, std) pair of a probability-like metric into logit space.

    The std is propagated to first order: it is divided by the slope
    mean * (1 - mean), which is floored at 1e-8 to avoid division by zero.

    Returns:
        (logit_mean, logit_std)
    """
    slope = mean * (1 - mean)
    if slope < 1e-8:
        slope = 1e-8
    return logit(mean), std / slope

def from_logit(u, s_u):
    """Map a logit-space (mean, std) pair back to the original scale.

    The std is propagated to first order by multiplying it with the slope
    m * (1 - m) evaluated at the back-transformed mean m.

    Returns:
        (mean, std)
    """
    m = inv_logit(u)
    return m, m * (1 - m) * s_u

def estimate_B_local(mean_A_lit, std_A_lit,
                     mean_B_lit, std_B_lit,
                     mean_A_loc, std_A_loc):
    """Estimate model B's local (mean, std) from three known (mean, std) pairs.

    All work happens in logit space:
      1. k = logit(B_lit) / logit(A_lit), or 1.0 when |logit(A_lit)| <= 1e-8.
      2. logit(B_loc) = k * logit(A_loc).
      3. The variance of k and of the product are propagated to first order.
      4. The result is mapped back to the original scale with from_logit.

    Returns:
        (mean_B_loc, std_B_loc)
    """
    # Move every input pair into logit space.
    logit_a_lit, sd_a_lit = to_logit(mean_A_lit, std_A_lit)
    logit_b_lit, sd_b_lit = to_logit(mean_B_lit, std_B_lit)
    logit_a_loc, sd_a_loc = to_logit(mean_A_loc, std_A_loc)

    # Ratio coefficient and its variance; degenerate denominator -> k = 1, no variance.
    if abs(logit_a_lit) > 1e-8:
        ratio = logit_b_lit / logit_a_lit
        ratio_var = (sd_b_lit**2) / (logit_a_lit**2) + (logit_b_lit**2) * (sd_a_lit**2) / (logit_a_lit**4)
    else:
        ratio = 1.0
        ratio_var = 0.0

    # Point estimate and propagated variance of logit(B_loc).
    logit_b_loc = ratio * logit_a_loc
    logit_b_var = (ratio**2) * (sd_a_loc**2) + (logit_a_loc**2) * ratio_var

    # Back to the original scale.
    return from_logit(logit_b_loc, math.sqrt(max(logit_b_var, 0.0)))

# === Example ===
# Inputs: model A (literature), model B (literature), model A (local)
# NOTE(review): these assignments rebind A_lit / A_loc / B_lit as plain (mean, std)
# tuples. get_performance() indexes its arguments by "f1"/"auroc", so dict-valued
# variables with these names must be re-created before calling it again.
A_lit = (0.9044, 0.001)   # mean, std
A_loc = (0.9213, 0.0036)
B_lit = (0.8425, 0.007) 


# Transfer model B's literature value to the local setting.
mean_B_loc, std_B_loc = estimate_B_local(
    mean_A_lit=A_lit[0], std_A_lit=A_lit[1],
    mean_B_lit=B_lit[0], std_B_lit=B_lit[1],
    mean_A_loc=A_loc[0], std_A_loc=A_loc[1]
)

# Report as a percentage: mean ± std.
print(f"推算本地 B: {mean_B_loc * 100:.2f} ± {std_B_loc * 100:.2f}")

In [None]:
import math
import random
from typing import Optional, Tuple

# ---------- Basic helpers ----------
SQRT2 = math.sqrt(2.0)  # used by phi() and auc_to_dprime()
EPS = 1e-12  # clamp margin in clip01() and division floor in pr_at_threshold()

def phi(x: float) -> float:
    """Standard normal CDF Φ(x), computed from the error function."""
    scaled = x / SQRT2
    return (1.0 + math.erf(scaled)) * 0.5

def inv_phi(y: float) -> float:
    """Approximate the standard normal quantile Φ^{-1}(y) (Acklam's algorithm).

    Args:
        y: Probability. Values <= 0 return -inf and values >= 1 return +inf.

    Returns:
        x such that Φ(x) ≈ y. The result is negative for y < 0.5 and positive
        for y > 0.5. The tail regions (y < 0.02425 or y > 0.97575) use the
        rational approximation as is; the central region adds one Newton step.
    """
    # Reference: https://web.archive.org/web/20150910044702/http://home.online.no/~pjacklam/notes/invnorm/
    # Coefficients of the rational approximations.
    a1,a2,a3,a4,a5,a6 = (-39.6968302866538,220.946098424521,-275.928510446969,138.357751867269,-30.6647980661472,2.50662827745924)
    b1,b2,b3,b4,b5 = ( -54.4760987982241,161.585836858041,-155.698979859887,66.8013118877197,-13.2806815528857)
    c1,c2,c3,c4,c5,c6 = ( -0.00778489400243029,-0.322396458041136,-2.40075827716184,-2.54973253934373,4.37466414146497,2.93816398269878)
    d1,d2,d3,d4 = (0.00778469570904146,0.32246712907004,2.445134137143,3.75440866190742)
    p_low  = 0.02425
    p_high = 1 - p_low

    def _tail(q: float) -> float:
        # Rational approximation shared by both tails; it evaluates to the
        # (negative) lower-tail quantile for q = sqrt(-2 * log(tail_probability)).
        return (((((c1*q+c2)*q+c3)*q+c4)*q+c5)*q+c6)/((((d1*q+d2)*q+d3)*q+d4)*q+1)

    if y <= 0.0:
        return float("-inf")
    if y >= 1.0:
        return float("inf")
    if y < p_low:
        # Lower tail: the approximation already carries the negative sign.
        return _tail(math.sqrt(-2 * math.log(y)))
    if y > p_high:
        # Upper tail: mirror image of the lower tail, hence the negation.
        return -_tail(math.sqrt(-2 * math.log(1 - y)))

    # Central region.
    q = y - 0.5
    r = q*q
    x = (((((a1*r+a2)*r+a3)*r+a4)*r+a5)*r+a6)*q/(((((b1*r+b2)*r+b3)*r+b4)*r+b5)*r+1)
    # One Newton step to improve accuracy: x <- x - (Φ(x) - y) / φ(x),
    # with Φ(x) = 0.5 * erfc(-x / sqrt(2)).
    e = 0.5 * math.erfc(-x / math.sqrt(2.0)) - y
    x = x - e / (math.exp(-0.5*x*x)/math.sqrt(2*math.pi))
    return x

def clip01(x: float) -> float:
    """Clamp x into the closed interval [EPS, 1 - EPS]."""
    upper = 1.0 - EPS
    raised = EPS if x < EPS else x
    return upper if raised > upper else raised

def logit(p: float) -> float:
    """Return the log-odds of p, with p first clamped by clip01."""
    q = clip01(p)
    odds = q / (1 - q)
    return math.log(odds)

def inv_logit(u: float) -> float:
    """Logistic sigmoid of u, written so math.exp only sees a non-positive argument."""
    # exp(-|u|) equals exp(-u) for u >= 0 and exp(u) for u < 0.
    z = math.exp(-abs(u))
    return 1.0 / (1.0 + z) if u >= 0 else z / (1.0 + z)

# ---------- Binormal model: derive d' from AUROC ----------
def auc_to_dprime(auc: float) -> float:
    """Convert an AUROC to d' under the equal-variance binormal model.

    AUC = Φ(d'/√2), hence d' = √2 * Φ^{-1}(AUC). The AUC is clamped with
    clip01 before inversion.
    """
    z = inv_phi(clip01(auc))
    return SQRT2 * z

# ---------- Given d' and p, compute TPR/FPR/Precision/Recall/F1 at threshold t ----------
def pr_at_threshold(t: float, dprime: float, p: float) -> Tuple[float, float, float]:
    """Precision, recall and F1 at threshold t for the binormal model.

    Negatives are scored N(0, 1), positives N(d', 1); a score >= t is
    predicted positive.
        TPR = 1 - Φ(t - d')
        FPR = 1 - Φ(t)
        Precision = p*TPR / (p*TPR + (1-p)*FPR)
    The prevalence p is clamped with clip01, and the precision denominator
    is floored at EPS.

    Returns:
        (precision, recall, f1); f1 is 0.0 when precision + recall <= 0.
    """
    prior = clip01(p)
    recall = 1.0 - phi(t - dprime)
    false_pos_rate = 1.0 - phi(t)

    true_pos_mass = prior * recall
    predicted_pos_mass = true_pos_mass + (1.0 - prior) * false_pos_rate
    precision = true_pos_mass / max(predicted_pos_mass, EPS)

    total = precision + recall
    f1 = 0.0 if total <= 0 else 2.0 * precision * recall / total
    return precision, recall, f1

def auprc_given_dp_p(dprime: float, p: float, t_min=-8.0, t_max=8.0, steps=4000) -> float:
    """Area under the precision-recall curve for given d' and prevalence p.

    The threshold is swept from t_max down to t_min in `steps` equal steps,
    and precision is integrated over recall with the trapezoidal rule. The
    result is bounded below by the (clamped) prevalence and above by 1.0.
    """
    prior = clip01(p)
    step = (t_max - t_min) / steps
    # (precision, recall) at each threshold, highest threshold first.
    curve = [
        pr_at_threshold(t_max - k * step, dprime, prior)[:2]
        for k in range(steps + 1)
    ]
    area = 0.0
    for (prec_a, rec_a), (prec_b, rec_b) in zip(curve, curve[1:]):
        # Trapezoid over the recall increment between consecutive thresholds.
        area += 0.5 * (prec_b + prec_a) * (rec_b - rec_a)
    # Guard against numerical error: AUPRC >= baseline p and <= 1.
    return float(min(max(area, prior), 1.0))

def max_f1_given_dp_p(dprime: float, p: float, t_min=-8.0, t_max=8.0, steps=2000) -> float:
    """Largest F1 found by scanning thresholds from t_min to t_max, for given d' and p.

    Returns 0.0 when no scanned threshold yields an F1 above 0.
    """
    step = (t_max - t_min) / steps
    scores = (
        pr_at_threshold(t_min + k * step, dprime, p)[2]
        for k in range(steps + 1)
    )
    # The leading 0.0 reproduces the "start from best = 0.0" scan.
    return max([0.0, *scores])

# ---------- If p is unknown, infer it from F1 (fit "maximum achievable F1" ≈ observed F1) ----------
def estimate_p_from_f1(dprime: float, f1_target: float, p_lo=1e-4, p_hi=0.99, iters=30) -> float:
    """Infer the prevalence p whose maximum achievable F1 matches f1_target.

    The observed F1 is treated as being close to the maximum achievable F1
    for the given d'. A 1-D search over [p_lo, p_hi] minimises
    |max_f1(p) - f1_target|. If the target lies outside what the end points
    can reach, the nearest end point is returned.
    """
    target = clip01(f1_target)

    # End-point checks first.
    f1_at_lo = max_f1_given_dp_p(dprime, p_lo)
    f1_at_hi = max_f1_given_dp_p(dprime, p_hi)
    if target <= f1_at_lo:
        return p_lo
    if target >= f1_at_hi:
        return p_hi

    def miss(p):
        # Loss: distance between the achievable maximum F1 and the target.
        return abs(max_f1_given_dp_p(dprime, p) - target)

    # Ternary search: each round discards the outer third that holds the larger loss.
    left, right = p_lo, p_hi
    for _ in range(iters):
        third = (right - left) / 3.0
        inner_left = left + third
        inner_right = right - third
        if miss(inner_left) < miss(inner_right):
            right = inner_right
        else:
            left = inner_left

    midpoint = 0.5 * (left + right)
    return float(min(max(midpoint, p_lo), p_hi))

# ---------- Main entry point: estimate AUPRC±std from F1±std and AUROC±std ----------
def estimate_auprc_from_f1_auroc(
    f1_mean: float, f1_std: float,
    auc_mean: float, auc_std: float,
    prevalence: Optional[float] = None,
    n_mc: int = 2000,
    seed: int = 42
) -> Tuple[float, float]:
    """Map (F1±std, AUROC±std) to an AUPRC mean±std via the binormal model and Monte Carlo.

    For each of the n_mc draws, an AUROC and an F1 are sampled from normal
    distributions, the AUROC is converted to d', a prevalence is chosen, and
    the AUPRC is obtained by numerical integration.

    - If `prevalence` (positive rate) is given, it is used directly. When the
      maximum achievable F1 at that prevalence differs from the sampled F1 by
      more than 0.10, it is blended 20% towards the F1-implied prevalence.
    - Otherwise the prevalence is inferred from the sampled F1.

    Returns:
        (auprc_mean, auprc_std), with the std computed using an n - 1 denominator.
    """
    blend = 0.2
    rng = random.Random(seed)
    draws = []

    for _ in range(n_mc):
        # Sample (AUROC first, then F1) and clip into the valid range.
        auc_draw = clip01(rng.normalvariate(auc_mean, auc_std))
        auc_draw = min(max(auc_draw, 0.500001), 1.0 - 1e-6)  # avoid no-discrimination / out-of-range
        f1_draw = clip01(rng.normalvariate(f1_mean, max(f1_std, 1e-6)))

        separation = auc_to_dprime(auc_draw)

        if prevalence is None:
            rate = estimate_p_from_f1(separation, f1_draw)
        else:
            rate = clip01(prevalence)
            # Gentle correction when the given p cannot plausibly reach the sampled F1.
            if abs(max_f1_given_dp_p(separation, rate) - f1_draw) > 0.10:
                implied = estimate_p_from_f1(separation, f1_draw)
                rate = clip01(blend * implied + (1 - blend) * rate)

        # Numerical integration gives the AUPRC for this draw.
        draws.append(auprc_given_dp_p(separation, rate))

    # Summary statistics.
    count = len(draws)
    center = sum(draws) / count
    spread_sq = sum((d - center) ** 2 for d in draws) / max(count - 1, 1)
    return float(center), float(math.sqrt(max(spread_sq, 0.0)))

In [None]:
# Scenario 2: the positive rate is known (strongly recommended; results are more reliable)
auprc_mean2, auprc_std2 = estimate_auprc_from_f1_auroc(
    f1_mean=0.5924, f1_std=0.0046,
    auc_mean=0.8625, auc_std=0.0082,
    prevalence=0.2685,          # positive rate of your test set
    n_mc=4000,
    seed=123
)
# Report as a percentage: mean ± std.
print(f"AUPRC (given p) ≈ {auprc_mean2 * 100:.2f} ± {auprc_std2 * 100:.2f}")

In [None]:
import pandas as pd
import numpy as np
from typing import Dict, Iterable, Optional, Tuple

def auprc_transfer(auprc_src: float, pi_src: float, pi_tgt: float) -> float:
    """
    Prevalence-corrected AUPRC transfer.

    Keeps the "lift" over the random baseline constant across domains:
        AUPRC_tgt = pi_tgt + ((AUPRC_src - pi_src) / (1 - pi_src)) * (1 - pi_tgt)

    Args:
        auprc_src: AUPRC on the source dataset, in [0, 1].
        pi_src: Positive rate of the source dataset, in [0, 1).
        pi_tgt: Positive rate of the target dataset, in [0, 1).

    Returns:
        The transferred AUPRC, clamped to [0, 1]. Clamping only matters when
        auprc_src is below the source baseline pi_src, where the raw formula
        can go negative.

    Raises:
        ValueError: If any input is outside its allowed range (NaN included).
    """
    # pi_src < 1 also guarantees the division below is safe.
    if not (0 <= auprc_src <= 1 and 0 <= pi_src < 1 and 0 <= pi_tgt < 1):
        raise ValueError("auprc_src, pi_src, pi_tgt must be within [0,1) (AUPRC within [0,1]).")
    lift = (auprc_src - pi_src) / (1 - pi_src)
    estimate = pi_tgt + lift * (1 - pi_tgt)
    return float(min(max(estimate, 0.0), 1.0))


def _anchor_shift_stats(
    df: pd.DataFrame,
    dataset_src: str,
    dataset_tgt: str,
    task: str,
    metric: str,
    anchor_models: Optional[Iterable[str]] = None,
    use_median: bool = True,
) -> Tuple[float, float, int]:
    """
    Compute the shift (tgt - src) for a given task & metric using models reported on BOTH datasets.

    Args:
        df: Long-form frame with columns "model", "task", "metric", "dataset", "value".
        dataset_src: Name of the source dataset.
        dataset_tgt: Name of the target dataset.
        task: Task to select.
        metric: Metric to select.
        anchor_models: If given, only these models are used as anchors.
        use_median: Use the median of the per-model shifts; otherwise the mean.

    Returns:
        (shift_central, shift_std, n_anchors). shift_std is the sample
        standard deviation (ddof=1), or 0.0 when there is a single anchor.

    Raises:
        ValueError: If no model has values on both datasets for this
            task/metric. This includes the case where one of the datasets
            has no rows at all for the selection.
    """
    no_overlap = f"No anchor overlap for task='{task}', metric='{metric}'."

    sub = df[(df["task"] == task) & (df["metric"] == metric)]
    if sub.empty:
        raise ValueError(no_overlap)

    # Pivot to wide: index=model, columns=dataset, values=value.
    wide = sub.pivot_table(index="model", columns="dataset", values="value", aggfunc="mean")

    # A dataset with no rows for this selection has no column in the pivot.
    # Report that as "no overlap" rather than letting dropna raise KeyError.
    if dataset_src not in wide.columns or dataset_tgt not in wide.columns:
        raise ValueError(no_overlap)

    # Keep anchors: models that appear in both datasets.
    wide = wide.dropna(subset=[dataset_src, dataset_tgt], how="any")
    if anchor_models is not None:
        wide = wide.loc[wide.index.isin(list(anchor_models))]
    if wide.empty:
        raise ValueError(no_overlap)

    deltas = wide[dataset_tgt] - wide[dataset_src]
    central = float(deltas.median() if use_median else deltas.mean())
    std = float(deltas.std(ddof=1)) if len(deltas) > 1 else 0.0
    return central, std, len(deltas)


def estimate_missing(
    df: pd.DataFrame,
    prevalence: Dict[Tuple[str, str], float],
    dataset_src: str = "MIMIC-III",
    dataset_tgt: str = "MIMIC-IV",
    anchor_models: Optional[Iterable[str]] = None,
    use_median_shift: bool = True,
) -> pd.DataFrame:
    """
    Estimate missing metrics on dataset_tgt using:
      - AUPRC: prevalence-corrected transfer from dataset_src
      - AUROC, F1: anchor-based shift learned from models reported on BOTH datasets.

    Input DataFrame format (long-form):
        columns = ["model", "task", "metric", "dataset", "value"]
        - metric in {"F1","AUROC","AUPRC"}; rows with any other metric are
          kept but never estimated
        - value in [0,100] or [0,1] (auto-detected from the column maximum and
          converted to [0,1] internally)
    prevalence: dict mapping (dataset, task) -> positive rate in [0,1]
        e.g., {("MIMIC-III","Readmission"):0.4015, ("MIMIC-IV","Readmission"):0.7079, ...}
        Only needed for (dataset, task) pairs where an AUPRC has to be
        estimated; a missing entry raises KeyError.
    anchor_models: optional restriction of the models used to learn the shift.
        It is materialised once, so a one-shot iterator is also accepted.

    When no anchor model exists for a task/metric, a zero shift is used
    (the source value is carried over unchanged).

    Returns a new DataFrame with original rows plus the estimated rows filled in for
    (dataset_tgt, missing cells). Every estimate is clamped to [0,1] before
    conversion. Output values are in PERCENT (0-100), rounded to 2 decimals.
    """
    df = df.copy()
    # Normalize to [0,1].
    if df["value"].max() > 1.0 + 1e-8:
        df["value"] = df["value"] / 100.0

    # Materialise once: the same anchors are reused for every task/metric.
    anchors = None if anchor_models is None else list(anchor_models)

    # Pre-compute anchor shifts for F1 & AUROC per task.
    shift_cache = {}
    for t in sorted(df["task"].unique()):
        for m in ("F1", "AUROC"):
            try:
                shift_cache[(t, m)] = _anchor_shift_stats(
                    df, dataset_src, dataset_tgt, t, m, anchors, use_median=use_median_shift
                )
            except (ValueError, KeyError):
                # No anchors (or a dataset absent for this task/metric):
                # fall back to zero shift (conservative).
                shift_cache[(t, m)] = (0.0, 0.0, 0)

    # Iterate over source rows and generate target estimates where missing.
    rows = []
    for (model, task, metric), grp in df.groupby(["model", "task", "metric"]):
        src_vals = grp.loc[grp["dataset"] == dataset_src, "value"]
        # Need a source value, and the target must not already be reported.
        if src_vals.empty or (grp["dataset"] == dataset_tgt).any():
            continue
        src_val = float(src_vals.iloc[0])

        if metric == "AUPRC":
            est = auprc_transfer(
                src_val,
                prevalence[(dataset_src, task)],
                prevalence[(dataset_tgt, task)],
            )
        elif metric in ("F1", "AUROC"):
            est = src_val + shift_cache[(task, metric)][0]
        else:
            continue

        # Clamp every estimate to [0,1], AUPRC included.
        rows.append([model, task, metric, dataset_tgt, min(max(est, 0.0), 1.0)])

    # Append estimations.
    if rows:
        add = pd.DataFrame(rows, columns=["model", "task", "metric", "dataset", "value"])
        df_out = pd.concat([df, add], ignore_index=True)
    else:
        df_out = df

    # Return as percentage, sorted for readability.
    df_out["value"] = (df_out["value"] * 100).round(2)
    df_out = df_out.sort_values(["model", "task", "metric", "dataset"]).reset_index(drop=True)
    return df_out


def example_usage() -> pd.DataFrame:
    """
    Small demonstration of estimate_missing on a handful of table entries.

    Input values are given in percent (estimate_missing normalizes them).
    G-BERT has MIMIC-III values only, so its MIMIC-IV cells are the ones
    that get estimated.
    """
    task = "Prolonged Stay"
    metric_names = ("F1", "AUROC", "AUPRC")

    # model -> dataset -> (F1, AUROC, AUPRC), all in percent
    reported = {
        "G-BERT": {
            "MIMIC-III": (69.62, 72.96, 72.48),
        },
        "StageNet": {
            "MIMIC-III": (75.65, 83.09, 84.87),
            "MIMIC-IV": (67.57, 84.88, 76.41),
        },
        "HEART": {
            "MIMIC-III": (75.44, 82.99, 83.83),
            "MIMIC-IV": (67.07, 84.63, 74.48),
        },
        "DT-BEHRT": {
            "MIMIC-III": (76.37, 84.13, 85.00),
            "MIMIC-IV": (68.04, 84.98, 74.78),
        },
    }

    # Flatten to long-form rows: (model, task, metric, dataset, value).
    records = [
        (model, task, metric, dataset, value)
        for model, by_dataset in reported.items()
        for dataset, values in by_dataset.items()
        for metric, value in zip(metric_names, values)
    ]
    df = pd.DataFrame(records, columns=["model","task","metric","dataset","value"])

    prevalence = {
        ("MIMIC-III", task): 0.5059,
        ("MIMIC-IV", task): 0.3337,
    }

    # Estimate the missing G-BERT entries on MIMIC-IV for Prolonged Stay.
    return estimate_missing(
        df, prevalence,
        dataset_src="MIMIC-III", dataset_tgt="MIMIC-IV",
        anchor_models=None, use_median_shift=True
    )