In [None]:
import os
import yaml

import cv2
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import sklearn
from sklearn.model_selection import train_test_split
import tensorflow as tf

from adversarial_generators.fgsm import generate_adversarial_images
from model.VGG11 import VGG11
from preprocess.preprocess import load_data

# Create tf session for evaluating tensors

In [None]:
# Later cells call `.eval()` on tensors without passing a session explicitly,
# so a session has to be active from this point on.
# NOTE(review): presumably InteractiveSession installs itself as the default
# session on construction (TF1 API) — confirm against the TensorFlow version in use.
sess = tf.InteractiveSession()

# Config

In [None]:
# Project root = current working directory. The previous expression,
# os.path.dirname(os.path.abspath('__file__')), passed the *string* '__file__'
# (not the module variable), so it only ever resolved to this same directory;
# os.getcwd() states that intent directly.
ROOT_DIRECTORY = os.getcwd()

# Read the experiment settings from config/config.yml under the project root.
# The encoding is set explicitly so parsing does not depend on the platform's
# default locale encoding.
with open(os.path.join(ROOT_DIRECTORY, 'config', 'config.yml'), 'r', encoding = 'utf-8') as stream:
    config = yaml.safe_load(stream)

# Load weights to model

In [None]:
# Build the VGG11 network for 3-channel inputs of the configured height/width
# and restore the weights stored at config["path_to_weights"].
input_shape = (config["img_height"], config["img_width"], 3)
model = VGG11(
  input_shape = input_shape,
  num_classes = config["num_classes"],
)
model.load_weights(config["path_to_weights"])

# Compile Model

In [None]:
# Loss and metric names are taken straight from the config file.
loss = config["loss_function"]
metrics = config["metrics"]

# Adam optimizer; `decay` is set to learning_rate / epochs.
optimizer = tf.keras.optimizers.Adam(
  lr = config["learning_rate"],
  decay = config["learning_rate"] / config["epochs"],
)

model.compile(optimizer = optimizer, loss = loss, metrics = [metrics])

# Load data for adversarial training

In [None]:
# Resolve the dataset directory and the test CSV against the project root.
path_to_set = os.path.join(ROOT_DIRECTORY, config["path_to_data"])
path_to_test_csv = os.path.join(ROOT_DIRECTORY, config["path_to_test_csv"])

# Load the test set through the project's load_data helper.
X_test, y_test = load_data(
  path_to_test_csv,
  path_to_set,
  config["img_width"],
  config["img_height"],
)

# Split the loaded set 50/50 with a fixed seed: one half stays the evaluation
# set, the other half is reserved for adversarial training.
X_test, X_adversarial_train, y_test, y_adversarial_train = train_test_split(
  X_test,
  y_test,
  test_size = 0.5,
  random_state = 0,
)

# Normalize the data

In [None]:
# Cast both image sets to float32 and divide by 255.
# NOTE: this cell rebinds its own inputs, so running it twice scales the data twice.
X_test, X_adversarial_train = (
  image_set.astype("float32") / 255.0
  for image_set in (X_test, X_adversarial_train)
)

# One-Hot Encode Target value

In [None]:
# Turn the integer class ids of both label sets into one-hot rows of width
# config["num_classes"].
y_test, y_adversarial_train = (
  tf.keras.utils.to_categorical(class_ids, config["num_classes"])
  for class_ids in (y_test, y_adversarial_train)
)

# Plot some adversarial images

In [None]:
# Perturbation strengths to visualise, one figure column each.
epsilons = [0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.10]
index = 6122 # Change if you want to see other images 

# Take a 64-image window from the test set; only its first 5 images are drawn.
images = X_test[index:index + 64]
labels = y_test[index:index + 64]
original_classes = np.argmax(labels, axis = 1)

f, axarr = plt.subplots(5, 11, figsize = (30, 20))

# Column 0: the unmodified images with their true class.
for row in range(5):
  axarr[row, 0].imshow(images[row])
  axarr[row, 0].set_xlabel("Original class: {}".format(original_classes[row]))

