#### This notebook is used to conduct parameters estimation under the Stochastic Block Model (SBM) setting.

In [1]:
# Importing utils functions needed for simulation studies
import utils
from importlib import reload
reload(utils)

<module 'utils' from '/mnt/code/simulation/scripts/utils.py'>

In [2]:
# Importing necessary packages
import pandas as pd
import sys,codecs
import numpy as np
import re
import os
import matplotlib.pyplot as plt
import scipy.stats as st
import collections
import statsmodels.api as sm
from time import time
import random

In [3]:
# Importing Ray, a distributed computing framework for enabling parallel and scalable execution
import ray

ray.init(address='auto')  

2025-07-27 15:34:30,159	INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 192.168.1.7:6379...
2025-07-27 15:34:30,218	INFO worker.py:1927 -- Connected to Ray cluster.


0,1
Python version:,3.12.4
Ray version:,2.48.0


[36m(map_fun_fmle pid=17319, ip=192.168.1.5)[0m cmle is over
[36m(map_fun_fmle pid=17319, ip=192.168.1.5)[0m Z_err_max: 4.636100950961921 Z_err_max1: 4.4003303832439125
[36m(map_fun_fmle pid=17319, ip=192.168.1.5)[0m SARX is over
[36m(map_fun_fmle pid=17330, ip=192.168.1.5)[0m cmle is over
[36m(map_fun_fmle pid=17330, ip=192.168.1.5)[0m Z_err_max: 4.3888966413125825 Z_err_max1: 5.58307786260088
[36m(map_fun_fmle pid=17319, ip=192.168.1.5)[0m fmle is over
[36m(map_fun_fmle pid=17874, ip=192.168.1.6)[0m cmle is over
[36m(map_fun_fmle pid=17874, ip=192.168.1.6)[0m Z_err_max: 4.924954618858843 Z_err_max1: 4.769929875188988
[36m(map_fun_fmle pid=15152)[0m cmle is over[32m [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)[0m
[36m(map_fun_fmle pid=15152)[0m Z_err_max: 4.983838187030

In [4]:
print(ray.cluster_resources())

{'CPU': 124.0, 'memory': 184393276213.0, 'object_store_memory': 79025689803.0, 'node:192.168.1.6': 1.0, 'node:192.168.1.5': 1.0, 'node:192.168.1.8': 1.0, 'node:__internal_head__': 1.0, 'node:192.168.1.7': 1.0}


In [5]:
# Importing OS module for interacting with the operating system (e.g., file paths, directories)
import os

NUM_CPU = (ray.cluster_resources())['CPU'] #os.cpu_count() 

print(f'CPU total: {NUM_CPU}')

CPU total: 124.0


In [6]:
NUM_THREADS = 4

os.environ["MKL_NUM_THREADS"]     = str(NUM_THREADS)
os.environ["NUMEXPR_NUM_THREADS"] = str(NUM_THREADS)
os.environ["OMP_NUM_THREADS"]     = str(NUM_THREADS)

NUM_PROCESS = NUM_CPU // NUM_THREADS
print(f'max process: {NUM_PROCESS}')

max process: 31.0


### True parameters and network setting 

In [7]:
# Dimensions 
n = 500; p = 50; q = 20; d = 3

In [8]:
# true parameter
seed = 666
rng = np.random.default_rng(seed) 
tau = np.round(rng.uniform(0.15,0.2,p),4)
rho = np.round(rng.uniform(0.2,0.9,p),4)
beta0 = np.round(rng.uniform(0.5,1,(p,10)),4)
beta1 = np.zeros((p,q-10))
beta = np.hstack((beta0, beta1))
bc = np.array(np.hstack([rng.normal(0,1,(p,d))]))
# Omega mean and variance
mean = np.zeros(p)
ta = 0.15
i, j = np.mgrid[:p, :p]
cov = ta**2*ta**abs(i-j)
cov[list(range(p)),list(range(p))] = ta
sigma2 = np.zeros(p)
for j in range(p):
    sigma2[j] = bc[j,:]@bc[j,:] + ta

In [9]:
# Adjacency matrix-SBM
A = np.zeros((n,n))
K = 5
nk = int(n/K)
p1 = 9/n
p2 = 3/n
P = (np.kron(np.eye(K),(p1-p2)*np.ones((nk,nk)))+p2*np.ones((n,n))).flatten()
A = (rng.binomial(1,P)).reshape((n,n))

ind = (np.where(np.sum(A,1)==0))[0]
for i in ind:
    A[i,rng.choice(list(range(n)),6)] = 1
A[list(range(n)),list(range(n))] = 0
W = A/np.sum(A,1).reshape(n,1)  

In [10]:
# check
print(np.sum(A))
np.max(np.sum(A,1))

2114


np.int64(13)

### Save path setting and save parameters

In [11]:
Save_path_dir = '../results/Results_Block_02/'

In [12]:
#save true values
pd.DataFrame(rho).to_csv(Save_path_dir+f"n{n}_p{p}_B500/rho_true_n{n}_p{p}.csv",index=False)
pd.DataFrame(beta).to_csv(Save_path_dir+f"n{n}_p{p}_B500/beta_true_n{n}_p{p}.csv",index=False)
pd.DataFrame(bc).to_csv(Save_path_dir+f"n{n}_p{p}_B500/B_true_n{n}_p{p}.csv",index=False)
pd.DataFrame(A).to_csv(Save_path_dir+f"A_smallp_n{n}.csv",index=False)

### Parallel framework-Ray and full process

In [13]:
# parallel computation framework
import ray

ray.init(num_cpus=NUM_CPU, ignore_reinit_error=True)

2025-07-27 15:34:44,699	INFO worker.py:1747 -- Connecting to existing Ray cluster at address: 192.168.1.7:6379...
2025-07-27 15:34:44,700	INFO worker.py:1765 -- Calling ray.init() again after it has already been called.


0,1
Python version:,3.12.4
Ray version:,2.48.0


In [14]:
# Randomly selected one dimension response for the CP
jj = rng.integers(0, p)
Save_path_process = os.path.abspath(Save_path_dir + 'process.txt')
Save_path_process_ci = os.path.abspath(Save_path_dir + 'process_ci.txt')

In [15]:
# Whole process
# Defining `map_fun_fmle` function, implementing a specific computational task
@ray.remote(num_cpus=4) 
def map_fun_fmle(bb):
    
    Y, X, Z = utils.data_generator(n, p, q, d, W, mean, cov, rho, beta, bc, seed = bb + 88)
    n0 = int(0.1*n)
    n1 = n - n0
    WW0 = W[:n0,:n0]
    Y0, X0, Z0 = Y[:n0], X[:n0,:], Z[:n0,:]  
    WW1 = W[n0:,n0:]
    Y1, X1, Z1 = Y[n0:], X[n0:,:], Z[n0:,:]
    # tic1n = time()
    #initial estimators
    rho_hi, beta_hi, sigma2_hi, ite_hi = utils.optimize_initial(n1, p, q, WW1, Y1, X1, bb, Save_path_process)
    rho_hb, beta_hb, sigma2_hb, ite_hb = utils.optimize_initial(n0, p, q, WW0, Y0, X0, bb, Save_path_process)
    print('cmle is over')
    # toc1n = time()
    # print(toc1n - tic1n)
    
    # Estimate Zi and M
    Z_h, M = utils.Z_est(n0, p, q, d, WW0, WW1, Y1, X1, rho_hi, beta_hi, Y0, X0, rho_hb, beta_hb)
    
    Z_err_max = np.max(abs(Z_h - Z1))
    Z_err_max1 = np.max(abs(-Z_h - Z1))
    print('Z_err_max:', Z_err_max, 'Z_err_max1:', Z_err_max1)
    if Z_err_max>Z_err_max1:
        Z_h = - Z_h
    if (Z_err_max>10) and (Z_err_max1>10):
        return np.zeros((8,p)),np.zeros(p),np.zeros(p),np.zeros(p)
    
    # FMLE with true Z
    rho_hfZ, beta_hfZ, b_hfZ, tau2_hfZ, ite_hfZ = utils.optimize(n1, p, q, d, WW1, Z1, Y1, X1, bb, Save_path_process)
    print('SARX is over')
    
    # FMLE with estimated Z
    rho_hf, beta_hf, b_hf, tau2_hf, ite_hf = utils.optimize(n1, p, q, d, WW1, Z_h, Y1, X1, bb, Save_path_process)
    print('fmle is over')

    H_hat = M.T@M/p
    bc_hat = b_hf.T@H_hat
    sigma2_hat = np.zeros(p)
    for j in range(p):
        sigma2_hat[j] = bc_hat[j,:]@bc_hat[j,:] + tau2_hf[j]

    lower_est = np.zeros(p)
    upper_est = np.zeros(p)
    q0 = 10
    # for j in range(p):
    #     tic1nj = time()
    #     lower_est[j], upper_est[j] = CI_est_new(j, n1, p, q0, d, WW1, rho_hf, beta_hf, b_hf, tau2_hf, M, sigma2_hat, Y1, X1, Z_h, 1.96)
    #     toc1nj = time()
    #     with open('Results_Block_02/process.txt', 'a') as f1:
    #         f1.write(str(bb)+', '+str(j)+', '+str(toc1nj - tic1nj)+', '+str(lower_est[j])+', '+str(upper_est[j])+'\n')   
    # CI computation
    tic1nj = time()
    lower_est[jj], upper_est[jj] = utils.CI_est_test(jj, n1, p, q0, d, WW1, rho_hf, beta_hf, b_hf, tau2_hf, M, sigma2_hat, Y1, X1, Z_h, 1.96)
    toc1nj = time()  
    with open(Save_path_process_ci, 'a') as f1:
        f1.write(str(bb)+', '+str(jj)+', '+str(toc1nj - tic1nj)+', '+str(lower_est[jj])+', '+str(upper_est[jj])+'\n')   
 

    return np.array([rho_hi, rho_hfZ, rho_hf, sigma2_hi, tau2_hfZ, tau2_hf, lower_est, upper_est]), ite_hi, ite_hfZ, ite_hf

In [16]:
from time import time
from numpy.random import default_rng

### Start parallel experiments

In [17]:
# Whole experiments
BB = 500
tic1 = time()
tasks = [map_fun_fmle.remote(bb) for bb in range(BB)]
results500_p50whole = ray.get(tasks)  
toc1 = time()
print(toc1 - tic1) # Total time

4303.228565454483


In [59]:
import ray
ray.shutdown()

### Save results and analysis

In [18]:
# Save Results
BB = 500
q0 = 10
theta_hat_Block_n500_p50_B500 = np.zeros((BB,8,p))

ite_ini = np.zeros((BB,p))
ite_fZin = np.zeros((BB,p))
ite_fin = np.zeros((BB,p))
#tic1 = time()
for bt in range(BB):
    theta_hat_Block_n500_p50_B500[bt,:,:], ite_ini[bt,:], ite_fZin[bt,:], ite_fin[bt,:] = results500_p50whole[bt][0], results500_p50whole[bt][1],results500_p50whole[bt][2],results500_p50whole[bt][3]
#toc1 = time()

In [19]:
# ECP computation
cc = list(set(range(BB)).difference(set(np.where(ite_fin==0)[0])))
ECP = np.zeros(len(cc))
i=0
for b in cc:
    ECP[i] = (theta_hat_Block_n500_p50_B500[b,6,jj]<=rho[jj]) & (rho[jj]<=theta_hat_Block_n500_p50_B500[b,7,jj])
    i = i+1

In [20]:
np.mean(ECP)

np.float64(0.944)

In [21]:
#cmle
np.nanmean(abs(theta_hat_Block_n500_p50_B500[cc,0,:]-((rho.reshape((p,1))@np.ones((1,BB))).T)[cc,:]))

np.float64(0.034652235865416305)

In [22]:
#SARX
np.nanmean(abs(theta_hat_Block_n500_p50_B500[cc,1,:]-((rho.reshape((p,1))@np.ones((1,BB))).T)[cc,:]))

np.float64(0.010573491988717933)

In [23]:
#fmle
np.nanmean(abs(theta_hat_Block_n500_p50_B500[cc,2,:]-((rho.reshape((p,1))@np.ones((1,BB))).T)[cc,:]))

np.float64(0.021386851698667914)

In [24]:
# Save results
for b in range(BB):
    pd.DataFrame(theta_hat_Block_n500_p50_B500[b,:,:]).to_csv("../results/Results_Block_02/n500_p50_B500/theta_hat_Block_n500_p50_"+str(b)+'_.csv',index=False)