In [1]:
from utility.exps import stable_hash, run_real_experiments
from utility.graphing_tools import generate_latex_table
import pandas as pd
import numpy as np
import kagglehub

In [1]:
# Import general packages
import hashlib
import numpy as np
import pandas as pd
import time
from typing import List, Optional

# Import training packages
from sklearn.linear_model import LinearRegression, MultiTaskLasso
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer

# Dataset loader
from ucimlrepo import fetch_ucirepo 
from kagglehub import dataset_download
from scipy.io import arff

# Import utility packages
from utility.data_generator import make_multitarget_regression
from utility.res_rescaled import check_coverage_rate, scaled_prediction, standardized_prediction
from utility.unscaled import unscaled_prediction, bonferroni_prediction
from utility.data_splitting import data_splitting_scaled_prediction, data_splitting_standardized_prediction, data_spliting_CHR_prediction
from utility.copula import empirical_copula_prediction
from utility.rectangle import Rectangle

def stable_hash(*args):
    """
    Create a reproducible hash from input arguments, useful for random seeds.

    Parameters
    ----------
    *args : Any
        Sequence of inputs to be hashed.

    Returns
    -------
    int
        A deterministic hash value based on inputs.
    """
    key = "_".join(str(a) for a in args)
    return int(hashlib.sha256(key.encode()).hexdigest(), 16) % (2**32)

def function_choice(scores, alpha, method):
    """
    Select and run the prediction region method based on input string.

    Parameters
    ----------
    scores : np.ndarray
        Score values used to construct prediction regions.
    alpha : float
        Miscoverage level.
    method : str
        The method name specifying which region construction to use.

    Returns
    -------
    depends on method
        Either a single region (Rectangle) or list of regions.
    """

    # Scaled methods
    if method == "Scaled (Full)":
        return scaled_prediction(scores=scores, alpha=alpha, short_cut=False)
    elif method == "Scaled (Shortcut)":
        return scaled_prediction(scores=scores, alpha=alpha, short_cut=True)
    elif method == "Scaled (Data Splitting)":
        return data_splitting_scaled_prediction(scores=scores, alpha=alpha)

    # Standardized methods
    elif method == "Standardized (Full)":
        return standardized_prediction(scores=scores, alpha=alpha, short_cut=False)
    elif method == "Standardized (Shortcut)":
        return standardized_prediction(scores=scores, alpha=alpha, short_cut=True)
    elif method == "Standardized (Data Splitting)":
        return data_splitting_standardized_prediction(scores=scores, alpha=alpha)
    
    # Point CHR
    elif method == "Point CHR":
        return data_spliting_CHR_prediction(scores=scores, alpha=alpha)
    
    # Empirical copuls
    elif method == "Empirical copula":
        return empirical_copula_prediction(scores=scores, alpha=alpha)

    # No scaling methods
    elif method == "Unscaled":
        return unscaled_prediction(scores=scores, alpha=alpha)
    elif method == "Bonferroni":
        return bonferroni_prediction(scores=scores, alpha=alpha)

