# 0. Imports, config, and utility setup

In [None]:
import os, math, random, time
import numpy as np, pandas as pd
import matplotlib.pyplot as plt
import torch, torch.nn as nn, torch.nn.functional as F
import torchvision
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Subset

# Seed every random number generator used in this notebook with one value.
SEED = 123
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(SEED)

# Run on CUDA when torch reports it as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

def set_deterministic():
    """Set the cuDNN backend flags: ``deterministic`` on, ``benchmark`` off."""
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

class Timer:
    """Context manager that prints how long the enclosed block took.

    Works both as ``with Timer("step"):`` and as ``with Timer("step") as t:``;
    in the second form ``t.elapsed`` holds the measured duration in seconds
    once the block has finished.
    """

    def __init__(self, name="block"):
        self.name = name
        self.t0 = None  # clock reading taken on entry
        self.elapsed = None  # seconds spent inside the block, set on exit

    def __enter__(self):
        # perf_counter is monotonic, so the measurement cannot be distorted by
        # system clock adjustments made while the block is running.
        self.t0 = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Returns None, so an exception raised inside the block still propagates.
        self.elapsed = time.perf_counter() - self.t0
        print(f"{self.name} took {self.elapsed:.2f}s")


# 1. Dataset paths, transforms, and loaders

In [None]:
# Dataset locations. The literal path is a machine-specific default; set the
# IMAGENET_ROOT / IMAGENETV2_ROOT environment variables to point at other
# copies without editing this cell.
IMAGENET_ROOT = os.environ.get("IMAGENET_ROOT", r"C:\Users\ducta\Downloads")
IMAGENETV2_ROOT = os.environ.get(
    "IMAGENETV2_ROOT",
    os.path.join(IMAGENET_ROOT, "imagenetv2-matched-frequency"),
)

IMAGENET_VAL_DIR = os.path.join(IMAGENET_ROOT, "imagenet-val")
IMAGENETV2_DIR = IMAGENETV2_ROOT
print("ImageNet-Val dir:", IMAGENET_VAL_DIR)
print("ImageNetV2 dir:", IMAGENETV2_DIR)

# Fail early with a readable message instead of letting ImageFolder fail later.
if not os.path.isdir(IMAGENET_VAL_DIR):
    raise RuntimeError("ImageNet-Val directory not found. Please set IMAGENET_VAL_DIR correctly.")
if not os.path.isdir(IMAGENETV2_DIR):
    raise RuntimeError("ImageNet-V2 directory not found. Please set IMAGENETV2_DIR correctly.")

# Preprocessing shared by both datasets: resize to 256, center-crop to 224,
# convert to a tensor, then normalize each channel with the given mean/std.
imagenet_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# ImageFolder takes class labels from the sub-directory layout of each root.
imagenet_val_ds = datasets.ImageFolder(IMAGENET_VAL_DIR, transform=imagenet_transform)
print("ImageNet-Val size:", len(imagenet_val_ds))
imagenetv2_ds = datasets.ImageFolder(IMAGENETV2_DIR, transform=imagenet_transform)
print("ImageNet-V2 size:", len(imagenetv2_ds))

def make_loader(dataset, batch_size=64, num_workers=4):
    """Build a non-shuffling DataLoader over ``dataset``.

    Args:
        dataset: Any dataset accepted by ``torch.utils.data.DataLoader``.
        batch_size: Number of samples per batch.
        num_workers: Number of loader worker processes (0 loads in-process).

    Returns:
        A ``DataLoader`` that yields batches in dataset order.
    """
    # Pinned host memory only helps host-to-GPU copies; requesting it on a
    # CPU-only machine just triggers a warning, so enable it only with CUDA.
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=torch.cuda.is_available(),
    )


# 2. Base models and temperature scaling (Platt scaling)

