# Debugging autoreload

In [None]:
%load_ext autoreload
%autoreload 2

# Load packages

In [None]:
import sys
sys.path.append("D:/Work/dnam/")

In [None]:
import pandas as pd
import numpy as np
import plotly.express as px
import matplotlib.pyplot as plt
import seaborn as sns
from glob import glob
import pathlib
import os
import scipy.stats
from art.estimators.classification import PyTorchClassifier
from art.attacks.evasion import FastGradientMethod, BasicIterativeMethod, MomentumIterativeMethod
from art.attacks.evasion import ZooAttack, CarliniL2Method, ElasticNet, NewtonFool
import torch

from pyod.models.ecod import ECOD
from pyod.models.copod import COPOD
from pyod.models.sos import SOS
from pyod.models.qmcd import QMCD as QMCDOD
from pyod.models.sampling import Sampling
from pyod.models.gmm import GMM
from pyod.models.pca import PCA
from pyod.models.mcd import MCD
from pyod.models.cd import CD
from pyod.models.lmdd import LMDD
from pyod.models.lof import LOF
from pyod.models.cof import COF
from pyod.models.cblof import CBLOF
from pyod.models.hbos import HBOS
from pyod.models.knn import KNN
from pyod.models.sod import SOD
from pyod.models.rod import ROD
from pyod.models.iforest import IForest
from pyod.models.inne import INNE
from pyod.models.dif import DIF
from pyod.models.feature_bagging import FeatureBagging
from pyod.models.loda import LODA
from pyod.models.lunar import LUNAR

from pythresh.thresholds.iqr import IQR
from pythresh.thresholds.mad import MAD
from pythresh.thresholds.fwfm import FWFM
from pythresh.thresholds.yj import YJ
from pythresh.thresholds.zscore import ZSCORE
from pythresh.thresholds.aucp import AUCP
from pythresh.thresholds.qmcd import QMCD
from pythresh.thresholds.fgd import FGD
from pythresh.thresholds.dsn import DSN
from pythresh.thresholds.clf import CLF
from pythresh.thresholds.filter import FILTER
from pythresh.thresholds.wind import WIND
from pythresh.thresholds.eb import EB
from pythresh.thresholds.regr import REGR
from pythresh.thresholds.boot import BOOT
from pythresh.thresholds.mcst import MCST
from pythresh.thresholds.hist import HIST
from pythresh.thresholds.moll import MOLL
from pythresh.thresholds.chau import CHAU
from pythresh.thresholds.gesd import GESD
from pythresh.thresholds.mtt import MTT
from pythresh.thresholds.karch import KARCH
from pythresh.thresholds.ocsvm import OCSVM
from pythresh.thresholds.clust import CLUST
from pythresh.thresholds.decomp import DECOMP
from pythresh.thresholds.meta import META
from pythresh.thresholds.vae import VAE
from pythresh.thresholds.cpd import CPD
from pythresh.thresholds.gamgmm import GAMGMM
from pythresh.thresholds.mixmod import MIXMOD

from pytorch_tabular.config import DataConfig, OptimizerConfig, TrainerConfig
from pytorch_tabular.models.common.heads import LinearHeadConfig
from pytorch_tabular.models import CategoryEmbeddingModelConfig
from pytorch_tabular import model_sweep
import warnings
from sklearn.model_selection import train_test_split
from src.models.tabular.widedeep.tab_net import WDTabNetModel
from src.tasks.metrics import get_cls_pred_metrics, get_cls_prob_metrics


def split_stratified_into_train_val_test(df_input, stratify_colname='y',
                                         frac_train=0.6, frac_val=0.15, frac_test=0.25,
                                         random_state=None):
    '''
    Splits a Pandas dataframe into three subsets (train, val, and test)
    following fractional ratios provided by the user, where each subset is
    stratified by the values in a specific column (that is, each subset has
    the same relative frequency of the values in the column). It performs this
    splitting by running train_test_split() twice.

    Parameters
    ----------
    df_input : Pandas dataframe
        Input dataframe to be split.
    stratify_colname : str
        The name of the column that will be used for stratification. Usually
        this column would be for the label.
    frac_train : float
    frac_val   : float
    frac_test  : float
        The ratios with which the dataframe will be split into train, val, and
        test data. The values should be expressed as float fractions and should
        sum to 1.0 (checked with a small tolerance, so that e.g.
        0.7 + 0.2 + 0.1 is accepted despite floating-point rounding).
    random_state : int, None, or RandomStateInstance
        Value to be passed to train_test_split().

    Returns
    -------
    df_train, df_val, df_test :
        Dataframes containing the three splits.

    Raises
    ------
    ValueError
        If the fractions do not add up to 1.0, or if `stratify_colname` is
        not a column of `df_input`.
    '''
    import math  # local import keeps this function self-contained

    # Exact float equality would reject valid inputs such as 0.7/0.2/0.1,
    # whose sum is 0.9999999999999999, so compare with an absolute tolerance.
    frac_total = frac_train + frac_val + frac_test
    if not math.isclose(frac_total, 1.0, rel_tol=0.0, abs_tol=1e-9):
        raise ValueError('fractions %f, %f, %f do not add up to 1.0' %
                         (frac_train, frac_val, frac_test))

    if stratify_colname not in df_input.columns:
        raise ValueError('%s is not a column in the dataframe' % (stratify_colname))

    X = df_input  # Contains all columns.
    y = df_input[[stratify_colname]]  # Dataframe of just the column on which to stratify.

    # Split original dataframe into train and temp dataframes.
    df_train, df_temp, y_train, y_temp = train_test_split(X,
                                                          y,
                                                          stratify=y,
                                                          test_size=(1.0 - frac_train),
                                                          random_state=random_state)

    # Split the temp dataframe into val and test dataframes. The test share is
    # re-expressed relative to the size of the temp dataframe.
    relative_frac_test = frac_test / (frac_val + frac_test)
    df_val, df_test, y_val, y_test = train_test_split(df_temp,
                                                      y_temp,
                                                      stratify=y_temp,
                                                      test_size=relative_frac_test,
                                                      random_state=random_state)

    # Internal sanity check: the three splits must partition the input.
    assert len(df_input) == len(df_train) + len(df_val) + len(df_test)

    return df_train, df_val, df_test


# Load data and model, define PyTorchClassifier, setup colors

In [None]:
# Which trained model to analyse: architecture folder, checkpoint file name
# (without extension) and version folder.
model_type, model_fn, model_version = 'widedeep_tab_net', 'best_fold_0000', 'v2'

# Dataset root, the folder the inputs are read from, and the folder all
# results of this notebook are written to (created on demand).
path = "D:/YandexDisk/Work/pydnameth/datasets/GPL21145/GSEUNN"
path_load = 'D:/YandexDisk/Work/pydnameth/datasets/GPL21145/GSEUNN/special/046_adversarial_robustness_toolbox/dnam'
path_save = f"{path}/special/064_tai_report_4/dnam"
pathlib.Path(path_save).mkdir(parents=True, exist_ok=True)

