# DV2607 Project Notebook
### Authors:
### Oliver Ljung (ollj19@student.bth.se)
### Phoebe Waters (phaa19@student.bth.se)

## Importing modules and dataset

In [None]:
# imports
from matplotlib import pyplot as plt
import numpy as np

import tensorflow as tf

from keras.layers import Conv2D, Conv2DTranspose, MaxPooling2D, Flatten, Dense, Input, Activation, BatchNormalization, LeakyReLU, Reshape, UpSampling2D, Dropout, ReLU
from keras import Sequential, Model
from keras.datasets import mnist
from tensorflow.keras.datasets import cifar10
from keras.utils import to_categorical
import keras.backend as KB

from keras.losses import BinaryCrossentropy, CategoricalCrossentropy, Hinge, SquaredHinge, MeanSquaredError, Loss, SparseCategoricalCrossentropy

import random
import time

import art
from art.attacks.evasion import FastGradientMethod, ProjectedGradientDescent, CarliniL2Method, CarliniL0Method
from art.estimators.classification import KerasClassifier

# Switch TensorFlow out of eager mode for the rest of the session.
# Original author's note: "Enable when training NN but has to be disabled for art".
# NOTE(review): presumably this means eager execution may stay on when only
# training the networks but must be off before using ART — confirm.
tf.compat.v1.disable_eager_execution()
# Show which GPUs TensorFlow can see.
print(tf.config.list_physical_devices('GPU'))

In [None]:
# Pin TensorFlow to a single GPU when at least one is reported.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
  # Restrict TensorFlow to only use the first GPU
  try:
    tf.config.set_visible_devices(gpus[0], 'GPU')
    logical_gpus = tf.config.list_logical_devices('GPU')
    # Report the number of physical devices next to the number of logical ones.
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
  except RuntimeError as e:
    # Visible devices must be set before GPUs have been initialized
    print(e)

## Defining functions

In [None]:
# Defining functions

def display_attack(x_test, x_adv_test, model):
    """Show the first nine adversarial images and print what the model predicts.

    Args:
        x_test: Clean images; only the first nine are used.
        x_adv_test: Adversarial counterparts of ``x_test``; only the first
            nine are used and these are the images that get plotted.
        model: Object with a ``predict`` method returning one score vector
            per image.

    For each of the nine grid cells one line is printed with the arg-max
    class of the adversarial image (``p_fake``) and of the clean image
    (``p_real``). Nothing is returned.
    """
    grid_cells = 9
    clean_batch = x_test[:grid_cells]
    adv_batch = x_adv_test[:grid_cells]

    # The adversarial batch is scored before the clean batch.
    adv_scores = model.predict(adv_batch)
    clean_scores = model.predict(clean_batch)

    for slot in range(1, grid_cells + 1):
        idx = slot - 1
        axes = plt.subplot(3, 3, slot)
        axes.imshow(adv_batch[idx], cmap=plt.get_cmap('gray'))
        adv_class = np.argmax(adv_scores[idx])
        clean_class = np.argmax(clean_scores[idx])
        print(f'p_fake = {adv_class}, p_real = {clean_class}')
    plt.show()

In [None]:
# Defining models

