In [1]:
import matplotlib.pyplot as plt
import numpy as np
import multiprocessing as mp

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader, TensorDataset
from torch.autograd import Variable
import torchvision
from torchvision.transforms import transforms

from tqdm.notebook import tqdm
import time
from models import modelA
from train_vgg19 import vgg19
from utils import get_data, load_list, save_list
from utils import normalize as Normalize
import warnings

from sklearn.preprocessing import scale
from sklearn.neighbors import KernelDensity
from sklearn.metrics import roc_curve, auc
import scipy.io as sio
from sklearn.preprocessing import scale
from advertorch.attacks import PGDAttack, GradientSignAttack, LinfBasicIterativeAttack, \
                                    CarliniWagnerL2Attack, DDNL2Attack, SinglePixelAttack, JacobianSaliencyMapAttack
from utils import CW

from sklearn.linear_model import LogisticRegressionCV

device = 'cuda:3' if torch.cuda.is_available() else 'cpu'

ImportError: cannot import name 'normalize'

In [2]:
def flip(x, nb_diff):
    original_shape = x.shape
    x = np.copy(np.reshape(x, (-1,)))
    candidate_inds = np.where(x < 0.99)[0]
    assert candidate_inds.shape[0] >= nb_diff
    inds = np.random.choice(candidate_inds, nb_diff)
    x[inds] = 1.

    return np.reshape(x, original_shape)

In [3]:
def classifier_adv(data):
        return F.log_softmax(model(data))

In [4]:
def get_noisy_samples(X_test, X_test_adv, dataset, attack):
    if attack in ['jsma', 'cw']:
        X_test_noisy = np.zeros_like(X_test)
        for i in range(len(X_test)):
            # Count the number of pixels that are different
            nb_diff = len(np.where(X_test[i] != X_test_adv[i])[0])
            # Randomly flip an equal number of pixels (flip means move to max
            # value of 1)
            X_test_noisy[i] = flip(X_test[i], nb_diff)
    else:
        warnings.warn("Using pre-set Gaussian scale sizes to craft noisy "
                      "samples. If you've altered the eps/eps-iter parameters "
                      "of the attacks used, you'll need to update these. In "
                      "the future, scale sizes will be inferred automatically "
                      "from the adversarial samples.")
        # Add Gaussian noise to the samples
        X_test_noisy = np.minimum(
            np.maximum(
                X_test + np.random.normal(loc=0, scale=STDEVS[dataset][attack],
                                          size=X_test.shape),
                0
            ),
            1
        )

    return X_test_noisy

In [5]:
def get_mc_predictions(model, X, nb_iter=50, batch_size=256):
    model.train()
    if dataset == 'mnist':
        output_dim = list(model.children())[-1].out_features
    else:
        output_dim = list(model.children())[-1][-1].out_features
    get_output = lambda data : F.softmax(model(data))
#     get_output = lambda data : model(data)
    X = torch.Tensor(X).to(device)
    
    def predict():
        n_batches = int(np.ceil(X.shape[0] / float(batch_size)))
        output = np.zeros(shape=(len(X), output_dim))
        for i in range(n_batches):
            output[i * batch_size:(i + 1) * batch_size] = get_output(X[i * batch_size:(i + 1) * batch_size]).detach().cpu().numpy()
        return output

    preds_mc = []
    for i in tqdm(range(nb_iter)):
        preds_mc.append(predict())

    return np.asarray(preds_mc)

In [6]:
def get_deep_representations(model, X, batch_size=256):
    model.eval()
    X = torch.Tensor(X).to(device)
    # mnist modelA last hidden layer 
#     output_dim = model.layers[-4].output.shape[-1].value
    last_hidden_idx = -3
    if dataset == 'mnist':
        output_dim = list(model.children())[last_hidden_idx].out_features
    else:
        output_dim = list(model.children())[-1][last_hidden_idx].out_features

    last_hidden_output = None

    def last_hidden_hook(module, input_, output):
        nonlocal last_hidden_output
        last_hidden_output = output
    
    if dataset == 'mnist':
        list(model.children())[last_hidden_idx].register_forward_hook(last_hidden_hook)
    else:
        list(model.children())[-1][last_hidden_idx+1].register_forward_hook(last_hidden_hook)

    n_batches = int(np.ceil(X.shape[0] / float(batch_size)))
    output = np.zeros(shape=(len(X), output_dim))
    for i in tqdm(range(n_batches)):