# Columns 1..10: the same images perturbed with increasing epsilon, labelled
# with the class the model now predicts.
for col, eps in enumerate(epsilons, start = 1):
  adversarial_images = generate_adversarial_images(images, labels, eps, model).eval()
  new_predictions = np.argmax(model.predict_on_batch(adversarial_images), axis = 1)
  axarr[0, col].set_title('Eps: {}'.format(eps))
  for row in range(5):
    axarr[row, col].imshow(adversarial_images[row])
    axarr[row, col].set_xlabel("New class: {}".format(new_predictions[row]))
plt.show()

# Duplicate the model into Model A and Model B

In [None]:
def _clone_pretrained(reference_model):
  """Return an independently compiled copy of `reference_model`.

  The copy has the same architecture, is restored from
  config["path_to_weights"], and is compiled with the notebook's `loss` and
  `metrics`. It gets its OWN optimizer, rebuilt from the configuration of the
  notebook-level `optimizer`. Compiling several models with one shared
  optimizer instance lets them share optimizer state (for example the
  iteration counter that drives the learning-rate decay), so training one
  model would alter how the other one trains.
  """
  clone = tf.keras.models.clone_model(reference_model)
  clone.load_weights(config["path_to_weights"])
  own_optimizer = type(optimizer).from_config(optimizer.get_config())
  clone.compile(optimizer = own_optimizer, loss = loss, metrics = [metrics])
  return clone

# Model A
model_a = _clone_pretrained(model)

# Model B
model_b = _clone_pretrained(model)

# Accuracy of both models on test set

In [None]:
# Baseline scores on the clean test set. Both copies were restored from the
# same weights file, so they are expected to match here.
print('Model A')
model_a.evaluate(x = X_test, y = y_test)

print('Model B')
model_b.evaluate(x = X_test, y = y_test)

# Adversarial Training

In [None]:
eps = 0.1

# Craft adversarial versions of the adversarial-training split against Model A
# (generator imported from the project's fgsm module).
adversarial_images = generate_adversarial_images(X_adversarial_train, y_adversarial_train, eps, model_a).eval()

# Score the adversarial images with Model A itself, the model they were crafted
# against (previously the un-cloned `model` was queried here).
new_predictions = model_a.predict_on_batch(adversarial_images)
predicted_classes = np.argmax(new_predictions, axis = 1)
true_classes = np.argmax(y_adversarial_train, axis = 1)

# Boolean mask: True where the adversarial image is misclassified.
indexes_of_wrong_images = (predicted_classes != true_classes)

# Get the adversarial images wrongly classified by model A.
# (Indexing X_adversarial_train here would return the clean originals, so no
# adversarial image would ever reach training.)
wrong_classified_images = adversarial_images[indexes_of_wrong_images]

# Convert the predicted classes to one-hot encoding for training.
# num_classes is passed explicitly so the width always matches
# y_adversarial_train, even when the highest class id is never predicted or
# nothing was misclassified.
new_predictions = tf.keras.utils.to_categorical(
  predicted_classes[indexes_of_wrong_images], config["num_classes"])
print(len(new_predictions))

# Create Callback for Early Stopping

In [None]:
# Early-stopping callback watching 'val_loss' with a patience of 3.
# The same callback object is passed to both model_a.fit and model_b.fit below.
callback = tf.keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = 3)

# Shuffle the training data for adversarial training

In [None]:
# Rows of the adversarial-training split selected by the misclassification
# mask, together with their true one-hot labels.
selected_images = X_adversarial_train[indexes_of_wrong_images]
selected_labels = y_adversarial_train[indexes_of_wrong_images]

# Model A set: the selected rows with their true labels, followed by
# wrong_classified_images labelled with the predicted (incorrect) classes.
X_for_A = np.concatenate((selected_images, wrong_classified_images))
y_for_A = np.concatenate((selected_labels, new_predictions))
X_for_A, y_for_A = sklearn.utils.shuffle(X_for_A, y_for_A, random_state = 0)