def create_classifier_alt(img_shape, num_of_classes, verbose=True):
    """Build a CNN classifier with a residual bottleneck.

    The network is an encoder (two conv + 2x2 max-pool stages followed by a
    128-filter conv), three residual applications of one shared block, and a
    dense head.

    Args:
        img_shape: Shape of a single input image; used for the ``Input``
            layer and as ``input_shape`` of the first convolution.
        num_of_classes: Number of units in the final dense layer.
        verbose: When true, print the summary of the assembled model.

    Returns:
        A Keras ``Model`` named "classifier". The final layer has no
        activation, so the outputs are raw, unnormalised class scores.
    """
    encoder = Sequential(name="classifier_encoder")

    # Add Convolution layers
    encoder.add(Conv2D(32, (3,3), activation='relu', padding="same",  input_shape=img_shape, name="image_input"))
    encoder.add(MaxPooling2D((2, 2)))

    encoder.add(Conv2D(64, (3,3), activation='relu', padding="same"))
    encoder.add(MaxPooling2D((2, 2)))

    encoder.add(Conv2D(128, (3,3), padding="same"))
    encoder.add(BatchNormalization(momentum=0.8))
    encoder.add(LeakyReLU(alpha=0.2))

    # Add residual layer
    resblock = Sequential(name="classifier_resblock")
    resblock.add(Conv2D(128, (3,3), padding="same"))
    resblock.add(BatchNormalization(momentum=0.8))
    resblock.add(ReLU())

    resblock.add(Dropout(0.25))

    resblock.add(Conv2D(128, (3,3), padding="same"))
    resblock.add(BatchNormalization(momentum=0.8))

    resblock.add(Conv2D(128, (3,3), padding="same"))
    resblock.add(BatchNormalization(momentum=0.8))

    # Dense head (named "decoder" in this file) mapping features to class scores
    decoder = Sequential(name="classifier_decoder")
    decoder.add(Flatten())
    decoder.add(Dense(1024, activation='relu'))
    decoder.add(Dense(256, activation='relu'))
    decoder.add(Dense(64, activation='relu'))
    decoder.add(Dense(num_of_classes, activation=None, name="classication_output"))

    # Combining layers to model.
    # `inputs` rather than `input`, so the builtin is not shadowed.
    inputs = Input(shape = img_shape)
    encoding = encoder(inputs)
    # The same `resblock` object is applied three times, so the three residual
    # stages share one set of weights.
    # NOTE(review): confirm the sharing is intended.
    bottleneck = resblock(encoding) + encoding
    bottleneck = resblock(bottleneck) + bottleneck
    bottleneck = resblock(bottleneck) + bottleneck
    prediction = decoder(bottleneck)

    model = Model(inputs, prediction, name="classifier")

    if verbose:
        # summary() prints by itself and returns None, so it is not wrapped in print().
        model.summary()

    return model

