In [None]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.regression.mixed_linear_model import MixedLM
from scipy.optimize import minimize

# ===============================
# 1. 导入数据
# ===============================
df = pd.read_excel("male.xlsx")

# ===============================
# 2. 将孕周转成数值 GA_num
# ===============================
def ga_to_float(ga_val):
    try:
        if pd.isna(ga_val):
            return np.nan
        # 已经是数字
        if isinstance(ga_val, (int, float)):
            return float(ga_val)
        # 字符串包含 w/d
        if isinstance(ga_val, str) and 'w' in ga_val:
            w, d = ga_val.split('w')
            w = w.strip()
            d = d.strip().replace('+','').replace('d','0')
            return int(w) + int(d)/7
        # 其他字符串尝试直接转浮点数
        return float(ga_val)
    except:
        return np.nan

df['GA_num'] = df['检测孕周'].apply(ga_to_float)

# 删除缺失关键列的数据
df = df.dropna(subset=['GA_num','孕妇BMI','Y染色体浓度'])

print("关键列数据检查：")
print(df[['GA_num','孕妇BMI','Y染色体浓度']].head())

# ===============================
# 3. 拟合混合效应模型
# ===============================
endog = df['Y染色体浓度']
exog = sm.add_constant(df[['GA_num','孕妇BMI']])
groups = df['孕妇代码']

model = MixedLM(endog, exog, groups=groups)
fit = model.fit()
print(fit.summary())

# ===============================
# 4. 计算个体达标最小GA (GAmin)
# ===============================
threshold = 0.15  # 调整后，使GAmin落在合理孕周范围
beta = fit.params.values
sigma_b = np.sqrt(fit.cov_re.iloc[0,0])
sigma_eps = np.sqrt(fit.scale)

np.random.seed(42)
unique_groups = df['孕妇代码'].unique()
bi_dict = {gid: np.random.normal(0,sigma_b) for gid in unique_groups}

# ===============================
# 4. 计算个体达标最小GA (GAmin)
# ===============================
threshold = 0.04  # 根据模型Y浓度范围调整
# ===============================
# 4. 计算个体达标最小GA (GAmin)
# ===============================
threshold = 0.04  # 根据模型Y浓度范围调整
GAmin_list = []
for idx, row in df.iterrows():
    bi = bi_dict[row['孕妇代码']]
    GAmin = (threshold - beta[0] - beta[2]*row['孕妇BMI'] - bi) / beta[1]
    GAmin = np.clip(GAmin, 10.0, 25.0)  # 临床可行孕周范围
    GAmin_list.append(GAmin)
df['GAmin'] = GAmin_list



# ===============================
# 5. 动态规划优化BMI分组
# ===============================
bmi_sorted = np.sort(df['孕妇BMI'].values)
k = 5  # 分组数

def bmi_variance_cut(cuts):
    cuts = [bmi_sorted.min()] + list(cuts) + [bmi_sorted.max()]
    total_var = 0
    for i in range(k):
        mask = (df['孕妇BMI'] >= cuts[i]) & (df['孕妇BMI'] < cuts[i+1])
        if mask.sum() > 0:
            total_var += np.var(df.loc[mask,'GAmin'])
    return total_var

x0 = np.percentile(bmi_sorted, [20,40,60,80])
res = minimize(bmi_variance_cut, x0, bounds=[(bmi_sorted.min(),bmi_sorted.max())]*4)
bmi_cuts = [bmi_sorted.min()] + list(res.x) + [bmi_sorted.max()]
print("BMI分组界点:", bmi_cuts)

# ===============================
# 6. 每组最佳检测时点
# ===============================
best_GA = []
for i in range(k):
    mask = (df['孕妇BMI'] >= bmi_cuts[i]) & (df['孕妇BMI'] < bmi_cuts[i+1])
    GA_group = df.loc[mask,'GAmin']
    best_GA.append(GA_group.mean() if len(GA_group) > 0 else np.nan)
print("各组最佳检测时点:", best_GA)

# ===============================
# 7. 蒙特卡洛模拟量化测量误差
# ===============================
n_mc = 1000
sigma_e = 0.002  # 测量误差0.2%
mc_results = []

