# Assignment 9: Adversarial Examples

In [None]:
# enter your name and UFL email address
# (the setup cell below will not load the packages while these still hold the placeholder text)
name = 'enter your name'
email = 'enter your email'

In [None]:
# Refuse to continue until the placeholders from the previous cell are replaced.
# An explicit raise is used instead of `assert False` because assert statements
# are stripped when Python runs with -O, which would skip the check silently.
if name == 'enter your name' or email == 'enter your email':
    raise AssertionError('Enter your name & email first!')

print('Assignment 9 -- name: {}, email: {}\n'.format(name, email))

# Load packages we need
import sys
import os
import time

import numpy as np
import sklearn

# we'll use tensorflow and keras for neural networks
import tensorflow as tf
import tensorflow.keras as keras

# import layers we may use
from tensorflow.keras.layers import Input, Flatten, Reshape, Dense, Conv2D, Conv2DTranspose, MaxPooling2D, Dropout

# import callbacks we may use
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard

# Load the TensorBoard notebook extension
#%load_ext tensorboard

from matplotlib import pyplot as plt
plt.rcParams.update({'font.size': 16})

# Let's check our software versions
print('### Python version: ' + sys.version)
print('### NumPy version: ' + np.__version__)
print('### Scikit-learn version: ' + sklearn.__version__)
print('### Tensorflow version: ' + tf.__version__)
print('### TF Keras version: ' + keras.__version__)
print('------------')


# load our packages / code
# (the shared helper modules are looked up in ../common/ relative to the working directory)
sys.path.insert(1, '../common/')
import utils
import plots
import nets

In [None]:
# global parameters to control behavior of the pre-processing, ML, analysis, etc.
seed = 42

# deterministic seed for reproducibility (applied to both NumPy and TensorFlow)
np.random.seed(seed)
tf.random.set_seed(seed)

# NOTE(review): prop_vec is not referenced anywhere else in the visible notebook --
# presumably a leftover data-split proportion setting; confirm before relying on it
prop_vec = [24, 2, 2]

## We'll use the Fashion-MNIST data

In [None]:
from tensorflow.keras.datasets import fashion_mnist
def load_preprocess_fashion_mnist(minmax_normalize=True):
    """Load Fashion-MNIST and return train / test / validation splits.

    The dataset's held-out portion is cut in two: its first ``n // 2`` records
    become the validation set and the remaining records become the test set.

    Args:
        minmax_normalize: if True, divide the image arrays by 255.0.

    Returns:
        ``(train_x, train_y, test_x, test_y, val_x, val_y, labels)`` where
        ``labels`` is a NumPy array of the ten class names. Note that the test
        split comes *before* the validation split in the returned tuple.
    """
    class_names = ['top', 'trouser', 'pullover', 'dress', 'coat', 'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']

    (train_x, train_y), (holdout_x, holdout_y) = fashion_mnist.load_data()

    if minmax_normalize:
        train_x, holdout_x = train_x / 255.0, holdout_x / 255.0

    # split the held-out data: first half -> validation, rest -> test
    half = holdout_x.shape[0] // 2
    val_x, test_x = holdout_x[:half], holdout_x[half:]
    val_y, test_y = holdout_y[:half], holdout_y[half:]

    # add a trailing channel axis when the images do not already have one
    if train_x.ndim < 4:
        train_x, val_x, test_x = (arr.reshape(-1, 28, 28, 1) for arr in (train_x, val_x, test_x))

    return train_x, train_y, test_x, test_y, val_x, val_y, np.array(class_names)

In [None]:
# Load the data with the default min-max normalization.
# Note the unpacking order: train, then test, then val, then the class-name array.
train_x, train_y, test_x, test_y, val_x, val_y, labels = load_preprocess_fashion_mnist()

## [Task 1] (20 points) Splitting the data

### To simulate an adversary not having access to exactly the same data as the people training the model, we'll split train and val into two disjoint subsets.