In [None]:
# The feature names are the index of the feature table; only the index is kept.
feats = pd.read_excel(
    f"{path_load}/feats_1000.xlsx",
    index_col=0,
).index.values

In [None]:
# Main data table (first column is used as the index).
df = pd.read_excel(
    f"{path_load}/betas.xlsx",
    index_col=0,
)
# Positional ids 0..len(feats)-1 of the features.
ids_feat = [*range(len(feats))]

In [None]:
# Attach the stored predictions of the selected model to the data table,
# aligned on df's index, under notebook-friendly column names.
df_pred = pd.read_excel(f"{path_load}/models/{model_type}/{model_version}/predictions.xlsx", index_col=0)
df.loc[df.index, ['Real', 'Pred', 'Prob Control', 'Prob Parkinson']] = df_pred.loc[df.index, ['Status', 'pred', 'pred_prob_0', 'pred_prob_1']].values
# Tag every row as original (non-attacked) data.
df['Data'] = 'Real'
df['Eps'] = 'Origin'

col_real = 'Real'
col_pred = 'Pred'

# Sample ids grouped by the value of the 'Partition' column.
ids_trn_val = df.index[df['Partition'] == 'trn_val'].values
ids_tst = df.index[df['Partition'] == 'tst'].values
ids_all = df.index[df['Partition'].isin(['trn_val', 'tst'])].values
ids_dict = {
    'trn_val': ids_trn_val,
    'tst': ids_tst,
    'all': ids_all,
}

# Restore the trained network from its checkpoint, switch it to eval mode and
# freeze it.
# NOTE(review): produce_probabilities is switched off here and switched on
# later in the notebook right before predictions are computed - presumably it
# toggles raw outputs vs. probabilities; confirm in WDTabNetModel.
model = WDTabNetModel.load_from_checkpoint(checkpoint_path=f"{path_load}/models/{model_type}/{model_version}/{model_fn}.ckpt")
model.produce_probabilities = False
model.eval()
model.freeze()

# Wrap the network in an ART PyTorchClassifier so that the evasion attacks
# below can use it. It is a two-class model over len(feats) inputs, runs on
# CPU, and reuses the model's own loss and optimizer hyper-parameters.
art_classifier = PyTorchClassifier(
    model=model,
    loss=model.loss_fn,
    input_shape=(len(feats),),
    nb_classes=2,
    optimizer=torch.optim.Adam(
        params=model.parameters(),
        lr=model.hparams.optimizer_lr,
        weight_decay=model.hparams.optimizer_weight_decay
    ),
    use_amp=False,
    opt_level="O1",
    loss_scale="dynamic",
    channels_first=True,
    clip_values=(0.0, 1.0),
    preprocessing_defences=None,
    postprocessing_defences=None,
    preprocessing=(0.0, 1.0),
    device_type="cpu"
)

# Fixed plot colour per attack, grouped by the hyper-parameter each family of
# attacks is swept over in this notebook (eps / binary search steps / eta).
colors_atks_eps = {
    "MomentumIterative": px.colors.qualitative.D3[0],
    "BasicIterative": px.colors.qualitative.D3[1],
    "ProjectedGradientDescent": px.colors.qualitative.D3[2],
    "FastGradient": px.colors.qualitative.D3[3],
}
colors_atks_bss = {
    "ElasticNet": px.colors.qualitative.G10[7],
    "CarliniL2Method": px.colors.qualitative.G10[8],
    "ZooAttack": px.colors.qualitative.G10[9],
}
colors_atks_eta = {
    'NewtonFool': px.colors.qualitative.T10[7],
}

# Persist the annotated original table next to the other results.
df.to_excel(f"{path_save}/df_origin.xlsx", index_label='sample_id')

# Create pyod and pythresh models

In [None]:
# Outlier detectors (pyod). The keys are the row labels of the result tables
# built from this dictionary; commented-out entries are currently disabled.
classifiers = {
    'ECDF-Based (ECOD)': ECOD(),
    'Copula-Based (COPOD)': COPOD(),
    # 'Stochastic (SOS)': SOS(),
    # 'Quasi-Monte Carlo Discrepancy (QMCD)': QMCDOD(),
    # 'Rapid distance-based via Sampling': Sampling(),
    # 'Probabilistic Mixture Modeling (GMM)': GMM(),
    'Principal Component Analysis (PCA)': PCA(),
    'Local Outlier Factor (LOF)': LOF(),
    'Connectivity-Based Outlier Factor (COF)': COF(),
    'Clustering-Based Local Outlier Factor (CBLOF)': CBLOF(),
    'Histogram-based Outlier Score (HBOS)': HBOS(),
    'k Nearest Neighbors (kNN)': KNN(),
    'Subspace Outlier Detection (SOD)': SOD(),
    'Isolation Forest': IForest(),
    'Isolation-Based with Nearest-Neighbor Ensembles (INNE)': INNE(),
    'Deep Isolation Forest for Anomaly Detection (DIF)': DIF(),
    'Feature Bagging': FeatureBagging(),
    'Lightweight On-line Detector of Anomalies (LODA)': LODA(),
    # 'LUNAR': LUNAR()
}

# Score thresholders (pythresh). The keys are the column labels of the result
# tables; commented-out entries are currently disabled.
# NOTE(review): the QMCD key is spelled 'Discreperancy'; it is kept as is
# because the key ends up in saved tables and figures - rename deliberately.
thresholders = {
    'Inter-Quartile Region (IQR)': IQR(),
    'Median Absolute Deviation (MAD)': MAD(),
    'Full Width at Full Minimum (FWFM)': FWFM(),
    'Yeo-Johnson Transformation (YJ)': YJ(),
    'Z Score (ZSCORE)': ZSCORE(),
    'AUC Percentage (AUCP)': AUCP(),
    'Quasi-Monte Carlo Discreperancy (QMCD)': QMCD(),
    'Fixed Gradient Descent (FGD)': FGD(),
    'Distance Shift from Normal (DSN)': DSN(),
    'Trained Classifier (CLF)': CLF(),
    'Filtering Based (FILTER)': FILTER(),
    # 'Topological Winding Number (WIND)': WIND(),
    'Elliptical Boundary (EB)': EB(),
    'Regression Intercept (REGR)': REGR(),
    # 'Bootstrap Method (BOOT)': BOOT(),
    # 'Monte Carlo Statistical Tests (MCST)': MCST(),
    'Histogram Based Methods (HIST)': HIST(),
    # 'Mollifier (MOLL)': MOLL(),
    "Chauvenet's Criterion (CHAU)": CHAU(),
    'Generalized Extreme Studentized Deviate (GESD)': GESD(),
    'Modified Thompson Tau Test (MTT)': MTT(),
    'Karcher Mean (KARCH)': KARCH(),
    'One-Class SVM (OCSVM)': OCSVM(),
    'Clustering (CLUST)': CLUST(),
    'Decomposition (DECOMP)': DECOMP(),
    'Meta-model (META)': META(),
    'Variational Autoencoder (VAE)': VAE(),
    'Change Point Detection (CPD)': CPD(),
    'Bayesian Gamma GMM (GAMGMM)': GAMGMM(skip=True),
    'Mixture Models (MIXMOD)': MIXMOD(),
}

