In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import numpy as np
from sklearn.metrics import roc_auc_score, average_precision_score, roc_curve

batch_size = 128
epochs = 10
lr = 1e-3
mc_samples = 20
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(0)
np.random.seed(0)



In [2]:
class CNN(nn.Module):
    def __init__(self, dropout_p=0.3, num_classes=10):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)

        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(p=dropout_p)

        self.fc1 = nn.Linear(128 * 4 * 4, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward_features(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.dropout(x)

        x = self.pool(F.relu(self.conv2(x)))
        x = self.dropout(x)

        x = self.pool(F.relu(self.conv3(x)))
        x = self.dropout(x)

        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)

        return x          # это h(x)

    def forward(self, x):
        h = self.forward_features(x)  # фичи
        x = self.fc2(h)               # логиты
        return x


In [3]:
def estimate_react_threshold(model, loader, quantile=0.90):
    model.to(device)
    model.eval()
    feats_list = []

    with torch.no_grad():
        for x, _ in loader:
            x = x.to(device)
            h = model.forward_features(x)        # [B, D]
            feats_list.append(h.view(-1).cpu())

    all_feats = torch.cat(feats_list, dim=0)     # [N * D]
    c = torch.quantile(all_feats, quantile)
    print(f"ReAct threshold c (q={quantile}): {c.item():.4f}")
    return c


In [4]:
def get_react_ood_scores(model, id_loader, ood_loader, c):
    model.to(device)
    model.eval()

    def compute_scores(loader):
        scores = []
        with torch.no_grad():
            for x, _ in loader:
                x = x.to(device)
                h = model.forward_features(x)
                h_clipped = torch.clamp(h, max=c)
                logits = model.fc2(h_clipped)
                probs = F.softmax(logits, dim=1)
                max_probs, _ = probs.max(dim=1)
                score = 1.0 - max_probs
                scores.append(score.cpu().numpy())
        return np.concatenate(scores)

    id_scores = compute_scores(id_loader)
    ood_scores = compute_scores(ood_loader)
    return id_scores, ood_scores


In [5]:
transform_cifar = transforms.Compose([
    transforms.ToTensor()
])

transform_mnist = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.expand(3, -1, -1))
])

train_id = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_cifar)
test_id = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_cifar)
test_ood = datasets.MNIST(root='./data', train=False, download=True, transform=transform_mnist)

train_id_loader = DataLoader(train_id, batch_size=batch_size, shuffle=True)
test_id_loader = DataLoader(test_id, batch_size=batch_size, shuffle=False)
test_ood_loader = DataLoader(test_ood, batch_size=batch_size, shuffle=False)

In [6]:
def train(model, train_loader, epochs, lr):
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_function = nn.CrossEntropyLoss()

    for epoch in range(1, epochs + 1):
        model.train()
        total_loss = 0.0
        correct_samples = 0

        for x, y in train_loader:
            x, y = x.to(device), y.to(device)

            optimizer.zero_grad()

            logits = model(x)
            loss = loss_function(logits, y)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            preds = logits.argmax(dim=1)

            correct_samples += (preds == y).sum().item()

        print(f'Epoch {epoch}; Train loss {total_loss / len(train_loader)}; Accuracy {correct_samples / len(train_loader) * 100}')

In [7]:
def compute_ood_metrics(id_scores, ood_scores):
    y_true = np.concatenate([
        np.zeros_like(id_scores),
        np.ones_like(ood_scores)
    ])
    scores = np.concatenate([id_scores, ood_scores])

    auroc = roc_auc_score(y_true, scores)
    aupr = average_precision_score(y_true, scores)

    fpr, tpr, _ = roc_curve(y_true, scores)
    target_tpr = 0.95
    idxs = np.where(tpr >= target_tpr)[0]
    if len(idxs) > 0:
        fpr95 = fpr[idxs[0]]
    else:
        fpr95 = 1.0

    print(f'AUROC {auroc}')
    print(f'AUPR {aupr}')
    print(f'FPR@95%TPR {fpr95}')

    return auroc, aupr, fpr95

