导入库//

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from torch.utils.data import DataLoader, random_split, Subset
from collections import defaultdict
import pandas as pd
from sklearn.model_selection import train_test_split
import time
import copy
import os

随机种子//

In [6]:
torch.manual_seed(42)
np.random.seed(42)

数据集//


In [7]:
class CIFARData:
    def __init__(self, dataset='cifar10', calib_size=1000, test_size=1000, batch_size=128):
        self.dataset = dataset.lower()
        self.calib_size = calib_size
        self.test_size = test_size
        self.batch_size = batch_size
        self.num_classes = 10 if dataset == 'cifar10' else 100
        self._load_data()
    
    def _load_data(self):
        transform_train = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])

        transform_test = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])
        
        if self.dataset == 'cifar10':
            full_train_set = torchvision.datasets.CIFAR10(
                root='./data', train=True, download=True, transform=transform_train)
            test_set = torchvision.datasets.CIFAR10(
                root='./data', train=False, download=True, transform=transform_test)
        else:  # cifar100
            full_train_set = torchvision.datasets.CIFAR100(
                root='./data', train=True, download=True, transform=transform_train)
            test_set = torchvision.datasets.CIFAR100(
                root='./data', train=False, download=True, transform=transform_test)
        
        indices = np.arange(len(full_train_set))
        labels = full_train_set.targets
        
        train_idx, calib_idx = train_test_split(
            indices, test_size=self.calib_size, stratify=labels
        )

        self.split_indices = {
            'train_idx': train_idx,
            'calib_idx': calib_idx,
        }
        
        self.train_set = Subset(full_train_set, train_idx)
        self.calib_set = Subset(full_train_set, calib_idx)
        self.test_set = test_set
        
        self.train_loader = DataLoader(
            self.train_set, 
            batch_size=self.batch_size, shuffle=True, num_workers=2
        )
        
        self.calib_loader = DataLoader(
            self.calib_set, 
            batch_size=self.batch_size, shuffle=False, num_workers=2
        )
        
        self.test_loader = DataLoader(
            self.test_set, 
            batch_size=self.batch_size, shuffle=False, num_workers=2
        )
        
        print(f"Dataset: {self.dataset}")
        print()
        print(f"Dataset sizes for {self.dataset.upper()}:")
        print(f"  Training set: {len(self.train_set)} samples")
        print(f"  Calibration set: {len(self.calib_set)} samples")
        print(f"  Test set: {len(self.test_set)} samples")

残差块//

In [8]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = nn.ReLU()(out)
        return out

Resnet//

In [9]:
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = nn.ReLU()(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = nn.AdaptiveAvgPool2d((1, 1))(out)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

def ResNet18(num_classes=10):
    return ResNet(BasicBlock, [2, 2, 2, 2], num_classes=num_classes)

def ResNet34(num_classes=10):
    return ResNet(BasicBlock, [3, 4, 6, 3], num_classes=num_classes)

def ResNet50(num_classes=10):
    return ResNet(BasicBlock, [3, 4, 6, 3], num_classes=num_classes)

训练

In [10]:
class Trainer:
    def __init__(self, model, data_loader, num_classes=10, lr=0.1, epochs=100):
        self.model = model
        if self.model == 'resnet18':
            self.lr = 0.1
        elif self.model == 'resnet34':
            self.lr = 0.05
        else:
            self.lr = 0.03
        self.train_loader = data_loader.train_loader
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)
        
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.SGD(
            model.parameters(), lr=self.lr, momentum=0.9, weight_decay=5e-4
        )
        self.scheduler = optim.lr_scheduler.MultiStepLR(
            self.optimizer, milestones=[50, 75], gamma=0.1
        )
        self.epochs = epochs
        self.best_acc = 0.0
        self.num_classes = num_classes
        
    def train(self):
        self.model.train()
        for epoch in range(self.epochs):
            start_time = time.time()
            running_loss = 0.0
            correct = 0
            total = 0
            
            for inputs, labels in self.train_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                
                running_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()
            
            self.scheduler.step()
            epoch_acc = 100. * correct / total
            epoch_time = time.time() - start_time
            
            print(f'Epoch [{epoch+1}/{self.epochs}] - Loss: {running_loss/len(self.train_loader):.4f} '
                  f'Acc: {epoch_acc:.2f}% - Time: {epoch_time:.2f}s')
        
        print('Training finished!')
        return self.model
    
    def save_model(self, model_path):
        torch.save(self.model.state_dict(), model_path)
        print(f'Model saved to {model_path}')