# Outliers for original data

In [None]:
with warnings.catch_warnings():
    # The detectors and thresholders emit many library warnings; silence them
    # for the duration of this cell only.
    for warning_category in (FutureWarning, UserWarning, RuntimeWarning, DeprecationWarning):
        warnings.simplefilter(action='ignore', category=warning_category)

    # Percentage of test samples flagged as outliers for every
    # (outlier detector, thresholder) pair: detectors are rows, thresholders
    # are columns.
    df_outs = pd.DataFrame(index=list(classifiers.keys()), columns=list(thresholders.keys()))
    X_tst_origin = df.loc[ids_tst, feats].values  # same matrix for every detector
    for pyod_m_name, pyod_m in classifiers.items():
        print(pyod_m_name)
        scores = pyod_m.fit(X_tst_origin).decision_scores_
        for pythresh_m_name, pythresh_m in thresholders.items():
            labels = pythresh_m.eval(scores)
            df_outs.at[pyod_m_name, pythresh_m_name] = sum(labels) / len(labels) * 100
    df_outs.to_excel(f"{path_save}/outliers.xlsx")

    df_fig = df_outs.astype(float)
    sns.set_theme(style='ticks', font_scale=1.0)
    fig, ax = plt.subplots(figsize=(16, 7))
    heatmap = sns.heatmap(
        df_fig,
        annot=True,
        fmt=".1f",
        cmap='hot',
        linewidth=0.1,
        linecolor='black',
        cbar_kws={
            'orientation': 'horizontal',
            'location': 'top',
            'pad': 0.025,
            'aspect': 30
        },
        annot_kws={"size": 12},
        ax=ax
    )
    # The heatmap columns (x axis) are the thresholders and its rows (y axis)
    # are the outlier detectors, so label the axes accordingly.
    ax.set_xlabel('Thresholding Algorithms')
    ax.set_ylabel('Outliers Detection Algorithms')
    # The last axes of the figure is styled as the colour bar.
    cbar_ax = ax.figure.axes[-1]
    cbar_ax.set_title("Outliers' percentage")
    cbar_ax.tick_params()
    for spine in cbar_ax.spines.values():
        spine.set_linewidth(1)
    plt.savefig(f"{path_save}/outliers.png", bbox_inches='tight', dpi=200)
    plt.savefig(f"{path_save}/outliers.pdf", bbox_inches='tight')
    plt.close(fig)

# Adversarial attacks

## Eps-dependent attacks

In [None]:
def _metric_value(metric, y_hat, y_true, name=None, best_effort=False):
    """Evaluate `metric` on (y_hat, y_true), reset its state, return a float.

    With best_effort=True a ValueError raised by the metric is reported and
    replaced by 0 instead of aborting the whole sweep.
    """
    try:
        return float(metric(y_hat, y_true).numpy())
    except ValueError as exc:
        if not best_effort:
            raise
        print(f"Metric '{name}' could not be computed ({exc}); storing 0")
        return 0
    finally:
        metric.reset()


# Three decades of perturbation sizes; the set union removes the shared
# decade endpoints.
epsilons = sorted(list(set.union(
    set(np.linspace(0.1, 1.0, 10)),
    set(np.linspace(0.01, 0.1, 10)),
    set(np.linspace(0.001, 0.01, 10))
)))
df_eps = pd.DataFrame(index=epsilons)

# Per-feature inter-quartile range of the test data. It does not depend on
# eps, so it is computed once instead of twice per eps value.
iqr_feats = np.array([scipy.stats.iqr(df.loc[ids_tst, feat].values) for feat in feats])

for eps_raw in epsilons:

    # The perturbation budget (and the step of the iterative attacks) is
    # scaled per feature by that feature's IQR.
    eps = eps_raw * iqr_feats
    eps_step = 0.2 * eps_raw * iqr_feats

    attacks = {
        'MomentumIterative': MomentumIterativeMethod(
            estimator=art_classifier,
            norm=np.inf,
            eps=eps,
            eps_step=eps_step,
            decay=0.1,
            max_iter=100,
            targeted=False,
            batch_size=512,
            verbose=True
        ),
        'BasicIterative': BasicIterativeMethod(
            estimator=art_classifier,
            eps=eps,
            eps_step=eps_step,
            max_iter=100,
            targeted=False,
            batch_size=512,
            verbose=True
        ),
        'FastGradient': FastGradientMethod(
            estimator=art_classifier,
            norm=np.inf,
            eps=eps,
            eps_step=eps_step,
            targeted=False,
            num_random_init=0,
            batch_size=512,
            minimal=False,
            summary_writer=False,
        ),
    }

    for attack_name, attack in attacks.items():
        path_curr = f"{path_save}/Evasion/{attack_name}/eps_{eps_raw:0.4f}"
        pathlib.Path(path_curr).mkdir(parents=True, exist_ok=True)

        # The ART classifier was built around the model with
        # produce_probabilities = False. The prediction step below switches
        # the flag to True, so reset it before every attack; otherwise all
        # attacks after the very first one run against a differently
        # configured model.
        model.produce_probabilities = False
        X_adv = attack.generate(np.float32(df.loc[ids_tst, feats].values))

        # Adversarial samples together with the model's predictions on them.
        df_adv = df.loc[ids_tst, ['Real']].copy()
        df_adv.loc[ids_tst, feats] = X_adv
        model.produce_probabilities = True
        y_pred_prob = model(torch.from_numpy(np.float32(df_adv.loc[ids_tst, feats].values))).cpu().detach().numpy()
        y_pred = np.argmax(y_pred_prob, 1)
        df_adv["Pred"] = y_pred
        df_adv["Prob Control"] = y_pred_prob[:, 0]
        df_adv["Prob Parkinson"] = y_pred_prob[:, 1]
        df_adv["Eps"] = eps_raw
        df_adv["Data"] = attack_name

        df_adv.to_excel(f"{path_curr}/df.xlsx", index_label='sample_id')

        # Classification metrics on attacked ('Attack') vs. original
        # ('Origin') test data.
        metrics_pred = get_cls_pred_metrics(num_classes=2)
        metrics_prob = get_cls_prob_metrics(num_classes=2)
        df_metrics = pd.DataFrame(index=list(metrics_pred.keys()) + list(metrics_prob.keys()))
        y_real = torch.from_numpy(df_adv.loc[ids_tst, "Real"].values.astype('int32'))
        y_pred_atk = torch.from_numpy(df_adv.loc[ids_tst, "Pred"].values.astype('int32'))
        y_pred_ori = torch.from_numpy(df.loc[ids_tst, "Pred"].values.astype('int32'))
        y_prob_atk = torch.from_numpy(df_adv.loc[ids_tst, ["Prob Control", "Prob Parkinson"]].values)
        y_prob_ori = torch.from_numpy(df.loc[ids_tst, ["Prob Control", "Prob Parkinson"]].values)
        for m in metrics_pred:
            df_metrics.at[m, 'Attack'] = _metric_value(metrics_pred[m][0], y_pred_atk, y_real, name=m)
            df_metrics.at[m, 'Origin'] = _metric_value(metrics_pred[m][0], y_pred_ori, y_real, name=m)
        for m in metrics_prob:
            # Probability-based metrics are best effort: a ValueError is
            # recorded as 0 rather than stopping the sweep.
            df_metrics.at[m, 'Attack'] = _metric_value(metrics_prob[m][0], y_prob_atk, y_real, name=m, best_effort=True)
            df_metrics.at[m, 'Origin'] = _metric_value(metrics_prob[m][0], y_prob_ori, y_real, name=m, best_effort=True)

        df_metrics.to_excel(f"{path_curr}/metrics.xlsx", index_label='Metrics')

        # The accuracy on the original data does not depend on the attack, so
        # it is recorded for whichever attacks are enabled.
        df_eps.loc[eps_raw, "Origin_Accuracy"] = df_metrics.at['accuracy_weighted', 'Origin']
        df_eps.loc[eps_raw, f"{attack_name}_Accuracy"] = df_metrics.at['accuracy_weighted', 'Attack']

