In [1]:
from keras.datasets import fashion_mnist
from art.utils import load_mnist
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, Lambda, MaxPooling2D, BatchNormalization
from tensorflow.keras.models import Sequential,load_model
from art.metrics.metrics import empirical_robustness, clever_t, clever_u, clever, loss_sensitivity, wasserstein_distance
# from art.metrics.metrics import inverper_c
import numpy as np
from art.estimators.classification.keras import KerasClassifier
import logging
from progressbar import ProgressBar

import sys
sys.path.append('..')  
from GradPri_utils.utils import *

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
tf.compat.v1.disable_eager_execution()

In [2]:
R_L1 = 40
R_L2 = 2
R_LI = 0.1

def _create_krclassifier_FMNIST():
    num_classes = 10
    input_shape = (28,28,1)
    
    model = Sequential()
    # Block 1
    model.add(Conv2D(64, (3, 3), padding='same', name = 'block1_conv1', 
                     input_shape=input_shape))
    # model.add(BatchNormalization(name = 'block1_bn1'))
    model.add(Activation('relu', name = 'block1_act1'))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2), name='block1_pool1'))

    model.add(Conv2D(128, (3, 3), padding='same', name = 'block1_conv2'))
    # model.add(BatchNormalization(name = 'block1_bn2'))
    model.add(Activation('relu', name = 'block1_act2'))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2), name='block1_pool2'))

    model.add(Conv2D(256, (3, 3), padding='same', name = 'block1_conv3'))
    # model.add(BatchNormalization(name = 'block1_bn3'))
    model.add(Activation('relu', name = 'block1_act3'))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2), name='block1_pool3'))
    # top
    model.add(Flatten(name='flatten'))
    model.add(Dense(1024, activation='relu', name='fc1'))
    model.add(Dropout(0.5, name='drop1'))
    model.add(Dense(32, activation='relu', name='fc2'))
    model.add(Dropout(0.5, name='drop2'))
    # model.add(Dense(64, activation='relu', name='fc3'))
    # model.add(Dropout(0.5, name='drop3'))
    model.add(Dense(num_classes, activation='softmax', name='predictions'))

    
    # model = load_model("./fmnist_CNN.h5")
    opt = tf.keras.optimizers.Adam(learning_rate=0.00005)
    model.compile(loss='categorical_crossentropy',
                  optimizer=opt,
                  metrics=['accuracy'])

    # Get the classifier
    krc = KerasClassifier(model=model, clip_values=(0, 1), use_logits=False)

    return krc

logger = logging.getLogger(__name__)

In [48]:
# Get Fashion-MNIST
(x_train, y_train), (x_test, y_test), _, _ = load_mnist()

# Get the classifier
krc = _create_krclassifier_FMNIST()
krc.fit(x_train, y_train, batch_size=256, nb_epochs=10, shuffle=True, # nb_epochs=5
        validation_data=(x_test[0:5000], y_test[0:5000]), verbose=1)

# checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath="./fmnist_CNN.h5", 
#                                                     monitor='val_accuracy', 
#                                                     verbose=1, 
#                                                     save_best_only=True, mode = 'max')
# callbacks_list = [checkpoint]
# krc.fit(x_train, y_train, batch_size=256, epochs=20, shuffle=True,
#           validation_data=(x_test[0:5000], y_test[0:5000]), callbacks=callbacks_list)

Train on 60000 samples, validate on 5000 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [49]:
Gini = []
total_sample_num = 500
predicted_confidence = []

pbar = ProgressBar()
for index in pbar(range(total_sample_num)):
    pre_tmp = krc.predict(np.reshape(x_test[index], [-1,28,28,1]))
    Gini.append(1-np.sum(pre_tmp*pre_tmp))
    predicted_confidence.append(pre_tmp)

100% |########################################################################|


In [50]:
indexs = np.argsort(Gini)
indexs = indexs[::-1]
# 计算APFD指标
APFD,_,_ = get_APFD(Gini_indexs=indexs, ground_truth_label=y_test[range(total_sample_num)],
                   predicted_confidence=np.array(predicted_confidence))
print("APFD: ", APFD)

# 计算RAUC指标
RAUC,_,_ = get_RAUC(Gini_indexs=indexs, ground_truth_label=y_test[range(total_sample_num)],
                    predicted_confidence=np.array(predicted_confidence))
print("RAUC: ", RAUC)

100% |########################################################################|
100% |########################################################################|

APFD:  0.9826666666666667
RAUC:  0.9865996649916248





In [53]:
BoundaryGini = []

pbar = ProgressBar()
for index in pbar(range(total_sample_num)):
    pred = krc.predict(np.reshape(x_test[index], [-1,28,28,1]))
    res_tmp = clever_u(krc, x_test[index], 3, 5, R_LI, norm=np.inf, pool_factor=3, verbose=False)
    BoundaryGini.append(res_tmp)

100% |########################################################################|


In [54]:
indexs = np.argsort(BoundaryGini)
# indexs = indexs[::-1]
# 计算APFD指标
APFD,_,_ = get_APFD(Gini_indexs=indexs, ground_truth_label=y_test[range(total_sample_num)],
                   predicted_confidence=np.array(predicted_confidence))
print("APFD: ", APFD)

# 计算RAUC指标
RAUC,_,_ = get_RAUC(Gini_indexs=indexs, ground_truth_label=y_test[range(total_sample_num)],
                    predicted_confidence=np.array(predicted_confidence))
print("RAUC: ", RAUC)

100% |########################################################################|
100% |########################################################################|

APFD:  0.9846666666666667
RAUC:  0.9886097152428811