In [None]:
def get_model(model_name="resnet152"):
    """Return a frozen, eval-mode torchvision model placed on ``device``.

    Args:
        model_name: Name of a constructor in ``torchvision.models``.

    Returns:
        The model built with ``weights="IMAGENET1K_V1"``, switched to eval
        mode, moved to ``device``, with ``requires_grad`` disabled on every
        parameter.
    """
    # Looking the constructor up by name also covers the default:
    # "resnet152" resolves to torchvision.models.resnet152.
    builder = getattr(torchvision.models, model_name)
    net = builder(weights="IMAGENET1K_V1")
    net.eval()
    net.to(device)
    for param in net.parameters():
        param.requires_grad = False
    return net

@torch.no_grad()
def collect_logits(model, dataset, batch_size=64, num_workers=4):
    """Run ``model`` over ``dataset`` and gather all outputs and labels.

    Args:
        model: Callable applied to each input batch after it is moved to ``device``.
        dataset: Dataset yielding ``(input, label)`` pairs.
        batch_size: Batch size forwarded to ``make_loader``.
        num_workers: Worker count forwarded to ``make_loader``.

    Returns:
        ``(logits, labels)``: the per-batch outputs (moved to the CPU) and the
        per-batch labels, each concatenated along dimension 0 in loader order.
    """
    logit_chunks = []
    label_chunks = []
    batches = make_loader(dataset, batch_size=batch_size, num_workers=num_workers)
    for inputs, targets in batches:
        outputs = model(inputs.to(device))
        # Move results off the device right away so only one batch lives there.
        logit_chunks.append(outputs.cpu())
        label_chunks.append(targets)
    stacked_logits = torch.cat(logit_chunks, dim=0)
    stacked_labels = torch.cat(label_chunks, dim=0)
    return stacked_logits, stacked_labels

class TemperatureScaler(nn.Module):
    """Divide logits by a single learnable temperature.

    The trainable parameter is ``log_T``; the temperature itself is
    ``exp(log_T)`` and is therefore always positive.
    """

    def __init__(self, init_T=1.0):
        super().__init__()
        log_init = math.log(init_T)
        self.log_T = nn.Parameter(torch.tensor(log_init))

    @property
    def T(self):
        """Current temperature as a tensor, ``exp(log_T)``."""
        return self.log_T.exp()

    def forward(self, logits):
        """Return ``logits`` divided by the current temperature."""
        temperature = self.T
        return logits / temperature

def fit_temperature(logits_cal, labels_cal, lr=0.01, steps=200):
    """Fit a scalar temperature by minimizing cross-entropy on calibration data.

    Args:
        logits_cal: Calibration logits, one row per sample.
        labels_cal: Integer class labels matching ``logits_cal``.
        lr: Adam learning rate applied to the log-temperature.
        steps: Number of full-batch optimization steps.

    Returns:
        The fitted temperature as a Python float.
    """
    scaler = TemperatureScaler().to(device)
    cal_logits = logits_cal.to(device)
    cal_labels = labels_cal.to(device)
    optimizer = torch.optim.Adam([scaler.log_T], lr=lr)
    for _step in range(steps):
        optimizer.zero_grad()
        scaled_logits = scaler(cal_logits)
        nll = F.cross_entropy(scaled_logits, cal_labels)
        nll.backward()
        optimizer.step()
    fitted = scaler.T.detach()
    return fitted.item()

def apply_temperature(logits, T):
    """Return softmax probabilities (over dim 1) of ``logits`` divided by ``T``."""
    scaled = logits / T
    return F.softmax(scaled, dim=1)


# 3. Helper: sorting probabilities and ranking the true label

