# A Deeper Analysis of Adversarial Examples in Intrusion Detection

## Libraries import

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sb
import copy
import time as time

import tensorflow as tf
import pickle as pkl

from sklearn.metrics import classification_report
from sklearn.decomposition import PCA
from sklearn.feature_selection import chi2

!pip install adversarial-robustness-toolbox >/dev/null
from art.attacks.evasion import FastGradientMethod, BasicIterativeMethod, CarliniL2Method, ProjectedGradientDescentTensorFlowV2, SaliencyMapMethod, DeepFool
from art.classifiers import TensorFlowV2Classifier

%matplotlib inline

pd.options.display.max_columns = 200
pd.options.display.max_rows = 200

# Select the training device
device = '/GPU:0'



## NSL-KDD Pre-Processing

### Downloading and importing the dataset

In [None]:
#Downloading and extracting the dataset if it doesn't exist
!if [ ! -d "./NSL-KDD" ]; then wget http://205.174.165.80/CICDataset/NSL-KDD/Dataset/NSL-KDD.zip; mkdir NSL-KDD; unzip NSL-KDD.zip -d NSL-KDD; fi
    
#Importing the training and testing datasets from .CSV to Pandas DataFrames
features = ['1 Duration', '2 Protocol-type : ', '3 Service : ', '4 Flag : ', '5 Src-bytes', '6 Dst-bytes', '7 Land', '8 Wrong-fragment', '9 Urgent', '10 Hot', '11 Num-failed-logins', '12 Logged-in', '13 Num-compromised', '14 Root-shell', '15 Su-attempted', '16 Num-root', '17 Num-file-creations', '18 Num-shells', '19 Num-access-files', '20 Num-outbound-cmds', '21 Is-host-login', '22 Is-guest-login', '23 Count', '24 Srv-count', '25 Serror-rate', '26 Srv-serror-rate', '27 Rerror-rate', '28 Srv-rerror-rate', '29 Same-srv-rate', '30 Diff-srv-rate', '31 Srv-diff-host-rate', '32 Dst-host-count', '33 Dst-host-srv-count', '34 Dst-host-same-srv-rate', '35 Dst-host-diff-srv-rate', '36 Dst-host-same-src-port-rate', '37 Dst-host-srv-diff-host-rate', '38 Dst-host-serror-rate', '39 Dst-host-srv-serror-rate', '40 Dst-host-rerror-rate', '41 Dst-host-srv-rerror-rate', '42 Attack_type', '43 Difficulty']
# Read both splits (training file first, then the test file); the raw files
# have no header row, so the column names are supplied through `features`.
df_training, df_testing = (
    pd.read_csv(split_path, names=features)
    for split_path in ('./NSL-KDD/KDDTrain+_20Percent.txt', './NSL-KDD/KDDTest+.txt')
)
# Stack the training rows on top of the test rows in a single frame
data = pd.concat([df_training, df_testing], axis=0)

--2021-08-29 18:55:40--  http://205.174.165.80/CICDataset/NSL-KDD/Dataset/NSL-KDD.zip
Connecting to 205.174.165.80:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6598776 (6.3M) [application/zip]
Saving to: ‘NSL-KDD.zip’


2021-08-29 18:55:46 (4.59 MB/s) - ‘NSL-KDD.zip’ saved [6598776/6598776]

Archive:  NSL-KDD.zip
  inflating: NSL-KDD/index.html      
  inflating: NSL-KDD/KDDTest1.jpg    
  inflating: NSL-KDD/KDDTest-21.arff  
  inflating: NSL-KDD/KDDTest-21.txt  
  inflating: NSL-KDD/KDDTest+.arff   
  inflating: NSL-KDD/KDDTest+.txt    
  inflating: NSL-KDD/KDDTrain1.jpg   
  inflating: NSL-KDD/KDDTrain+.arff  
  inflating: NSL-KDD/KDDTrain+.txt   
  inflating: NSL-KDD/KDDTrain+_20Percent.arff  
  inflating: NSL-KDD/KDDTrain+_20Percent.txt  


### Removing unused features

In [None]:
# Remove two columns in one call:
#  - '43 Difficulty': the last column of the raw files (presumably the NSL-KDD
#    difficulty score), not used as a feature here.
#  - '20 Num-outbound-cmds': reported as all zeros (std = 0), which would break
#    the Min-Max normalization applied later.
data.drop(columns=['43 Difficulty', '20 Num-outbound-cmds'], inplace=True)

### Transforming the problem into binary classification

In [None]:
# Collapse the nominal "Attack type" attribute into a binary target
# (0 : normal / 1 : attack). `pop` removes the nominal column from `data`,
# and the binary column is appended last under the name '42 Labels'.
labels = data.pop('42 Attack_type').ne('normal').astype('int64')
data['42 Labels'] = labels

