# Denoiser

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
from os.path import join,exists

import random
import numpy as np
import pandas as pd
import cv2
import pickle
from tqdm import tqdm
from PIL import Image
import matplotlib.pyplot as plt

import tensorflow as tf
import gc


from tensorflow import convert_to_tensor
import tensorflow.keras.backend as K
from tensorflow.keras.models import  load_model,Model
from tensorflow.keras import applications
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import Conv2DTranspose, Reshape,Input,Dropout, Activation, Dense, GlobalMaxPooling2D,Conv2D,Flatten,MaxPooling2D,InputLayer
from tensorflow.keras.utils import to_categorical
import copy

from efficientnet.tfkeras import EfficientNetB7
from tensorflow.keras import datasets
from foolbox.attacks import LinfFastGradientAttack,LinfDeepFoolAttack
from foolbox.distances import LpDistance
from foolbox.models import TensorFlowModel
from foolbox import criteria
from sklearn.metrics import classification_report


from art.attacks.evasion import SaliencyMapMethod
from art.estimators.classification import TensorFlowV2Classifier
from art.attacks.evasion import BoundaryAttack
from art.utils import load_dataset #to play with cifar images

import graphviz
import pydot



gpus = tf.config.experimental.list_physical_devices('GPU')
tf.config.experimental.set_virtual_device_configuration(
    gpus[0],
    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4*1024)]
)

In [3]:
tf.config.run_functions_eagerly(False)

### 1. Load Data

In [4]:
data_mnist=datasets.mnist.load_data(path='mnist.npz')
X_train_mnist,y_train_mnist=data_mnist[0][0],data_mnist[0][1]
X_test_mnist,y_test_mnist=data_mnist[1][0],data_mnist[1][1]
y_train_mnist = to_categorical(y_train_mnist, 10)
y_test_mnist = to_categorical(y_test_mnist, 10)

""" Train """
X_train_img_mnist = np.full((60000, 32, 32, 3), 0)
for i, s in enumerate(X_train_mnist):
    X_train_img_mnist[i] = cv2.cvtColor(np.pad(s,2), cv2.COLOR_GRAY2RGB) 
    
""" Test """
X_test_img_mnist = np.full((10000, 32, 32, 3), 0)
for i, s in enumerate(X_test_mnist):
    X_test_img_mnist[i] = cv2.cvtColor(np.pad(s,2), cv2.COLOR_GRAY2RGB) 
    
X_train_mnist=X_train_img_mnist
X_test_mnist= X_test_img_mnist

In [5]:
(X_train_cifar, y_train_cifar), (X_test_cifar, y_test_cifar), _,_=load_dataset('cifar10')
for i, im in enumerate(X_train_cifar):
    X_train_cifar[i]=255*im
for i, im in enumerate(X_test_cifar):
    X_test_cifar[i]=255*im    


dict_classes={}
dict_classes[0]='airplane'
dict_classes[1]='automobile'
dict_classes[2]='bird'
dict_classes[3]='cat'
dict_classes[4]='deer'
dict_classes[5]='dog'
dict_classes[6]='frog'
dict_classes[7]='horse'
dict_classes[8]='ship'
dict_classes[9]='truck'

In [6]:
(X_train, y_train), (X_test, y_test), min, max=load_dataset('cifar10')
y_test=list(map(np.argmax,y_test))
for i,im in enumerate(X_train):
    X_train[i]=255*im
    
for i,im in enumerate(X_test):
    X_test[i]=255*im

In [7]:
dict_classes={}
dict_classes[0]='airplane'
dict_classes[1]='automobile'
dict_classes[2]='bird'
dict_classes[3]='cat'
dict_classes[4]='deer'
dict_classes[5]='dog'
dict_classes[6]='frog'
dict_classes[7]='horse'
dict_classes[8]='ship'
dict_classes[9]='truck'

In [8]:
def pick_data_set(name):
    
    if name=='Mnist':
        X_train=X_train_mnist
        X_test= X_test_mnist
        y_train = y_train_mnist
        y_test= y_test_mnist

    elif name=='Cifar':
        X_train=X_train_cifar
        X_test= X_test_cifar
        y_train = y_train_cifar
        y_test= y_test_cifar    
    return(X_train,X_test,y_train,y_test)
    

### 2. Load model

