In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm

from tqdm import tqdm
from sklearn.linear_model import LinearRegression
import time

# SETTING 1

In [118]:
def split_conformal_bands(predictor, 
                          X_test,  
                          X_cal, 
                          y_cal,
                          qiantiles,
                          alpha = 0.95
):
    cal_mean_predictions = predictor.predict(X_cal)
    S_score = np.abs(y_cal - cal_mean_predictions)
    quantile = np.nanquantile(S_score, 1 - alpha, method = "higher")
    
    mean_predictions = predictor.predict(X_test)
    prediction_bands = np.stack([
        mean_predictions - quantile,
        mean_predictions + quantile
    ], axis = 1)
    
    return prediction_bands, quantile, mean_predictions

In [134]:
def weighted_split_conformal_prediction(predictor, X_cal, y_cal, X_test, cal_weights, alpha=0.95):
    """ Weighted Split Conformal Prediction (taken from github.io/code/nonexchangeable_conformal.zip) """

    # normalize weights (we add +1 in the denominator for the test point at n+1)
    weights_normalized = cal_weights / (np.sum(cal_weights)+1)

    if(np.sum(weights_normalized) >= 1-alpha):
        # calibration scores: |y_i - x_i @ betahat|
        R = np.abs(y_cal - predictor.predict(X_cal))
        ord_R = np.argsort(R)
        # from when are the cumulative quantiles at least 1-\alpha
        ind_thresh = np.min(np.where(np.cumsum(weights_normalized[ord_R])>=1-alpha))
        # get the corresponding residual
        quantile = np.sort(R)[ind_thresh]
    else:
        quantile = np.inf
    
    # Standard prediction intervals using the absolute residual score quantile
    mean_prediction = predictor.predict(X_test)
    prediction_bands = np.stack([
        mean_prediction - quantile,
        mean_prediction + quantile
    ], axis=1)

    return mean_prediction, prediction_bands, quantile

# SIMULATION STUDY
$$\textbf{Setting 1: i.i.d. data} X_i \sim \mathcal{N}(0, \textbf{I}_{4}) \text{  and  } Y_i \sim X_i^T\beta + \mathcal{N}(0, 1) $$

We first let $\beta = (2, 1, 0, 0)$


In [None]:
np.random.seed(12345)
N = 1000
beta = [2, 1, 0, 0]
alpha = 0.1
ntrial = 200
train_lag = 100


rho = rho_ls = 0.99
# Data generation
X = np.random.randn(N, 4)
Y = np.dot(X, beta) + np.random.randn(N)

# Methods
methods = ["CP+LS", "NexCP+LS", "NexCP+WLS"]

# Initialize prediction intervals
PI_split_CP = np.zeros((len(methods), ntrial, N, 2))
PI_split_CP[:, :, :train_lag, 0] = -np.inf
PI_split_CP[:, :, :train_lag, 1] = np.inf

# Main loop
for trial in tqdm(np.arange(ntrial)):
    for pred_idx in range(train_lag, N):
        for method_idx, method in enumerate(methods):
            if method == "CP+LS":
                tags = np.ones(pred_idx)
            else:
                weights = rho ** np.arange(pred_idx, 0, -1)
                weights = np.append(weights, 1)  # Fix append

            if method == "NexCP+WLS":
                tags = rho_ls ** np.arange(pred_idx, -1, -1)
            else:
                tags = np.ones(pred_idx)  # Default tags for other methods

            idx_odd = np.arange(1, int(np.ceil(pred_idx / 2) * 2), 2)
            idx_even = np.arange(0, int(np.floor(pred_idx / 2) * 2), 2)

            predictor = LinearRegression()
            predictor.fit(X[idx_odd], Y[idx_odd], tags[idx_odd])  # Correct indexing

            mean_predictions, prediction_bands, quantile = weighted_split_conformal_prediction(
                predictor,
                X[idx_even],
                Y[idx_even],
                X[pred_idx][np.newaxis, :],  # Correct indexing
                weights[idx_even],
                alpha
            )

            PI_split_CP[method_idx, trial, pred_idx, :] = prediction_bands

print("Finished processing.")


  0%|          | 0/200 [00:00<?, ?it/s]


IndexError: index 5 is out of bounds for axis 1 with size 4

In [None]:
Y.shape

(1000,)