In [14]:
def run_real_experiments(data, num_splits, alpha = 0.1, cal_size = 0.2, test_size = 0.2):

    methods = [
            "Scaled (Shortcut)", 
            "Scaled (Data Splitting)", 
            "Standardized (Shortcut)", 
            "Standardized (Data Splitting)", 
            "Point CHR", 
            "Empirical copula", 
            "Unscaled", 
            "Bonferroni"]

    # Load data
    if data == "stock":

        stock_portfolio_performance = fetch_ucirepo(id=390)
        
        X = stock_portfolio_performance.data.features
        y = stock_portfolio_performance.data.targets
        X = X.drop(columns=y.columns)
        y = y.map(lambda x: float(x.strip('%'))/100 if isinstance(x, str) and '%' in x else x)

        model = MultiTaskLasso(alpha=0.0001)
    
    if data == "rf1":

        df = arff.loadarff("real_exps/data/rf1.arff")
        df = pd.DataFrame(df[0]).dropna()

        X = (df.iloc[:, :64])
        y = (df.iloc[:, 64:])

        model = RandomForestRegressor(
                n_estimators=200,          
                max_depth=16,            
                min_samples_split=5,       
                min_samples_leaf=2,        
                max_features="sqrt",          
                bootstrap=True,            
                random_state=42,
                n_jobs=-1
            )

    if data == "rf2":

        df = arff.loadarff("real_exps/data/rf2.arff")
        df = pd.DataFrame(df[0]).dropna()

        X = (df.iloc[:, :576])
        y = (df.iloc[:, 576:])

        model = RandomForestRegressor(
                n_estimators=200,          
                max_depth=16,            
                min_samples_split=5,       
                min_samples_leaf=2,        
                max_features="sqrt",          
                bootstrap=True,            
                random_state=42,
                n_jobs=-1
            )

    if data == "scm1d":

        df = arff.loadarff("real_exps/data/scm1d.arff")
        df = pd.DataFrame(df[0]).dropna()

        X = (df.iloc[:, :280])
        y = (df.iloc[:, 280:])

        model = RandomForestRegressor(
                n_estimators=200,          
                max_depth=16,            
                min_samples_split=5,       
                min_samples_leaf=2,        
                max_features="sqrt",          
                bootstrap=True,            
                random_state=42,
                n_jobs=-1
            )

    if data == "scm20d":

        df = arff.loadarff("real_exps/data/scm20d.arff")
        df = pd.DataFrame(df[0]).dropna()

        X = (df.iloc[:, :61])
        y = (df.iloc[:, 61:])

        model = RandomForestRegressor(
                n_estimators=200,          
                max_depth=16,            
                min_samples_split=5,       
                min_samples_leaf=2,        
                max_features="sqrt",          
                bootstrap=True,            
                random_state=42,
                n_jobs=-1
            )

    if data == "student":

        student_performance = fetch_ucirepo(id=320) 

        X = student_performance.data.features 
        y = student_performance.data.targets 
        categorical_cols = X.select_dtypes(include='object').columns.tolist()
        target_cols = ['G1', 'G2', 'G3']
        # Preprocessing: One-hot encode categorical columns, pass through numerical ones
        preprocessor = ColumnTransformer(
            transformers=[
                ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols)
            ],
            remainder='passthrough'
        )

        model = Pipeline(steps=[
            ('preprocessor', preprocessor),
            ('regressor', RandomForestRegressor(
                n_estimators=200,          
                max_depth=None,            
                min_samples_split=2,       
                min_samples_leaf=1,        
                max_features=1.0,          # Use all features at each split
                bootstrap=True,            
                random_state=42,
                n_jobs=-1
            ))
        ])

    if data == "air":

        air_quality = fetch_ucirepo(id=360) 

        target_cols = ["CO(GT)", "NOx(GT)", "NO2(GT)", "C6H6(GT)"]
        df = air_quality.data.features.drop(columns=["Date", "Time", "NMHC(GT)"])
        feature_cols = df.columns.difference(target_cols)
        df[feature_cols] = df[feature_cols].replace(-200, np.nan)
        imputer = SimpleImputer(strategy="mean")
        df[feature_cols] = imputer.fit_transform(df[feature_cols])
        df = df[(df[target_cols] != -200).all(axis=1)]

        X = df.drop(columns=target_cols)
        y = df[target_cols]

        model = RandomForestRegressor(
                n_estimators=200,          
                max_depth=16,            
                min_samples_split=5,       
                min_samples_leaf=2,        
                max_features="sqrt",          # Use all features at each split
                bootstrap=True,            
                random_state=42,
                n_jobs=-1
            )

    if data == "crime":

        communities_and_crime = fetch_ucirepo(id=211) 

        X = communities_and_crime.data.features.drop(columns="State")
        X = X.loc[:, X.isna().mean() < 0.3] 
        y = communities_and_crime.data.targets 
        y = y.dropna()
        X = X.loc[y.index]

        model= RandomForestRegressor(
                n_estimators=200,          
                max_depth=16,            
                min_samples_split=5,       
                min_samples_leaf=2,        
                max_features="sqrt",          # Use all features at each split
                bootstrap=True,            
                random_state=42,
                n_jobs=-1
            )

    if data == "energy":

        energy_efficiency = fetch_ucirepo(id=242) 
        
        X = energy_efficiency.data.features 
        y = energy_efficiency.data.targets 

        model= RandomForestRegressor(
                n_estimators=200,          
                max_depth=None,            
                min_samples_split=2,       
                min_samples_leaf=1,        
                max_features=1.0,          # Use all features at each split
                bootstrap=True,            
                random_state=77,
                n_jobs=-1
            )

    results = np.zeros((len(methods), 4, num_splits))

    for i in range(num_splits):

        X_train, X_cal_test, y_train, y_cal_test = train_test_split(X, y, test_size=test_size+cal_size, random_state=stable_hash(i))
        X_cal, X_test, y_cal, y_test = train_test_split(X_cal_test, y_cal_test, test_size=test_size/(cal_size+test_size), random_state=stable_hash(i))

        # Train the model
        model.fit(X_train, y_train)

        # Make predictions
        prediction_cal = model.predict(X_cal)
        scores_cal = np.asarray(np.abs(prediction_cal-y_cal), dtype=np.float64)
        prediction_test = model.predict(X_test)
        scores_test = np.asarray(np.abs(prediction_test-y_test), dtype=np.float64)
        n, d = scores_cal.shape

        # Run the methods
        for index, method in enumerate(methods):

            start = time.time()
            prediction = function_choice(scores_cal, alpha, method)
            results[index][0][i] = time.time()-start
            results[index][1][i] = check_coverage_rate(scores_test, prediction)
            results[index][2][i] = prediction.volume()
            results[index][3][i] = np.log10(prediction.volume())
        
    output = []
    for index, method in enumerate(methods):
        row = [
        method,  # String
        scores_cal.shape,  # Tuple
        scores_test.shape,  # Tuple
        np.mean(results[index][1]),  # Float
        np.std(results[index][1], ddof=1),  # Float
        np.mean(results[index][2]),  # Float
        np.std(results[index][2], ddof=1),  # Float
        np.mean(results[index][3]),  # Float
        np.std(results[index][3], ddof=1),  # Float
        np.mean(results[index][0])  # Float
        ]
        output.append(row)
        
    columns=["Methods","cal_size","test_size", "test_coverage_avg", "test_coverage_1std", "coverage_vol", "coverage_vol_1std", "coverage_vol_log", "coverage_vol_log_1std", "runtime_avg"]
        
    return pd.DataFrame(output, columns=columns)

