In [1]:
import os 
import numpy as np
import time
# 如果没有安装 joblib，请先运行 !pip install joblib
from joblib import Parallel, delayed
from tqdm.notebook import tqdm 
import multiprocessing

# --- Pin numerical back-ends to a single thread ---
# Every parallel worker should run single-threaded: nested BLAS/OpenMP threads
# inside each worker oversubscribe the CPU and make the run slower.
# NOTE(review): these variables are normally read when the numerical library is
# loaded, and numpy is already imported above. Worker processes started later
# inherit them, but the current process may be unaffected — confirm if the
# serial solvers must also be single-threaded.
NUM_THREADS = 1 
os.environ["OMP_NUM_THREADS"]     = str(NUM_THREADS)
os.environ["MKL_NUM_THREADS"]     = str(NUM_THREADS)
# Fixed: the variable OpenBLAS reads is OPENBLAS_NUM_THREADS (was misspelled "OPENBIAS").
os.environ["OPENBLAS_NUM_THREADS"] = str(NUM_THREADS)
os.environ["VECLIB_MAXIMUM_THREADS"] = str(NUM_THREADS)
os.environ["NUMEXPR_NUM_THREADS"]  = str(NUM_THREADS)

# Detect the number of CPU cores; os.cpu_count() may return None, so fall back to 4.
NUM_CPU = os.cpu_count() or 4
print(f'系统 CPU 核心数: {NUM_CPU}')

# Print floats in fixed-point notation instead of scientific notation.
np.set_printoptions(suppress=True)

Number of CPUs: 20
环境配置完成。


In [2]:
def beta_true_gen(N,p,K,gamma):
    '''Build the true coefficient matrix of shape (p+1, K-1).

    Each column is a standard-normal draw of length p+1 scaled to unit
    Euclidean norm. Afterwards gamma*log(N) is added to the first row.

    The global numpy seed is reset to 0 on every call, so the result is
    deterministic and the global random state is modified as a side effect.
    '''
    np.random.seed(0)
    n_rows, n_cols = p + 1, K - 1
    coef = np.zeros([n_rows, n_cols])
    for col in range(n_cols):
        draw = np.random.normal(size=[n_rows,])
        coef[:, col] = draw / np.linalg.norm(draw)
    # Shift the first row (used as the intercept by the callers in this file)
    # to emulate rare classes.
    coef[0, :] += gamma * np.log(N)
    return coef

def X_gen(N, p=5, rho=0.5):
    '''Draw N feature vectors from a zero-mean p-variate normal distribution.

    The covariance between coordinates i and j is rho**|i-j|.
    Returns an array of shape (N, p). Uses the global numpy random state.
    '''
    # One power of rho per lag 0..p-1, then looked up by |i-j| to fill the matrix.
    lag_powers = np.array([rho ** np.abs(lag) for lag in range(p)])
    positions = np.arange(p)
    lag = np.abs(positions[:, None] - positions[None, :])
    cov = lag_powers[lag]
    return np.random.multivariate_normal(np.zeros(p), cov, (N,))

def get_onehot(y, baseclass=None):
    '''One-hot encode integer labels, optionally dropping one column.

    The number of classes is taken as the number of distinct values in y,
    and labels are used directly as column indices.

    y         : integer label array of length n (extra singleton axes are squeezed).
    baseclass : column index to remove from the encoding; None keeps all columns.

    Returns a float array of shape (n, nclass) or (n, nclass-1).
    '''
    labels = np.squeeze(y)
    n_obs = len(y)
    n_class = len(np.unique(y))

    onehot = np.zeros([n_obs, n_class])
    onehot[np.arange(n_obs), labels] = 1

    kept_columns = list(range(n_class))
    if baseclass is not None:
        # Remove the reference class so only the remaining columns are returned.
        del kept_columns[baseclass]
    return onehot[:, kept_columns]

# Confirmation message printed once the helper definitions in this cell have run.
print("辅助函数定义完成。")

辅助函数定义完成。


In [3]:
def mle_multilogistic_opt_ic(x, y, K0_pval, baseclass=None): 
    '''Global MLE of a multinomial logistic model via Newton-Raphson.

    x         : (n, ncov) design matrix; the initial value is written to the
                first coefficient of each class, so column 0 is presumably the
                intercept column — confirm against the caller.
    y         : integer class labels; K is the number of distinct values.
    K0_pval   : assumed probability of the reference class, used only to set
                the starting value of the first coefficient of every class.
    baseclass : reference class passed on to get_onehot.

    Iterates until the mean absolute update is <= 1e-6 or 50 iterations.
    Returns (beta, dist, niter) with beta of shape (ncov, K-1), dist the last
    mean absolute update and niter the number of iterations performed.
    '''
    n_obs, n_cov = x.shape
    K = len(np.unique(y))
    n_par = n_cov * (K - 1)
    targets = get_onehot(y, baseclass)

    # Parameters are stacked class by class into one column vector.
    beta0 = np.zeros([n_par, 1])
    beta0[np.arange(K - 1) * n_cov] = np.log((1 / K0_pval - 1) / (K - 1))

    # Small diagonal perturbation guarding against a singular matrix.
    ridge = 1.0e-6 * np.eye(n_par)

    dist = 1.0
    niter = 0
    while dist > 1.0e-6 and niter < 50:
        niter += 1
        coef = beta0.reshape([K - 1, n_cov]).T
        odds = np.exp(x @ coef)
        prob = odds / (1 + np.sum(odds, axis=1, keepdims=True))

        # Averaged score vector, laid out class by class like beta0.
        score = (x.T @ (targets - prob) / n_obs).T.reshape([-1, 1])

        # Off-diagonal blocks: minus the averaged p_k * p_l * x x^T terms.
        px = (prob[:, :, None] * x[:, None, :]).reshape([n_obs, -1])
        info = -(px.T @ px / n_obs)

        # Diagonal blocks are overwritten with the averaged p_k (1 - p_k) x x^T terms.
        for k in range(K - 1):
            pk = prob[:, k]
            wx = np.sqrt(pk * (1 - pk)).reshape([n_obs, 1]) * x
            lo = k * n_cov
            hi = lo + n_cov
            info[lo:hi, lo:hi] = wx.T @ wx / n_obs

        beta1 = beta0 + np.linalg.inv(info + ridge) @ score
        dist = np.mean(np.abs(beta1 - beta0))
        beta0 = beta1
    return beta0.reshape([K - 1, n_cov]).T, dist, niter

def gd_multilogistic_opt_ic(x, y, K0_pval, baseclass, alpha): 
    '''Global MLE of a multinomial logistic model via fixed-step gradient updates.

    x         : (n, ncov) design matrix.
    y         : integer class labels; K is the number of distinct values.
    K0_pval   : assumed probability of the reference class, used only to set
                the starting value of the first coefficient of every class.
    baseclass : reference class passed on to get_onehot.
    alpha     : step size multiplying the averaged score vector.

    Each step moves along the averaged score (gradient of the log-likelihood).
    Iterates until the mean absolute update is <= 1e-6 or 1000 iterations.
    Returns (beta, dist, niter) with beta of shape (ncov, K-1).
    '''
    n_obs, n_cov = x.shape
    K = len(np.unique(y))
    targets = get_onehot(y, baseclass)

    # Parameters are stacked class by class into one column vector.
    beta0 = np.zeros([n_cov * (K - 1), 1])
    beta0[np.arange(K - 1) * n_cov] = np.log((1 / K0_pval - 1) / (K - 1))

    dist, niter = 1.0, 0
    while dist > 1.0e-6 and niter < 1000:
        niter += 1
        coef = beta0.reshape([K - 1, n_cov]).T
        odds = np.exp(x @ coef)
        prob = odds / (1 + np.sum(odds, axis=1, keepdims=True))
        grad = (x.T @ (targets - prob) / n_obs).T.reshape([-1, 1])
        beta1 = beta0 + alpha * grad
        dist = np.mean(np.abs(beta1 - beta0))
        beta0 = beta1
    return beta0.reshape([K - 1, n_cov]).T, dist, niter

# Confirmation message printed once the two GMLE solvers in this cell are defined.
print("GMLE 求解器定义完成。")

GMLE 求解器定义完成。


In [4]:
def mle_logistic_cpu_ic_windows(args):
    '''
    PMLE worker: binary logistic regression fitted by Newton-Raphson.

    args: (x_subset, y_subset, K0_pval)
        x_subset : (n, ncov) design matrix of the observations in this pair.
        y_subset : labels; 0 is treated as the negative class and any
                   non-zero label as the positive class.
        K0_pval  : assumed probability of class 0, used only for the starting
                   value of the first coefficient.

    All inputs arrive through the single tuple argument, so the function does
    not depend on global variables.
    Returns the coefficient vector as a 1-D array of length ncov.
    '''
    x, y, K0_pval = args
    n_obs, n_cov = x.shape

    # Collapse the labels to 0/1: anything different from 0 counts as 1.
    target = 1 * (y.reshape(n_obs, 1) != 0)

    beta0 = np.zeros([n_cov, 1])
    beta0[0] = np.log(1 / K0_pval - 1)

    # Small diagonal term added to the weighted cross-product before inversion.
    ridge = 1.0e-6 * np.eye(n_cov)

    dist = 1.0
    niter = 0
    while dist > 1.0e-6 and niter < 50:
        niter += 1
        odds = np.exp(x @ beta0)
        prob = odds / (1 + odds)
        score = x.T @ (target - prob) / n_obs
        wx = np.sqrt(prob * (1 - prob)) * x
        info = wx.T @ wx / n_obs + ridge
        # Release the large temporary as soon as it is no longer needed.
        del wx

        beta1 = beta0 + np.linalg.inv(info) @ score
        dist = np.mean(np.abs(beta1 - beta0))
        beta0 = beta1

    # The loop always runs at least once, so beta0 is the last update here.
    return beta0.reshape([n_cov,])

# Confirmation message printed once the PMLE worker function in this cell is defined.
print("PMLE Windows版 Worker 定义完成。")

PMLE Windows版 Worker 定义完成。


In [5]:
!pip install tqdm

from tqdm.notebook import tqdm  # 专为 Jupyter 设计的进度条
import time

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Collecting tqdm
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl (78 kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.67.1


In [None]:
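# NOTE: `%%time` is a Jupyter cell magic. It only works when it is the very first
# line of the cell, where it reports the total run time of the whole cell.
# It is not enabled in this cell.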

if __name__ == '__main__':
    # --- Simulation parameters ---
    N = 10**5      # sample size (use e.g. 20000 for a quicker trial run)
    p = 10         # number of features, excluding the intercept column
    K = 11         # number of classes
    gamma = -0.5   # rarity level: beta_true_gen adds gamma*log(N) to the intercepts

    # Number of simulation rounds. The original setting was 100; start with
    # 5 or 10 when checking that everything runs.
    nsimu = 5      
    
    alpha = 40       # step size of the gradient-based GMLE solver
    baseclass = 0    # reference class dropped from the one-hot encoding
    K0_pval = 0.9    # assumed share of class 0, used for the solvers' starting values

    # One pairwise problem per non-base class, so more workers than K-1 are never needed.
    n_jobs = min(K-1, NUM_CPU)

    # --- Result containers ---
    t_nr = np.zeros(nsimu)
    t_gd = np.zeros(nsimu)
    t_pmle = np.zeros(nsimu)

    beta_hat = np.zeros([nsimu, p+1, (K-1)])
    beta_hat_nr = np.zeros([nsimu, p+1, (K-1)])
    beta_hat_gd = np.zeros([nsimu, p+1, (K-1)])
    
    dist_nr = np.zeros(nsimu)
    dist_gd = np.zeros(nsimu)
    
    # True parameters. Fixed: use the `gamma` setting above instead of a
    # hard-coded -0.5, so changing the parameter actually takes effect.
    beta_true = beta_true_gen(N, p, K, gamma) 

    print(f"开始仿真: N={N}, K={K}, Rounds={nsimu}")
    print(f"CPU核心数: {NUM_CPU}, 进程池大小: {n_jobs}")
    
    # tqdm renders a progress bar over the simulation rounds.
    for b in tqdm(range(nsimu), desc="Simulation Progress"):
        
        # 1. Generate data: features with an intercept column, then labels
        #    sampled by inverting the cumulative class probabilities.
        np.random.seed(b)
        X = X_gen(N, p)
        X = np.hstack([np.ones([N,1]), X]) 

        prob = np.exp(X @ beta_true) 
        prob = np.hstack([np.ones([N,1]), prob])  
        prob = prob / np.sum(prob, 1).reshape([N,1])
        prob = np.cumsum(prob, 1)   
        Y = (np.random.uniform(size = [N,1]) < prob).astype(np.int16)
        Y = np.argmax(Y, 1) 
        idx_0 = np.where(Y == 0)[0] 
        
        # 2. GMLE: Newton-Raphson
        t_start = time.perf_counter()
        beta_hat_nr[b], dist_nr[b], _ = mle_multilogistic_opt_ic(X, Y, K0_pval, baseclass=baseclass)
        t_nr[b] = time.perf_counter() - t_start
        
        # 3. GMLE: gradient-based solver
        t_start = time.perf_counter()
        beta_hat_gd[b], dist_gd[b], _ = gd_multilogistic_opt_ic(X, Y, K0_pval, baseclass, alpha)
        t_gd[b] = time.perf_counter() - t_start
        
        # 4. PMLE: one binary problem (class 0 vs class k) per worker task.
        t_start = time.perf_counter()
        
        # Each task carries its own data subset, so workers need no global state.
        tasks = []
        for k in range(1, K):
            idx_k = np.where(Y == k)[0]
            idx_combined = np.concatenate((idx_0, idx_k))
            tasks.append((X[idx_combined], Y[idx_combined], K0_pval))
            
        # Fixed: the original used `Pool`, which was never imported (NameError),
        # and multiprocessing pools generally cannot run worker functions defined
        # inside a notebook when processes are spawned. joblib (already imported
        # by this notebook) serialises interactively defined functions and
        # returns the results in task order.
        results = Parallel(n_jobs=n_jobs)(
            delayed(mle_logistic_cpu_ic_windows)(task) for task in tasks
        )
            
        beta_hat[b] = np.array(results).T
        t_pmle[b] = time.perf_counter() - t_start
        
        # tqdm.write keeps the per-round message from breaking the progress bar.
        tqdm.write(f'Round {b+1}/{nsimu} | PMLE: {t_pmle[b]:.2f}s | GMLE(NR): {t_nr[b]:.2f}s')

    print("\n=== 仿真结束 ===")
    print(f"平均耗时 PMLE: {np.mean(t_pmle):.4f}s")
    print(f"平均耗时 GMLE(NR): {np.mean(t_nr):.4f}s")

开始仿真: N=100000, K=11, Rounds=5
CPU核心数: 20, 进程池大小: 10


Simulation Progress:   0%|          | 0/5 [00:00<?, ?it/s]