In [1]:
import tensorflow as tf

def linf_pgd_attack(model, loss_fn, images, labels, epsilon, eps_iter, nb_iter, clip_min=0.0, clip_max=1.0):
    """Run an untargeted L-infinity projected-gradient-descent (PGD) attack.

    Starting from ``images`` (no random start), each of the ``nb_iter``
    iterations takes a signed-gradient ascent step of size ``eps_iter`` on
    ``loss_fn(labels, model(x))`` and then pulls the result back into both the
    valid value range and the ``epsilon`` box around ``images``.

    Args:
        model: Callable mapping a batch of inputs to outputs.
        loss_fn: Callable invoked as ``loss_fn(labels, outputs)``.
        images: Clean inputs; the perturbation is measured relative to these.
        labels: Targets passed to ``loss_fn``.
        epsilon: Maximum per-element absolute perturbation.
        eps_iter: Step size of one iteration.
        nb_iter: Number of iterations.
        clip_min: Lower bound applied to every element of the result.
        clip_max: Upper bound applied to every element of the result.

    Returns:
        The perturbed inputs (a ``tf.Variable`` wrapping ``images`` when
        ``nb_iter`` is 0, otherwise a tensor).
    """
    adversarial = tf.Variable(images)

    for _ in range(nb_iter):
        with tf.GradientTape() as tape:
            tape.watch(adversarial)
            loss = loss_fn(labels, model(adversarial))

        # Ascent direction: only the sign of the input gradient is used.
        step_direction = tf.sign(tape.gradient(loss, adversarial))
        stepped = adversarial + eps_iter * step_direction

        # Keep the candidate inside the valid value range [clip_min, clip_max].
        stepped = tf.clip_by_value(stepped, clip_min, clip_max)

        # Project onto the L-infinity ball of radius epsilon around the clean
        # input, then re-apply the value-range clip to the projected point.
        bounded_delta = tf.clip_by_value(stepped - images, -epsilon, epsilon)
        adversarial = tf.clip_by_value(images + bounded_delta, clip_min, clip_max)

    return adversarial


In [None]:

def l2_pgd_attack(model, loss_fn, images, labels, epsilon, eps_iter, nb_iter, clip_min=0.0, clip_max=1.0):
    """Run an untargeted L2 projected-gradient-descent (PGD) attack.

    Starting from ``images`` (no random start), each of the ``nb_iter``
    iterations moves every example by ``eps_iter`` along its own
    L2-normalised input gradient of ``loss_fn(labels, model(x))``, projects the
    perturbation back onto the L2 ball of radius ``epsilon`` around the clean
    example, and clips to ``[clip_min, clip_max]``.

    The first axis of ``images`` is treated as the batch axis; all remaining
    axes (any number of them) are flattened when computing per-example norms.

    Args:
        model: Callable mapping a batch of inputs to outputs.
        loss_fn: Callable invoked as ``loss_fn(labels, outputs)``.
        images: Clean inputs; the perturbation is measured relative to these.
        labels: Targets passed to ``loss_fn``.
        epsilon: Maximum per-example L2 norm of the perturbation.
        eps_iter: L2 length of one step, per example.
        nb_iter: Number of iterations.
        clip_min: Lower bound applied to every element of the result.
        clip_max: Upper bound applied to every element of the result.

    Returns:
        A tensor of perturbed inputs with the same shape as ``images``.
    """
    images = tf.convert_to_tensor(images)
    # Shape that lets one scalar per example broadcast over every remaining
    # axis, whatever the input rank: [batch, 1, 1, ...].
    per_example_shape = [-1] + [1] * (images.shape.rank - 1)

    perturbed_images = tf.identity(images)

    for _ in range(nb_iter):
        with tf.GradientTape() as tape:
            # Plain tensors are not tracked automatically, so watch explicitly.
            tape.watch(perturbed_images)
            outputs = model(perturbed_images)
            loss = loss_fn(labels, outputs)

        gradients = tape.gradient(loss, perturbed_images)

        # Normalise each example's gradient to unit L2 norm so that every
        # example moves exactly eps_iter, independent of batch size or of the
        # gradient magnitudes of other examples in the batch.
        grad_norms = tf.norm(tf.reshape(gradients, [tf.shape(gradients)[0], -1]), axis=1)
        gradients = gradients / tf.reshape(grad_norms + 1e-8, per_example_shape)

        perturbed_images = perturbed_images + eps_iter * gradients

        # Project back onto the L2 ball: shrink only perturbations whose norm
        # exceeds epsilon (factor < 1), leave the others untouched (factor 1).
        delta = perturbed_images - images
        delta_norms = tf.norm(tf.reshape(delta, [tf.shape(delta)[0], -1]), axis=1)
        factor = tf.minimum(1.0, epsilon / (delta_norms + 1e-10))
        perturbed_images = tf.clip_by_value(
            images + tf.reshape(factor, per_example_shape) * delta, clip_min, clip_max)

    return perturbed_images


