In [1]:
from adversarial.attacks import random_attack
from adversarial.utils import dataloader
from adversarial.defences import EnsembleModel
from adversarial import models

In [7]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import optimizers, metrics, losses

In [9]:
BATCH_SIZE = 16
EPSILONS = np.arange(0.01, 0.5, step=0.02)
ETA = 1e-6
EPOCHS = 30

In [3]:
train_dloader = dataloader.DataLoader("data", BATCH_SIZE, training=True)
test_dloader = dataloader.DataLoader("data", BATCH_SIZE, training=False)

In [None]:
target_model = models.TargetModel()
target_model.load_custom_weights_for_mobilenet("model/model_20200507_9_1.00_0.0088")

In [None]:
ensemble_model = EnsembleModel([
    models.MobileNetV2(),
    models.MobileNet(),
    models.VGG16(),
    models.VGG19(),
    models.ResNet50(),
    models.ResNet50V2()
])

In [10]:
criterion = losses.SparseCategoricalCrossentropy()

train_losses = metrics.SparseCategoricalCrossentropy()
train_metrics = metrics.SparseCategoricalAccuracy()

test_losses = metrics.SparseCategoricalCrossentropy()
test_metrics = metrics.SparseCategoricalAccuracy()

optimizer = optimizers.Adam(learning_rate=ETA)

In [None]:
def train_step(images, labels, adv_labels):
    adv, _, success = random_attack(ensemble_model, images, adv_labels, EPSILONS)
    
    x_batch, y_batch = dataloader.concat_data(images, labels, adv)
    
    with tf.GradientTape() as tape:
        outputs = target_model(x_batch, training=True)
        loss = criterion(y_batch, outputs)
        
    train_losses.update_state(y_batch, outputs)
    train_metrics.update_state(y_batch, outputs)
        
    grads = tape.gradient(loss, target_model.trainable_variables)
    optimizer.apply_gradients(zip(grads, target_model.trainable_variables))

In [None]:
def test_step(images, labels, adv_labels):
    adv, _, success = random_attack(ensemble_model, images, adv_labels, EPSILONS)
    
    x_batch, y_batch = dataloader.concat_data(images, labels, adv)

    outputs = target_model(x_batch, training=False)

    test_losses.update_state(y_batch, outputs)
    test_metrics.update_state(y_batch, outputs)

In [None]:
def train():

    train_loss_list = []
    test_loss_list = []

    for e in range(EPOCHS):
        for x_batch, y_batch, adv_batch in iter(train_dloader):
            train_step(x_batch, y_batch, adv_batch)

        for x_batch, y_batch, adv_batch in iter(test_dloader):
            test_step(x_batch, y_batch, adv_batch)

        train_loss = train_losses.result()
        train_acc = train_metrics.result()

        test_loss = test_losses.result()
        test_acc = test_losses.result()

        train_loss_list.append(train_loss)
        test_loss_list.append(test_loss)

        train_loss.reset_states()
        train_acc.reset_states()
        test_loss.reset_states()
        test_acc.reset_states()

        print(f"Epochs {e+1}/{EPOCHS}, train loss: {train_loss:.8f}, train acc: {train_acc:.4f}, test loss: {test_loss:.8f}, test acc: {test_acc:.4f}")


