# Testing loforest tuning and comparing to several K

In [54]:
# importing packages
from sklearn.ensemble import HistGradientBoostingRegressor
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# font size
sns.set_style("white", rc={"font_scale": 1.5})

# loforest and locart functions
from CP2LFI.loforest import ConformalLoforest
from CP2LFI.scores import LambdaScore

from clover import Scores
from clover import LocartSplit

from copy import deepcopy
from tqdm import tqdm

from scipy import stats
from scipy.optimize import minimize_scalar

import time

## BFF setting

In [55]:
# BFF functions
def sim_X(n, theta, rng):
    X = rng.normal(theta, 1, n)
    return X

def sim_lambda(B, N, theta, rng, sigma=0.25):
    lambdas = np.zeros(B)
    for i in range(0, B):
        X = sim_X(N, theta, rng)
        lambdas[i] = compute_pdf_posterior(theta, X, sigma=sigma)
    return lambdas

def sample_posterior(n, N, rng, sigma=0.25):
    thetas = rng.uniform(-5, 5, size=n)
    lambdas = np.zeros(n)
    i = 0
    for theta in thetas:
        X = sim_X(N, theta, rng)
        lambdas[i] = compute_pdf_posterior(theta, X, sigma=sigma)
        i += 1
    return thetas, lambdas

def compute_pdf_posterior(theta, x, sigma=0.25):
    n = x.shape[0]
    mu_value = (1 / ((1 / sigma) + n)) * (np.sum(x))
    sigma_value = ((1 / sigma) + n) ** (-1)
    return -stats.norm.pdf(theta, loc=mu_value, scale=np.sqrt(sigma_value))

# naive method
def naive(alpha, rng, B=1000, N=100, lower=-5, upper=5, naive_n=100, sigma=0.25):
    n_grid = int(B / naive_n)
    thetas = np.linspace(lower, upper, n_grid)
    quantiles = {}
    for theta in thetas:
        lambdas = sim_lambda(naive_n, N, theta, rng, sigma=sigma)
        quantiles[theta] = np.quantile(lambdas, q=1 - alpha)
    return quantiles

# naive predict function
def predict_naive_quantile(theta_grid, quantiles_dict):
    thetas_values = np.array(list(quantiles_dict.keys()))
    quantiles_list = []
    for theta in theta_grid:
        idx = thetas_values[int(np.argmin(np.abs(theta - thetas_values)))]
        quantiles_list.append(quantiles_dict[idx])
    return quantiles_list

# obtaining quantile for a specific K in loforest
def obtain_quantiles(
    thetas,
    N,
    rng,
    B=1000,
    alpha=0.05,
    sigma=0.25,
    min_samples_leaf=300,
    K = 50,
    tune_k = True,
    n_estimators = 200,
    step = 5,
    pct_cover = 0.95,
    min_samples = 300
):
    # simulating to fit models
    theta_sim, model_lambdas = sample_posterior(n=B, N=N, rng=rng, sigma=sigma)
    model_thetas = theta_sim.reshape(-1, 1)
    # loforest quantiles
    start_time = time.time()
    loforest_object = ConformalLoforest(
        LambdaScore, None, alpha=alpha, is_fitted=True, split_calib=False, tune_K = tune_k
    )
    loforest_object.calibrate(
        model_thetas, model_lambdas, min_samples_leaf=min_samples_leaf, K=K, n_estimators=n_estimators
    )
    
    # loforest
    loforest_cutoffs = loforest_object.compute_cutoffs(thetas.reshape(-1, 1), min_samples = min_samples, step = step, pct_cover = pct_cover)
    end_time = time.time()

    running_time = end_time - start_time
    print(f"Loforest took {running_time} seconds to run.")

    # dictionary of quantiles
    if tune_k == True:
        K = "tuned"
    
    print(f"Tuned K: {loforest_object.K}")
        
    quantile_dict = {
        "loforest_{}".format(K) : loforest_cutoffs,
    }

    return quantile_dict

First, we will test the tuned loforest performance and running time:

In [56]:
def evaluate_coverage_N_tuned_loforest(
        seed = 45,
        n_out = 300,
        n = 1000,
        N = np.array([1, 10, 20, 50]), 
        B = 5000, 
        alpha = 0.05,
        sigma=0.25, 
        min_samples_leaf=300, 
        K=50,
        tune_K = True,
        min_samples = 300,
        pct_cover = 0.95,
        step = 5):
    rng = np.random.default_rng(seed)
    # generate testing grid
    thetas = np.linspace(-4.999, 4.999, n_out)
    N_list = []
    mae_list = []
    se_list = []
    B_list = []
    methods_list = []

    for N_fixed in N:
        err_data = np.zeros((thetas.shape[0], 1))

        # Obtain the quantiles
        quantiles_dict = obtain_quantiles(
            thetas = thetas, 
            N = N_fixed, 
            rng = rng, 
            B=B, 
            alpha=alpha, 
            sigma=sigma, 
            min_samples_leaf=min_samples_leaf, 
            K=K, 
            tune_k = tune_K,
            step = step,
            pct_cover = pct_cover,
            min_samples = min_samples
            )

        # Check if the true lambda values fall within the predicted quantiles
        i = 0
        for theta in thetas:
            lambda_stat = sim_lambda(
                B=n,
                N=N_fixed,
                theta=theta,
                sigma=sigma,
                rng = rng,
                )
            if tune_K:
                coverage = np.mean(lambda_stat <= quantiles_dict["loforest_tuned"][i])
            else:
                coverage = np.mean(lambda_stat <= quantiles_dict["loforest_{}".format(K)][i])

            err_data[i, :] = np.array([np.abs(coverage - (1 - alpha))])
            i += 1
        
        if tune_K:
            methods_list.extend(["loforest_tuned"])
        else:
            methods_list.extend(["loforest_{}".format(K)])
        mae_list.extend(np.mean(err_data, axis=0).tolist())
        se_list.extend((np.std(err_data, axis=0) / (np.sqrt(thetas.shape[0]))).tolist())
        N_list.extend([N_fixed])
        B_list.extend([B])
    
    if tune_K:
        stats_data = pd.DataFrame(
        {
            "N": N_list,
            "B": B_list,
            "MAE": mae_list,
            "se": se_list,
        }
    )
    else:
        stats_data = pd.DataFrame(
        {
            "methods": methods_list,
            "N": N_list,
            "B": B_list,
            "MAE": mae_list,
            "se": se_list,
        }
    )
    return stats_data

Fixing $K = 50$ and comparing to the tuned one:

In [57]:
start_time = time.time()
stats_df = evaluate_coverage_N_tuned_loforest(B = 5000, n = 2000, min_samples = 100, pct_cover = 0.95, min_samples_leaf = 100, K = 50, tune_K = False)
end_time = time.time()

running_time = end_time - start_time
print(f"The simulation took {running_time} seconds to run.")

Loforest took 1.8778064250946045 seconds to run.
Tuned K: 50
Loforest took 1.8969686031341553 seconds to run.
Tuned K: 50
Loforest took 1.8711588382720947 seconds to run.
Tuned K: 50
Loforest took 1.8574192523956299 seconds to run.
Tuned K: 50
The simulation took 151.43603992462158 seconds to run.


In [58]:
stats_df

Unnamed: 0,methods,N,B,MAE,se
0,loforest_50,1,5000,0.07051,0.005096
1,loforest_50,10,5000,0.072573,0.004337
2,loforest_50,20,5000,0.055515,0.002296
3,loforest_50,50,5000,0.050728,0.002102


Now, analysing the tuning results:

In [59]:
start_time = time.time()
stats_df = evaluate_coverage_N_tuned_loforest(B = 5000, n = 2000, min_samples = 100, pct_cover = 1, min_samples_leaf = 100, K = 50, tune_K = True)
end_time = time.time()

running_time = end_time - start_time
print(f"The simulation took {running_time} seconds to run.")

Loforest took 1.9047048091888428 seconds to run.
Tuned K: 105
Loforest took 1.7245514392852783 seconds to run.
Tuned K: 105
Loforest took 1.8848514556884766 seconds to run.
Tuned K: 110
Loforest took 1.794950246810913 seconds to run.
Tuned K: 115
The simulation took 150.63796424865723 seconds to run.


In [60]:
stats_df

Unnamed: 0,N,B,MAE,se
0,1,5000,0.070567,0.005105
1,10,5000,0.072573,0.004337
2,20,5000,0.055515,0.002296
3,50,5000,0.053462,0.002353