#         output[i * batch_size:(i + 1) * batch_size] = get_encoding([X[i * batch_size:(i + 1) * batch_size], 0])[0]
        model(X[i * batch_size:(i + 1) * batch_size])
        output[i * batch_size:(i + 1) * batch_size] = last_hidden_output.detach().cpu().numpy()
    return output

In [7]:
def score_point(tup):
    x, kde = tup
    return kde.score_samples(np.reshape(x, (1, -1)))[0]

def score_samples(kdes, samples, preds, n_jobs=None):
    if n_jobs is not None:
        p = mp.Pool(n_jobs)
    else:
        p = mp.Pool()
    
    results = p.map(score_point, [(x, kdes[i]) for x, i in zip(samples, preds)])
    p.close()
    p.join()

    return results

# def normalize(normal, adv, noisy):
#     n_samples = len(normal)
#     total = scale(np.concatenate((normal, adv, noisy)))

#     return total[:n_samples], total[n_samples:2*n_samples], total[2*n_samples:]

def normalize(normal, adv):
    n_samples = len(normal)
    total = scale(np.concatenate((normal, adv)))

    return total[:n_samples], total[n_samples:2*n_samples]

def train_lr(densities_pos, densities_neg, uncerts_pos, uncerts_neg):
    """
    TODO
    :param densities_pos:
    :param densities_neg:
    :param uncerts_pos:
    :param uncerts_neg:
    :return:
    """
    values_neg = np.concatenate(
        (densities_neg.reshape((1, -1)),
         uncerts_neg.reshape((1, -1))),
        axis=0).transpose([1, 0])
    values_pos = np.concatenate(
        (densities_pos.reshape((1, -1)),
         uncerts_pos.reshape((1, -1))),
        axis=0).transpose([1, 0])

    values = np.concatenate((values_neg, values_pos))
    labels = np.concatenate(
        (np.zeros_like(densities_neg), np.ones_like(densities_pos)))

    lr = LogisticRegressionCV(n_jobs=-1).fit(values, labels)

    return values, labels, lr



def compute_roc(probs_neg, probs_pos, plot=False):
    """
    TODO
    :param probs_neg:
    :param probs_pos:
    :param plot:
    :return:
    """
    probs = np.concatenate((probs_neg, probs_pos))
    labels = np.concatenate((np.zeros_like(probs_neg), np.ones_like(probs_pos)))
    fpr, tpr, _ = roc_curve(labels, probs)
    auc_score = auc(fpr, tpr)
    if plot:
        plt.figure(figsize=(7, 6))
        plt.plot(fpr, tpr, color='blue',
                 label='ROC (AUC = %0.4f)' % auc_score)
        plt.legend(loc='lower right')
        plt.title("ROC Curve")
        plt.xlabel("FPR")
        plt.ylabel("TPR")
        plt.show()

    return fpr, tpr, auc_score

# Parameter setting

In [8]:
attack = 'fgsm'
dataset = 'mnist'

print('Loading %s data set' % dataset)
if dataset == 'mnist':
#     org_train_path = '../gen/adv_data/v1/mnist/org/mnist_org_data/mnist_org_train_7944.pkl'
#     org_test_path = '../gen/adv_data/v1/mnist/org/mnist_org_data/mnist_org_test_1987.pkl'
    org_train_path = '../gen/adv_data/v2/mnist/org/mnist_org_data/mnist_org_train_24000.pkl'
    org_test_path = '../gen/adv_data/v2/mnist/org/mnist_org_data/mnist_org_test_6000.pkl'
