In [None]:
# based on the code from Barber et al. (2023) available at https://rinafb.github.io/

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
np.random.seed(98765)

In [None]:
from sklearn import svm
from sklearn.neighbors import KNeighborsClassifier
from sklearn.multioutput import MultiOutputClassifier
from sklearn.linear_model import LogisticRegression

def get_model(X_train, Y_train_l,mtype='logreg'):
    n_inputs = X_train.shape[-1]
    n_outputs = Y_train_l.shape[-1]
    X_train = X_train.reshape(-1,n_inputs)
    Y_train = Y_train_l.reshape(-1,n_outputs)
    
    if mtype =='logreg':
        model = MultiOutputClassifier(estimator= LogisticRegression())
        
    elif mtype=='svm':
        Y_train = np.argmax(Y_train,axis=1)
        model = svm.SVC()
        
    model.fit(X_train,Y_train)
    return model

In [None]:
# split conformal prediction with logistic regression.
def splitCP(X, Y, x, nclasses, alpha, weights=[], lambdas_probs=False):
    # weights are used for computing quantiles for the prediction interval
    n = len(Y)
    loss_beta = 1.0
    loss_alpha = 0.0
    
    if (len(weights)==0):
        weights = np.ones(n+1)
    if (len(weights)==n):
        weights = np.r_[weights,1]
    
    # odd data points for training, even for calibration
    inds_odd = np.arange(1, int(np.ceil(n/2)*2-1), 2)
    inds_even = np.arange(2, int(np.floor(n/2)*2), 2)
    
    # train model
    model = get_model(X[inds_odd], Y[inds_odd], 'logreg')
    
    if lambdas_probs:
        lambdas = np.linspace(0, 1, 50) # probability threshold
    else:
        lambdas = np.arange(0, nclasses+1, 1)
    
    # compute residual quantile on calibration set
    n_w = np.sum(weights[inds_even])
    r_hats = np.zeros(len(lambdas))

    Ytrue = Y[inds_even] # num_points x num_classes
    
    predictions = np.array(model.predict_proba(X[inds_even]))[:,:,1] # num_classes x num_points
    losses = np.zeros((len(lambdas), len(predictions.T)))
    for li,l in enumerate(lambdas):
        if lambdas_probs:
            fn = np.logical_and(predictions.T < 1-l, Ytrue)
            ind = np.nonzero(np.sum(Ytrue, 1) > 0)[0]
            fnr = np.zeros(len(predictions.T))
            fnr[ind] = np.sum(fn, 1)[ind] / np.sum(Ytrue, 1)[ind]
        else:
            assert(False)
        losses[li, :] = fnr
        ws = np.sum(weights[inds_even]*losses[li])
        r_hats[li] = 1/n_w * ws
    calib_lambdas = (r_hats*n_w/(n_w+1)) + loss_beta/(n_w+1)
    lambda_chosen=nclasses
    for i,li in enumerate(calib_lambdas):
        if li<=alpha:
            lambda_chosen = lambdas[i]
            break
 
    y_pred = np.array(model.predict_proba(x.reshape(1,-1)))[:,:,1]

    if lambdas_probs:
        y_pred_chosen = np.nonzero(y_pred.T[0, :] >= 1-lambda_chosen)[0]
    else:
        y_pred_sorted = np.argsort(y_pred.T)[0]
        y_pred_sorted_inv = y_pred_sorted[::-1]
        y_pred_chosen = y_pred_sorted_inv[:lambda_chosen]
    y_PI = np.zeros((nclasses))
    for y in y_pred_chosen:
        y_PI[y]=1.0
    
    return lambda_chosen,y_PI


In [None]:
# parameters for all settings
np.random.seed(12345)

N = 2000
p = 10
noise_level = .1
avg_classes = 1/3

from scipy.special import erfinv, erf
bias = -0.5

train_lag = 200
ntrial = 1
rho = 0.99
nclasses = p
coefficients_type = 1

X = np.random.normal(size=(ntrial,N,p))
Y = np.zeros((3,ntrial,N,nclasses))
noise = np.random.normal(size=(ntrial,N,nclasses))

# setting 1: i.i.d. data
beta_c1 = np.eye(p) 
Y[0] = X.dot(beta_c1.T) + bias + noise_level*noise

