## Uniform

In [27]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [28]:
import numpy as np
from sklearn.linear_model import Lasso
import matplotlib.pyplot as plt

import setting
import utils
import parametric
import CoRT_builder
from mpmath import mp


In [29]:
def parametric_uniform_test(n_target, n_source, p, K, Ka, h, lamda, alpha, iteration, T, cnt):
    """Run repeated selective-inference trials and plot a histogram of the p-values.

    Each trial generates data with ``CoRT_builder.CoRT.gen_data`` using an
    all-zero ``s_vector`` of length ``cnt``, fits a Lasso on the combined CoRT
    data, picks one selected target feature at random, and computes a selective
    p-value with ``utils.pivot`` after sweeping the line
    ``y(z) = a_global + z * b_global`` over ``[-20 * sd, 20 * sd]``.

    Parameters
    ----------
    n_target, n_source, p, K, Ka, h :
        Forwarded unchanged to ``CoRT_model.gen_data`` (``n_target``, ``K`` and
        ``p`` are also used locally for fold splitting and index offsets).
    lamda : float
        Regularisation strength; used both for ``CoRT_builder.CoRT(alpha=...)``
        and for the ``Lasso`` fit.
    alpha : float
        Accepted for signature parity with ``parametric_fpr_tpr_test``; not
        used inside this function.
    iteration : int
        Number of trials to run.
    T : int
        Forwarded to ``find_similar_source``, ``utils.split_target`` and the
        ``parametric`` helpers.
    cnt : int
        Length of the all-zero ``s_vector`` passed to ``gen_data``.

    Returns
    -------
    list
        The p-values of all trials whose p-value was neither ``0`` nor ``None``.
        Trials in which the Lasso selected no feature are skipped entirely.
    """
    s_vector = [0] * cnt
    s = len(s_vector)
    CoRT_model = CoRT_builder.CoRT(alpha=lamda)
    p_values = []

    for step in range(iteration):

        target_data, source_data = CoRT_model.gen_data(n_target, n_source, p, K, Ka, h, s_vector, s, "AR")
        similar_source_index = CoRT_model.find_similar_source(n_target, K, target_data, source_data, T=T, verbose=False)
        X_combined, y_combined = CoRT_model.prepare_CoRT_data(similar_source_index, source_data, target_data)

        model = Lasso(alpha=lamda, **setting.lasso_config)
        model.fit(X_combined, y_combined.ravel())
        # The last p coefficients are taken as the target-specific block.
        beta_hat_target = model.coef_[-p:]

        active_indices = np.array([i for i, b in enumerate(beta_hat_target) if b != 0])

        if len(active_indices) == 0:
            # Report the loop counter (the original interpolated the builtin `iter`).
            print(f"Iteration {step}: Lasso selected no features. Skipping.")
            continue

        # Position (within the active set) of the feature to test.
        j = np.random.choice(len(active_indices))

        X_target = target_data["X"]
        y_target = target_data["y"]
        X_active, X_inactive = utils.get_active_X(beta_hat_target, X_target)

        etaj, etajTy = utils.construct_test_statistic(y_target, j, X_active)

        # Identity covariance; a_global / b_global parametrise y(z) = a + z * b.
        Sigma = np.eye(n_target)
        b_global = Sigma @ etaj @ np.linalg.pinv(etaj.T @ Sigma @ etaj)
        a_global = (Sigma - b_global @ etaj.T) @ y_target

        folds = utils.split_target(T, X_target, y_target, n_target)

        # Sweep z over +/- 20 standard deviations of the test statistic.
        tn_sigma = (np.sqrt(etaj.T @ Sigma @ etaj)).item()
        z_k = -20 * tn_sigma
        z_max = 20 * tn_sigma

        Z_train_list = parametric.get_Z_train(z_k, folds, source_data, a_global, b_global, lamda, K, T)
        Z_val_list, similar_source_index, cnt_vote, is_similar = parametric.get_Z_val(z_k, folds, T, K, a_global, b_global, lamda, source_data)

        target_data_current = {"X": X_target, "y": a_global + z_k * b_global}
        X_combined_new, y_combined_new = CoRT_model.prepare_CoRT_data(similar_source_index, source_data, target_data_current)
        L_CoRT, R_CoRT, Az = parametric.get_Z_CoRT(X_combined_new, similar_source_index, lamda, a_global, b_global, source_data, z_k)
        z_list = [z_k]
        Az_list = []
        stopper = "empty"
        while z_k < z_max:
            # Keep only indices at or beyond the source blocks, shifted to 0..p-1.
            current_num_sources = len(similar_source_index)
            offset = p * current_num_sources
            Az_target_current = np.array([idx - offset for idx in Az if idx >= offset])
            Az_list.append(Az_target_current)

            # Find the smallest right endpoint among all tracked intervals and
            # remember which component produced it.
            # NOTE(review): val[4] / val[3] are presumed to be right endpoints,
            # since the update helpers' (l, r) results are stored there below.
            mn = z_max
            stopper = "MAX"

            for val in Z_train_list:
                if mn > val[4]:
                    mn = val[4]
                    stopper = "TRAIN"

            for val in Z_val_list:
                if mn > val[3]:
                    mn = val[3]
                    stopper = "VAL"

            if mn > R_CoRT:
                mn = R_CoRT
                stopper = "CORT"

            R_final = mn

            # Step just past the breakpoint; max() guarantees forward progress.
            z_k = max(R_final, z_k) + 1e-4

            if (z_k >= z_max):
                z_list.append(z_max)
                break
            else:
                z_list.append(z_k)

            update_train_needed = False
            update_val_needed = False
            update_cort_needed = False

            # A TRAIN breakpoint also forces every validation entry to refresh.
            if stopper == "TRAIN":
                update_train_needed = True
                update_val_needed = True

            elif stopper == "VAL":
                update_val_needed = True

            elif stopper == "CORT":
                update_cort_needed = True

            if update_train_needed:
                for val in Z_train_list:
                    # Refresh only entries whose interval ended at or before z_k.
                    if val[4] <= z_k + 1e-9:
                        l, r = parametric.update_Z_train(val, z_k, folds, source_data, a_global, b_global, lamda, K, T)
                        val[3] = l
                        val[4] = r

            if update_val_needed:
                for val in Z_val_list:
                    if stopper == "TRAIN" or val[3] <= z_k + 1e-9:
                        l, r, similar_source_index, cnt_vote, is_similar, oke = parametric.update_Z_val(val, z_k, folds, T, a_global, b_global, lamda, source_data, similar_source_index, cnt_vote, is_similar)
                        val[2] = l
                        val[3] = r
                        # `oke` triggers a CoRT refit (presumably the similar
                        # source set changed -- confirm in parametric.update_Z_val).
                        if oke == True:
                            update_cort_needed = True

            if update_cort_needed:
                target_data_current = {"X": X_target, "y": a_global + z_k * b_global}
                X_combined_new, y_combined_new = CoRT_model.prepare_CoRT_data(similar_source_index, source_data, target_data_current)
                L_CoRT, R_CoRT, Az = parametric.get_Z_CoRT(X_combined_new, similar_source_index, lamda, a_global, b_global, source_data, z_k)

        optim_p_value = utils.pivot(active_indices, Az_list, z_list, etaj, etajTy, 0, Sigma)

        # Degenerate p-values are reported and excluded from the histogram.
        if optim_p_value == 0:
            print(" WARNING: p-value is 0")
        elif optim_p_value is None:
            print(" WARNING: p-value is None")
        else:
            p_values.append(optim_p_value)
        if step % 5 == 0:
            print(f"optim_p_value[{step}]: {optim_p_value}")

    plt.hist(p_values, bins=10, edgecolor='black', density=True)
    plt.title(f"Uniform - {iteration} iterations")
    plt.xlabel("p-value")
    plt.ylabel("Density")
    plt.show()
    return p_values

