# Attack on Diva using LIME and noisy dataset

In [None]:
from pathlib import Path
import sys
from numba import njit

# Relative locations (with respect to the notebook's working directory) that
# must be on the import path for the project-local packages used below.
UTILS_RELATIVE_PATH = "../../../../"
MLEM_RELATIVE_PATH = "../../../../.."
LIME_RELATIVE_PATH = "../../../../../lime/"

# Appended in this order: utilities first, then mlem, then lime.
sys.path.extend([UTILS_RELATIVE_PATH, MLEM_RELATIVE_PATH, LIME_RELATIVE_PATH])

# Folder collecting every artifact written by this experiment.
OUTPUT_FOLDER = Path("experiment_output")
OUTPUT_FOLDER.mkdir(exist_ok=True)

import logging
# Suppress every log record of severity DEBUG or lower, for all loggers.
# logging.disable is documented to take a numeric level, so the constant is
# used instead of the level name as a string.
logging.disable(logging.DEBUG)


In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_theme()
import numpy as np
import scipy.spatial.distance as distance
import multiprocessing

np.random.seed(4321)
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from lime.lime_tabular import LimeTabularExplainer # type: ignore
from mlem.utilities import generate_balanced_dataset, save_pickle_bz2, load_pickle_bz2, save_txt
from mlem.ensemble import EnsembleClassifier


importing the experiment utilities and the mlem module

# Loading the Diva data

Loading the Diva RandomForest black box and the dictionary with all the related data.

In [None]:
from utils.dataloading.diva import load_diva_data, load_diva_randomforest # type: ignore

# Black-box classifier under attack and the dictionary holding its datasets/metadata.
BB = load_diva_randomforest()
BB_DATA = load_diva_data()

# Report the black box performance on the 'X_test' / 'y_test' split.
print(classification_report(BB_DATA['y_test'], BB.predict(BB_DATA['X_test'])))

# Creating the explainer

In [None]:
# Positions of the columns flagged as categorical by the black-box metadata mask.
categorical_feature_indices = [
    position
    for position, is_categorical in enumerate(BB_DATA['categorical_features_mask'])
    if is_categorical
]

# LIME tabular explainer built on the black-box training split.
explainer_training_data = BB_DATA['X_train']
explainer = LimeTabularExplainer(
    training_data=explainer_training_data,
    categorical_features=categorical_feature_indices,
    random_state=123,
)

def generate_data_lime(x, num_samples):
    """Sample `num_samples` points around `x` with the explainer's gaussian sampler.

    One extra row is requested and the first returned row is discarded
    (presumably the instance itself -- TODO confirm against the explainer's
    `data_inverse` implementation).

    Params:
        x - instance around which the samples are drawn
        num_samples - number of samples to return
    Return
        the second element of `data_inverse`'s result, without its first row
    """
    sampling_output = explainer.data_inverse(x, num_samples + 1, 'gaussian')
    inverse_rows = sampling_output[1]
    return inverse_rows[1:]

def generate_local_models_and_neighborhood(x, num_samples):
    """Explain `x` with LIME and return its local model together with the neighborhood.

    Params:
        x - instance around which the neighborhood is generated and the local model is learned
        num_samples - number of samples to generate
    Return
        local_model, x_neigh, y_neigh
    """
    # LIME provides both the per-label models and the sampled neighborhood;
    # the first element of its result is not needed here.
    lime_options = dict(
        labels=[0, 1],
        sampling_method="gaussian",
        num_samples=num_samples,
        num_features=len(x),
    )
    _unused, surrogate_models, x_neigh = explainer.explain_instance(
        x, BB.predict_proba, **lime_options
    )

    # The models returned by LIME are wrapped into a single ensemble classifier,
    # which then labels the neighborhood it was learned on.
    local_model = EnsembleClassifier(classifiers=surrogate_models)
    return local_model, x_neigh, local_model.predict(x_neigh)

