In [1]:
import numpy as np
from scipy.stats import truncnorm, norm, beta, bernoulli,t

def simulate_data_2lc_d(N, I, pi_true, alpha_true, p_true, seed=None):
    """
    Fast vectorised version:
      • 输入参数与旧函数完全一致；
      • 返回 dict 的键/值类型也与旧版一致 (D,T,R 为 int64，T_obs 为 float64 带 NaN)。
    """
    rng = np.random.default_rng(seed)

    # 1) D_k ~ Bern(pi)
    D = rng.binomial(1, pi_true, size=N).astype(np.int64)        # (N,)

    # 2) T_{ki} | D_k
    #    α 对应列索引为 D；alpha_true shape (I,2)
    prob_T = alpha_true[:, D].T                                  # (N,I)
    T = (rng.random((N, I)) < prob_T).astype(np.int64)           # (N,I)

    # 3) R_{ki} | D_{k}
    prob_R = p_true[:, D].T                                      # (N,I)
    R = (rng.random((N, I)) < prob_R).astype(np.int64)           # (N,I)

    # 4) 观测 T
    T_obs = T.astype(float)
    T_obs[R == 0] = np.nan

    return {'D': D, 'T': T, 'R': R, 'T_obs': T_obs}

In [None]:
import numpy as np
from tqdm import tqdm

def gibbs_sampler_2lc_d(T_obs, R,
                        n_iter=3000, burn=1000,
                        a_pi=1, b_pi=1,          # π 先验
                        a0=1, b0=1, a1=1, b1=1,   # α 先验
                        c0=1, d0=1, c1=1, d1=1,   # p 先验
                        ident_order=True,
                        seed=None, verbose=True):
    """
    Gibbs sampler for the 2-LC-D model with unknown p_{id}.
    Returns list of draws (α, p, π) of length n_iter-burn.
    α, p 都是 (I,2) 数组；π 为标量。
    """
    rng = np.random.default_rng(seed)
    N, I = T_obs.shape

    miss_mask = np.isnan(T_obs)

    # ---------- 初始化 ----------
    T = np.where(miss_mask,
                 rng.binomial(1, .5, (N, I)),
                 T_obs).astype(int)        # (N,I)
    D     = rng.binomial(1, .5, N)         # (N,)
    alpha = np.full((I, 2), .5)            # (I,2)
    p     = np.full((I, 2), .8)            # (I,2)
    pi    = .5

    draws = []
    if verbose:
        pbar = tqdm(total=n_iter, unit="iter", desc="Gibbs 2LC-D")

    for it in range(n_iter):
        # =========================================================
        # 1) 填补缺失 T | D, α   (与 R 无关)
        # ---------------------------------------------------------
        alpha_mat = alpha[:, 0][None, :]*(1-D)[:, None] + alpha[:, 1][None, :]*D[:, None]
        T[miss_mask] = (rng.random(miss_mask.sum()) < alpha_mat[miss_mask]).astype(int)

        # =========================================================
        # 2) 更新 D | T, R, α, p, π
        # ---------------------------------------------------------
        # log P(T|D) 部分
        log_a1 , log_1a1 = np.log(alpha[:,1])[None,:], np.log1p(-alpha[:,1])[None,:]
        log_a0 , log_1a0 = np.log(alpha[:,0])[None,:], np.log1p(-alpha[:,0])[None,:]
        # log P(R|D) 部分
        log_p1 , log_1p1 = np.log(p[:,1])[None,:]    , np.log1p(-p[:,1])[None,:]
        log_p0 , log_1p0 = np.log(p[:,0])[None,:]    , np.log1p(-p[:,0])[None,:]

        loglik1 = (np.log(pi)               +
                   (T*log_a1  + (1-T)*log_1a1).sum(axis=1) +
                   (R*log_p1  + (1-R)*log_1p1).sum(axis=1))
        loglik0 = (np.log1p(-pi)            +
                   (T*log_a0  + (1-T)*log_1a0).sum(axis=1) +
                   (R*log_p0  + (1-R)*log_1p0).sum(axis=1))

        prob_D1 = 1.0 / (1.0 + np.exp(loglik0 - loglik1))
        D = (rng.random(N) < prob_D1).astype(int)

        # =========================================================
        # 3) π | D
        s1 = D.sum()
        pi = rng.beta(a_pi + s1, b_pi + N - s1)

        # =========================================================
        # 4) α | D, T
        n0, n1   = (D == 0).sum(), s1
        succ0_T  = T[D == 0].sum(axis=0)
        succ1_T  = T[D == 1].sum(axis=0)
        alpha[:, 0] = rng.beta(a0 + succ0_T, b0 + n0 - succ0_T)
        alpha[:, 1] = rng.beta(a1 + succ1_T, b1 + n1 - succ1_T)

        if ident_order:
            swap = alpha[:, 1] < alpha[:, 0]
            alpha[swap] = alpha[swap][:, ::-1]

        # =========================================================
        # 5) p | D, R   (与 T 无关)
        n0, n1   = (D == 0).sum(), s1
        succ0_R  = R[D == 0].sum(axis=0)
        succ1_R  = R[D == 1].sum(axis=0)
        p[:, 0]  = rng.beta(c0 + succ0_R, d0 + n0 - succ0_R)
        p[:, 1]  = rng.beta(c1 + succ1_R, d1 + n1 - succ1_R)

        # =========================================================
        # 6) 保存
        if it >= burn:
            draws.append(dict(alpha=alpha.copy(),
                              p=p.copy(),
                              pi=float(pi)))

        if verbose and it % 100 == 0:
            pbar.update(100)
            pbar.set_description(f"π={pi:.4f}")

    return draws   # list, len = n_iter-burn