In [30]:
import json

class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to built-in types."""

    def default(self, obj):
        """Return a JSON-serialisable equivalent of ``obj``.

        NumPy booleans, integers and floats become ``bool``/``int``/``float``
        and arrays become nested lists. Anything else is delegated to the base
        class, which raises ``TypeError``.
        """
        # np.bool_ is not an np.integer subclass, so it needs its own branch.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
    
# Fixed settings shared by every run.
h = 30
lamda = 0.1
alpha = 0.05
iteration = 100
cnt = 3  # placeholder; re-drawn for every run inside the loop
nTest = 5

# Remember the requested number of runs so run ids can be numbered 1..total_runs.
total_runs = nTest
all_results = []

while nTest > 0:
    nTest -= 1
    # Draw a random problem configuration for this run.
    n_target = np.random.randint(6, 8)
    n_source = np.random.randint(6, 8)
    p = np.random.randint(max(n_target, n_source) + 1, 15)
    K = np.random.randint(5, 10)
    Ka = np.random.randint(3, 5)
    T = 3
    # NOTE(review): the upper bound is a float expression; the recorded runs
    # show it is accepted by np.random.randint, so it is kept unchanged.
    cnt = np.random.randint(1, p / 3 - (p / 3 > 3))

    print(f"Running test with n_target={n_target}, n_source={n_source}, p={p}, K={K}, Ka={Ka}, T={T}, cnt={cnt}")
    p_values = parametric_uniform_test(n_target, n_source, p, K, Ka, h, lamda, alpha, iteration, T, cnt)

    run_info = {
        # nTest has already been decremented, so this counts 1, 2, ..., total_runs
        # (the previous `5 - nTest + 1` produced 2..6).
        "run_id": total_runs - nTest,
        "params": {
            "n_target": n_target, "n_source": n_source, "p": p,
            "K": K, "Ka": Ka, "T": T, "cnt": cnt
        },
        "results": {
            "p_values": p_values
        }
    }

    all_results.append(run_info)

# Persist every run's parameters and p-values as one JSON document.
output_filename = "simulation_results.json"
with open(output_filename, "w", encoding="utf-8") as f:
    json.dump(all_results, f, cls=NumpyEncoder, indent=4)

print(f"Đã lưu kết quả thành công vào file: {output_filename}")

Running test with n_target=6, n_source=6, p=7, K=7, Ka=3, T=3, cnt=1


KeyboardInterrupt: 

## FPR and TPR

In [31]:
def parametric_fpr_tpr_test(n_target, n_source, p, K, Ka, h, lamda, alpha, iteration, T, cnt):
    """Estimate the false- and true-positive rate of the parametric selective test.

    Each trial generates data with ``CoRT_builder.CoRT.gen_data`` using an
    all-ones ``s_vector`` of length ``cnt``, fits a Lasso on the combined CoRT
    data, picks one selected target feature at random, and computes a selective
    p-value with ``utils.pivot``. A tested feature counts as a signal when its
    index is smaller than ``cnt``.

    Parameters
    ----------
    n_target, n_source, p, K, Ka, h :
        Forwarded unchanged to ``CoRT_model.gen_data`` (``n_target``, ``K`` and
        ``p`` are also used locally for fold splitting and index offsets).
    lamda : float
        Regularisation strength; used both for ``CoRT_builder.CoRT(alpha=...)``
        and for the ``Lasso`` fit.
    alpha : float
        Significance level; a p-value ``<= alpha`` counts as a rejection.
    iteration : int
        Number of trials to run.
    T : int
        Forwarded to ``find_similar_source``, ``utils.split_target`` and the
        ``parametric`` helpers.
    cnt : int
        Length of the all-ones ``s_vector`` passed to ``gen_data``.

    Returns
    -------
    dict
        ``{"FPR": float, "TPR": float, "details": list}``. ``details`` has one
        record per completed trial (``p_value``, ``is_signal``, ``feature_idx``).
        Trials whose p-value is ``None`` stay in ``details`` but are excluded
        from both rates. A rate is ``nan`` when its group has no scored trial.
    """
    s_vector = [1] * cnt
    s = len(s_vector)
    CoRT_model = CoRT_builder.CoRT(alpha=lamda)
    para_results_storage = []

    for step in range(iteration):
        if step % 10 == 0:
            print(f"Processing step {step}")

        target_data, source_data = CoRT_model.gen_data(n_target, n_source, p, K, Ka, h, s_vector, s, "AR")
        similar_source_index = CoRT_model.find_similar_source(n_target, K, target_data, source_data, T=T, verbose=False)
        X_combined, y_combined = CoRT_model.prepare_CoRT_data(similar_source_index, source_data, target_data)

        model = Lasso(alpha=lamda, **setting.lasso_config)
        model.fit(X_combined, y_combined.ravel())
        # The last p coefficients are taken as the target-specific block.
        beta_hat_target = model.coef_[-p:]
        active_indices = np.array([i for i, b in enumerate(beta_hat_target) if b != 0])

        if len(active_indices) == 0:
            # Report the loop counter (the original interpolated the builtin `iter`).
            print(f"Iteration {step}: Lasso selected no features. Skipping.")
            continue

        # Position (within the active set) of the feature to test.
        j = np.random.choice(len(active_indices))

        X_target = target_data["X"]
        y_target = target_data["y"]
        X_active, X_inactive = utils.get_active_X(beta_hat_target, X_target)

        etaj, etajTy = utils.construct_test_statistic(y_target, j, X_active)

        # Identity covariance; a_global / b_global parametrise y(z) = a + z * b.
        Sigma = np.eye(n_target)
        b_global = Sigma @ etaj @ np.linalg.pinv(etaj.T @ Sigma @ etaj)
        a_global = (Sigma - b_global @ etaj.T) @ y_target

        folds = utils.split_target(T, X_target, y_target, n_target)

        # Sweep z over +/- 20 standard deviations of the test statistic.
        tn_sigma = (np.sqrt(etaj.T @ Sigma @ etaj)).item()
        z_k = -20 * tn_sigma
        z_max = 20 * tn_sigma

        Z_train_list = parametric.get_Z_train(z_k, folds, source_data, a_global, b_global, lamda, K, T)
        Z_val_list, similar_source_index, cnt_vote, is_similar = parametric.get_Z_val(z_k, folds, T, K, a_global, b_global, lamda, source_data)

        target_data_current = {"X": X_target, "y": a_global + z_k * b_global}
        X_combined_new, y_combined_new = CoRT_model.prepare_CoRT_data(similar_source_index, source_data, target_data_current)
        L_CoRT, R_CoRT, Az = parametric.get_Z_CoRT(X_combined_new, similar_source_index, lamda, a_global, b_global, source_data, z_k)
        z_list = [z_k]
        Az_list = []
        stopper = "empty"
        while z_k < z_max:
            # Keep only indices at or beyond the source blocks, shifted to 0..p-1.
            current_num_sources = len(similar_source_index)
            offset = p * current_num_sources
            Az_target_current = np.array([idx - offset for idx in Az if idx >= offset])
            Az_list.append(Az_target_current)

            # Find the smallest right endpoint among all tracked intervals and
            # remember which component produced it.
            # NOTE(review): val[4] / val[3] are presumed to be right endpoints,
            # since the update helpers' (l, r) results are stored there below.
            mn = z_max
            stopper = "MAX"

            for val in Z_train_list:
                if mn > val[4]:
                    mn = val[4]
                    stopper = "TRAIN"

            for val in Z_val_list:
                if mn > val[3]:
                    mn = val[3]
                    stopper = "VAL"

            if mn > R_CoRT:
                mn = R_CoRT
                stopper = "CORT"

            R_final = mn

            # Step just past the breakpoint; max() guarantees forward progress.
            z_k = max(R_final, z_k) + 1e-4

            if (z_k >= z_max):
                z_list.append(z_max)
                break
            else:
                z_list.append(z_k)

            update_train_needed = False
            update_val_needed = False
            update_cort_needed = False

            # A TRAIN breakpoint also forces every validation entry to refresh.
            if stopper == "TRAIN":
                update_train_needed = True
                update_val_needed = True

            elif stopper == "VAL":
                update_val_needed = True

            elif stopper == "CORT":
                update_cort_needed = True

            if update_train_needed:
                for val in Z_train_list:
                    # Refresh only entries whose interval ended at or before z_k.
                    if val[4] <= z_k + 1e-9:
                        l, r = parametric.update_Z_train(val, z_k, folds, source_data, a_global, b_global, lamda, K, T)
                        val[3] = l
                        val[4] = r

            if update_val_needed:
                for val in Z_val_list:
                    if stopper == "TRAIN" or val[3] <= z_k + 1e-9:
                        l, r, similar_source_index, cnt_vote, is_similar, oke = parametric.update_Z_val(val, z_k, folds, T, a_global, b_global, lamda, source_data, similar_source_index, cnt_vote, is_similar)
                        val[2] = l
                        val[3] = r
                        # `oke` triggers a CoRT refit (presumably the similar
                        # source set changed -- confirm in parametric.update_Z_val).
                        if oke == True:
                            update_cort_needed = True

            if update_cort_needed:
                target_data_current = {"X": X_target, "y": a_global + z_k * b_global}
                X_combined_new, y_combined_new = CoRT_model.prepare_CoRT_data(similar_source_index, source_data, target_data_current)
                L_CoRT, R_CoRT, Az = parametric.get_Z_CoRT(X_combined_new, similar_source_index, lamda, a_global, b_global, source_data, z_k)

        optim_p_value = utils.pivot(active_indices, Az_list, z_list, etaj, etajTy, 0, Sigma)

        if optim_p_value == 0:
            print(" WARNING: p-value is 0")
        elif optim_p_value is None:
            print(" WARNING: p-value is None")

        # Store plain Python scalars so the records are JSON-serialisable.
        selected_feature_index = int(active_indices[j])
        # NOTE(review): assumes the first `s` features are the true signals -- confirm in gen_data.
        is_signal = bool(selected_feature_index < s)
        para_results_storage.append({
            "p_value": optim_p_value,
            "is_signal": is_signal,
            "feature_idx": selected_feature_index
        })

    # Show parametric result
    # A None p-value cannot be compared with alpha, so such trials are left out
    # of the rates (they remain in "details").
    scored_cases = [r for r in para_results_storage if r['p_value'] is not None]
    para_is_signal_cases = [r for r in scored_cases if r['is_signal']]
    para_not_signal_cases = [r for r in scored_cases if not r['is_signal']]

    para_false_positives = sum(1 for c in para_not_signal_cases if c['p_value'] <= alpha)
    # Guard against an empty group instead of raising ZeroDivisionError.
    if para_not_signal_cases:
        para_fpr = para_false_positives / len(para_not_signal_cases)
    else:
        para_fpr = float("nan")
    print(f"Parametric FPR: {para_fpr:.4f} (Target: {alpha})")

    para_true_positives = sum(1 for r in para_is_signal_cases if r['p_value'] <= alpha)
    if para_is_signal_cases:
        para_tpr = para_true_positives / len(para_is_signal_cases)
    else:
        para_tpr = float("nan")
    print(f"Parametric TPR: {para_tpr:.4f}")
    result_data = {
        "FPR": para_fpr,
        "TPR": para_tpr,
        "details": para_results_storage
    }
    return result_data

In [None]:
import json

class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to built-in types."""

    def default(self, obj):
        """Return a JSON-serialisable equivalent of ``obj``.

        NumPy booleans, integers and floats become ``bool``/``int``/``float``
        and arrays become nested lists. Anything else is delegated to the base
        class, which raises ``TypeError``.
        """
        # np.bool_ is not an np.integer subclass, so it needs its own branch.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
    
