In [None]:
import os
import pandas as pd
import numpy as np

# Directory holding the merged result CSVs, and the two files read from it.
folder_path = "2-merged-results"
ceb_file, other_file = (
    os.path.join(folder_path, csv_name)
    for csv_name in ("merged_ceb.csv", "merged_other.csv")
)

def extract_last_column(file_path):
    """Return the values of the last column of a CSV file as a list.

    Args:
        file_path: Path of the CSV file to read with ``pd.read_csv``.

    Returns:
        The last column's values as a Python list, or an empty list when
        ``file_path`` does not exist.
    """
    # Missing file is not an error here: the caller simply gets no values.
    if not os.path.exists(file_path):
        return []

    frame = pd.read_csv(file_path)
    final_column = frame.columns[-1]
    return frame[final_column].tolist()

# Pull the last column out of each merged CSV (CEB first, then the other file).
ceb_values, job_values = (
    extract_last_column(csv_path) for csv_path in (ceb_file, other_file)
)

In [None]:
import math
def getC(R,delta):
    """Return the score at rank ``ceil((n + 1) * (1 - delta))`` of the sorted input.

    Args:
        R: Iterable of scores; it is sorted ascending into a new list, so the
            caller's sequence is not modified.
        delta: Level used to pick the rank; smaller values select a higher rank.

    Returns:
        The element of the sorted scores at the computed 1-based rank. The
        rank is clamped to ``[1, n]``, so a rank above ``n`` yields the largest
        score and a rank of 0 yields the smallest.

    Raises:
        ValueError: If ``R`` contains no scores.
    """
    sortedR = sorted(R)
    n = len(sortedR)
    print("total_number: ",n)
    if n == 0:
        raise ValueError("getC needs at least one score to take a quantile from")
    q_hat_index = math.ceil((n+1)*(1-delta))
    # (n + 1) * (1 - delta) can exceed n (small delta) or be 0 (delta == 1).
    # Unclamped, the first raised IndexError and the second silently indexed
    # sortedR[-1], i.e. returned the maximum instead of the minimum.
    q_hat_index = min(max(q_hat_index, 1), n)
    return sortedR[q_hat_index-1]

In [None]:
# Default offset used by g and g_inverse when no `e` is passed.
E = 0.09
def g(x, e=E):
    """Shift ``x`` down by ``e`` and floor the result at 0."""
    lowered = x - e
    return lowered if lowered > 0 else 0

def g_inverse(x, e=E):
    """Shift ``x`` up by ``e`` and cap the result at 1."""
    raised = x + e
    return raised if raised < 1 else 1

def shift(c, K):
    """Map level ``c`` to ``g_inverse(g((1 + 1/K) * g_inverse(c)))``.

    Args:
        c: Input level.
        K: Divisor in the ``1 + 1/K`` inflation factor; must be non-zero.

    Returns:
        The transformed level, using the default ``e`` of ``g`` / ``g_inverse``.
    """
    lifted = g_inverse(c)
    inflated = (1+1/K)*lifted
    return g_inverse(g(inflated))

In [None]:
# getC has no default for delta, so the bare getC(ceb_values) call raised
# TypeError. 0.2 is the level the other cells in this notebook use.
ceb_C = getC(ceb_values, 0.2)
print("ceb_C: ",ceb_C)

In [None]:
# getC has no default for delta, so the bare getC(job_values) call raised
# TypeError. 0.2 is the level the other cells in this notebook use.
job_C = getC(job_values, 0.2)
print("job_C: ",job_C)