### [Task 1a] (20 points) Complete the implementation below to split the data as instructed.

In [None]:
prop_target = 0.6
tr_sz = int(train_x.shape[0] * prop_target)
vl_sz = int(val_x.shape[0] * prop_target)

### 1. take the first tr_sz records of train_x/y and store that in 'tr_x', 'tr_y'
### 2. take the first vl_sz records of val_x/y and store that in 'vl_x', 'vl_y'
### 3. take the remaining records (tr_sz, tr_sz + 1, ...) of train_x/y and store that in 'adv_tr_x', 'adv_tr_y'
### 4. take the remaining records (vl_sz, vl_sz + 1, ...) of val_x/y and store that in 'adv_vl_x', 'adv_vl_y'

###* Put your code here (~8 lines) *###




# delete these so we don't accidentally use them
del train_x, train_y, val_x, val_y

### Let's train the target model (CNN)

In [None]:
def create_compile_cnn(input_shape=(28, 28, 1), num_outputs=10, verbose=False):
    """Build and compile the target CNN classifier.

    The network has three convolutional stages, each followed by max pooling
    with pool size 2, then two dense layers with 50% dropout and a softmax
    output layer.

    Args:
        input_shape: shape of a single input image, passed to the first
            Conv2D layer.
        num_outputs: number of units in the softmax output layer.
        verbose: if True, print the Keras model summary.

    Returns:
        A ``(name, model)`` tuple: the model's name string and the compiled
        ``keras.models.Sequential`` model.
    """
    # use a distinct local name so it cannot be confused with the notebook-level `name`
    model_name = 'Target-CNN'
    model = keras.models.Sequential(name=model_name)

    model.add(Conv2D(64, kernel_size=(7,7), input_shape=input_shape,
                     padding='same', activation='relu', name='conv1'))
    model.add(MaxPooling2D((2,2), name='maxpool1'))

    # NOTE(review): 12 filters looks like it may be a typo for 128 -- left unchanged so the
    # architecture stays compatible with any previously saved model file; confirm
    model.add(Conv2D(12, kernel_size=(3,3), activation='relu', padding='same', name='conv2'))
    model.add(Conv2D(128, kernel_size=(3,3), activation='relu', padding='same', name='conv3'))
    model.add(MaxPooling2D(2, name='maxpool2'))

    model.add(Conv2D(256, kernel_size=(3,3), activation='relu', padding='same', name='conv4'))
    model.add(Conv2D(256, kernel_size=(3,3), activation='relu', padding='same', name='conv5'))
    model.add(MaxPooling2D(2, name='maxpool3'))

    model.add(Flatten(name='flatten'))

    model.add(Dense(128, activation='relu', name='fc1'))
    model.add(Dropout(0.5, name='dropout1'))
    model.add(Dense(64, activation='relu', name='fc2'))
    model.add(Dropout(0.5, name='dropout2'))

    model.add(Dense(num_outputs, activation="softmax", name='output'))

    # `learning_rate` is the supported keyword; the legacy `lr` alias is deprecated
    # and is rejected by newer Keras releases
    opt = keras.optimizers.Adam(learning_rate=0.002)

    if verbose:
        model.summary()

    # sparse loss: the labels are integer class ids rather than one-hot vectors
    model.compile(loss='sparse_categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

    return model_name, model

### Train the target model (or load it)
#### To save time, you can use the provided model file.

In [None]:
# Where the trained target model is stored; if the file already exists it is
# loaded instead of training from scratch.
target_model_fp = 'target-fashion-MNIST-CNN.h5'
load = os.path.exists(target_model_fp)

if load:
    target_model = tf.keras.models.load_model(target_model_fp)
else:
    _, target_model = create_compile_cnn(verbose=True)

    # NOTE(review): early stopping watches the *training* loss even though validation
    # data is passed to fit() below -- confirm 'loss' (rather than 'val_loss') is intended
    early_stop_cb = EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)

    max_epochs = 10
    batch_size = 64

    # train on the target's share of the data (tr_x/tr_y, vl_x/vl_y come from the Task 1 split)
    history = target_model.fit(tr_x, tr_y, validation_data=(vl_x, vl_y), epochs=max_epochs, batch_size=batch_size, 
                         shuffle=True, callbacks=[early_stop_cb])

    # save the model
    target_model.save(target_model_fp)