In [3]:
p_true = np.array([[0.9, 0.8], [0.9, 0.7], [0.8, 0.65], [0.8, 0.6], [0.9, 0.7]]) 
pi = 0.4
alpha_true = np.array([[0.6, 0.8], [0.3, 0.9], [0.4, 0.7], [0.5, 0.9], [0.2, 0.9]])
N=5000
I=5

data = simulate_data_2lc_d(N,I,pi_true = pi,alpha_true = alpha_true,p_true = p_true,seed = 42)
T_obs = data['T_obs']
print(T_obs)
print(data['R'])

samples = gibbs_sampler_2lc_d(
    T_obs=data['T_obs'],
    R=data['R'],
    n_iter=3000, burn=1000,
    seed=42
)
print(np.mean([sample['alpha'] for sample in samples], axis=0))
print(np.mean([sample['p'] for sample in samples], axis=0))

[[ 1.  1. nan  1.  1.]
 [ 1.  0.  0.  0.  0.]
 [ 1.  1. nan  1.  1.]
 ...
 [ 1.  1.  1. nan  0.]
 [ 1.  0.  0.  1.  0.]
 [ 1.  1. nan nan  1.]]
[[1 1 0 1 1]
 [1 1 1 1 1]
 [1 1 0 1 1]
 ...
 [1 1 1 0 1]
 [1 1 1 1 1]
 [1 1 0 0 1]]


π=0.4232: 100%|██████████| 3000/3000 [00:07<00:00, 413.05iter/s] 

[[0.59653946 0.80851554]
 [0.28170415 0.88424446]
 [0.38712126 0.67578695]
 [0.50139043 0.87574461]
 [0.19079377 0.90069616]]
[[0.89952725 0.80836856]
 [0.89923042 0.67691764]
 [0.80869551 0.65919426]
 [0.80527423 0.5995077 ]
 [0.911204   0.68571448]]





In [11]:
import numpy as np
from tqdm import trange

# ------- 1. 单次模拟并返回后验 draws -------
def run_once(N, I, pi, alpha_true, p_true,
             n_iter=3000, burn=1000, seed=None):
    data = simulate_data_2lc_d(N, I, pi, alpha_true, p_true, seed)
    draws = gibbs_sampler_2lc_d(data['T_obs'], data['R'], 
                                n_iter=n_iter, burn=burn, seed=seed,verbose=False)

    # draws['alpha'] 形状: (n_save, I, 2)
    alpha_draws = np.stack([d['alpha'] for d in draws], axis=0)

    # 后验均值
    alpha_hat = alpha_draws.mean(axis=0)          # (I,2)

    # 95% 等尾区间
    ci_low  = np.quantile(alpha_draws, 0.025, axis=0)
    ci_high = np.quantile(alpha_draws, 0.975, axis=0)

    return dict(
        alpha_hat=alpha_hat,
        ci_low=ci_low,
        ci_high=ci_high,
        alpha_draws=alpha_draws        # 可选：若存储太大可删去
    )

