### View the current logical CPU count of the server

In [1]:
import os

# Count the logical CPUs this process is allowed to run on (its CPU affinity
# mask), which can be fewer than the machine total reported by os.cpu_count().
try:
    NUM_CPU = len(os.sched_getaffinity(0))
except AttributeError:
    # os.sched_getaffinity is not provided on every platform; fall back to the
    # total logical CPU count (os.cpu_count() may return None, hence `or 1`).
    NUM_CPU = os.cpu_count() or 1

print(f'CPU Total: {NUM_CPU}')

CPU Total: 128


### Limit the number of threads each process may use

In [2]:
NUM_THREADS = 4

# Cap the thread pools of MKL, NumExpr and OpenMP so that NUM_PROCESS worker
# processes together do not oversubscribe the NUM_CPU available CPUs.
# NOTE(review): these variables are normally read when the numerical libraries
# are first loaded, so this presumably has to run before numpy is imported --
# confirm the cell execution order.
os.environ["MKL_NUM_THREADS"]     = str(NUM_THREADS)
os.environ["NUMEXPR_NUM_THREADS"] = str(NUM_THREADS)
os.environ["OMP_NUM_THREADS"]     = str(NUM_THREADS)

# Always allow at least one worker: NUM_CPU // NUM_THREADS is 0 when fewer
# than NUM_THREADS CPUs are available, and a process pool cannot be created
# with zero processes.
NUM_PROCESS = max(1, NUM_CPU // NUM_THREADS)
print(f'Maximum number of parallel processes: {NUM_PROCESS}')

Maximum number of parallel processes: 32


### Import numpy, multiprocessing and other packages

In [1]:
import numpy as np
from numpy.random import default_rng
from time import time
import multiprocessing as mp
import pandas as pd
import matplotlib.pyplot as plt
import scipy.stats as st
import random
import math

### Global invariant parameters

In [217]:
# Problem dimensions: K mixture classes, q columns in the covariate matrix X.
K = 5
q = 8

# True parameters
sigma = 1
theta = [3, 1.5, 0, 0, 2, 0, 0, 0]
gamma = [3 * k - 4 for k in range(K)]   # K values starting at -4, spaced by 3
pi = [0.15, 0.2, 0.3, 0.25, 0.1]

# Distribution of the rows of X: zero mean and covariance
# cov[i, j] = rho ** |i - j|.
mean = np.zeros(q)
rho = 0.5
i, j = np.mgrid[:q, :q]
cov = rho ** np.abs(i - j)

### Some functions needed for global calculations

In [5]:
def dup_rows(a, indx, num_dups=1):
    """Return a copy of `a` with row `indx` repeated `num_dups` extra times.

    The copies are inserted directly after row `indx`; `a` itself is not
    modified.
    """
    source_row = a[indx]
    # Inserting every copy before position indx + 1 places them right after
    # the original row.
    positions = [indx + 1] * num_dups
    return np.insert(a, positions, source_row, axis=0)

def dup_cols(a, indx, num_dups=1):
    """Return a copy of `a` with column `indx` repeated `num_dups` extra times.

    The copies are inserted directly after column `indx`; `a` itself is not
    modified.
    """
    # Index with a list so the column stays 2-D (rows x 1) and broadcasts
    # across all inserted columns.
    source_col = a[:, [indx]]
    positions = [indx + 1] * num_dups
    return np.insert(a, positions, source_col, axis=1)
def function_exp(x):
    """Return e**x, computed element-wise for scalar or array-like `x`."""
    return np.exp(x)


# np.exp is already an element-wise ufunc, so wrapping it in np.vectorize only
# added a Python-level loop per element (and an error on empty input).  The
# vectorised name is kept as an alias for existing callers.
function_vexp = function_exp
def function_bin(p,x):
    """Return p**x * (1-p)**(1-x): equals p when x is 1 and 1-p when x is 0."""
    success_part = p ** x
    failure_part = (1 - p) ** (1 - x)
    return success_part * failure_part


# Element-wise version of function_bin for array arguments.
function_vbin = np.vectorize(function_bin)

### EM algorithm

In [226]:
def em_single_for_p(n,p,K,initial,prior_p,X,Y,Z):
    '''
    One EM update of the class-specific probabilities p_kj.

    Each column j of Z is handled separately.  For observation i, class k
    and column j the (unnormalised) E-step weight is
        pi_k * exp(-(Y_i - X_i.theta - gamma_k)^2 / (2 sigma^2))
             * (p_kj * Z_ij + (1 - p_kj) * (1 - Z_ij)),
    which is normalised over the K classes.  The M step then sets the new
    p_kj to the weighted mean of column j of Z under those weights.

    Arguments:
    n: number of observations (rows of X, Y and Z)
    p: number of columns of Z
    K: number of classes
    initial: flat sequence [pi_h (K values), gamma_h (K values),
             theta_h (one value per column of X), sigma_h (last entry)]
    prior_p: current p_kj [K X p array]
    X: [n X q matrix]
    Y: [n X 1 list]
    Z: [n X p matrix]

    Returns:
    new_p: updated p_kj [K X p array]
    '''
    X = np.asarray(X)
    Z = np.asarray(Z)
    p_t = np.asarray(prior_p)

    # The length of theta is taken from X (np.dot(X, theta_h) requires it to
    # equal the number of columns of X) instead of a module-level global.
    n_cov = X.shape[1]
    pi_h = initial[0:K]
    gamma_h = np.asarray(initial[K:2*K], dtype=float)
    theta_h = initial[2*K:2*K+n_cov]
    sigma_h = initial[-1]

    # E step: Gaussian kernel of the residual around every class intercept,
    # shape (n, K), via broadcasting.
    resid = np.asarray(Y - np.dot(X, theta_h))
    e_pri = np.exp(-(resid[:, None] - gamma_h[None, :])**2 / (2*sigma_h**2))

    # Unnormalised weights for every (class, observation, column).
    eb_pri = np.empty((K, n, p))
    for k in range(K):
        p_k = p_t[k, :]
        prod_k = p_k*Z + (1 - p_k)*(1 - Z)
        eb_pri[k, :, :] = pi_h[k] * e_pri[:, [k]] * prod_k
    dd = np.sum(eb_pri, 0)

    new_p = np.zeros((K, p))
    for k in range(K):
        pi_ik_j = np.divide(eb_pri[k, :, :], dd)
        # M step: column-wise weighted mean of Z.  Summing the element-wise
        # product is the diagonal of weight.T @ Z without forming the full
        # p x p matrix.
        new_p[k, :] = np.sum(pi_ik_j * Z, 0) / np.sum(pi_ik_j, 0)

    return new_p

### EM - Iteration

In [218]:
def em_for_p(n,p,K,initial,prior_p,X,Y,Z,tol = 1e-3,iterations=100):
    '''
    Iterate the EM update of p_kj until convergence.

    param n, p, K: passed through unchanged to em_single_for_p
    param initial: other (fixed) parameters, passed to em_single_for_p
    param prior_p: starting value of p_kj [K X p array]
    param Y, X, Z: data
    param tol: stop when the mean over rows of the Euclidean distance
               between successive p_kj estimates falls below this value
    param iterations: maximum number of updates
    return: [p_kj estimate, number of updates completed before stopping];
            the update that meets the tolerance is not counted
    '''
    iteration = 0
    # Start from the supplied value so the result is defined even when the
    # loop body never runs (iterations <= 0).
    new_prior_p = prior_p
    while iteration < iterations:
        new_prior_p = em_single_for_p(n,p,K,initial,prior_p,X,Y,Z)
        p_change = (prior_p-new_prior_p)**2
        # Row-wise Euclidean distance between old and new estimate, averaged
        # over the rows.
        err_norm = np.mean((np.sum(p_change,1))**0.5)
        if err_norm<tol:
            break
        prior_p = new_prior_p
        iteration += 1
    return [new_prior_p,iteration]

### 1. Simulation Data Generator--X,Y,Z

In [8]:
def data_generator(n,p,rho_kj,seed):
    """Simulate one data set (X, Y, Z) from the seed.

    Reads the module-level constants mean, cov, pi, K, gamma and theta.

    n: number of observations
    p: number of columns of Z
    rho_kj: [K x p array], rho_kj[k, j] is the success probability of
            column j of Z for observations in class k
    seed: seed for numpy's default_rng

    Returns [X, Y, Z] with X of shape (n, len(mean)), Y of length n and Z of
    shape (n, p) holding 0/1 draws.  Observations are ordered by class.
    """
    rng = default_rng(seed)

    # Covariates, then the number of observations falling in each class.
    X = rng.multivariate_normal(mean, cov, size=(n,), check_valid='raise')
    class_sizes = rng.multinomial(n, pvals=pi)[:K]

    # Per-observation class intercept; classes occupy consecutive row blocks.
    intercepts = np.repeat(np.asarray(gamma[:K], dtype=float), class_sizes)

    # Y = class intercept + linear predictor + standard normal noise.
    noise = rng.normal(size=n)
    Y = intercepts + np.dot(X, theta) + noise

    # Z: one Bernoulli draw per (observation, column).  Columns are drawn one
    # at a time within each class so the random stream is consumed in a fixed
    # order for a given seed.
    Z = np.zeros((n, p))
    block_end = np.cumsum(class_sizes)
    block_start = block_end - class_sizes
    for k in range(K):
        lo, hi = block_start[k], block_end[k]
        for j in range(p):
            Z[lo:hi, j] = rng.binomial(1, rho_kj[k, j], hi - lo)

    return [X, Y, Z]

### 2. Function to compute the p_kj initial value matrix

In [9]:
def p_pri_est(n,p,K,initial_est, X, Y, Z):
    """Compute the starting value of the p_kj matrix from initial estimates.

    For observation i and class k the membership weight is proportional to
        pi_k * exp(-(Y_i - X_i.theta - gamma_k)^2 / (2 sigma^2)),
    normalised over the K classes.  The starting p_kj is the mean of column
    j of Z weighted by the class-k memberships.

    n, p: kept for interface compatibility; the sizes are taken from the data
    K: number of classes (any K >= 1; previously the code only worked for 5)
    initial_est: flat sequence [pi (K values), gamma (K values),
                 theta (one value per column of X), sigma (last entry)]
    X: [n x q matrix]
    Y: [n values]
    Z: [n x p matrix]

    Returns the [K x p] array of starting values.
    """
    X = np.asarray(X)

    # initial estimator; the length of theta follows the columns of X
    # (np.dot(X, theta) requires them to match).
    n_cov = X.shape[1]
    pi_ini_est = np.asarray(initial_est[0:K], dtype=float)
    gamma_ini_est = np.asarray(initial_est[K:2*K], dtype=float)
    theta_ini_est = initial_est[2*K:2*K+n_cov]
    sigma_ini_est = initial_est[-1]

    # Gaussian kernel of the residual around every class intercept, shape
    # (n, K).  Broadcasting replaces the explicit row/column duplication.
    resid = np.asarray(Y - np.dot(X, theta_ini_est))
    e_pri = np.exp(-(resid[:, None] - gamma_ini_est[None, :])**2
                   / (2*sigma_ini_est**2))

    # Class-membership weights: each row sums to one.
    weighted = e_pri * pi_ini_est
    pi_ik_pri = weighted / np.sum(weighted, axis=1, keepdims=True)

    # Normalise every class column to sum to one over the observations, then
    # take the weighted column means of Z.
    weight = pi_ik_pri / np.sum(pi_ik_pri, axis=0)
    p_pri = np.dot(weight.T, Z)

    return p_pri

### 3. Define a mapping: random seed $\mapsto$ estimate of p

In [220]:
def map_fun(b):
    """Run one simulation replicate with seed `b`.

    Uses the module-level n, p, K, rho_kj and initial_est.  Prints the number
    of EM iterations and returns [estimate of p, number of iterations].
    """
    # Generate the simulated data for this seed.
    sim_X, sim_Y, sim_Z = data_generator(n, p, rho_kj, seed=b)

    # Compute the starting value of p for this replicate.
    start_p = p_pri_est(n, p, K, initial_est, sim_X, sim_Y, sim_Z)

    # Compute the estimate and the number of iterations.
    em_result = em_for_p(n, p, K, initial_est, start_p, sim_X, sim_Y, sim_Z)
    p_est, p_iter = em_result
    print(p_iter)

    return [p_est, p_iter]

### 4. Constants setting for simulation

In [4]:
# Dimensions: n observations, p columns of Z.
n = 5000
p = 2000

# Number of simulation replications.
B = 500

In [251]:
# Generate True p_kj
# All random draws below come from the seeded generator `rd`, so the matrix is
# identical on every run.
rd = np.random.RandomState(888)

# Background probabilities in [0.01, 0.3], rounded to two decimals.
rho_kj = rd.uniform(0.01,0.3,(K,p))
def function_2(x):
    return round(x,2)
function_vector = np.vectorize(function_2)
rho_kj = function_vector(rho_kj)

# Class k gets high probabilities (0.8-0.95) on its own block of p // K
# consecutive columns; the same set of values is reshuffled for every class.
block_width = p // K
diag_pk = rd.uniform(0.8,0.95,block_width)
diag_pk = function_vector(diag_pk)
for k in range(K):
    # Shuffle with `rd`, not the unseeded global np.random state, so the
    # result is reproducible.
    rd.shuffle(diag_pk)
    rho_kj[k,k*block_width:(k+1)*block_width] = diag_pk
# Save True p_kj
pd.DataFrame(rho_kj).to_csv("new_R500_responce/rho_5000_2000.csv",index=False)

In [253]:
# Load the initial estimator: the column-wise mean over the rows of the CSV,
# as a plain list.
initial_est = (
    pd.read_csv("new_R500_initial/initial_est_n5000.csv")
    .mean(axis=0)
    .tolist()
)

### 5. Calling multiple processes for simulation experiments

In [None]:
# Wall-clock start of the whole simulation.
tic1 = time()

# Distribute the B replicates (seeds 0 .. B-1) over NUM_PROCESS worker
# processes; Results1[b] is the [estimate, iterations] pair for seed b.
with mp.Pool(processes=NUM_PROCESS) as pool:
    Results1 = pool.map(map_fun, range(B))

toc1 = time()
# Total computing time in seconds.
print(toc1 - tic1)

### 6. Obtain Results

In [255]:
# Gather the per-replicate results into arrays indexed by replicate.
p_est_n5000_p2000 = np.zeros((B, K, p))
p_iter_n5000_p2000 = np.ones((B, p))
for b in range(B):
    result_b = Results1[b]
    p_est_n5000_p2000[b] = result_b[0]
    # The iteration count is broadcast across all p entries of row b.
    p_iter_n5000_p2000[b] = result_b[1]

In [256]:
# Save estimation results: one CSV per replicate for the estimates, plus a
# single CSV with the iteration counts.
for b in range(B):
    est_frame = pd.DataFrame(p_est_n5000_p2000[b])
    est_frame.to_csv(f"new_R500_responce/n5000_p2000/rho_est_5000_2000_{b}.csv", index=False)

iter_frame = pd.DataFrame(p_iter_n5000_p2000)
iter_frame.to_csv("new_R500_responce/n5000_p2000/rho_iter_5000_2000.csv", index=False)