### One Hot Encoding the categorical features

In [None]:
# One-hot encode the three nominal attributes, then remove the originals.
for nominal in ['4 Flag : ', '3 Service : ', '2 Protocol-type : ']:
    # One indicator column per category of this attribute
    dummies = pd.get_dummies(data[nominal])
    for category in dummies.columns:
        # New columns are named "<attribute><category>" and always inserted at
        # position 1, so they end up in reverse insertion order after column 0.
        data.insert(1, str(nominal) + category, dummies[category].astype('int64'))
    # The nominal column is no longer needed once its indicators exist
    data.drop(columns=nominal, inplace=True)

### Splitting the training and test sets

In [None]:
# Split the stacked frame back into training and test sets.
# The training row count is captured once, before `df_training` is rebound.
# Each slice is copied so that later column assignments write to an independent
# frame instead of a slice of `data` (the cause of SettingWithCopyWarning).
n_train_rows = df_training.shape[0]
df_training = data.iloc[:n_train_rows].copy()
df_testing = data.iloc[n_train_rows:].copy()

### Normalizing the data using Min-Max

In [None]:
# Min-Max normalization on the non binary features
numeric_features = ['1 Duration', '5 Src-bytes', '6 Dst-bytes', '8 Wrong-fragment', '9 Urgent', '10 Hot', '11 Num-failed-logins', '13 Num-compromised', '15 Su-attempted', '16 Num-root', '17 Num-file-creations', '18 Num-shells', '19 Num-access-files', '23 Count', '24 Srv-count', '25 Serror-rate', '26 Srv-serror-rate', '27 Rerror-rate', '28 Srv-rerror-rate', '29 Same-srv-rate', '30 Diff-srv-rate', '31 Srv-diff-host-rate', '32 Dst-host-count', '33 Dst-host-srv-count', '34 Dst-host-same-srv-rate', '35 Dst-host-diff-srv-rate', '36 Dst-host-same-src-port-rate', '37 Dst-host-srv-diff-host-rate', '38 Dst-host-serror-rate', '39 Dst-host-srv-serror-rate', '40 Dst-host-rerror-rate', '41 Dst-host-srv-rerror-rate']

# Work on independent copies so the assignments below never write through a
# slice of another frame (avoids pandas' SettingWithCopyWarning).
df_training = df_training.copy()
df_testing = df_testing.copy()

# The min and max are only computed from the training set, and are captured
# before any column is overwritten.
train_min = df_training[numeric_features].min()
train_range = df_training[numeric_features].max() - train_min
# A feature that is constant in the training set has range 0; divide by 1
# instead so the result is 0 (training) / an offset (test) rather than NaN/inf.
train_range = train_range.replace(0, 1)

# Column-wise (x - min) / (max - min), applied with the same training statistics
# to both splits; test values may therefore fall outside [0, 1].
df_training[numeric_features] = (df_training[numeric_features] - train_min) / train_range
df_testing[numeric_features] = (df_testing[numeric_features] - train_min) / train_range

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  import sys


### Convert the training and testing set into NumPy array

In [None]:
# Expose both frames as plain NumPy matrices (all columns, label column last)
nd_training, nd_testing = df_training.values, df_testing.values

### Extracting the labels and making copies

In [None]:
# Features (x) are every column except the last; the label (y) is the last column
x_train, y_train = nd_training[:, :-1], nd_training[:, -1]
x_test, y_test = nd_testing[:, :-1], nd_testing[:, -1]

# Keep independent NumPy copies, since x_train / x_test / y_test are rebound
# to other objects further down the notebook
x_train_np, y_train_np = x_train.copy(), y_train.copy()
x_test_np, y_test_np = x_test.copy(), y_test.copy()

## Neural Network Model Training

### Convert the training and testing set into TensorFlow tensors

In [None]:
# One-hot encode the binary labels (one indicator column per distinct label value)
y_ohe_train = pd.get_dummies(y_train_np.copy())
y_ohe_test = pd.get_dummies(y_test_np.copy())

# Convert features and one-hot labels to float32 tensors on the selected device
with tf.device(device_name=device):
  x_train, x_test, y_ohe_train, y_ohe_test = (
      tf.convert_to_tensor(array_like, np.float32)
      for array_like in (x_train, x_test, y_ohe_train, y_ohe_test)
  )

### Define a neural network with 3 ReLU layers and a Softmax output




In [None]:
neuron_number = 256                 # Width of each hidden layer
output_number = 2                   # One output neuron per class
features_number = x_train.shape[1]  # Input dimensionality