df_eps.to_excel(f"{path_save}/Evasion/df_eps.xlsx", index_label='eps')

In [None]:
# Reload the sweep summary so this cell can be run without repeating the attacks.
df_eps = pd.read_excel(f"{path_save}/Evasion/df_eps.xlsx", index_col=0)

atks_trgt = ['MomentumIterative', 'BasicIterative', 'FastGradient']

# Long-format table (Eps, Method, Accuracy) for seaborn: one accuracy column
# per attack, renamed to the bare attack name, then melted.
df_fig = (
    df_eps.loc[:, [f"{atk}_Accuracy" for atk in atks_trgt]]
    .copy()
    .rename(columns={f"{atk}_Accuracy": atk for atk in atks_trgt})
)
df_fig['Eps'] = df_fig.index.values
df_fig = df_fig.melt(id_vars="Eps", var_name='Method', value_name="Accuracy")

fig = plt.figure()
sns.set_theme(style='whitegrid', font_scale=1)
lines = sns.lineplot(
    data=df_fig,
    x='Eps',
    y="Accuracy",
    hue="Method",
    style="Method",
    palette=colors_atks_eps,
    hue_order=atks_trgt,
    markers=True,
    dashes=False,
)
lines.set_xscale('log')
lines.set_xlabel(r'$\epsilon$')

# Dashed horizontal reference line: accuracy on the original data.
x_min = 0.0009
x_max = 1.05
basic = df_eps.at[0.01, "Origin_Accuracy"]
lines.set_xlim(x_min, x_max)
lines.plot(
    [x_min, x_max],
    [basic, basic],
    color='k',
    linestyle='dashed',
    linewidth=1
)

for fig_kwargs in ({'fname': f"{path_save}/Evasion/line_accuracy_vs_eps.png", 'dpi': 200},
                   {'fname': f"{path_save}/Evasion/line_accuracy_vs_eps.pdf"}):
    plt.savefig(bbox_inches='tight', **fig_kwargs)
plt.close(fig)

## Binary Search Steps attacks

In [None]:
def _metric_value(metric, y_hat, y_true, name=None, best_effort=False):
    """Evaluate `metric` on (y_hat, y_true), reset its state, return a float.

    With best_effort=True a ValueError raised by the metric is reported and
    replaced by 0 instead of aborting the whole sweep.
    """
    try:
        return float(metric(y_hat, y_true).numpy())
    except ValueError as exc:
        if not best_effort:
            raise
        print(f"Metric '{name}' could not be computed ({exc}); storing 0")
        return 0
    finally:
        metric.reset()


bsss = list(range(1, 11, 1))
df_bss = pd.DataFrame(index=bsss)

# NOTE(review): only bss = 10 is computed here although the summary table is
# indexed by 1..10 - looks like a partial re-run; iterate over `bsss` to
# regenerate the full sweep.
for bss in [10]:

    attacks = {
        # 'ElasticNet': ElasticNet(
        #     classifier=art_classifier,
        #     confidence=0.0,
        #     targeted=False,
        #     learning_rate=1e-3,
        #     binary_search_steps=bss,
        #     max_iter=20,
        #     beta=1e-3,
        #     initial_const=1e-4,
        #     batch_size=1,
        #     decision_rule="EN",
        #     verbose=True,
        # ),
        'CarliniL2Method': CarliniL2Method(
            classifier=art_classifier,
            confidence=0.0,
            targeted=False,
            learning_rate=0.001,
            binary_search_steps=bss,
            max_iter=20,
            initial_const=1e-4,
            max_halving=5,
            max_doubling=5,
            batch_size=1,
            verbose=True
        ),
        # 'ZooAttack': ZooAttack(
        #     classifier=art_classifier,
        #     confidence=0.0,
        #     targeted=False,
        #     learning_rate=0.001,
        #     max_iter=20,
        #     binary_search_steps=bss,
        #     initial_const=1e-4,
        #     abort_early=True,
        #     use_resize=False,
        #     use_importance=True,
        #     nb_parallel=16,
        #     batch_size=1,
        #     variable_h=0.001,
        #     verbose=True
        # ),
    }

    for attack_name, attack in attacks.items():
        path_curr = f"{path_save}/Evasion/{attack_name}/bss_{bss}"
        pathlib.Path(path_curr).mkdir(parents=True, exist_ok=True)

        # The ART classifier was built around the model with
        # produce_probabilities = False. The prediction step below (and the
        # earlier cells) switch the flag to True, so reset it before every
        # attack to keep all attacks running against the same configuration.
        model.produce_probabilities = False
        X_adv = attack.generate(np.float32(df.loc[ids_tst, feats].values))

        # Adversarial samples together with the model's predictions on them.
        df_adv = df.loc[ids_tst, ['Real']].copy()
        df_adv.loc[ids_tst, feats] = X_adv
        model.produce_probabilities = True
        y_pred_prob = model(torch.from_numpy(np.float32(df_adv.loc[ids_tst, feats].values))).cpu().detach().numpy()
        y_pred = np.argmax(y_pred_prob, 1)
        df_adv["Pred"] = y_pred
        df_adv["Prob Control"] = y_pred_prob[:, 0]
        df_adv["Prob Parkinson"] = y_pred_prob[:, 1]
        df_adv["BSS"] = bss
        df_adv["Data"] = attack_name

        df_adv.to_excel(f"{path_curr}/df.xlsx", index_label='sample_id')

        # Classification metrics on attacked ('Attack') vs. original
        # ('Origin') test data.
        metrics_pred = get_cls_pred_metrics(num_classes=2)
        metrics_prob = get_cls_prob_metrics(num_classes=2)
        df_metrics = pd.DataFrame(index=list(metrics_pred.keys()) + list(metrics_prob.keys()))
        y_real = torch.from_numpy(df_adv.loc[ids_tst, "Real"].values.astype('int32'))
        y_pred_atk = torch.from_numpy(df_adv.loc[ids_tst, "Pred"].values.astype('int32'))
        y_pred_ori = torch.from_numpy(df.loc[ids_tst, "Pred"].values.astype('int32'))
        y_prob_atk = torch.from_numpy(df_adv.loc[ids_tst, ["Prob Control", "Prob Parkinson"]].values)
        y_prob_ori = torch.from_numpy(df.loc[ids_tst, ["Prob Control", "Prob Parkinson"]].values)
        for m in metrics_pred:
            df_metrics.at[m, 'Attack'] = _metric_value(metrics_pred[m][0], y_pred_atk, y_real, name=m)
            df_metrics.at[m, 'Origin'] = _metric_value(metrics_pred[m][0], y_pred_ori, y_real, name=m)
        for m in metrics_prob:
            # Probability-based metrics are best effort: a ValueError is
            # recorded as 0 rather than stopping the sweep.
            df_metrics.at[m, 'Attack'] = _metric_value(metrics_prob[m][0], y_prob_atk, y_real, name=m, best_effort=True)
            df_metrics.at[m, 'Origin'] = _metric_value(metrics_prob[m][0], y_prob_ori, y_real, name=m, best_effort=True)

        df_metrics.to_excel(f"{path_curr}/metrics.xlsx", index_label='Metrics')

        # The accuracy on the original data does not depend on the attack.
        # Recording it only for 'ElasticNet' left the column missing whenever
        # that attack is disabled, so record it for every enabled attack.
        df_bss.loc[bss, "Origin_Accuracy"] = df_metrics.at['accuracy_weighted', 'Origin']
        df_bss.loc[bss, f"{attack_name}_Accuracy"] = df_metrics.at['accuracy_weighted', 'Attack']

