In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Dense, Flatten, MaxPooling2D, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.optimizers import Adam

from art.estimators.classification import TensorFlowV2Classifier
from art.utils import load_mnist, preprocess
from art.attacks.poisoning import PoisoningAttackBackdoor
from art.attacks.poisoning.perturbations import add_pattern_bd
from art.defences.detector.poison import ActivationDefence

import matplotlib.pyplot as plt

### BadNets

In [None]:
# Number of label classes; read as a global by poison_dataset and the model head.
nb_classes = 10

# raw=True asks ART for the un-preprocessed dataset; preprocessing is applied
# later in this notebook, after the backdoor has been stamped on the images.
(
    (X_train_raw, y_train_raw),
    (X_test_raw, y_test_raw),
    min_pixel_value,
    max_pixel_value,
) = load_mnist(raw=True)

In [None]:
# Largest pixel intensity found in the raw training images; the backdoor
# pattern is drawn with this value.
max_val = X_train_raw.max()


def add_modification(x):
    """Stamp the backdoor pattern onto ``x`` using ``max_val`` as the pattern pixel value."""
    return add_pattern_bd(x, pixel_value=max_val)

In [None]:
def poison_dataset(X_clean, y_clean, percent_poison, poison_func):
    """Append backdoored copies of clean samples to a dataset.

    For every class ``src`` a batch of its samples is drawn, perturbed with
    ``poison_func`` and relabelled as ``(src + 1) % nb_classes``. The clean
    samples are kept as they are; the poisoned ones are appended after them,
    grouped by source class in ascending order.

    Relies on the module-level ``nb_classes``.

    Args:
        X_clean: Clean samples, one sample per entry along axis 0.
        y_clean: Integer class labels (not one-hot), aligned with ``X_clean``.
        percent_poison: Fraction, in ``[0, 1)``, that poisoned samples should
            make up of each target class once they have been added.
        poison_func: Perturbation callable handed to ``PoisoningAttackBackdoor``.

    Returns:
        Tuple ``(is_poison, X_poison, y_poison)`` where ``is_poison`` is a
        boolean mask that is True for the appended (poisoned) entries.

    Raises:
        ValueError: If ``percent_poison`` is outside ``[0, 1)``.
    """
    if not 0 <= percent_poison < 1:
        raise ValueError("percent_poison must be in the range [0, 1)")

    # Built once from the caller-supplied perturbation and reused for every class.
    attack = PoisoningAttackBackdoor(poison_func)

    # Pieces are collected and joined once at the end instead of re-copying
    # the whole dataset on every iteration.
    X_parts = [np.copy(X_clean)]
    y_parts = [np.copy(y_clean)]
    flag_parts = [np.zeros_like(y_parts[0])]

    sources = np.arange(nb_classes)
    targets = (np.arange(nb_classes) + 1) % nb_classes

    for src, tgt in zip(sources, targets):
        n_points_in_tgt = np.size(np.where(y_clean == tgt))
        # Solves num / (n_tgt + num) == percent_poison for num.
        num_poison = round((percent_poison * n_points_in_tgt) / (1 - percent_poison))
        if num_poison == 0:
            # Nothing to add for this class; also avoids passing an empty batch to the attack.
            continue

        src_imgs = X_clean[y_clean == src]
        n_points_in_src = np.shape(src_imgs)[0]

        # np.random.choice samples with replacement by default, so the same
        # source image may be picked more than once.
        indices_to_be_poisoned = np.random.choice(n_points_in_src, num_poison)
        imgs_to_be_poisoned = np.copy(src_imgs[indices_to_be_poisoned])

        imgs_to_be_poisoned, poison_labels = \
            attack.poison(imgs_to_be_poisoned, y=np.ones(num_poison) * tgt)

        X_parts.append(imgs_to_be_poisoned)
        y_parts.append(poison_labels)
        flag_parts.append(np.ones(num_poison))

    X_poison = np.concatenate(X_parts, axis=0)
    y_poison = np.concatenate(y_parts, axis=0)
    is_poison = np.concatenate(flag_parts, axis=0) != 0

    return is_poison, X_poison, y_poison


In [None]:
percent_poison = .33

# Training split: add backdoored samples to the raw data, then run ART's
# `preprocess` and add a trailing channel axis for the Conv2D input.
(is_poison_train, X_poisoned_train_raw, y_poisoned_train_raw) = \
    poison_dataset(X_train_raw, y_train_raw, percent_poison, add_modification)
X_train, y_train = preprocess(X_poisoned_train_raw, y_poisoned_train_raw)
X_train = np.expand_dims(X_train, axis=3)

