# MNIST Experiments:
This file is a "proof of concept" to my system. We show that the experiments yields the same results as in the paper.

The experiments are the same as in GTSRB, In short are:

1. Apply PGD and FGSM attacks on a convolutional neural network.

2. Comparing robustness of adversarial training using FGSM versus using PGD.

3. Applying experiments 1+2 on a spatial convolutional neural network.

4. Testing the relation of capacity and adversarial robustness.

Almost all of the code is the same. I didn't make a general experiment for readability of GTSRB which is
more important component of the project.


In [None]:
# import libraries:

import time
import torch
import attacks
import configs
from torchvision.datasets import MNIST
import helper
import models
import trainer
import os
import shutil
import logger
import torchvision


Initializations
    - initialize a logger
    - set random seed
    - set device
    - export experiment configs
    - validate paths
    - load datasets
    - create hyperparameters generators
    
*Note - jupyter doesn't clean fds as normal python execution does. If there is the error of "logger is used by another process" then terminate and start again the jupyter kernel from terminal (i.e. conrol+c) 

In [None]:
experiment_configs = configs.MNIST_experiments_configs
experiment_hps_sets = configs.MNIST_experiments_hps
experiment_results_folder = os.path.join(configs.results_folder, "MNIST")
experiment_checkpoints_folder = os.path.join(configs.checkpoints_folder, "MNIST")
plots_folder = os.path.join(experiment_results_folder, "plots")
logger_path = os.path.join(experiment_results_folder, "log.txt")

# paths existence validation and initialization
if not os.path.exists(configs.results_folder):
    os.mkdir(configs.results_folder)
if not os.path.exists(experiment_results_folder):
    os.mkdir(experiment_results_folder)
if not os.path.exists(configs.checkpoints_folder):
    os.mkdir(experiment_results_folder)
if not os.path.exists(experiment_checkpoints_folder):
    os.mkdir(experiment_checkpoints_folder)
if os.path.exists(plots_folder):
    shutil.rmtree(plots_folder)
    time.sleep(.0001)
os.mkdir(plots_folder)
if os.path.exists(logger_path):
    os.remove(logger_path)

# set logger
logger.init_log(logger_path)
logger.log_print("checkpoints folder: {}".format(experiment_checkpoints_folder))
logger.log_print("save checkpoints: {}".format(configs.save_checkpoints))
logger.log_print("load checkpoints: {}".format(configs.load_checkpoints))
logger.log_print("results folder: {}".format(experiment_results_folder))
logger.log_print("show results:  {}".format(configs.show_attacks_plots))
logger.log_print("save results:  {}".format(configs.save_attacks_plots))

# set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.log_print("execution device: {}".format(device))

# seed
if configs.seed is not None:
    torch.manual_seed(configs.seed)
    logger.log_print("seed: {}".format(configs.seed))

# get datasets
path_to_save_data = os.path.join(".", "datasets", "mnist_data")
_training_dataset = MNIST(path_to_save_data, train=True, download=True,
                          transform=torchvision.transforms.Compose([torchvision.transforms.ToTensor()]))
_testing_dataset = MNIST(path_to_save_data, train=False, download=True,
                         transform=torchvision.transforms.Compose([torchvision.transforms.ToTensor()]))
# create hyperparameters generators
net_training_hps_gen = helper.GridSearch(experiment_hps_sets["nets_training"])
fgsm_attack_hps_gen = helper.GridSearch(experiment_hps_sets["FGSM"])
pgd_attack_hps_gen = helper.GridSearch(experiment_hps_sets["PGD"])

# loss and general training componenets:
_loss_fn = experiment_configs["loss_function"]
training_stop_criteria = experiment_configs["training_stopping_criteria"]
adv_training_stop_criteria = experiment_configs["adversarial_training_stopping_criteria"]
epochs = trainer.Epochs(training_stop_criteria)  # epochs obj for not adversarial training
adv_epochs = trainer.Epochs(adv_training_stop_criteria)  # epochs obj for adversarial training

    


## Experiment 1: PGD and FGSM attacks in practice.

In this experiment we will attack a network using PGD and FGSM attacks.
The experiment illustrates that PGD and FGSM attacks works on MNIST dataset.
We train and attack a Spatial Transformer Network which is invariant to geometrical
transformations (rotations and scaling).