In [None]:
def _per_example_sign_match(outputs, targets):
    """Return one boolean per example: True where all output signs equal the target signs."""
    # assumes outputs and targets have the same shape — TODO confirm against the loader/model
    agree = tf.math.equal(tf.math.sign(outputs), tf.math.sign(targets))
    return tf.reduce_all(tf.reshape(agree, [tf.shape(agree)[0], -1]), axis=1)


def test_pgd(model, test_loader, epsilon, attack_type):
    """Measure accuracy on ``test_loader`` under a PGD attack of radius ``epsilon``.

    An example counts as correct only if the sign of the model output matches
    the sign of its label both on the clean input and on the adversarial input
    produced by the attack. Examples that are already misclassified on clean
    data count as incorrect.

    The attack runs for 40 iterations with step size ``epsilon / 20`` and a
    sum-reduced mean-squared-error loss. The attack functions in this file
    start from the clean input; they have no random-start option.

    Args:
        model: Callable Keras-style model, invoked with ``training=False``.
        test_loader: Iterable of ``(data, target)`` batches; ``data`` is
            divided by 255 before use.
        epsilon: Attack radius passed to the attack function.
        attack_type: ``'linf'`` selects ``linf_pgd_attack``; any other value
            selects ``l2_pgd_attack``.

    Returns:
        ``(final_acc, adv_examples)`` where ``final_acc`` is the fraction of
        examples still classified correctly (0.0 if the loader is empty) and
        ``adv_examples`` is a list that is currently always empty.
    """
    attack_fn = linf_pgd_attack if attack_type == 'linf' else l2_pgd_attack
    loss_fn = MeanSquaredError(reduction=tf.keras.losses.Reduction.SUM)

    correct = 0
    incorrect = 0
    adv_examples = []

    for data, target in test_loader:
        data = data / 255.0
        target = tf.cast(target, tf.float32)

        # Clean predictions, evaluated per example rather than per batch.
        clean_ok = _per_example_sign_match(model(data, training=False), target)
        batch_count = int(clean_ok.shape[0])

        # Nothing to attack if the whole batch is already misclassified.
        if not bool(tf.reduce_any(clean_ok)):
            incorrect += batch_count
            continue

        perturbed_data = attack_fn(
            model, loss_fn, data, target,
            epsilon=epsilon, eps_iter=epsilon / 20, nb_iter=40,
            clip_min=0.0, clip_max=1.0)
        adv_ok = _per_example_sign_match(model(perturbed_data, training=False), target)

        # Robustly correct = right before the attack AND still right after it.
        robust_count = int(tf.reduce_sum(tf.cast(tf.logical_and(clean_ok, adv_ok), tf.int32)))
        correct += robust_count
        incorrect += batch_count - robust_count

    total = correct + incorrect
    final_acc = correct / float(total) if total else 0.0
    logger.info("Attack Type: {}, Epsilon: {}\tTest Accuracy = {} / {} = {:.2f}".format(
        attack_type, epsilon, correct, total, 100. * final_acc))

    return final_acc, adv_examples



In [None]:
import os
import logging
import argparse
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.losses import MeanSquaredError
from utils import get_loaders_mnist
from models.basic_models import CNN, NeuralNet

logger = logging.getLogger(__name__)

