[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aangelopoulos/conformal-prediction/blob/main/notebooks/imagenet-smallest-sets.ipynb)

# Conformal prediction for classification purpose
Inspired by the tutorial of Angelopoulos

In [None]:
import requests

exec(requests.get("https://raw.githubusercontent.com/claireBoyer/tutorial-conformal-prediction/main/labs/aux-npt/get-send-code.html").content)

npt_config = {
    'session_name': 'TutoCP',
    'session_owner': 'aymeric',
    'sender_name': input("Your name:"),
}
send('started', 20)


# The ''send'' function enables us to see which question you have reached
# If this cell keeps runnning for more than a minute, restart kernel 
# If it happens again, redefine 
# def send(a,b): return()

import os
import json
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.image import imread
!pip install -U --no-cache-dir gdown --pre

## Data preparation

### Load the data

In [None]:
# Load cached data
if not os.path.exists('../data'):
    os.system('gdown 1h7S6N_Rx7gdfO3ZunzErZy6H7620EbZK -O ../data.tar.gz')
    os.system('tar -xf ../data.tar.gz -C ../')
    os.system('rm ../data.tar.gz')
if not os.path.exists('../data/imagenet/human_readable_labels.json'):
    !wget -nv -O ../data/imagenet/human_readable_labels.json -L https://raw.githubusercontent.com/anishathalye/imagenet-simple-labels/master/imagenet-simple-labels.json

data = np.load('../data/imagenet/imagenet-resnet152.npz')
example_paths = os.listdir('../data/imagenet/examples')
smx = data['smx']
labels = data['labels'].astype(int)

### Setting parameters for conformal prediction 

In [None]:
# Problem setup
n_cal = 1000 # number of calibration points
alpha = 0.1 # 1-alpha is the desired coverage

### Splitting the data

In [None]:
# Split the softmax scores into calibration and validation sets (save the shuffling)
idx = np.array([1] * n_cal + [0] * (smx.shape[0]-n_cal)) > 0
np.random.seed(42)
np.random.shuffle(idx)
cal_smx, test_smx = smx[idx,:], smx[~idx,:]
cal_labels, test_labels = labels[idx], labels[~idx]

### Understanding what is stored in the data

In [None]:
# Zoom on the labels in the calibration set
print("The first 10 labels in the calibration set are ", cal_labels[:10])
print("The minimal label is ",np.min(cal_labels))
print("The maximal label is ",np.max(cal_labels))
plt.hist(cal_labels,bins=1000);

In [None]:
# Zoom on the labels in the test set
print("The first 10 labels in the test set are ", test_labels[:10])
print("The minimal label is ",np.min(test_labels))
print("The maximal label is ",np.max(test_labels))
plt.hist(test_labels,bins=1000);

In [None]:
print("The total number of different labels in the dataset:",len(np.unique(labels)))

In [None]:
# Look at a row of cal_smx: this corresponds to the predicted probabilities 
# (the training has been already performed)
plt.plot(cal_smx[450])
print("The sum of a row of cal_smx gives ",np.sum(cal_smx[450]))

### A first way for conformal prediction

<div class="alert alert-block alert-info">
    1. For each calibration point, compute a score. <br>
    2. And then evaluate an empirical quantile of the calibration scores.  
</div> 



In [None]:
# 1: get conformal scores
cal_scores = 1-cal_smx[np.arange(n_cal),cal_labels] # TODO OPERAND


# 2: get adjusted quantile
q_level = np.ceil((n_cal+1)*(1-alpha))/n_cal                  # TODO OPERAND
qhat = np.quantile(cal_scores, q_level, interpolation='higher')# TODO OPERAND


<div class="alert alert-block alert-info">
    3. Form the prediction sets for the test set
</div> 

In [None]:
# 3: form prediction sets
prediction_sets = test_smx >= (1-qhat) # 3: form prediction sets

<div class="alert alert-block alert-info">
    4. Compute the empirical coverage
</div> 

In [None]:
# Calculate empirical coverage
empirical_coverage = prediction_sets[np.arange(prediction_sets.shape[0]),test_labels].mean() #TODO OPERAND
print(f"The empirical coverage is: {empirical_coverage}")
send(["The empirical coverage is:", empirical_coverage], 21)

<div class="alert alert-block alert-info">
    5. Illustrate prediction sets for some points of the test set
</div> 

In [None]:
# Making the labels human-readable
with open('../data/imagenet/human_readable_labels.json') as f:
    label_strings = np.array(json.load(f))

