In [1]:
#coding:utf-8
from __future__ import print_function
import sys
sys.path.append("..")
import os
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torch.autograd import Variable
import torch.optim as optim
from torchvision import datasets, transforms
from models.resnet import *
from models.vggnet import *
from models.mynet import *
from logic_units import *
# The explicit imports below intentionally come AFTER the star imports so that
# they win over any same-named symbol the project modules may export.
import numpy as np
from tqdm import tqdm
import time
import pandas as pd
from torch.utils.data import DataLoader

In [2]:
class NeuronsActivate:
    """Binary neuron-activation patterns of a single layer for one batch.

    The model is run on ``data`` with a forward hook attached to the requested
    layer; the hooked output is (optionally) passed through ReLU, min-max
    scaled over the whole batch and compared against ``threshold``.

    Args:
        model: the ``nn.Module`` to inspect.
        data: input batch; ``data.shape[0]`` is taken as the number of samples.
        threshold: a neuron counts as activated when its scaled output is
            strictly greater than this value.
    """

    def __init__(self, model,data,threshold):
        self.model = model
        self.data = data
        self.threshold = threshold

    def get_neurons_activate(self,target_layer):
        """Return the activation pattern of ``target_layer`` as a NumPy array.

        Args:
            target_layer: key as produced by :meth:`get_model_layers`
                (``"<running index>/<module path>"``).

        Returns:
            ``float32`` array of shape ``(sample_num, 1 + n_neurons)``.
            Column 0 is an all-zero padding column (kept for compatibility
            with existing consumers); the remaining columns are 1.0 where the
            neuron is activated and 0.0 otherwise.

        Raises:
            KeyError: if ``target_layer`` is not a Conv1d/Conv2d/Linear layer
                of the model (previously this silently produced a lone zero
                column).
        """
        sample_num = self.data.shape[0]  # number of samples in the batch
        layer_dict = self.get_model_layers()
        if target_layer not in layer_dict:
            raise KeyError(
                'unknown layer {!r}; available layers: {}'.format(
                    target_layer, list(layer_dict)))
        module = layer_dict[target_layer]

        # The result is converted to NumPy, so no autograd graph is needed.
        with torch.no_grad():
            outputs = torch.squeeze(self.extract_outputs(module))
            scaled_outputs = self.scale(outputs)
            # One row per sample, one column per neuron of the layer.
            sample_layer_outputs = scaled_outputs.reshape(sample_num, -1)
            # Strictly above the threshold means "activated".
            activation = torch.gt(sample_layer_outputs, self.threshold)

        # Build the padding column on the same device as the activations
        # instead of relying on a notebook-global `device`.
        padding = torch.zeros(sample_num, 1, dtype=torch.float32,
                              device=activation.device)
        neurons_activate = torch.cat(
            [padding, activation.to(padding.dtype)], dim=1)
        return neurons_activate.detach().cpu().numpy()

    def step_through_model(self, model,prefix=''):
        """Yield ``(path, name, module)`` for every Conv1d/Conv2d/Linear layer.

        Children are visited depth-first in ``named_children()`` order;
        ``path`` is the slash-joined chain of child names leading to the layer.
        """
        for name, module in model.named_children():
            path = '{}/{}'.format(prefix, name)
            if isinstance(module, (nn.Conv1d, nn.Conv2d, nn.Linear)):
                yield (path, name, module)
            else:
                yield from self.step_through_model(module, path)

    def get_model_layers(self, cross_section_size=0):
        """Map ``"<running index><path>"`` to each traversable layer.

        Args:
            cross_section_size: when positive, keep only every
                ``cross_section_size``-th layer (starting with the first).

        Returns:
            dict preserving traversal order.
        """
        layer_dict = {}
        for i, (path, name, module) in enumerate(
                self.step_through_model(self.model)):
            layer_dict[str(i) + path] = module
        if cross_section_size > 0:
            target_layers = list(layer_dict)[0::cross_section_size]
            layer_dict = {
                target_layer: layer_dict[target_layer]
                for target_layer in target_layers
            }
        return layer_dict

    def scale(self, out, rmax=1, rmin=0):
        """Min-max scale ``out`` (over all of its elements) into [rmin, rmax].

        A constant input has zero range; it is mapped to ``rmin`` instead of
        producing NaN through a division by zero.
        """
        lowest = out.min()
        span = out.max() - lowest
        output_std = out - lowest
        if span != 0:
            output_std = output_std / span
        output_scaled = output_std * (rmax - rmin) + rmin
        return output_scaled

    def extract_outputs(self,module, force_relu=True):
        """Run the model on ``self.data`` and capture the output of ``module``.

        Args:
            module: layer to hook.
            force_relu: apply ``torch.relu`` to each captured output.

        Returns:
            The captured outputs stacked along a new leading dimension
            (one entry per invocation of ``module`` during the forward pass).
        """
        outputs = []

        def hook(module, input, output):
            outputs.append(torch.relu(output) if force_relu else output)

        handle = module.register_forward_hook(hook)
        try:
            self.model(self.data)
        finally:
            # Always detach the hook, even when the forward pass raises,
            # so it cannot leak into later forward passes.
            handle.remove()
        return torch.stack(outputs)

