In [1]:
! git clone https://github.com/modestyachts/ImageNetV2.git
! cd ImageNetV2

Cloning into 'ImageNetV2'...
remote: Enumerating objects: 1479, done.[K
remote: Counting objects: 100% (7/7), done.[K
remote: Compressing objects: 100% (7/7), done.[K
remote: Total 1479 (delta 2), reused 0 (delta 0), pack-reused 1472 (from 1)[K
Receiving objects: 100% (1479/1479), 122.91 MiB | 22.08 MiB/s, done.
Resolving deltas: 100% (1093/1093), done.
Updating files: 100% (1375/1375), done.


In [2]:
# Step 1: Install dependencies
!pip install git+https://github.com/modestyachts/ImageNetV2_pytorch
!pip install tqdm


Collecting git+https://github.com/modestyachts/ImageNetV2_pytorch
  Cloning https://github.com/modestyachts/ImageNetV2_pytorch to /tmp/pip-req-build-q74ue96n
  Running command git clone --filter=blob:none --quiet https://github.com/modestyachts/ImageNetV2_pytorch /tmp/pip-req-build-q74ue96n
  Resolved https://github.com/modestyachts/ImageNetV2_pytorch to commit 14d4456c39fe7f02a665544dd9fc37c1a5f8b635
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: imagenetv2_pytorch
  Building wheel for imagenetv2_pytorch (setup.py) ... [?25l[?25hdone
  Created wheel for imagenetv2_pytorch: filename=imagenetv2_pytorch-0.1-py3-none-any.whl size=2658 sha256=97d777a87852f8279d1833bb94e9e24f23f16738ef18e43aefa7366b32c32bbe
  Stored in directory: /tmp/pip-ephem-wheel-cache-h63993wh/wheels/eb/61/f3/007769f94191be99678049fc34bfce389459b3752fb7f27c78
Successfully built imagenetv2_pytorch
Installing collected packages: imagenetv2_pytorch
Successfully installed ima

In [3]:
import torch
from torchvision import models, transforms
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm
from torch.nn import functional as F
import numpy as np
import torchvision
import matplotlib.pyplot as plt
from torch import nn, optim
from scipy.special import softmax
from imagenetv2_pytorch import ImageNetV2Dataset
from tqdm import tqdm


In [4]:
# Step 3: Set device to CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


Using device: cuda


In [5]:
model_constructors = {
    "ResNeXt101":models.resnext101_64x4d,
    "ResNet152": models.resnet152,
    "ResNet101": models.resnet101,
    "ResNet50": models.resnet50,
    "ResNet18": models.resnet18,
    "VGG16_BN": models.vgg16_bn
}

# Initialize and move models to device
model_dict = {}
for name, constructor in model_constructors.items():
    model = constructor(pretrained=True, progress=True)
    model.eval()
    model = model.to(device)
    model_dict[name] = model

# Transformation pipeline
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])




Downloading: "https://download.pytorch.org/models/resnext101_64x4d-173b62eb.pth" to /root/.cache/torch/hub/checkpoints/resnext101_64x4d-173b62eb.pth
100%|██████████| 319M/319M [00:02<00:00, 144MB/s]
Downloading: "https://download.pytorch.org/models/resnet152-394f9c45.pth" to /root/.cache/torch/hub/checkpoints/resnet152-394f9c45.pth
100%|██████████| 230M/230M [00:01<00:00, 165MB/s]
Downloading: "https://download.pytorch.org/models/resnet101-63fe2227.pth" to /root/.cache/torch/hub/checkpoints/resnet101-63fe2227.pth
100%|██████████| 171M/171M [00:01<00:00, 150MB/s]
Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:01<00:00, 76.3MB/s]
Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 78.1MB/s]
Downloading: "https://download.pytorch.org/models/vgg16_bn-

 # Load the data set

In [6]:
batch_size = 128
num_calib = 4500
num_param = 1000
total_size = 10000
# Step 6: Load ImageNet-V2 dataset
dataset = ImageNetV2Dataset("matched-frequency", transform=transform)
calib_set, test_set, param_set = random_split(dataset, [num_calib, total_size - num_param - num_calib, num_param])

calib_loader = DataLoader(calib_set, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=True)
param_loader = DataLoader(param_set, batch_size=batch_size, shuffle=True)


Dataset matched-frequency not found on disk, downloading....


100%|██████████| 1.26G/1.26G [00:17<00:00, 70.6MiB/s]


Extracting....


 # Function for Conformal Prediction Procedures

In [7]:

def get_logits(model, dataloader, device):
    model.eval()
    logits_list, labels_list = [], []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            logits_list.append(outputs.cpu())
            labels_list.append(labels)
    return torch.cat(logits_list), torch.cat(labels_list)

def temperature_scaling(logits, labels, max_iters=50, lr=0.01, epsilon=1e-4):
    T = nn.Parameter(torch.tensor([1.3], requires_grad=True))
    optimizer = optim.SGD([T], lr=lr)
    criterion = nn.CrossEntropyLoss()

    for i in range(max_iters):
        optimizer.zero_grad()
        scaled_logits = logits / T
        loss = criterion(scaled_logits, labels)
        loss.backward()
        optimizer.step()
        if abs(loss.item()) < epsilon:
            break
    return T.detach()

def compute_scores(sorted_probs_list, sorted_indices_list, labels,
                   lambda_reg=0.1, k_reg=5, randomized=True, allow_zero_sets=True):
    scores = []
    for i in range(len(labels)):
        sorted_probs = sorted_probs_list[i]
        sorted_indices = sorted_indices_list[i]
        cumsum = np.cumsum(sorted_probs)

        penalties = np.zeros_like(sorted_probs)
        penalties[k_reg:] += lambda_reg
        penalties_cumsum = np.cumsum(penalties)

        target = labels[i].item()
        rank = np.where(sorted_indices == target)[0][0]

        if not randomized:
            tau = cumsum[rank] + penalties_cumsum[rank]
        else:
            U = np.random.rand()
            if rank == 0:
                tau = U * cumsum[0] + penalties_cumsum[0] if allow_zero_sets else cumsum[0] + penalties_cumsum[0]
            else:
                tau = U * sorted_probs[rank] + cumsum[rank - 1] + penalties_cumsum[rank]

        scores.append(tau)
    return np.array(scores)


def compute_threshold(scores, alpha):
    return np.quantile(scores, 1 - alpha, method="higher")

def predict_set(sorted_probs, sorted_indices, tau, lambda_reg=0.1, k_reg=5, randomized=True, allow_zero_sets=True):

    cumsum = np.cumsum(sorted_probs)

    penalties = np.zeros_like(sorted_probs)
    penalties[k_reg:] += lambda_reg
    penalties_cumsum = np.cumsum(penalties)

    sizes_base = (cumsum + penalties_cumsum <= tau).sum() + 1
    sizes_base = min(sizes_base, len(sorted_probs))

    if randomized and sizes_base < len(sorted_probs):
        V = 1 / sorted_probs[sizes_base - 1] * (
            tau - (cumsum[sizes_base - 1] - sorted_probs[sizes_base - 1]) - penalties_cumsum[sizes_base - 1]
        )
        sizes = sizes_base - int(np.random.rand() >= V)
    else:
        sizes = sizes_base

    if tau == 1.0:
        sizes = len(sorted_probs)

    if not allow_zero_sets and sizes == 0:
        sizes = 1

    sizes = max(sizes, 1)
    return sorted_indices[:sizes].tolist()


def platt_scaling(logits_data, T):
    logits_data = logits_data.cpu().numpy()
    sorted_probs = []
    sorted_indices =[]
    for i in range(len(logits_data)):
        scaled_logits = logits_data[i] / T.item()
        probs = softmax(scaled_logits)
        sorted_indices.append(np.argsort(probs)[::-1])
        sorted_probs.append(np.sort(probs)[::-1])

    return sorted_probs, sorted_indices

def optimal_k_reg(sorted_indices_list, labels, alpha):

    ranks = []
    for i in range(len(labels)):
        true_label = labels[i].item()
        rank = np.where(sorted_indices_list[i] == true_label)[0][0]
        ranks.append(rank)

    k_reg = compute_threshold(np.array(ranks), alpha=alpha)
    return k_reg



In [8]:
# Hyperparameters
lambda_reg = 0.01
k_reg = 5
randomized = True
allow_zero_sets = True
allow_optimal_k = True
alpha = 0.1
methods = ['RAPS', 'APS']

models_val = {
    'ResNet18': model_dict['ResNet18'],
    'ResNet50': model_dict['ResNet50'],
    'ResNet101': model_dict['ResNet101'],
    'ResNet152': model_dict['ResNet152'],
    'ResNeXt101': model_dict['ResNeXt101'],
    'VGG16_BN': model_dict['VGG16_BN']
}

model_results = {}

## Checking for each model

In [9]:
for z in range(5):
  dataset = ImageNetV2Dataset("matched-frequency", transform=transform)
  calib_set, test_set, param_set = random_split(dataset, [num_calib, total_size - num_param - num_calib, num_param])

  calib_loader = DataLoader(calib_set, batch_size=batch_size, shuffle=True)
  test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=True)
  param_loader = DataLoader(param_set, batch_size=batch_size, shuffle=True)


  for model_name, model in models_val.items():
      print(f"\nRunning model: {model_name}")



      # Get logits
      logits_calib, labels_calib = get_logits(model, calib_loader, device)
      logits_test, labels_test = get_logits(model, test_loader, device)
      param_logits, param_labels = get_logits(model, param_loader, device)

      # Scaling
      T = temperature_scaling(logits_calib, labels_calib)
      sorted_probs_calib, sorted_indices_calib = platt_scaling(logits_calib, T)
      sorted_probs_test, sorted_indices_test = platt_scaling(logits_test, T)
      sorted_probs_param, sorted_indices_param = platt_scaling(param_logits, T)

      results = {}

      # RAPS
      if allow_optimal_k:
          k_reg = optimal_k_reg(sorted_indices_param, param_labels, alpha=alpha)
          print(f"Optimal k_reg for alpha {alpha}: {k_reg}")

      scores = compute_scores(sorted_probs_calib, sorted_indices_calib, labels_calib,
                              lambda_reg=lambda_reg, k_reg=k_reg,
                              randomized=randomized, allow_zero_sets=allow_zero_sets)
      tau = compute_threshold(scores, alpha)

      prediction_sets = [
          predict_set(sorted_probs_test[i], sorted_indices_test[i], tau,
                      lambda_reg=lambda_reg, k_reg=k_reg,
                      randomized=randomized, allow_zero_sets=allow_zero_sets)
          for i in range(len(sorted_probs_test))
      ]
      sizes = [len(pset) for pset in prediction_sets]
      coverage = np.mean([
          labels_test[i].item() in prediction_sets[i] for i in range(len(labels_test))
      ])

      results['RAPS'] = {
          'size': np.mean(sizes),
          'coverage': coverage
      }

      # APS (lambda=0, k_reg=0)
      scores_aps = compute_scores(sorted_probs_calib, sorted_indices_calib, labels_calib,
                                  lambda_reg=0, k_reg=0,
                                  randomized=randomized, allow_zero_sets=allow_zero_sets)
      tau_aps = compute_threshold(scores_aps, alpha)

      prediction_sets_aps = [
          predict_set(sorted_probs_test[i], sorted_indices_test[i], tau_aps,
                      lambda_reg=0, k_reg=0,
                      randomized=randomized, allow_zero_sets=allow_zero_sets)
          for i in range(len(sorted_probs_test))
      ]
      sizes_aps = [len(pset) for pset in prediction_sets_aps]
      coverage_aps = np.mean([
          labels_test[i].item() in prediction_sets_aps[i] for i in range(len(labels_test))
      ])

      results['APS'] = {
          'size': np.mean(sizes_aps),
          'coverage': coverage_aps
      }

      # Save results
      model_results[model_name] = results

  print("Iteration", z)

  print("\n=== RAPS vs APS Results ===")
  header = f"{'Metric':<12} | {'Method':<6} | " + " | ".join([f"{model:<12}" for model in model_results.keys()])
  print("-" * len(header))
  print(header)
  print("-" * len(header))

  # Print Prediction Set Size
  print(f"{'Set Size':<12} | {'APS':<6} | " + " | ".join([f"{model_results[model]['APS']['size']:<12.3f}" for model in model_results]))
  print(f"{'':<12} | {'RAPS':<6} | " + " | ".join([f"{model_results[model]['RAPS']['size']:<12.3f}" for model in model_results]))
  print("-" * len(header))

  # Print Coverage
  print(f"{'Coverage':<12} | {'APS':<6} | " + " | ".join([f"{model_results[model]['APS']['coverage']:<12.3f}" for model in model_results]))
  print(f"{'':<12} | {'RAPS':<6} | " + " | ".join([f"{model_results[model]['RAPS']['coverage']:<12.3f}" for model in model_results]))
  print("-" * len(header))




Running model: ResNet18
Optimal k_reg for alpha 0.1: 17

Running model: ResNet50
Optimal k_reg for alpha 0.1: 10

Running model: ResNet101
Optimal k_reg for alpha 0.1: 9

Running model: ResNet152
Optimal k_reg for alpha 0.1: 7

Running model: ResNeXt101
Optimal k_reg for alpha 0.1: 4

Running model: VGG16_BN
Optimal k_reg for alpha 0.1: 10
Iteration 0

=== RAPS vs APS Results ===
---------------------------------------------------------------------------------------------------------------
Metric       | Method | ResNet18     | ResNet50     | ResNet101    | ResNet152    | ResNeXt101   | VGG16_BN    
---------------------------------------------------------------------------------------------------------------
Set Size     | APS    | 37.855       | 35.630       | 30.938       | 27.770       | 57.031       | 32.792      
             | RAPS   | 16.181       | 10.898       | 9.341        | 8.480        | 6.109        | 11.420      
--------------------------------------------------------