## [Task 2] (30 points) Code for untargeted adversarial examples

### [Task 2a] (5 points) Set the target model to be non-trainable (so we don't accidentally update the weights)

In [None]:
### make sure the target model is not trainable
###* Put your code here (~1 line) *###


### Let's set up some code for the attack

In [None]:
# get predicted label and confidence value for it
def pred_label_and_conf(model, x):
    """Return the predicted label and its confidence for the first row of `x`.

    Only the first entry of ``model.predict(x)`` is inspected; the label is the
    index of its largest value and the confidence is that value.
    """
    first_row = model.predict(x)[0]
    top_class = np.argmax(first_row)
    return top_class, first_row[top_class]

### [Task 2b] (5 points) Fill in the implementation of gradient_of_loss_wrt_input()

In [None]:
# this should return the gradient of the loss (sparse categorical crossentropy with y_target) with respect to 'in_x'
# you should use gradient tape and return the gradient from the tape
def gradient_of_loss_wrt_input(model, in_x, y_target):
    ###* Put your code here (~5 lines) *###
    

### [Task 2c] (20 points) Fill in the implementation of fgsm_untargeted().

#### Note: in exercise 11 we implemented the targeted version of this attack (i.e., we wanted the adversarial example to be classified as the target label). Here, we want the untargeted version (i.e., we just want the adversarial example to have a different predicted label than 'in_y'). 

In [None]:
# Implement the Fast Gradient Sign Method (FGSM) for untargeted perturbations
def fgsm_untargeted(model, in_x, in_y, eps):
    ### Use gradient_of_loss_wrt_input() to get the gradient vector
    ### Then calculate a perturbation (using the sign of the gradient) to increase the loss on 'in_x' for the *true* label ('in_y') 
    ### Finally add that perturbation scaled by eps to the input image and clip it so it results in a valid image ('adv_x')
    ###* Put your code here (~3-4 lines) *###

    
    return adv_x

### Plotting code

In [None]:
def plot_adversarial_example(model, orig_x, adv_x):
    """Plot the original image, the perturbation, and the adversarial image.

    The first and last panels are titled with the label and confidence that
    `model` predicts for `orig_x` and `adv_x` respectively; the middle panel
    shows the difference ``adv_x - orig_x``.
    """
    # predictions for the clean input first, then for the adversarial one
    clean_label, clean_conf = pred_label_and_conf(model, orig_x)
    attacked_label, attacked_conf = pred_label_and_conf(model, adv_x)

    caption = 'Label: {} (conf: {:.3f})'
    titles = [
        caption.format(clean_label, clean_conf),
        'Perturbation',
        caption.format(attacked_label, attacked_conf),
    ]

    # stack original, perturbation and adversarial image along the first axis
    delta = adv_x - orig_x
    images = np.r_[orig_x, delta, adv_x]

    # plot images
    plots.plot_images(images, dim_x=28, dim_y=28, fig_size=(8,3), titles=titles, titles_fontsize=12)

## [Task 3] (35 points) Producing adversarial examples

In [None]:
# Take the first example of test
in_x = test_x[0].reshape(-1, 28, 28, 1)
in_y = test_y[0]

in_pred, in_conf = pred_label_and_conf(target_model, in_x)
print('Last test example -> true label: {}, pred label: {} [confidence: {:.2f}%]'.format(in_y, in_pred, 100*in_conf))

### [Task 3a] (5 points) Complete the implementation below to produce an adversarial example using one step of untargeted FGSM.