# Test split: same pipeline. It is left unshuffled, so is_poison_test stays
# aligned with X_test / y_test.
(is_poison_test, X_poisoned_test_raw, y_poisoned_test_raw) = \
    poison_dataset(X_test_raw, y_test_raw, percent_poison, add_modification)
X_test, y_test = preprocess(X_poisoned_test_raw, y_poisoned_test_raw)
X_test = np.expand_dims(X_test, axis=3)

# Shuffle the training split so the appended poison samples are not grouped at
# the end. The poison mask is permuted with the same indices; otherwise it
# would no longer say which rows of X_train are poisoned.
n_train = len(y_train)
shuffle_indices = np.arange(n_train)
np.random.shuffle(shuffle_indices)
X_train = X_train[shuffle_indices]
y_train = y_train[shuffle_indices]
is_poison_train = is_poison_train[shuffle_indices]

In [None]:
# Small CNN: two conv layers, one pooling stage, then a dense head with a
# softmax over nb_classes outputs. Input shape is taken from the training data.
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=X_train.shape[1:]),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(nb_classes, activation='softmax'),
])
# Compiled without arguments: loss and optimizer are supplied to the ART
# classifier wrapper in a later cell.
model.compile()

model.summary()

In [None]:
# Training configuration passed to TensorFlowV2Classifier in the next cell.
optimizer = Adam(learning_rate=0.01)
loss_object = categorical_crossentropy

In [None]:
# Wrap the Keras model in ART's classifier interface.
# clip_values must describe the range of the features the model actually sees.
# The model is trained on the output of `preprocess`, so the bounds are read
# from X_train instead of reusing the bounds returned with the raw dataset.
victim_classifier = TensorFlowV2Classifier(
    model=model,
    clip_values=(float(np.min(X_train)), float(np.max(X_train))),
    nb_classes=nb_classes,
    input_shape=X_train.shape[1:],  # same shape the model was built with
    loss_object=loss_object,
    optimizer=optimizer,
)
victim_classifier.fit(X_train, y_train, batch_size=128, nb_epochs=30, verbose=True)

In [None]:
# Accuracy on the test samples that carry no backdoor trigger.
clean_mask = np.logical_not(is_poison_test)
clean_X_test = X_test[clean_mask]
clean_y_test = y_test[clean_mask]

clearn_preds = victim_classifier.predict(clean_X_test)
clean_hits = np.argmax(clearn_preds, axis=1) == np.argmax(clean_y_test, axis=1)
acc = clean_hits.sum() / len(clean_y_test)
print(f"Clean test set accuracy: {acc * 100:.5f} %")

In [None]:
# "Accuracy" on the backdoored test samples, measured against the attacker's
# target labels: a high value means the backdoor is effective.
poison_X_test, poison_y_test = X_test[is_poison_test], y_test[is_poison_test]

poison_preds = victim_classifier.predict(poison_X_test)
poison_hits = np.argmax(poison_preds, axis=1) == np.argmax(poison_y_test, axis=1)
acc = poison_hits.sum() / len(poison_y_test)
print(f"Poison test set accuracy: {acc * 100:.5f} %")

In [None]:
# Combine the clean and poisoned subsets into a single accuracy figure.
clean_correct = (clearn_preds.argmax(axis=1) == clean_y_test.argmax(axis=1)).sum()
poison_correct = (poison_preds.argmax(axis=1) == poison_y_test.argmax(axis=1)).sum()

total = len(clean_y_test) + len(poison_y_test)
total_correct = clean_correct + poison_correct
total_acc = total_correct / total
print(f"Overall test set accuracy: {total_acc * 100:.5f} %")

In [None]:
c = 1  # label to look up among the poisoned test samples
i = 0  # which of the matching samples to show

# Position, within the poisoned test subset, of the i-th sample labelled c.
poison_labels_test = np.argmax(poison_y_test, axis=1)
c_idx = np.flatnonzero(poison_labels_test == c)[i]

plt.imshow(np.squeeze(poison_X_test[c_idx]), cmap='gray')
plt.show()

print('Prediction: {}'.format(np.argmax(poison_preds[c_idx])))

### Activation Clustering
<p style="color: red">Kerasのバージョンが3になったことによる問題が発生していると考えられる為、割愛</p>

In [None]:
# Run ART's ActivationDefence against the trained classifier and the
# (poisoned, shuffled) training set.
# NOTE(review): the markdown note above this cell says the section is skipped
# because of a suspected Keras 3 incompatibility - confirm it runs before
# relying on its results.
defence = ActivationDefence(victim_classifier, X_train, y_train)
# Detection is requested with 2 clusters after a PCA reduction to 10 dimensions.
report, is_clean_lst = defence.detect_poison(nb_clusters=2, nb_dims=10, reduce='PCA')
# Only the first element of the returned pair is kept.
[clusters_by_class, _] = defence.cluster_activations()