In [5]:
import numpy as np
from math import sqrt, log, exp, pi
from statistics import mean

# -----------------------------
# 1) 目标分布：高斯混合
# -----------------------------
w1, w2 = 0.3, 0.7
mu1, var1 = 4.0, 0.3
mu2, var2 = 7.0, 2.0

def log_norm_pdf(x, mu, var):
    # 正态分布密度的对数：log N(x|mu,var)
    return -0.5*log(2*pi*var) - 0.5*((x-mu)**2)/var

def log_pi(x):
    # 使用 log-sum-exp 技巧计算混合分布的对数密度，避免数值下溢
    a = log(w1) + log_norm_pdf(x, mu1, var1)
    b = log(w2) + log_norm_pdf(x, mu2, var2)
    m = max(a, b)
    return m + log(exp(a-m) + exp(b-m))

def U(x):
    # 势能函数：U(x) = -log π(x)
    return -log_pi(x)

def grad_U(x):
    """
    势能函数的梯度（1维）：
    dU/dx = r1*(x-mu1)/var1 + r2*(x-mu2)/var2
    其中 r_k 为混合模型中的“责任度”（后验权重）
    """
    a = log(w1) + log_norm_pdf(x, mu1, var1)
    b = log(w2) + log_norm_pdf(x, mu2, var2)
    m = max(a, b)
    ea = exp(a - m)
    eb = exp(b - m)
    denom = ea + eb
    r1 = ea / denom
    r2 = eb / denom
    return r1*(x-mu1)/var1 + r2*(x-mu2)/var2

# -----------------------------
# 2) HMC 采样器（1维）
# -----------------------------
def hmc_step(x_current, eps, L, M=1.0, rng=None):
    """
    一次 HMC 迭代：
    - 从 p ~ N(0, M) 中采样动量
    - 使用步长 eps、步数 L 的 leapfrog 方法进行数值积分
    - 基于哈密尔顿量差值进行 Metropolis-Hastings 接受/拒绝
    返回：x_next, 是否接受 (bool)
    """
    if rng is None:
        rng = np.random.default_rng()

    # 抽样动量
    p_current = rng.normal(loc=0.0, scale=sqrt(M))

    # 当前哈密尔顿量
    H_current = U(x_current) + (p_current**2) / (2.0*M)

    # 初始化提议点
    x = float(x_current)
    p = float(p_current)

    # leapfrog 数值积分
    p -= 0.5 * eps * grad_U(x)
    for _ in range(L):
        x += eps * (p / M)
        # 除最后一步外，进行完整的动量更新
        if _ != L - 1:
            p -= eps * grad_U(x)
    p -= 0.5 * eps * grad_U(x)

    # 反转动量以保持可逆性
    p = -p

    # 提议点的哈密尔顿量
    H_prop = U(x) + (p**2) / (2.0*M)

    # Metropolis-Hastings 接受概率
    log_alpha = -(H_prop - H_current)
    if np.log(rng.random()) < log_alpha:
        return x, True
    else:
        return x_current, False

def run_hmc(x0, n_samples=20000, burn_in=2000, thin=1,
            eps=0.15, L_min=10, L_max=30, M=1.0, seed=0):
    """
    运行一条 HMC 链并返回样本（烧入后、抽稀后）与接受率
    每次迭代随机选择 L ∈ [L_min, L_max]，减少轨道的周期性
    """
    rng = np.random.default_rng(seed)
    x = float(x0)
    accepts = 0
    kept = []

    total_iters = burn_in + n_samples * thin
    for t in range(total_iters):
        L = int(rng.integers(L_min, L_max + 1))
        x, acc = hmc_step(x, eps=eps, L=L, M=M, rng=rng)
        accepts += int(acc)

        # 烧入后保存样本，并进行抽稀
        if t >= burn_in and ((t - burn_in) % thin == 0):
            kept.append(x)

    accept_rate = accepts / total_iters
    return np.array(kept), accept_rate

# -----------------------------
# 3) 真分布的 CDF（用于 KS 距离）
# -----------------------------
def std_norm_cdf(z):
    # 使用误差函数 erf 近似标准正态分布函数 Φ
    # Φ(z) = 0.5*(1+erf(z/sqrt(2)))
    from math import erf
    return 0.5 * (1.0 + erf(z / sqrt(2.0)))

