Dynamic

In [4]:
import numpy as np
from scipy.optimize import minimize
from scipy.linalg import svd, inv
from scipy.stats import chi2

np.random.seed(42)
n = 10        # 节点数
m = 100        # 每期样本数
d = 5        # 投影维度
T = 5         # 时间周期数
num_experiments = 100  # 实验次数N
alpha = 0.05           # 显著性水平

# ARMA参数设置
ar1, ma1 = 0.8, 0.2   # 总体一的ARMA参数
ar2, ma2 = 0.6, 0.4   # 总体二的ARMA参数
noise_scale = 0.1     # 控制噪声幅度

def generate_symmetric_prob_matrix(low, high):
    upper = np.triu(np.random.uniform(low, high, (n, n)), 1)
    sym_matrix = upper + upper.T
    np.fill_diagonal(sym_matrix, 0.0)
    return sym_matrix

# 初始概率矩阵（固定不变）
theta1_init = generate_symmetric_prob_matrix(0.2, 0.4)
theta2_init = generate_symmetric_prob_matrix(0.5, 0.9)

# 生成ARMA时间序列（固定参数）
theta1_series = generate_arma_theta(theta1_init, ar1, ma1, T, noise_scale)
theta2_series = generate_arma_theta(theta2_init, ar2, ma2, T, noise_scale)

# 初始化功效统计数组
power = np.zeros(T)

for exp in range(num_experiments):
    for t_idx in range(T):
        t = t_idx + 1  # 时间点从1到T
        
        # 生成两组样本
        samples_group1 = generate_adj_sample(theta1_series[t], m)
        samples_group2 = generate_adj_sample(theta2_series[t], m)
        
        # 计算Theta矩阵
        eps = 1e-8
        mean_adj1 = samples_group1.mean(axis=0)
        mean_adj2 = samples_group2.mean(axis=0)
        Theta1 = mean_adj1 / (1 - mean_adj1 + eps)
        Theta2 = mean_adj2 / (1 - mean_adj2 + eps)
        
        # 优化投影矩阵
        U0, _, Vt0 = svd((Theta1 - Theta2) / 2)
        initial_params = np.concatenate([U0[:, :d].ravel(), Vt0[:d, :].T.ravel()])
        res = minimize(projection_loss, initial_params, args=(Theta1, Theta2, d),
                       method='L-BFGS-B', options={'maxiter': 500})
        U_opt = res.x[:n*d].reshape(n, d)
        V_opt = res.x[n*d:].reshape(n, d)
        
        # 计算gamma参数和Wald统计量
        J = np.kron(V_opt.T, U_opt.T)  # 形状 (25, 100)
        vec_Theta = (Theta1 + Theta2).ravel() / 2  # 形状 (100,)
        gamma2 = (Theta1 - Theta2).ravel() / 2     # 形状 (100,)
        gamma1 = vec_Theta  # 形状 (100,)

        # 投影后的参数
        gamma1_proj = J @ gamma1  # 形状 (25,)
        gamma2_proj = J @ gamma2  # 形状 (25,)

        # 计算F_hat矩阵（参数空间协方差）
        f_prime = vec_Theta * (1 - vec_Theta)  # 形状 (100,)
        F_hat = 2 * J @ np.diag(f_prime) @ J.T  # 正确矩阵乘法 (25,25)
        F_hat_inv = inv(F_hat + 1e-6 * np.eye(25))  # 添加正则化避免奇异矩阵

        # 计算G_hat矩阵（投影空间方差）
        g_prime_diag = gamma1_proj * (1 - gamma1_proj)  # 基于投影参数的方差 (25,)
        G_hat = 2 * np.diag(g_prime_diag)  # 构造对角矩阵 (25,25)

        # 计算Wald统计量（投影空间二次型）
        W = n * gamma2_proj.T @ G_hat @ F_hat_inv @ G_hat @ gamma2_proj
        W = W.item()  # 转换为标量
        p_value = 1 - chi2.cdf(W, d*d)  # 自由度为d^2的卡方分布

        # 统计拒绝次数
        if p_value < alpha:
            power[t_idx] += 1


    print(f"实验 {exp+1}/{num_experiments} 完成")

power /= num_experiments

# 输出各时间点的功效
print("\n各时间点的功效:")
for t in range(T):
    print(f"时间点 {t+1}: {power[t]:.4f}")