共形预测

In [None]:
class ConformalPredictor:
    def __init__(self, model, calib_loader, alpha=0.1, num_classes=10):
        self.model = model
        self.calib_loader = calib_loader
        if self.model == 'resnet18':
            self.alpha = 0.1
        elif self.model == 'resnet34':
            self.alpha = 0.05
        else: # 'resnet50'
            self.alpha = 0.03
        self.num_classes = num_classes
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)
        self.q = None
    
    def compute_scores(self, method='likelihood'):
        print("Calibration method:", method)
        scores = []
        model_to_use = self.model
        model_to_use.eval()
        
        with torch.no_grad():
            for inputs, labels in self.calib_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = model_to_use(inputs)
                
                probs = torch.softmax(outputs, dim=1)
                
                batch_scores = None
                if method == 'likelihood':
                    batch_scores = 1 - probs[torch.arange(len(labels)), labels].cpu().numpy()
                
                elif method == 'cumulative':
                    sorted_probs, _ = torch.sort(probs, dim=1, descending=True)
                    cum_probs = torch.cumsum(sorted_probs, dim=1)
                    ranks = (probs == probs.gather(1, labels.view(-1, 1))).nonzero()[:, 1]
                    #match_matrix = (sorted_indices == labels.view(-1, 1))
                    #ranks = match_matrix.int().argmax(dim=1)
                    batch_scores = cum_probs[torch.arange(len(labels)), ranks].cpu().numpy()
                
                scores.extend(batch_scores)
        
        return np.array(scores)
    
    def calibrate(self, method='likelihood'):
        if self.calib_loader is None:
            raise ValueError("The calibration data loader is not set! Please provide a valid calib_loader.")

        scores = self.compute_scores(method)
        
        print(f"Scores statistics ({method}):")
        print(f"  Min: {np.min(scores):.6f}, Max: {np.max(scores):.6f}")
        print(f"  Mean: {np.mean(scores):.6f}, Median: {np.median(scores):.6f}")
        print(f"  10th percentile: {np.percentile(scores, 10):.6f}")
        print(f"  90th percentile: {np.percentile(scores, 90):.6f}")
        
        if method in ['likelihood', 'cumulative']:
            self.q = np.percentile(scores, 100 * (1 - self.alpha))
        
        print(f'Calibration complete - Quantile: {self.q:.4f}')
        return self.q
    
    def predict(self, inputs, method='likelihood'):
        model_to_use = self.model
        model_to_use.eval()
        
        with torch.no_grad():
            inputs = inputs.to(self.device)
            outputs = model_to_use(inputs)
            
            probs = torch.softmax(outputs, dim=1)[0]

            pred_set = None
            if method == 'likelihood':
                pred_mask = (1 - probs <= self.q)
                pred_set = torch.nonzero(pred_mask).cpu().numpy().flatten()

                if pred_set.size == 0:
                    pred_set = np.array([probs.argmax().item()])
        
            elif method == 'cumulative':
                sorted_probs, sorted_idxs = torch.sort(probs, descending=True)
                cum_probs = torch.cumsum(sorted_probs, dim=0)
        
                mask = (1 - cum_probs <= self.q)
                if mask.any():
                    k = mask.nonzero()[0].item() + 1
                else:
                    k = 1

                pred_set = sorted_idxs[:k].cpu().numpy()
        
            if pred_set.size == 0:
                pred_set = np.array([probs.argmax().item()])
        
        return pred_set
    
    def evaluate(self, test_loader, method='likelihood', save_path=None):
        coverage = []
        set_sizes = []
        prediction_sets = []
        correct = []
        losses = []
        unconformity_scores = []
        
        model_to_use = self.model
        model_to_use.eval()
        criterion = nn.CrossEntropyLoss(reduction='none')
        
        if save_path:
            dir_path = os.path.dirname(save_path)
            if dir_path:
                os.makedirs(dir_path, exist_ok=True)
            f = open(save_path, 'w')
            f.write("sample_index,true_label,prediction_set,set_size,is_covered,is_correct,loss\n")
        
        sample_index = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = model_to_use(inputs)
                
                batch_losses = criterion(outputs, labels).cpu().numpy()
                losses.extend(batch_losses)
                
                _, predicted = outputs.max(1)
                batch_correct = predicted.eq(labels).cpu().numpy()
                correct.extend(batch_correct)
                
                for i in range(inputs.size(0)):
                    input_i = inputs[i].unsqueeze(0)
                    output_i = model_to_use(input_i)
                    prob_i = torch.softmax(output_i, dim=1)[0]
                    
                    true_label = labels[i].item()
                    if method == 'likelihood':
                        unconformity_score = 1 - prob_i[true_label].item()
                    elif method == 'cumulative':
                        sorted_probs, sorted_idxs = torch.sort(prob_i, descending=True)
                        cum_probs = torch.cumsum(sorted_probs, dim=0)
                        rank = (sorted_idxs == true_label).nonzero().item()
                        unconformity_score = 1 - cum_probs[rank].item()
                    
                    unconformity_scores.append(unconformity_score)
                    
                    pred_set = self.predict(input_i, method)
                    
                    prediction_sets.append(pred_set.tolist())
                    is_covered = true_label in pred_set
                    coverage.append(is_covered)
                    set_size = len(pred_set)
                    set_sizes.append(set_size)
                    
                    is_correct = batch_correct[i]
                    
                    if save_path:
                        set_str = ",".join(map(str, pred_set))
                        f.write(f"{sample_index},{true_label},{set_str},{set_size},{int(is_covered)},{int(is_correct)},{batch_losses[i]:.6f}\n")
                    
                    sample_index += 1
        
        if save_path:
            f.close()
            print(f"Prediction sets saved to: {save_path}")
        
        coverage_rate = np.mean(coverage) * 100
        avg_set_size = np.mean(set_sizes)
        accuracy = np.mean(correct) * 100
        avg_loss = np.mean(losses)
        
        print(f'Coverage: {coverage_rate:.2f}% | Avg Set Size: {avg_set_size:.2f} '
              f'| Accuracy: {accuracy:.2f}% | Avg Loss: {avg_loss:.4f}')
        
        results = {
            'coverage': coverage,
            'set_size': set_sizes,
            'prediction_sets': prediction_sets,
            'correct': correct,
            'loss': losses,
            'unconformity_scores': unconformity_scores
        }
        
        return results