# Model B set: wrong_classified_images paired with the true labels.
X_for_B, y_for_B = sklearn.utils.shuffle(
  wrong_classified_images,
  selected_labels,
  random_state = 0)

# Train Model A with incorrect labels

In [None]:
# Train Model A on its set, holding out 10% for validation so the
# early-stopping callback has a val_loss to monitor.
model_a.fit(
  X_for_A,
  y_for_A,
  epochs = config["epochs"],
  batch_size = config["batch_size"],
  validation_split = 0.1,
  callbacks = [callback],
  verbose = 1)

In [None]:
# Clean-test-set scores after Model A has been trained; Model B has not been
# trained yet at this point.
print('Model A')
model_a.evaluate(x = X_test, y = y_test)

print('Model B')
model_b.evaluate(x = X_test, y = y_test)

# Train Model B with correct labels

In [None]:
# Train Model B on its set, holding out 10% for validation so the
# early-stopping callback has a val_loss to monitor.
model_b.fit(
  X_for_B,
  y_for_B,
  epochs = config["epochs"],
  batch_size = config["batch_size"],
  validation_split = 0.1,
  callbacks = [callback],
  verbose = 1)

# Accuracy of both models on test set after adversarial training

In [None]:
# Clean-test-set scores once both models have been through adversarial training.
print('Model A')
model_a.evaluate(x = X_test, y = y_test)

print('Model B')
model_b.evaluate(x = X_test, y = y_test)

# Test Transferability

In [None]:
# From A to B
# 1. Keep only the test images that Model A classifies correctly.
predictions = model_a.predict_on_batch(X_test)

correctly_classified = (np.argmax(predictions, axis = 1) == np.argmax(y_test, axis = 1))

X_correctly_classified, y_correctly_classified = X_test[correctly_classified], y_test[correctly_classified]

# 2. Craft adversarial versions of those images against Model A.
adversarial_images = generate_adversarial_images(X_correctly_classified, y_correctly_classified, eps, model_a).eval()

# 3. Find the adversarial images that Model A now gets wrong.
new_predictions = model_a.predict_on_batch(adversarial_images)
indexes_of_wrong_images = (np.argmax(new_predictions, axis = 1) != np.argmax(y_correctly_classified, axis = 1))

# Get the adversarial images wrongly classified by model A.
# (Selecting from X_correctly_classified would hand Model B the clean
# originals, which measures clean accuracy rather than transferability.)
wrong_classified_images = adversarial_images[indexes_of_wrong_images]

# 4. Score Model B on those adversarial images against their true labels.
model_b.evaluate(wrong_classified_images, y_correctly_classified[indexes_of_wrong_images])

In [None]:
# From B to A
# 1. Keep only the test images that Model B classifies correctly.
predictions = model_b.predict_on_batch(X_test)

correctly_classified = (np.argmax(predictions, axis = 1) == np.argmax(y_test, axis = 1))

X_correctly_classified, y_correctly_classified = X_test[correctly_classified], y_test[correctly_classified]

# 2. Craft adversarial versions of those images against Model B.
adversarial_images = generate_adversarial_images(X_correctly_classified, y_correctly_classified, eps, model_b).eval()

# 3. Find the adversarial images that Model B now gets wrong.
new_predictions = model_b.predict_on_batch(adversarial_images)
indexes_of_wrong_images = (np.argmax(new_predictions, axis = 1) != np.argmax(y_correctly_classified, axis = 1))

# Get the adversarial images wrongly classified by model B.
# (Selecting from X_correctly_classified would hand Model A the clean
# originals, which measures clean accuracy rather than transferability.)
wrong_classified_images = adversarial_images[indexes_of_wrong_images]

# 4. Score Model A on those adversarial images against their true labels.
model_a.evaluate(wrong_classified_images, y_correctly_classified[indexes_of_wrong_images])