In [9]:
def train_and_save_effnet(data_set_name):
    (X_train,X_test,y_train,y_test)=pick_data_set(data_set_name)
    tf.keras.backend.clear_session()
    effnet_base = EfficientNetB7(weights='imagenet', include_top=False, input_shape=(32, 32, 3))
    effnet_base.trainable=True
    x = GlobalMaxPooling2D(name='pool_1')(effnet_base.layers[-2].output)
    x = Dropout(0.2, name="dropout_2")(x)
    x = Dense(32)(x)
    x = Dense(10,name='fc_2')(x)
    o = Activation('softmax', name='act_2')(x)
    model_effnet = Model(inputs=effnet_base.input, outputs=[o])


    
    if exists('/media/hdd1/benchmark_adversarial_attacks/effnet_model_'+str(data_set_name)+'.h5')==False:
        model_effnet.compile(
            loss='categorical_crossentropy',
            optimizer='nadam',
            metrics=['accuracy']
            )
        history = model_effnet.fit(X_train, y_train,
                      epochs=5,
                      batch_size = 128,
                      validation_split=0.1,
                      shuffle=True,
                      verbose=1)
        model_effnet.save('/media/hdd1/benchmark_adversarial_attacks/effnet_model_'+str(data_set_name)+'.h5')

    else:
        model_effnet=load_model('/media/hdd1/benchmark_adversarial_attacks/effnet_model_'+str(data_set_name)+'.h5')
        
    return(model_effnet)

### 3.Denoiser creation and data set creation

In [10]:
class Denoise(Model):
    def __init__(self):
        super(Denoise, self).__init__()
        self.encoder = tf.keras.Sequential([
          Input(shape=(32, 32, 3)), 
          Conv2D(64, (3,3), activation='relu', padding='same', strides=2),
          Conv2D(64, (3,3), activation='relu', padding='same', strides=2),
          Flatten(),
          Dense(4096,activation='relu'),          
          Reshape((8,8,64))
            ])

        self.decoder = tf.keras.Sequential([
        Conv2DTranspose(64, kernel_size=3, strides=2, activation='relu', padding='same'),
        Conv2DTranspose(128, kernel_size=2, strides=2, activation='relu', padding='same'),
        Conv2D(3, kernel_size=(3,3), activation='relu', padding='same')])

    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded



In [11]:
def data_set_maker(model,attack, image_list, labels):
    model_to_fool = TensorFlowModel(model, bounds=(0, 255))
    success=[]
    adv_list=[]
    benign_list=[]
    adv_true_label=[]
    epsilon=[5]
    labels=list(map(np.argmax,labels))
    print('======epsilon: '+str(epsilon[0])+'======')
    for i,image in enumerate(tqdm(image_list,position=0)):
        if i!=0 and i%(len(labels)//3)==0:
            print('======adv_list_size: '+str(len(adv_list))+'======')
            epsilon=[epsilon[0]*1.5]
            print('======epsilon: '+str(epsilon[0])+'======')
        image = np.asarray(image)[:,:,:3].astype('float32')
        image = convert_to_tensor(np.expand_dims(image,axis=0))
        label=labels[i]
        label = tf.convert_to_tensor(np.array([label]))
        _, clipped, is_adv = attack(model_to_fool,image,label,epsilons=epsilon)
        if bool(is_adv[0]) ==True:
            adv_list.append(np.array(clipped[0][0]))
            adv_true_label.append(labels[i]) 
            benign_list.append(image) 
            
    for i,image in enumerate(benign_list):
        benign_list[i]=np.squeeze(image)
        
    return(list(adv_list),list(benign_list),adv_true_label)


In [12]:
def make_adv_data_set(data_set_name,model_effnet):
    attack_FGSM=LinfFastGradientAttack()
    if exists('data/adv images and benign images '+str(data_set_name))==False:
        (adv_list,benign_list,adv_true_label)=data_set_maker(model_effnet,attack_FGSM, X_test[:6000], y_test[:6000])
        with open('data/adv images and benign images '+str(data_set_name), 'wb') as f:
            pickle.Pickler(f).dump(adv_list)
            pickle.Pickler(f).dump(benign_list)
            pickle.Pickler(f).dump(adv_true_label)
    else:
        with open('data/adv images and benign images '+str(data_set_name), 'rb') as f:
            adv_list=pickle.Unpickler(f).load()
            benign_list=pickle.Unpickler(f).load()
            adv_true_label=pickle.Unpickler(f).load()

    ###let's add some benign examples to the data set and shuffle the result

    adv_list.extend(X_test[6000:7000])
    benign_list.extend(X_test[6000:7000])
    adv_true_label.extend(y_test[6000:7000])

    adv_list=np.array(adv_list)
    benign_list=np.array(benign_list)
    adv_true_label=np.array(adv_true_label)

    indices = np.arange(len(adv_list))
    random.shuffle(indices)
    adv_list = adv_list[indices]
    benign_list = benign_list[indices]
    adv_true_label = adv_true_label[indices]
    return(adv_list,benign_list,adv_true_label)

let's add some non advsersarial images in the data set

### 4. Denoiser training

In [13]:
# plt.figure(figsize=(25,45))
# for i,(adv_image, true_label) in enumerate(zip(adv_list[len(adv_list)-1000:len(adv_list)-1000+10],adv_true_label[len(adv_list)-1000:len(adv_list)-1000+10])): 
   
#     i=2*i
#     ax = plt.subplot(5, 2, i + 1)
#     plt.imshow(adv_image.astype('int32'))
#     predicted_adv_label=dict_classes[np.argmax(model_effnet.predict(np.expand_dims(adv_image,axis=0)))]
#     true_label=dict_classes[true_label]
#     if true_label==predicted_adv_label:
#         color='green'
#     else:
#         color='red'
#     plt.title('true label: '+str(true_label)+'\n predicted_label: '+str(predicted_adv_label),color=color)
    
#     ax = plt.subplot(5, 2, i + 2)
#     new_image=autoencoder.predict(np.expand_dims(adv_image,axis=0))
#     predicted_label=dict_classes[np.argmax(model_effnet.predict(new_image))]
#     plt.imshow(np.squeeze(new_image).astype('int32'))
#     if true_label==predicted_label:
#         color='green'
#     else:
#         color='red'
#     plt.title('true label: '+str(true_label)+'\n predicted_label: '+str(predicted_label),color=color)
#     plt.axis("off")
# plt.show()

Let's add the denoiser on top of the effnet_model.
Beforehand we made the denoiser not trainable

### 5. Useful functions

Let's define the peformances metrics

In [14]:
def DOC (x_adv_list,x_list):
    N=len(x_adv_list)
    sum=0
    for i in range (N):        
        sum+=100*np.linalg.norm(np.reshape(x_adv_list[i] - x_list[i], -1),ord=1)/np.linalg.norm(np.reshape(x_list[i], -1),ord=1)
    return(sum/N)

def succes_rate (success_list):
    success=len([i for i in success_list if i==True])
    total=len(success_list)
    return(success/total)

### 6. Attack robusteness

##### White box attack

In [15]:
def attack_performances_computer(model_to_attack,predicting_model,attack, image_list, labels,epsilon):
    model_to_attack=TensorFlowModel(model_to_attack , bounds=(0, 255))
    success_on_attacked_model=[]
    success_on_predicting_model=[]
    adv_list=[]
    labels=list(map(np.argmax,labels))
    for i,image in enumerate(tqdm(image_list,position=0)):
        image = np.asarray(image)[:,:,:3].astype('float32')
        image = convert_to_tensor(np.expand_dims(image,axis=0))
        label=labels[i]
        label = tf.convert_to_tensor(np.array([label]))
        _, clipped, is_adv = attack(model_to_attack,image,label,epsilons=epsilon)
        success_on_attacked_model.append(bool(is_adv[0]))
        adv_list.append(np.array(clipped[0]))
        prediction=predicting_model.predict(np.expand_dims(clipped[0],axis=0))
        if np.argmax(prediction)!=labels[i]:
            success_on_predicting_model.append(True)
        else:
            success_on_predicting_model.append(False)
            
        
    DOC_attack=DOC(adv_list,image_list)
    SR_on_attacked_model=succes_rate(success_on_attacked_model)
    SR_on_predicting_model=succes_rate(success_on_predicting_model)
    return(DOC_attack,SR_on_attacked_model,SR_on_predicting_model)


In [16]:
def attack_runner(model_to_attack,predicting_model,image_list, labels_list, epislons_list):  
    
    attack_FGSM=LinfFastGradientAttack()
    SR_FGSM_dic={}  
    SR_FGSM_dic_with_defense={}
    for epsilon in epislons_list:
        print('======'+'epislon: '+str(epsilon)+'======')
        DOC_FGSM,SR_FGSM,SR_FGSM_with_defense=attack_performances_computer(model_to_attack,predicting_model,attack_FGSM, image_list, labels_list,[epsilon])      
        SR_FGSM_dic[DOC_FGSM]=SR_FGSM
        SR_FGSM_dic_with_defense[DOC_FGSM]=SR_FGSM_with_defense   
    return(SR_FGSM_dic,SR_FGSM_dic_with_defense)

In [17]:
def total_run(data_set_name):
    print("====data set selection====")
    (X_train,X_test,y_train,y_test)=pick_data_set(data_set_name)
    print("====benign model training====")
    model_effnet=train_and_save_effnet(data_set_name)
    print("====adversarial data set creation====")
    (adv_list,benign_list,adv_true_label)=make_adv_data_set(data_set_name,model_effnet)
    autoencoder = Denoise()
    print("====Denoiser training====")
    autoencoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError(),metrics=['MeanSquaredError'])
    autoencoder.fit(np.array(adv_list[:len(adv_list)-1000]),np.array(benign_list[:len(adv_list)-1000]),
                epochs=30,
                batch_size=32,
                shuffle=True)
    
    autoencoder.trainable=False
    inputs=Input(shape=(32,32,3))
    x=autoencoder(inputs)
    o=model_effnet(x)

    model_with_defense=Model(inputs,[o])

    model_with_defense.compile(
        loss='categorical_crossentropy',
        optimizer='nadam',
        metrics=['accuracy']
        )
    print("====Adversarial attacks on model+denoiser====")
    SR_FGSM_dic_without_defense,SR_FGSM_dic_with_defense=attack_runner(model_effnet,model_with_defense,X_test[7000:7100], y_test[7000:7100], [0.1,0.5,1,5,7,10,15,20,30,50])
    if exists('data/denosier_efficiency_white_box_'+str(data_set_name))==False:
        with open('data/denosier_efficiency_white_box_'+str(data_set_name), 'wb') as f:
                pickle.Pickler(f).dump(SR_FGSM_dic_without_defense)
                pickle.Pickler(f).dump(SR_FGSM_dic_with_defense)
                
    return(SR_FGSM_dic_without_defense,SR_FGSM_dic_with_defense)