实验 1/100 完成
实验 2/100 完成
实验 3/100 完成
实验 4/100 完成
实验 5/100 完成
实验 6/100 完成
实验 7/100 完成
实验 8/100 完成
实验 9/100 完成
实验 10/100 完成
实验 11/100 完成
实验 12/100 完成
实验 13/100 完成
实验 14/100 完成
实验 15/100 完成
实验 16/100 完成
实验 17/100 完成
实验 18/100 完成
实验 19/100 完成
实验 20/100 完成
实验 21/100 完成
实验 22/100 完成
实验 23/100 完成
实验 24/100 完成
实验 25/100 完成
实验 26/100 完成
实验 27/100 完成
实验 28/100 完成
实验 29/100 完成
实验 30/100 完成
实验 31/100 完成
实验 32/100 完成
实验 33/100 完成
实验 34/100 完成
实验 35/100 完成
实验 36/100 完成
实验 37/100 完成
实验 38/100 完成
实验 39/100 完成
实验 40/100 完成
实验 41/100 完成
实验 42/100 完成
实验 43/100 完成
实验 44/100 完成
实验 45/100 完成
实验 46/100 完成
实验 47/100 完成
实验 48/100 完成
实验 49/100 完成
实验 50/100 完成
实验 51/100 完成
实验 52/100 完成
实验 53/100 完成
实验 54/100 完成
实验 55/100 完成
实验 56/100 完成
实验 57/100 完成
实验 58/100 完成
实验 59/100 完成
实验 60/100 完成
实验 61/100 完成
实验 62/100 完成
实验 63/100 完成
实验 64/100 完成
实验 65/100 完成
实验 66/100 完成
实验 67/100 完成
实验 68/100 完成
实验 69/100 完成
实验 70/100 完成
实验 71/100 完成
实验 72/100 完成
实验 73/100 完成
实验 74/100 完成
实验 75/100 完成
实验 76/100 完成
实验 77/100 完成
实验 78/10

Fix

In [83]:
import numpy as np
from scipy.optimize import minimize
from scipy.linalg import svd, inv
from scipy.stats import chi2

np.random.seed(42)
n = 10        # 节点数
m = 100        # 每期样本数
d = 5        # 投影维度
T = 5         # 时间周期数
num_experiments = 100  # 实验次数N
alpha = 0.05           # 显著性水平

# ARMA参数设置
ar1, ma1 = 0.8, 0.2   # 总体一的ARMA参数
ar2, ma2 = 0.6, 0.4   # 总体二的ARMA参数
noise_scale = 0.1     # 控制噪声幅度

def generate_symmetric_prob_matrix(low, high):
    upper = np.triu(np.random.uniform(low, high, (n, n)), 1)
    sym_matrix = upper + upper.T
    np.fill_diagonal(sym_matrix, 0.0)
    return sym_matrix

# 初始概率矩阵（固定不变）
theta1_init = generate_symmetric_prob_matrix(0.2, 0.4)
theta2_init = generate_symmetric_prob_matrix(0.5, 0.9)

# 生成ARMA时间序列（固定参数）
theta1_series = generate_arma_theta(theta1_init, ar1, ma1, T, noise_scale)
theta2_series = generate_arma_theta(theta2_init, ar2, ma2, T, noise_scale)

# 初始化功效统计数组
power = np.zeros(T)