In [None]:
eps = 0.1

### Use fgsm_untargeted() on the target model to create an adversarial example for in_x/y. Call it 'adv_x'
###* Put your code here (~2-3 lines) *###




# plot the adversarial example
plot_adversarial_example(target_model, in_x, adv_x)

### [Task 3b] (10 points) What is the predicted class label and confidence? Does the perturbation make the adversarial image look different to you?

In [None]:
###* put your answer here *###
#
# 
#
#

### [Task 3c] (5 points) Complete the implementation of the stop function.

In [None]:
# return True if:
# the maximum number of iterations is reached
# or if the adversarial label is different from the original label and confidence is above the threshold
# otherwise return False
def stop_fn(i, adv_x, orig_label, adv_label, adv_conf, max_iter, threshold):
    ####* put your code here (~1-3 lines) *###
    

In [None]:
def iterative_fgsm_untargeted(model, in_x, in_y, eps, stop_fn):
    """Repeat untargeted FGSM steps until `stop_fn` says to stop.

    Starting from `in_x` (converted to a float32 tensor), each iteration
    applies one `fgsm_untargeted` step of size `eps` against the label `in_y`,
    re-evaluates the model's prediction, and then calls
    ``stop_fn(i, adv_x, in_y, adv_label, adv_conf)`` where ``i`` is the
    zero-based iteration index. The loop ends the first time `stop_fn`
    returns a truthy value, so at least one step is always taken.

    Returns:
        The adversarial image produced by the last step.
    """
    adv_x = tf.convert_to_tensor(in_x, dtype=tf.float32)

    step = 0
    finished = False
    while not finished:
        # one FGSM step away from the original label
        adv_x = fgsm_untargeted(model, adv_x, in_y, eps)

        # what does the model predict for the perturbed image now?
        adv_label, adv_conf = pred_label_and_conf(model, adv_x)

        # let the caller-supplied criterion decide whether this was the last step
        finished = stop_fn(step, adv_x, in_y, adv_label, adv_conf)
        step += 1

    return adv_x

### Adversarial example on iterated FGSM

In [None]:
# Take the first example of test
in_x = test_x[0].reshape(-1, 28, 28, 1)
in_y = test_y[0]

# use a smaller epsilon...
eps = 0.005

the_stop_fn = lambda i, ax, ol, al, ac: stop_fn(i, ax, ol, al, ac, 200, 0.9)
adv_x = iterative_fgsm_untargeted(target_model, in_x, in_y, eps, the_stop_fn)

# plot the adversarial example
plot_adversarial_example(target_model, in_x, adv_x)

### [Task 3d] (5 points) Did the attack work? What is the predicted label and confidence?

In [None]:
###* put your answer here *###
#
# 
#
#

### For the rest of this task, we will simulate the case where the adversary does not have access to the target model. So he must train a substitute model (of a similar but different architecture) on a different subset of the data. Then the adversary will use the substitute model to craft adversarial examples.

