In [1]:
from typing import List, Tuple

# Conformal prediction from human preferences

In this notebook I aim to explore how we can use conformal prediction to make model-free risk-controlled prediction from human preferences. We will start from a simple case study.

# Simple case

For our simple case, we will consider input data of the form $x_1,\ldots,x_d$ and will define a utility function $U(x_1,\ldots,x_d) = u$ as a low degree polynomial function. For example a linear function. To make it simpler, we will restrict all the coefficients to be in the range $[-1/d,1/d]$, and the input features to be in the range $[-1,1]$, so that the output is between $-1$ and $1$.

In [2]:
def U(x, coefficients):
    result = 0
    for i in range(len(coefficients)):
        result += coefficients[i] * x[i]
    return result/len(coefficients)

Next we need a model to predict the utility function, or in other words, fit a model to replicate the behavior of $U(a)-U(b)$. The output of the model will be a softmax distribution over bins between $-1$ and $1$.
We will follow Pytorch Lightning's [LightningModule](https://pytorch-lightning.readthedocs.io/en/stable/common/lightning_module.html) to define our model.

In [3]:
import pytorch_lightning as pl
import torch.nn as nn
import torch.nn.functional as F
import torch

class LitModel(pl.LightningModule):

    """ PyTorch Lightning model.
    Outputs the probability that model U(a,b) is in bin i.
    
    Args:
        input_features (int): Number of input features of each of the two inputs.
        output_predictions (int): Number of output prediction bins.
        hidden_dim (int): Number of hidden units in the hidden layer.
        layers (int): Number of hidden layers.
    """

    def __init__(self, input_features, output_predictions, hidden_dim=128, layers = 3):
        self.input_features = input_features
        self.output_predictions = output_predictions
        self.hidden_dim = hidden_dim
        self.layers = layers
        super().__init__()

        self.initial = nn.Sequential(
            nn.Linear(2*self.input_features, self.hidden_dim),
            nn.ReLU()
        )

        self.backbone_block = nn.Sequential(
            nn.Linear(self.hidden_dim, self.hidden_dim),
            nn.ReLU()
        )

        self.head = nn.Sequential(
            nn.Linear(self.hidden_dim, self.output_predictions),
            nn.Softmax()
        )

    def forward(self, x):
        x = self.initial(x)
        for i in range(self.layers):
            x = self.backbone_block(x)
        return self.head(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = F.kl_div(y_hat, y)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.02)

In [4]:
import torch
import numpy as np
from torch.utils.data import TensorDataset, DataLoader

def create_dataloader(x_list: list, y_list: list):
    tensor_x = torch.Tensor(x_list) # transform to torch tensor
    tensor_y = torch.Tensor(y_list)
    my_dataset = TensorDataset(tensor_x,tensor_y) # create your datset
    return DataLoader(my_dataset) # create your dataloader

def create_predict_dataloader(x_list: list):
    tensor_x = torch.Tensor(x_list) # transform to torch tensor
    return DataLoader(tensor_x) # create your dataloader

The examples will be generated using the following function, which assigns them to bins.

In [5]:
def generate_examples(num_examples, num_features, num_bins):
    """Generates examples of human preferences
    If we decide to use a binary loss function, it is sufficient with num_bins = 2.
    """

    # Generate random coefficients
    coefficients = np.random.uniform(-1, 1, num_features)

    # Generate random inputs
    x0 = np.random.uniform(-1, 1, (num_examples, num_features))
    x1 = np.random.uniform(-1, 1, (num_examples, num_features))

    # Compute the utility of each input
    u = np.array([U(x0[i], coefficients) - U(x1[i], coefficients) for i in range(num_examples)])

    # Compute the bin of each input
    bins = np.array([np.digitize(u[i], np.linspace(-1, 1, num_bins)) for i in range(num_examples)])

    # Create the input list
    x_list = []
    for i in range(num_examples):
        x_list.append(np.concatenate((x0[i], x1[i])))

    # Create the output list
    y_list = []
    for i in range(num_examples):
        y = np.zeros(num_bins)
        y[bins[i]] = 1
        y_list.append(y)

    return x_list, y_list

#generate_examples(10, 2, 20)

We can then train a simple model

In [6]:
num_examples = 1000
num_features = 3
num_bins = 20
x_list, y_list = generate_examples(num_examples = num_examples, num_features = num_features, num_bins = num_bins)
train_loader = create_dataloader(x_list, y_list)
predict_loader = create_predict_dataloader(x_list)
trainer = pl.Trainer(max_epochs=5)
model = LitModel(input_features=num_features, output_predictions=num_bins)

trainer.fit(model, train_dataloaders=train_loader)

  tensor_x = torch.Tensor(x_list) # transform to torch tensor
GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name           | Type       | Params
----------------------------------------------
0 | initial        | Sequential | 896   
1 | backbone_block | Sequential | 16.5 K
2 | head           | Sequential | 2.6 K 
----------------------------------------------
20.0 K    Trainable params
0         Non-trainable params
20.0 K    Total params
0.080     Total estimated model params size (MB)
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

  input = module(input)
`Trainer.fit` stopped: `max_epochs=5` reached.


# Conformal prediction

Remember that we defined the loss to be
$$
     \mathcal{L}(U, a_i,b_i,y_i) =  y_i(U(a_i)-U(b_i)) + (1-y_i)(U(b_i)-U(a_i)),
$$
for $y_i \in \{0,1\}$ the true preference, and $U$ the learned utility function.

In the conformal prediction procedure we now follow the following steps:
1. We have to define the set $\mathcal{C}_\alpha$:
$$
\mathcal{C}_\alpha(a_i,b_i) = \{u_i = U(a_i)-U(b_i) \in \mathbb{R}: \rho( u_i )\geq  1-\alpha \}
$$
where $\rho$ is the cumulative distribution function of the model's output distribution.

In [31]:
def C(alpha: float, x_list: torch.Tensor):
    loader = DataLoader(torch.Tensor(x_list))
    predictions = trainer.predict(model,loader)
    p = []
    for prediction in predictions:
        prediction = torch.flatten(prediction)
        p.append(torch.where(prediction > alpha, torch.ones_like(prediction), torch.zeros_like(prediction)))
    return torch.stack(p)

In [32]:
a = C(0.5, x_list)
a

  rank_zero_warn(


Predicting: 1000it [00:00, ?it/s]

  input = module(input)


tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])