train and save

In [9]:
DATASETS = ['cifar10', 'cifar100']
MODELS = ['resnet18','resnet34','resnet50'] 
METHODS = ['likelihood','cumulative']
EPOCHS = 100
CALIB_SIZE = 1000
TEST_SIZE = 1000 
BATCH_SIZE = 128

for DATASET in DATASETS:

    print("Loading data...")
    data = CIFARData(dataset=DATASET, calib_size=CALIB_SIZE, test_size=TEST_SIZE, batch_size=BATCH_SIZE)
    num_classes = data.num_classes
    np.save(f'{DATASET}_split_indices_0.npy', data.split_indices)

    for model_name in MODELS:
        print(f"\n===== Processing {model_name} =====")
        
        if model_name == 'resnet18':
            model = ResNet18(num_classes)
        elif model_name == 'resnet34':
            model = ResNet34(num_classes)
        else:  # resnet50
            model = ResNet50(num_classes)
        
        print("Training model...")
        trainer = Trainer(model, data, num_classes=num_classes, epochs=EPOCHS)
        model = trainer.train()
        
        model_path = f"{model_name}_{DATASET}_0.pth"
        trainer.save_model(model_path)

Loading data...
Dataset: cifar10

Dataset sizes for CIFAR10:
  Training set: 49000 samples
  Calibration set: 1000 samples
  Test set: 10000 samples