In [59]:
R_L1 = 40
R_L2 = 2
R_LI = 0.05
BoundaryGini2 = []

pbar = ProgressBar()
for index in pbar(range(total_sample_num)):
    pred = krc.predict(np.reshape(x_test[index], [-1,28,28,1]))
    res_tmp = inverper_c(krc, x_test[index], 3, 5, R_LI, norm=np.inf, pool_factor=3)
    BoundaryGini2.append(res_tmp)

100% |########################################################################|


In [60]:
indexs = np.argsort(BoundaryGini2)
# indexs = indexs[::-1]
# 计算APFD指标
APFD,_,_ = get_APFD(Gini_indexs=indexs, ground_truth_label=y_test[range(total_sample_num)],
                   predicted_confidence=np.array(predicted_confidence))
print("APFD: ", APFD)

# 计算RAUC指标
RAUC,_,_ = get_RAUC(Gini_indexs=indexs, ground_truth_label=y_test[range(total_sample_num)],
                    predicted_confidence=np.array(predicted_confidence))
print("RAUC: ", RAUC)

100% |########################################################################|
100% |########################################################################|

APFD:  0.9846666666666667
RAUC:  0.9886097152428811





In [62]:
np.argsort([1,5,3,4,7])

array([0, 2, 3, 1, 4])

In [31]:
from __future__ import absolute_import, division, print_function, unicode_literals

from functools import reduce
import logging
from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING

import numpy as np
import numpy.linalg as la
from scipy.optimize import fmin as scipy_optimizer
from scipy.stats import weibull_min
from tqdm.auto import tqdm

from art.config import ART_NUMPY_DTYPE
from art.attacks.evasion.fast_gradient import FastGradientMethod
from art.attacks.evasion.hop_skip_jump import HopSkipJump
from art.utils import random_sphere

if TYPE_CHECKING:
    from art.attacks.attack import EvasionAttack
    from art.utils import CLASSIFIER_TYPE, CLASSIFIER_LOSS_GRADIENTS_TYPE, CLASSIFIER_CLASS_LOSS_GRADIENTS_TYPE
    
def inverper_c(
    classifier: "CLASSIFIER_CLASS_LOSS_GRADIENTS_TYPE",
    x: np.ndarray,
    nb_batches: int,
    batch_size: int,
    radius: float,
    norm: float,
    c_init: float = 1.0,
    pool_factor: int = 10,
) -> float:
    """
    Compute CLEVER score for a targeted attack.

    | Paper link: https://arxiv.org/abs/1801.10578

    :param classifier: A trained model.
    :param x: One input sample.
    :param nb_batches: Number of repetitions of the estimate.
    :param batch_size: Number of random examples to sample per batch.
    :param radius: Radius of the maximum perturbation.
    :param norm: Current support: 1, 2, np.inf.
    :param c_init: Initialization of Weibull distribution.
    :param pool_factor: The factor to create a pool of random samples with size pool_factor x n_s.
    :return: CLEVER score.
    """
    # Check if the targeted class is different from the predicted class
    y_pred = classifier.predict(np.array([x]))
    pred_class = np.argmax(y_pred, axis=1)[0]

    # Check if pool_factor is smaller than 1
    if pool_factor < 1:  # pragma: no cover
        raise ValueError("The `pool_factor` must be larger than 1.")

    # Some auxiliary vars
    rand_pool_grad_set = []
    grad_norm_set = []
    dim = reduce(lambda x_, y: x_ * y, x.shape, 1)
    shape = [pool_factor * batch_size]
    shape.extend(x.shape)

    # Generate a pool of samples
    rand_pool = np.reshape(
        random_sphere(nb_points=pool_factor * batch_size, nb_dims=dim, radius=radius, norm=norm),
        shape,
    )
    rand_pool += np.repeat(np.array([x]), pool_factor * batch_size, 0)
    rand_pool = rand_pool.astype(ART_NUMPY_DTYPE)
    if hasattr(classifier, "clip_values") and classifier.clip_values is not None:
        np.clip(rand_pool, classifier.clip_values[0], classifier.clip_values[1], out=rand_pool)

    # Change norm since q = p / (p-1)
    if norm == 1:
        norm = np.inf
    elif norm == np.inf:
        norm = 1
    elif norm != 2:  # pragma: no cover
        raise ValueError(f"Norm {norm} not supported")

    # Compute gradients for all samples in rand_pool
    for i in range(batch_size):
        rand_pool_batch = rand_pool[i * pool_factor : (i + 1) * pool_factor]

        # Compute gradients
        grad_pred_class = classifier.class_gradient(rand_pool_batch, label=pred_class)

        if np.isnan(grad_pred_class).any() :  # pragma: no cover
            raise Exception("The classifier results NaN gradients.")

        grad = grad_pred_class
        grad = np.reshape(grad, (pool_factor, -1))
        grad = np.linalg.norm(grad, ord=norm, axis=1)
        rand_pool_grad_set.extend(grad)

    rand_pool_grads = np.array(rand_pool_grad_set)

    # Loop over the batches
    for _ in range(nb_batches):
        # Random selection of gradients
        grad_norm = rand_pool_grads[np.random.choice(pool_factor * batch_size, batch_size)]
        grad_norm = np.max(grad_norm)
        grad_norm_set.append(grad_norm)

    # Maximum likelihood estimation for max gradient norms
    [_, loc, _] = weibull_min.fit(-np.array(grad_norm_set), c_init, optimizer=scipy_optimizer)

    # Compute function value
    values = classifier.predict(np.array([x]))
    value = values[:, pred_class] - 0.5

    # Compute scores
#     score = np.min([-value[0] / loc, radius])
    score = -value[0] / loc

    return score
