In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals
import json
import os
import pprint
import sys
import warnings

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits import mplot3d

import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, Dropout


module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

warnings.filterwarnings('ignore')

if tf.executing_eagerly():
    tf.compat.v1.disable_eager_execution()

from art.attacks.poisoning import PoisoningAttackBackdoor
from art.attacks.poisoning.perturbations import add_pattern_bd, add_single_bd, insert_image
from art.estimators.classification import KerasClassifier
from art.utils import load_mnist, preprocess



In [2]:
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(levelname)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

In [3]:
(x_raw, y_raw), (x_raw_test, y_raw_test), min_, max_ = load_mnist(raw=True)

n_train = np.shape(x_raw)[0]
num_selection = 7500
random_selection_indices = np.random.choice(n_train, num_selection)
x_raw = x_raw[random_selection_indices]
y_raw = y_raw[random_selection_indices]

BACKDOOR_TYPE = "pattern"

In [4]:
max_val = np.max(x_raw)
def add_modification(x):
    if BACKDOOR_TYPE == 'pattern':
        return add_pattern_bd(x, pixel_value=max_val)
    elif BACKDOOR_TYPE == 'pixel':
        return add_single_bd(x, pixel_value=max_val) 
    else:
        raise("Unknown backdoor type")

In [5]:
def poison_dataset(x_clean, y_clean, percent_poison, poison_func):
    x_poison = np.copy(x_clean)
    y_poison = np.copy(y_clean)
    is_poison = np.zeros(np.shape(y_poison))
    
    sources = np.arange(10)
    targets = (np.arange(10) + 1) % 10 
    for i, (src, tgt) in enumerate(zip(sources, targets)):
        n_points_in_tgt = np.size(np.where(y_clean == tgt))
        num_poison = round((percent_poison * n_points_in_tgt) / (1 - percent_poison))
        src_imgs = x_clean[y_clean == src]

        n_points_in_src = np.shape(src_imgs)[0]
        indices_to_be_poisoned = np.random.choice(n_points_in_src, num_poison)

        imgs_to_be_poisoned = np.copy(src_imgs[indices_to_be_poisoned])
        backdoor_attack = PoisoningAttackBackdoor(poison_func)
        imgs_to_be_poisoned, poison_labels = backdoor_attack.poison(imgs_to_be_poisoned, y=np.ones(num_poison) * tgt)
        x_poison = np.append(x_poison, imgs_to_be_poisoned, axis=0)
        y_poison = np.append(y_poison, poison_labels, axis=0)
        is_poison = np.append(is_poison, np.ones(num_poison))

    is_poison = is_poison != 0

    return is_poison, x_poison, y_poison

In [6]:
percent_poison = 0.33
(is_poison_train, x_poisoned_raw, y_poisoned_raw) = poison_dataset(x_raw, y_raw, percent_poison, add_modification)
x_train, y_train = preprocess(x_poisoned_raw, y_poisoned_raw)

x_train = np.expand_dims(x_train, axis=3)


(is_poison_test, x_poisoned_raw_test, y_poisoned_raw_test) = poison_dataset(x_raw_test, y_raw_test, percent_poison, add_modification)
x_test, y_test = preprocess(x_poisoned_raw_test, y_poisoned_raw_test)

x_test = np.expand_dims(x_test, axis=3)

n_train = np.shape(y_train)[0]
shuffled_indices = np.arange(n_train)
np.random.shuffle(shuffled_indices)
x_train = x_train[shuffled_indices]
y_train = y_train[shuffled_indices]
is_poison_train = is_poison_train[shuffled_indices]

In [7]:
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 26, 26, 32)        320       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 24, 24, 64)        18496     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 12, 12, 64)        0         
_________________________________________________________________
dropout (Dropout)            (None, 12, 12, 64)        0         
_________________________________________________________________
flatten (Flatten)            (None, 9216)              0         
_________________________________________________________________
dense (Dense)                (None, 128)               1179776   
_________________________________________________________________
dropout_1 (Dropout)          (None, 128)               0

In [8]:
classifier = KerasClassifier(model=model, clip_values=(0, 1))
classifier.fit(x_train, y_train, nb_epochs=5, batch_size=128)