with tf.device(device_name=device):
  # Three ReLU hidden layers of equal width followed by a softmax over the classes.
  # Layers are created in the same order as they are stacked.
  model = tf.keras.Sequential(layers=[
      tf.keras.layers.Dense(neuron_number, input_shape=(features_number,), activation="relu"),
      *(tf.keras.layers.Dense(neuron_number, activation="relu") for _ in range(2)),
      tf.keras.layers.Dense(output_number, activation="softmax"),
  ])

### Train the model

In [None]:
# Compile the classifier: Adam optimizer, categorical cross-entropy on the
# one-hot labels, and categorical accuracy as the tracked metric.
eval_metric = 'categorical_accuracy'
lr = 0.01
optim = tf.keras.optimizers.Adam(learning_rate=lr)
loss_fn = tf.keras.losses.CategoricalCrossentropy()
model.compile(optimizer=optim,
              loss=loss_fn,
              metrics=[eval_metric])

best_value = 0.0 # Best evaluation metrics on test dataset
best_testing_model_file = "./best_model_testing"

# Iterating on the dataset: 10 rounds of 10 epochs each (training continues
# from the previous round's weights). After every round the model is scored on
# the test tensors and saved whenever the tracked metric improves.
# NOTE(review): the checkpoint is selected on the test set itself, so the final
# test accuracy is an optimistic estimate — consider a separate validation split.
for i in range(10):
  print("Iteration {}, best value is {}".format(str(i), str(best_value)))
  model.fit(x=x_train, y=y_ohe_train, epochs=10, batch_size=100)
  eval_metrics = model.evaluate(x_test, y_ohe_test, verbose=2)
  # Look the metric up by name rather than relying on its position in the list
  eval_value = eval_metrics[model.metrics_names.index(eval_metric)]
  if eval_value > best_value:
    best_value = eval_value
    tf.keras.models.save_model(model, best_testing_model_file)

# Continue with the best saved checkpoint rather than the last-round weights;
# the trailing evaluate call is echoed as the cell output ([loss, metric]).
model = tf.keras.models.load_model(best_testing_model_file)
model.evaluate(x_test, y_ohe_test, verbose=2)

Iteration 0, best value is 0.0
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
705/705 - 1s - loss: 2.3547 - categorical_accuracy: 0.7810
INFO:tensorflow:Assets written to: ./best_model_testing/assets
Iteration 1, best value is 0.7809616923332214
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
705/705 - 1s - loss: 2.6378 - categorical_accuracy: 0.7891
INFO:tensorflow:Assets written to: ./best_model_testing/assets
Iteration 2, best value is 0.7891234755516052
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
705/705 - 1s - loss: 5.7406 - categorical_accuracy: 0.7921
INFO:tensorflow:Assets written to: ./best_model_testing/assets
Iteration 3, best value is 0.7920954823493958
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
705/705 - 1s - loss: 

[5.913880348205566, 0.8041163682937622]

## Adversarial Attacks

### Prepare the datasets

In [None]:
# Select the split the attacks are crafted from. The test split is active;
# to use the training split instead, swap in `df_training` / `y_train_np`:
#values_copy = df_training.copy().values
#y_ohe_test = y_train_np.copy()
values_copy = df_testing.copy().values
# Features: every column except the trailing label column
x_test = values_copy[:, :-1]

# One-hot labels, kept both as a DataFrame (y_ohe_test) and as a NumPy array (y_test)
y_ohe_test = pd.get_dummies(y_test_np.copy())
y_test = y_ohe_test.values

### Define the model in ART

In [None]:
# Wrap the trained Keras model in ART's TensorFlow v2 classifier so the attack
# classes can query it. `lr` is rebound here but not passed to the wrapper.
lr = 0.01
loss_fn = tf.keras.losses.CategoricalCrossentropy()
classifier = TensorFlowV2Classifier(
    model=model,
    loss_object=loss_fn,
    input_shape=(x_train.shape[1],),
    nb_classes=2,
)

### Clean Data

In [None]:
# Evaluating the model against the clean (unperturbed) examples
model.evaluate(x=x_test, y=y_test, verbose=2)

# Exporting the clean examples in a .pkl file (features only, no labels).
# NOTE(review): the file name says "train" regardless of which split x_test was
# built from; the "adversarial dataset for testing" cell expects clean_test.pkl,
# so rename the output when exporting the test split.
with open("clean_train.pkl", "wb") as f:
  pkl.dump(x_test, f)

705/705 - 1s - loss: 5.9139 - categorical_accuracy: 0.8041


### Fast Gradient Sign Method (FGSM)

In [None]:
# Creating the adversarial examples crafter: untargeted FGSM with an
# L-infinity norm and eps=0.1, using 100 random initialisations.
adversarial_crafter = FastGradientMethod(classifier,
                                         norm=np.inf,
                                         eps=0.1,
                                         targeted=False,
                                         num_random_init=100,
                                         batch_size=128)