In [None]:
def sort_probs_and_get_L(probs, labels):
    """Sort class probabilities per row and locate each true label's rank.

    Args:
        probs: 2-D tensor with one row per sample and one column per class.
        labels: 1-D tensor of true class indices, one per row of ``probs``.

    Returns:
        ``(sorted_probs, sorted_idx, L)`` as NumPy arrays: the probabilities
        sorted in descending order per row, the class index at each sorted
        position, and the 1-based position of the true label in that order
        (``int64``).

    Raises:
        IndexError: If some label does not appear among the class indices.
    """
    sorted_probs, sorted_idx = torch.sort(probs, dim=1, descending=True)
    sorted_probs_np = sorted_probs.cpu().numpy()
    sorted_idx_np = sorted_idx.cpu().numpy()
    labels_np = labels.cpu().numpy()
    # One boolean per (sample, rank): True where that rank holds the true class.
    matches = sorted_idx_np == labels_np.reshape(-1, 1)
    # argmax would silently report rank 1 for a row with no match, so fail loudly.
    if not matches.any(axis=1).all():
        raise IndexError("every label must be a class index present in probs (0 <= label < num_classes)")
    L = matches.argmax(axis=1).astype(np.int64) + 1
    return sorted_probs_np, sorted_idx_np, L


# 4. Implement Naive, APS, and RAPS (Algorithms 1–3)

In [None]:
def naive_sets(sorted_probs, sorted_idx, alpha):
    """Build sets by taking top classes until their mass reaches ``1 - alpha``.

    Args:
        sorted_probs: ``(N, K)`` array of per-row probabilities in descending order.
        sorted_idx: ``(N, K)`` array of class indices in the same order.
        alpha: Target miscoverage level; the mass threshold is ``1 - alpha``.

    Returns:
        ``(sets, L)``: a list of ``N`` index arrays (the first ``L[i]`` entries
        of row ``i`` of ``sorted_idx``) and the array of set sizes.
    """
    N, K = sorted_probs.shape
    cum = np.cumsum(sorted_probs, axis=1)
    reached = cum >= 1.0 - alpha
    # If a row never reaches the threshold (round-off in the cumulative sum,
    # or rows that do not sum to 1), argmax over an all-False row would be 0
    # and yield a size-1 set; such rows must include every class instead.
    L = np.where(reached.any(axis=1), reached.argmax(axis=1) + 1, K)
    sets = [sorted_idx[i, :L[i]] for i in range(N)]
    return sets, L

def conformal_quantile(E, alpha):
    """Return the ``ceil((n + 1) * (1 - alpha))``-th smallest score in ``E``.

    Args:
        E: 1-D sequence of ``n`` calibration scores.
        alpha: Target miscoverage level.

    Returns:
        The score at that 1-based rank after sorting ascending. The rank is
        clamped to ``[1, n]``, so the largest score is returned when the rank
        exceeds ``n`` and the smallest when it would be below 1.

    Raises:
        IndexError: If ``E`` is empty.
    """
    n = len(E)
    rank = math.ceil((n + 1) * (1 - alpha))
    # Without the lower clamp a rank of 0 (alpha >= 1) becomes index -1 and
    # wraps around to the largest score instead of the smallest.
    rank = min(max(rank, 1), n)
    return np.sort(E)[rank - 1]

def aps_sets(sorted_probs, sorted_idx, tau):
    """Build sets from the top classes whose cumulative mass stays within ``tau``.

    Args:
        sorted_probs: 2-D array of per-row probabilities in descending order.
        sorted_idx: Array of class indices in the same order.
        tau: Threshold on the cumulative probability mass.

    Returns:
        ``(sets, L)``: one index array per row plus the array of set sizes.
        Sizes are clipped to lie between 1 and the number of columns.
    """
    num_classes = sorted_probs.shape[1]
    running_mass = np.cumsum(sorted_probs, axis=1)
    within_budget = running_mass <= tau
    # Count the ranks within budget, never returning an empty set.
    L = np.clip(within_budget.sum(axis=1), 1, num_classes)
    sets = []
    for row, size in zip(sorted_idx, L):
        sets.append(row[:size])
    return sets, L