In [None]:
(SR_FGSM_dic_without_defense,SR_FGSM_dic_with_defense)=total_run('Mnist')

====data set selection====
====benign model training====


W0203 19:10:03.159173 140541819410240 deprecation.py:323] From /home/timotheerio/.local/lib/python3.6/site-packages/foolbox/models/tensorflow.py:13: is_gpu_available (from tensorflow.python.framework.test_util) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.config.list_physical_devices('GPU')` instead.
  0%|          | 0/6000 [00:00<?, ?it/s]

====adversarial data set creation====


  4%|▍         | 249/6000 [02:00<45:22,  2.11it/s]

In [None]:
plt.plot(list(SR_FGSM_dic_with_defense.keys()),list(SR_FGSM_dic_with_defense.values()),label='with denoiser')
plt.plot(list(SR_FGSM_dic_without_defense.keys()),list(SR_FGSM_dic_without_defense.values()),label='without denoiser')
# plt.xscale('log')
plt.grid(True,which="both", linestyle='--')
plt.title('Successs rate for different degrees of change \n on Mnist images',fontsize=11)
plt.xlabel('DOC (%)')
plt.ylabel('SR')
plt.legend(loc='lower right')

In [None]:
(SR_FGSM_dic_without_defense,SR_FGSM_dic_with_defense)=total_run('Cifar')

In [None]:
plt.plot(list(SR_FGSM_dic_with_defense.keys()),list(SR_FGSM_dic_with_defense.values()),label='with denoiser')
plt.plot(list(SR_FGSM_dic_without_defense.keys()),list(SR_FGSM_dic_without_defense.values()),label='without denoiser')
# plt.xscale('log')
plt.grid(True,which="both", linestyle='--')
plt.title('Successs rate for different degrees of change \n on Cifar images',fontsize=11)
plt.xlabel('DOC (%)')
plt.ylabel('SR')
plt.legend(loc='lower right')

### Test of the denoiser againt a black box attack (boundary attack)

In [None]:
def prep_models(data_set_name):
    (X_train,X_test,y_train,y_test)=pick_data_set(data_set_name)
    model_effnet=train_and_save_effnet(data_set_name)
    (adv_list,benign_list,adv_true_label)=make_adv_data_set(data_set_name,model_effnet)
    
    autoencoder.compile(optimizer='adam', loss=tf.keras.losses.MeanSquaredError(),metrics=['MeanSquaredError'])
    autoencoder.fit(np.array(adv_list[:len(adv_list)-1000]),np.array(benign_list[:len(adv_list)-1000]),
                epochs=30,
                batch_size=32,
                shuffle=True)
    
    autoencoder.trainable=False
    inputs=Input(shape=(32,32,3))
    x=autoencoder(inputs)
    o=model_effnet(x)

    model_with_defense=Model(inputs,[o])

    model_with_defense.compile(
        loss='categorical_crossentropy',
        optimizer='nadam',
        metrics=['accuracy']
        )
    return(model_effnet,model_with_defense)

In [None]:
(model_without_def,model_with_def)=prep_models('Mnist')
(X_train,X_test,y_train,y_test)=pick_data_set('Mnist')

In [None]:
classifier = TensorFlowV2Classifier(model=model_without_def, input_shape=(32,32,3),clip_values=(0, 255),nb_classes=10)
degree_of_change_without_def={}
attack = BoundaryAttack(estimator=classifier, targeted=False, max_iter=0, delta=0.001, epsilon=0.01)
iter_step =1
target=X_test[1]
image_list=[]
x_adv = None
for i in range(100):
    x_adv = attack.generate(x=np.array([target]), x_adv_init=x_adv)

    #clear_output()
    print("Adversarial image at step %d." % (i * iter_step), "L2 error", 
          np.linalg.norm(np.reshape(x_adv[0] - target, [-1])),
          "and class label %d." % np.argmax(classifier.predict(x_adv)[0]))
    plt.imshow(x_adv[0][..., ::-1].astype('int32'))
    image_list.append(x_adv[0][..., ::-1].astype(np.uint))
    plt.show(block=False)
    degree_of_change_without_def[i * iter_step]=DOC([x_adv[0]],[target])
    
    if hasattr(attack, 'curr_delta') and hasattr(attack, 'curr_epsilon'):
        attack.max_iter = iter_step 
        attack.delta = attack.curr_delta
        attack.epsilon = attack.curr_epsilon
    else:
        break

In [None]:
classifier = TensorFlowV2Classifier(model=model_with_def, input_shape=(32,32,3),clip_values=(0, 255),nb_classes=10)
degree_of_change_with_def={}
attack = BoundaryAttack(estimator=classifier, targeted=False, max_iter=0, delta=0.001, epsilon=0.01)
iter_step =1
image_list=[]
target=X_test[1]
x_adv = None
for i in range(100):
    x_adv = attack.generate(x=np.array([target]), x_adv_init=x_adv)

    #clear_output()
    print("Adversarial image at step %d." % (i * iter_step), "L2 error", 
          np.linalg.norm(np.reshape(x_adv[0] - target, [-1])),
          "and class label %d." % np.argmax(classifier.predict(x_adv)[0]))
    plt.imshow(x_adv[0][..., ::-1].astype('int32'))
    image_list.append(x_adv[0][..., ::-1].astype(np.uint))
    plt.show(block=False)
    degree_of_change_with_def[i * iter_step]=DOC([x_adv[0]],[target])
    
    if hasattr(attack, 'curr_delta') and hasattr(attack, 'curr_epsilon'):
        attack.max_iter = iter_step 
        attack.delta = attack.curr_delta
        attack.epsilon = attack.curr_epsilon
    else:
        break


In [None]:
plt.plot(list(degree_of_change_without_def.keys()),list(degree_of_change_without_def.values()),label='DOC without denoiser')
plt.plot(list(degree_of_change_with_def.keys()),list(degree_of_change_with_def.values()),label='DOC with denoiser')
plt.grid(True,which="both", linestyle='--')
plt.title('DOC of the adversarial image with respect to the number of iterations on EfficientNet', fontsize=8)
plt.xlabel('Iterarion')
plt.ylabel('DOC (%)')

plt.legend(loc='upper right')
plt.show()