# The index holds binary-search-step counts, so label it 'bss' (it was 'eps').
df_bss.to_excel(f"{path_save}/Evasion/df_bss.xlsx", index_label='bss')

In [None]:
# Reload the sweep summary so this cell can be run without repeating the attacks.
df_bss = pd.read_excel(f"{path_save}/Evasion/df_bss.xlsx", index_col=0)

atks_trgt = ['ElasticNet', 'CarliniL2Method', 'ZooAttack']

# Long-format table (BSS, Method, Accuracy) for seaborn: one accuracy column
# per attack, renamed to the bare attack name, then melted.
df_fig = (
    df_bss.loc[:, [f"{atk}_Accuracy" for atk in atks_trgt]]
    .copy()
    .rename(columns={f"{atk}_Accuracy": atk for atk in atks_trgt})
)
df_fig['BSS'] = df_fig.index.values
df_fig = df_fig.melt(id_vars="BSS", var_name='Method', value_name="Accuracy")

fig = plt.figure()
sns.set_theme(style='whitegrid', font_scale=1)
lines = sns.lineplot(
    data=df_fig,
    x='BSS',
    y="Accuracy",
    hue="Method",
    style="Method",
    palette=colors_atks_bss,
    hue_order=atks_trgt,
    markers=True,
    dashes=False,
)
lines.set_xlabel('BSS')

# Dashed horizontal reference line: accuracy on the original data, taken from
# the metrics file saved for ElasticNet with a single binary search step.
basic = pd.read_excel(
    f"{path_save}/Evasion/ElasticNet/bss_1/metrics.xlsx",
    index_col=0,
).at['accuracy_weighted', 'Origin']
x_min = 0.5
x_max = 10.5
lines.set_xlim(x_min, x_max)
lines.plot(
    [x_min, x_max],
    [basic, basic],
    color='k',
    linestyle='dashed',
    linewidth=1
)

for fig_kwargs in ({'fname': f"{path_save}/Evasion/line_accuracy_vs_bss.png", 'dpi': 200},
                   {'fname': f"{path_save}/Evasion/line_accuracy_vs_bss.pdf"}):
    plt.savefig(bbox_inches='tight', **fig_kwargs)
plt.close(fig)

## Eta-dependent attacks

In [None]:
def _metric_value(metric, y_hat, y_true, name=None, best_effort=False):
    """Evaluate `metric` on (y_hat, y_true), reset its state, return a float.

    With best_effort=True a ValueError raised by the metric is reported and
    replaced by 0 instead of aborting the whole sweep.
    """
    try:
        return float(metric(y_hat, y_true).numpy())
    except ValueError as exc:
        if not best_effort:
            raise
        print(f"Metric '{name}' could not be computed ({exc}); storing 0")
        return 0
    finally:
        metric.reset()


# Three decades of eta values; the set union removes the shared decade
# endpoints.
etas = sorted(list(set.union(
    set(np.linspace(0.1, 1.0, 10)),
    set(np.linspace(0.01, 0.1, 10)),
    set(np.linspace(0.001, 0.01, 10))
)))

df_etas = pd.DataFrame(index=etas)