def raps_calibration(sorted_probs, sorted_idx, L_cal, alpha, k_reg=5, lam=0.2):
    """Compute the RAPS threshold ``tau`` from calibration data.

    The score of sample ``i`` is the probability mass of its top ``L_cal[i]``
    classes plus the penalty ``lam * max(L_cal[i] - k_reg, 0)``; ``tau`` is
    the conformal quantile of those scores. With ``lam=0`` the penalty
    vanishes and the score is the cumulative mass alone.

    Args:
        sorted_probs: ``(n, K)`` array of per-row probabilities in descending order.
        sorted_idx: Accepted for signature symmetry with the set builders; the
            computation does not read it.
        L_cal: Length-``n`` array with the 1-based rank of each true label.
        alpha: Target miscoverage level passed to ``conformal_quantile``.
        k_reg: Rank beyond which the size penalty starts to apply.
        lam: Penalty added per rank beyond ``k_reg``.

    Returns:
        The threshold ``tau``.
    """
    n, K = sorted_probs.shape
    ranks = np.asarray(L_cal, dtype=np.int64)
    # mass_upto[i, r] is the mass of the top-r classes of row i (column 0 is
    # the empty prefix). Accumulating rank by rank in float64 gives the same
    # kind of running sum that cumulative-sum set construction thresholds,
    # rather than a separately rounded per-row float32 ``.sum()``.
    mass_upto = np.zeros((n, K + 1), dtype=np.float64)
    np.cumsum(sorted_probs, axis=1, dtype=np.float64, out=mass_upto[:, 1:])
    base = mass_upto[np.arange(n), np.clip(ranks, 0, K)]
    penalty = lam * np.maximum(ranks - k_reg, 0)
    E = base + penalty
    return conformal_quantile(E, alpha)

def raps_sets(sorted_probs, sorted_idx, tau, k_reg=5, lam=0.2):
    """Build RAPS prediction sets for a given threshold ``tau``.

    For each row, ranks are added from the top while
    ``cumulative_mass(rank) + lam * max(rank - k_reg, 0) <= tau``; the set
    stops at the first rank that violates this, and always keeps at least
    the top class. With ``lam=0`` the rule reduces to a plain
    cumulative-mass threshold.

    Args:
        sorted_probs: ``(N, K)`` array of per-row probabilities in descending order.
        sorted_idx: ``(N, K)`` array of class indices in the same order.
        tau: Score threshold, e.g. from ``raps_calibration``.
        k_reg: Rank beyond which the size penalty starts to apply.
        lam: Penalty added per rank beyond ``k_reg``.

    Returns:
        ``(sets, L_out)``: a list of ``N`` index arrays and the ``int64``
        array of set sizes.
    """
    N, K = sorted_probs.shape
    # Score of every (sample, rank) pair at once, accumulated in float64 so
    # the result does not depend on the input dtype or on scalar promotion.
    scores = np.cumsum(sorted_probs, axis=1, dtype=np.float64)
    scores += lam * np.maximum(np.arange(1, K + 1) - k_reg, 0)
    # Written as "not <=" so NaN scores stop the set, as an explicit
    # ``if score <= tau ... else break`` loop would.
    over = ~(scores <= tau)
    # Zero-based index of the first violating rank equals the number of ranks
    # accepted before it; rows with no violation keep all K classes.
    first_over = np.where(over.any(axis=1), over.argmax(axis=1), K)
    L_out = np.maximum(first_over, 1).astype(np.int64)
    sets = [sorted_idx[i, :L_out[i]] for i in range(N)]
    return sets, L_out


# 5. Metrics: coverage, average set size, accuracies

In [None]:
def coverage(pred_sets, labels):
    """Return the fraction of labels contained in their prediction set.

    Args:
        pred_sets: Sequence of per-sample containers of class indices.
        labels: Indexable of true labels whose elements expose ``.item()``.

    Returns:
        Number of sets containing their label, divided by ``len(labels)``.
    """
    hits = sum(
        1
        for position, members in enumerate(pred_sets)
        if labels[position].item() in members
    )
    return hits / len(labels)

def avg_size(pred_sets):
    """Return the mean number of elements per prediction set."""
    sizes = list(map(len, pred_sets))
    return np.mean(sizes)