# Generating the adversarial examples (the one-hot labels are passed explicitly),
# then converting the result to a float32 tensor on the selected device
adversarial_examples = adversarial_crafter.generate(x=x_test, y=y_test)
with tf.device(device_name=device):
  adversarial_examples = tf.convert_to_tensor(adversarial_examples, np.float32)

# Evaluating the model against the adversarial examples
model.evaluate(x=adversarial_examples, y=y_test, verbose=2)

# Exporting the adversarial examples in a .pkl file (pickled as a tensor).
# NOTE(review): the name says "train" regardless of the split used for x_test;
# the testing-dataset cell looks for FGSM_test.pkl.
with open("FGSM_train.pkl", "wb") as f:
  pkl.dump(adversarial_examples, f)

705/705 - 1s - loss: 81.4041 - categorical_accuracy: 0.4055


### Projected Gradient Descent (PGD)

In [None]:
# Creating the adversarial examples crafter: untargeted PGD with an L-infinity
# norm, eps=0.1, step size 0.001 and at most 100 iterations.
adversarial_crafter = ProjectedGradientDescentTensorFlowV2(classifier,
                                                           norm=np.inf,
                                                           eps=0.1,
                                                           eps_step=0.001,
                                                           max_iter=100,
                                                           targeted=False,
                                                           batch_size=128,
                                                           verbose=True)

# Generating the adversarial examples, then converting them to a float32 tensor
adversarial_examples = adversarial_crafter.generate(x=x_test, y=y_test)
with tf.device(device_name=device):
  adversarial_examples = tf.convert_to_tensor(adversarial_examples, np.float32)

# Evaluating the model against the adversarial examples
model.evaluate(x=adversarial_examples, y=y_test, verbose=2)

# Exporting the adversarial examples in a .pkl file (pickled as a tensor).
# NOTE(review): the name says "train" regardless of the split used for x_test;
# the testing-dataset cell looks for PGD_test.pkl.
with open("PGD_train.pkl", "wb") as f:
  pkl.dump(adversarial_examples, f)

PGD - Batches: 0it [00:00, ?it/s]

705/705 - 1s - loss: 139.2753 - categorical_accuracy: 0.3275


### Carlini & Wagner (CW)

In [None]:
# Creating the adversarial examples crafter: untargeted Carlini & Wagner L2 attack
adversarial_crafter = CarliniL2Method(classifier,
                                      confidence=0.3,
                                      targeted=False,
                                      learning_rate=1,
                                      binary_search_steps=45,
                                      max_iter=10,
                                      initial_const=0.01,
                                      max_halving=5,
                                      max_doubling=5,
                                      batch_size=128)

# Generating the adversarial examples
adversarial_examples = adversarial_crafter.generate(x=x_test, y=y_test)

# The transformation to tanh space introduces some small perturbation; we remove it to get the exact statistics.
# Every cell that differs from the clean value by less than the threshold is
# reset to the clean value. Note that 10e-6 equals 1e-5.
adversarial_examples = pd.DataFrame(adversarial_examples)
adversarial_examples[(np.abs(adversarial_examples - x_test) < 10e-6)] = x_test
adversarial_examples = adversarial_examples.values

# Convert the cleaned-up examples to a float32 tensor on the selected device
with tf.device(device_name=device):
  adversarial_examples = tf.convert_to_tensor(adversarial_examples, np.float32)

# Evaluating the model against the adversarial examples
model.evaluate(x=adversarial_examples, y=y_test, verbose=2)

# Exporting the adversarial examples in a .pkl file (pickled as a tensor).
# NOTE(review): the name says "train" regardless of the split used for x_test;
# the testing-dataset cell looks for CW_test.pkl.
with open("CW_train.pkl", "wb") as f:
  pkl.dump(adversarial_examples, f)

C&W L_2:   0%|          | 0/197 [00:00<?, ?it/s]

788/788 - 1s - loss: 30.1498 - categorical_accuracy: 0.7112


### DeepFool (DF)

In [None]:
# Creating the adversarial examples crafter: DeepFool with epsilon=0.01
adversarial_crafter = DeepFool(classifier,
                               epsilon=0.01,
                               batch_size=128)

# Generating the adversarial examples, then converting them to a float32 tensor
adversarial_examples = adversarial_crafter.generate(x=x_test, y=y_test)
with tf.device(device_name=device):
  adversarial_examples = tf.convert_to_tensor(adversarial_examples, np.float32)

# Evaluating the model against the adversarial examples
model.evaluate(x=adversarial_examples, y=y_test, verbose=2)

# Exporting the adversarial examples in a .pkl file (pickled as a tensor).
# NOTE(review): the name says "train" regardless of the split used for x_test;
# the testing-dataset cell looks for DF_test.pkl.
with open("DF_train.pkl", "wb") as f:
  pkl.dump(adversarial_examples, f)