for eta in etas:

    attacks = {
        'NewtonFool': NewtonFool(
            classifier=art_classifier,
            max_iter=100,
            eta=eta,
            batch_size=100,
            verbose=True,
        ),
    }

    for attack_name, attack in attacks.items():
        path_curr = f"{path_save}/Evasion/{attack_name}/eta_{eta:0.2e}"
        pathlib.Path(path_curr).mkdir(parents=True, exist_ok=True)

        # `ids_atk` was never defined in this notebook; like the other attack
        # cells, attack the test partition (`ids_tst`).
        # The ART classifier was built around the model with
        # produce_probabilities = False. The prediction step below (and the
        # earlier cells) switch the flag to True, so reset it before every
        # attack to keep all attacks running against the same configuration.
        model.produce_probabilities = False
        X_adv = attack.generate(np.float32(df.loc[ids_tst, feats].values))

        # Adversarial samples together with the model's predictions on them.
        df_adv = df.loc[ids_tst, ['Real']].copy()
        df_adv.loc[ids_tst, feats] = X_adv
        model.produce_probabilities = True
        y_pred_prob = model(torch.from_numpy(np.float32(df_adv.loc[ids_tst, feats].values))).cpu().detach().numpy()
        y_pred = np.argmax(y_pred_prob, 1)
        df_adv["Pred"] = y_pred
        df_adv["Prob Control"] = y_pred_prob[:, 0]
        df_adv["Prob Parkinson"] = y_pred_prob[:, 1]
        df_adv["Eta"] = f"{eta:0.2e}"
        df_adv["Data"] = attack_name

        df_adv.to_excel(f"{path_curr}/df.xlsx", index_label='sample_id')

        # Classification metrics on attacked ('Attack') vs. original
        # ('Origin') test data.
        metrics_pred = get_cls_pred_metrics(num_classes=2)
        metrics_prob = get_cls_prob_metrics(num_classes=2)
        df_metrics = pd.DataFrame(index=list(metrics_pred.keys()) + list(metrics_prob.keys()))
        y_real = torch.from_numpy(df_adv.loc[ids_tst, "Real"].values.astype('int32'))
        y_pred_atk = torch.from_numpy(df_adv.loc[ids_tst, "Pred"].values.astype('int32'))
        y_pred_ori = torch.from_numpy(df.loc[ids_tst, "Pred"].values.astype('int32'))
        y_prob_atk = torch.from_numpy(df_adv.loc[ids_tst, ["Prob Control", "Prob Parkinson"]].values)
        y_prob_ori = torch.from_numpy(df.loc[ids_tst, ["Prob Control", "Prob Parkinson"]].values)
        for m in metrics_pred:
            df_metrics.at[m, 'Attack'] = _metric_value(metrics_pred[m][0], y_pred_atk, y_real, name=m)
            df_metrics.at[m, 'Origin'] = _metric_value(metrics_pred[m][0], y_pred_ori, y_real, name=m)
        for m in metrics_prob:
            # Probability-based metrics are best effort: a ValueError is
            # recorded as 0 rather than stopping the sweep.
            df_metrics.at[m, 'Attack'] = _metric_value(metrics_prob[m][0], y_prob_atk, y_real, name=m, best_effort=True)
            df_metrics.at[m, 'Origin'] = _metric_value(metrics_prob[m][0], y_prob_ori, y_real, name=m, best_effort=True)

        df_metrics.to_excel(f"{path_curr}/metrics.xlsx", index_label='Metrics')

        # The accuracy on the original data does not depend on the attack.
        df_etas.loc[eta, "Origin_Accuracy"] = df_metrics.at['accuracy_weighted', 'Origin']
        df_etas.loc[eta, f"{attack_name}_Accuracy"] = df_metrics.at['accuracy_weighted', 'Attack']

df_etas.to_excel(f"{path_save}/Evasion/df_etas.xlsx", index_label='eta')

In [None]:
# Reload the sweep summary so this cell can be run without repeating the attacks.
df_etas = pd.read_excel(f"{path_save}/Evasion/df_etas.xlsx", index_col=0)

atks_trgt = ['NewtonFool']

# Long-format table (Eta, Method, Accuracy) for seaborn: one accuracy column
# per attack, renamed to the bare attack name, then melted.
df_fig = (
    df_etas.loc[:, [f"{atk}_Accuracy" for atk in atks_trgt]]
    .copy()
    .rename(columns={f"{atk}_Accuracy": atk for atk in atks_trgt})
)
df_fig['Eta'] = df_fig.index.values
df_fig = df_fig.melt(id_vars="Eta", var_name='Method', value_name="Accuracy")

fig = plt.figure()
sns.set_theme(style='whitegrid', font_scale=1)
lines = sns.lineplot(
    data=df_fig,
    x='Eta',
    y="Accuracy",
    hue="Method",
    style="Method",
    palette=colors_atks_eta,
    hue_order=atks_trgt,
    markers=True,
    dashes=False,
)
lines.set_xscale('log')
lines.set_xlabel(r'$\eta$')

# Dashed horizontal reference line: accuracy on the original data.
x_min = 8e-4
x_max = 1.1
basic = df_etas.at[0.01, "Origin_Accuracy"]
lines.set_xlim(x_min, x_max)
lines.plot(
    [x_min, x_max],
    [basic, basic],
    color='k',
    linestyle='dashed',
    linewidth=1
)

for fig_kwargs in ({'fname': f"{path_save}/Evasion/line_accuracy_vs_eta.png", 'dpi': 200},
                   {'fname': f"{path_save}/Evasion/line_accuracy_vs_eta.pdf"}):
    plt.savefig(bbox_inches='tight', **fig_kwargs)
plt.close(fig)

## Outliers for attacks

In [None]:
# Attack families grouped by the hyper-parameter they were swept over, with
# the subset of hyper-parameter values analysed for outliers.
attacks_options = {
    'Eps': {
        'types': ['MomentumIterative', 'BasicIterative', 'FastGradient'],
        'values': [0.005, 0.02, 0.05, 0.2]
    },
    'BSS': {
        'types': ['ElasticNet', 'CarliniL2Method', 'ZooAttack'],
        'values': [2, 4, 6, 8]
    },
    'Eta': {
        'types': ['NewtonFool'],
        'values': [1e-3, 2e-3, 3e-3, 1e-2]
    },
}

with warnings.catch_warnings():
    # The detectors and thresholders emit many library warnings; silence them
    # for the duration of this cell only.
    for warning_category in (FutureWarning, UserWarning, RuntimeWarning, DeprecationWarning):
        warnings.simplefilter(action='ignore', category=warning_category)

    for var_param, opt in attacks_options.items():
        print(var_param)
        for atk_type in opt['types']:
            print(atk_type)
            for var_val in opt['values']:
                print(var_val)
                # Folder naming must match the one used when the attacked
                # data were saved.
                if var_param == 'Eps':
                    path_curr = f"{path_save}/Evasion/{atk_type}/eps_{var_val:0.4f}"
                elif var_param == 'BSS':
                    path_curr = f"{path_save}/Evasion/{atk_type}/bss_{var_val}"
                else:
                    path_curr = f"{path_save}/Evasion/{atk_type}/eta_{var_val:0.2e}"

                df_adv = pd.read_excel(f"{path_curr}/df.xlsx", index_col='sample_id')

                # Percentage of attacked samples flagged as outliers for every
                # (outlier detector, thresholder) pair: detectors are rows,
                # thresholders are columns.
                df_outs = pd.DataFrame(index=list(classifiers.keys()), columns=list(thresholders.keys()))
                X_attacked = df_adv.loc[:, feats].values  # same matrix for every detector
                for pyod_m_name, pyod_m in classifiers.items():
                    scores = pyod_m.fit(X_attacked).decision_scores_
                    for pythresh_m_name, pythresh_m in thresholders.items():
                        labels = pythresh_m.eval(scores)
                        df_outs.at[pyod_m_name, pythresh_m_name] = sum(labels) / len(labels) * 100

                df_outs.to_excel(f"{path_curr}/outliers.xlsx")

                df_fig = df_outs.astype(float)
                sns.set_theme(style='ticks', font_scale=1.0)
                fig, ax = plt.subplots(figsize=(16, 7))
                heatmap = sns.heatmap(
                    df_fig,
                    annot=True,
                    fmt=".1f",
                    cmap='hot',
                    linewidth=0.1,
                    linecolor='black',
                    cbar_kws={
                        'orientation': 'horizontal',
                        'location': 'top',
                        'pad': 0.025,
                        'aspect': 30
                    },
                    annot_kws={"size": 12},
                    ax=ax
                )
                # The heatmap columns (x axis) are the thresholders and its
                # rows (y axis) are the outlier detectors, so label the axes
                # accordingly.
                ax.set_xlabel('Thresholding Algorithms')
                ax.set_ylabel('Outliers Detection Algorithms')
                # The last axes of the figure is styled as the colour bar.
                cbar_ax = ax.figure.axes[-1]
                cbar_ax.set_title("Outliers' percentage")
                cbar_ax.tick_params()
                for spine in cbar_ax.spines.values():
                    spine.set_linewidth(1)
                plt.savefig(f"{path_curr}/outliers.png", bbox_inches='tight', dpi=200)
                plt.savefig(f"{path_curr}/outliers.pdf", bbox_inches='tight')
                plt.close(fig)