# setting 2: changepoints
changepoints = np.r_[N//4, 3*N//4]
n_changepoint = 2
beta_c2 = [np.zeros_like(beta_c1) for i in range(3)]
for i in range(3):
    if i == 0:
        beta_c2[i][:, :] = beta_c1[:, :]
    else:
        beta_c2[i][:, 1:] = beta_c2[i-1][:, :-1]
        beta_c2[i][:, 0] = beta_c2[i-1][:, -1]

for i in np.arange(3):
    if i == 0:
        ind_min = 0
    else:
        ind_min = changepoints[i-1]
    if i == n_changepoint:
        ind_max = N
    else:
        ind_max = changepoints[i]
        
    Y[1,:,ind_min:ind_max,:] = X[:,ind_min:ind_max,:].dot(beta_c2[i].T) + bias + noise[:,ind_min:ind_max,:]
    
# setting 3: distribution drift
beta_c_start  = beta_c1
beta_c_end = beta_c2[-1]
beta_c3 = beta_c_start + np.outer(np.arange(N)/(N-1),beta_c_end-beta_c_start).reshape(-1,nclasses,p)
for i in np.arange(N):
    Y[2,:,i,:] = X[:,i,:].dot(beta_c3[i].T) + bias + noise[:,i,:]
Ylabels = np.where(Y>0.0,1,0)

setting_names = ['Setting 1 (i.i.d. data)',\
                     'Setting 2 (changepoints)','Setting 3 (distribution drift)']

print(beta_c3[-1])

In [None]:
# run all methods

PI_CP_LS = np.zeros((3,ntrial,N,nclasses))
PI_CP_LS[:,:,:train_lag,:]=np.inf
PI_nexCP_LS = np.copy(PI_CP_LS)

PI_split_CP_LS = np.copy(PI_CP_LS)
PI_split_nexCP_LS = np.copy(PI_CP_LS)
lambdas_chosen = np.zeros((3,ntrial,N))
lambdas_chosen_nex = np.zeros((3,ntrial,N))
alpha = 0.2
for itrial in np.arange(ntrial):
    print(itrial)
    for n in np.arange(train_lag,N):
        if n%100==0:
            print('npoint',n)
        weights=rho**(np.arange(n,0,-1))
        for setting in np.arange(3):
            # CRC
            lambdas_chosen[setting,itrial,n], PI_split_CP_LS[setting,itrial,n,:] = \
                splitCP(X[itrial,:n,:],Ylabels[setting,itrial,:n,:],X[itrial,n,:],nclasses,alpha,lambdas_probs=True)
            # non-x CRC
            lambdas_chosen_nex[setting,itrial,n], PI_split_nexCP_LS[setting,itrial,n,:] = \
                splitCP(X[itrial,:n,:],Ylabels[setting,itrial,:n,:],X[itrial,n,:],nclasses,alpha,lambdas_probs=True,\
                    weights=weights)
            

In [None]:
print(lambdas_chosen.max(),lambdas_chosen_nex.max())

print(lambdas_chosen[1, 0, :])



In [None]:
zero = np.zeros((len(PI_CP_LS[train_lag:])))


fnr_diff = Ylabels[:,:,train_lag:,:]-PI_split_CP_LS[:,:,train_lag:,:]
fnr_idx = fnr_diff==1.0
fnr_idx = fnr_idx.astype(int) # get false negatives
fnr_inst = np.sum(fnr_idx,axis=-1)/np.sum(Ylabels[:,:,train_lag:,:],axis=-1) # compute fnr
fnr_inst[np.isnan(fnr_inst)] = 0.0
fnr_trial_crc = np.mean(fnr_inst,axis=1) # average over trials

fnr_diff_nex = Ylabels[:,:,train_lag:,:]-PI_split_nexCP_LS[:,:,train_lag:,:]
fnr_idx_nex = fnr_diff_nex==1.0
fnr_idx_nex = fnr_idx_nex.astype(int) # get false negatives
fnr_inst_nex1 = np.sum(fnr_idx_nex,axis=-1)
fnr_inst_nex = fnr_inst_nex1/np.sum(Ylabels[:,:,train_lag:,:],axis=-1) # compute fnr
fnr_inst_nex[np.isnan(fnr_inst_nex)] = 0.0
fnr_trial_nex_crc = np.mean(fnr_inst_nex,axis=1) # average over trials

    
PI_width_crc = lambdas_chosen

PI_width_nex_crc = lambdas_chosen_nex

# save results
np.savetxt('simulations_CRC_fnr_trial_'+str(alpha)+'_classes_'+str(nclasses)+'_rho_'+str(rho)+'_ntrial_'+str(ntrial)+'.txt',fnr_trial_crc)
np.savetxt('simulations_CRC_lambdas_'+str(alpha)+'_classes_'+str(nclasses)+'_rho_'+str(rho)+'_ntrial_'+str(ntrial)+'.txt',np.mean(lambdas_chosen,axis=1))
np.savetxt('simulations_nonxCRC_fnr_trial_'+str(alpha)+'_classes_'+str(nclasses)+'_rho_'+str(rho)+'_ntrial_'+str(ntrial)+'.txt',fnr_trial_nex_crc)
np.savetxt('simulations_nonxCRC_lambdas_'+str(alpha)+'_classes_'+str(nclasses)+'_rho_'+str(rho)+'_ntrial_'+str(ntrial)+'.txt',np.mean(lambdas_chosen_nex,axis=1))


In [None]:
for setting in np.arange(3):
    print(np.array([[setting_names[setting],'',''],\
        ['CRC',np.mean(fnr_trial_crc[setting]),np.mean(lambdas_chosen[setting])],\
        ['nonx-CRC',np.mean(fnr_trial_nex_crc[setting]),np.mean(lambdas_chosen_nex[setting])]]))


In [None]:
plt.rcParams.update({'font.size': 20})

window = 30 # will display a rolling average

def rolling_avg(x,window):
    return np.convolve(x,np.ones(window)/window)[(window-1):-window]

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# seaborn style
sns.set(style="whitegrid")
pastel_palette = sns.color_palette("deep")
font_size = 20

num_rows = 2
num_cols = 3

# figure with subplots
fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 6))