for _ in range(n_mc):
    Y_obs = df['Y染色体浓度'] + np.random.normal(0,sigma_e,len(df))
    GAmin_mc = []
    for idx, row in df.iterrows():
        bi = bi_dict[row['孕妇代码']]
        GAmin_i = (threshold - beta[0] - beta[2]*row['孕妇BMI'] - bi) / beta[1]
        GAmin_mc.append(GAmin_i)
    mc_results.append(GAmin_mc)

mc_results = np.array(mc_results)
ci_lower = np.percentile(mc_results,2.5,axis=0)
ci_upper = np.percentile(mc_results,97.5,axis=0)
df['GAmin_CI'] = list(zip(ci_lower, ci_upper))

print(df[['孕妇代码','孕妇BMI','GAmin','GAmin_CI']])
# ===============================
# 第三问增强版：个体化策略 + 多协变量 + 测量误差蒙特卡洛
# 直接接在你第二问脚本后面运行
# ===============================
import numpy as np
import pandas as pd
from scipy.stats import norm
from tqdm import tqdm

# -----------------------------
# 参数（可调整）
# -----------------------------
threshold = 0.04         # 题目判定阈值 4%
GA_min_allowed = 10.0
GA_max_allowed = 25.0
grid_step = 0.1
retest_delay_options = [1.0, 2.0]   # 两种重测延迟可选
c_test = 1.0
c_retest = 1.5
c_fail = 50.0

# -----------------------------
# 选取要作为固定效应的协变量（请确保 df 中有这些列）
# 若你已经只拟合了 GA_num 与 孕妇BMI，可把列表改短，不需要重新拟合也能使用下游逻辑
# 下面示例包含年龄、身高、体重、IVF（用列名 '孕妇年龄','孕妇身高','孕妇体重','IVF妊娠方式'）
fixed_effects_vars = ['GA_num', '孕妇BMI']
# 如果数据中存在年龄/身高/体重等，建议加入：
for col in ['孕妇年龄','孕妇身高','孕妇体重','IVF妊娠方式','总读段数','GC含量']:
    if col in df.columns and col not in fixed_effects_vars:
        fixed_effects_vars.append(col)

# -----------------------------
# 若需要，重新拟合模型以包含额外协变量（可选）
# 如果你已经用更完整的 exog 拟合了 fit，则跳过下面拟合块
refit_flag = False  # 若想重新拟合并纳入上面 fixed_effects_vars，请设为 True
if refit_flag:
    exog = sm.add_constant(df[fixed_effects_vars])
    model = MixedLM(df['Y染色体浓度'], exog, groups=df['孕妇代码'])
    fit = model.fit()
    print(fit.summary())

# -----------------------------
# 从 fit 中提取参数与不确定性
# -----------------------------
beta = fit.params  # pandas Series
# 随机效应协方差（若模型中存在）
sigma_b2 = fit.cov_re.iloc[0,0] if getattr(fit,'cov_re', None) is not None and fit.cov_re.shape[0] > 0 else 0.0
sigma_eps2 = fit.scale
sigma_total = np.sqrt(sigma_b2 + sigma_eps2)

# 尝试提取个体随机效应估计（若 fit 支持）
random_effects = {}
try:
    # fit.random_effects 返回 dict: {group: array([...])} 或 {group: Series}
    random_effects = fit.random_effects
except Exception:
    random_effects = {}

# -----------------------------
# 成功概率函数：可接受单个个体的 bi_est（若可用），或对 bi 做边际化
# -----------------------------
def success_prob_ga(bga, row, bi_est=None):
    """
    bga: float (weeks)
    row: pd.Series, 包含该个体的协变量（BMI、年龄等）
    bi_est: 若存在个体随机效应估计，传入以使用更精确的预测
    返回：P(Y >= threshold)
    """
    mu = 0.0
    # 常数项
    if 'const' in beta.index:
        mu += beta['const']
    # 对每个 fixed effect 乘系数
    for name in beta.index:
        if name == 'const':
            continue
        # GA 字段
        if name in ['GA_num','GA'] and 'GA_num' in row.index:
            mu += beta[name] * bga
        # BMI
        elif name in ['孕妇BMI','BMI'] and '孕妇BMI' in row.index:
            mu += beta[name] * row['孕妇BMI']
        # 其他协变量，优先从 row 中取值
        elif name in row.index:
            val = row[name]
            if pd.isna(val):
                val = 0.0
            mu += beta[name] * val
        # 如果系数名不在 row 中并且不是 GA/BMI，跳过

    if bi_est is not None:
        mu += bi_est
        sigma = np.sqrt(sigma_eps2)
        z = (threshold - mu) / sigma
        return 1.0 - norm.cdf(z)

    # 边际化 bi ~ N(0, sigma_b2)
    sigma = np.sqrt(sigma_b2 + sigma_eps2)
    z = (threshold - mu) / sigma
    return 1.0 - norm.cdf(z)