In [3]:
## DSC
class DSC(object):
    """Distance-based surprise coverage over one layer's activation patterns.

    Activation patterns of a reference ("train") set are extracted once in the
    constructor; :meth:`fit` scores a test set against them and returns the
    fraction of score bins that were hit.
    """

    def __init__(self, train, label, model, layers, u=2, k_bins=1000, threshold=10 ** -5,
                 batch_size=128):
        '''
        train: reference (training) inputs, a tensor split along dim 0
        label: labels of the reference inputs (tensor)
        model: model under test
        layers: key of the layer whose output is used (see
            NeuronsActivate.get_model_layers)
        u: upper bound of the score range used for binning
        k_bins: number of points passed to np.linspace when building the bins
        threshold: stored only; the activation extraction below always uses a
            fixed threshold of 0.0
        batch_size: number of samples per forward pass. Activations are
            min-max scaled per batch, so this value influences the patterns.
        '''
        self.train = train
        self.model = model
        self.layers = layers
        self.lst = []
        self.std_lst = []
        self.mask = []
        self.u = u
        self.k_bins = k_bins
        self.threshold = threshold
        self.batch_size = batch_size
        self.test_score = []
        self.neuron_activate_test = []

        self.neuron_activate_train = self._collect_activations(train)
        print('neuron_train',self.neuron_activate_train.shape)
        self.train_label = label.cpu().detach().numpy()

    def _collect_activations(self, data):
        """Return the activation patterns of ``data``, computed batch by batch."""
        batch_size = getattr(self, 'batch_size', 128)
        per_batch = [
            NeuronsActivate(self.model, data_batch, 0.0).get_neurons_activate(self.layers)
            for data_batch in torch.split(data, batch_size, dim=0)
        ]
        return np.concatenate(per_batch, axis=0)

    def find_closest_at(self, at, train_ats):
        """Return ``(distance, row)`` of the row in ``train_ats`` nearest to ``at`` (L2 norm)."""
        dist = np.linalg.norm(at - train_ats, axis=1)
        nearest = np.argmin(dist)
        return (dist[nearest], train_ats[nearest])

    def fit(self, test, label):
        """Score every test sample and return the bin coverage rate.

        Args:
            test: test inputs (tensor, split along dim 0).
            label: one class label per test sample (array-like or tensor);
                each label must occur among the reference labels.

        Returns:
            Fraction of the ``k_bins`` bins hit by the scores, or ``None`` when
            the internal time limit (12 h) is exceeded; in that case
            ``self.test_score`` holds only the scores computed so far.
        """
        time_limit = 43200
        start = time.time()
        # Start from a clean slate so repeated calls do not accumulate scores.
        self.test_score = []
        # Use the instance's model, not whatever global `model` happens to exist.
        self.neuron_activate_test = self._collect_activations(test)

        if torch.is_tensor(label):
            label = label.detach().cpu().numpy()

        # Reference-sample indices grouped by class.
        class_matrix = {}
        for i, lb in enumerate(self.train_label):
            class_matrix.setdefault(lb, []).append(i)
        all_idx = np.arange(len(self.train_label))
        # Indices of all reference samples NOT in a class, built once per class
        # instead of once per test sample.
        other_idx = {}

        # DSA: the second distance is measured from the reference point found
        # in the first step (a_dot), not from the test sample itself.
        total = min(len(self.neuron_activate_test), len(label))
        for test_sample, label_sample in tqdm(zip(self.neuron_activate_test, label), total=total):
            end = time.time()
            if end - start >= time_limit:
                print("=======================time limit=======================")
                return None
            same_idx = class_matrix[label_sample]
            if label_sample not in other_idx:
                other_idx[label_sample] = np.setdiff1d(all_idx, same_idx)
            x = self.neuron_activate_train[same_idx]
            a_dist, a_dot = self.find_closest_at(test_sample, x)
            y = self.neuron_activate_train[other_idx[label_sample]]
            # Distance from the reference point to the nearest other-class sample.
            b_dist, _ = self.find_closest_at(a_dot, y)
            self.test_score.append(a_dist / b_dist)

        bins = np.linspace(np.amin(self.test_score), self.u, self.k_bins)
        occupied = np.unique(np.digitize(self.test_score, bins))
        rate = len(occupied) / float(self.k_bins)
        return rate

    def get_sore(self):
        """Return the list of scores computed by the last :meth:`fit` call."""
        return self.test_score

    def get_score(self):
        """Correctly spelled alias of :meth:`get_sore`."""
        return self.get_sore()

    def get_u(self):
        """Return the upper bound used for binning."""
        return self.u

    def rank_2(self):
        """Return sample indices ordered by score, largest first."""
        return np.argsort(self.get_sore())[::-1]

    def rank_fast(self):
        """Pick one sample index per occupied score bin.

        Every bin uses a freshly seeded local generator (seed 41), which gives
        the same picks as reseeding the global NumPy RNG per bin but leaves the
        global random state untouched.
        """
        bins = np.linspace(np.amin(self.test_score), self.u, self.k_bins)
        score_bin = np.digitize(self.test_score, bins)
        res_idx_arr = []
        for x in np.unique(score_bin):
            idx_arr = np.flatnonzero(score_bin == x)
            idx = np.random.RandomState(41).choice(idx_arr)
            res_idx_arr.append(idx)
        return res_idx_arr

