In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# 1. generate synthetic dataset

np.random.seed(0)
n_train, n_calib = 100, 100
X = np.random.uniform(-5, 5, size=(n_train + n_calib, 1))
y = 2 * X.squeeze() + np.random.normal(0, 1, size=(n_train + n_calib))

X_train, y_train = X[:n_train], y[:n_train]
X_calib, y_calib = X[n_train:], y[n_train:]

# 2. fit base regression model

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X_train, y_train)

# 3. define coverage policy

class AlphaNet(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 32)
        self.fc2 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
        self.initialize_to_one()
    
    def initialize_to_one(self):
        with torch.no_grad():
            self.fc2.weight.data.normal_(0, 0.01)
            self.fc2.bias.data.fill_(5.0)  # sigmoid(5) ≈ 0.993
    
    def forward(self, x):
        x = self.relu(self.fc1(x))
        return self.sigmoid(self.fc2(x)).squeeze(-1)

def interval_size(scores, alpha):
    """
    scores: tensor of shape (n,) -> S(X_j, Y_j)
    alpha: scalar
    returns: 2*r, the size of the conformal interval
    """
    n = len(scores)
    r = scores.sum() / (alpha * (n + 1) - 1)
    return 2 * r

# 4. build leave-one-out dataset features

train_inputs = []
n_calib = len(X_calib)

for i in range(n_calib):
    loo_X = np.delete(X_calib, i, axis=0)
    loo_y = np.delete(y_calib, i, axis=0)
    scores = np.abs(model.predict(loo_X) - loo_y)

    # feature vector: [sum of LOO scores] only
    input_feat = torch.tensor([scores.sum()], dtype=torch.float32)
    train_inputs.append(input_feat)

X_train_alpha = torch.stack(train_inputs)  # shape (n_calib, 1)

# 5. training function

def train_alpha_net(lambda_reg, run_id=0, num_epochs=50, batch_size=32):
    train_dataset = TensorDataset(X_train_alpha, torch.arange(len(X_train_alpha)))  # dummy y
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    
    alpha_net = AlphaNet(input_dim=1).to(device)
    optimizer = optim.Adam(alpha_net.parameters(), lr=1e-3)
    
    all_losses, all_sizes, all_alphas = [], [], []

    for epoch in range(num_epochs):
        total_loss = 0
        total_size = 0
        total_alpha = 0
        
        for x_batch, idx_batch in train_loader:
            x_batch = x_batch.to(device)
            alpha_pred = alpha_net(x_batch)
            
            batch_sizes = []
            for j, idx in enumerate(idx_batch):
                loo_X = np.delete(X_calib, idx.item(), axis=0)
                loo_y = np.delete(y_calib, idx.item(), axis=0)
                scores = torch.tensor(np.abs(model.predict(loo_X) - loo_y), dtype=torch.float32).to(device)
                size_j = interval_size(scores, alpha_pred[j])
                batch_sizes.append(size_j)
            
            batch_sizes = torch.stack(batch_sizes)
            loss = (batch_sizes + lambda_reg * alpha_pred).mean()
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            total_size += batch_sizes.mean().item()
            total_alpha += alpha_pred.mean().item()
        
        all_losses.append(total_loss / len(train_loader))
        all_sizes.append(total_size / len(train_loader))
        all_alphas.append(total_alpha / len(train_loader))
        
        if epoch % 5 == 0 or epoch == num_epochs-1:
            print(f"Lambda {lambda_reg} | Run {run_id+1} | Epoch {epoch}/{num_epochs} | "
                  f"Loss: {all_losses[-1]:.4f} | Mean Size: {all_sizes[-1]:.4f} | Mean Alpha: {all_alphas[-1]:.4f}")
    
    return np.array(all_losses), np.array(all_sizes), np.array(all_alphas), alpha_net  

# 6. run multiple lambdas and runs

lambdas = [10, 20, 50]
num_runs = 4
all_results = {}

for lam in lambdas:
    losses_runs = []
    sizes_runs = []
    alphas_runs = []
    for run in range(num_runs):
        np.random.seed(run)
        torch.manual_seed(run)
        print(f"\nStarting Lambda {lam} | Run {run+1}/{num_runs}")
        losses, sizes, alphas, alpha_net = train_alpha_net(lambda_reg=lam, run_id=run)
        losses_runs.append(losses)
        sizes_runs.append(sizes)
        alphas_runs.append(alphas)
    all_results[lam] = {
        'losses': np.array(losses_runs),
        'sizes': np.array(sizes_runs),
        'alphas': np.array(alphas_runs)
    }


In [None]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.ticker import AutoMinorLocator
from tueplots import bundles