def true_cdf(x):
    return (w1 * std_norm_cdf((x - mu1) / sqrt(var1)) +
            w2 * std_norm_cdf((x - mu2) / sqrt(var2)))

def ks_distance(samples, grid_size=2000):
    """
    近似计算 KS 距离：
    D = sup_x |F_hat(x) - F(x)|
    在样本区间上构造网格进行数值近似
    """
    s = np.sort(samples)
    n = len(s)
    lo = s[0] - 3.0
    hi = s[-1] + 3.0
    grid = np.linspace(lo, hi, grid_size)

    # 经验分布函数
    idx = np.searchsorted(s, grid, side="right")
    F_hat = idx / n
    F_true = np.array([true_cdf(x) for x in grid])
    return np.max(np.abs(F_hat - F_true))

# -----------------------------
# 4) 有效样本量 ESS
# -----------------------------
def ess_1d(x, max_lag=None):
    """
    简单 ESS 估计公式：
    ESS = n / (1 + 2 * sum_{k=1}^{K} rho_k)
    其中 K 由“正序列”规则或最大滞后阶数确定
    """
    x = np.asarray(x)
    n = len(x)
    x = x - x.mean()
    var = np.dot(x, x) / n
    if var <= 0:
        return float(n)

    if max_lag is None:
        max_lag = min(2000, n - 1)

    # 计算自相关系数
    rhos = []
    for k in range(1, max_lag + 1):
        cov = np.dot(x[:-k], x[k:]) / n
        rho = cov / var
        rhos.append(rho)

    # 正序列规则：当自相关变为非正时停止累加
    ssum = 0.0
    for rho in rhos:
        if rho <= 0:
            break
        ssum += rho

    ess = n / (1.0 + 2.0 * ssum)
    return max(1.0, min(float(n), ess))

# -----------------------------
# 5) 运行两条链（针对双峰分布更稳妥）
# -----------------------------
if __name__ == "__main__":
    # 参数设置
    eps = 0.3
    L_min, L_max = 10, 30
    M = 1.0
    n_samples = 20000
    burn_in = 3000
    thin = 1

    # 链 A：从第一个峰附近初始化
    s1, acc1 = run_hmc(x0=4.0, n_samples=n_samples, burn_in=burn_in, thin=thin,
                       eps=eps, L_min=L_min, L_max=L_max, M=M, seed=1)
    # 链 B：从第二个峰附近初始化
    s2, acc2 = run_hmc(x0=7.0, n_samples=n_samples, burn_in=burn_in, thin=thin,
                       eps=eps, L_min=L_min, L_max=L_max, M=M, seed=2)

    samples = np.concatenate([s1, s2])

    # 理论矩（真值）
    true_mean = w1*mu1 + w2*mu2  # 6.1
    true_var = w1*(var1 + (mu1-true_mean)**2) + w2*(var2 + (mu2-true_mean)**2)  # 3.38

    # 样本矩
    samp_mean = float(np.mean(samples))
    samp_var = float(np.var(samples, ddof=1))

    # 误差
    err_mean = abs(samp_mean - true_mean)
    err_var = abs(samp_var - true_var)

    # KS 距离
    Dks = ks_distance(samples)

    # 诊断指标
    ess_val = ess_1d(samples)

    print("=== HMC 诊断信息 ===")
    print(f"链1 接受率 = {acc1:.3f}")
    print(f"链2 接受率 = {acc2:.3f}")
    print(f"合并后样本数 = {len(samples)}")
    print(f"有效样本量 ESS（粗略） = {ess_val:.1f}")

    print("\n=== 分布误差指标 ===")
    print(f"样本均值 = {samp_mean:.6f}, 理论均值 = {true_mean:.6f}, |误差| = {err_mean:.6f}")
    print(f"样本方差 = {samp_var:.6f}, 理论方差 = {true_var:.6f}, |误差| = {err_var:.6f}")
    print(f"KS 距离 D_KS ≈ {Dks:.6f}")


=== HMC 诊断信息 ===
链1 接受率 = 0.992
链2 接受率 = 0.992
合并后样本数 = 40000
有效样本量 ESS（粗略） = 15682.8

=== 分布误差指标 ===
样本均值 = 6.085324, 理论均值 = 6.100000, |误差| = 0.014676
样本方差 = 3.379561, 理论方差 = 3.380000, |误差| = 0.000439
KS 距离 D_KS ≈ 0.005659