In [None]:
def test_robustCP_one_iteration(job_values, ceb_values, delta, debug=True, K=300):
    """Run one random calibration/test split on JOB and score the shifted threshold on CEB.

    The JOB values are shuffled, the first ``K`` become the (sorted)
    calibration set and the rest the JOB test set. ``getC`` gives the
    unshifted threshold, which is checked on the JOB test set. ``shift``
    then gives an adjusted level whose rank in the calibration set is the
    threshold evaluated on ``ceb_values``.

    Args:
        job_values: Sequence of JOB scores. It is copied before shuffling,
            so the caller's sequence keeps its order.
        ceb_values: Sequence of CEB scores to evaluate the shifted threshold on.
        delta: Level passed to ``getC`` and, as ``1 - delta``, to ``shift``.
        debug: When true, also prints the unshifted threshold, the JOB test
            rate and the shifted threshold. The shifted quantile and the CEB
            rate are printed regardless.
        K: Number of shuffled JOB values used for calibration.

    Returns:
        Fraction of ``ceb_values`` strictly below the shifted threshold.

    Raises:
        ValueError: If there are no calibration values or ``ceb_values`` is empty.
    """
    # np.random.shuffle works in place; shuffling a copy keeps the caller's
    # list from being reordered on every call.
    shuffled_job = list(job_values)
    np.random.shuffle(shuffled_job)

    # JOB Calibration -> first K shuffled values, sorted
    job_calibration = sorted(shuffled_job[:K])
    # JOB Test -> remaining values
    job_test = shuffled_job[K:]

    n_calibration = len(job_calibration)
    if n_calibration == 0:
        raise ValueError("no JOB calibration values: job_values is empty or K is 0")
    if len(ceb_values) == 0:
        raise ValueError("ceb_values is empty; there is nothing to evaluate on")

    job_calibration_C = getC(job_calibration, delta)
    if debug: print("Original C: ", job_calibration_C)

    # Test on JOB. With K >= len(job_values) there are no held-out points;
    # report NaN instead of dividing by zero.
    if job_test:
        job_count = sum(1 for val in job_test if val <= job_calibration_C)
        job_test_valid_rate = job_count / len(job_test)
    else:
        job_test_valid_rate = float("nan")
    if debug: print("=> JOB Test: ",job_test_valid_rate)

    # Use the actual calibration size rather than K: they differ when fewer
    # than K JOB values are available, and the rank is taken in that set.
    robustCP_shift_quantile = shift(1-delta, n_calibration)
    print("RobustCP_shift_quantile: ", robustCP_shift_quantile)
    robustCP_index = math.ceil((n_calibration+1)*robustCP_shift_quantile)
    robustCP_index = max(1, min(robustCP_index, n_calibration))

    robustCP_new_C = job_calibration[robustCP_index-1]
    if debug: print("RobustCP_new_C: ",robustCP_new_C)

    # Test on CEB.
    # NOTE(review): this comparison is strict (<) while the JOB check above
    # uses <=; left as-is so results stay comparable -- confirm which is intended.
    ceb_count = sum(1 for val in ceb_values if val < robustCP_new_C)
    ceb_test_valid_rate = ceb_count / len(ceb_values)
    print("=> CEB Test: ",ceb_test_valid_rate)
    return ceb_test_valid_rate