def filter_elements_std(elems, x, std=3):
    """Keep the rows of `elems` that are not distance outliers with respect to `x`.

    A row is kept when its distance from `x` (as computed by `cdist` with its
    default metric) is strictly below `mean + std * deviation` of all the
    distances, where the deviation is the pandas sample standard deviation.

    The distances are kept in a separate vector instead of a temporary column,
    so a feature column that happens to be called 'Dist' is neither overwritten
    nor dropped, and `elems` is never modified.

    Params:
        elems - 2D array-like (or DataFrame) of candidate rows
        x - reference instance, with as many features as the rows of `elems`
        std - number of standard deviations above the mean distance to tolerate
    Return
        DataFrame with the retained rows (original index and columns preserved)
    """
    frame = pd.DataFrame(elems)
    # cdist returns an (n, 1) matrix: flatten it to one distance per row.
    dists = pd.Series(distance.cdist(elems, [x]).ravel())
    threshold = dists.mean() + std * dists.std()
    # Positional boolean mask: independent from the index labels of `frame`.
    keep = (dists < threshold).to_numpy()
    return frame[keep]

# Generating the lime datasets and local models

In [None]:
# Instances (and their labels) taken from the 'attack_3_per_quantile' entries of the black-box data.
test_representatives_x = BB_DATA['X_attack_3_per_quantile']
test_representatives_y = BB_DATA['y_attack_3_per_quantile']
# Number of representative instances, i.e. of local datasets/models to produce.
n_datasets = len(test_representatives_x)

Explain each instance and save the local model and the neighborhood

Create the local models and the neighborhoods <span style="color:red">only if they don't already exist</span>.

In [None]:
def generate_and_save(index, instance, num_samples=5000):
    """Build the LIME neighborhood and local model of `instance` and save them.

    Wrapper around `generate_local_models_and_neighborhood` so that the
    datasets can be generated in parallel. Everything is written inside
    OUTPUT_FOLDER / "<index>":
        instance.npy        - the explained instance
        lime_localmodel.bz2 - the pickled local model
        lime_neigh.csv      - the neighborhood with its labels in the 'Target' column

    Params:
        index - position of the instance, used as the name of its output folder
        instance - instance to explain
        num_samples - number of samples of the neighborhood (default 5000)
    """
    output_path = OUTPUT_FOLDER / f"{index}"
    output_path.mkdir(exist_ok=True)

    local_model, x_neigh, y_neigh = generate_local_models_and_neighborhood(instance, num_samples)

    local_neighborhood = pd.DataFrame(x_neigh)
    local_neighborhood['Target'] = y_neigh

    with open(output_path / "instance.npy", "wb") as f:
        np.save(f, instance)

    save_pickle_bz2(output_path / "lime_localmodel.bz2", local_model)

    # The neighborhood must be stored in the per-instance folder: writing it to
    # the working directory made every worker overwrite the same file and left
    # the per-instance "lime_neigh.csv" (checked before regenerating) missing.
    # It is written last so that its presence implies the other files exist.
    local_neighborhood.to_csv(output_path / "lime_neigh.csv")


# Generate only the representatives whose neighborhood file is missing, so an
# interrupted run is resumed instead of being mistaken for a complete one.
missing_instances = [
    (index, instance)
    for index, instance in enumerate(test_representatives_x)
    if not (OUTPUT_FOLDER / f"{index}" / "lime_neigh.csv").exists()
]

if missing_instances:
    with multiprocessing.Pool(processes=8) as pool:
        pool.starmap(generate_and_save, missing_instances)
else:
    print("The lime neighborhood and local models already exist")

# Attack on the Local Models to create the attack models

Attack on the local models using the <span style="background: green">noisy dataset</span>, labeled by the LIME local models, to create the shadow models.

In [None]:
# Noisy validation features and labels, wrapped in DataFrames.
noisy_dataset_x = pd.DataFrame(BB_DATA['X_validation_noisy'])
noisy_dataset_y = pd.DataFrame(BB_DATA['y_validation_noisy'])
# Mask of the categorical columns, later handed to the shadow models manager.
categorical_features_mask = BB_DATA['categorical_features_mask']

Creating the shadow models and the attack models.

In [None]:
from mlem.shadow_models import ShadowModelsManager
from mlem.utilities import create_adaboost
from mlem.attack_models import AttackModelsManager, AttackStrategy