# Fixed settings shared by every run.
h = 30
lamda = 0.1
alpha = 0.05
iteration = 100
cnt = 3  # placeholder; re-drawn for every run inside the loop
nTest = 5

# Remember the requested number of runs so run ids can be numbered 1..total_runs.
total_runs = nTest
all_results = []

while nTest > 0:
    nTest -= 1
    # Draw a random problem configuration for this run.
    n_target = np.random.randint(6, 8)
    n_source = np.random.randint(6, 8)
    p = np.random.randint(max(n_target, n_source) + 1, 15)
    K = np.random.randint(5, 10)
    Ka = np.random.randint(3, 5)
    T = 3
    # NOTE(review): the upper bound is a float expression; the recorded runs
    # show it is accepted by np.random.randint, so it is kept unchanged.
    cnt = np.random.randint(1, p / 3 - (p / 3 > 3))

    print(f"Running test with n_target={n_target}, n_source={n_source}, p={p}, K={K}, Ka={Ka}, T={T}, cnt={cnt}")
    # NOTE(review): this cell sits under the "FPR and TPR" heading but still runs
    # parametric_uniform_test; confirm whether parametric_fpr_tpr_test was meant.
    p_values = parametric_uniform_test(n_target, n_source, p, K, Ka, h, lamda, alpha, iteration, T, cnt)

    run_info = {
        # nTest has already been decremented, so this counts 1, 2, ..., total_runs
        # (the previous `5 - nTest + 1` produced 2..6).
        "run_id": total_runs - nTest,
        "params": {
            "n_target": n_target, "n_source": n_source, "p": p,
            "K": K, "Ka": Ka, "T": T, "cnt": cnt
        },
        "results": {
            "p_values": p_values
        }
    }

    all_results.append(run_info)

# Persist every run's parameters and p-values as one JSON document.
output_filename = "simulation_results2.json"
with open(output_filename, "w", encoding="utf-8") as f:
    json.dump(all_results, f, cls=NumpyEncoder, indent=4)

print(f"Đã lưu kết quả thành công vào file: {output_filename}")

Running test with n_target=6, n_source=7, p=13, K=6, Ka=4, T=3, cnt=1
optim_p_value[0]: 0.9400221230747862