INFO:art.estimators.classification.keras:Inferred 7 hidden layers on Keras classifier.
[INFO] Inferred 7 hidden layers on Keras classifier.


Train on 11194 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [9]:
clean_x_test = x_test[is_poison_test == 0]
clean_y_test = y_test[is_poison_test == 0]

clean_preds = np.argmax(classifier.predict(clean_x_test), axis=1)
clean_correct = np.sum(clean_preds == np.argmax(clean_y_test, axis=1))
clean_total = clean_y_test.shape[0]

clean_acc = clean_correct / clean_total
print("\nClean test set accuracy: %.2f%%" % (clean_acc * 100))



Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.



Clean test set accuracy: 97.54%


In [10]:
poison_x_test = x_test[is_poison_test]
poison_y_test = y_test[is_poison_test]

poison_preds = np.argmax(classifier.predict(poison_x_test), axis=1)
poison_correct = np.sum(poison_preds == np.argmax(poison_y_test, axis=1))
poison_total = poison_y_test.shape[0]



poison_acc = poison_correct / poison_total
print("\n Effectiveness of poison: %.2f%%" % (poison_acc * 100))


 Effectiveness of poison: 95.96%


In [11]:
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

classifier = KerasClassifier(model=model, clip_values=(0, 1))

INFO:art.estimators.classification.keras:Inferred 7 hidden layers on Keras classifier.
[INFO] Inferred 7 hidden layers on Keras classifier.


In [12]:
from art.defences.preprocessor.preprocessor import Preprocessor
from art.utils import to_categorical
from typing import Optional, Tuple
class Mixup(Preprocessor):


    params = ["num_classes", "alpha", "num_mix"]

    def __init__(
        self,
        num_classes: int,
        alpha: float = 1.0,
        num_mix: int = 2,
        apply_fit: bool = True,
        apply_predict: bool = False,
    ) -> None:

        super().__init__(is_fitted=True, apply_fit=apply_fit, apply_predict=apply_predict)
        self.num_classes = num_classes
        self.alpha = alpha
        self.num_mix = num_mix
        self._check_params()

    def __call__(self, x: np.ndarray, y: Optional[np.ndarray] = None) -> Tuple[np.ndarray, Optional[np.ndarray]]:

        if y is None:
            raise ValueError("Labels `y` cannot be None.")

        
        if len(y.shape) == 2:
            y_one_hot = y
        elif len(y.shape) == 1:
            y_one_hot = to_categorical(y, self.num_classes)
        else:
            raise ValueError(
                "Shape of labels not recognised. "
                "Please provide labels in shape (nb_samples,) or (nb_samples, nb_classes)"
            )

        n = x.shape[0]

        
        lmbs = np.random.dirichlet([self.alpha] * self.num_mix)

        x_aug = lmbs[0] * x
        y_aug = lmbs[0] * y_one_hot
        for lmb in lmbs[1:]:
           
            indices = np.random.permutation(n)
            x_aug = x_aug + lmb * x[indices]
            y_aug = y_aug + lmb * y_one_hot[indices]

        return x_aug, y_aug

    def _check_params(self) -> None:
        if self.num_classes <= 0:
            raise ValueError("The number of classes must be positive")

        if self.alpha <= 0:
            raise ValueError("The mixing interpolation strength must be positive.")

        if self.num_mix < 2:
            raise ValueError("The number of samples to mix must be at least 2.")

In [17]:
from art.defences.trainer.trainer import Trainer
from typing import List, Optional, Union, Tuple, TYPE_CHECKING
from tqdm.auto import trange
import time
if sys.version_info >= (3, 8):
    from typing import Literal
else:
    from typing_extensions import Literal
