<table style="border: none" align="center">
   <tr style="border: none">
      <th style="border: none"><font face="verdana" size="4" color="black"><b>  Demonstrate detection of adversarial samples using ART  </b></font></font></th>
   </tr> 
</table>

In this notebook we demonstrate the detection of adversarial samples using ART. Our classifier will be a **ResNet** architecture for the [CIFAR-10](https://www.cs.toronto.edu/~kriz/cifar.html) image data set.


## Contents

1.	[Loading prereqs and data](#prereqs)
2.  [Evaluating the classifier](#classifier)
3.  [Training the detector](#train_detector)
4.  [Evaluating the detector](#detector)

<a id="prereqs"></a>
## 1. Loading prereqs and data

In [28]:
import warnings
warnings.filterwarnings('ignore')


from keras.models import load_model

from art import config
from art.utils import load_dataset, get_file, preprocess
from art.estimators.classification import KerasClassifier
from art.attacks.evasion import FastGradientMethod
from art.defences.detector.evasion.subsetscanning import SubsetScanningDetector

import os
import sys
import six
import numpy as np
from typing import Tuple

%matplotlib inline
import matplotlib.pyplot as plt

BATCH_SIZE = 100
NB_TRAIN = 100
NB_TEST = 100

__file__ = '/home/jinie/PycharmProjects/adversarial-robustness-toolbox/uilts/'

In [17]:
DATASET_TYPE = Tuple[  # pylint: disable=C0103
    Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray], float, float
]

def load_cifar10(
    raw: bool = False,
) -> DATASET_TYPE:
    """
    Loads CIFAR10 dataset from config.CIFAR10_PATH or downloads it if necessary.

    :param raw: `True` if no preprocessing should be applied to the data. Otherwise, data is normalized to 1.
    :return: `(x_train, y_train), (x_test, y_test), min, max`
    """

    def load_batch(fpath: str) -> Tuple[np.ndarray, np.ndarray]:
        """
        Utility function for loading CIFAR batches, as written in Keras.

        :param fpath: Full path to the batch file.
        :return: `(data, labels)`
        """
        with open(fpath, "rb") as file_:
            if sys.version_info < (3,):
                content = six.moves.cPickle.load(file_)
            else:
                content = six.moves.cPickle.load(file_, encoding="bytes")
                content_decoded = {}
                for key, value in content.items():
                    content_decoded[key.decode("utf8")] = value
                content = content_decoded
        data = content["data"]
        labels = content["labels"]

        data = data.reshape(data.shape[0], 3, 32, 32)
        return data, labels

    path = os.path.join('/home/jinie/Documents/cifar-10-batches-py/')
    num_train_samples = 50000

    x_train = np.zeros((num_train_samples, 3, 32, 32), dtype=np.uint8)
    y_train = np.zeros((num_train_samples,), dtype=np.uint8)

    for i in range(1, 6):
        fpath = os.path.join(path, "data_batch_" + str(i))
        data, labels = load_batch(fpath)
        x_train[(i - 1) * 10000 : i * 10000, :, :, :] = data
        y_train[(i - 1) * 10000 : i * 10000] = labels

    fpath = os.path.join(path, "test_batch")
    x_test, y_test = load_batch(fpath)
    y_train = np.reshape(y_train, (len(y_train), 1))
    y_test = np.reshape(y_test, (len(y_test), 1))

    # Set channels last
    x_train = x_train.transpose((0, 2, 3, 1))
    x_test = x_test.transpose((0, 2, 3, 1))

    min_, max_ = 0.0, 255.0
    if not raw:
        min_, max_ = 0.0, 1.0
        x_train, y_train = preprocess(x_train, y_train, clip_values=(0, 255))
        x_test, y_test = preprocess(x_test, y_test, clip_values=(0, 255))

    return (x_train, y_train), (x_test, y_test), min_, max_

In [18]:
(x_train, y_train), (x_test, y_test), min_, max_ = load_cifar10()

num_samples_train = 100
num_samples_test = 100
x_train = x_train[0:num_samples_train]
y_train = y_train[0:num_samples_train]
x_test = x_test[0:num_samples_test]
y_test = y_test[0:num_samples_test]

class_descr = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

In [29]:
(x_train, y_train), (x_test, y_test), _, _ = load_dataset("mnist")
x_train, y_train = x_train[:NB_TRAIN], y_train[:NB_TRAIN]
x_test, y_test = x_test[:NB_TEST], y_test[:NB_TEST]

<a id="classifier"></a>
## 2. Subset Scan Detector

Keras classifier

In [30]:
def _kr_weights_loader(dataset, weights_type, layer="DENSE"):
    import keras.backend as k

    filename = str(weights_type) + "_" + str(layer) + "_" + str(dataset) + ".npy"

    def _kr_initializer(_, dtype=None):
        weights = np.load(os.path.join(os.path.dirname(os.path.dirname(__file__)), "utils/resources/models", filename))
        return k.variable(value=weights, dtype=dtype)

    return _kr_initializer


def get_image_classifier_kr(
    loss_name="categorical_crossentropy", loss_type="function_losses", from_logits=False, load_init=True
):
    """
    Standard Keras classifier for unit testing
    The weights and biases are identical to the TensorFlow model in get_classifier_tf().
    :param loss_name: The name of the loss function.
    :type loss_name: `str`
    :param loss_type: The type of loss function definitions: label (loss function defined by string of its name),
                      function_losses (loss function imported from keras.losses), function_backend (loss function
                      imported from keras.backend)
    :type loss_type: `str`
    :param from_logits: Flag if model should predict logits (True) or probabilities (False).
    :type from_logits: `bool`
    :param load_init: Load the initial weights if True.
    :type load_init: `bool`
    :return: KerasClassifier, tf.Session()
    """
    import tensorflow as tf

    tf_version = [int(v) for v in tf.__version__.split(".")]
    if tf_version[0] == 2 and tf_version[1] >= 3:
        is_tf23_keras24 = True
        tf.compat.v1.disable_eager_execution()
        from tensorflow import keras
        from tensorflow.keras.models import Sequential
        from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D
    else:
        is_tf23_keras24 = False
        import keras
        from keras.models import Sequential
        from keras.layers import Dense, Flatten, Conv2D, MaxPooling2D

    from art.estimators.classification.keras import KerasClassifier

    # Create simple CNN
    model = Sequential()

    if load_init:
        if is_tf23_keras24:
            model.add(
                Conv2D(
                    1,
                    kernel_size=(7, 7),
                    activation="relu",
                    input_shape=(28, 28, 1),
                    kernel_initializer=_tf_weights_loader("MNIST", "W", "CONV2D", 2),
                    bias_initializer=_tf_weights_loader("MNIST", "B", "CONV2D", 2),
                )
            )
        else:
            model.add(
                Conv2D(
                    1,
                    kernel_size=(7, 7),
                    activation="relu",
                    input_shape=(28, 28, 1),
                    kernel_initializer=_kr_weights_loader("MNIST", "W", "CONV2D"),
                    bias_initializer=_kr_weights_loader("MNIST", "B", "CONV2D"),
                )
            )
    else:
        model.add(Conv2D(1, kernel_size=(7, 7), activation="relu", input_shape=(28, 28, 1)))

    model.add(MaxPooling2D(pool_size=(4, 4)))
    model.add(Flatten())

    if from_logits:
        if load_init:
            if is_tf23_keras24:
                model.add(
                    Dense(
                        10,
                        activation="linear",
                        kernel_initializer=_tf_weights_loader("MNIST", "W", "DENSE", 2),
                        bias_initializer=_tf_weights_loader("MNIST", "B", "DENSE", 2),
                    )
                )
            else:
                model.add(
                    Dense(
                        10,
                        activation="linear",
                        kernel_initializer=_kr_weights_loader("MNIST", "W", "DENSE"),
                        bias_initializer=_kr_weights_loader("MNIST", "B", "DENSE"),
                    )
                )
        else:
            model.add(Dense(10, activation="linear"))
    else:
        if load_init:
            if is_tf23_keras24:
                model.add(
                    Dense(
                        10,
                        activation="softmax",
                        kernel_initializer=_tf_weights_loader("MNIST", "W", "DENSE", 2),
                        bias_initializer=_tf_weights_loader("MNIST", "B", "DENSE", 2),
                    )
                )
            else:
                model.add(
                    Dense(
                        10,
                        activation="softmax",
                        kernel_initializer=_kr_weights_loader("MNIST", "W", "DENSE"),
                        bias_initializer=_kr_weights_loader("MNIST", "B", "DENSE"),
                    )
                )
        else:
            model.add(Dense(10, activation="softmax"))

    if loss_name == "categorical_hinge":
        if loss_type == "label":
            raise AttributeError("This combination of loss function options is not supported.")
        elif loss_type == "function_losses":
            loss = keras.losses.categorical_hinge
    elif loss_name == "categorical_crossentropy":
        if loss_type == "label":
            if from_logits:
                raise AttributeError("This combination of loss function options is not supported.")
            else:
                loss = loss_name
        elif loss_type == "function_losses":
            if from_logits:
                if int(keras.__version__.split(".")[0]) == 2 and int(keras.__version__.split(".")[1]) >= 3:

                    def categorical_crossentropy(y_true, y_pred):
                        return keras.losses.categorical_crossentropy(y_true, y_pred, from_logits=True)

                    loss = categorical_crossentropy
                else:
                    raise NotImplementedError("This combination of loss function options is not supported.")
            else:
                loss = keras.losses.categorical_crossentropy
        elif loss_type == "function_backend":
            if from_logits:

                def categorical_crossentropy(y_true, y_pred):
                    return keras.backend.categorical_crossentropy(y_true, y_pred, from_logits=True)

                loss = categorical_crossentropy
            else:
                loss = keras.backend.categorical_crossentropy
    elif loss_name == "sparse_categorical_crossentropy":
        if loss_type == "label":
            if from_logits:
                raise AttributeError("This combination of loss function options is not supported.")
            else:
                loss = loss_name
        elif loss_type == "function_losses":
            if from_logits:
                if int(keras.__version__.split(".")[0]) == 2 and int(keras.__version__.split(".")[1]) >= 3:

                    def sparse_categorical_crossentropy(y_true, y_pred):
                        return keras.losses.sparse_categorical_crossentropy(y_true, y_pred, from_logits=True)

                    loss = sparse_categorical_crossentropy
                else:
                    raise AttributeError("This combination of loss function options is not supported.")
            else:
                loss = keras.losses.sparse_categorical_crossentropy
        elif loss_type == "function_backend":
            if from_logits:

                def sparse_categorical_crossentropy(y_true, y_pred):
                    return keras.backend.sparse_categorical_crossentropy(y_true, y_pred, from_logits=True)

                loss = sparse_categorical_crossentropy
            else:
                loss = keras.backend.sparse_categorical_crossentropy
    elif loss_name == "kullback_leibler_divergence":
        if loss_type == "label":
            raise AttributeError("This combination of loss function options is not supported.")
        elif loss_type == "function_losses":
            loss = keras.losses.kullback_leibler_divergence
        elif loss_type == "function_backend":
            raise AttributeError("This combination of loss function options is not supported.")
    elif loss_name == "cosine_similarity":
        if loss_type == "label":
            loss = loss_name
        elif loss_type == "function_losses":
            loss = keras.losses.cosine_similarity
        elif loss_type == "function_backend":
            loss = keras.backend.cosine_similarity

    else:
        raise ValueError("Loss name not recognised.")

    model.compile(loss=loss, optimizer=keras.optimizers.Adam(lr=0.01), metrics=["accuracy"])

    # Get classifier
    krc = KerasClassifier(model, clip_values=(0, 1), use_logits=from_logits)

    return krc

In [31]:
classifier = get_image_classifier_kr()

Generate adversarial samples

In [32]:
attacker = FastGradientMethod(classifier, eps=0.05)
x_train_adv = attacker.generate(x_train)
x_test_adv = attacker.generate(x_test) # this takes about two minutes









Compile training data for detector

In [33]:
x_train_detector = np.concatenate((x_train, x_train_adv), axis=0)

bgd = x_train
clean = x_test
anom = x_test_adv


Adversarial samples detecting

In [36]:
detector = SubsetScanningDetector(classifier, bgd, layer=1)

# clean_scors, adv_scores, dpwr = detector.scan(clean, clean)
clean_scores, adv_scores, dpwr = detector.scan(clean, anom)

print(clean_scores)
print(adv_scores)

Subset scanning:   0%|          | 0/200 [00:00<?, ?it/s]

[8.353407688374094, 13.346051838929732, 16.92527157653528, 10.54952468451945, 18.451540434417502, 16.144130778606844, 19.070703157390493, 9.179674981574411, 13.353154486970144, 6.831363209181577, 20.285431801291388, 16.144130778606844, 9.402902843498225, 16.144130778606844, 9.535351578695247, 13.353154486970144, 14.574494189585415, 9.71632945972361, 13.84536155052378, 14.135382712060528, 10.344461526373797, 10.802101477998978, 16.67790808665872, 11.335717703010879, 17.247487589450948, 10.250855796898612, 10.69144303671026, 19.171924803479335, 14.836814374925844, 6.884756236180809, 12.933600178439825, 6.119783321049607, 10.676841471143785, 19.070703157390493, 9.77371284381141, 12.564784632942692, 10.651069335266296, 5.123522406886183, 11.976359914476078, 5.354810405918406, 6.89899503578038, 12.089479697525201, 13.349492210173345, 17.163632841651445, 12.473350209495303, 14.911497069372816, 7.685283610329273, 16.015262206715676, 7.0330164563463, 15.256562525912395, 13.316636708208987, 7.6