def topk_accuracies(probs, labels, ks=(1, 5)):
    """Compute top-k accuracy for each ``k`` in ``ks``.

    Args:
        probs: 2-D tensor of per-class scores, one row per sample.
        labels: Tensor of true class indices, one per row of ``probs``.
        ks: Iterable of cut-offs to evaluate.

    Returns:
        Dict mapping each ``k`` to the fraction of rows whose label is among
        the ``k`` highest-scoring classes, as a Python float.
    """
    widest = max(ks)
    # Take the largest cut-off once; smaller cut-offs reuse a prefix of it.
    ranked = probs.topk(widest, dim=1).indices
    targets = labels.view(-1, 1)
    accuracies = {}
    for k in ks:
        label_in_top_k = (ranked[:, :k] == targets).any(dim=1)
        accuracies[k] = label_in_top_k.float().mean().item()
    return accuracies


# 6. Experiment 1: coverage vs set size on ImageNet-Val

In [None]:
# Experiment 1: over N_TRIALS random calibration/evaluation splits of the
# ImageNet-Val logits, compare empirical coverage and average set size of the
# Naive, APS and RAPS procedures at each alpha, then tabulate and plot means.
set_deterministic()
model = get_model("resnet152")
# Logits are computed once and reused by every trial below.
with Timer("Collect ImageNet-Val logits"):
    logits_val, labels_val = collect_logits(model, imagenet_val_ds, batch_size=64)
alpha_values = [0.1, 0.05]
N_TRIALS = 5
results_exp1 = []
first_split = None
for trial in range(N_TRIALS):
    # Random split: the first 20000 permuted indices calibrate, the next 20000 evaluate.
    perm = torch.randperm(len(logits_val))
    cal_idx = perm[:20000]
    eval_idx = perm[20000:40000]
    logits_cal = logits_val[cal_idx]
    labels_cal = labels_val[cal_idx]
    logits_eval = logits_val[eval_idx]
    labels_eval = labels_val[eval_idx]
    # The temperature is fit on the calibration split only, then applied to all logits.
    T = fit_temperature(logits_cal, labels_cal)
    probs_all = F.softmax(logits_val / T, dim=1)
    probs_cal = probs_all[cal_idx]
    probs_eval = probs_all[eval_idx]
    sorted_cal, idx_cal, L_cal = sort_probs_and_get_L(probs_cal, labels_cal)
    sorted_eval, idx_eval, L_eval = sort_probs_and_get_L(probs_eval, labels_eval)
    topk = topk_accuracies(probs_eval, labels_eval)
    for alpha in alpha_values:
        # Naive needs no calibration: it thresholds the cumulative mass at 1 - alpha.
        naive_sets_eval, naive_L = naive_sets(sorted_eval, idx_eval, alpha)
        naive_cov = coverage(naive_sets_eval, labels_eval)
        naive_size = avg_size(naive_sets_eval)
        # APS is obtained from the RAPS routines by switching the penalty off (lam=0.0).
        tau_aps = raps_calibration(sorted_cal, idx_cal, L_cal, alpha, k_reg=5, lam=0.0)
        aps_sets_eval, aps_L = raps_sets(sorted_eval, idx_eval, tau_aps, k_reg=5, lam=0.0)
        aps_cov = coverage(aps_sets_eval, labels_eval)
        aps_size = avg_size(aps_sets_eval)
        tau_raps = raps_calibration(sorted_cal, idx_cal, L_cal, alpha, k_reg=5, lam=0.2)
        raps_sets_eval, raps_L = raps_sets(sorted_eval, idx_eval, tau_raps, k_reg=5, lam=0.2)
        raps_cov = coverage(raps_sets_eval, labels_eval)
        raps_size = avg_size(raps_sets_eval)
        # One row per (method, alpha, trial); top-1/top-5 depend only on the trial.
        results_exp1.append({"method": "Naive", "alpha": alpha, "trial": trial, "coverage": naive_cov, "avg_size": naive_size, "top1": topk[1], "top5": topk[5]})
        results_exp1.append({"method": "APS", "alpha": alpha, "trial": trial, "coverage": aps_cov, "avg_size": aps_size, "top1": topk[1], "top5": topk[5]})
        results_exp1.append({"method": "RAPS", "alpha": alpha, "trial": trial, "coverage": raps_cov, "avg_size": raps_size, "top1": topk[1], "top5": topk[5]})
    if trial == 0:
        # Keep the first split so later cells can reuse exactly the same data.
        # NOTE(review): idx_cal is not stored here, only sorted_cal and L_cal.
        first_split = {"cal_idx": cal_idx, "eval_idx": eval_idx, "probs_all": probs_all, "probs_eval": probs_eval, "sorted_eval": sorted_eval, "idx_eval": idx_eval, "labels_eval": labels_eval, "labels_cal": labels_cal, "sorted_cal": sorted_cal, "L_cal": L_cal, "L_eval": L_eval}