class DPInstaHideTrainer(Trainer):


    def __init__(
        self,
        classifier: "CLASSIFIER_LOSS_GRADIENTS_TYPE",
        augmentations: Union["Preprocessor", List["Preprocessor"]],
        noise: Literal["gaussian", "laplacian", "exponential"] = "laplacian",
        loc: Union[int, float] = 0.0,
        scale: Union[int, float] = 0.03,
        clip_values: "CLIP_VALUES_TYPE" = (0.0, 1.0),
    ):

        from art.defences.preprocessor import Preprocessor

        super().__init__(classifier)
        if isinstance(augmentations, Preprocessor):
            self.augmentations = [augmentations]
        else:
            self.augmentations = augmentations
        self.noise = noise
        self.loc = loc
        self.scale = scale
        self.clip_values = clip_values

    def _generate_noise(self, x: np.ndarray) -> np.ndarray:
        if self.noise == "gaussian":
            noise = np.random.normal(loc=self.loc, scale=self.scale, size=x.shape)
        elif self.noise == "laplacian":
            noise = np.random.laplace(loc=self.loc, scale=self.scale, size=x.shape)
        elif self.noise == "exponential":
            noise = np.random.exponential(scale=self.scale, size=x.shape)
        else:
            raise ValueError("The provided noise type is not supported:", self.noise)

        x_noise = x + noise
        x_noise = np.clip(x_noise, self.clip_values[0], self.clip_values[1])

        return x_noise.astype(x.dtype)

    def fit(  # pylint: disable=W0221
        self,
        x: np.ndarray,
        y: np.ndarray,
        validation_data: Optional[Tuple[np.ndarray, np.ndarray]] = None,
        batch_size: int = 128,
        nb_epochs: int = 20,
        **kwargs
    ):
        
        logger.info("Performing adversarial training with DP-InstaHide protocol")

        nb_batches = int(np.ceil(len(x) / batch_size))
        ind = np.arange(len(x))

        logger.info("Adversarial Training DP-InstaHide")

        for i_epoch in trange(nb_epochs, desc="DP-InstaHide training epochs"):
           
            np.random.shuffle(ind)
            start_time = time.time()
            train_loss = 0.0
            train_acc = 0.0
            train_n = 0.0

            for batch_id in range(nb_batches):
                
                x_batch = x[ind[batch_id * batch_size : min((batch_id + 1) * batch_size, x.shape[0])]]
                y_batch = y[ind[batch_id * batch_size : min((batch_id + 1) * batch_size, x.shape[0])]]

               
                x_aug = x_batch.copy()
                y_aug = y_batch.copy()
                for augmentation in self.augmentations:
                    x_aug, y_aug = augmentation(x_aug, y_aug)

               
                x_aug = self._generate_noise(x_aug)

                
                self._classifier.fit(x_aug, y_aug, nb_epochs=1, batch_size=x_aug.shape[0], verbose=0, **kwargs)

                
                loss = self._classifier.compute_loss(x_aug, y_aug, reduction="mean")
                output = np.argmax(self.predict(x_batch), axis=1)
                acc = np.sum(output == np.argmax(y_batch, axis=1))
                n = len(x_aug)

                
                train_loss += np.sum(loss)
                train_acc += acc
                train_n += n

            train_time = time.time()

            
            if validation_data is not None:
                (x_test, y_test) = validation_data
                output = np.argmax(self.predict(x_test), axis=1)
                test_loss = self._classifier.compute_loss(x_test, y_test, reduction="mean")
                test_acc = np.mean(output == np.argmax(y_test, axis=1))
                logger.info(
                    "epoch: %s time(s): %.1f, loss(tr): %.4f, acc(tr): %.4f, loss(val): %.4f, acc(val): %.4f",
                    i_epoch,
                    train_time - start_time,
                    train_loss / train_n,
                    train_acc / train_n,
                    test_loss,
                    test_acc,
                )
            else:
                logger.info(
                    "epoch: %s time(s): %.1f, loss: %.4f, acc: %.4f",
                    i_epoch,
                    train_time - start_time,
                    train_loss / train_n,
                    train_acc / train_n,
                )

    def fit_generator(self, generator: "DataGenerator", nb_epochs: int = 20, **kwargs):

        logger.info("Performing adversarial training with DP-InstaHide protocol")
        size = generator.size
        batch_size = generator.batch_size
        if size is not None:
            nb_batches = int(np.ceil(size / batch_size))
        else:
            raise ValueError("Size is None.")

        logger.info("Adversarial Training DP-InstaHide")

        for i_epoch in trange(nb_epochs, desc="DP-InstaHide training epochs"):
            start_time = time.time()
            train_loss = 0.0
            train_acc = 0.0
            train_n = 0.0

            for _ in range(nb_batches):
               
                x_batch, y_batch = generator.get_batch()

                
                x_aug = x_batch.copy()
                y_aug = y_batch.copy()
                for augmentation in self.augmentations:
                    x_aug, y_aug = augmentation(x_aug, y_aug)

                
                x_aug = self._generate_noise(x_aug)

                
                self._classifier.fit(x_aug, y_aug, nb_epochs=1, batch_size=x_aug.shape[0], verbose=0, **kwargs)

                
                loss = self._classifier.compute_loss(x_aug, y_aug, reduction="mean")
                output = np.argmax(self.predict(x_batch), axis=1)
                acc = np.sum(output == np.argmax(y_batch, axis=1))
                n = len(x_aug)

                
                train_loss += np.sum(loss)
                train_acc += acc
                train_n += n

            train_time = time.time()

            
            logger.info(
                "epoch: %s time(s): %.1f, loss: %.4f, acc: %.4f",
                i_epoch,
                train_time - start_time,
                train_loss / train_n,
                train_acc / train_n,
            )

    def predict(self, x: np.ndarray, **kwargs) -> np.ndarray:
        
        return self._classifier.predict(x, **kwargs)