DeepFool:   0%|          | 0/197 [00:00<?, ?it/s]

788/788 - 1s - loss: 18443.9648 - categorical_accuracy: 0.6775


## Adversarial Dataset

### Create adversarial dataset for training

In [None]:
path = "" # If you saved your .pkl file in a specific path

# NOTE: pickle.load can execute arbitrary code; only load files written by this notebook.

# Clean examples with label 0
with open(path+"clean_train.pkl", "rb") as f:
  clean_train = pd.DataFrame(pkl.load(f))
# The detector label goes into the first free column index (right after the features)
label_column = clean_train.shape[1]
labels_0 = [0] * clean_train.shape[0]
clean_train[label_column] = labels_0

# Altered examples with label 1 (the attack files hold tensors, hence .numpy())
labels_1 = [1] * clean_train.shape[0]
with open(path+"FGSM_train.pkl", "rb") as f: # FGSM
  fgsm_dataset_train = pkl.load(f)
fgsm_train_df = pd.DataFrame(fgsm_dataset_train.numpy())
fgsm_train_df[label_column] = labels_1
with open(path+"PGD_train.pkl", "rb") as f: # PGD
  pgd_dataset_train = pkl.load(f)
pgd_train_df = pd.DataFrame(pgd_dataset_train.numpy())
pgd_train_df[label_column] = labels_1
with open(path+"CW_train.pkl", "rb") as f: # CW
  cw_dataset_train = pkl.load(f)
cw_train_df = pd.DataFrame(cw_dataset_train.numpy())
cw_train_df[label_column] = labels_1
with open(path+"DF_train.pkl", "rb") as f: # DF
  df_dataset_train = pkl.load(f)
df_train_df = pd.DataFrame(df_dataset_train.numpy())
df_train_df[label_column] = labels_1

# Create a balanced dataset containing all attacks: each attack contributes a
# different quarter of the rows (the last one also takes the remainder).
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
size_quarter = clean_train.shape[0] // 4
balanced_dataset_train = pd.concat([
    fgsm_train_df.iloc[:size_quarter],
    pgd_train_df.iloc[size_quarter: 2*size_quarter],
    cw_train_df.iloc[2*size_quarter:3*size_quarter],
    df_train_df.iloc[3*size_quarter:],
])

# Create each dataset: clean rows first, then the adversarial rows, re-indexed from 0
fgsm_adv_dataset_train = pd.concat([clean_train, fgsm_train_df], ignore_index=True)
pgd_adv_dataset_train = pd.concat([clean_train, pgd_train_df], ignore_index=True)
cw_adv_dataset_train = pd.concat([clean_train, cw_train_df], ignore_index=True)
df_adv_dataset_train = pd.concat([clean_train, df_train_df], ignore_index=True)
balanced_adv_dataset_train = pd.concat([clean_train, balanced_dataset_train], ignore_index=True)

# Converting the chosen dataset to tensor (features = all but last column, label = last)
with tf.device(device):
  x_adv_train = tf.convert_to_tensor((fgsm_adv_dataset_train.values[:, :-1]),  np.float32)
  y_adv_train = tf.convert_to_tensor((fgsm_adv_dataset_train.values[:, -1]),  np.float32)

### Create adversarial dataset for testing

In [None]:
path = "" # If you saved your .pkl file in a specific path

# NOTE: pickle.load can execute arbitrary code; only load files written by this notebook.
# The *_test.pkl files must have been exported beforehand (attack cells run on the test split).

# Clean examples with label 0
with open(path+"clean_test.pkl", "rb") as f:
  clean_test = pd.DataFrame(pkl.load(f))
# The detector label goes into the first free column index (right after the features)
label_column = clean_test.shape[1]
labels = [0] * clean_test.shape[0]
clean_test[label_column] = labels

# Altered examples with label 1 (the attack files hold tensors, hence .numpy())
labels = [1] * clean_test.shape[0]
with open(path+"FGSM_test.pkl", "rb") as f: # FGSM
  fgsm_dataset_test = pkl.load(f)
fgsm_test_df = pd.DataFrame(fgsm_dataset_test.numpy())
fgsm_test_df[label_column] = labels
with open(path+"PGD_test.pkl", "rb") as f: # PGD
  pgd_dataset_test = pkl.load(f)
pgd_test_df = pd.DataFrame(pgd_dataset_test.numpy())
pgd_test_df[label_column] = labels
with open(path+"CW_test.pkl", "rb") as f: # CW
  cw_dataset_test = pkl.load(f)
cw_test_df = pd.DataFrame(cw_dataset_test.numpy())
cw_test_df[label_column] = labels
with open(path+"DF_test.pkl", "rb") as f: # DF
  df_dataset_test = pkl.load(f)
df_test_df = pd.DataFrame(df_dataset_test.numpy())
df_test_df[label_column] = labels