elif dataset == 'cifar':
    org_train_path = '../gen/adv_data/v1/cifar10/org/cifar10_org_data/cifar10_org_train_7349.pkl'
    org_test_path = '../gen/adv_data/v1/cifar10/org/cifar10_org_data/cifar10_org_test_1838.pkl'
else:
    # Imagenet
    org_train_path = '' 
    org_test_path =  ''
    
batch_size = 100
class_num = 10
eps = 0.3

# Determines perturbation of noisy samples
STDEVS = {
    'mnist': {'fgsm': 0.310, 'pgd':0.234, 'cw':0.234},
    'cifar': {'fgsm': 0.050, 'pgd':0.033, 'cw':0.033},
}

# Gaussian kernel bandwidth for kernel density estimation
BANDWIDTHS = {'mnist': 1.20, 'cifar': 0.26}

Loading mnist data set


# Attack type

In [9]:
if attack == 'fgsm':
    print('Attack type : fgsm')
    adversary = GradientSignAttack(classifier_adv, loss_fn=nn.CrossEntropyLoss(), eps=eps,clip_min=0.0, clip_max=1.0)
elif attack == 'pgd':
    print('Attack type : pgd')
    adversary = PGDAttack(classifier_adv, loss_fn=nn.CrossEntropyLoss(), eps=eps)
elif attack == 'cw':
    print('Attack type : cw')
    adversary = CW(classifier_adv, radius=eps, class_num=class_num)

Attack type : fgsm


# Model loading

In [10]:
# mnist
if dataset == 'mnist':
    model = modelA()
    checkpoint = torch.load('../gen/models/mnist/modelA/modelA.pkl')
    model.load_state_dict(checkpoint)
    model.eval()
    model.to(device)
elif dataset == 'cifar':
    model = vgg19()
    model.features = torch.nn.DataParallel(model.features)
    model.to(device)
    checkpoint = torch.load('../gen/models/cifar10/vgg19/checkpoint_299.tar')
    model.load_state_dict(checkpoint)
    model.eval()
else:
    #Imagenet model loading
    pass
    
print('%s Model loading complete!'%dataset)

mnist Model loading complete!


In [11]:
if dataset == 'mnist':
    X_train, Y_train = load_list(org_train_path)[0].reshape(-1, 1, 28, 28), load_list(org_train_path)[1]
    X_test, Y_test = load_list(org_test_path)[0].reshape(-1, 1, 28, 28), load_list(org_test_path)[1]
elif dataset == 'cifar':
    X_train, Y_train = load_list(org_train_path)[0].reshape(-1, 3, 32, 32), load_list(org_train_path)[1]
    X_test, Y_test = load_list(org_test_path)[0].reshape(-1, 3, 32, 32), load_list(org_test_path)[1]
    X_train, X_test = Normalize(X_train), Normalize(X_test)
else:
    X_train, Y_train = load_list(org_train_path)[0].reshape(-1, 3, 32, 32), load_list(org_train_path)[1]
    X_test, Y_test = load_list(org_test_path)[0].reshape(-1, 3, 32, 32), load_list(org_test_path)[1]
    X_train, X_test = Normalize(X_train), Normalize(X_test)
    
X_test_adv = adversary.perturb(torch.Tensor(X_test).to(device), torch.Tensor(Y_test).to(device).long())
X_test_adv = X_test_adv.detach().cpu().numpy()

# Craft an equal number of noisy samples
X_test_noisy = get_noisy_samples(X_test, X_test_adv, dataset, attack)

In [16]:
X_test_adv.shape, X_train.shape, X_test.shape,Y_train.shape, Y_test.shape

((6000, 1, 28, 28), (8000, 1, 28, 28), (2000, 1, 28, 28), (8000,), (2000,))