In [None]:

def experiment_1_func(net, _loss_fn, _training_dataset, _testing_dataset, epochs, net_name="", train_attack=None,
                      load_checkpoint=False, save_checkpoint=False, show_plots=False, save_plots=False):
    """
    This experiment applies training or adversarial training on the given network. Then it prints the resistant
    measurements on both PGD and FGSM attacks.
    :param net: the given network. we initialize net parameters.
    :param _loss_fn: loss function.
    :param _training_dataset: the dataset to train on.
    :param _testing_dataset: a separate dataset to measure resistance and accuracy on.
    :param epochs: Epochs object that manages the training procedure. see Epochs class in trainer.py for more details.
    :param net_name: the network name is used in plotting titles and checkpoints files names.
    :param train_attack: in case we want to apply an adversarial training instead of normal training.
    :param load_checkpoint: use pre-trained model. To use that verify the existence of one in checkpoints folder.
    :param save_checkpoint: save the trained model.
    :return: the resistance results + found hps in the hyperparameter searches + trained network.
    """
    if load_checkpoint:
        checkpoint_path = os.path.join(experiment_checkpoints_folder, "{}.pt".format(net_name))
        logger.log_print("load network from {}".format(checkpoint_path))
        checkpoint = torch.load(checkpoint_path, map_location=device)
        net.load_state_dict(checkpoint["trained_net"])
        net_hp = checkpoint["net_hp"]
        fgsm_hp = checkpoint["fgsm_hp"]
        pgd_hp = checkpoint["pgd_hp"]
        resistance_results = checkpoint["resistance_results"]

    else:
        # apply hyperparameters-search to get a trained network
        net_state_dict, net_hp = helper.full_train_of_nn_with_hps(net, _loss_fn, _training_dataset,
                                                                  net_training_hps_gen, epochs, device=device,
                                                                  train_attack=train_attack)
        net.load_state_dict(net_state_dict)
        net.eval()  # from now on we only evaluate net.

        logger.log_print("training selected hyperparams: {}".format(str(net_hp)))

        # attack selected net using FGSM:
        fgsm_hp, fgsm_score = helper.full_attack_of_trained_nn_with_hps(net, _loss_fn, _training_dataset,
                                                                        fgsm_attack_hps_gen, net_hp, attacks.FGSM,
                                                                        device=device, plot_results=False,
                                                                        save_figs=False, figs_path=plots_folder)
        logger.log_print("FGSM attack selected hyperparams: {}".format(str(fgsm_hp)))

        # attack selected net using PGD:
        pgd_hp, pgd_score = helper.full_attack_of_trained_nn_with_hps(net, _loss_fn, _training_dataset,
                                                                      pgd_attack_hps_gen, net_hp, attacks.PGD,
                                                                      device=device, plot_results=False,
                                                                      save_figs=False,
                                                                      figs_path=plots_folder)
        logger.log_print("PGD attack selected hyperparams: {}".format(str(pgd_hp)))

        # measure attacks on test (holdout)
        resistance_results = helper.measure_resistance_on_test(net, _loss_fn, _testing_dataset,
                                                               to_attacks=[(attacks.FGSM, fgsm_hp),
                                                                           (attacks.PGD, pgd_hp)],
                                                               device=device,
                                                               plot_results=show_plots,
                                                               save_figs=save_plots,
                                                               figs_path=plots_folder,
                                                               plots_title=net_name)

    # unpack resistance_results
    test_acc = resistance_results["test_acc"]  # the accuracy without applying any attack
    fgsm_res = resistance_results["%fgsm"]
    pgd_res = resistance_results["%pgd"]

    # print scores:
    logger.log_print("TEST SCORES of {}:".format(net_name))
    logger.log_print("accuracy on test:            {}".format(test_acc))
    logger.log_print("%FGSM successful attacks:    {}".format(fgsm_res))
    logger.log_print("%PGD successful attacks:     {}".format(pgd_res))

    # save checkpoint
    res_dict = {
        "trained_net": net,
        "net_hp": net_hp,
        "fgsm_hp": fgsm_hp,
        "pgd_hp": pgd_hp,
        "resistance_results": resistance_results
    }

    if save_checkpoint and not load_checkpoint:
        to_save_res_dict = res_dict
        to_save_res_dict["trained_net"] = net.state_dict()
        checkpoint_path = os.path.join(experiment_checkpoints_folder, "{}.pt".format(net_name))
        logger.log_print("save network to {}".format(checkpoint_path))
        torch.save(to_save_res_dict, checkpoint_path)

    return res_dict