# Create a balanced dataset containing all attacks: each attack contributes a
# different quarter of the rows (the last one also takes the remainder).
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
size_quarter = clean_test.shape[0] // 4
balanced_dataset_test = pd.concat([
    fgsm_test_df.iloc[:size_quarter],
    pgd_test_df.iloc[size_quarter:2*size_quarter],
    cw_test_df.iloc[2*size_quarter:3*size_quarter],
    df_test_df.iloc[3*size_quarter:],
])

# Create each dataset: clean rows first, then the adversarial rows, re-indexed from 0
fgsm_adv_dataset_test = pd.concat([clean_test, fgsm_test_df], ignore_index=True)
pgd_adv_dataset_test = pd.concat([clean_test, pgd_test_df], ignore_index=True)
cw_adv_dataset_test = pd.concat([clean_test, cw_test_df], ignore_index=True)
df_adv_dataset_test = pd.concat([clean_test, df_test_df], ignore_index=True)
balanced_adv_dataset_test = pd.concat([clean_test, balanced_dataset_test], ignore_index=True)

# Converting the chosen dataset to tensor (features = all but last column, label = last).
# Fixed: the original referenced the undefined name `pdg_adv_dataset_test`.
with tf.device(device):
  x_adv_test = tf.convert_to_tensor((pgd_adv_dataset_test.values[:, :-1]),  np.float32)
  y_adv_test = tf.convert_to_tensor((pgd_adv_dataset_test.values[:, -1]),  np.float32)

FileNotFoundError: ignored

## Adversarial Detectors

### Get the weights and config of each layer of the classification network

In [None]:
# Capture (weights, config) for the first three layers of the classifier,
# in layer order; for each layer the weights are read before the config.
(weights_L0, config_L0), (weights_L1, config_L1), (weights_L2, config_L2) = (
    (layer.get_weights(), layer.get_config()) for layer in model.layers[:3]
)

### Copy the layers config into new layers

In [None]:
# Fresh Dense layers rebuilt from the saved configs (weights are set later).
# Each detector needs its own layer objects: AD0 reuses layer 0 only, AD1
# layers 0-1, AD2 layers 0-2. The creation order matches the original cell.
copy_L0_AD0, copy_L0_AD1, copy_L0_AD2 = (
    tf.keras.layers.Dense.from_config(config_L0) for _ in range(3)
)
copy_L1_AD1, copy_L1_AD2 = (
    tf.keras.layers.Dense.from_config(config_L1) for _ in range(2)
)
copy_L2_AD2 = tf.keras.layers.Dense.from_config(config_L2)

### Create the ADs (Duplication + Fine-Tuning)

In [None]:
# Hyper-parameters of the detector heads (these rebind the names used for the classifier)
neuron_number = 256
output_number = 1  # single sigmoid neuron

with tf.device(device_name=device):
  # AD0: copy of classifier layer 0, followed by two new ReLU layers and a sigmoid output
  AD0 = tf.keras.models.Sequential(layers=[
      copy_L0_AD0,
      tf.keras.layers.Dense(neuron_number, activation="relu"),
      tf.keras.layers.Dense(neuron_number, activation="relu"),
      tf.keras.layers.Dense(output_number, activation="sigmoid", name="output_layer")
  ])
  # Initialise the copied layer with the classifier's trained weights
  AD0.layers[0].set_weights(weights_L0)

  # AD1: copies of classifier layers 0-1, followed by the same new head
  AD1 = tf.keras.models.Sequential(layers=[
      copy_L0_AD1,
      copy_L1_AD1,
      tf.keras.layers.Dense(neuron_number, activation="relu"),
      tf.keras.layers.Dense(neuron_number, activation="relu"),
      tf.keras.layers.Dense(output_number, activation="sigmoid", name="output_layer")
  ])
  AD1.layers[0].set_weights(weights_L0)
  AD1.layers[1].set_weights(weights_L1)

  # AD2: copies of classifier layers 0-2, followed by the same new head
  AD2 = tf.keras.models.Sequential(layers=[
      copy_L0_AD2,
      copy_L1_AD2,
      copy_L2_AD2,
      tf.keras.layers.Dense(neuron_number, activation="relu"),
      tf.keras.layers.Dense(neuron_number, activation="relu"),
      tf.keras.layers.Dense(output_number, activation="sigmoid", name="output_layer")
  ])
  AD2.layers[0].set_weights(weights_L0)
  AD2.layers[1].set_weights(weights_L1)
  AD2.layers[2].set_weights(weights_L2)

### Train the ADs

In [None]:
eval_metric = 'binary_accuracy'
lr = 0.01