# extract arrays from all_results
all_losses_10 = all_results[10]['losses']
all_sizes_10 = all_results[10]['sizes']
all_alphas_10 = all_results[10]['alphas']

all_losses_20 = all_results[20]['losses']
all_sizes_20 = all_results[20]['sizes']
all_alphas_20 = all_results[20]['alphas']

all_losses_50 = all_results[50]['losses']
all_sizes_50 = all_results[50]['sizes']
all_alphas_50 = all_results[50]['alphas']

# define moving average smoothing function
def moving_average(arr, window_size):
    return np.convolve(arr, np.ones(window_size)/window_size, mode='valid')

# smooth each run first, then average and std across runs
def smooth_all(arr_2d, window):
    smoothed_runs = []
    for run in arr_2d:
        smoothed = moving_average(run, window)
        smoothed_runs.append(smoothed)
    smoothed_runs = np.array(smoothed_runs)
    mean = np.nanmean(smoothed_runs, axis=0)
    std = np.nanstd(smoothed_runs, axis=0)
    return mean, std

window_size = 10

# compute smoothed means and stds
mean_loss_10, std_loss_10 = smooth_all(all_losses_10, window_size)
mean_loss_20, std_loss_20 = smooth_all(all_losses_20, window_size)
mean_loss_50, std_loss_50 = smooth_all(all_losses_50, window_size)

mean_size_10, std_size_10 = smooth_all(all_sizes_10, window_size)
mean_size_20, std_size_20 = smooth_all(all_sizes_20, window_size)
mean_size_50, std_size_50 = smooth_all(all_sizes_50, window_size)

mean_alpha_10, std_alpha_10 = smooth_all(all_alphas_10, window_size)
mean_alpha_20, std_alpha_20 = smooth_all(all_alphas_20, window_size)
mean_alpha_50, std_alpha_50 = smooth_all(all_alphas_50, window_size)

# adjust epochs due to smoothing window
epochs = np.arange(1, all_losses_10.shape[1] + 1)
epochs_smoothed = epochs[:len(mean_loss_10)]

# style setup
plt.rcParams.update(bundles.icml2024())
plt.rcParams.update({
    "axes.labelsize": 18,
    "axes.titlesize": 18,
    "xtick.labelsize": 16,
    "ytick.labelsize": 16,
    "legend.fontsize": 16,
    "lines.linewidth": 2,
    "axes.linewidth": 2,
})

fig, axs = plt.subplots(1, 3, figsize=(14, 4))

# colors for lambda = 10, 20, 50
color_10 = "#0077bb"
color_20 = "#cc3311"
color_50 = "#44aa99"

# plot 1: loss
axs[0].plot(epochs_smoothed, mean_loss_10, label=r'$\lambda=10$', color=color_10)
axs[0].fill_between(epochs_smoothed, mean_loss_10-std_loss_10, mean_loss_10+std_loss_10, color=color_10, alpha=0.3)

axs[0].plot(epochs_smoothed, mean_loss_20, label=r'$\lambda=20$', color=color_20)
axs[0].fill_between(epochs_smoothed, mean_loss_20-std_loss_20, mean_loss_20+std_loss_20, color=color_20, alpha=0.3)

axs[0].plot(epochs_smoothed, mean_loss_50, label=r'$\lambda=50$', color=color_50)
axs[0].fill_between(epochs_smoothed, mean_loss_50-std_loss_50, mean_loss_50+std_loss_50, color=color_50, alpha=0.3)

axs[0].set_xlabel("Epoch")
axs[0].set_ylabel("Loss")
axs[0].set_title("Training Loss")
axs[0].grid(True)

# plot 2: mean size 
axs[1].plot(epochs_smoothed, mean_size_10, label=r'$\lambda=10$', color=color_10)
axs[1].fill_between(epochs_smoothed, mean_size_10-std_size_10, mean_size_10+std_size_10, color=color_10, alpha=0.3)

axs[1].plot(epochs_smoothed, mean_size_20, label=r'$\lambda=20$', color=color_20)
axs[1].fill_between(epochs_smoothed, mean_size_20-std_size_20, mean_size_20+std_size_20, color=color_20, alpha=0.3)

axs[1].plot(epochs_smoothed, mean_size_50, label=r'$\lambda=50$', color=color_50)
axs[1].fill_between(epochs_smoothed, mean_size_50-std_size_50, mean_size_50+std_size_50, color=color_50, alpha=0.3)

axs[1].set_xlabel("Epoch")
axs[1].set_ylabel("Mean Size")
axs[1].set_title("Mean Size per Epoch")
axs[1].grid(True)

# plot 3: mean miscoverage
axs[2].plot(epochs_smoothed, mean_alpha_10, label=r'$\lambda=10$', color=color_10)
axs[2].fill_between(epochs_smoothed, mean_alpha_10-std_alpha_10, mean_alpha_10+std_alpha_10, color=color_10, alpha=0.3)