# ------- 2. Monte-Carlo 主循环 -------
def run_mc(M=200,                        # 模拟重复次数
           N=1000, I=5,
           pi=0.4, alpha_true=None, p_true=None,
           n_iter=3000, burn=1000, seed0=123):

    if alpha_true is None or p_true is None:
        raise ValueError("请提供 alpha_true 与 p_true")

    rng = np.random.default_rng(seed0)

    # 结果容器
    cover_se  = np.zeros((M, I))
    cover_sp  = np.zeros((M, I))
    bias_alpha = np.zeros((M, I, 2))
    mse_alpha  = np.zeros((M, I, 2))

    for m in trange(M, desc="Monte-Carlo"):
        seed = int(rng.integers(1e9))
        res  = run_once(N, I, pi, alpha_true, p_true,
                        n_iter=n_iter, burn=burn, seed=seed)

        alpha_hat = res['alpha_hat']
        ci_low, ci_high = res['ci_low'], res['ci_high']

        # -------- se/sp 覆盖率 --------
        se_true = alpha_true[:, 1]
        sp_true = 1 - alpha_true[:, 0]

        se_hat_low, se_hat_high = ci_low[:, 1],  ci_high[:, 1]
        sp_hat_low, sp_hat_high = 1 - ci_high[:, 0], 1 - ci_low[:, 0]  # 注意顺序

        cover_se[m] = (se_true >= se_hat_low) & (se_true <= se_hat_high)
        cover_sp[m] = (sp_true >= sp_hat_low) & (sp_true <= sp_hat_high)

        # -------- α 偏差 / MSE --------
        bias_alpha[m] = alpha_hat - alpha_true          # (I,2)
        mse_alpha[m]  = (alpha_hat - alpha_true)**2

    # ========= 汇总 =========
    result = {
        "coverage_se":  cover_se.mean(axis=0),          # (I,)
        "coverage_sp":  cover_sp.mean(axis=0),
        "bias_alpha":   bias_alpha.mean(axis=0),        # (I,2)
        "mse_alpha":    mse_alpha.mean(axis=0)
    }
    return result

# ----------------- 运行示例 -----------------
p_true = np.array([[0.9, 0.8],
                   [0.9, 0.7],
                   [0.8, 0.65],
                   [0.8, 0.6],
                   [0.9, 0.7]])
alpha_true = np.array([[0.6, 0.8],
                       [0.3, 0.9],
                       [0.4, 0.7],
                       [0.5, 0.9],
                       [0.2, 0.9]])

summary = run_mc(M=200, N=500, I=5,
                 pi=0.4, alpha_true=alpha_true, p_true=p_true,
                 n_iter=3000, burn=1000, seed0=42)

print("\n95% posterior interval coverage (target ≈ 0.95)")
print("SE:", summary["coverage_se"].round(3))
print("SP:", summary["coverage_sp"].round(3))

print("\nBias of α (col-0 = α₍i0₎, col-1 = α₍i1₎):")
print(summary["bias_alpha"].round(4))

print("\nMSE of α:")
print(summary["mse_alpha"].round(5))


Monte-Carlo: 100%|██████████| 200/200 [05:31<00:00,  1.66s/it]


95% posterior interval coverage (target ≈ 0.95)
SE: [0.975 0.985 0.935 0.935 0.975]
SP: [0.95  0.965 0.98  0.935 0.94 ]

Bias of α (col-0 = α₍i0₎, col-1 = α₍i1₎):
[[-0.0036 -0.0004]
 [ 0.0039 -0.0092]
 [ 0.0019 -0.006 ]
 [-0.0004 -0.0072]
 [ 0.0025 -0.0159]]

MSE of α:
[[0.00111 0.00129]
 [0.0013  0.00184]
 [0.00121 0.00243]
 [0.00153 0.00163]
 [0.00139 0.00234]]