# Adversarial defences from attacks

## Generate detectors

In [None]:
# Original (non-attacked) test samples, labelled for the detector datasets.
df_ori = df.loc[ids_tst, feats].assign(Class='Original')

# Hyper-parameter grids of the three attack sweeps; the set union removes the
# decade endpoints shared between the linspace ranges.
epsilons = sorted(
    set(np.linspace(0.1, 1.0, 10))
    | set(np.linspace(0.01, 0.1, 10))
    | set(np.linspace(0.001, 0.01, 10))
)
bsss = list(range(1, 11))
etas = sorted(
    set(np.linspace(0.1, 1.0, 10))
    | set(np.linspace(0.01, 0.1, 10))
    | set(np.linspace(0.001, 0.01, 10))
)

# Attack families grouped by the hyper-parameter they were swept over,
# together with the full grid of that hyper-parameter.
attacks_options = {
    'Eps': {'types': ['MomentumIterative', 'BasicIterative', 'FastGradient'], 'values': epsilons},
    'BSS': {'types': ['ElasticNet', 'CarliniL2Method', 'ZooAttack'], 'values': bsss},
    'Eta': {'types': ['NewtonFool'], 'values': etas},
}

In [None]:
# Load every pre-generated adversarial dataset once and cache it as
# datasets[var_param][atk_type][val_str], so later cells do not have to
# re-read the Excel files for each (train strength, test strength) pair.
datasets = {}
for var_param, opt in attacks_options.items():
    print(var_param)
    datasets[var_param] = {}
    for atk_type in opt['types']:
        print(atk_type)
        datasets[var_param][atk_type] = {}
        for var_val in opt['values']:
            # The formatted value is both the cache key and the suffix of the
            # directory the attack results were saved under.
            if var_param == 'Eps':
                val_str = f'{var_val:0.4f}'
                path_curr = f"{path_save}/Evasion/{atk_type}/eps_{val_str}"
            elif var_param == 'BSS':
                val_str = f'{var_val}'
                path_curr = f"{path_save}/Evasion/{atk_type}/bss_{val_str}"
            else:
                val_str = f'{var_val:0.2e}'
                path_curr = f"{path_save}/Evasion/{atk_type}/eta_{val_str}"
            print(val_str)
            df_adv = pd.read_excel(f"{path_curr}/df.xlsx", index_col=0)
            # Take an explicit copy of the feature columns: assigning a new
            # column to a column-subset slice can raise SettingWithCopyWarning.
            df_adv = df_adv[feats].copy()
            df_adv['Class'] = 'Attack'
            datasets[var_param][atk_type][val_str] = df_adv