for exp in range(num_experiments):
    # 存储第一期的投影矩阵
    U_fixed = None
    V_fixed = None
    
    for t_idx in range(T):
        t = t_idx + 1  # 时间点从1到T
        
        # 生成两组样本
        samples_group1 = generate_adj_sample(theta1_series[t], m)
        samples_group2 = generate_adj_sample(theta2_series[t], m)
        
        # 计算Theta矩阵
        eps = 1e-8
        mean_adj1 = samples_group1.mean(axis=0)
        mean_adj2 = samples_group2.mean(axis=0)
        Theta1 = mean_adj1 / (1 - mean_adj1 + eps)
        Theta2 = mean_adj2 / (1 - mean_adj2 + eps)
        
        # 仅在第一期计算投影矩阵
        if t_idx == 0:  # 第一期
            # 优化投影矩阵
            U0, _, Vt0 = svd((Theta1 - Theta2) / 2)
            initial_params = np.concatenate([U0[:, :d].ravel(), Vt0[:d, :].T.ravel()])
            res = minimize(projection_loss, initial_params, args=(Theta1, Theta2, d),
                           method='L-BFGS-B', options={'maxiter': 500})
            U_fixed = res.x[:n*d].reshape(n, d)
            V_fixed = res.x[n*d:].reshape(n, d)
        
        # 使用固定或已计算的投影矩阵
        U_opt = U_fixed
        V_opt = V_fixed
        
        # 计算gamma参数和Wald统计量
        J = np.kron(V_opt.T, U_opt.T)  # 形状 (25, 100)
        vec_Theta = (Theta1 + Theta2).ravel() / 2  # 形状 (100,)
        gamma2 = (Theta1 - Theta2).ravel() / 2     # 形状 (100,)
        gamma1 = vec_Theta  # 形状 (100,)

        # 投影后的参数
        gamma1_proj = J @ gamma1  # 形状 (25,)
        gamma2_proj = J @ gamma2  # 形状 (25,)

        # 计算F_hat矩阵（参数空间协方差）
        f_prime = vec_Theta * (1 - vec_Theta)  # 形状 (100,)
        F_hat = 2 * J @ np.diag(f_prime) @ J.T  # 正确矩阵乘法 (25,25)
        F_hat_inv = inv(F_hat + 1e-6 * np.eye(25))  # 添加正则化避免奇异矩阵

        # 计算G_hat矩阵（投影空间方差）
        g_prime_diag = gamma1_proj * (1 - gamma1_proj)  # 基于投影参数的方差 (25,)
        G_hat = 2 * np.diag(g_prime_diag)  # 构造对角矩阵 (25,25)

        # 计算Wald统计量（投影空间二次型）
        W = n * gamma2_proj.T @ G_hat @ F_hat_inv @ G_hat @ gamma2_proj
        W = W.item()  # 转换为标量
        p_value = 1 - chi2.cdf(W, d*d)  # 自由度为d^2的卡方分布

        # 统计拒绝次数
        if p_value < alpha:
            power[t_idx] += 1

            print(f"实验 {exp+1}/{num_experiments} 完成")

power /= num_experiments

# 输出各时间点的功效
print("\n各时间点的功效:")
for t in range(T):
    print(f"时间点 {t+1}: {power[t]:.4f}")

实验 1/100 完成
实验 1/100 完成
实验 1/100 完成
实验 2/100 完成
实验 2/100 完成
实验 2/100 完成
实验 3/100 完成
实验 3/100 完成
实验 3/100 完成
实验 4/100 完成
实验 4/100 完成
实验 4/100 完成
实验 4/100 完成
实验 5/100 完成
实验 5/100 完成
实验 5/100 完成
实验 5/100 完成
实验 6/100 完成
实验 6/100 完成
实验 7/100 完成
实验 7/100 完成
实验 8/100 完成
实验 8/100 完成
实验 8/100 完成
实验 8/100 完成
实验 9/100 完成
实验 9/100 完成
实验 10/100 完成
实验 10/100 完成
实验 10/100 完成
实验 10/100 完成
实验 11/100 完成
实验 11/100 完成
实验 11/100 完成
实验 11/100 完成
实验 12/100 完成
实验 12/100 完成
实验 12/100 完成
实验 13/100 完成
实验 13/100 完成
实验 14/100 完成
实验 14/100 完成
实验 15/100 完成
实验 15/100 完成
实验 15/100 完成
实验 16/100 完成
实验 16/100 完成
实验 16/100 完成
实验 16/100 完成
实验 17/100 完成
实验 17/100 完成
实验 17/100 完成
实验 17/100 完成
实验 18/100 完成
实验 18/100 完成
实验 18/100 完成
实验 19/100 完成
实验 19/100 完成
实验 20/100 完成
实验 20/100 完成
实验 20/100 完成
实验 20/100 完成
实验 20/100 完成
实验 21/100 完成
实验 21/100 完成
实验 21/100 完成
实验 22/100 完成
实验 22/100 完成
实验 22/100 完成
实验 22/100 完成
实验 23/100 完成
实验 23/100 完成
实验 23/100 完成
实验 24/100 完成
实验 24/100 完成
实验 24/100 完成
实验 24/100 完成
实验 25/100 完成
实验 25/100 完成