# Show some examples
example_paths =os.listdir('../data/imagenet/examples')
for i in range(10):
    rand_path = np.random.choice(example_paths)
    img = imread('../data/imagenet/examples/' + rand_path )
    img_index = int(rand_path.split('.')[0])
    #
    prediction_set = smx[img_index] > 1-qhat #TODO OPERAND
    #
    plt.figure()
    plt.imshow(img)
    plt.axis('off')
    send(plt, 22)
    plt.show()
    print(f"The prediction set is: {list(label_strings[prediction_set])}")

<div class="alert alert-block alert-success">This method leads to <b>small prediction sets</b>, but sacrifices <b>adaptivity</b>. </div>



## Towards more adaptive prediction sets 

<div class="alert alert-block alert-info">
    <ol>
<li>Sort in decreasing order $\hat{p}_{\sigma_i(1)}(X_i) \geq ... \geq  \hat{p}_{\sigma_i(C)}(X_i)$</li>
   
<li> Compute the calibration scores $S_i = \sum_{k=1}^{\sigma_i^{-1}(Y_i)} \hat{p}_{\sigma_i(k)}(X_i)$ (sum of the estimated probabilities associated to classes at least as large as that of the true class $Y_i$)} </li>
    <li> Get the quantile $q_{1-\alpha}(\mathcal{S})$ </li>
<li> Return the classes $\sigma_{\text{new}}(1),... ,\sigma_{\text{new}}(r^\star)$ where $r^{\star} = \text{argmax}_{1 \leq r \leq C}\left\{ \sum_{k=1}^{r} \hat{p}_{\sigma_{\text{new}}(k)}(X_{\text{new}}) < q_{1-\alpha}(\mathcal{S}) \right\} + 1$</li>
         </ol>

NB : to avoid the +1, you can use the corrected quantile ;)

In [None]:
# Get scores

# get for each row the position of the predicted probabilities in decreasing order

cal_pi = cal_smx.argsort(1)[:,::-1] #TODO OPERAND
#argsort in increasing order, so order reverting by [:,::-1] 

# form for each row the predicted probabilities in decreasing order
cal_srt = np.take_along_axis(cal_smx,cal_pi,axis=1) #TODO OPERAND


# get the rank of the true label in the sorted predicted probabilities
cal_L = np.where(cal_pi == cal_labels[:,None])[1] #TODO OPERAND

# compute the calibration scores (why minus something????)
cal_scores = cal_srt.cumsum(axis=1)[np.arange(n_cal),cal_L] #TODO OPERAND

# Get the score quantile
qhat = np.quantile(cal_scores, np.ceil((n_cal+1)*(1-alpha))/n_cal, interpolation='higher') #TODO OPERAND



In [None]:
# Deploy on the test set
n_test = test_smx.shape[0]


# sorting the predicted probabilities (softmax outputs) on the test set
test_pi = test_smx.argsort(1)[:,::-1] #TODO OPERAND


# reordering
test_srt = np.take_along_axis(test_smx,test_pi,axis=1) #TODO OPERAND


#test_srt_cumsum = test_srt.cumsum(axis=1)
# cumulative softmax outputs
indicators = test_srt.cumsum(axis=1) <= qhat #TODO OPERAND
prediction_sets = np.take_along_axis(indicators,test_pi.argsort(axis=1),axis=1) #TODO OPERAND


In [None]:
# Calculate empirical coverage

empirical_coverage = prediction_sets[np.arange(n_test),test_labels].mean() #TODO OPERAND
print(f"The empirical coverage is: {empirical_coverage}")
print(f"The quantile is: {qhat}")
send(["The empirical coverage is:", empirical_coverage], 23)

In [None]:
# Show some examples
with open('../data/imagenet/human_readable_labels.json') as f:
    label_strings = np.array(json.load(f))

example_paths =os.listdir('../data/imagenet/examples')
for i in range(10):
    rand_path = np.random.choice(example_paths)
    img = imread('../data/imagenet/examples/' + rand_path )
    img_index = int(rand_path.split('.')[0])
    # Form the prediction set

    
    _smx = smx[img_index]         #TODO OPERAND
    _pi = np.argsort(_smx)[::-1]  #TODO OPERAND
    _srt = np.take_along_axis(_smx,_pi,axis=0) #TODO OPERAND
    _srt_cumsum = _srt.cumsum()        #TODO OPERAND
    _ind = _srt_cumsum <= qhat  #TODO OPERAND
    
