In [5]:
import torch
from torch.utils.data import TensorDataset
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go

from torchcp.regression import Metrics
from torchcp.regression.loss import QuantileLoss
from torchcp.regression.predictors import ACI, CQR
from torchcp.utils import fix_randomness
from torchcp_utils.dataset import build_reg_data
from torchcp_utils.utils import build_regression_model
from torchcp_utils.regression import train

In [16]:

##################################
# Preparing dataset
##################################
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
fix_randomness(seed=42)
X, y = build_reg_data(data_name="temperature")
num_examples = X.shape[0]
T0 = int(num_examples * 0.7)
T1 = T0 + int(num_examples * 0.1)
train_dataset = TensorDataset(torch.from_numpy(X[:T0, :]), torch.from_numpy(y[:T0])) # first 70% of data
train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=100, shuffle=True, pin_memory=True)
cal_dataset = TensorDataset(torch.from_numpy(X[T0:T1, :]), torch.from_numpy(y[T0:T1])) # next 10% of data
test_dataset = TensorDataset(torch.from_numpy(X[T1:, :]), torch.from_numpy(y[T1:])) # last 20% of data
cal_data_loader = torch.utils.data.DataLoader(cal_dataset, batch_size=100, shuffle=False, pin_memory=True)
test_data_loader = torch.utils.data.DataLoader(test_dataset, batch_size=100, shuffle=False, pin_memory=True)

# print shapes of train, cal and test datasets
print("train data shape", X[:T0, :].shape)
print("cal data shape", X[T0:T1, :].shape)
print("test data shape", X[T1:, :].shape)


cuda:0
train data shape (13104, 5)
cal data shape (1872, 5)
test data shape (3745, 5)
Shape of X in train_dataset: torch.Size([13104, 5])


In [24]:
# model definition
alpha = 0.1
quantiles = [alpha / 2, 1 - alpha / 2]
model = build_regression_model("NonLinearNet")(X.shape[1], 2, 64, 0.5).to(device)
criterion = QuantileLoss(quantiles)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

epochs = 8
for epoch in range(epochs):
    train(model, device, epoch, train_data_loader, criterion, optimizer)
        
model.eval()        

NonLinearNet(
  (base_model): Sequential(
    (0): Linear(in_features=5, out_features=64, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.5, inplace=False)
    (3): Linear(in_features=64, out_features=64, bias=True)
    (4): ReLU()
    (5): Dropout(p=0.5, inplace=False)
    (6): Linear(in_features=64, out_features=2, bias=True)
  )
)

In [25]:
##################################
# Conformal Quantile Regression
##################################
print("########################## CQR ###########################")

predictor = CQR(model)

# true labels
actual_labels = []
for _, labels in test_data_loader:
    actual_labels.append(labels)
y_actual = torch.cat(actual_labels).numpy()

# calibration and evaluation
predictor.calibrate(cal_data_loader, alpha)
metric_cqr = predictor.evaluate(test_data_loader)
print(metric_cqr)

# prediction set
all_predictions = []
for x_batch, _ in test_data_loader:
    prediction_set = predictor.predict(x_batch)
    all_predictions.append(prediction_set)
all_predictions = torch.cat(all_predictions, dim=0)
# print(all_predictions.shape)

# conformal prediction intervals
prediction_intervals = all_predictions.squeeze(1).detach().cpu().numpy()
lower_bounds = prediction_intervals[:, 0]
upper_bounds = prediction_intervals[:, 1]
indices = np.arange(len(lower_bounds))

# plot
fig = go.Figure([
    go.Scatter(
        name='Lower Bound',
        x=indices,
        y=lower_bounds,
        mode='lines',
        line=dict(width=0.5, color='blue'),
        fillcolor='rgba(135, 206, 235, 0.5)',
        fill='tonexty'
    ),
    go.Scatter(
        name='Upper Bound',
        x=indices,
        y=upper_bounds,
        mode='lines',
        line=dict(width=0.5, color='blue'),
        fillcolor='rgba(135, 206, 235, 0.5)',
        fill='tonexty'
    ),
    go.Scatter(
        name='Actual Values',
        x=indices,
        y=y_actual,
        mode='lines',
        line=dict(color='red')
    )
])

fig.update_layout(
    title='Comparison of Prediction Intervals and Actual Data',
    xaxis_title='Index',
    yaxis_title='Value',
    legend_title='Legend',
    hovermode='x'
)
fig.show()

########################## CQR ###########################
{'Coverage_rate': tensor([0.9015]), 'Average_size': tensor([28.1988])}


In [None]:
##################################
# Adaptive Conformal Inference,
##################################      
print("########################## ACI ###########################")
predictor = ACI(model, 0.0001)
test_y = torch.from_numpy(y[T0:num_examples]).to(device)
predicts = torch.zeros((num_examples - T0, 2)).to(device)
for i in range(num_examples - T0):
    with torch.no_grad():
        cal_dataset = TensorDataset(torch.from_numpy(X[i:(T0 + i), :]), torch.from_numpy(y[i:(T0 + i)]))
        cal_data_loader = torch.utils.data.DataLoader(cal_dataset, batch_size=100, shuffle=False, pin_memory=True)
        predictor.calibrate(cal_data_loader, alpha)
        tmp_x = torch.from_numpy(X[(T0 + i), :])
        if i == 0:
            tmp_prediction_intervals = predictor.predict(tmp_x)
        else:
            tmp_prediction_intervals = predictor.predict(tmp_x, test_y[i - 1], predicts[i - 1])
        predicts[i, :] = tmp_prediction_intervals

metrics = Metrics()
print("Evaluating prediction sets...")
print(f"Coverage_rate: {metrics('coverage_rate')(predicts, test_y)}")
print(f"Average_size: {metrics('average_size')(predicts)}")