# -----------------------------
# 准备 BMI 分组与 GA 网格
# -----------------------------
k = len(bmi_cuts) - 1
group_labels = [f"Group{i+1}" for i in range(k)]
GA_grid = np.arange(GA_min_allowed, GA_max_allowed + 1e-8, grid_step)

# -----------------------------
# 对每个 BMI 组：对组中每个个体在网格上评估最优策略（单次或两阶段）
# 然后对组内个体期望指标取平均（更保守/更个性化的做法）
# -----------------------------
results = []

# 用于蒙特卡洛评估测量误差对最终策略性能的影响
n_mc = 500   # 可根据计算资源增减
sigma_measure = 0.002  # 测序测量误差 (示例 0.2%)

for i in range(k):
    lower = bmi_cuts[i]
    upper = bmi_cuts[i+1]
    mask = (df['孕妇BMI'] >= lower) & (df['孕妇BMI'] < upper)
    df_grp = df.loc[mask].copy()
    n_grp = len(df_grp)
    if n_grp == 0:
        results.append({'BMI组': group_labels[i],'BMI下限':lower,'BMI上限':upper,'样本量':0,'best_strategy':None})
        continue

    # 逐个体评估：对每个个体选择最小期望成本的策略（在给定网格下）
    per_indiv_best = []  # 存放每个个体的最优策略和指标
    for idx, row in df_grp.iterrows():
        gid = row['孕妇代码']
        bi_est = None
        if gid in random_effects:
            # random_effects[gid] 可能是 dict/Series/array；若是 array 取第0个元素
            re = random_effects[gid]
            if isinstance(re, (list, np.ndarray)):
                bi_est = float(re[0])
            elif isinstance(re, dict) or hasattr(re,'to_dict'):
                # 取第一个值（如果是 dict 的情形）
                try:
                    bi_est = float(list(re.values())[0])
                except:
                    bi_est = None
            else:
                try:
                    bi_est = float(re)
                except:
                    bi_est = None

        best_i = {'cost': np.inf, 'strategy': None}

        # 单次测试枚举
        for ga in GA_grid:
            p_success = success_prob_ga(ga, row, bi_est=bi_est)
            expected_cost = c_test + (1 - p_success) * c_fail
            if expected_cost < best_i['cost']:
                best_i['cost'] = expected_cost
                best_i['strategy'] = {'type':'single','initial_GA':ga,'retest_after':None,'p_success':p_success,'expected_cost':expected_cost}

        # 两阶段测试枚举
        for ga0 in GA_grid:
            for d in retest_delay_options:
                ga1 = ga0 + d
                if ga1 > GA_max_allowed:
                    continue
                p0 = success_prob_ga(ga0, row, bi_est=bi_est)
                p1 = success_prob_ga(ga1, row, bi_est=bi_est)
                p_success = p0 + (1 - p0) * p1
                p_retest = (1 - p0)
                expected_cost = c_test + p_retest * c_retest + (1 - p_success) * c_fail
                if expected_cost < best_i['cost']:
                    best_i['cost'] = expected_cost
                    best_i['strategy'] = {'type':'two_stage','initial_GA':ga0,'retest_after':d,'retest_GA':ga1,'p_success':p_success,'p_retest':p_retest,'expected_cost':expected_cost}

        per_indiv_best.append(best_i['strategy'])

    # 将组内个体策略汇总为组级指标（平均成功率、平均重测率、平均期望成本）
    p_success_list = [s['p_success'] for s in per_indiv_best]
    p_retest_list = [s.get('p_retest', 0.0) if s['type']=='two_stage' else 0.0 for s in per_indiv_best]
    expected_cost_list = [s['expected_cost'] for s in per_indiv_best]

    group_summary = {
        'BMI组': group_labels[i],
        'BMI下限': lower,
        'BMI上限': upper,
        '样本量': n_grp,
        '策略类型_个体决策统计_最常见': None,   # 填入最常见策略类型便于报告
        '组平均成功率': np.mean(p_success_list),
        '组平均重测率': np.mean(p_retest_list),
        '组平均期望成本': np.mean(expected_cost_list)
    }

    # 最常见的策略类型
    types = [s['type'] for s in per_indiv_best]
    import collections
    most_common_type = collections.Counter(types).most_common(1)[0][0]
    group_summary['策略类型_个体决策_多数'] = most_common_type

    # 计算测量误差下策略稳健性（蒙特卡洛）
    # 固定每个个体采用上面得出的最优初测/重测时点，模拟测量误差并重新估计成功率与成本
    mc_p_success = []
    mc_expected_cost = []
    for mc in range(n_mc):
        p_success_mc = []
        expected_cost_mc = []
        for s,row in zip(per_indiv_best, df_grp.to_dict('records')):
            # 模拟测量误差：在预测 Y 上加噪（我们在概率层面近似：用成功概率函数，但把阈值上下浮动）
            # 这里简单做法：阈值扰动法 (更严格可以对 Y_obs 做完整仿真)
            thresh_perturbed = threshold + np.random.normal(0, sigma_measure)
            # 使用临时替代函数（只改阈值）
            def success_with_thresh(bga, row, bi_est=None, thr=thresh_perturbed):
                mu = 0.0
                if 'const' in beta.index:
                    mu += beta['const']
                for name in beta.index:
                    if name == 'const':
                        continue
                    if name in ['GA_num','GA'] and 'GA_num' in row:
                        mu += beta[name] * bga
                    elif name in ['孕妇BMI','BMI'] and '孕妇BMI' in row:
                        mu += beta[name] * row['孕妇BMI']
                    elif name in row:
                        val = row[name]
                        if pd.isna(val):
                            val = 0.0
                        mu += beta[name] * val
                if bi_est is not None:
                    mu += bi_est
                    sigma = np.sqrt(sigma_eps2)
                    z = (thr - mu) / sigma
                    return 1.0 - norm.cdf(z)
                sigma = np.sqrt(sigma_b2 + sigma_eps2)
                z = (thr - mu) / sigma
                return 1.0 - norm.cdf(z)

            gid = row['孕妇代码']
            gid = row['孕妇代码']
            bi_est = None
            if isinstance(re, (list, np.ndarray)):
                bi_est = float(re[0])
            elif isinstance(re, pd.Series):
                bi_est = float(re.iloc[0])
            elif isinstance(re, dict):
                bi_est = float(next(iter(re.values())))
            else:
                try:
                    bi_est = float(re)
                except:
                    bi_est = None

            if s['type'] == 'single':
                p_s = success_with_thresh(s['initial_GA'], row, bi_est=bi_est)
                expected_cost = c_test + (1 - p_s) * c_fail
            else:
                p0 = success_with_thresh(s['initial_GA'], row, bi_est=bi_est)
                p1 = success_with_thresh(s['retest_GA'], row, bi_est=bi_est)
                p_s = p0 + (1 - p0) * p1
                p_retest = 1 - p0
                expected_cost = c_test + p_retest * c_retest + (1 - p_s) * c_fail
            p_success_mc.append(p_s)
            expected_cost_mc.append(expected_cost)
        mc_p_success.append(np.mean(p_success_mc))
        mc_expected_cost.append(np.mean(expected_cost_mc))
    # 记录 MC 结果的均值和 95% CI
    group_summary.update({
        '组平均成功率_MC_mean': np.mean(mc_p_success),
        '组平均成功率_MC_2.5%': np.percentile(mc_p_success,2.5),
        '组平均成功率_MC_97.5%': np.percentile(mc_p_success,97.5),
        '组平均期望成本_MC_mean': np.mean(mc_expected_cost),
        '组平均期望成本_MC_2.5%': np.percentile(mc_expected_cost,2.5),
        '组平均期望成本_MC_97.5%': np.percentile(mc_expected_cost,97.5),
    })

    # 存最常见的个体策略作为“代表策略”供报告（便于导表）
    rep_strategy = collections.Counter([ (s['type'], round(s.get('initial_GA',0),1), s.get('retest_after')) for s in per_indiv_best]).most_common(1)[0][0]
    group_summary['代表策略'] = rep_strategy

    results.append(group_summary)