def parse_args(argv=None):
    """Parse the experiment's command-line options.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``. Pass an explicit list when running
            inside a notebook kernel, where ``sys.argv`` holds the kernel's own
            flags and would be rejected.

    Returns:
        An ``argparse.Namespace`` with one attribute per option (dashes in the
        option names become underscores).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-type', choices=['cnn', 'fc'],
                        help="'cnn' builds the CNN model; anything else builds the fully connected net")
    parser.add_argument('--classes', type=str,
                        help='comma-separated integer class labels, e.g. "0,1"')
    parser.add_argument('--epochs', default=50, type=int,
                        help='number of training epochs')
    parser.add_argument('--batch-size', default=100, type=int,
                        help='batch size passed to the data loaders')
    parser.add_argument('--hidden-size', default=5000, type=int,
                        help='hidden size passed to the fully connected net')
    parser.add_argument('--hidden-layers', default=1, type=int,
                        help='number of hidden layers passed to the fully connected net')
    parser.add_argument('--kernel-size', default=7, type=int,
                        help='kernel size passed to the CNN')
    parser.add_argument('--hidden-channels', default=1024, type=int,
                        help='hidden channel count passed to the CNN')
    parser.add_argument('--learning-rate', default=0.001, type=float,
                        help='learning rate of the Adam optimizer')
    parser.add_argument('--batches-use', default='all',
                        help="number of batches to use per epoch, or 'all'")
    parser.add_argument('--out-dir', type=str,
                        help='directory for the log file and the saved model')
    parser.add_argument('--epsilons', type=str,
                        help='comma-separated attack radii, e.g. "0.1,0.2"')
    parser.add_argument('--attack-type', choices=['l2', 'linf'],
                        help='PGD attack to evaluate; omit to skip the attack evaluation')
    parser.add_argument('--eval-only', action='store_true',
                        help='skip training and load the saved model from --out-dir')
    return parser.parse_args(argv)

def train(model, train_loader, criterion, optimizer, num_epochs, batches_use):
    """Train ``model`` with a manual gradient-tape loop.

    For each epoch, at most ``batches_use`` batches are consumed from
    ``train_loader``. Inputs are divided by 255 and labels cast to float32.
    A prediction counts as correct when the sign of the output equals the sign
    of the label. Progress is printed every 25 batches and whenever the batch
    index is a multiple of ``batches_use``.

    Args:
        model: Keras-style model, called with ``training=True``.
        train_loader: Sized iterable of ``(images, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(labels, outputs)``.
        optimizer: Optimizer exposing ``apply_gradients``.
        num_epochs: Number of passes over the loader.
        batches_use: Number of batches to use per epoch.

    Returns:
        The trained ``model`` (the same object that was passed in).
    """
    total_step = len(train_loader)
    # Bound up front so the final log line also works when num_epochs is 0
    # and the epoch loop body never runs.
    total = 0
    for epoch in range(num_epochs):
        # Running accuracy counters restart every epoch.
        correct = 0
        total = 0
        for i, (images, labels) in enumerate(train_loader):
            images = images / 255.0
            labels = tf.cast(labels, tf.float32)

            # Forward pass, recorded so gradients can be taken afterwards.
            with tf.GradientTape() as tape:
                outputs = model(images, training=True)
                loss = criterion(labels, outputs)

            total += labels.shape[0]
            correct += tf.reduce_sum(tf.cast(tf.math.equal(tf.math.sign(outputs), tf.math.sign(labels)), tf.int32))

            # Backpropagation and optimization
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))

            if (i + 1) % 25 == 0 or (i + 1) % batches_use == 0:
                accuracy = 100 * correct / total
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Acc: {:.2f}'.format(
                    epoch + 1, num_epochs, i + 1, total_step, loss.numpy(), accuracy))
            # Stop the epoch early once the requested number of batches is used.
            if (i + 1) == batches_use:
                break
    # Reports the example count of the final epoch only (counters reset per epoch).
    logger.info("Total {} images used for training.".format(total))
    return model

def test(model, test_loader):
    """Log the clean accuracy of ``model`` over ``test_loader``.

    Inputs are divided by 255 and labels cast to float32. A prediction counts
    as correct when the sign of the model output equals the sign of the label.
    The result is written to the module logger; nothing is returned.

    Args:
        model: Keras-style model, called with ``training=False``.
        test_loader: Iterable of ``(images, labels)`` batches.
    """
    n_correct = 0
    n_seen = 0
    for batch_images, batch_labels in test_loader:
        scaled = batch_images / 255.0
        targets = tf.cast(batch_labels, tf.float32)
        predictions = model(scaled, training=False)

        # Element-wise sign agreement between predictions and targets.
        sign_agrees = tf.math.equal(tf.math.sign(predictions), tf.math.sign(targets))
        n_correct += tf.reduce_sum(tf.cast(sign_agrees, tf.int32))
        n_seen += targets.shape[0]

    accuracy = 100 * n_correct / n_seen
    logger.info('Accuracy of the network on the {} test images: {} %'.format(n_seen, accuracy))

def test_ddn(model, test_loader):
    raise NotImplementedError



In [None]:

def main(args):
    """Train (or load) a model, evaluate it, and optionally attack it with PGD.

    Steps: set up file logging under ``args.out_dir``; build the data loaders
    and the model; either train and save the model or, with ``args.eval_only``,
    load a previously saved one; log clean test accuracy; and, if
    ``args.attack_type`` is set, run ``test_pgd`` once per value in
    ``args.epsilons``.

    Args:
        args: Namespace as produced by ``parse_args``.

    Raises:
        ValueError: If ``args.attack_type`` is set but ``args.epsilons`` is
            missing, or if an epsilon is not a valid float.
    """
    # Parse the attack radii first so a missing or malformed --epsilons fails
    # immediately instead of after the whole training run.
    if args.attack_type:
        if not args.epsilons:
            raise ValueError("--epsilons is required when --attack-type is set")
        epsilons = [float(eps) for eps in args.epsilons.split(',')]
    else:
        epsilons = []

    # Output directory and file logging
    os.makedirs(args.out_dir, exist_ok=True)
    logfile = os.path.join(args.out_dir, 'output.log')
    logging.basicConfig(
        format='[%(asctime)s] - %(message)s',
        datefmt='%Y/%m/%d %H:%M:%S',
        level=logging.INFO,
        filename=logfile)
    if not args.eval_only:
        logger.info(args)

    # Define Hyper-parameters
    input_size = 784
    hidden_size = args.hidden_size
    hidden_layers = args.hidden_layers
    classes = [int(x) for x in args.classes.split(',')]
    hidden_channels = args.hidden_channels
    kernel_size = args.kernel_size
    # NOTE(review): one output fewer than classes — presumably a single signed
    # output for the two-class case; confirm against the model definitions.
    num_classes = len(classes) - 1
    num_epochs = args.epochs
    batch_size = args.batch_size
    learning_rate = args.learning_rate

    train_loader, test_loader = get_loaders_mnist(classes, batch_size)
    batches_use = int(args.batches_use) if args.batches_use != 'all' else len(train_loader)

    if args.model_type == 'cnn':
        model = CNN(hidden_channels=hidden_channels, kernel_size=kernel_size, num_classes=num_classes)
    else:
        model = NeuralNet(input_size, hidden_layers, hidden_size, num_classes)

    criterion = MeanSquaredError()
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    # The same path is used for saving after training and for loading in
    # eval-only mode.
    # NOTE(review): some Keras versions may require a specific file suffix
    # when saving — confirm against the installed version.
    model_path = os.path.join(args.out_dir, 'model.pt')
    if not args.eval_only:
        model = train(model, train_loader, criterion, optimizer, num_epochs, batches_use)
        tf.keras.models.save_model(model, model_path)
        logger.info("Saved trained model to {}".format(model_path))
    else:
        # Replaces the freshly constructed model with the saved one.
        model = tf.keras.models.load_model(model_path)
        logger.info("Loaded pre-trained model from {}".format(model_path))

    # Evaluate the model on test set
    test(model, test_loader)

    # Evaluate the model on test set with PGD attack, once per radius
    # (the list is empty when no attack type was requested).
    for epsilon in epsilons:
        test_pgd(model, test_loader, epsilon, args.attack_type)


if __name__ == '__main__':
    # Script entry point: read the command-line options and run the experiment.
    main(parse_args())