===== Processing resnet18 =====
Training model...
Epoch [1/100] - Loss: 1.5879 Acc: 41.76% - Time: 11.67s
Epoch [2/100] - Loss: 1.0798 Acc: 61.29% - Time: 11.03s
Epoch [3/100] - Loss: 0.8431 Acc: 70.08% - Time: 11.14s
Epoch [4/100] - Loss: 0.6974 Acc: 75.86% - Time: 11.19s
Epoch [5/100] - Loss: 0.5988 Acc: 79.22% - Time: 11.25s
Epoch [6/100] - Loss: 0.5309 Acc: 81.61% - Time: 11.21s
Epoch [7/100] - Loss: 0.4717 Acc: 83.66% - Time: 11.27s
Epoch [8/100] - Loss: 0.4343 Acc: 84.82% - Time: 11.32s
Epoch [9/100] - Loss: 0.3999 Acc: 86.15% - Time: 11.36s
Epoch [10/100] - Loss: 0.3765 Acc: 86.96% - Time: 11.35s
Epoch [11/100] - Loss: 0.3480 Acc: 88.01% - Time: 11.30s
Epoch [12/100] - Loss: 0.3279 Acc: 88.69% - Time: 11.47s
Epoch [13/100] - Loss: 0.3089 Acc: 89.31% - Time: 11.42s
Epoch [14/100] - Loss: 0.2878 Acc: 90.07% - Time: 11.43s
Epoch [15/1

100.0%


Dataset: cifar100

Dataset sizes for CIFAR100:
  Training set: 49000 samples
  Calibration set: 1000 samples
  Test set: 10000 samples

===== Processing resnet18 =====
Training model...
Epoch [1/100] - Loss: 3.8369 Acc: 11.21% - Time: 11.18s
Epoch [2/100] - Loss: 3.1472 Acc: 22.16% - Time: 11.16s
Epoch [3/100] - Loss: 2.6416 Acc: 31.58% - Time: 11.14s
Epoch [4/100] - Loss: 2.2286 Acc: 40.23% - Time: 11.27s
Epoch [5/100] - Loss: 1.9413 Acc: 46.95% - Time: 11.29s
Epoch [6/100] - Loss: 1.7314 Acc: 51.43% - Time: 11.26s
Epoch [7/100] - Loss: 1.5736 Acc: 55.71% - Time: 11.30s
Epoch [8/100] - Loss: 1.4501 Acc: 58.63% - Time: 11.40s
Epoch [9/100] - Loss: 1.3354 Acc: 61.35% - Time: 11.37s
Epoch [10/100] - Loss: 1.2450 Acc: 63.67% - Time: 11.28s
Epoch [11/100] - Loss: 1.1729 Acc: 65.66% - Time: 11.24s
Epoch [12/100] - Loss: 1.0962 Acc: 67.74% - Time: 11.23s
Epoch [13/100] - Loss: 1.0375 Acc: 69.19% - Time: 11.24s
Epoch [14/100] - Loss: 0.9753 Acc: 70.90% - Time: 11.24s
Epoch [15/100] - Loss: 0.

二 做一些测试

In [None]:
import pickle

CALIB_SIZE = 1000
TEST_SIZE = 1000 
BATCH_SIZE = 128

DATASETS = ['cifar10', 'cifar100']
MODELS = ['resnet18', 'resnet34', 'resnet50']
METHODS = ['likelihood', 'cumulative']

DATASET = 'cifar10' # 'cifar100'

def create_calib_loader(dataset_name, split_indices_file, batch_size=128):
    """
    Create a calibration data loader based on the saved index file

    Parameters:
        dataset_name: Name of the dataset ('cifar10' or 'cifar100')
        split_indices_file: Path to the saved index file
        batch_size: Batch size of the data loader

    Returns:
    calib_loader: Calibration data loader
    """
    split_indices = np.load(split_indices_file, allow_pickle=True).item()
    calib_idx = split_indices['calib_idx']
    
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    
    if dataset_name == 'cifar10':
        full_train_set = torchvision.datasets.CIFAR10(
            root='./data', train=True, download=True, transform=transform_test)
    else:  # cifar100
        full_train_set = torchvision.datasets.CIFAR100(
            root='./data', train=True, download=True, transform=transform_test)
    
    calib_set = Subset(full_train_set, calib_idx)
    
    calib_loader = DataLoader(
        calib_set, 
        batch_size=batch_size, 
        shuffle=False, 
        num_workers=2
    )
    
    print(f"Created calibration loader for {dataset_name}")
    print(f"  Calibration set size: {len(calib_set)} samples")
    print(f"  Batch size: {batch_size}")
    
    return calib_loader

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for model_name in MODELS:
    print(f"\n{'='*50}")
    print(f"Processing model: {model_name}")
    print(f"{'='*50}")

    data = CIFARData(dataset=DATASET, calib_size=CALIB_SIZE, test_size=TEST_SIZE, batch_size=BATCH_SIZE)
    num_classes = data.num_classes

    model_path = f"{model_name}_{DATASET}_0.pth"
    split_indices_file = f'{DATASET}_split_indices_0.npy'
    
    calib_loader = create_calib_loader(DATASET, split_indices_file)
    test_loader = data.test_loader

    model_path = f"{model_name}_{DATASET}_0.pth"

    if model_name == 'resnet18':
        model = ResNet18(num_classes=num_classes)
    elif model_name == 'resnet34':
        model = ResNet34(num_classes=num_classes)
    elif model_name == 'resnet50':
        model = ResNet50(num_classes=num_classes)

    state_dict = torch.load(model_path, weights_only=False)
    model.load_state_dict(state_dict)
    print(f"Loaded state_dict from {model_path}")

    model = model.to(device)
    model.eval()
    
    for method in METHODS:
        cp = ConformalPredictor(model, calib_loader=calib_loader, num_classes=10)
        
        print("\nCalibrating model...")
        cp.calibrate(method=method)
        
        print("\nEvaluating model on test set...")
        save_path = f"{model_name}_{DATASET}_{method}_prediction_set_0.csv"
        results = cp.evaluate(test_loader, method=method, save_path=save_path)
        
        results_path = f"{model_name}_{DATASET}_{method}_full_results_0.pkl"
        with open(results_path, 'wb') as f:
            pickle.dump(results, f)
        print(f"Full results saved to {results_path}")



Processing model: resnet18
Dataset: cifar10

Dataset sizes for CIFAR10:
  Training set: 49000 samples
  Calibration set: 1000 samples
  Test set: 10000 samples
Created calibration loader for cifar10
  Calibration set size: 1000 samples
  Batch size: 128
Loaded state_dict from resnet18_cifar10_0.pth

Calibrating model...
Calibration method: likelihood
Scores statistics (likelihood):
  Min: 0.000001, Max: 0.999972
  Mean: 0.065806, Median: 0.000448
  10th percentile: 0.000065
  90th percentile: 0.119974
Calibration complete - Quantile: 0.9297

Evaluating model on test set...
Prediction sets saved to: resnet18_cifar10_likelihood_prediction_set_0.csv
Coverage: 97.21% | Avg Set Size: 1.09 | Accuracy: 94.56% | Avg Loss: 0.2108
Full results saved to resnet18_cifar10_likelihood_full_results_0.pkl

Calibrating model...
Calibration method: cumulative
Scores statistics (cumulative):
  Min: 0.460271, Max: 1.000000
  Mean: 0.996258, Median: 0.999954
  10th percentile: 0.999002
  90th percentile: 1