# -----------------------------
# 导出 CSV（组级汇总）
# -----------------------------
df_group_strategy = pd.DataFrame(results)
df_group_strategy.to_csv("BMI_group_strategy.csv", index=False, encoding='utf-8-sig')
print("第三问增强版完成：BMI_group_strategy.csv 已生成（组级汇总与测量误差MC结果）")
import matplotlib.pyplot as plt
import seaborn as sns

# -----------------------------
# 留一组交叉验证 + GAmin 蒙特卡洛
# -----------------------------
groups = df['孕妇代码'].unique()
pred_ga_list = []
true_ga_list = []

for gid in tqdm(groups):
    train_df = df[df['孕妇代码'] != gid]
    test_df = df[df['孕妇代码'] == gid]

    exog = sm.add_constant(train_df[fixed_effects_vars])
    model = MixedLM(train_df['Y染色体浓度'], exog, groups=train_df['孕妇代码'])
    fit = model.fit(reml=True)

    exog_test = sm.add_constant(test_df[fixed_effects_vars])
    y_pred = fit.predict(exog_test)

    threshold = 0.04
    for i, row in test_df.iterrows():
        # 简单法：预测浓度达到 threshold 的孕周作为 GAmin
        if y_pred[i - test_df.index[0]] >= threshold:
            pred_ga_list.append(row['GA_num'])
            true_ga_list.append(row['GA_num'])
        else:
            pred_ga_list.append(np.nan)
            true_ga_list.append(row['GA_num'])