#     _smx = smx[img_index]         #TODO OPERAND
#     _pi = np.argsort(_smx)[::-1]  #TODO OPERAND
#     _srt = np.take_along_axis(_smx,_pi,axis=0) #TODO OPERAND
#     _srt_reg_cumsum = _srt_reg.cumsum()        #TODO OPERAND
#     _ind = (_srt_reg_cumsum - np.random.rand()*_srt_reg) <= qhat if rand else _srt_reg_cumsum - _srt_reg <= qhat #TODO OPERAND

    if disallow_zero_sets: _ind[0] = True
    prediction_set = np.take_along_axis(_ind,_pi.argsort(),axis=0)
    plt.figure()
    plt.imshow(img)
    plt.axis('off')
    send(plt, 24)
    plt.show()
    print(f"The prediction set is: {list(label_strings[prediction_set])}")

## Another way for adaptive predictive sets in classification


[Sadinle, Lei, Wasserman] https://arxiv.org/abs/1609.00451


[Notebook source](https://github.com/aangelopoulos/conformal-prediction/blob/main/notebooks/imagenet-raps.ipynb)

In [None]:
# Set RAPS regularization parameters (larger lam_reg and smaller k_reg leads to smaller sets)
lam_reg = 0.01
k_reg = 5
disallow_zero_sets = False # Set this to False in order to see the coverage upper bound hold
rand = True # Set this to True in order to see the coverage upper bound hold
reg_vec = np.array(k_reg*[0,] + (smx.shape[1]-k_reg)*[lam_reg,])[None,:] # create a row with 5 zeroes and the rest at 0.01

In [None]:
# Get scores

# get for each row the position of the predicted probabilities in decreasing order
cal_pi = cal_smx.argsort(1)[:,::-1] #argsort in increasing order, so order reverting by [:,::-1] 

# form for each row the predicted probabilities in decreasing order
cal_srt = np.take_along_axis(cal_smx,cal_pi,axis=1)

cal_srt_reg = cal_srt + reg_vec # addition row by row of 0 0 0 0 0 0.1 0.1 ....

# get the rank of the true label in the sorted predicted probabilities
cal_L = np.where(cal_pi == cal_labels[:,None])[1]

# compute the calibration scores (why minus something????)
cal_scores = cal_srt_reg.cumsum(axis=1)[np.arange(n_cal),cal_L] - np.random.rand(n_cal)*cal_srt_reg[np.arange(n_cal),cal_L]

# Get the score quantile
qhat = np.quantile(cal_scores, np.ceil((n_cal+1)*(1-alpha))/n_cal, interpolation='higher')


In [None]:
# Deploy
n_test = test_smx.shape[0]
test_pi = test_smx.argsort(1)[:,::-1] # sorting the predicted probabilities (softmax outputs) on the test set
test_srt = np.take_along_axis(test_smx,test_pi,axis=1) # reordering
test_srt_reg = test_srt + reg_vec # ???
test_srt_reg_cumsum = test_srt_reg.cumsum(axis=1) # cumulative softmax outputs
indicators = (test_srt_reg.cumsum(axis=1) - np.random.rand(n_test,1)*test_srt_reg) <= qhat if rand else test_srt_reg.cumsum(axis=1) - test_srt_reg <= qhat
if disallow_zero_sets: indicators[:,0] = True
prediction_sets = np.take_along_axis(indicators,test_pi.argsort(axis=1),axis=1)

In [None]:
# Calculate empirical coverage
empirical_coverage = prediction_sets[np.arange(n_test),test_labels].mean()
print(f"The empirical coverage is: {empirical_coverage}")
print(f"The quantile is: {qhat}")

In [None]:
# Show some examples
with open('../data/imagenet/human_readable_labels.json') as f:
    label_strings = np.array(json.load(f))

example_paths =os.listdir('../data/imagenet/examples')
for i in range(10):
    rand_path = np.random.choice(example_paths)
    img = imread('../data/imagenet/examples/' + rand_path )
    img_index = int(rand_path.split('.')[0])
    # Form the prediction set
    _smx = smx[img_index]
    _pi = np.argsort(_smx)[::-1]
    _srt = np.take_along_axis(_smx,_pi,axis=0)
    _srt_reg = _srt + reg_vec.squeeze()
    _srt_reg_cumsum = _srt_reg.cumsum()
    _ind = (_srt_reg_cumsum - np.random.rand()*_srt_reg) <= qhat if rand else _srt_reg_cumsum - _srt_reg <= qhat
    if disallow_zero_sets: _ind[0] = True
    prediction_set = np.take_along_axis(_ind,_pi.argsort(),axis=0)
    plt.figure()
    plt.imshow(img)
    plt.axis('off')
    plt.show()
    print(f"The prediction set is: {list(label_strings[prediction_set])}")