In [15]:
stock = run_real_experiments("stock", 200, cal_size=0.05, test_size=0.2)
stock.to_csv("real_exps/stock.csv")

  scale = base_upper[reference_dim]/base_upper
  adjustments = adj1*base_upper/base_upper[reference_dim]
  scale = base_upper[reference_dim]/base_upper
  adjustments = adj1*base_upper/base_upper[reference_dim]
  scale = base_upper[reference_dim]/base_upper
  adjustments = adj1*base_upper/base_upper[reference_dim]
  scale = base_upper[reference_dim]/base_upper
  adjustments = adj1*base_upper/base_upper[reference_dim]
  scale = base_upper[reference_dim]/base_upper
  adjustments = adj1*base_upper/base_upper[reference_dim]
  scale = base_upper[reference_dim]/base_upper
  adjustments = adj1*base_upper/base_upper[reference_dim]
  scale = base_upper[reference_dim]/base_upper
  adjustments = adj1*base_upper/base_upper[reference_dim]
  scale = base_upper[reference_dim]/base_upper
  adjustments = adj1*base_upper/base_upper[reference_dim]
  scale = base_upper[reference_dim]/base_upper
  adjustments = adj1*base_upper/base_upper[reference_dim]
  scale = base_upper[reference_dim]/base_upper
  adjust

In [16]:
rf2 = run_real_experiments("rf2", 200, cal_size=0.05, test_size=0.2)
rf2.to_csv("real_exps/rf2.csv")

In [17]:
scm1d = run_real_experiments("scm1d", 200, cal_size=0.05, test_size=0.2)
scm1d.to_csv("real_exps/scm1d.csv")

In [18]:
scm20d = run_real_experiments("scm20d", 200, cal_size=0.05, test_size=0.2)
scm20d.to_csv("real_exps/scm20d.csv")

In [19]:
student_results = run_real_experiments("student", 200, cal_size=0.05, test_size=0.2)
student_results.to_csv("real_exps/student5.csv")

In [20]:
energy_results = run_real_experiments("energy", 200, cal_size=0.05, test_size=0.2)
energy_results.to_csv("real_exps/energy.csv")

In [21]:
air_results = run_real_experiments("air", 200, cal_size=0.05, test_size=0.2)
air_results.to_csv("real_exps/air.csv")

In [22]:
crime_results = run_real_experiments("crime", 200, cal_size=0.05, test_size=0.2)
crime_results.to_csv("real_exps/crime.csv")

  x = asanyarray(arr - arrmean)