# Each detector gets its own optimizer and loss instance
optim0 = tf.keras.optimizers.Adam(learning_rate=lr)
loss_fn0 = tf.keras.losses.BinaryCrossentropy()
AD0.compile(optimizer=optim0, loss=loss_fn0, metrics=[eval_metric])
optim1 = tf.keras.optimizers.Adam(learning_rate=lr)
loss_fn1 = tf.keras.losses.BinaryCrossentropy()
AD1.compile(optimizer=optim1, loss=loss_fn1, metrics=[eval_metric])
optim2 = tf.keras.optimizers.Adam(learning_rate=lr)
loss_fn2 = tf.keras.losses.BinaryCrossentropy()
AD2.compile(optimizer=optim2, loss=loss_fn2, metrics=[eval_metric])

best_AD0_file = "./AD0"
best_AD1_file = "./AD1"
best_AD2_file = "./AD2"


def _train_detector(detector, tag, checkpoint_path, rounds=3):
  """Train one detector in rounds and checkpoint its best-scoring state.

  Each round fits `detector` for 10 epochs on (x_adv_train, y_adv_train),
  evaluates it on (x_adv_test, y_adv_test) and saves the model to
  `checkpoint_path` whenever `eval_metric` improves on the best value so far.

  Args:
    detector: compiled Keras model to train.
    tag: label printed in the progress line (e.g. "AD0").
    checkpoint_path: where the best model is saved.
    rounds: number of fit/evaluate rounds.

  Returns:
    The best metric value observed (0.0 if no round improved on it).
  """
  best = 0.0
  for i in range(rounds):
    print("{} Iteration {}".format(tag, str(i)))
    detector.fit(x=x_adv_train, y=y_adv_train, epochs=10, batch_size=100)
    scores = detector.evaluate(x_adv_test, y_adv_test, verbose=2)
    # Look the metric up by name rather than relying on its position
    score = scores[detector.metrics_names.index(eval_metric)]
    if score > best:
      best = score
      tf.keras.models.save_model(detector, checkpoint_path)
  return best


# Train the detectors one after the other, in the original order.
# `best_value` ends up holding AD2's best metric, as in the original cell.
for detector, tag, checkpoint_path in ((AD0, "AD0", best_AD0_file),
                                       (AD1, "AD1", best_AD1_file),
                                       (AD2, "AD2", best_AD2_file)):
  best_value = _train_detector(detector, tag, checkpoint_path)


### Evaluate the ADs

In [None]:
# Replace the in-memory detectors with their best saved checkpoints
AD0, AD1, AD2 = (
    tf.keras.models.load_model(saved_path) for saved_path in ("./AD0", "./AD1", "./AD2")
)

In [None]:
# Score each reloaded detector on the adversarial test tensors.
# Only the last call's return value is echoed as the cell output.
AD0.evaluate(x_adv_test, y_adv_test)
AD1.evaluate(x_adv_test, y_adv_test)
AD2.evaluate(x_adv_test, y_adv_test)

## Fusion Rules

### Definition

#### Majority Vote

In [None]:
def MajorityVoteRule(ClassProbabilities):
  """Return the class index chosen by the most classifiers.

  ClassProbabilities is a 2-D array with one row per classifier and one
  column per class. Each classifier votes for its highest-probability class;
  the class with the most votes wins, and ties go to the lowest class index.
  """
  votes = np.argmax(ClassProbabilities, axis=1)
  return np.bincount(votes).argmax()

#### Simple Bayes Average

In [None]:
def SimpleBayesAverageFusion(ClassProbabilities):
  """Fuse classifiers by averaging their class probabilities.

  ClassProbabilities is a 2-D array with one row per classifier and two
  class columns. Returns (winning class index, its averaged probability);
  class 0 wins ties.
  """
  mean_proba = ClassProbabilities.sum(axis=0) / ClassProbabilities.shape[0]
  winner = 0 if mean_proba[0] >= mean_proba[1] else 1
  return winner, mean_proba[winner]

#### Dempster Shafer Evidence Combination

In [None]:
def DempsterRule(m1, m2):
    """Combine two mass functions with Dempster's rule.

    m1 and m2 map hypothesis names to masses. A name is read as the set of
    its characters (e.g. "ab" stands for {a, b}). For every pair of
    hypotheses, the product of their masses is credited to whichever one is
    a subset of the other; pairs where neither contains the other (conflict)
    are discarded. The result is normalised to sum to 1.
    """
    # Frame of discernment: every hypothesis named by either mass function
    hypotheses = set(m1) | set(m2)
    combined = {hypothesis: 0 for hypothesis in hypotheses}

    for key1, mass1 in m1.items():
        atoms1 = set(str(key1))
        for key2, mass2 in m2.items():
            atoms2 = set(str(key2))
            if atoms1 <= atoms2:
                # key1 is the intersection of the two hypotheses
                combined[key1] += mass1 * mass2
            elif atoms2 <= atoms1:
                # key2 is the intersection of the two hypotheses
                combined[key2] += mass1 * mass2

    # Renormalise by the total non-conflicting mass
    total = sum(combined.values())
    return {hypothesis: mass / total for hypothesis, mass in combined.items()}