In [4]:
def error_level(pred_test_prob,true_test):
    """Severity metric: grade each prediction by the rank of the true label.

    Classes are ranked per sample by descending probability. The severity is
    0 when the true label is ranked first, 5 when second, 10 when third and
    100 otherwise.

    Args:
        pred_test_prob: 2-D NumPy array, one row of class scores per sample.
        true_test: true label of each sample, indexable by sample position.

    Returns:
        List with one severity value per row of ``pred_test_prob``.
    """
    rank_penalties = (0, 5, 10)   # penalty for the true label at rank 1 / 2 / 3
    beyond_top3 = 100             # penalty when the true label is ranked lower
    ranked_classes = np.argsort(-pred_test_prob, axis=1)
    severities = []
    for row_no, ranking in enumerate(ranked_classes):
        truth = true_test[row_no]
        severity = beyond_top3
        for position, penalty in enumerate(rank_penalties):
            if ranking[position] == truth:
                severity = penalty
                break
        severities.append(severity)
    return severities

In [5]:
# ---- Model and test-case set ------------------------------------------------
# NOTE(review): `device` is not defined in the visible cells; presumably it is
# provided by one of the star imports — confirm.
model = vgg16_bn().to(device)
model_name='vgg16'
dataset_name='cifar'

model.load_state_dict(torch.load('../adv_train/model-vgg16-cifar10/Standard-cifar10-model-vgg16-epoch300.pt'))
model.eval()
images=torch.load('images_of_TestCaseSet_vgg16_cifar10.pt')
labels=torch.load('labels_of_TestCaseSet_vgg16_cifar10.pt')
data=images
true_test=labels