In [None]:
# Train one Original-vs-Attack detector per (attack type, attack strength) and
# measure how well it transfers to every other strength of the same attack.
# One accuracy table per attack type is written to detectors_accuracy.xlsx.
with warnings.catch_warnings():
    # Silence these four warning categories for the whole sweep.
    warnings.simplefilter(action='ignore', category=FutureWarning)
    warnings.simplefilter(action='ignore', category=UserWarning)
    warnings.simplefilter(action='ignore', category=RuntimeWarning)
    warnings.simplefilter(action='ignore', category=DeprecationWarning)

    for var_param, opt in attacks_options.items():
        print(var_param)
        for atk_type in opt['types']:
            print(atk_type)
            
            # Rows: strength the detector was trained on. Columns: name of the
            # winning model, then one column per strength it is tested on.
            df_def_acc = pd.DataFrame(index=opt['values'], columns=['Model'] + list(opt['values']))
            
            for var_val_id, var_val in enumerate(opt['values']):
                print(var_val)
                # val_str must use the same formatting as the keys of the
                # `datasets` cache, otherwise the lookup below fails.
                if var_param == 'Eps':
                    path_curr = f"{path_save}/Evasion/{atk_type}/eps_{var_val:0.4f}"
                    val_str = f'{var_val:0.4f}'
                elif var_param == 'BSS':
                    path_curr = f"{path_save}/Evasion/{atk_type}/bss_{var_val}"
                    val_str = f'{var_val}'
                else:
                    path_curr = f"{path_save}/Evasion/{atk_type}/eta_{var_val:0.2e}"
                    val_str = f'{var_val:0.2e}'
                    
                # df_adv = pd.read_excel(f"{path_curr}/df.xlsx", index_col='sample_id')
                # df_adv = df_adv[feats]
                # df_adv['Class'] = 'Attack'
                df_adv = datasets[var_param][atk_type][val_str]
                # Detector dataset: clean and attacked versions of the same
                # test samples, distinguished by the 'Class' column.
                df_def = pd.concat([df_ori.loc[ids_tst, :], df_adv.loc[ids_tst, :]])
                
                # Helper defined elsewhere in the notebook; presumably returns a
                # 60/20/20 split stratified on 'Class' -- confirm against its definition.
                df_def_trn, df_def_val, df_def_tst = split_stratified_into_train_val_test(
                    df_def, 
                    stratify_colname='Class',
                    frac_train=0.60,
                    frac_val=0.20,
                    frac_test=0.20
                )
                
                data_config = DataConfig(
                    target=['Class'],
                    continuous_cols=list(feats),
                    continuous_feature_transform='yeo-johnson',
                    normalize_continuous_features=True,
                )
                
                # NOTE(review): early_stopping_patience equals max_epochs, so early
                # stopping looks effectively disabled -- confirm this is intended.
                trainer_config = TrainerConfig(
                    batch_size=1024,
                    max_epochs=100,
                    min_epochs=1,
                    auto_lr_find=True,
                    early_stopping='valid_loss',
                    early_stopping_min_delta=0.0001,
                    early_stopping_mode='min',
                    early_stopping_patience=100,
                    checkpoints='valid_loss',
                    # Checkpoint files in this directory are deleted after the sweep below.
                    checkpoints_path=f"{path_curr}/detector",
                    load_best=True,
                    progress_bar='none',
                    seed=42
                )
                
                optimizer_config = OptimizerConfig(
                    optimizer='Adam',
                    lr_scheduler='CosineAnnealingWarmRestarts',
                    lr_scheduler_params={
                        'T_0': 10,
                        'T_mult': 1,
                        'eta_min': 0.00001,
                    },
                    lr_scheduler_monitor_metric='valid_loss'
                )
        
                # The head configuration is handed over as a plain dict (note the
                # trailing .__dict__), both here and in common_model_args below.
                head_config = LinearHeadConfig(
                    layers='',
                    activation='ReLU',
                    dropout=0.1,
                    use_batch_norm=False,
                    initialization='xavier',
                ).__dict__
                
                # NOTE(review): model_config is built but not passed to model_sweep
                # while model_list is 'lite'; it is only referenced by the
                # commented-out alternative below.
                model_config = CategoryEmbeddingModelConfig(
                    task="classification",
                    layers="256-128-64",
                    activation="LeakyReLU",
                    dropout=0.1,
                    initialization="kaiming",
                    head="LinearHead",
                    head_config=head_config,
                    learning_rate=1e-3,
                )
                
                # model_list = [model_config]
                model_list = 'lite'
        
                # Suppress every warning (not just the four categories above)
                # while the model sweep runs.
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    sweep_df, best_model = model_sweep(
                        task="classification",
                        train=df_def_trn,
                        validation=df_def_val,
                        test=df_def_tst,
                        data_config=data_config,
                        optimizer_config=optimizer_config,
                        trainer_config=trainer_config,
                        model_list=model_list,
                        common_model_args=dict(head="LinearHead", head_config=head_config),
                        metrics=[
                            'accuracy',
                            'f1_score',
                            'precision',
                            'recall',
                            'specificity',
                            'cohen_kappa',
                            'auroc'
                        ],
                        # One flag and one parameter dict per metric, in the same
                        # order as the `metrics` list above (seven entries each).
                        metrics_prob_input=[True, True, True, True, True, True, True],
                        metrics_params=[
                            {'task': 'multiclass', 'num_classes': 2, 'average': 'weighted'},
                            {'task': 'multiclass', 'num_classes': 2, 'average': 'weighted'},
                            {'task': 'multiclass', 'num_classes': 2, 'average': 'weighted'},
                            {'task': 'multiclass', 'num_classes': 2, 'average': 'weighted'},
                            {'task': 'multiclass', 'num_classes': 2, 'average': 'weighted'},
                            {'task': 'multiclass', 'num_classes': 2},
                            {'task': 'multiclass', 'num_classes': 2, 'average': 'weighted'},
                        ],
                        # Candidate models are ranked by accuracy, higher being better.
                        rank_metric=("accuracy", "higher_is_better"),
                        progress_bar=False,
                        verbose=False,
                        suppress_lightning_logger=True,
                    )
                # Remove whatever the sweep left in the checkpoint directory;
                # the best model itself is not saved (see the commented line).
                ckpts = glob(f"{path_curr}/detector/*")
                for ckpt in ckpts:
                    os.remove(ckpt)
                # best_model.save_model(f"{path_curr}/detector")
                df_def_acc.at[var_val, 'Model'] = best_model.config['_model_name']
                
                # Cross-strength evaluation. The cell where test strength equals
                # train strength is skipped, so the table diagonal stays empty.
                for tst_var_val_id, tst_var_val in enumerate(opt['values']):
                    if tst_var_val != var_val:
                        print(f"Testing: {tst_var_val}")
                        # path_tst is only needed by the commented-out loader below;
                        # val_str_tst is the key into the `datasets` cache.
                        if var_param == 'Eps':
                            path_tst = f"{path_save}/Evasion/{atk_type}/eps_{tst_var_val:0.4f}"
                            val_str_tst = f'{tst_var_val:0.4f}'
                        elif var_param == 'BSS':
                            path_tst = f"{path_save}/Evasion/{atk_type}/bss_{tst_var_val}"
                            val_str_tst = f'{tst_var_val}'
                        else:
                            path_tst = f"{path_save}/Evasion/{atk_type}/eta_{tst_var_val:0.2e}"
                            val_str_tst = f'{tst_var_val:0.2e}'

                        # df_adv_tst = pd.read_excel(f"{path_tst}/df.xlsx", index_col='sample_id')
                        # df_adv_tst = df_adv_tst[feats]
                        # df_adv_tst['Class'] = 'Attack'
                        df_adv_tst = datasets[var_param][atk_type][val_str_tst]

                        # NOTE(review): df_ori here holds the same clean samples that were
                        # split into this detector's train/val/test sets above, so part of
                        # the evaluation data was seen during training -- confirm intended.
                        df_def_tst_eps = pd.concat([df_ori, df_adv_tst])
                        metrics = best_model.evaluate(test=df_def_tst_eps, verbose=False)[0]
                        df_def_acc.at[var_val, tst_var_val] = metrics['test_accuracy']
            # One table per attack type, written once all train strengths are done.
            df_def_acc.to_excel(f"{path_save}/Evasion/{atk_type}/detectors_accuracy.xlsx")
    

## Plot detectors accuracy

In [None]:
# For every attack type, draw the train-strength x test-strength accuracy table
# saved by the detector sweep as an annotated heatmap (PNG and PDF).
for var_param, opt in attacks_options.items():
    print(var_param)

    for atk_type in opt['types']:
        print(atk_type)
        df_def_acc = pd.read_excel(f"{path_save}/Evasion/{atk_type}/detectors_accuracy.xlsx", index_col=0)
        # 'Model' holds the name of the winning model rather than an accuracy.
        df_def_acc = df_def_acc.drop(columns=['Model'])

        # Figure size and tick-label number format depend on the swept parameter.
        if var_param == 'BSS':
            figsize, label_fmt = (6, 6), "d"
        elif var_param == 'Eps':
            figsize, label_fmt = (13, 12), ".3f"
        else:
            figsize, label_fmt = (13, 12), ".4f"

        # Replace the numeric row/column labels with their formatted strings.
        df_def_acc.columns = [format(col, label_fmt) for col in df_def_acc.columns]
        df_def_acc.index = [format(row, label_fmt) for row in df_def_acc.index.values]
        df_def_acc.index.name = 'index'

        df_fig = df_def_acc.astype(float)
        sns.set_theme(style='ticks', font_scale=1.0)
        fig, ax = plt.subplots(figsize=figsize)
        heatmap = sns.heatmap(
            df_fig,
            annot=True,
            fmt=".2f",
            cmap='hot',
            linewidth=0.1,
            linecolor='black',
            cbar_kws=dict(orientation='horizontal', location='top', pad=0.025, aspect=30),
            annot_kws=dict(size=9),
            ax=ax,
        )
        ax.set_xlabel('Test Attack Strength')
        ax.set_ylabel('Train Attack Strength')
        heatmap_pos = heatmap.get_position()

        # The colorbar is the axes added last to the figure.
        cbar_ax = ax.figure.axes[-1]
        cbar_ax.set_title("Accuracy")
        cbar_ax.tick_params()
        for cbar_spine in cbar_ax.spines.values():
            cbar_spine.set_linewidth(1)

        plt.savefig(f"{path_save}/Evasion/{atk_type}/detectors_accuracy.png", bbox_inches='tight', dpi=200)
        plt.savefig(f"{path_save}/Evasion/{atk_type}/detectors_accuracy.pdf", bbox_inches='tight')
        plt.close(fig)