def DempsterShaferEvidenceCombinationRule(ClassProbabilities, Weights, DontKnowClass=False):
  """Fuse classifier outputs with Dempster-Shafer evidence combination.

  Args:
    ClassProbabilities: 2-D array, one row per classifier, columns = the two
      class probabilities.
    Weights: 2-D array of the same shape; Weights[i, c] scales classifier i's
      probability for class c.
    DontKnowClass: accepted for backward compatibility; currently unused.

  Returns:
    (winning class index, its combined mass); class 0 wins ties.

  Raises:
    ValueError: if ClassProbabilities has no rows (previously this surfaced
      as an UnboundLocalError).
  """
  classifier_count = ClassProbabilities.shape[0]
  if classifier_count == 0:
    raise ValueError("ClassProbabilities must contain at least one classifier row")

  # One mass function per classifier: weighted mass on "a" (class 0) and
  # "b" (class 1); the remainder goes to the uncertain hypothesis "ab".
  MassFunctionList = []
  for i in range(classifier_count):
    mass_a = Weights[i,0]*ClassProbabilities[i,0]
    mass_b = Weights[i,1]*ClassProbabilities[i,1]
    MassFunctionList.append({"a": mass_a, "b": mass_b, "ab": 1-mass_a-mass_b})

  # Fold the mass functions left to right with Dempster's rule
  DSresult = MassFunctionList[0]
  for mass in MassFunctionList[1:]:
    DSresult = DempsterRule(DSresult, mass)

  if DSresult['a'] >= DSresult['b']:
    return 0, DSresult['a']
  return 1, DSresult['b']

### Compute the results

#### Turn mono-neuronal output into bi-class probabilities

In [None]:
# Raw sigmoid outputs of the three detectors, as nested Python lists
pred_AD0, pred_AD1, pred_AD2 = (
    detector.predict(x_adv_test).tolist() for detector in (AD0, AD1, AD2)
)

# Expand each single output p into a two-class row [1 - p, p]
proba_AD0 = [[1-p[0], p[0]] for p in pred_AD0]
proba_AD1 = [[1-p[0], p[0]] for p in pred_AD1]
proba_AD2 = [[1-p[0], p[0]] for p in pred_AD2]

#### Majority Vote

In [None]:
# Fuse the three detectors sample by sample with the majority-vote rule
guesses = [
    MajorityVoteRule(np.array([p0, p1, p2]))
    for p0, p1, p2 in zip(proba_AD0, proba_AD1, proba_AD2)
]

# Accuracy = share of samples whose fused class matches the true label
correct = y_adv_test.numpy()
hit = sum(1 for guess, truth in zip(guesses, correct) if guess == truth)
acc = hit / len(guesses)
print(acc)

#### Simple Bayes Average

In [None]:
# Fuse the three detectors sample by sample with the simple Bayes average;
# each guess is a (class, averaged probability) pair
guesses = [
    SimpleBayesAverageFusion(np.array([p0, p1, p2]))
    for p0, p1, p2 in zip(proba_AD0, proba_AD1, proba_AD2)
]

# Accuracy = share of samples whose fused class matches the true label
correct = y_adv_test.numpy()
hit = sum(1 for guess, truth in zip(guesses, correct) if guess[0] == truth)
acc = hit / len(guesses)
print(acc)

#### Dempster Shafer Evidence Combination

In [None]:
# Get the weights: for each detector, its per-class hit rate on the test labels
# (correct predictions of class c divided by the number of class-c samples)
correct = y_adv_test.numpy()
amount_1 = np.count_nonzero(correct == 1)
amount_0 = np.count_nonzero(correct == 0)

weights = []
for proba_AD in [proba_AD0, proba_AD1, proba_AD2]:
  hit_0 = hit_1 = 0
  for scores, truth in zip(proba_AD, correct):
    # Predicted class = position of the larger of the two probabilities
    if scores.index(max(scores)) == truth:
      if truth:
        hit_1 += 1
      else:
        hit_0 += 1
  weights.append([hit_0/amount_0, hit_1/amount_1])
weights = np.array(weights)

# Fuse the three detectors sample by sample; each guess is a (class, mass) pair
guesses = [
    DempsterShaferEvidenceCombinationRule(np.array([p0, p1, p2]), weights)
    for p0, p1, p2 in zip(proba_AD0, proba_AD1, proba_AD2)
]

# Accuracy = share of samples whose fused class matches the true label
hit = sum(1 for guess, truth in zip(guesses, correct) if guess[0] == truth)
acc = hit / len(guesses)
print(acc)