# Draw the Quantile

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math
def drawing_robustCP_one_iteration(job_values, ceb_values, delta, debug=True, K=300, max_score=5000):
    """Plot the calibration-score histogram with the unshifted and shifted thresholds.

    The JOB values are shuffled and the first ``K`` form the calibration set.
    Values at or above ``max_score`` are dropped from it before sorting.
    Both thresholds are computed on that filtered set and drawn as vertical
    lines over its histogram.

    Args:
        job_values: Sequence of JOB scores. It is copied before shuffling,
            so the caller's sequence keeps its order.
        ceb_values: Not used by this function; kept so the signature matches
            the other one-iteration helpers.
        delta: Level passed to ``getC`` and, as ``1 - delta``, to ``shift``.
        debug: When true, prints the unshifted threshold, the JOB test rate
            and the shifted threshold.
        K: Number of shuffled JOB values taken for calibration.
        max_score: Calibration values ``>= max_score`` are excluded; pass
            ``None`` to keep everything.

    Returns:
        None. The figure is shown with ``plt.show()``.

    Raises:
        ValueError: If no calibration values remain after filtering.
    """
    # np.random.shuffle works in place; shuffling a copy keeps the caller's
    # list from being reordered on every call.
    shuffled_job = list(job_values)
    np.random.shuffle(shuffled_job)

    # JOB Calibration -> first K shuffled values, filtered, then sorted
    job_calibration = shuffled_job[:K]
    if max_score is not None:
        job_calibration = [i for i in job_calibration if i < max_score]
    job_calibration = sorted(job_calibration)

    # JOB Test -> remaining values
    job_test = shuffled_job[K:]

    n_calibration = len(job_calibration)
    if n_calibration == 0:
        raise ValueError("no JOB calibration values left to plot after filtering")

    job_calibration_C = getC(job_calibration, delta)
    if debug: print("Original C: ", job_calibration_C)

    # Test on JOB. With no held-out points report NaN instead of dividing by zero.
    if job_test:
        job_count = sum(1 for val in job_test if val <= job_calibration_C)
        job_test_valid_rate = job_count / len(job_test)
    else:
        job_test_valid_rate = float("nan")
    if debug: print("=> JOB Test: ",job_test_valid_rate)

    # The filter above can leave fewer than K calibration values. getC already
    # ranks within the filtered set, so the shifted rank uses the same size
    # instead of K to keep both drawn thresholds on the same footing.
    robustCP_shift_quantile = shift(1-delta, n_calibration)
    print("RobustCP_shift_quantile: ", robustCP_shift_quantile)
    robustCP_index = math.ceil((n_calibration+1)*robustCP_shift_quantile)
    robustCP_index = max(1, min(robustCP_index, n_calibration))

    robustCP_new_C = job_calibration[robustCP_index-1]
    if debug: print("RobustCP_new_C: ",robustCP_new_C)

    plt.figure(figsize=(8, 5))
    plt.hist(job_calibration, bins=100, alpha=0.7, color='steelblue', edgecolor='black')
    plt.axvline(job_calibration_C, color='blue', linestyle='-', linewidth=2, label=r'Original C')
    plt.axvline(robustCP_new_C, color='green', linestyle='-', linewidth=2, label="Adjusted $\\tilde{C}$ (Adaptive CP)")
    plt.xlabel("Nonconformity Score")
    plt.ylabel("Frequency")
    plt.legend()
    plt.grid(alpha=0.3)
    plt.title("Distribution of Nonconformity Score with $C$ and $\\tilde{C}$")
    plt.show()

# Draw the calibration histogram with both thresholds for delta = 0.2.
drawing_robustCP_one_iteration(job_values, ceb_values, delta = 0.2)

In [None]:
# Single run at delta = 0.2 with the default debug=True (all intermediate prints on).
test_robustCP_one_iteration(job_values, ceb_values, delta = 0.2)

In [None]:
# Same single run with debug=False, which skips the debug-guarded prints.
test_robustCP_one_iteration(job_values, ceb_values, delta = 0.2, debug = False)

In [None]:
def test_robustCP_multiples_iterations(job_values, ceb_values, delta = 0.2, iterations=10000):
    """Repeat ``test_robustCP_one_iteration`` and print the mean and median CEB rate.

    Args:
        job_values: JOB scores forwarded to every iteration.
        ceb_values: CEB scores forwarded to every iteration.
        delta: Level forwarded to every iteration.
        iterations: Number of runs to aggregate.

    Returns:
        None. The average and median rates are printed; both are reported
        as 0 when ``iterations`` is 0.
    """
    import contextlib
    import os
    import statistics

    ceb_test_valid_rates = []

    # Silence the per-iteration prints. os.devnull replaces the hard-coded
    # '/dev/null' so this also works where that path does not exist, and the
    # with-block closes the sink and restores stdout on exit.
    with open(os.devnull, 'w') as sink, contextlib.redirect_stdout(sink):
        for _ in range(iterations):
            ans = test_robustCP_one_iteration(job_values, ceb_values, delta, debug = False)
            ceb_test_valid_rates.append(ans)

    avg_valid_rate = sum(ceb_test_valid_rates) / len(ceb_test_valid_rates) if ceb_test_valid_rates else 0
    print("Average CEB Valid Rate:", avg_valid_rate)
    median_valid_rate = statistics.median(ceb_test_valid_rates) if ceb_test_valid_rates else 0
    print("Median CEB Valid Rate:", median_valid_rate)

# Aggregate the shifted-threshold CEB rate over the default 10000 iterations at delta = 0.2.
test_robustCP_multiples_iterations(job_values,ceb_values,delta=0.2)