In [17]:
for s_type, dset in zip(['normal', 'noisy', 'adversarial'], [X_test, X_test_noisy, X_test_adv]):
    model.eval()
    if dataset == 'mnist':
        dset = torch.Tensor(dset).to(device).reshape(-1, 1, 28, 28)
    else:
        dset = torch.Tensor(dset).to(device).reshape(-1, 3, 32, 32)
        
        
    output = model(dset)
    
    pred = torch.argmax(output, 1)
    
    correct = (pred.detach().cpu().numpy() == Y_test).sum()
    acc = correct / dset.shape[0]
    
    print("Model accuracy on the %s test set: %0.2f%%" %
              (s_type, 100 * acc))
    #Compute and display average perturbation sizes which define noisy perturbation size
    if not s_type == 'normal':
        l2_diff = np.linalg.norm(dset.detach().cpu().numpy().reshape((len(X_test), -1)) - X_test.reshape((len(X_test), -1)), axis=1).mean()
        print("Average L-2 perturbation size of the %s test set: %0.2f" % (s_type, l2_diff))
    

Model accuracy on the normal test set: 100.00%


AttributeError: 'bool' object has no attribute 'sum'

In [18]:
data = torch.Tensor(X_test).to(device)
preds_test = torch.argmax(model(data), 1).detach().cpu().numpy()
inds_correct = np.where(preds_test == Y_test)[0]

In [19]:
X_test = X_test[inds_correct]
# X_test_noisy = X_test_noisy[inds_correct]
X_test_adv = X_test_adv[inds_correct]

# Bayesian uncertainty

In [20]:
 print('Getting Monte Carlo dropout variance predictions...')
uncerts_normal = get_mc_predictions(model, X_test, batch_size=batch_size).var(axis=0).mean(axis=1)
# uncerts_noisy = get_mc_predictions(model, X_test_noisy, batch_size=batch_size).var(axis=0).mean(axis=1)
uncerts_adv = get_mc_predictions(model, X_test_adv, batch_size=batch_size).var(axis=0).mean(axis=1)

Getting Monte Carlo dropout variance predictions...


HBox(children=(IntProgress(value=0, max=50), HTML(value='')))




HBox(children=(IntProgress(value=0, max=50), HTML(value='')))




In [21]:
uncerts_normal.mean(), uncerts_adv.mean()

(0.0005858372816411122, 0.0075784068584915576)

# Kernel Density estimation score

In [22]:
print('Getting deep feature representations...')
X_train_features = get_deep_representations(model, X_train, batch_size=batch_size)
X_test_normal_features = get_deep_representations(model, X_test,batch_size=batch_size)
# X_test_noisy_features = get_deep_representations(model, X_test_noisy, batch_size=batch_size)
X_test_adv_features = get_deep_representations(model, X_test_adv, batch_size=batch_size)

Getting deep feature representations...


HBox(children=(IntProgress(value=0, max=80), HTML(value='')))




HBox(children=(IntProgress(value=0, max=20), HTML(value='')))




HBox(children=(IntProgress(value=0, max=20), HTML(value='')))




In [23]:
X_test_normal_features.mean(), X_test_adv_features.mean()

(-2.6006504176887977, -1.5195453637127647)

In [24]:
print('Training KDEs...')
class_inds = {}
Y_train = np.eye(10)[Y_train]
for i in range(Y_train.shape[1]):
#     class_inds[i] = Y_train[i]
    class_inds[i] = np.where(Y_train.argmax(axis=1) == i)[0]
kdes = {}
warnings.warn("Using pre-set kernel bandwidths that were determined "
              "optimal for the specific CNN models of the paper. If you've "
              "changed your model, you'll need to re-optimize the "
              "bandwidth.")
for i in range(Y_train.shape[1]):
    kdes[i] = KernelDensity(kernel='gaussian', bandwidth=BANDWIDTHS[dataset]).fit(X_train_features[class_inds[i]])
print('Training finished!')

Training KDEs...
Training finished!


In [25]:
# Get model predictions
print('Computing model predictions...')
model.eval()
preds_test_normal = torch.argmax(model(torch.Tensor(X_test).to(device)), 1).detach().cpu().numpy()
# preds_test_noisy = torch.argmax(model(torch.Tensor(X_test_noisy).to(device)), 1).detach().cpu().numpy()
preds_test_adv = torch.argmax(model(torch.Tensor(X_test_adv).to(device)), 1).detach().cpu().numpy()
print('Computing prediction finished!')

