In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import numpy as np
from sklearn.metrics import roc_auc_score, average_precision_score, roc_curve

batch_size = 128
epochs = 10
lr = 1e-3
mc_samples = 20
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(0)
np.random.seed(0)



In [27]:
class CNN(nn.Module):
    def __init__(self, dropout_p=0.3, num_classes=10):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)

        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(p=dropout_p)

        self.fc1 = nn.Linear(128 * 4 * 4, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.react_threshold = 3.5

    def forward(self, x, react=False):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.dropout(x)

        x = self.pool(F.relu(self.conv2(x)))
        x = self.dropout(x)

        x = self.pool(F.relu(self.conv3(x)))
        x = self.dropout(x)

        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        if react and self.react_threshold:
            x = torch.clamp(x, max=self.react_threshold)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)

        return x

In [3]:
transform_cifar = transforms.Compose([
    transforms.ToTensor()
])

transform_mnist = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.expand(3, -1, -1))
])

train_id = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_cifar)
test_id = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_cifar)
test_ood = datasets.MNIST(root='./data', train=False, download=True, transform=transform_mnist)

train_id_loader = DataLoader(train_id, batch_size=batch_size, shuffle=True)
test_id_loader = DataLoader(test_id, batch_size=batch_size, shuffle=False)
test_ood_loader = DataLoader(test_ood, batch_size=batch_size, shuffle=False)

In [4]:
def train(model, train_loader, epochs, lr):
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_function = nn.CrossEntropyLoss()

    for epoch in range(1, epochs + 1):
        model.train()
        total_loss = 0.0
        correct_samples = 0

        for x, y in train_loader:
            x, y = x.to(device), y.to(device)

            optimizer.zero_grad()

            logits = model(x)
            loss = loss_function(logits, y)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            preds = logits.argmax(dim=1)

            correct_samples += (preds == y).sum().item()

        print(f'Epoch {epoch}; Train loss {total_loss / len(train_loader)}; Accuracy {correct_samples / len(train_loader) * 100}')

In [5]:
def compute_ood_metrics(id_scores, ood_scores):
    y_true = np.concatenate([
        np.zeros_like(id_scores),
        np.ones_like(ood_scores)
    ])
    scores = np.concatenate([id_scores, ood_scores])

    auroc = roc_auc_score(y_true, scores)
    aupr = average_precision_score(y_true, scores)

    fpr, tpr, _ = roc_curve(y_true, scores)
    target_tpr = 0.95
    idxs = np.where(tpr >= target_tpr)[0]
    if len(idxs) > 0:
        fpr95 = fpr[idxs[0]]
    else:
        fpr95 = 1.0

    print(f'AUROC {auroc}')
    print(f'AUPR {aupr}')
    print(f'FPR@95%TPR {fpr95}')

    return auroc, aupr, fpr95

In [31]:
def get_softmax_ood_scores(model, id_loader, ood_loader):
    model.to(device)
    model.eval()

    id_scores = []
    ood_scores = []

    with torch.no_grad():
        for x, _ in id_loader:
            x = x.to(device)
            logits = model(x)
            probs = F.softmax(logits, dim=1)
            max_probs, _ = probs.max(dim=1)
            scores = 1.0 - max_probs

            id_scores.append(scores.cpu().numpy())

    with torch.no_grad():
        for x, _ in ood_loader:
            x = x.to(device)
            logits = model(x)
            probs = F.softmax(logits, dim=1)
            max_probs, _ = probs.max(dim=1)
            scores = 1.0 - max_probs

            ood_scores.append(scores.cpu().numpy())

    id_scores = np.concatenate(id_scores)
    ood_scores = np.concatenate(ood_scores)

    return id_scores, ood_scores