In [18]:
mixup = Mixup(num_classes=10, num_mix=2)
trainer = DPInstaHideTrainer(
    classifier=classifier,
    augmentations=mixup,
    noise='laplacian',
    scale=0.3,
    clip_values=(0, 1) 
)
trainer.fit(x_train, y_train, nb_epochs=5, batch_size=128)

INFO:root:Performing adversarial training with DP-InstaHide protocol
[INFO] Performing adversarial training with DP-InstaHide protocol
INFO:root:Adversarial Training DP-InstaHide
[INFO] Adversarial Training DP-InstaHide
DP-InstaHide training epochs:   0%|          | 0/5 [00:00<?, ?it/s]INFO:root:epoch: 0 time(s): 26.2, loss: 0.0152, acc: 0.4559
[INFO] epoch: 0 time(s): 26.2, loss: 0.0152, acc: 0.4559
DP-InstaHide training epochs:  20%|██        | 1/5 [00:26<01:44, 26.19s/it]INFO:root:epoch: 1 time(s): 28.4, loss: 0.0126, acc: 0.5899
[INFO] epoch: 1 time(s): 28.4, loss: 0.0126, acc: 0.5899
DP-InstaHide training epochs:  40%|████      | 2/5 [00:54<01:22, 27.49s/it]INFO:root:epoch: 2 time(s): 31.9, loss: 0.0119, acc: 0.6230
[INFO] epoch: 2 time(s): 31.9, loss: 0.0119, acc: 0.6230
DP-InstaHide training epochs:  60%|██████    | 3/5 [01:26<00:59, 29.51s/it]INFO:root:epoch: 3 time(s): 36.1, loss: 0.0110, acc: 0.6654
[INFO] epoch: 3 time(s): 36.1, loss: 0.0110, acc: 0.6654
DP-InstaHide trainin

In [19]:
clean_x_test = x_test[is_poison_test == 0]
clean_y_test = y_test[is_poison_test == 0]

clean_preds = np.argmax(classifier.predict(clean_x_test), axis=1)
clean_correct = np.sum(clean_preds == np.argmax(clean_y_test, axis=1))
clean_total = clean_y_test.shape[0]

clean_acc = clean_correct / clean_total
print("\nClean test set accuracy: %.2f%%" % (clean_acc * 100))




Clean test set accuracy: 94.91%


In [20]:
poison_x_test = x_test[is_poison_test]
poison_y_test = y_test[is_poison_test]

poison_preds = np.argmax(classifier.predict(poison_x_test), axis=1)
poison_correct = np.sum(poison_preds == np.argmax(poison_y_test, axis=1))
poison_total = poison_y_test.shape[0]



poison_acc = poison_correct / poison_total
print("\n Effectiveness of poison: %.2f%%" % (poison_acc * 100))


 Effectiveness of poison: 75.70%