pred_ga_array = np.array(pred_ga_list)
pred_ga_array = pred_ga_array[~np.isnan(pred_ga_array)]

print(f"GAmin 平均预测: {np.mean(pred_ga_array):.2f} 周")
print(f"GAmin 预测标准差: {np.std(pred_ga_array):.2f} 周")

# -----------------------------
# Science 风格可视化
# -----------------------------
plt.style.use('classic')  # 简洁黑白基调
plt.figure(figsize=(5,4))

sns.histplot(pred_ga_array, bins=15, color='black', alpha=0.8, edgecolor='white')
plt.axvline(np.mean(pred_ga_array), color='gray', linestyle='--', linewidth=1.2, label='平均预测')
plt.xlabel("预测 GAmin (周)", fontsize=10)
plt.ylabel("孕妇数", fontsize=10)
plt.title("GAmin 蒙特卡洛分布 (留一组交叉验证)", fontsize=11)
plt.xticks(fontsize=9)
plt.yticks(fontsize=9)
plt.legend(fontsize=9)
sns.despine(trim=True)  # 去掉上边框右边框

plt.tight_layout()
plt.show()




In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# -----------------------------
# 留一组交叉验证 + GAmin 蒙特卡洛
# -----------------------------
groups = df['孕妇代码'].unique()
pred_ga_list = []
true_ga_list = []

for gid in tqdm(groups):
    train_df = df[df['孕妇代码'] != gid]
    test_df = df[df['孕妇代码'] == gid]

    exog = sm.add_constant(train_df[fixed_effects_vars])
    model = MixedLM(train_df['Y染色体浓度'], exog, groups=train_df['孕妇代码'])
    fit = model.fit(reml=True)

    exog_test = sm.add_constant(test_df[fixed_effects_vars])
    y_pred = fit.predict(exog_test)

    threshold = 0.04
    for i, row in test_df.iterrows():
        # 简单法：预测浓度达到 threshold 的孕周作为 GAmin
        if y_pred[i - test_df.index[0]] >= threshold:
            pred_ga_list.append(row['GA_num'])
            true_ga_list.append(row['GA_num'])
        else:
            pred_ga_list.append(np.nan)
            true_ga_list.append(row['GA_num'])