In [None]:
### Compare with the original C
from scipy.stats import gaussian_kde
def original_C_test_robustCP_one_iteration(job_values, ceb_values, delta, debug=True, K=300):
    """Run one random JOB calibration split and score the unshifted threshold on CEB.

    The JOB values are shuffled, the first ``K`` become the sorted
    calibration set, and the threshold returned by ``getC`` is evaluated
    directly on ``ceb_values``.

    Args:
        job_values: Sequence of JOB scores. It is copied before shuffling,
            so the caller's sequence keeps its order.
        ceb_values: Sequence of CEB scores to evaluate the threshold on.
        delta: Level passed to ``getC``.
        debug: Accepted for signature parity with the other helpers; not
            read by this function.
        K: Number of shuffled JOB values used for calibration.

    Returns:
        Fraction of ``ceb_values`` strictly below the calibration threshold.

    Raises:
        ValueError: If there are no calibration values or ``ceb_values`` is empty.
    """
    # np.random.shuffle works in place; shuffling a copy keeps the caller's
    # list from being reordered on every call.
    shuffled_job = list(job_values)
    np.random.shuffle(shuffled_job)

    # JOB Calibration -> first K shuffled values, sorted
    job_calibration = sorted(shuffled_job[:K])

    if len(job_calibration) == 0:
        raise ValueError("no JOB calibration values: job_values is empty or K is 0")
    if len(ceb_values) == 0:
        raise ValueError("ceb_values is empty; there is nothing to evaluate on")

    job_calibration_C = getC(job_calibration, delta)

    # Test on CEB.
    # NOTE(review): strict < is kept as in the original comparison -- confirm
    # whether ties with the threshold should count as covered.
    ceb_count = sum(1 for val in ceb_values if val < job_calibration_C)
    ceb_test_valid_rate = ceb_count / len(ceb_values)
    print("=> CEB Test: ",ceb_test_valid_rate)
    return ceb_test_valid_rate

def original_C_test_robustCP_multiples_iterations(job_values, ceb_values, delta = 0.2, iterations=10000):
    """Repeat ``original_C_test_robustCP_one_iteration`` and print the mean and median CEB rate.

    Args:
        job_values: JOB scores forwarded to every iteration.
        ceb_values: CEB scores forwarded to every iteration.
        delta: Level forwarded to every iteration.
        iterations: Number of runs to aggregate.

    Returns:
        None. The average and median rates are printed; both are reported
        as 0 when ``iterations`` is 0.
    """
    import contextlib
    import os
    import statistics

    ceb_test_valid_rates = []

    # Silence the per-iteration prints. os.devnull replaces the hard-coded
    # '/dev/null' so this also works where that path does not exist, and the
    # with-block closes the sink and restores stdout on exit.
    with open(os.devnull, 'w') as sink, contextlib.redirect_stdout(sink):
        for _ in range(iterations):
            ans = original_C_test_robustCP_one_iteration(job_values, ceb_values, delta, debug = False)
            ceb_test_valid_rates.append(ans)

    avg_valid_rate = sum(ceb_test_valid_rates) / len(ceb_test_valid_rates) if ceb_test_valid_rates else 0
    print("Average CEB Valid Rate:", avg_valid_rate)
    median_valid_rate = statistics.median(ceb_test_valid_rates) if ceb_test_valid_rates else 0
    print("Median CEB Valid Rate:", median_valid_rate)


# Baseline: aggregate the CEB rate of the unshifted threshold at delta = 0.2.
original_C_test_robustCP_multiples_iterations(job_values,ceb_values,delta=0.2) 

In [None]:
# Shifted-threshold aggregate at delta = 0.1.
test_robustCP_multiples_iterations(job_values,ceb_values,delta=0.1)

In [None]:
# Shifted-threshold aggregate at delta = 0.2.
test_robustCP_multiples_iterations(job_values,ceb_values,delta=0.2)

In [None]:
# Shifted-threshold aggregate at delta = 0.3.
test_robustCP_multiples_iterations(job_values,ceb_values,delta=0.3)