本文档是对 Dataset Inference 相关代码的拆解与说明。

# Dataset Inference
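Dataset Inference的核心思想是：模型对自己训练过的样本通常比对未见过的样本更“确信”，即训练样本到决策边界的距离更大。因此，比较可疑模型在受害者私有训练数据与公开测试数据上到决策边界的距离（prediction margin），再通过假设检验，就可以判断可疑模型是否利用了受害者的训练数据。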

## Embedding Generation

In [None]:
import sys
import time

sys.path.append('.')
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
from tqdm.notebook import tqdm
import os
from funcs import load_dataset
from funcs import test
from funcs import get_student_teacher

In [None]:
import argparse
# Configuration for the embedding-generation stage.  Each entry is
# (flag, keyword arguments for ``add_argument``).  The order is significant:
# it determines the attribute order shown by ``print(args)``.
_ARGUMENT_SPECS = [
    ('--dataset_path', dict(type=str, default='', help="dataset path")),
    ('--batch_size', dict(type=int, default=100)),
    ('--epoch', dict(type=int, default=20)),
    ('--device', dict(type=str, default='cuda')),
    ('--model_root', dict(type=str, default='')),
    ('--num_classes', dict(type=int, default=10)),
    ('--distance', dict(type=str, default=None)),
    ('--file_dir', dict(type=str, default='', help="格式为./feature/数据库名/模型的编号或名称/")),
    ('--dataset', dict(type=str, default='CIFAR10', choices=['CIFAR10', 'self'])),
    ('--mode', dict(type=str, default='teacher')),
    ('--normalize', dict(type=int, default=1)),
]

parse = argparse.ArgumentParser()
for _flag, _options in _ARGUMENT_SPECS:
    parse.add_argument(_flag, **_options)

# A notebook has no real command line, so the argument vector is spelled out
# here.  Note that '--distance' receives the *string* 'None', not ``None``.
_CELL_ARGV = [
    '--dataset_path', '',
    '--batch_size', '100',
    '--epoch', '20',
    '--device', 'cuda',
    '--model_root', './trained/CIFAR10/test/final.pt',
    '--num_classes', '10',
    '--distance', 'None',
    '--file_dir', './feature/CIFAR10/test/',
    '--dataset', 'CIFAR10',
    '--normalize', '1',
    '--mode', 'teacher',
]
args = parse.parse_args(args=_CELL_ARGV)
print(args)

In [None]:
def rand_steps(model, X, y, args, target=None):
    """Blind walk: scale one random direction until the prediction changes.

    A single noise tensor is drawn per call from the distribution selected by
    ``args.distance`` (``"linf"``: uniform, ``"l2"``: Gaussian, ``"l1"``:
    Laplace).  For every sample the model still classifies as ``y``, the
    perturbation is replaced by a growing integer multiple of that base noise,
    clipped so that ``X + delta`` stays inside ``[0, 1]``, for at most 50
    steps.  The model is switched to eval mode for the duration of the call
    and put back into training mode afterwards if it was training on entry.

    Args:
        model: Classifier called as ``model(batch)``; ``max(1)`` is taken over
            its output, so it must return one row of class scores per sample.
        X: Input batch.  The clipping assumes values in ``[0, 1]``.
        y: Labels, one per row of ``X``.
        args: Namespace providing ``distance`` (``"linf"``, ``"l2"``, ``"l1"``).
        target: Unused; accepted only so existing callers keep working.

    Returns:
        Tensor shaped like ``X`` with the final perturbation of every sample.
        Samples whose prediction never changed keep the last multiple tried.

    Raises:
        KeyError: If ``args.distance`` is not one of the supported names.
    """
    del target
    is_training = model.training
    model.eval()

    # Noise scales of the uniform, Gaussian and Laplace samplers.
    uni, std, scale = (0.005, 0.005, 0.01)
    # Maximum number of walk steps.
    steps = 50

    # Every sampler places its noise on the device of the reference batch, so
    # the function also works when CUDA is not available.
    noise_map = {
        "l1": lambda ref: torch.from_numpy(
            np.random.laplace(loc=0.0, scale=scale, size=ref.shape)
        ).float().to(ref.device),
        "l2": lambda ref: torch.normal(0, std, size=ref.shape).to(ref.device),
        "linf": lambda ref: torch.empty_like(ref).uniform_(-uni, uni),
    }

    def clip_to_valid_range(perturbation, inputs):
        # Keep inputs + perturbation inside [0, 1].
        return torch.min(torch.max(perturbation, -inputs), 1 - inputs)

    try:
        delta_base = noise_map[args.distance](X)
        delta = clip_to_valid_range(delta_base, X)
        # Multiplier applied to the base noise; grows by one per step.
        mag = 1
        with torch.no_grad():
            for t in range(steps):
                if t > 0:
                    # Re-check only the samples that were still predicted as y.
                    preds = model(X_r + delta_r)
                    new_remaining = (preds.max(1)[1] == y[remaining])
                    remaining_temp = remaining.clone()
                    remaining[remaining_temp] = new_remaining
                else:
                    # Boolean mask of samples whose prediction is still y.
                    preds = model(X + delta)
                    remaining = (preds.max(1)[1] == y)

                # Stop once no sample is predicted as y any more.
                if remaining.sum() == 0:
                    break

                X_r = X[remaining]
                mag += 1
                # Enlarge the noise of the samples that have not flipped yet.
                delta_r = clip_to_valid_range(delta_base[remaining] * mag, X_r)
                # Masked assignment copies the values into delta.
                delta[remaining] = delta_r
    finally:
        # Restore the caller's mode even if the walk raised.
        if is_training:
            model.train()
    return delta


In [None]:
def norms(Z):
    """Return the per-sample L2 norm of ``Z`` with shape ``(batch, 1, 1, 1)``.

    All dimensions after the first are flattened before the norm is taken.
    ``reshape`` is used instead of ``view`` so that non-contiguous inputs
    (for example a permuted tensor) are accepted as well.
    """
    flat = Z.reshape(Z.shape[0], -1)
    return flat.norm(dim=1)[:, None, None, None]

def norms_linf_squeezed(Z):
    """Return the per-sample L-infinity norm of ``Z`` as a 1-D tensor.

    All dimensions after the first are flattened and the largest absolute
    value of each row is returned.  ``reshape`` is used instead of ``view`` so
    that non-contiguous inputs are accepted as well.
    """
    flat = Z.reshape(Z.shape[0], -1)
    return flat.abs().max(dim=1)[0]

def norms_l1_squeezed(Z):
    """Return the per-sample L1 norm of ``Z`` as a 1-D tensor.

    All dimensions after the first are flattened and the absolute values of
    each row are summed.  The result is already one value per sample, so no
    extra axes are added and removed again.  ``reshape`` is used instead of
    ``view`` so that non-contiguous inputs are accepted as well.
    """
    flat = Z.reshape(Z.shape[0], -1)
    return flat.abs().sum(dim=1)

def norms_l2_squeezed(Z):
    """Return the per-sample L2 norm of ``Z`` as a 1-D tensor.

    All dimensions after the first are flattened before the norm is taken.
    The norm is computed directly, which yields the same values as squeezing
    a ``(batch, 1, 1, 1)`` result.  ``reshape`` is used instead of ``view``
    so that non-contiguous inputs are accepted as well.
    """
    flat = Z.reshape(Z.shape[0], -1)
    return flat.norm(dim=1)

In [None]:
def get_random_label_only(args, loader, model, num_images = 1000):
    """Collect blind-walk distance features for about ``num_images`` samples.

    For every batch and for each of the norms ``linf``, ``l2`` and ``l1`` (in
    that order) ``rand_steps`` is run 10 times, and the norm of each returned
    perturbation is recorded per sample.

    Args:
        args: Namespace; ``batch_size`` and ``device`` are read, and
            ``distance`` is overwritten with the norm being processed (it is
            left at ``"l1"`` when the function returns).
        loader: Iterable of batches whose first two items are the inputs and
            the labels.
        model: Classifier handed to ``rand_steps``.
        num_images: Target number of samples; iteration stops once
            ``num_images / args.batch_size`` batches have been processed.

    Returns:
        Tensor of shape ``(samples, 10, 3)``; the last axis is ordered
        linf, l2, l1.
    """
    print("getting random attacks")
    max_iter = num_images / args.batch_size
    distance_names = ("linf", "l2", "l1")
    distance_dict = {"linf": norms_linf_squeezed, "l1": norms_l1_squeezed, "l2": norms_l2_squeezed}
    # Number of independent random walks per sample and norm.
    repeats = 10
    # One list of per-batch results for each norm.
    lp_dist = [[] for _ in distance_names]

    # The reference code this is derived from can skip the first num_images
    # samples when ``args.regressor_embed == 1`` (to keep distinct images for
    # a confidence regressor); that option is not implemented here.
    for i, batch in enumerate(loader):
        # Move the batch to the device once instead of once per repetition.
        X, y = batch[0].to(args.device), batch[1].to(args.device)
        for j, distance in enumerate(distance_names):
            # rand_steps reads the noise type from args.distance.
            args.distance = distance
            norm_fn = distance_dict[distance]
            temp_list = []
            for _ in range(repeats):
                delta = rand_steps(model, X, y, args, target=None)
                # Size of the perturbation under the current norm.
                distances = norm_fn(delta)
                temp_list.append(distances.cpu().detach().unsqueeze(-1))
            lp_dist[j].append(torch.cat(temp_list, dim=1))
        if i + 1 >= max_iter:
            break

    lp_d = [torch.cat(per_norm, dim=0).unsqueeze(-1) for per_norm in lp_dist]
    full_d = torch.cat(lp_d, dim=-1)
    print(full_d.shape)
    return full_d


In [None]:
def feature_extractor(args):
    """Generate and save blind-walk embeddings for the test and train splits.

    Loads the data loaders and the model, restores the weights stored at
    ``args.model_root``, prints the result of ``test`` on the test loader, and
    writes the embeddings returned by ``get_random_label_only`` to
    ``<file_dir>test_rand.pt`` and ``<file_dir>train_rand.pt``.

    Args:
        args: Namespace read here for ``device``, ``model_root`` and
            ``file_dir`` and passed on to the helper functions.  ``file_dir``
            is concatenated with the file names as-is, so it must end with a
            path separator (or be empty to write to the working directory).
    """
    print(args)
    train_loader, test_loader = load_dataset(args)
    student, _ = get_student_teacher(args)
    student = student.to(args.device)
    # map_location lets a checkpoint saved on another device load on args.device.
    state_dict = torch.load(args.model_root, map_location=args.device)
    load_result = student.load_state_dict(state_dict, strict=False)
    # strict=False ignores mismatching keys silently, so report them.
    if load_result.missing_keys or load_result.unexpected_keys:
        print(f"warning: missing keys = {load_result.missing_keys}, "
              f"unexpected keys = {load_result.unexpected_keys}")
    student.eval()
    print(test(student, test_loader, args))

    file_dir = args.file_dir
    # An empty file_dir means the working directory; makedirs('') would raise.
    if file_dir:
        os.makedirs(file_dir, exist_ok=True)

    test_d = get_random_label_only(args, test_loader, student)
    print(test_d)
    torch.save(test_d, f"{args.file_dir}test_rand.pt")

    train_d = get_random_label_only(args, train_loader, student)
    print(train_d)
    torch.save(train_d, f"{args.file_dir}train_rand.pt")

In [None]:
# Run the embedding-generation stage with the configuration held in ``args``.
feature_extractor(args)

## 通过prediction margin来训练一个二元分类器

In [None]:
import os, sys
import argparse
import time
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from importlib import reload
from tqdm.auto import tqdm

import random

# Seed PyTorch, NumPy and Python's ``random`` with 0 so that the shuffling and
# the classifier training below are repeatable.
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    _seed_fn(0)

In [None]:
import seaborn as sns
import pandas as pd
import os

In [None]:
from scipy.stats import combine_pvalues, ttest_ind_from_stats, ttest_ind
from functools import reduce
from scipy.stats import hmean

In [None]:
# Number of rows per split used to train the ownership classifier; the rows
# after this index are the ones scored later in the file.
split_index = 500
root = args.file_dir
train_root = root + 'train_rand.pt'
test_root = root + 'test_rand.pt'
trains = torch.load(train_root)
tests = torch.load(test_root)

# Standardise both splits with the statistics of the training-split
# embeddings, computed over the first two axes (one mean/std per last-axis entry).
mean_cifar = trains.mean(dim=(0, 1))
std_cifar = trains.std(dim=(0, 1))

trains = (trains - mean_cifar)/std_cifar
tests = (tests - mean_cifar)/std_cifar

# f_num: features per row after flattening; a_num: how many of them are used.
f_num = 30
a_num = 30

# ``.T`` on a tensor with more than two dimensions is deprecated; reversing
# the axes with an explicit permute is the equivalent, warning-free form.
# NOTE(review): the embeddings written by get_random_label_only are
# (samples, 10, 3).  Reversing the axes before the reshape makes each row hold
# 30 consecutive values along the *sample* axis rather than the 30 features of
# one sample.  The layout is kept because the classifier is evaluated on the
# same layout later in the file, but please confirm it is intended.
trains_n = trains.permute(*reversed(range(trains.dim()))).reshape(trains.shape[0], f_num)[:, :a_num]
tests_n = tests.permute(*reversed(range(tests.dim()))).reshape(tests.shape[0], f_num)[:, :a_num]

# Label 0 for rows from the training split, label 1 for rows from the test split.
n_ex = split_index
train = torch.cat((trains_n[:n_ex], tests_n[:n_ex]), dim=0)
y = torch.cat((torch.zeros(n_ex), torch.ones(n_ex)), dim=0)

# Shuffle rows and labels with the same permutation.
rand = torch.randperm(y.shape[0])
train = train[rand]
y = y[rand]

In [None]:
# Ownership classifier: one hidden layer of 100 ReLU units followed by a
# single Tanh output, so every row is mapped to a score in (-1, 1).
_layers = [
    nn.Linear(a_num, 100),
    nn.ReLU(),
    nn.Linear(100, 1),
    nn.Tanh(),
]
model = nn.Sequential(*_layers)
# NOTE: ``criterion`` is defined here, but the training loop in this file
# computes its own loss and does not call it.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

In [None]:
# Full-batch gradient descent for 1000 steps on the shuffled rows.
with tqdm(range(1000)) as pbar:
    for epoch in pbar:
        inputs = train
        optimizer.zero_grad()
        outputs = model(inputs)
        # Map labels {0, 1} to signs {-1, +1}; minimising the negative mean of
        # sign * score pushes label-1 rows towards +1 and label-0 rows towards -1.
        signs = 2 * y - 1
        scores = outputs.squeeze(-1)
        loss = -1 * (signs * scores).mean()
        loss.backward()
        optimizer.step()
        pbar.set_description('loss {}'.format(loss.item()))

## 计算p-value

In [None]:
def get_p(outputs_train, outputs_test):
    """Return the p-value of a one-sided Welch t-test on the first output column.

    The alternative hypothesis is that the mean of ``outputs_test[:, 0]`` is
    greater than the mean of ``outputs_train[:, 0]``; ``equal_var=False``
    selects Welch's test, which does not assume equal population variances.
    See https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.ttest_ind.html

    Args:
        outputs_train: 2-D tensor of scores for the training-split rows.
        outputs_test: 2-D tensor of scores for the test-split rows.

    Returns:
        The p-value reported by ``scipy.stats.ttest_ind``.

    Raises:
        Exception: If the reported p-value is negative.
    """
    def first_column(outputs):
        # Detach from autograd and convert to a NumPy vector for SciPy.
        return outputs[:, 0].detach().cpu().numpy()

    pred_test = first_column(outputs_test)
    pred_train = first_column(outputs_train)

    result = ttest_ind(pred_test, pred_train, alternative="greater", equal_var=False)
    pval = result.pvalue

    if pval < 0:
        raise Exception(f"p-value={pval}")
    return pval

def get_p_values(num_ex, train, test, k):
    """Compute ``k`` p-values, each on a random subset of ``num_ex`` rows.

    For every repetition a fresh random permutation of ``train``'s row indices
    is drawn and its first ``num_ex`` entries select the rows passed to
    ``get_p``.  The same indices are applied to ``train`` and ``test``, so
    ``test`` needs at least as many rows as ``train``.

    Args:
        num_ex: Number of rows sampled per repetition.
        train: Scores of the training-split rows.
        test: Scores of the test-split rows.
        k: Number of repetitions.

    Returns:
        List with ``k`` p-values.
    """
    total = train.shape[0]
    p_values = []
    for _ in range(k):
        # First num_ex entries of a random permutation of 0..total-1.
        positions = torch.randperm(total)[:num_ex]
        p_values.append(get_p(train[positions], test[positions]))
    return p_values

def get_fischer(num_ex, train, test, k):
    """Combine ``k`` subsampled p-values into a single p-value.

    The p-values come from ``get_p_values`` and are merged with
    ``scipy.stats.combine_pvalues`` using the ``"mudholkar_george"`` method;
    the second element of its result (the combined p-value) is returned.
    """
    sampled = get_p_values(num_ex, train, test, k)
    combined = combine_pvalues(sampled, method="mudholkar_george")
    return combined[1]

def get_max_p_value(num_ex, train, test, k):
    """Return the largest of the ``k`` p-values produced by ``get_p_values``."""
    return max(get_p_values(num_ex, train, test, k))

In [None]:
import scipy.stats as stats
def print_inference(outputs_train, outputs_test):
    """Print the p-value from ``get_p`` and the gap between the mean scores.

    The reported difference is the mean of the first column of
    ``outputs_test`` minus the mean of the first column of ``outputs_train``.
    """
    pval = get_p(outputs_train, outputs_test)
    m1 = outputs_test[:, 0].mean()
    m2 = outputs_train[:, 0].mean()
    print(f"p-value = {pval} \t| Mean difference = {m1-m2}")

In [None]:
# Directory holding the suspect model's embeddings, as produced by the
# Embedding Generation stage.
sus_root = "./feature/CIFAR10/test/"
sus_train = torch.load(sus_root + 'train_rand.pt')
sus_test = torch.load(sus_root + 'test_rand.pt')

# Standardise with the statistics computed when the classifier data was prepared.
sus_train = (sus_train - mean_cifar)/std_cifar
sus_test = (sus_test - mean_cifar)/std_cifar

# ``.T`` on a tensor with more than two dimensions is deprecated; reversing
# the axes with an explicit permute is the equivalent, warning-free form.
# NOTE(review): reversing the axes before the reshape makes each row hold 30
# consecutive values along the sample axis rather than one sample's features.
# The layout is kept because it matches the one the classifier was trained
# on, but please confirm it is intended.
sus_train_n = sus_train.permute(*reversed(range(sus_train.dim()))).reshape(sus_train.shape[0], f_num)[:, :a_num]
sus_test_n = sus_test.permute(*reversed(range(sus_test.dim()))).reshape(sus_test.shape[0], f_num)[:, :a_num]

# Score every row with the trained ownership classifier.
sus_output_tr = model(sus_train_n)
sus_output_te = model(sus_test_n)

In [None]:
# Score the embeddings the classifier was built from (``trains_n`` / ``tests_n``)
# to obtain the reference ("vic") outputs.
vic_output_tr = model(trains_n)
vic_output_te = model(tests_n)

In [None]:
# Keep only the rows from ``split_index`` onwards; the first ``split_index``
# rows of each split were used to train the classifier.
sus_output_te = sus_output_te[split_index:]
sus_output_tr = sus_output_tr[split_index:]
vic_output_te = vic_output_te[split_index:]
vic_output_tr = vic_output_tr[split_index:]

In [None]:
# Report the p-value and mean score gap for the suspect model's outputs.
print_inference(sus_output_tr, sus_output_te)

In [None]:
# Report the p-value and mean score gap for the reference ("vic") outputs.
print_inference(vic_output_tr, vic_output_te)

In [32]:
# Same call as in the previous cell; this is the executed copy whose printed
# output is shown below.
print_inference(vic_output_tr, vic_output_te)

p-value = 0.0 	| Mean difference = 1.8227035999298096
