## Task 4: Generate adversarial examples

In [1]:
import pandas as pd
import torch
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.model_selection import train_test_split
import requests
from io import StringIO

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X[categorical_cols + numerical_cols] = scaler.fit_transform(X[categorical_cols + numerical_cols])


In [3]:
import wandb
import os
# Set the notebook name in the environment before logging in to Weights & Biases.
# NOTE(review): the value names a hyperparameter-search notebook while this notebook's
# title is about adversarial examples — confirm the file name is still the intended one.
os.environ["WANDB_NOTEBOOK_NAME"] = "model_hyperparameter_search.ipynb"
# Authenticate with W&B; the captured output below shows an already-logged-in session.
wandb.login()

from torch import optim, nn
from tqdm import tqdm

[34m[1mwandb[0m: Currently logged in as: [33mzhipeng-he[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [5]:
from models import run_pytorch
from data import get_datasets

from models.pytorch.mlp import MLP
from models.pytorch.tab_transformer import TabTransformer
from models.pytorch.ft_transformer import FTTransformer
from models.jax.logistic_regression import LogisticRegression

In [6]:
import foolbox
import numpy as np
from scipy.stats import chi2

In [None]:
def metrics(adv_numpy, sample_numpy, train_mean, train_cov_matrix, threshold, std_numpy):
    """Compute perturbation metrics between ONE adversarial example and its original sample.

    Args:
        adv_numpy: adversarial feature vector, shape ``(d,)`` or ``(1, d)``.
        sample_numpy: original feature vector, same shape as ``adv_numpy``.
        train_mean: per-feature mean of the reference (training) data, shape ``(d,)``.
        train_cov_matrix: covariance matrix of the reference data, shape ``(d, d)``.
            It is inverted with ``np.linalg.inv``, so it must be non-singular.
        threshold: Mahalanobis distance above which the adversarial example is
            flagged as an outlier.
        std_numpy: per-feature standard deviation of the reference data, shape ``(d,)``.

    Returns:
        dict with the keys ``"L0 Distance"``, ``"L1 Distance"``, ``"L2 Distance"``,
        ``"Linf Distance"``, ``"Mahalanobis Distance"``, ``"Is Outlier"`` and
        ``"Sensitivity"``.

    Raises:
        numpy.linalg.LinAlgError: if ``train_cov_matrix`` is singular.
    """

    def _calculate_mahalanobis_distance(x, mean, cov_matrix):
        # sqrt((x - mean)^T  Sigma^-1  (x - mean)) for a single 1-D vector x.
        diff = x - mean
        inv_cov_matrix = np.linalg.inv(cov_matrix)
        return float(np.sqrt(diff @ inv_cov_matrix @ diff))

    eps = 1e-8

    # Work on flat vectors. With a 2-D (1, d) input, np.linalg.norm switches to
    # *matrix* norms: ord=1 becomes the max column sum (i.e. Linf for a single row)
    # and ord=inf becomes the max row sum (i.e. L1), which silently swapped the
    # reported L1 and Linf distances.
    adv_vector = np.asarray(adv_numpy).reshape(-1)
    sample_vector = np.asarray(sample_numpy).reshape(-1)
    perturbation = adv_vector - sample_vector

    # sparsity: number of features that were changed
    l0_distance = np.count_nonzero(perturbation)

    # proximity: vector norms of the perturbation
    l1_distance = np.linalg.norm(perturbation, ord=1)
    l2_distance = np.linalg.norm(perturbation, ord=2)
    linf_distance = np.linalg.norm(perturbation, ord=np.inf)

    # deviation: distance of the adversarial example from the reference distribution
    md = _calculate_mahalanobis_distance(adv_vector, train_mean, train_cov_matrix)
    # Check if the Mahalanobis distance exceeds the threshold
    is_outlier = bool(md > threshold)

    # sensitivity: average L1 change per modified feature, scaled by each feature's
    # standard deviation. With no modified feature the ratio would be 0/0 (NaN),
    # so report 0.0 instead.
    if l0_distance == 0:
        sens = 0.0
    else:
        sens = (l1_distance / ((std_numpy + eps) * l0_distance)).mean()

    return {
        "L0 Distance": l0_distance,
        "L1 Distance": l1_distance,
        "L2 Distance": l2_distance,
        "Linf Distance": linf_distance,
        "Mahalanobis Distance": md,
        "Is Outlier": is_outlier,
        "Sensitivity": sens,
    }


In [None]:
# Registry of Foolbox attack instances keyed by a short display name.
# The prefix of each key (L2 / Linf) states the norm the attack operates under.
attack_list = {
    "L2CarliniWagner": foolbox.attacks.L2CarliniWagnerAttack(),
    "L2DeepFool": foolbox.attacks.L2DeepFoolAttack(),
    "LinfFGSM": foolbox.attacks.LinfFastGradientAttack(),
    # Previously bound to L2CarliniWagnerAttack (copy-paste slip), so "LinfPGD"
    # results were actually a second Carlini-Wagner run.
    "LinfPGD": foolbox.attacks.LinfProjectedGradientDescentAttack(),
    "LinfBIM": foolbox.attacks.LinfBasicIterativeAttack(),
}

In [None]:


def attack(model, X_test_tensor, y_test_tensor, X_train_tensor, attack_fn=None, epsilons=None):
    """Attack every test sample individually and aggregate perturbation metrics per epsilon.

    Args:
        model: trained PyTorch classifier; it is switched to eval mode here.
        X_test_tensor: test features; one row is attacked at a time.
        y_test_tensor: test labels aligned with ``X_test_tensor``.
        X_train_tensor: training features, used to derive the mean, covariance and
            standard deviation that the deviation/sensitivity metrics refer to.
        attack_fn: Foolbox attack instance to run. Defaults to
            ``foolbox.attacks.LinfFastGradientAttack()``.
        epsilons: iterable of perturbation budgets. Defaults to
            ``np.arange(0.01, 0.10, 0.03)``.

    Returns:
        Tuple ``(attack_success_rates, average_distances, outlier_rates)``.
        ``attack_success_rates`` and ``outlier_rates`` are lists with one entry per
        epsilon; ``average_distances`` maps each metric name to such a list.
        Averages and the outlier rate are taken over successful attacks only and
        are ``0.0`` when no attack succeeded.
    """
    # Switch to eval mode BEFORE wrapping/attacking: previously eval() was only
    # called inside the loop, after the first attack had already run, so that
    # first attack could see training-mode layers (dropout, batch-norm).
    model.eval()

    # Create a Foolbox model wrapper for the PyTorch model
    fmodel = foolbox.models.PyTorchModel(model, bounds=(0, 1))

    # Use a distinct local name so the attack object does not shadow this function.
    if attack_fn is None:
        attack_fn = foolbox.attacks.LinfFastGradientAttack()
    if epsilons is None:
        epsilons = np.arange(0.01, 0.10, 0.03)

    metric_names = [
        "L0 Distance", "L1 Distance", "L2 Distance",
        "Linf Distance", "Mahalanobis Distance", "Sensitivity",
    ]
    attack_success_rates = []
    average_distances = {name: [] for name in metric_names}
    outlier_rates = []

    # Reference statistics of the training distribution.
    X_train_numpy = X_train_tensor.cpu().numpy()
    train_mean = np.mean(X_train_numpy, axis=0)
    train_cov_matrix = np.cov(X_train_numpy.T)
    std = np.std(X_train_numpy, axis=0)

    # Calculate the critical value for the Mahalanobis distance using chi-squared distribution
    alpha = 0.05
    degrees_of_freedom = len(train_mean)
    # Calculate the Chi-Square critical value at the given alpha and df
    chi_square_critical_value = chi2.ppf(1 - alpha, df=degrees_of_freedom)
    threshold = np.sqrt(chi_square_critical_value)
    print(f"Threshold: {threshold}")

    total_samples = len(X_test_tensor)

    for epsilon in epsilons:
        # Running totals over the successful attacks for this epsilon.
        successful_attacks = 0
        total_metrics = {name: 0.0 for name in metric_names}
        total_metrics["Is Outlier"] = 0

        for sample_idx in tqdm(range(total_samples)):
            sample = X_test_tensor[sample_idx].unsqueeze(0)
            label = y_test_tensor[sample_idx].unsqueeze(0)

            _, advs, success = attack_fn(fmodel, sample, label, epsilons=[epsilon])

            # Only successful attacks contribute to the statistics, so skip the
            # metric computation entirely for failed ones.
            if not success:
                continue

            successful_attacks += 1
            metrics_dict = metrics(
                advs[0].cpu().numpy(), sample.cpu().numpy(),
                train_mean, train_cov_matrix, threshold, std,
            )
            for key, value in metrics_dict.items():
                total_metrics[key] += value

        if successful_attacks > 0:
            success_rate = successful_attacks / total_samples
            outlier_rate = total_metrics["Is Outlier"] / successful_attacks
            for name in metric_names:
                average_distances[name].append(total_metrics[name] / successful_attacks)
        else:
            success_rate = 0.0
            outlier_rate = 0.0
            for name in metric_names:
                average_distances[name].append(0.0)

        attack_success_rates.append(success_rate)
        outlier_rates.append(outlier_rate)

        print(f"Epsilon = {epsilon}")
        print(f"Success Rate: {success_rate * 100}%")
        for key, value in average_distances.items():
            print(f"Average {key} for Successful Attacks: {value[-1]}")
        print(f"Outlier Rate for Successful Attacks: {outlier_rate * 100}%\n")
        print("")

    return attack_success_rates, average_distances, outlier_rates

In [None]:
def main(dataset_name, model_name):
    """Train a model on a dataset, evaluate it, then attack it.

    Args:
        dataset_name: dataset identifier forwarded to ``get_datasets.get_dataset``.
        model_name: model identifier forwarded to ``model_config``.

    Returns:
        The ``(attack_success_rates, average_distances, outliner_rates)`` tuple
        produced by ``attack``.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    # get_dataset yields 13 values; only the ones needed below get real names.
    (
        X_train, _y_train, _X_val, _y_val, _X_test, _y_test,
        X_train_tensor, y_train_tensor,
        X_test_tensor, y_test_tensor,
        X_val_tensor, y_val_tensor,
        info,
    ) = get_datasets.get_dataset(dataset_name, device)

    # NOTE(review): model_config is not defined in the visible cells — confirm it
    # is defined or imported elsewhere in the notebook. The literal 2 is passed as
    # the third argument (presumably the number of classes; verify).
    n_features = X_train.shape[1]
    model, train_config = model_config(
        model_name,
        n_features,
        2,
        info["categories_list"],
        info["numerical_cols"],
        device,
    )
    train_config["dataset"] = dataset_name

    criterion = nn.CrossEntropyLoss()
    optimizer = run_pytorch.build_optimizer(model, "adam", train_config["learning_rate"])

    train_data = (X_train_tensor, y_train_tensor)
    val_data = (X_val_tensor, y_val_tensor)
    run_pytorch.train(model, train_data, val_data, criterion, optimizer, train_config)

    # Evaluate on the held-out test split.
    # NOTE(review): stage="train" is passed to the test call — confirm this is intended.
    test_data = (X_test_tensor, y_test_tensor)
    run_pytorch.test(model, test_data, train_config, stage="train", wandb_run=wandb.run)

    attack_success_rates, average_distances, outliner_rates = attack(
        model, X_test_tensor, y_test_tensor, X_train_tensor
    )
    return attack_success_rates, average_distances, outliner_rates


In [None]:
# wandb.log({"attack_success_rates": attack_success_rates, "average_l2_distances": average_l2_distances, "dataset": dataset_name, "model": model_name})