Computing model predictions...
Computing prediction finished!


In [26]:
# Get density estimates
print('computing densities...')
densities_normal = score_samples(
    kdes,
    X_test_normal_features,
    preds_test_normal
)
# densities_noisy = score_samples(
#     kdes,
#     X_test_noisy_features,
#     preds_test_noisy
# )
densities_adv = score_samples(
    kdes,
    X_test_adv_features,
    preds_test_adv
)

print('Computing densities finished!')

computing densities...
Computing densities finished!


In [27]:
## Z-score the uncertainty and density values
# uncerts_normal_z, uncerts_adv_z, uncerts_noisy_z = normalize(
#     uncerts_normal,
#     uncerts_adv,
#     uncerts_noisy
# )
# densities_normal_z, densities_adv_z, densities_noisy_z = normalize(
#     densities_normal,
#     densities_adv,
#     densities_noisy
# )

uncerts_normal_z, uncerts_adv_z = normalize(
    uncerts_normal,
    uncerts_adv,
)
densities_normal_z, densities_adv_z = normalize(
    densities_normal,
    densities_adv,
)



## Build detector
# print('Training start...')

# values, labels, lr = train_lr(
#     densities_pos=densities_adv_z,
#     densities_neg=np.concatenate((densities_normal_z, densities_noisy_z)),
#     uncerts_pos=uncerts_adv_z,
#     uncerts_neg=np.concatenate((uncerts_normal_z, uncerts_noisy_z))
# )

# print('Training end!')

print('Training start...')

values, labels, lr = train_lr(
    densities_pos=densities_adv_z,
    densities_neg=densities_normal_z,
    uncerts_pos=uncerts_adv_z,
    uncerts_neg=uncerts_normal_z
)

print('Training end!')

Training start...
Training end!


In [29]:
## Evaluate detector
# Compute logistic regression model predictions
probs = lr.predict_proba(values)[:, 1]
# Compute AUC
n_samples = len(X_test)
# The first 2/3 of 'probs' is the negative class (normal and noisy samples),
# and the last 1/3 is the positive class (adversarial samples).
# fpr, tpr, auc_score = compute_roc(
#     probs_neg=probs[:2 * n_samples],
#     probs_pos=probs[2 * n_samples:]
# )

fpr, tpr, auc_score = compute_roc(
    probs_neg=probs[:n_samples],
    probs_pos=probs[n_samples:]
)
print('Detector ROC-AUC score: %0.4f' % auc_score)

Detector ROC-AUC score: 0.9586


In [30]:
pred = (probs>=0.5).astype('int')
idx = (uncerts_adv_z / uncerts_noisy_z) > 1

In [31]:
(pred == labels).sum() / pred.shape[0]

0.6965904969169388

In [29]:
((uncerts_adv / uncerts_normal) > 1)

array([False,  True,  True, ...,  True,  True, False])

In [25]:
cut = int(pred.shape[0] / 3)

In [26]:
normal_pred = pred[:cut]
noisy_pred = pred[cut:cut*2]
adv_pred = pred[cut*2:]
normal_labels = labels[:cut]
noisy_labels = labels[cut:cut*2]
adv_labels = labels[cut*2:]

In [28]:
idx = (uncerts_adv / uncerts_normal) > 1
print('Uncertainty :',idx.sum()/idx.shape[0])
idx = (np.array(densities_adv) / np.array(densities_normal)) < 1
print('Kernel densities :', idx.sum()/idx.shape[0])

Uncertainty : 0.9167573449401524
Kernel densities : 0.9042437431991295


In [40]:
uncerts_normal.mean(), uncerts_noisy.mean(), uncerts_adv.mean()

(0.0006046953096136334, 0.0029852911732738186, 0.007750921423681355)

In [31]:
np.array(densities_adv).mean(), np.array(densities_normal).mean()

(-400.5944232222921, -260.94075303422323)

In [47]:
np.array(densities_adv).mean(), np.array(densities_normal).mean()

(-400.5944232222921, -260.94075303422323)