## Exponential setting:

In [61]:
def sim_lambda(theta, rng, B = 1000, N = 100):
    lambdas = np.zeros(B)
    theoretical = np.e**(-theta)
    for k in range(0, B):
        exp = rng.exponential(1/theta, N)
        empirical = len([i for i in exp if i > 1])/len(exp)
        lambdas[k] = np.abs(theoretical - empirical)
    return lambdas

def train_naive(alpha, rng, B = 1000, N = 100, naive_n = 500, lower = 0.0001, upper = 6.9999):
    # simulating by a fixed theta_grid with size compatible with the amount of samples 
    # we want to simulate
    n_grid = int(B / naive_n)
    if n_grid > 1:
        step = (upper - lower)/n_grid
        thetas_fixed = np.arange(lower, upper, step)
    else:
        step = (upper - lower)/2
        thetas_fixed = np.array([np.arange(lower, upper, step)[1]])
      
    thetas_fixed = np.linspace(lower, upper, n_grid)
    
    quantiles = {}
    for theta in thetas_fixed:
        diff = sim_lambda(theta, B = n_grid, N = N, rng = rng)
        quantiles[theta] = np.quantile(diff, q = 1 - alpha)
    return quantiles

def predict_naive_quantile(theta_grid, quantiles_dict):
    thetas_values = np.array(list(quantiles_dict.keys()))
    quantiles_list = []
    for theta in theta_grid:
        idx = thetas_values[int(np.argmin(np.abs(theta - thetas_values)))]
        quantiles_list.append(quantiles_dict[idx])
    return quantiles_list
  
def generate_parameters_random(rng, B = 5000, N = 1000):
    random_theta_grid = rng.uniform(0, 7, B)
    lambdas = np.zeros(B)
    i = 0
    for theta in random_theta_grid:
        theoretical = np.e**(-theta)
        exp = rng.exponential(1/theta, N)
        empirical = (len([i for i in exp if i > 1])/len(exp))
        lambdas[i] = np.abs(theoretical - empirical)
        i += 1
    return random_theta_grid, lambdas

def obtain_quantiles(
    thetas,
    N,
    rng,
    B=1000,
    alpha=0.05,
    min_samples_leaf=300,
    K = 50,
    tune_k = True,
    step = 5,
    pct_cover = 0.95,
    min_samples = 300
):
    # simulating to fit models
    theta_sim, model_lambdas = generate_parameters_random(B = B, rng = rng, N = N)
    model_thetas = theta_sim.reshape(-1, 1)

    # loforest quantiles
    start_time = time.time()
    loforest_object = ConformalLoforest(
        LambdaScore, None, alpha=alpha, is_fitted=True, split_calib=False, tune_K = tune_k
    )
    loforest_object.calibrate(
        model_thetas, model_lambdas, min_samples_leaf=min_samples_leaf, K=K, n_estimators=200
    )
    
    # loforest
    loforest_cutoffs = loforest_object.compute_cutoffs(thetas.reshape(-1, 1), 
                                                       step = step, 
                                                       pct_cover = pct_cover, 
                                                       min_samples = min_samples)
    end_time = time.time()

    running_time = end_time - start_time
    print(f"Loforest took {running_time} seconds to run.")

    # dictionary of quantiles
    if tune_k == True:
        K = "tuned"
    
    print(f"Tuned K: {loforest_object.K}")
        
    quantile_dict = {
        "loforest_{}".format(K) : loforest_cutoffs,
    }

    return quantile_dict