def create_classifier(img_shape, num_of_classes, verbose=True):
    """Build the plain CNN classifier that is later attacked and defended.

    Three unpadded 3x3 convolutions (32, 64, 128 filters) with 2x2
    max-pooling after the first two, followed by dense layers of 128 and 64
    units and a final dense layer of ``num_of_classes`` units.

    Args:
        img_shape: Shape of a single input image.
        num_of_classes: Number of units in the final dense layer.
        verbose: When true (the default, matching the previous unconditional
            behaviour), print the summary of the inner sequential network.

    Returns:
        A Keras ``Model`` named "classifier" wrapping the sequential network.
        The final layer has no activation, so the outputs are raw,
        unnormalised class scores.
    """
    # `inputs` rather than `input`, so the builtin is not shadowed.
    inputs = Input(shape = img_shape)

    model = Sequential(name="mnist_classifier")

    # Add Convolution layers
    model.add(Conv2D(32, (3,3), activation='relu', input_shape=img_shape, name="image_input"))
    model.add(MaxPooling2D((2, 2)))

    model.add(Conv2D(64, (3,3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))

    model.add(Conv2D(128, (3,3), activation='relu'))

    model.add(Flatten())

    # Add predictive layers
    model.add(Dense(128, activation='relu'))
    model.add(Dense(64, activation='relu'))

    model.add(Dense(num_of_classes, activation=None, name="classication_output"))

    if verbose:
        # summary() prints by itself and returns None, so it is not wrapped in print().
        model.summary()

    # Get output from model
    prediction = model(inputs)
    # prediction = tf.clip_by_value(prediction, 0, 1)

    return Model(inputs, prediction, name="classifier")

def create_resblock():
    """Build one residual-branch network for the adversarial generator.

    The returned ``Sequential`` stacks the same six-layer group three times:
    Conv2D(128, 3x3, same) -> BatchNorm -> ReLU -> Dropout(0.25) ->
    Conv2D(128, 3x3, same) -> BatchNorm.

    Returns:
        An unnamed Keras ``Sequential`` with 18 layers. The skip connection
        itself is added by the caller.
    """
    block = Sequential()
    for _ in range(3):
        # Layers are created and added one by one, in the same order every pass.
        block.add(Conv2D(128, (3,3), padding="same"))
        block.add(BatchNormalization(momentum=0.8))
        block.add(ReLU())
        block.add(Dropout(0.25))
        block.add(Conv2D(128, (3,3), padding="same"))
        block.add(BatchNormalization(momentum=0.8))
    return block

def create_adv_generator(img_shape, channels, boundary, verbose=True):
    """Build the AdvGAN generator: image in, bounded perturbation out.

    Structure: a convolutional encoder, three residual stages (each with its
    own block from ``create_resblock``), and a transposed-convolution decoder
    ending in ``tanh``.

    Args:
        img_shape: Shape of a single input image.
        channels: Number of channels of the produced perturbation.
        boundary: Perturbation values are clipped to ``[-boundary, boundary]``.
        verbose: When true, print the summary of the assembled model.

    Returns:
        A Keras ``Model`` named "adv_generator".
    """
    # Encoder
    encoder = Sequential(name="adv_generator_encoder")
    encoder.add(Conv2D(32, (3,3), activation='relu', padding="same", input_shape=img_shape, name="image_input"))
    encoder.add(BatchNormalization(momentum=0.8))
    encoder.add(UpSampling2D())

    encoder.add(Conv2D(128, (3,3), padding="same"))
    encoder.add(BatchNormalization(momentum=0.8))
    encoder.add(LeakyReLU(alpha=0.2))

    # Bottleneck with residual blocks (three independent blocks, no weight sharing)
    resblock1 = create_resblock()
    resblock2 = create_resblock()
    resblock3 = create_resblock()

    # Decoder
    decoder = Sequential(name="adv_generator_decoder")
    decoder.add(Conv2DTranspose(32, (3,3), activation='relu', padding="same"))
    decoder.add(BatchNormalization(momentum=0.8))
    decoder.add(UpSampling2D())

    decoder.add(Conv2DTranspose(64, (3,3), padding="same"))
    decoder.add(LeakyReLU(alpha=0.2))

    # Presumably the 4x4 pooling undoes the two UpSampling2D layers (one in
    # the encoder, one above) so the output matches the input's spatial size,
    # assuming UpSampling2D's default factor of 2 — confirm.
    decoder.add(MaxPooling2D((4, 4)))
    decoder.add(Conv2DTranspose(channels, (3,3), padding="same"))
    decoder.add(Activation('tanh', name="adv_image_output"))

    # Combining layers to model.
    # `inputs` rather than `input`, so the builtin is not shadowed.
    inputs = Input(shape = img_shape)
    encoding = encoder(inputs)
    bottleneck = resblock1(encoding) + encoding
    bottleneck = resblock2(bottleneck) + bottleneck
    bottleneck = resblock3(bottleneck) + bottleneck
    perturbations = decoder(bottleneck)

    # tanh already bounds the values to [-1, 1]; the first clip is kept so the
    # graph is unchanged. The second clip enforces the caller's bound.
    perturbations = tf.clip_by_value(perturbations, -1, 1)
    perturbations = tf.clip_by_value(perturbations, -boundary, boundary)

    model = Model(inputs, perturbations, name="adv_generator")

    if verbose:
        # summary() prints by itself and returns None, so it is not wrapped in print().
        model.summary()

    return model


def create_discriminator(img_shape, verbose=True):
    """Build the discriminator that scores an image as real or adversarial.

    Three unpadded 3x3 convolutions (32, 64, 64 filters) with 2x2 max-pooling
    after the first two, then a 64-unit dense layer and a single sigmoid unit.

    Args:
        img_shape: Shape of a single input image.
        verbose: When true, print the summary of the inner sequential network.

    Returns:
        A Keras ``Model`` named "discriminator" with one sigmoid output per
        image.
    """
    model = Sequential(name="discriminator")

    # Add Convolution layers
    model.add(Conv2D(32, (3,3), activation='relu', input_shape=img_shape))
    model.add(MaxPooling2D((2, 2)))

    model.add(Conv2D(64, (3,3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))

    model.add(Conv2D(64, (3,3), activation='relu'))

    model.add(Flatten())

    # Add predictive layers
    model.add(Dense(64, activation='relu'))
    model.add(Dense(1, activation='sigmoid', name="validity_output"))

    if verbose:
        # summary() prints by itself and returns None, so it is not wrapped in print().
        model.summary()

    # Add input (`inputs` rather than `input`, so the builtin is not shadowed)
    inputs = Input(shape = img_shape)

    # Get output from model
    validity = model(inputs)

    return Model(inputs, validity, name="discriminator")

## Loading dataset

In [None]:
# load dataset
# Load MNIST as (images, integer labels) for the train and test splits.
(train_X, train_y), (test_X, test_y) = mnist.load_data()

# Convert to float32 and divide by 255 so that, for 8-bit images, pixel
# values fall in [IMG_LOW_LIMIT, IMG_HIGH_LIMIT].
train_X = train_X.astype("float32") / 255
test_X = test_X.astype("float32") / 255

# Append a trailing channel axis so each image matches IMG_SHAPE.
train_X = np.expand_dims(train_X, -1)
test_X = np.expand_dims(test_X, -1)

# Keep the labels as a flat int64 vector of class ids. One-hot encoding them
# and taking the arg-max again would return the same ids, so that round trip
# is skipped.
train_y = np.asarray(train_y).reshape(-1).astype("int64")
test_y = np.asarray(test_y).reshape(-1).astype("int64")

IMG_SHAPE = (28,28,1)
CHANNELS = 1
NUM_CLASSES = 10
IMG_LOW_LIMIT = 0
IMG_HIGH_LIMIT = 1

In [None]:
# load dataset
import ssl
# ssl._create_default_https_context = ssl._create_unverified_context

# (train_X, train_y), (test_X, test_y) = cifar10.load_data()

# train_X = train_X.astype("float32")/255
# test_X = test_X.astype("float32")/255

# train_y = to_categorical(train_y)
# train_y = np.array([np.argmax(y) for y in train_y])
# test_y  = to_categorical(test_y)
# test_y = np.array([np.argmax(y) for y in test_y])

# IMG_SHAPE = (32,32,3)
# CHANNELS = 3
# NUM_CLASSES = 10
# IMG_LOW_LIMIT = 0
# IMG_HIGH_LIMIT = 1

In [None]:
# Preview the first nine test images in a 3x3 grid.
for i in range(9):
    plt.subplot(3, 3, i+1)
    plt.imshow(test_X[i], cmap="gray")
plt.show()

## Training the classifier (the model to attack and later defend)

In [None]:
# Adam with no arguments, i.e. the library defaults.
optimizer = tf.optimizers.Adam()
# optimizer = tf.compat.v1.train.AdamOptimizer()

# The classifier's last layer has no activation, hence from_logits=True; the
# sparse loss takes integer class ids as labels.
model = create_classifier(IMG_SHAPE, NUM_CLASSES)
model.compile(optimizer=optimizer, loss=SparseCategoricalCrossentropy(from_logits=True), metrics=["accuracy"])

# Train for five passes over the training set.
model.fit(train_X, train_y, epochs=5)

### Model baseline

In [None]:
# Clean-data baseline: evaluate the classifier on the unmodified test set.
model.evaluate(test_X, test_y)

## Defining models and input tensors

In [None]:
### Values for advGAN ###
# alpha and beta weight the discriminator and perturbation terms of the
# combined loss; c is handed to the generator as its clipping bound and is
# the threshold read by PerturbationLoss.
# MNIST
# alpha = 1
# beta = 1
# c = 0.7

# CIFAR-10
# NOTE(review): the active values are the ones labelled CIFAR-10, while the
# uncommented dataset cell in this file loads MNIST — confirm which set is intended.
alpha = 1
beta = 1
c = 0.8
#########################

optimizer = tf.optimizers.Adam(learning_rate=0.0001)

# Symbolic input image shared by all three branches of the combined model.
# NOTE: this module-level name shadows the builtin `input`.
input = Input(shape=IMG_SHAPE, name="image")

adv_generator = create_adv_generator(IMG_SHAPE, CHANNELS, c)
perturbations = adv_generator(input)

# Adversarial image = clean image + perturbation, clipped back to the valid pixel range.
adv_image = tf.add(input, perturbations)
adv_image = tf.clip_by_value(adv_image, IMG_LOW_LIMIT, IMG_HIGH_LIMIT)

# We want the generator and discriminator to be trained in a combined model but as separate entities
discriminator = create_discriminator(IMG_SHAPE)
discriminator.compile(optimizer=optimizer, loss=BinaryCrossentropy(), metrics=["accuracy"])
# trainable is switched off only after the standalone compile above.
# NOTE(review): presumably so the discriminator still learns through its own
# train_on_batch calls while staying frozen inside the combined model — confirm
# for the Keras version in use.
discriminator.trainable = False
validity_adv = discriminator(adv_image)


# We don't want to train the model we attack
model.trainable = False
prediction = model(adv_image)

# Output keys; the losses and loss weights defined for the combined model use the same keys.
outputs_w_names = {
    "classifier": prediction,
    "discriminator": validity_adv,
    "adv_generator": perturbations,
}

advGAN_model = Model(inputs=input, outputs=outputs_w_names, name="advGAN-net")

class PerturbationLoss(Loss):
    """Soft hinge loss on the size of the generated perturbation.

    Computes ``L_hinge = E_x[ max(0, ||G(x)||_2 - c) ]``: the per-sample L2
    norm of the flattened perturbation, minus the module-level bound ``c``,
    floored at zero and averaged over the batch.
    """

    def call(self, y_true, y_pred):
        """Return the scalar hinge loss for a batch of perturbations.

        Args:
            y_true: Ignored; present only to satisfy the ``Loss`` interface.
            y_pred: Batch of perturbations with the batch axis first.
        """
        # Colin Targonski (ctargon), Feb-2019, link: https://github.com/ctargon/AdvGAN-tf/blob/master/AdvGAN.py
        # Everything is derived from y_pred, so the loss does not depend on
        # whatever the module-level name `perturbations` is bound to when the
        # graph is built.
        flattened = tf.reshape(y_pred, (tf.shape(y_pred)[0], -1))
        norms = tf.norm(flattened, axis=1)
        L_hinge = tf.reduce_mean(tf.maximum(0.0, norms - c))
        return L_hinge

# One loss per output of the combined model:
#   L_adv   - cross-entropy between the classifier's scores and the attack's target label
#   L_GAN   - binary cross-entropy on the discriminator's verdict for the adversarial image
#   L_hinge - penalty on the perturbation's norm
L_adv = SparseCategoricalCrossentropy(from_logits=True)
L_GAN = BinaryCrossentropy()
L_hinge = PerturbationLoss()

# Keys must match the output keys of advGAN_model.
losses = {
    "classifier": L_adv,
    "discriminator": L_GAN,
    "adv_generator": L_hinge,
}

# Total loss = 1 * L_adv + alpha * L_GAN + beta * L_hinge.
losses_weights = {
    "classifier": 1,
    "discriminator": alpha,
    "adv_generator": beta,
}

advGAN_model.compile(optimizer=optimizer, loss=losses, loss_weights=losses_weights, metrics=["accuracy"])
advGAN_model.summary()

## AdvGAN Training

In [None]:
BATCH_SIZE = 8
EPOCHS = 100_000                    # number of single-batch iterations, not passes over the data
TIME_LIMIT_SECONDS = 60 * 60 * 10   # wall-clock budget: 10 hours
REPORT_EVERY = 500                  # iterations between checkpoints / progress reports

# Discriminator targets: 1 for real images, 0 for adversarial ones.
y_real = np.ones((BATCH_SIZE, 1))
y_fake = np.zeros((BATCH_SIZE, 1))

# Targeted Attack: every adversarial image should be classified as class 0.
target_prediction = np.zeros([BATCH_SIZE, 1])

# Not read by the loop below; kept so the name stays defined for other cells.
hinge_limit = np.array([[c]]*BATCH_SIZE)

# Accuracies collected since the last report, and the best window means seen so far.
discriminator_acc_history = []
classifier_acc_history = []
max_discriminator_acc = 0
max_classifier_acc = 0.4    # the generator is only checkpointed once it beats this

start_time = time.time()
for epoch in range(EPOCHS+1):
    if time.time() - start_time > TIME_LIMIT_SECONDS:
        break   # time limit reached

    # Sample a batch of training images (with replacement).
    x_real = np.array(random.choices(train_X, k=BATCH_SIZE))

    # A loop-local name is used so the module-level symbolic tensor
    # `perturbations` is not overwritten with a NumPy array.
    batch_perturbations = adv_generator.predict(x_real, verbose=0)

    x_fake = x_real + batch_perturbations
    x_fake = np.clip(x_fake, IMG_LOW_LIMIT, IMG_HIGH_LIMIT)

    # Targets for the combined model. The "adv_generator" entry is only a
    # placeholder of the right shape: PerturbationLoss does not read y_true.
    y = {
        "classifier": target_prediction,
        "discriminator": y_real,
        "adv_generator": batch_perturbations
    }

    # Evaluate how successful the attack is (scored against the target label)
    classifier_loss = model.evaluate(x_fake, target_prediction, verbose=False)

    # Train the discriminator on one real and one adversarial batch
    discriminator_loss_real = discriminator.train_on_batch(x=x_real, y=y_real)
    discriminator_loss_fake = discriminator.train_on_batch(x=x_fake, y=y_fake)
    discriminator_loss = 0.5 * np.add(discriminator_loss_real, discriminator_loss_fake)

    # Train the generator through the combined model
    loss = advGAN_model.train_on_batch(x=x_real, y=y)

    # Add values to history (index 1 is the accuracy metric)
    discriminator_acc_history.append(discriminator_loss[1])
    classifier_acc_history.append(classifier_loss[1])

    if epoch % REPORT_EVERY == 0:
        mean_discriminator_acc = np.mean(discriminator_acc_history)
        mean_classifier_acc = np.mean(classifier_acc_history)

        # Checkpoint only on improvement, and remember the new best so a
        # later, worse window cannot overwrite a better checkpoint.
        if max_discriminator_acc < mean_discriminator_acc:
            max_discriminator_acc = mean_discriminator_acc
            discriminator.save("mnist_adv_discriminator_t0.h5")
        if max_classifier_acc < mean_classifier_acc:
            max_classifier_acc = mean_classifier_acc
            adv_generator.save("mnist_adv_generator_t0.h5")

        print(f"EPOCH: {epoch}")
        print(f"Total loss = {loss[0]} \n\tLosses: adv_generator = {loss[1]}, classifier = {loss[2]}, discriminator = {loss[3]}\n\tAccuracy: adv_generator = {loss[4]}, mnist_classifier = {loss[5]}, discriminator = {loss[6]} \n")
        print(f"    Discriminator: Accuracy: {mean_discriminator_acc}, Classifier: Accuracy: {mean_classifier_acc}")

        # Start a fresh averaging window.
        discriminator_acc_history = []
        classifier_acc_history = []

        # Top row: adversarial images; bottom row: the clean originals.
        for i in range(5):
            fig = plt.subplot(2, 5, i+1)
            fig.imshow(x_fake[i], cmap=plt.get_cmap('gray'))
        for i in range(5):
            fig = plt.subplot(2, 5, i+6)
            fig.imshow(x_real[i], cmap=plt.get_cmap('gray'))
        plt.show()

In [None]:
# To use same models as in paper: rebind the classifier, discriminator and
# generator to the saved "__report" files under models/.

model = tf.keras.models.load_model("models/mnist_classifier__report.h5")
discriminator = tf.keras.models.load_model("models/mnist_adv_discriminator_t0__report.h5")      
adv_generator = tf.keras.models.load_model("models/mnist_adv_generator_t0__report.h5")

# Attack

In [None]:
# Generate one perturbation per test image and add it to the clean image.
perturbations = adv_generator.predict(test_X, verbose = 1)
advGAN_test_X = np.add(test_X, perturbations)
advGAN_test_X = np.clip(advGAN_test_X, 0, 1) # Pixel values are kept in [0, 1]

In [None]:
# Score the adversarial images against label 0 for every sample (the attack's
# target class). Assuming the model reports accuracy, that value is the
# fraction of images classified as 0.
model.evaluate(advGAN_test_X, np.zeros([len(advGAN_test_X), 1]))

In [None]:
# Plot nine AdvGAN examples and print adversarial vs. clean predictions.
display_attack(test_X, advGAN_test_X, model)

# Other Attacks

In [None]:
# Have to convert classifier model to art
# NOTE(review): the classifiers built in this file end in a Dense layer with
# activation=None (raw scores); confirm whether KerasClassifier should be
# given use_logits=True.
clf = KerasClassifier(model=model)

In [None]:
# Baseline for attack (% classified as 0s in test): evaluate the clean test
# images against label 0 for every sample.
model.evaluate(test_X, np.zeros(len(test_X)))

## 1. FGM

In [None]:
def fast_gradient_method_attack(x_test, clf, eps=0.03, steps=10, batch_size=32):
    """Run an iterated, targeted Fast Gradient Method attack towards class 0.

    The single-step attack is applied exactly ``steps`` times, each time to
    the output of the previous step, so with the defaults the accumulated
    step budget is 10 * 0.03 = 0.3.

    Args:
        x_test: Images to attack.
        clf: ART classifier wrapping the model under attack.
        eps: Step size of each FGM application.
            MNIST params: eps=0.03, 10 steps.
            CIFAR-10 params: eps=0.01, 10 steps.
        steps: Number of FGM applications.
        batch_size: Batch size handed to the ART attack.

    Returns:
        The adversarial images after ``steps`` applications (``x_test``
        itself when ``steps`` is 0).
    """
    attack = FastGradientMethod(clf, eps=eps, targeted=True, batch_size=batch_size)

    # Targeted Attack: every sample is pushed towards label 0.
    target_prediction = np.zeros([len(x_test), 1])

    # Start from the clean images so the loop performs exactly `steps`
    # applications (a separate initial call plus the loop would give steps + 1).
    adv_x = x_test
    for _ in range(steps):
        adv_x = attack.generate(adv_x, target_prediction)

    return adv_x

# Attack the whole test set, then evaluate the result against label 0.
FGM_test_X = fast_gradient_method_attack(test_X, clf)
model.evaluate(FGM_test_X, np.zeros(len(FGM_test_X)))

In [None]:
# Plot nine FGM examples and print adversarial vs. clean predictions.
display_attack(test_X, FGM_test_X, model)

## 2. PGD

In [None]:
def projected_gradient_descent_attack(x_test, clf, max_iter=10, eps_step=0.05, eps=0.3):
    """Run a targeted Projected Gradient Descent attack towards class 0.

    Args:
        x_test: Images to attack.
        clf: ART classifier wrapping the model under attack.
        max_iter: Number of PGD iterations.
        eps_step: Step size per iteration.
        eps: Maximum perturbation.
            MNIST params: max_iter=10, eps_step=0.05, eps=0.3.
            CIFAR-10 params: max_iter=10, eps_step=0.01, eps=0.1.

    Returns:
        The adversarial images produced by ART for ``x_test``.
    """
    attack = ProjectedGradientDescent(
        clf,
        targeted=True,
        max_iter=max_iter,
        eps_step=eps_step,
        eps=eps,
        verbose=False,
    )

    # Targeted Attack: every sample is pushed towards label 0.
    target_prediction = np.zeros([len(x_test), 1])

    return attack.generate(x_test, target_prediction)

# Attack the whole test set, then evaluate the result against label 0.
PGD_test_X = projected_gradient_descent_attack(test_X, clf)
model.evaluate(PGD_test_X, np.zeros(len(PGD_test_X)))

In [None]:
# Plot nine PGD examples and print adversarial vs. clean predictions.
display_attack(test_X, PGD_test_X, model)

# Defence

## 1. Binary Input detector (discriminator)

In [None]:
class BinaryInputDetector():
    """Classifier guarded by a real-vs-adversarial detector.

    Inputs whose detector score is below 0.5 are rejected as adversarial and
    labelled ``-1``; all other inputs get the target model's arg-max class.
    """

    def __init__(self, detector, target_model):
        """Store the two models.

        Args:
            detector: Object whose ``predict(batch)`` returns one validity
                score per sample (first column of its output is used).
            target_model: Object whose ``predict(batch)`` returns one score
                vector per sample.
        """
        self._detector = detector
        self._target_model = target_model

    def predict(self, array):
        """Label a batch of inputs, flagging suspected adversarial ones.

        Args:
            array: Batch of inputs with the sample axis first.

        Returns:
            A tuple ``(prediction, adv_rate)``: an integer array holding the
            class id per sample or ``-1`` for samples flagged as adversarial,
            and the fraction of samples that were flagged. An empty batch
            yields an empty array and a rate of ``0.0``.
        """
        samples = np.asarray(array)
        count = len(samples)
        if count == 0:
            # Nothing to classify; also avoids dividing by zero for the rate.
            return np.array([], dtype=np.int64), 0.0

        # One batched call per model instead of two calls per sample.
        validity = np.asarray(self._detector.predict(samples)).reshape(count, -1)[:, 0]
        flagged = validity < 0.5

        scores = np.asarray(self._target_model.predict(samples)).reshape(count, -1)
        labels = np.argmax(scores, axis=1)

        # -1 marks an adversary; everything else keeps the classifier's label.
        prediction = np.where(flagged, -1, labels)
        adv_rate = float(np.mean(flagged))
        return prediction, adv_rate

In [None]:
# Rebinds `discriminator` to a separately saved model and uses it as the detector.
discriminator = tf.keras.models.load_model("models/mnist_discriminator__report.h5")  # For normal GAN BID
BID = BinaryInputDetector(discriminator, model)

# Run the guarded classifier on clean data and on each attack's output; the
# second value of each pair is the fraction of inputs flagged as adversarial.
y_pred_BASE, adv_rate_BASE = BID.predict(test_X)
y_pred_FGM, adv_rate_FGM = BID.predict(FGM_test_X)
y_pred_PGD, adv_rate_PGD = BID.predict(PGD_test_X)
y_pred_advGAN , adv_rate_advGAN  = BID.predict(advGAN_test_X)

print(f"Adv rates: BASE: {adv_rate_BASE}, FGM: {adv_rate_FGM}, PGD: {adv_rate_PGD}, advGAN: {adv_rate_advGAN}")