results_exp1_df = pd.DataFrame(results_exp1)
# Mean over trials per (method, alpha); the "trial" column is averaged as well.
summary_exp1 = results_exp1_df.groupby(["method", "alpha"]).mean().reset_index()
print(summary_exp1)
# Left panel: empirical vs target coverage. Right panel: average set size.
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
for method in ["Naive", "APS", "RAPS"]:
    for alpha in alpha_values:
        subset = results_exp1_df[(results_exp1_df.method == method) & (results_exp1_df.alpha == alpha)]
        axes[0].scatter([1 - alpha], [subset.coverage.mean()], label=f"{method} α={alpha}")
        axes[1].scatter([1 - alpha], [subset.avg_size.mean()], label=f"{method} α={alpha}")
# Dashed diagonal: empirical coverage equal to target coverage.
axes[0].plot([0, 1], [0, 1], "k--")
axes[0].set_xlabel("Target coverage")
axes[0].set_ylabel("Empirical coverage")
axes[1].set_xlabel("Target coverage")
axes[1].set_ylabel("Average set size")
axes[0].legend()
axes[1].legend()
plt.tight_layout()
plt.show()


# 7. Experiment 2: coverage vs set size on ImageNet-V2

In [None]:
# Experiment 2: the same comparison as Experiment 1, run on ImageNet-V2 logits
# with 5000 calibration and 5000 evaluation samples per trial.
# Reuses the `model` created in the Experiment 1 cell.
with Timer("Collect ImageNet-V2 logits"):
    logits_v2, labels_v2 = collect_logits(model, imagenetv2_ds, batch_size=64)
alpha_values_v2 = [0.1, 0.05]
N_TRIALS_V2 = 5
results_exp2 = []
for trial in range(N_TRIALS_V2):
    # NOTE(review): both the temperature and tau are calibrated on ImageNet-V2
    # samples here, not on ImageNet-Val; confirm that this is the intended setup.
    perm = torch.randperm(len(logits_v2))
    cal_idx = perm[:5000]
    eval_idx = perm[5000:10000]
    logits_cal = logits_v2[cal_idx]
    labels_cal = labels_v2[cal_idx]
    logits_eval = logits_v2[eval_idx]
    labels_eval = labels_v2[eval_idx]
    # The temperature is fit on the calibration split only, then applied to all logits.
    T = fit_temperature(logits_cal, labels_cal)
    probs_all = F.softmax(logits_v2 / T, dim=1)
    probs_cal = probs_all[cal_idx]
    probs_eval = probs_all[eval_idx]
    sorted_cal, idx_cal, L_cal = sort_probs_and_get_L(probs_cal, labels_cal)
    sorted_eval, idx_eval, L_eval = sort_probs_and_get_L(probs_eval, labels_eval)
    topk = topk_accuracies(probs_eval, labels_eval)
    for alpha in alpha_values_v2:
        naive_sets_eval, naive_L = naive_sets(sorted_eval, idx_eval, alpha)
        # APS is obtained from the RAPS routines by switching the penalty off (lam=0.0).
        tau_aps = raps_calibration(sorted_cal, idx_cal, L_cal, alpha, k_reg=5, lam=0.0)
        aps_sets_eval, aps_L = raps_sets(sorted_eval, idx_eval, tau_aps, k_reg=5, lam=0.0)
        tau_raps = raps_calibration(sorted_cal, idx_cal, L_cal, alpha, k_reg=5, lam=0.2)
        raps_sets_eval, raps_L = raps_sets(sorted_eval, idx_eval, tau_raps, k_reg=5, lam=0.2)
        # One row per (method, alpha, trial); top-1/top-5 depend only on the trial.
        results_exp2.append({"method": "Naive", "alpha": alpha, "trial": trial, "coverage": coverage(naive_sets_eval, labels_eval), "avg_size": avg_size(naive_sets_eval), "top1": topk[1], "top5": topk[5]})
        results_exp2.append({"method": "APS", "alpha": alpha, "trial": trial, "coverage": coverage(aps_sets_eval, labels_eval), "avg_size": avg_size(aps_sets_eval), "top1": topk[1], "top5": topk[5]})
        results_exp2.append({"method": "RAPS", "alpha": alpha, "trial": trial, "coverage": coverage(raps_sets_eval, labels_eval), "avg_size": avg_size(raps_sets_eval), "top1": topk[1], "top5": topk[5]})