In [8]:
def get_softmax_ood_scores(model, id_loader, ood_loader):
    model.to(device)
    model.eval()

    id_scores = []
    ood_scores = []

    with torch.no_grad():
        for x, _ in id_loader:
            x = x.to(device)
            logits = model(x)
            probs = F.softmax(logits, dim=1)
            max_probs, _ = probs.max(dim=1)
            scores = 1.0 - max_probs

            id_scores.append(scores.cpu().numpy())

    with torch.no_grad():
        for x, _ in ood_loader:
            x = x.to(device)
            logits = model(x)
            probs = F.softmax(logits, dim=1)
            max_probs, _ = probs.max(dim=1)
            scores = 1.0 - max_probs

            ood_scores.append(scores.cpu().numpy())

    id_scores = np.concatenate(id_scores)
    ood_scores = np.concatenate(ood_scores)

    return id_scores, ood_scores


def get_mcd_ood_entropy(model, x, T=20):
    model.to(device)
    model.train()

    with torch.no_grad():
        probs_T = []
        for _ in range(T):
            logits = model(x)
            probs = F.softmax(logits, dim=1)

            probs_T.append(probs.unsqueeze(0))

        probs_T = torch.cat(probs_T, dim=0)

    p_mean = probs_T.mean(dim=0)

    eps = 1e-8
    entropy = -torch.sum(p_mean * torch.log(p_mean + eps), dim=1)

    return entropy


def get_mcd_ood_scores(model, id_loader, ood_loader, T=20):
    model.to(device)

    id_scores = []
    ood_scores = []

    for x, _ in id_loader:
        x = x.to(device)
        entropy = get_mcd_ood_entropy(model, x, T=T)

        id_scores.append(entropy.cpu().numpy())

    for x, _ in ood_loader:
        x = x.to(device)
        entropy = get_mcd_ood_entropy(model, x, T=T)

        ood_scores.append(entropy.cpu().numpy())

    id_scores = np.concatenate(id_scores)
    ood_scores = np.concatenate(ood_scores)

    return id_scores, ood_scores

In [9]:
model = CNN(dropout_p=0.3, num_classes=10)
train(model, train_id_loader, epochs=epochs, lr=lr)

Epoch 1; Train loss 1.7093703880944215; Accuracy 4750.383631713556
Epoch 2; Train loss 1.3515172184580733; Accuracy 6509.20716112532
Epoch 3; Train loss 1.2133603810959155; Accuracy 7266.496163682865
Epoch 4; Train loss 1.1161006222600522; Accuracy 7686.95652173913
Epoch 5; Train loss 1.0377757351111878; Accuracy 8082.352941176471
Epoch 6; Train loss 0.9869607876023978; Accuracy 8315.856777493605
Epoch 7; Train loss 0.9463881917316895; Accuracy 8500.511508951406
Epoch 8; Train loss 0.9021060236579622; Accuracy 8729.15601023018
Epoch 9; Train loss 0.8690294059341216; Accuracy 8866.751918158567
Epoch 10; Train loss 0.843890978704633; Accuracy 8976.470588235294


In [10]:
softmax_id_scores, softmax_ood_scores = get_softmax_ood_scores(model, test_id_loader, test_ood_loader)


In [11]:
auroc, aupr, fpr95 = compute_ood_metrics(softmax_id_scores, softmax_ood_scores)

AUROC 0.633756595
AUPR 0.5624677300582986
FPR@95%TPR 0.7316


In [10]:
mcd_id_scores, mcd_ood_scores = get_mcd_ood_scores(model, test_id_loader, test_ood_loader)

In [11]:
mcd_auroc, mcd_aupr, mcd_fpr95 = compute_ood_metrics(mcd_id_scores, mcd_ood_scores)

AUROC 0.7381110450000001
AUPR 0.6358088271053212
FPR@95%TPR 0.5738


In [12]:
# 1) считаем порог ReAct по train ID (CIFAR-10)
c = estimate_react_threshold(model, train_id_loader, quantile=0.90)

# 2) считаем OOD-скоры для ReAct
react_id_scores, react_ood_scores = get_react_ood_scores(model, test_id_loader, test_ood_loader, c)

# 3) метрики OOD
react_auroc, react_aupr, react_fpr95 = compute_ood_metrics(react_id_scores, react_ood_scores)

print("ReAct AUROC:", react_auroc)
print("ReAct AUPR:", react_aupr)
print("ReAct FPR@95%TPR:", react_fpr95)


ReAct threshold c (q=0.9): 1.0540
AUROC 0.846440435
AUPR 0.8093378825336707
FPR@95%TPR 0.4423
ReAct AUROC: 0.846440435
ReAct AUPR: 0.8093378825336707
ReAct FPR@95%TPR: 0.4423