pred_ga_array = np.array(pred_ga_list)
pred_ga_array = pred_ga_array[~np.isnan(pred_ga_array)]

print(f"GAmin 平均预测: {np.mean(pred_ga_array):.2f} 周")
print(f"GAmin 预测标准差: {np.std(pred_ga_array):.2f} 周")

# -----------------------------
# Science 风格可视化
# -----------------------------
plt.style.use('classic')  # 简洁黑白基调
plt.figure(figsize=(5,4))

sns.histplot(pred_ga_array, bins=15, color='black', alpha=0.8, edgecolor='white')
plt.axvline(np.mean(pred_ga_array), color='gray', linestyle='--', linewidth=1.2, label='平均预测')
plt.xlabel("预测 GAmin (周)", fontsize=10)
plt.ylabel("孕妇数", fontsize=10)
plt.title("GAmin 蒙特卡洛分布 (留一组交叉验证)", fontsize=11)
plt.xticks(fontsize=9)
plt.yticks(fontsize=9)
plt.legend(fontsize=9)
sns.despine(trim=True)  # 去掉上边框右边框

plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# -----------------------------
# 留一组交叉验证 + GAmin 蒙特卡洛
# -----------------------------
groups = df['孕妇代码'].unique()
pred_ga_list = []
true_ga_list = []

for gid in tqdm(groups):
    train_df = df[df['孕妇代码'] != gid]
    test_df = df[df['孕妇代码'] == gid]

    exog = sm.add_constant(train_df[fixed_effects_vars])
    model = MixedLM(train_df['Y染色体浓度'], exog, groups=train_df['孕妇代码'])
    fit = model.fit(reml=True)

    exog_test = sm.add_constant(test_df[fixed_effects_vars])
    y_pred = fit.predict(exog_test)

    threshold = 0.04
    for i, row in test_df.iterrows():
        # 简单法：预测浓度达到 threshold 的孕周作为 GAmin
        if y_pred[i - test_df.index[0]] >= threshold:
            pred_ga_list.append(row['GA_num'])
            true_ga_list.append(row['GA_num'])
        else:
            pred_ga_list.append(np.nan)
            true_ga_list.append(row['GA_num'])

pred_ga_array = np.array(pred_ga_list)
pred_ga_array = pred_ga_array[~np.isnan(pred_ga_array)]

print(f"GAmin 平均预测: {np.mean(pred_ga_array):.2f} 周")
print(f"GAmin 预测标准差: {np.std(pred_ga_array):.2f} 周")

# -----------------------------
# Science 风格可视化
# -----------------------------
plt.style.use('classic')  # 简洁黑白基调
plt.figure(figsize=(5,4))

sns.histplot(pred_ga_array, bins=15, color='black', alpha=0.8, edgecolor='white')
plt.axvline(np.mean(pred_ga_array), color='gray', linestyle='--', linewidth=1.2, label='平均预测')
plt.xlabel("预测 GAmin (周)", fontsize=10)
plt.ylabel("孕妇数", fontsize=10)
plt.title("GAmin 蒙特卡洛分布 (留一组交叉验证)", fontsize=11)
plt.xticks(fontsize=9)
plt.yticks(fontsize=9)
plt.legend(fontsize=9)
sns.despine(trim=True)  # 去掉上边框右边框

plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# -----------------------------
# 留一组交叉验证 + GAmin 蒙特卡洛
# -----------------------------
groups = df['孕妇代码'].unique()
pred_ga_list = []
true_ga_list = []

for gid in tqdm(groups):
    train_df = df[df['孕妇代码'] != gid]
    test_df = df[df['孕妇代码'] == gid]

    exog = sm.add_constant(train_df[fixed_effects_vars])
    model = MixedLM(train_df['Y染色体浓度'], exog, groups=train_df['孕妇代码'])
    fit = model.fit(reml=True)

    exog_test = sm.add_constant(test_df[fixed_effects_vars])
    y_pred = fit.predict(exog_test)

    threshold = 0.04
    for i, row in test_df.iterrows():
        # 简单法：预测浓度达到 threshold 的孕周作为 GAmin
        if y_pred[i - test_df.index[0]] >= threshold:
            pred_ga_list.append(row['GA_num'])
            true_ga_list.append(row['GA_num'])
        else:
            pred_ga_list.append(np.nan)
            true_ga_list.append(row['GA_num'])