def get_mcd_ood_entropy(model, x, T=20):
    model.to(device)
    model.train()

    with torch.no_grad():
        probs_T = []
        for _ in range(T):
            logits = model(x)
            probs = F.softmax(logits, dim=1)

            probs_T.append(probs.unsqueeze(0))

        probs_T = torch.cat(probs_T, dim=0)

    p_mean = probs_T.mean(dim=0)

    eps = 1e-8
    entropy = -torch.sum(p_mean * torch.log(p_mean + eps), dim=1)

    return entropy


def get_mcd_ood_scores(model, id_loader, ood_loader, T=20):
    model.to(device)

    id_scores = []
    ood_scores = []

    for x, _ in id_loader:
        x = x.to(device)
        entropy = get_mcd_ood_entropy(model, x, T=T)

        id_scores.append(entropy.cpu().numpy())

    for x, _ in ood_loader:
        x = x.to(device)
        entropy = get_mcd_ood_entropy(model, x, T=T)

        ood_scores.append(entropy.cpu().numpy())

    id_scores = np.concatenate(id_scores)
    ood_scores = np.concatenate(ood_scores)

    return id_scores, ood_scores

In [32]:
model = CNN(dropout_p=0.3, num_classes=10)
train(model, train_id_loader, epochs=epochs, lr=lr)

Epoch 1; Train loss 1.7194243050597209; Accuracy 4725.063938618926
Epoch 2; Train loss 1.3840523006970926; Accuracy 6378.7723785166245
Epoch 3; Train loss 1.2548502627236153; Accuracy 7027.109974424552
Epoch 4; Train loss 1.150143935399897; Accuracy 7533.248081841432
Epoch 5; Train loss 1.0783916948091647; Accuracy 7903.324808184144
Epoch 6; Train loss 1.0121265161982582; Accuracy 8197.442455242966
Epoch 7; Train loss 0.9643249353179542; Accuracy 8413.299232736572
Epoch 8; Train loss 0.9225511217056332; Accuracy 8612.27621483376
Epoch 9; Train loss 0.8867015358432174; Accuracy 8805.882352941177
Epoch 10; Train loss 0.860364586953312; Accuracy 8885.677749360613


In [33]:
softmax_id_scores, softmax_ood_scores = get_softmax_ood_scores(model, test_id_loader, test_ood_loader)


In [34]:
auroc, aupr, fpr95 = compute_ood_metrics(softmax_id_scores, softmax_ood_scores)

AUROC 0.535232375
AUPR 0.5079264969001228
FPR@95%TPR 0.9259


In [35]:
mcd_id_scores, mcd_ood_scores = get_mcd_ood_scores(model, test_id_loader, test_ood_loader)

In [36]:
mcd_auroc, mcd_aupr, mcd_fpr95 = compute_ood_metrics(mcd_id_scores, mcd_ood_scores)

AUROC 0.62703765
AUPR 0.5587726766030782
FPR@95%TPR 0.8202


# Пишем ReAct

In [37]:
def get_react_ood_scores(model, id_loader, ood_loader):
    model.eval()

    id_scores = []
    with torch.no_grad():
        for x, _ in id_loader:
            x = x.to(device)
            logits = model(x, react=True)
            probs = F.softmax(logits, dim=1)
            score = probs.max(dim=1)[0]
            id_scores.append(score.cpu())
    
    ood_scores = []
    with torch.no_grad():
        for x, _ in ood_loader:
            x = x.to(device)
            logits = model(x, react=True)
            probs = F.softmax(logits, dim=1)
            score = probs.max(dim=1)[0]
            ood_scores.append(score.cpu())
    
    id_scores = np.concatenate(id_scores)
    ood_scores = np.concatenate(ood_scores)
    return id_scores, ood_scores

    

In [38]:
react_id_scores, react_ood_scores = get_react_ood_scores(model, test_id_loader, test_ood_loader)

In [39]:
react_auroc, react_aupr, react_fpr95 = compute_ood_metrics(react_id_scores, react_ood_scores)

AUROC 0.41797458499999995
AUPR 0.4257946028018784
FPR@95%TPR 0.9519