results_exp2_df = pd.DataFrame(results_exp2)
# Mean over trials per (method, alpha); the "trial" column is averaged as well.
summary_exp2 = results_exp2_df.groupby(["method", "alpha"]).mean().reset_index()
print(summary_exp2)
# Left panel: empirical vs target coverage. Right panel: average set size.
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
for method in ["Naive", "APS", "RAPS"]:
    for alpha in alpha_values_v2:
        subset = results_exp2_df[(results_exp2_df.method == method) & (results_exp2_df.alpha == alpha)]
        axes[0].scatter([1 - alpha], [subset.coverage.mean()], label=f"{method} α={alpha}")
        axes[1].scatter([1 - alpha], [subset.avg_size.mean()], label=f"{method} α={alpha}")
# Dashed diagonal: empirical coverage equal to target coverage.
axes[0].plot([0, 1], [0, 1], "k--")
axes[0].set_xlabel("Target coverage")
axes[0].set_ylabel("Empirical coverage")
axes[1].set_xlabel("Target coverage")
axes[1].set_ylabel("Average set size")
axes[0].legend()
axes[1].legend()
plt.tight_layout()
plt.show()


# 8. Experiment 3: histograms of set sizes (Naive, APS, RAPS)

In [None]:
# Experiment 3: histogram of prediction-set sizes on the first split saved by
# Experiment 1, for Naive, APS (RAPS routines with lam=0.0) and RAPS at
# several values of lambda.
alpha_hist = 0.1
lam_values = [0.01, 0.1, 1.0]
cal_idx = first_split["cal_idx"]
eval_idx = first_split["eval_idx"]
probs_all = first_split["probs_all"]
probs_eval = first_split["probs_eval"]
sorted_eval = first_split["sorted_eval"]
idx_eval = first_split["idx_eval"]
labels_eval = first_split["labels_eval"]
sorted_cal = first_split["sorted_cal"]
L_cal = first_split["L_cal"]
# first_split does not carry the calibration class order, so rebuild it from
# the stored probabilities. Calibration must be given calibration indices,
# not idx_eval, alongside sorted_cal and L_cal.
idx_cal = sort_probs_and_get_L(probs_all[cal_idx], first_split["labels_cal"])[1]
naive_sets_eval, naive_L = naive_sets(sorted_eval, idx_eval, alpha_hist)
tau_aps = raps_calibration(sorted_cal, idx_cal, L_cal, alpha_hist, k_reg=5, lam=0.0)
aps_sets_eval, aps_L = raps_sets(sorted_eval, idx_eval, tau_aps, k_reg=5, lam=0.0)
fig, ax = plt.subplots(figsize=(8, 6))
# Half-integer edges give each size 1..50 its own bin. With integer edges
# 1..50 the closed last bin [49, 50] would count sizes 49 and 50 together.
# Sets larger than 50 are outside the plotted range.
bins = np.arange(1, 52) - 0.5
ax.hist(naive_L, bins=bins, alpha=0.5, label="Naive", log=True)
ax.hist(aps_L, bins=bins, alpha=0.5, label="APS", log=True)
for lam in lam_values:
    # Each lambda gets its own threshold: the calibration scores include the penalty.
    tau = raps_calibration(sorted_cal, idx_cal, L_cal, alpha_hist, k_reg=5, lam=lam)
    r_sets, r_L = raps_sets(sorted_eval, idx_eval, tau, k_reg=5, lam=lam)
    ax.hist(r_L, bins=bins, alpha=0.5, label=f"RAPS λ={lam}", log=True)