axs[2].plot(epochs_smoothed, mean_alpha_20, label=r'$\lambda=20$', color=color_20)
axs[2].fill_between(epochs_smoothed, mean_alpha_20-std_alpha_20, mean_alpha_20+std_alpha_20, color=color_20, alpha=0.3)

axs[2].plot(epochs_smoothed, mean_alpha_50, label=r'$\lambda=50$', color=color_50)
axs[2].fill_between(epochs_smoothed, mean_alpha_50-std_alpha_50, mean_alpha_50+std_alpha_50, color=color_50, alpha=0.3)

axs[2].set_xlabel("Epoch")
axs[2].set_ylabel(r"Mean $\tilde\alpha$")
axs[2].set_title(r"Mean $\tilde\alpha$ per Epoch")
axs[2].grid(True)
axs[2].legend(frameon=True)

for ax in axs:
    ax.xaxis.set_minor_locator(AutoMinorLocator(5))
    ax.yaxis.set_minor_locator(AutoMinorLocator(2))
    ax.tick_params(which='both', length=4)
    ax.tick_params(which='minor', length=2, width=1.5)
    ax.tick_params(which='major', width=2)

plt.tight_layout()
plt.savefig("plots/reg_training.pdf", format="pdf", bbox_inches="tight")
plt.show()


In [None]:
import matplotlib.pyplot as plt
from tueplots import bundles
from matplotlib.ticker import AutoMinorLocator

# style setup
plt.rcParams.update(bundles.icml2024())
plt.rcParams.update({
    "axes.labelsize": 18,
    "axes.titlesize": 18,
    "xtick.labelsize": 16,
    "ytick.labelsize": 16,
    "legend.fontsize": 16,
    "lines.linewidth": 2,
    "axes.linewidth": 2,
})

fig, ax = plt.subplots(figsize=(6, 5))

# plot training data
ax.scatter(X_train, y_train, color="#0077bb", alpha=0.7, label="Training Data")

# plot fitted regression line
x_line = np.linspace(-5, 5, 200).reshape(-1, 1)
y_pred_line = model.predict(x_line)
ax.plot(x_line, y_pred_line, color="black", linewidth=2, label=r"$f$")

ax.set_xlabel("X")
ax.set_ylabel("y")
ax.legend(frameon=True)
ax.grid(True)

ax.xaxis.set_minor_locator(AutoMinorLocator(5))
ax.yaxis.set_minor_locator(AutoMinorLocator(2))
ax.tick_params(which='both', length=4)
ax.tick_params(which='minor', length=2, width=1.5)
ax.tick_params(which='major', width=2)

plt.tight_layout()
plt.savefig("plots/reg_data.pdf", format="pdf", bbox_inches="tight")
plt.show()


In [None]:
# 7. evaluate conformal sets on test points using interval_size

alpha_net.eval()

# calibration scores
scores_calib = np.abs(model.predict(X_calib) - y_calib)
sum_scores = scores_calib.sum()

# generate test features and labels
X_test = np.random.uniform(-5, 5, size=(100, 1))
y_test = 2 * X_test.squeeze() + np.random.normal(0, 1, size=100)

feat = torch.tensor([sum_scores], dtype=torch.float32).to(device)
alpha_val = alpha_net(feat).item()

# compute conformal interval size
size = interval_size(torch.tensor(scores_calib, dtype=torch.float32), alpha_val)

# 8. plot size

print("Mean conformal set size:", size)


In [None]:
# 9. evaluate conformal sets on calibration points (LOO)

sizes_calib = []
n_calib = len(X_calib)

for i in range(n_calib):
    # leave-one-out scores
    loo_X = np.delete(X_calib, i, axis=0)
    loo_y = np.delete(y_calib, i, axis=0)
    scores_loo = np.abs(model.predict(loo_X) - loo_y)

    # feature vector for AlphaNet: only sum of LOO scores
    feat_i = torch.tensor([scores_loo.sum()], dtype=torch.float32).to(device)
    alpha_val = alpha_net(feat_i).item()

    # compute conformal set size
    size_i = interval_size(torch.tensor(scores_loo, dtype=torch.float32), alpha_val)
    sizes_calib.append(size_i)

# 10. plot histogram and mean size 

import matplotlib.pyplot as plt

plt.hist(sizes_calib, bins=10, edgecolor='k')
plt.xlabel("Conformal set size")
plt.ylabel("Frequency")
plt.title("Histogram of conformal set sizes (Calibration points, LOO)")
plt.show()

print("Mean conformal set size (calibration, LOO):", np.mean(sizes_calib))