def run_experiment1(net_arch, _loss_fn, _training_dataset, _testing_dataset, epochs):
    net_name = net_arch.name

    logger.new_section() # some new lines
    logger.log_print("Experiment 1 on {}".format(net_name))
    # print network summary to log
    original_net = net_arch().to(device)
    logger.log_print("Network architecture")
    logger.log_print(str(original_net))
    # apply experimet 1
    exp1_res_dict = experiment_1_func(original_net, _loss_fn, _training_dataset, _testing_dataset, epochs, 
                                      net_name=net_name,
                                      save_checkpoint=configs.save_checkpoints,
                                      load_checkpoint=configs.load_checkpoints,
                                      show_plots=configs.show_attacks_plots,
                                      save_plots=configs.save_attacks_plots)
    return exp1_res_dict


net_arch = models.MNISTNet
exp1_res_dict = run_experiment1(net_arch, _loss_fn, _training_dataset, _testing_dataset, epochs)


## Experiment 2: Comparing defensing using FGSM versus using PGD

In this experiment we will use adversarial training (the paper procedure) in order to make the network from experiment 1 resistant to FGSM and PGD attacks separately. Denote the network that trained in adversarial training using FGSM as Net_1 and the network trained with PGD as Net_2. Then we test Net_1,Net_2 robustness as we did in experiment 1 and show the following:

1. Net_1 is resistant to FGSM attack but not to PGD attack. What means resistant against FGSM doesn't yields resistance against PGD.

2. Net_2 is resistance to both FGSM and PGD attacks. This is a motivation to experiment 3 that shows PGD is a universal attack. (i.e. that resistance against PGD yields resistance to any other first order attack).


In [None]:
def experiment_2_func(exp1_res_dict, net_arch, _loss_fn, _training_dataset, _testing_dataset, adversarial_epochs,
                      net_name="", load_checkpoint=False, save_checkpoint=False, show_plots=False, save_plots=False):
    # fgsm_hp = exp1_res_dict["fgsm_hp"]
    # pgd_hp = exp1_res_dict["pgd_hp"]
    fgsm_hp = {"epsilon": 0.08}
    pgd_hp = {"epsilon": 0.15}

    # adversarial_epochs.restart()
    # fgsm_robust_net = net_arch().to(device)
    # fgsm_attack = attacks.FGSM(fgsm_robust_net, _loss_fn, fgsm_hp)
    # experiment_1_func(fgsm_robust_net, _loss_fn, _training_dataset, _testing_dataset, adversarial_epochs,
    #                   net_name="{} with FGSM adversarial training".format(net_name), train_attack=fgsm_attack,
    #                   load_checkpoint=load_checkpoint, save_checkpoint=save_checkpoint, show_plots=show_plots,
    #                   save_plots=save_plots)

    adversarial_epochs.restart()
    pgd_robust_net = net_arch().to(device)
    pgd_attack = attacks.PGD(pgd_robust_net, _loss_fn, pgd_hp)
    experiment_1_func(pgd_robust_net, _loss_fn, _training_dataset, _testing_dataset, adversarial_epochs,
                      net_name="{} with PGD adversarial training".format(net_name), train_attack=pgd_attack,
                      load_checkpoint=load_checkpoint, save_checkpoint=save_checkpoint, show_plots=show_plots,
                      save_plots=save_plots)
    return pgd_robust_net

def run_experiment2(exp1_res_dict, net_arch, _loss_fn, _training_dataset, _testing_dataset, adv_epochs):
    net_name = net_arch.name

    logger.new_section() # some new lines
    logger.log_print("Experiment 2 on {}".format(net_name))
    experiment_2_func(exp1_res_dict, net_arch, _loss_fn, _training_dataset, _testing_dataset, 
                  adv_epochs, net_name=net_name,
                  save_checkpoint=configs.save_checkpoints,
                  load_checkpoint=configs.load_checkpoints,
                  show_plots=configs.show_attacks_plots,
                  save_plots=configs.save_attacks_plots)