for i in np.arange(3):
    sm_cov_CP_LS = rolling_avg(fnr_trial_crc[i], window)
    sm_cov_nexCP_LS = rolling_avg(fnr_trial_nex_crc[i], window)

    axes[0, i].plot(np.arange(train_lag + window, N), sm_cov_CP_LS, color=pastel_palette[1], label="CRC")
    axes[0, i].plot(np.arange(train_lag + window, N), sm_cov_nexCP_LS, color=pastel_palette[2], label="Non-X CRC")
    axes[0, i].hlines(alpha, xmin=train_lag + window, xmax=N, linestyles='--', colors='black')
    axes[0, i].set_title(setting_names[i], fontsize=font_size)
    if i==0:
        axes[0, i].set_ylabel('loss', fontsize=font_size)
    axes[0, i].set_ylim([0, 1])

for i in np.arange(3):
    sm_cov_CP_LS = rolling_avg(np.mean(lambdas_chosen[i][:, train_lag:], 0), window)
    sm_cov_nexCP_LS = rolling_avg(np.mean(lambdas_chosen_nex[i][:, train_lag:], 0), window)

    axes[1, i].plot(np.arange(train_lag + window, N), sm_cov_CP_LS, color=pastel_palette[1], label="CRC")
    axes[1, i].plot(np.arange(train_lag + window, N), sm_cov_nexCP_LS, color=pastel_palette[2], label="Non-X CRC")
    if i==0:
        axes[1, i].set_ylabel('lambda', fontsize=font_size)
    axes[1, i].set_xlabel('time', fontsize=font_size)
    axes[1, i].set_ylim([0, 1])

# shared legend below the subplots
handles, labels = axes[0, 0].get_legend_handles_labels()
fig.legend(handles, labels, loc='lower center', ncol=2, bbox_to_anchor=(0.5, -0.1), fontsize=font_size)


plt.tight_layout()
# save plot
plt.savefig('simulations_plot_' + str(alpha) + '_classes_' + str(nclasses) +'_rho_' + str(rho)+ '_ntrial_' + str(ntrial) +'_window_' + str(window) + '.png', dpi=500, bbox_inches='tight')
plt.show()