coverage="DSC"
index=-1

# Number of score bins
k_bins = 1000

# Layer whose activations are used for the coverage computation
layers='12/features/40'

# ---- Model predictions on the test-case set ---------------------------------
batch_size=128
datalist=torch.split(data, batch_size, dim=0)

pred_test_prob=[]
with torch.no_grad():  # inference only: no autograd graph needed
    for data_batch in datalist:
        output=model(data_batch.to(device))
        # Softmax over the class dimension (explicit `dim` avoids the
        # implicit-dimension deprecation warning).
        prob = F.softmax(output, dim=1)
        pred_test_prob.append(prob.cpu())
pred_test_prob=torch.cat(pred_test_prob,dim=0).numpy()
pred_test=np.argmax(pred_test_prob, axis=1)

# ---- Reference set for DSC --------------------------------------------------
# NOTE(review): the reference ("train") samples are 3000 randomly drawn images
# from the CIFAR-10 *test* split (train=False, shuffle=True) and no seed is
# set, so the sample differs between runs — confirm this is intended.
val_dataset = datasets.CIFAR10(root='../dataset/data', train=False,download=False, transform=transforms.ToTensor())
val_dataloader = DataLoader(val_dataset, batch_size=3000, shuffle=True)
image_iter = iter(val_dataloader)
train, Y_train = next(image_iter)  # iterator objects have no .next() on Python 3
train, Y_train=train.to(device),Y_train.to(device)

# ---- Coverage computation and ranking (timed) -------------------------------
metrics = None
rate = None
rank_lst_time = None
rank_lst2_time = None
st = time.time()  # start of the shared pre-processing (coverage) phase
if coverage == "DSC":
    print("dsc..")
    metrics = DSC(train, Y_train, model, layers, k_bins=k_bins)
    print("dsc..  fit...")
    rate = metrics.fit(data, pred_test)
else:
    raise ValueError('unsupported coverage criterion: {!r}'.format(coverage))
en = time.time()
pre_time = en - st  # elapsed seconds (end minus start, hence non-negative)
start = time.time()
rank_lst2 = metrics.rank_2()
end = time.time()
rank_lst2_time = end - start + pre_time
start = time.time()
rank_lst = metrics.rank_fast()
end = time.time()
rank_lst_time = end - start + pre_time
u = metrics.get_u()  # upper bound used for binning

# ---- Result table -----------------------------------------------------------
df = pd.DataFrame([])

true_test=true_test.cpu().numpy()

df['right'] = (pred_test == true_test).astype('int')  # 1 where the prediction is correct
# Single-step .loc assignment instead of chained `df[col].loc[...] = ...`,
# which may write to a temporary copy.
df['cam'] = 0
df.loc[rank_lst, 'cam'] = list(range(1, len(rank_lst) + 1))  # rank from rank_fast
df['ctm'] = 0
df.loc[rank_lst2, 'ctm'] = list(range(1, len(rank_lst2) + 1))  # rank from rank_2
df['rate'] = rate  # coverage rate
df['cam_time'] = rank_lst_time
df['ctm_time'] = rank_lst2_time

df['error_level']=error_level(pred_test_prob,true_test)

if rate is None:
    df["overtime"] = 1
# Naming scheme: dataset_coverage-method_bins_upper-bound_selected-layer



dsc..
neuron_train (3000, 2049)
dsc..  fit...


16640it [07:41, 36.04it/s]


In [6]:
# Alternative file name encoding the bin count and upper bound:
#   '{}_dsc_k_{}_u_{}.csv'.format(dataset_name, k_bins, u)
# Use the names configured above instead of repeating the literals, and make
# sure the target directory exists before writing.
output_dir = os.path.join('./all_output/output_cifar', model_name)
os.makedirs(output_dir, exist_ok=True)
df.to_csv(os.path.join(output_dir, '{}_dsc_0.csv'.format(dataset_name)))