In [1]:
import warnings
import os
# Must be set before TensorFlow is imported (it is imported in a later cell)
# so the native logger picks the level up at load time.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'


def warn(*args, **kwargs):
    """Drop-in replacement for ``warnings.warn`` that discards every warning.

    Accepts any positional/keyword arguments so it is call-compatible with
    the function it replaces, and always returns ``None``.
    """
    return None


# Route every ``warnings.warn(...)`` call in this process to the no-op above.
warnings.warn = warn

In [2]:
import pickle
import typing
import numpy as np
from const import *
import pandas as pd
import tensorflow as tf
from sklearn.metrics import log_loss


In [3]:

def Critic(in_feature : int) -> tf.keras.models.Sequential:
  """Build the critic network: an MLP mapping a feature vector to one score.

  Layout: two ``Dense(128) -> BatchNormalization -> LeakyReLU(0.2)`` stages
  followed by a ``Dense(1)`` head with no activation, so the output is an
  unbounded scalar score rather than a probability.

  Args:
    in_feature: Width of each input feature vector.

  Returns:
    An uncompiled ``tf.keras.models.Sequential`` named ``'Critic'``.
  """
  def _init():
    # One initializer object per layer. Recent Keras versions warn that an
    # unseeded initializer instance reused across layers replays the same
    # random stream, which would correlate the layers' starting weights.
    return tf.keras.initializers.RandomNormal(stddev=0.02)

  model = tf.keras.models.Sequential(
    layers=[
      tf.keras.layers.Dense(units=128, kernel_initializer=_init(), input_shape=(in_feature,)),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.LeakyReLU(alpha=0.2),

      tf.keras.layers.Dense(units=128, kernel_initializer=_init()),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.LeakyReLU(alpha=0.2),

      # Linear head: a single raw score per sample.
      tf.keras.layers.Dense(units=1, kernel_initializer=_init())
    ],
    name='Critic'
  )

  return model

def Generator(latent_dim : int, out_feature : int) -> tf.keras.models.Sequential:
  """Build the generator network: an MLP mapping latent noise to a feature vector.

  Layout: two ``Dense(128) -> BatchNormalization -> LeakyReLU(0.2)`` stages
  followed by a ``Dense(out_feature)`` head with no activation, so generated
  features are unbounded.

  Args:
    latent_dim: Width of the input noise vector.
    out_feature: Width of the generated feature vector.

  Returns:
    An uncompiled ``tf.keras.models.Sequential`` named ``'Generator'``.
  """
  def _init():
    # One initializer object per layer. Recent Keras versions warn that an
    # unseeded initializer instance reused across layers replays the same
    # random stream, which would correlate the layers' starting weights.
    return tf.keras.initializers.RandomNormal(stddev=0.02)

  model = tf.keras.models.Sequential(
    layers=[
      tf.keras.layers.Dense(units=128, kernel_initializer=_init(), input_shape=(latent_dim,)),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.LeakyReLU(alpha=0.2),

      tf.keras.layers.Dense(units=128, kernel_initializer=_init()),
      tf.keras.layers.BatchNormalization(),
      tf.keras.layers.LeakyReLU(alpha=0.2),

      # Linear head: one output per generated feature.
      tf.keras.layers.Dense(units=out_feature, kernel_initializer=_init())
    ],
    name='Generator'
  )

  return model

def Blackbox(path : str) -> typing.Any:
  """Load a pickled object (e.g. a pre-trained black-box model) from disk.

  Args:
    path: Location of the pickle file.

  Returns:
    Whatever object the pickle file contains.

  Raises:
    OSError: If ``path`` cannot be opened for reading.
    pickle.UnpicklingError: If the file is not a valid pickle stream.
  """
  # SECURITY: unpickling can execute arbitrary code. Only load files that
  # were produced by a trusted source.
  with open(path, 'rb') as handle:
    return pickle.load(handle)

In [4]:
def load_dataset(path : str) -> pd.DataFrame:
  """Read a CSV and return the feature columns of the rows labelled 1.

  Rows whose ``label`` column equals 1 are kept (the original note calls
  these the probe-attack samples), the ``label`` column is removed, the index
  is renumbered from 0 and every remaining column is cast to ``float32``.

  Args:
    path: CSV file containing the feature columns plus a ``label`` column.

  Returns:
    A ``float32`` DataFrame holding only the selected rows' features.
  """
  frame = pd.read_csv(path)
  is_selected = frame['label'] == 1
  features = frame.loc[is_selected].drop(columns=['label'])
  return features.reset_index(drop=True).astype('float32')