ax.set_xlabel("Set size")
ax.set_ylabel("Frequency (log)")
ax.legend()
plt.tight_layout()
plt.show()


# 9. Experiment 4: adaptiveness w.r.t. image difficulty

In [None]:
# Experiment 4: adaptiveness. On the first split saved by Experiment 1, group
# evaluation samples by the rank of their true label (a proxy for difficulty)
# and report RAPS coverage and average set size per group for several lambdas.
alpha_adapt = 0.1
lam_range = [0.0, 0.001, 0.01, 0.1, 1.0]
k_reg = 5
sorted_eval = first_split["sorted_eval"]
idx_eval = first_split["idx_eval"]
labels_eval = first_split["labels_eval"]
sorted_cal = first_split["sorted_cal"]
L_cal = first_split["L_cal"]
# first_split does not carry the calibration class order, so rebuild it from
# the stored probabilities. Calibration must be given calibration indices,
# not idx_eval, alongside sorted_cal and L_cal.
idx_cal = sort_probs_and_get_L(
    first_split["probs_all"][first_split["cal_idx"]], first_split["labels_cal"]
)[1]
# 1-based rank of the true label among the sorted classes of each sample.
labels_eval_np = labels_eval.cpu().numpy()
rank_true = (idx_eval == labels_eval_np[:, None]).argmax(axis=1).astype(np.int64) + 1
# Inclusive rank ranges and their display names, from easiest to hardest.
bins = [(1, 1), (2, 3), (4, 6), (7, 10), (11, 100), (101, 1000)]
bin_labels = ["1", "2-3", "4-6", "7-10", "11-100", "101-1000"]
records = []
for lam in lam_range:
    tau = raps_calibration(sorted_cal, idx_cal, L_cal, alpha_adapt, k_reg=k_reg, lam=lam)
    sets, L_out = raps_sets(sorted_eval, idx_eval, tau, k_reg=k_reg, lam=lam)
    # Each set is the top-L_out prefix of idx_eval, so it contains the true
    # label exactly when the label's rank does not exceed the set size.
    hits = rank_true <= L_out
    for (a, b), name in zip(bins, bin_labels):
        mask = (rank_true >= a) & (rank_true <= b)
        if mask.sum() == 0:
            continue
        cov = hits[mask].mean()
        size = L_out[mask].mean()
        records.append({"bin": name, "lambda": lam, "coverage": cov, "avg_size": size})
adapt_df = pd.DataFrame(records)
size_table = adapt_df.pivot_table(index="bin", columns="lambda", values="avg_size")
# pivot_table sorts the string bin names lexicographically ("101-1000" before
# "2-3"); restore the easiest-to-hardest order before printing.
size_table = size_table.reindex([name for name in bin_labels if name in size_table.index])
print(size_table)


# 10. Notebook polish and summaries

In [None]:
# Final recap: print each summary table under its heading.
print("Experiment 1 summary:", summary_exp1, sep="\n")
print("Experiment 2 summary:", summary_exp2, sep="\n")
print("Adaptiveness summary:", adapt_df, sep="\n")