In [None]:
def create_compile_substitute_cnn(input_shape=(28, 28, 1), num_outputs=10, verbose=False):
    """Build and compile the adversary's substitute CNN classifier.

    The network has three Conv2D layers, each followed by max pooling with
    pool size 2, then two dense layers with 50% dropout and a softmax output
    layer.

    Args:
        input_shape: shape of a single input image, passed to the first
            Conv2D layer.
        num_outputs: number of units in the softmax output layer.
        verbose: if True, print the Keras model summary.

    Returns:
        A ``(name, model)`` tuple: the model's name string and the compiled
        ``keras.models.Sequential`` model.
    """
    # use a distinct local name so it cannot be confused with the notebook-level `name`
    model_name = 'Substitute-CNN'
    model = keras.models.Sequential(name=model_name)

    model.add(Conv2D(32, kernel_size=(7,7), input_shape=input_shape,
                     padding='same', activation='relu', name='conv1'))
    model.add(MaxPooling2D((2,2), name='maxpool1'))

    model.add(Conv2D(64, kernel_size=(3,3), activation='relu', padding='same', name='conv2'))
    model.add(MaxPooling2D(2, name='maxpool2'))

    model.add(Conv2D(128, kernel_size=(3,3), activation='relu', padding='same', name='conv3'))
    model.add(MaxPooling2D(2, name='maxpool3'))

    model.add(Flatten(name='flatten'))

    model.add(Dense(196, activation='relu', name='fc1'))
    model.add(Dropout(0.5, name='dropout1'))
    model.add(Dense(96, activation='relu', name='fc2'))
    model.add(Dropout(0.5, name='dropout2'))

    model.add(Dense(num_outputs, activation="softmax", name='output'))

    # `learning_rate` is the supported keyword; the legacy `lr` alias is deprecated
    # and is rejected by newer Keras releases
    opt = keras.optimizers.Adam(learning_rate=0.002)

    if verbose:
        model.summary()

    # sparse loss: the labels are integer class ids rather than one-hot vectors
    model.compile(loss='sparse_categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

    return model_name, model

### Let's train the substitute model (sub_model). Notice that the training data is 'adv_tr_x'/'adv_tr_y'.

#### To save time, you can use the provided model file.

In [None]:
# Where the trained substitute model is stored; if the file already exists it is
# loaded instead of training from scratch.
sub_model_fp = 'substitute-fashion-MNIST-CNN.h5'
load = os.path.exists(sub_model_fp)

if load:
    sub_model = tf.keras.models.load_model(sub_model_fp)
else:
    _, sub_model = create_compile_substitute_cnn(verbose=True)

    # NOTE(review): early stopping watches the *training* loss even though validation
    # data is passed to fit() below -- confirm 'loss' (rather than 'val_loss') is intended
    early_stop_cb = EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)

    max_epochs = 10
    batch_size = 32

    # train on the adversary's share of the data (adv_* arrays come from the Task 1 split)
    history = sub_model.fit(adv_tr_x, adv_tr_y, validation_data=(adv_vl_x, adv_vl_y), epochs=max_epochs, batch_size=batch_size, 
                         shuffle=True, callbacks=[early_stop_cb])

    # save the model
    sub_model.save(sub_model_fp)

# freeze the substitute model in both branches (loaded or freshly trained)
sub_model.trainable = False

### [Task 3e] (5 points) Complete the implementation of the attack below.

In [None]:
# Take the first example of test
in_x = test_x[0].reshape(-1, 28, 28, 1)
in_y = test_y[0]

eps = 0.03

### Use iterative_fgsm_untargeted on the *substitute model* to create an adversarial example for in_x/y. 
### Call it 'adv_x'. We will then check if this adversarial example transfers.
### Use max_iter=200 and conf_threshold=0.95
###* Put your code here (~1-2 lines) *###



# Let's look at the adversarial example and the perturbation
print('--- Substitute Model ---')
plot_adversarial_example(sub_model, in_x, adv_x)
print('--- Target Model ---')
plot_adversarial_example(target_model, in_x, adv_x)

### [Task 3f] (5 points) Did the attack work? Did your adversarial example successfully transfer? (Justify your answer.)

In [None]:
###* put your answer here *###
#
# 
#
#

## [Task 4] (15 points) Creating noise that looks like a specific class to the target model.

### [Task 4a] (5 points) Produce images with uniformly random pixel values in [0,1]. Store the result in 'noise_x'.

In [None]:
num_samples = 64

### Sample 'num_samples' random noise 'images' and store this in 'noise_x'. 
### The shape of each image should be identical to those in tr_x, vl_x, test_x, etc...
###* Put your code here (~1-2 lines) *###


In [None]:
# title each noise image with the class name the target model assigns to it
# (argmax over the model's per-class outputs, mapped through `labels`)
titles = labels[np.argmax(target_model.predict(noise_x), axis=1)]
plots.plot_images(noise_x[:num_samples].reshape(-1, 28, 28), dim_x=28, dim_y=28, fig_size=(13,13), titles=titles)