In [5]:
@tf.function
def train_critic(real_sample):
  """Run one critic update on a batch of real samples.

  Draws as many latent vectors as there are real samples, generates fakes,
  and minimises ``mean(critic(fake)) - mean(critic(real))``. After the
  optimizer step every trainable critic variable is clipped to
  ``[clip_min, clip_max]``.

  Reads the notebook globals ``gen``, ``critic``, ``opt_critic``,
  ``latent_dim``, ``clip_min`` and ``clip_max``.

  Args:
    real_sample: Batch of real feature vectors; its length sets the number
      of fake samples drawn.

  Returns:
    The scalar critic loss for this step.
  """
  n_rows = len(real_sample)

  with tf.GradientTape() as critic_tape:
    latent = tf.random.normal([n_rows, latent_dim])
    generated = gen(latent, training=True)
    # Score fakes first, then reals -- the same call order as before, so
    # layers that update state in training mode see the batches in order.
    fake_scores = critic(generated, training=True)
    real_scores = critic(real_sample, training=True)
    critic_loss = tf.reduce_mean(fake_scores) - tf.reduce_mean(real_scores)

  weights = critic.trainable_variables
  gradients = critic_tape.gradient(critic_loss, weights)
  opt_critic.apply_gradients(zip(gradients, weights))

  # Weight clipping, applied to every trainable variable the critic exposes.
  for weight in weights:
    weight.assign(tf.clip_by_value(weight, clip_min, clip_max))

  return critic_loss

In [6]:
@tf.function
def train_generator(batch_size):
  """Run one generator update against the critic and the black-box model.

  The generator loss is ``mean(-critic(fake) + lambada * blackbox_loss)``,
  where ``blackbox_loss`` is ``ids_loss`` evaluated between an all-zero
  target vector and the black-box prediction on the generated samples, i.e.
  the generator is pushed towards samples the black box assigns to class 0.

  Reads the notebook globals ``gen``, ``critic``, ``blackbox``, ``ids_loss``,
  ``opt_gen``, ``latent_dim`` and ``lambada``. Only the generator's variables
  are updated.

  Args:
    batch_size: Number of latent vectors (and target labels) to create.

  Returns:
    A ``(blackbox_loss, generator_loss)`` tuple.
  """
  with tf.GradientTape() as gen_tape:
    latent = tf.random.normal([batch_size, latent_dim])
    generated = gen(latent, training=True)
    critic_scores = critic(generated, training=True)
    ids_output = blackbox(generated, training=False)
    # Desired black-box label for every generated sample: class 0.
    wanted_labels = tf.zeros(batch_size)
    blackbox_loss = ids_loss(wanted_labels, ids_output)
    combined = -critic_scores + lambada * blackbox_loss
    generator_loss = tf.reduce_mean(combined)

  gen_weights = gen.trainable_variables
  gen_gradients = gen_tape.gradient(generator_loss, gen_weights)
  opt_gen.apply_gradients(zip(gen_gradients, gen_weights))
  return blackbox_loss, generator_loss

In [7]:
tf.keras.utils.disable_interactive_logging()

# Seed TensorFlow's global RNG so that a fresh-kernel "Run All" starts from
# the same random state (noise sampling, weight initialisation, shuffling).
random_seed = 42
tf.random.set_seed(random_seed)

dataset = load_dataset('dataset/train_content_feature.csv')

# Derive the feature width from the data instead of hard-coding it twice;
# the critic input and the generator output must both match the dataset.
n_features = dataset.shape[1]
latent_dim = 13
critic = Critic(n_features)
gen = Generator(latent_dim, n_features)

# The black box is a Keras model because train_generator calls it directly
# as `blackbox(fake, training=False)` inside the gradient tape.
blackbox = tf.keras.models.load_model('models/DNN.h5')

opt_critic = tf.keras.optimizers.RMSprop(learning_rate=1e-4)
opt_gen = tf.keras.optimizers.RMSprop(learning_rate=1e-4)

# Training hyper-parameters.
n_epochs = 5      # full passes over the dataset
n_batch = 64      # samples per mini-batch
n_critics = 5     # critic updates per generator update
lambada = 0.5     # weight of the black-box loss in the generator loss
clip_min = -0.01  # lower bound for critic weight clipping
clip_max = 0.01   # upper bound for critic weight clipping

In [8]:
# Loss used by train_generator to compare the black-box output with integer
# class labels.
ids_loss = tf.keras.losses.SparseCategoricalCrossentropy()