pred_ga_array = np.array(pred_ga_list)
pred_ga_array = pred_ga_array[~np.isnan(pred_ga_array)]

print(f"GAmin 平均预测: {np.mean(pred_ga_array):.2f} 周")
print(f"GAmin 预测标准差: {np.std(pred_ga_array):.2f} 周")

# -----------------------------
# Science 风格可视化
# -----------------------------
plt.style.use('classic')  # 简洁黑白基调
plt.figure(figsize=(5,4))

sns.histplot(pred_ga_array, bins=15, color='black', alpha=0.8, edgecolor='white')
plt.axvline(np.mean(pred_ga_array), color='gray', linestyle='--', linewidth=1.2, label='平均预测')
plt.xlabel("预测 GAmin (周)", fontsize=10)
plt.ylabel("孕妇数", fontsize=10)
plt.title("GAmin 蒙特卡洛分布 (留一组交叉验证)", fontsize=11)
plt.xticks(fontsize=9)
plt.yticks(fontsize=9)
plt.legend(fontsize=9)
sns.despine(trim=True)  # 去掉上边框右边框

plt.tight_layout()
plt.show()


In [None]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

# -----------------------------
# 参数
# -----------------------------
threshold = 0.04  # 判定阈值
fixed_effects_vars = ['GA_num', '孕妇BMI']  # 固定效应列
n_mc = 500        # 蒙特卡洛次数
sigma_measure = 0.002  # 测序测量误差

# -----------------------------
# 拟合一次 MixedLM（全量数据）
# -----------------------------
exog = sm.add_constant(df[fixed_effects_vars])
model = sm.MixedLM(df['Y染色体浓度'], exog, groups=df['孕妇代码'])
fit = model.fit(reml=True)
print(fit.summary())

# -----------------------------
# 留一孕妇组预测 GAmin（向量化+蒙特卡洛）
# -----------------------------
groups = df['孕妇代码'].unique()
pred_ga_mc = []

for gid in tqdm(groups):
    test_df = df[df['孕妇代码'] == gid]
    exog_test = sm.add_constant(test_df[fixed_effects_vars])
    y_pred = fit.predict(exog_test)  # 预测浓度

    # 蒙特卡洛模拟测量误差
    y_pred_mc = np.random.normal(loc=y_pred.values[:, None], scale=sigma_measure, size=(len(y_pred), n_mc))

    # 每列为一次蒙特卡洛重复
    success_mask = y_pred_mc >= threshold
    # 找到第一次预测浓度 >= threshold 的 GA_num
    ga_values = test_df['GA_num'].values[:, None] * success_mask  # 未达标为0
    ga_values[ga_values == 0] = np.nan  # 未达标置为 nan
    pred_ga_mc.append(ga_values)

# 合并所有孕妇
pred_ga_mc = np.vstack(pred_ga_mc)  # shape: (总孕妇数, n_mc)
# 每次蒙特卡洛取非 nan 平均
pred_ga_mc_mean = np.nanmean(pred_ga_mc, axis=0)

# -----------------------------
# Science 风格可视化
# -----------------------------
plt.style.use('classic')  # 黑白简洁
plt.figure(figsize=(5,4))

sns.histplot(pred_ga_mc_mean, bins=15, color='black', alpha=0.8, edgecolor='white')
plt.axvline(np.mean(pred_ga_mc_mean), color='gray', linestyle='--', linewidth=1.2, label='平均预测')
plt.xlabel("预测 GAmin (周)", fontsize=10)
plt.ylabel("孕妇数", fontsize=10)
plt.title("GAmin 蒙特卡洛分布 (LOO预测)", fontsize=11)
plt.xticks(fontsize=9)
plt.yticks(fontsize=9)
plt.legend(fontsize=9)
sns.despine(trim=True)
plt.tight_layout()
plt.show()

# 输出平均值和标准差
print(f"GAmin 平均预测: {np.nanmean(pred_ga_mc_mean):.2f} 周")
print(f"GAmin 预测标准差: {np.nanstd(pred_ga_mc_mean):.2f} 周")