### [Task 4b] (5 points) What is the most common label? Is that expected?

In [None]:
###* put your answer here *###
#
# 
#
#

### [Task 4c] (5 points) Implement move_towards_class().

In [None]:
def move_towards_class(model, in_x, target_label, eps):
    ### Take a step in the direction of the target label. Clip the result so you output a valid image ('out_x')
    ###* Put your code here (~3-5 lines) *###

    
    return out_x

In [None]:
# Push every noise image towards `target_label`, one move_towards_class() step at a
# time, giving up on an image after `max_iter` steps.
max_iter = 100
eps = 0.06
target_label = 0

print('Taking steps towards label {} ({})'.format(target_label, labels[target_label]))

# initialize output
output_x = np.zeros_like(noise_x)
for i, in_x in enumerate(noise_x):
    
    # we need it to be a tensor so we can take the gradient of the loss with respect to it
    adv_x = tf.convert_to_tensor(in_x.reshape(1, 28, 28, 1), dtype=tf.float32)

    for j in range(0, max_iter):
        # move one step
        adv_x = move_towards_class(target_model, adv_x, target_label, eps)
        
        # check if predicted label is the target
        # (adv_conf is returned but not used: any confidence counts as success here)
        adv_label, adv_conf = pred_label_and_conf(target_model, adv_x)
        
        if adv_label == target_label:
            break
    
    # keep the final image for this sample, whether or not the target label
    # was reached within max_iter steps
    output_x[i] = adv_x

In [None]:
# title each modified noise image with the class name the target model now assigns to it
titles = labels[np.argmax(target_model.predict(output_x), axis=1)]
plots.plot_images(output_x[:num_samples].reshape(-1, 28, 28), dim_x=28, dim_y=28, fig_size=(13,13), titles=titles)

## [CIS6930 Additional Task -- Task 5] (25 points): Single Pixel Attack

### For this task, you will implement a single pixel adversarial example attack. Specifically, this is an iterative attack that modifies a single pixel of an image at each iteration until the resulting perturbed image is classified as desired by the target model.

### [Task 5a] (25 points) Fill in the implementation below of single_pixel_attack_step() and iterative_single_pixel_attack()

In [None]:
# Single pixel attack
def single_pixel_attack_step(model, in_x, target_y, eps):
    ### Use gradient_of_loss_wrt_input() to get the gradient vector
    ### Then figure out the pixel which would have the greatest impact and perturb this pixel only (scaled by eps)
    ### Clip the result so you get a valid image
    ###* Put your code here (~3-4 lines) *###
    
    

def iterative_single_pixel_attack(model, in_x, target_y, eps, max_iter=25, conf_threshold=0.7):
    adv_x = tf.convert_to_tensor(in_x, dtype=tf.float32)
    
    ### In a loop of at most 'max_iter' iterations, use single_pixel_attack_step() to produce a perturbation
    ### In each iteration, get the predicted label of the current adversarial image and confidence
    ### if the predicted label matches the target ('target_y') and the confidence is above the 
    ### threshold ('conf_threshold') your attack should exit.
    ### This function should return the adversarial image 'adv_x'
    ###* Put your code here (~4-6 lines) *###

            
    return adv_x

In [None]:
# class index the attack should reach (index 4 is 'coat' in the label list defined by the loader)
target_y = 4
eps = 0.12

### Take the first example of test
in_x = test_x[0].reshape(-1, 28, 28, 1)
# true label of that example (assigned for reference; not passed to the attack below)
in_y = test_y[0]

# run the attack
# (max_iter and conf_threshold are left at the function's defaults)
adv_x = iterative_single_pixel_attack(target_model, in_x, target_y, eps)

# plot the result
plot_adversarial_example(target_model, in_x, adv_x)