In [62]:
def evaluate_coverage_N_tuned_loforest(
        seed = 45,
        n_out = 500,
        n = 1000,
        N = np.array([1, 10, 20, 50, 100, 1000]), 
        B = 5000, 
        alpha = 0.05,
        min_samples_leaf=300, 
        K=50,
        tune_K = True,
        pct_cover = 0.95,
        step = 5,
        min_samples = 300,):
    rng = np.random.default_rng(seed)
    # generate testing grid
    thetas = np.linspace(0.0001, 6.9999, n_out)
    N_list = []
    mae_list = []
    se_list = []
    B_list = []
    methods_list = []

    for N_fixed in N:
        err_data = np.zeros((thetas.shape[0], 1))

        # Obtain the quantiles
        quantiles_dict = obtain_quantiles(
            thetas, 
            N_fixed, 
            rng, 
            B=B, 
            alpha=alpha, 
            min_samples_leaf=min_samples_leaf, 
            K=K, 
            tune_k = tune_K, 
            pct_cover= pct_cover,
            step = step,
            min_samples = min_samples,)

        # Check if the true lambda values fall within the predicted quantiles
        i = 0
        for theta in thetas:
            lambda_stat = sim_lambda(
                B=n,
                N=N_fixed,
                theta=theta,
                rng = rng,
                )
            if tune_K:
                coverage = np.mean(lambda_stat <= quantiles_dict["loforest_tuned"][i])
            else:
                coverage = np.mean(lambda_stat <= quantiles_dict["loforest_{}".format(K)][i])

            err_data[i, :] = np.array([np.abs(coverage - (1 - alpha))])
            i += 1
        
        if tune_K:
            methods_list.extend(["loforest_tuned"])
        else:
            methods_list.extend(["loforest_{}".format(K)])
        mae_list.extend(np.mean(err_data, axis=0).tolist())
        se_list.extend((np.std(err_data, axis=0) / (np.sqrt(thetas.shape[0]))).tolist())
        N_list.extend([N_fixed])
        B_list.extend([B])
    
    if tune_K:
        stats_data = pd.DataFrame(
        {
            "N": N_list,
            "B": B_list,
            "MAE": mae_list,
            "se": se_list,
        }
    )
    else:
        stats_data = pd.DataFrame(
        {
            "methods": methods_list,
            "N": N_list,
            "B": B_list,
            "MAE": mae_list,
            "se": se_list,
        }
    )
    return stats_data

In [63]:
start_time = time.time()
stats_df = evaluate_coverage_N_tuned_loforest(B = 5000, n = 2000, min_samples = 100, pct_cover = 1, min_samples_leaf = 100, K = 50, tune_K = False)
end_time = time.time()

running_time = end_time - start_time
print(f"The simulation took {running_time} seconds to run.")

Loforest took 4.179127931594849 seconds to run.
Tuned K: 50
Loforest took 4.271272420883179 seconds to run.
Tuned K: 50
Loforest took 4.22680139541626 seconds to run.
Tuned K: 50
Loforest took 4.186808824539185 seconds to run.
Tuned K: 50
Loforest took 4.1800618171691895 seconds to run.
Tuned K: 50
Loforest took 4.2613749504089355 seconds to run.
Tuned K: 50
The simulation took 110.05889654159546 seconds to run.


In [64]:
stats_df

Unnamed: 0,methods,N,B,MAE,se
0,loforest_50,1,5000,0.064404,0.00282
1,loforest_50,10,5000,0.060228,0.001914
2,loforest_50,20,5000,0.063925,0.002138
3,loforest_50,50,5000,0.068529,0.002256
4,loforest_50,100,5000,0.067488,0.002172
5,loforest_50,1000,5000,0.057014,0.001389


In [65]:
start_time = time.time()
stats_df = evaluate_coverage_N_tuned_loforest(B = 5000, n = 2000, min_samples = 100, pct_cover = 1, min_samples_leaf = 100, K = 50, tune_K = True)
end_time = time.time()

running_time = end_time - start_time
print(f"The simulation took {running_time} seconds to run.")

Loforest took 4.302614212036133 seconds to run.
Tuned K: 100
Loforest took 4.162890672683716 seconds to run.
Tuned K: 105
Loforest took 3.89399790763855 seconds to run.
Tuned K: 110
Loforest took 3.985732316970825 seconds to run.
Tuned K: 105
Loforest took 3.924908399581909 seconds to run.
Tuned K: 100
Loforest took 4.084616661071777 seconds to run.
Tuned K: 105
The simulation took 106.88781714439392 seconds to run.


In [66]:
stats_df

Unnamed: 0,N,B,MAE,se
0,1,5000,0.066452,0.00298
1,10,5000,0.062445,0.002209
2,20,5000,0.063827,0.002122
3,50,5000,0.07297,0.002569
4,100,5000,0.06883,0.002271
5,1000,5000,0.056691,0.001365