In [None]:
def compute_statistics_local_model(dataset, true_y, local_model, black_box, output_folder, filename):
    """Write a text report comparing a local model with the black box on `dataset`.

    The report contains the classification report of the local model, the
    classification report of the black box (both against `true_y`) and the
    normalized counts of the agreement between the two sets of predictions.

    Params:
        dataset - DataFrame with the features to predict
        true_y - DataFrame/Series with the reference labels
        local_model - model exposing `predict`, compared against the black box
        black_box - model exposing `predict`
        output_folder - Path of the folder receiving the report
        filename - name of the report file
    """
    features = dataset.to_numpy()
    reference_labels = true_y.to_numpy()

    local_y = local_model.predict(features)
    local_bb = black_box.predict(features)

    # Sections in the exact order in which they appear in the report.
    report_sections = [
        "Statistics on the noisy validation dataset\n",
        "local model\n",
        classification_report(reference_labels, local_y),
        "\nblack box\n",
        classification_report(reference_labels, local_bb),
        "\nFidelity between the local model and the black box\n",
        str(pd.DataFrame(local_y == local_bb).value_counts(normalize=True)),
    ]

    with open(output_folder / filename, "w") as f:
        f.writelines(report_sections)

Run the attack only if it hasn't already been run

In [None]:
# Folders of the representatives that still lack an "attack" directory. Checking
# each folder on its own lets an interrupted run resume, instead of skipping the
# whole step as soon as a single attack directory exists.
experiment_paths = [OUTPUT_FOLDER / f"{i}" for i in range(len(test_representatives_x))]
pending_paths = [path for path in experiment_paths if not (path / "attack").exists()]

if not pending_paths:
    print("The attack models already exist")

for path in pending_paths:
    # load the local model and label the noisy dataset
    local_model = load_pickle_bz2(path / "lime_localmodel.bz2")

    # compute fidelity and performances on the noisy dataset.
    # NOTE: The labels associated with the noisy dataset are the same of the clean one.
    compute_statistics_local_model(noisy_dataset_x, noisy_dataset_y, local_model, BB, path, "statistics_noisy_lime.txt")

    # the attack data is the noisy dataset, labeled by the local model
    x_attack = noisy_dataset_x.to_numpy()
    y_attack = local_model.predict(x_attack)

    path_shadow = str(path / "shadow")

    shadow_models = ShadowModelsManager(
        n_models=4,
        results_path=path_shadow,
        test_size=0.5,
        random_state=12,
        model_creator_fn=create_adaboost,
        categorical_mask=categorical_features_mask
    )
    shadow_models.fit(x_attack, y_attack)

    # extracting the dataset for the attack models
    attack_models_dataset = shadow_models.get_attack_dataset()

    # saving the attack dataset
    attack_models_dataset.to_csv(path / "attack_models_train_dataset.csv", index=False)

    # Creating the attack model for each label using Adaboost
    path_attack = str(path / "attack")
    attack_models = AttackModelsManager(
        results_path=path_attack, model_creator_fn=create_adaboost, attack_strategy=AttackStrategy.ONE_PER_LABEL
    )

    attack_models.fit(attack_models_dataset)

By looking at the test reports of the attack models, it seems that they <span style="background: green">perform better on the class 1</span> with an accuracy of $\simeq .62$, while the accuracy on the class 0 is $\simeq .51$.

# Creating the ensembles

In [None]:
from mlem.ensemble import HardVotingClassifier, SoftVotingClassifier, KMostSureVotingClassifier
from utils.attack_evaluation import evaluate_attack

In [None]:
# Attack models of label 0 and of label 1, one pair per representative folder.
attack_models_0 = []
attack_models_1 = []
for i in range(len(test_representatives_x)):
    attack_folder = OUTPUT_FOLDER / f"{i}" / "attack"
    attack_models_0.append(load_pickle_bz2(attack_folder / "0" / "model.pkl.bz2"))
    attack_models_1.append(load_pickle_bz2(attack_folder / "1" / "model.pkl.bz2"))

## Hard Voting

In [None]:
# One hard-voting ensemble per label, each built on the attack models loaded for that label.
hv0 = HardVotingClassifier(classifiers=attack_models_0)
hv1 = HardVotingClassifier(classifiers=attack_models_1)

In [None]:
# Evaluate the pair of hard-voting ensembles (label 0, label 1) against the black box and its data.
evaluate_attack(hv0, hv1, BB, BB_DATA)

## Soft Voting

In [None]:
# One soft-voting ensemble per label, each built on the attack models loaded for that label.
sv0 = SoftVotingClassifier(classifiers=attack_models_0)
sv1 = SoftVotingClassifier(classifiers=attack_models_1)

In [None]:
# Evaluate the pair of soft-voting ensembles (label 0, label 1) against the black box and its data.
evaluate_attack(sv0, sv1, BB, BB_DATA)