pgd_robust_net = run_experiment2(exp1_res_dict, net_arch, _loss_fn, _training_dataset, _testing_dataset, adv_epochs)

## Experiment 3:
Applying 1+2 on STN instead CNN.


In [None]:
net_arch = models.STN_MNISTNet
params = [net_arch, _loss_fn, _training_dataset, _testing_dataset]
run_experiment2(run_experiment1(*params, epochs), *params, adv_epochs)

## Experiment 4: Capacity and Adversarial Robustness
In this experiment we will examine the following statements:

1. Capacity alone helps: High capacity models are more robust to adversarial attacks then low capacity models.
2. Weak models may fail to learn non-trivial classifiers: We show that we can't build (using the paper method) a robust model
   when the model is with low capacity. Specifically we will show that after training with the paper method to build a robust
   model we get an extremely underfitted model.
3. The value of the saddle point problem decreases as we increase the capacity
4. More capacity and stronger adversaries decrease transferability: we take transferred adversarial inputs and show that their
   gradients correlation with the source becomes less significant. More details on that experiment can be found in the project file.



Technical Details:
To create the increased capacity networks we use create_conv_nn from models.py. A description on the parameters is in
ConvNN class. In short, channels_lst specifies the number of channels at each layer and extras_blocks_components
specifies the components on each block of the network (e.g. dropout, maxpool). #FC_Layers is the number of layers that
follows the convolutional layers. There are limitations on some of the parameters - 2 <= len(channels_lst) <= 5
and #FC_Layers > 0. CNN_out_channels for applying 1x1 conv layer with CNN_out_channels output channels - None for
ignoring this feature. in_wh is width and height of the pictures. out_size is the number of classes. For more details
see ConvNN class description.

In [None]:
inc_capacity_nets = []
base_net_params = {
    "extras_blocks_components": [],  # ["dropout"],
    # "p_dropout": 0.1,
    "activation": torch.nn.LeakyReLU,
    "out_size": 10,
    "in_wh": 28,
    "CNN_out_channels": None  # apply 1x1 conv layer to achieve that - to control mem. None to not use.
}
for i in range(1, 9):
    base_net_params["channels_lst"] = [3, 10 * i, 20 * i]
    base_net_params["#FC_Layers"] = 1 + i // 2
    base_net_params["CNN_out_channels"] = i * 5
    cap_net = models.create_conv_nn(base_net_params)
    inc_capacity_nets.append(cap_net)
for i, net in enumerate(inc_capacity_nets):
    net = net.to(device)
    epochs.restart()
    experiment_1_func(net, _loss_fn, _training_dataset, _testing_dataset, epochs,
                        net_name="capacity_{}".format(i))

# visualize and stufff........

## Experiment 5: Universality of PGD attack
In this experiment we will attack Net_2 from experiment 2 using a various of adversarial attacks with different parameters. We want to show that Net_2 is resistance to all of them. This is an evidence that we can practically use a specific PGD (say with 40 steps and specified epsilon) to defend through stronger attacks.

Net_2 is trained with adversarial training with PGD that chosen with #steps=40. We test how good Net_2 defends against increasing #steps and \epsilon (allowing more options). We examine only CNN networks (performed better than STN).

In [None]:
attacks_lst = [
    (attacks.FGSM, {"epsilon": 0.001}),
    (attacks.FGSM, {"epsilon": 0.005}),
    (attacks.PGD, {"epsilon": 0.4, "steps": 60, "alpha": 0.001}),
    (attacks.PGD, {"epsilon": 0.2, "steps": 20, "alpha": 0.005}),
    (attacks.PGD, {"epsilon": 0.3, "steps": 40, "alpha": 0.005}),
    (attacks.PGD, {"epsilon": 0.2, "steps": 40, "alpha": 0.01}),
    (attacks.MomentumFGSM, {"epsilon": 0.2, "steps": 40, "alpha": 0.01, "momentum":0.9}),
    (attacks.MomentumFGSM, {"epsilon": 0.4, "steps": 30, "alpha": 0.001, "momentum":0.95}),
]
pgd_resistance_results = helper.measure_resistance_on_test(pgd_robust_net, _loss_fn, _testing_dataset, attacks_lst,
                                                           plots_title="robust net built using PGD")