# Real samples streamed as shuffled mini-batches of n_batch rows.
train_data = (
    tf.data.Dataset.from_tensor_slices(tf.convert_to_tensor(dataset.values))
    .shuffle(buffer_size=1024)
    .batch(n_batch)
)

log_template = 'Step: {}, Epoch: {}, Loss Critic: {}, Loss Gen: {}, Loss Blackbox: {}'

for epoch in range(n_epochs):
  for step, X_real in enumerate(train_data):
    # n_critics critic updates on this batch, then a single generator update.
    for _ in range(n_critics):
      critic_loss = train_critic(X_real)
    blackbox_loss, gen_loss = train_generator(len(X_real))

    # Only report every 100th step.
    if step % 100:
      continue
    print(log_template.format(step, epoch, critic_loss, gen_loss, blackbox_loss))


Step: 0, Epoch: 0, Loss Critic: -4.549065124592744e-09, Loss Gen: 1.574833869934082, Loss Blackbox: 3.149670124053955
Step: 100, Epoch: 0, Loss Critic: -6.499249138869345e-09, Loss Gen: 0.0006462509045377374, Loss Blackbox: 0.0012947538634762168
Step: 200, Epoch: 0, Loss Critic: -5.814968062622938e-09, Loss Gen: 3.3515367249492556e-05, Loss Blackbox: 6.92621833877638e-05
Step: 300, Epoch: 0, Loss Critic: -2.8978774935239926e-09, Loss Gen: 1.509459616499953e-05, Loss Blackbox: 3.233642200939357e-05
Step: 400, Epoch: 0, Loss Critic: 1.3642420526593924e-12, Loss Gen: 7.270203241205309e-06, Loss Blackbox: 1.6996318663586862e-05
Step: 500, Epoch: 0, Loss Critic: 1.1687006917782128e-10, Loss Gen: 5.210602466831915e-06, Loss Blackbox: 1.2803660865756683e-05
Step: 600, Epoch: 0, Loss Critic: -5.093511390441563e-09, Loss Gen: 4.462220204004552e-06, Loss Blackbox: 1.1136593457194977e-05
Step: 700, Epoch: 0, Loss Critic: -5.217657417233568e-09, Loss Gen: 3.4122454053431284e-06, Loss Blackbox: 9.0

In [85]:
def detection_accuracy(num_sample, num_sample_detected):
  """Return the fraction of samples that were detected.

  Args:
    num_sample: Total number of samples evaluated.
    num_sample_detected: How many of those samples were detected.

  Returns:
    ``num_sample_detected / num_sample``.
  """
  detected_fraction = num_sample_detected / num_sample
  return detected_fraction

def attack_success_rate(detection_rate_org, detection_rate_adv):
  """Return the absolute drop in detection rate caused by the attack.

  Args:
    detection_rate_org: Detection rate on the original samples.
    detection_rate_adv: Detection rate on the adversarial samples.

  Returns:
    ``detection_rate_org - detection_rate_adv``.
  """
  rate_drop = detection_rate_org - detection_rate_adv
  return rate_drop

def evade_increase_rate(detection_rate_org, detection_rate_adv):
  """Return the relative drop in detection rate caused by the attack.

  Args:
    detection_rate_org: Detection rate on the original samples.
    detection_rate_adv: Detection rate on the adversarial samples.

  Returns:
    ``1 - detection_rate_adv / detection_rate_org``.
  """
  remaining_share = detection_rate_adv / detection_rate_org
  return 1 - remaining_share


In [98]:
num_sample = len(dataset)

# Predicted class = argmax over the black-box scores for each row.
# The argmax is taken on the raw scores: rounding first would turn every row
# whose best score is <= 0.5 into all zeros, which argmax then resolves to
# class 0, so such rows could never be counted as detected.
# NOTE(review): assumes predict() returns one score per class, shape
# (n_samples, n_classes) -- confirm against the saved model.
org_pred = blackbox.predict(dataset).argmax(axis=1)
# A sample counts as detected when it is assigned class 1, the label that
# load_dataset selected from the CSV.
num_sample_detected_org = int(np.count_nonzero(org_pred == 1))

# Generate as many adversarial samples as there are original ones.
noise = tf.random.normal([num_sample, latent_dim])
adv_sample = gen(noise, training=False)
adv_pred = blackbox.predict(adv_sample).argmax(axis=1)
num_sample_detected_adv = int(np.count_nonzero(adv_pred == 1))


detection_rate_org = detection_accuracy(num_sample, num_sample_detected_org)
detection_rate_adv = detection_accuracy(num_sample, num_sample_detected_adv)

In [99]:
detection_rate_adv

0.0