In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from lightgbm import LGBMClassifier
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score, confusion_matrix
import matplotlib.pyplot as plt

from art.estimators.classification import SklearnClassifier, BlackBoxClassifier
from art.attacks.evasion import HopSkipJump
from art.utils import to_categorical

import warnings
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Caricamento e Preprocessing dei Dati
file_path = "adult.csv"
try:
    df = pd.read_csv(file_path, na_values='?')
except FileNotFoundError:
    print(f"Errore: File '{file_path}' non trovato. Assicurati che il file sia presente.")
    exit()

df = df.dropna()

df['gender'] = df['gender'].map({'Male': 0, 'Female': 1}).astype(int)
df['high_income'] = df['income'].map({'<=50K': 0, '>50K': 1})
df.drop('income', axis=1, inplace=True)

df['native-country'] = df['native-country'].apply(lambda x: 'Other' if x != 'United-States' else x)

df.drop('education', axis=1, inplace=True)
df.drop('fnlwgt', axis=1, inplace=True)

X = df.drop('high_income', axis=1)
y = df['high_income']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

categorical_features_to_encode = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'native-country']
numerical_features = ['age', 'educational-num', 'capital-gain', 'capital-loss', 'hours-per-week', 'gender']

X_train_processed = X_train.copy()
X_test_processed = X_test.copy()

ordinal_encoder = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)
X_train_processed[categorical_features_to_encode] = ordinal_encoder.fit_transform(X_train[categorical_features_to_encode])
X_test_processed[categorical_features_to_encode] = ordinal_encoder.transform(X_test[categorical_features_to_encode])

scaler = StandardScaler()
X_train_processed[numerical_features] = scaler.fit_transform(X_train_processed[numerical_features])
X_test_processed[numerical_features] = scaler.transform(X_test_processed[numerical_features])

X_train_processed = X_train_processed.astype(np.float32)
X_test_processed = X_test_processed.astype(np.float32)
y_train = y_train.astype(np.int32)
y_test = y_test.astype(np.int32)

# Addestramento del Modello LightGBM Originale
print("\n--- Addestramento del Modello LightGBM Originale ---")
lgbm_model_orig = LGBMClassifier(n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42, verbosity=-1)
lgbm_model_orig.fit(X_train_processed, y_train)
print("Addestramento modello originale completato.")

# Valutazione del Modello Originale
print("\n--- Valutazione del Modello LightGBM Originale (su dati puliti) ---")
y_pred_orig = lgbm_model_orig.predict(X_test_processed)
y_pred_proba_orig = lgbm_model_orig.predict_proba(X_test_processed)[:, 1]

accuracy_orig = accuracy_score(y_test, y_pred_orig)
roc_auc_orig = roc_auc_score(y_test, y_pred_proba_orig)
print(f"Accuratezza Originale: {accuracy_orig:.4f}")
print(f"ROC AUC Score Originale: {roc_auc_orig:.4f}")

# Preparazione del Classificatore per ART
min_val = np.min(X_train_processed.values)
max_val = np.max(X_train_processed.values)
clip_values = (min_val, max_val)

art_classifier_orig = BlackBoxClassifier(
    predict_fn=lgbm_model_orig.predict_proba,
    input_shape=X_train_processed.shape[1:],
    nb_classes=2,
    clip_values=clip_values,
)
print("\nModello originale wrappato per ART con BlackBoxClassifier (usando predict_proba).")

# Simulazione di Attacco di Evasione (HopSkipJump)
print("\n--- Simulazione Attacco di Evasione (HopSkipJump) ---")
n_evasion_samples = 50
if len(X_test_processed) > n_evasion_samples:
    X_test_subset_evasion = X_test_processed.iloc[:n_evasion_samples].values
    y_test_subset_evasion = y_test.iloc[:n_evasion_samples].values
else:
    X_test_subset_evasion = X_test_processed.values
    y_test_subset_evasion = y_test.values
    n_evasion_samples = len(X_test_processed)

attack_evasion = HopSkipJump(classifier=art_classifier_orig,
                             targeted=False,
                             max_iter=10,
                             max_eval=100,
                             init_eval=10,
                             verbose=False)

print(f"Generazione di {n_evasion_samples} campioni avversari di evasione (può richiedere tempo)...")
X_test_adversarial_evasion = attack_evasion.generate(x=X_test_subset_evasion)
print("Generazione campioni avversari di evasione completata.")

y_pred_evasion_attack_proba = art_classifier_orig.predict(X_test_adversarial_evasion)
y_pred_evasion_attack_labels = np.argmax(y_pred_evasion_attack_proba, axis=1)

accuracy_evasion_attack = accuracy_score(y_test_subset_evasion, y_pred_evasion_attack_labels)
roc_auc_evasion_attack = roc_auc_score(y_test_subset_evasion, y_pred_evasion_attack_proba[:, 1])

print(f"Accuratezza del modello originale su {n_evasion_samples} campioni avversari di evasione: {accuracy_evasion_attack:.4f}")
print(f"ROC AUC del modello originale su {n_evasion_samples} campioni avversari di evasione: {roc_auc_evasion_attack:.4f}")


# Simulazione Attacco di Poisoning (Label Flipping)
print("\n--- Simulazione Attacco di Poisoning (Label Flipping) ---")

poison_percentage = 0.40
n_poison = int(poison_percentage * len(X_train_processed))

X_train_poisoned = X_train_processed.copy()
y_train_poisoned = y_train.copy()

lgbm_temp = LGBMClassifier(n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42, verbose=-1)
lgbm_temp.fit(X_train_processed, y_train)
y_pred_proba_temp = lgbm_temp.predict_proba(X_train_processed)[:, 1]

uncertainty = np.abs(y_pred_proba_temp - 0.5)

poison_indices_sorted = np.argsort(uncertainty)
poison_indices = X_train_processed.index[poison_indices_sorted[:n_poison]]

y_train_poisoned.loc[poison_indices] = 1 - y_train_poisoned.loc[poison_indices]
print(f"{n_poison} etichette flippate strategicamente nel training set per l'attacco di poisoning.")

lgbm_model_poisoned = LGBMClassifier(n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42, verbosity=-1)
lgbm_model_poisoned.fit(X_train_poisoned, y_train_poisoned)
print("Addestramento modello su dati avvelenati strategicamente completato.")

y_pred_poisoned_model = lgbm_model_poisoned.predict(X_test_processed)
y_pred_proba_poisoned_model = lgbm_model_poisoned.predict_proba(X_test_processed)[:, 1]

accuracy_poisoned_model = accuracy_score(y_test, y_pred_poisoned_model)
roc_auc_poisoned_model = roc_auc_score(y_test, y_pred_proba_poisoned_model)
print(f"Accuratezza del modello avvelenato (su dati di test puliti): {accuracy_poisoned_model:.4f}")
print(f"ROC AUC Score del modello avvelenato (su dati di test puliti): {roc_auc_poisoned_model:.4f}")

# Difesa: Addestramento Avversario
print("\n--- Difesa: Addestramento Avversario ---")
n_adv_train_samples = 200
if len(X_train_processed) > n_adv_train_samples:
    X_train_subset_for_adv_df_indexed = X_train_processed.sample(n=n_adv_train_samples, random_state=42)
    X_train_subset_for_adv = X_train_subset_for_adv_df_indexed.values
else:
    X_train_subset_for_adv_df_indexed = X_train_processed.copy()
    X_train_subset_for_adv = X_train_subset_for_adv_df_indexed.values
    n_adv_train_samples = len(X_train_processed)

print(f"Generazione di {n_adv_train_samples} campioni avversari dal training set per l'addestramento (può richiedere tempo)...")
attack_adv_training = HopSkipJump(classifier=art_classifier_orig,
                                  targeted=False,
                                  max_iter=10,
                                  max_eval=100,
                                  init_eval=10,
                                  verbose=False)
X_train_adversarial_generated = attack_adv_training.generate(x=X_train_subset_for_adv)
print("Generazione campioni avversari per l'addestramento completata.")

X_train_adversarial_generated_df = pd.DataFrame(X_train_adversarial_generated, columns=X_train_processed.columns)
y_train_subset_for_adv_labels = y_train.loc[X_train_subset_for_adv_df_indexed.index]

X_train_augmented = pd.concat([X_train_processed, X_train_adversarial_generated_df], ignore_index=True)
y_train_augmented = pd.concat([y_train, y_train_subset_for_adv_labels], ignore_index=True)

print(f"Dimensione training set originale: {X_train_processed.shape}, Aumentato a: {X_train_augmented.shape}")

lgbm_model_adv_trained = LGBMClassifier(n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42, verbosity=-1)
lgbm_model_adv_trained.fit(X_train_augmented, y_train_augmented)
print("Addestramento modello avversario completato.")

# Valutazione del Modello Addestrato Avversariamente
print("\n--- Valutazione Modello Addestrato Avversariamente ---")
y_pred_adv_trained_clean = lgbm_model_adv_trained.predict(X_test_processed)
y_pred_proba_adv_trained_clean = lgbm_model_adv_trained.predict_proba(X_test_processed)[:, 1]
accuracy_adv_trained_clean = accuracy_score(y_test, y_pred_adv_trained_clean)
roc_auc_adv_trained_clean = roc_auc_score(y_test, y_pred_proba_adv_trained_clean)
print(f"Accuratezza modello add. avversariamente (su dati test puliti): {accuracy_adv_trained_clean:.4f}")
print(f"ROC AUC modello add. avversariamente (su dati test puliti): {roc_auc_adv_trained_clean:.4f}")

art_classifier_adv_trained = BlackBoxClassifier(
    predict_fn=lgbm_model_adv_trained.predict_proba,
    input_shape=X_test_processed.shape[1:],
    nb_classes=2,
    clip_values=clip_values,
)

y_pred_adv_trained_adv_test_proba = art_classifier_adv_trained.predict(X_test_adversarial_evasion)
y_pred_adv_trained_adv_test_labels = np.argmax(y_pred_adv_trained_adv_test_proba, axis=1)

accuracy_adv_trained_adv_test = accuracy_score(y_test_subset_evasion, y_pred_adv_trained_adv_test_labels)
roc_auc_adv_trained_adv_test = roc_auc_score(y_test_subset_evasion, y_pred_adv_trained_adv_test_proba[:, 1])

print(f"Accuratezza modello add. avversariamente (su {n_evasion_samples} dati test avversari): {accuracy_adv_trained_adv_test:.4f}")
print(f"ROC AUC modello add. avversariamente (su {n_evasion_samples} dati test avversari): {roc_auc_adv_trained_adv_test:.4f} (su {n_evasion_samples} campioni)")

# Riepilogo dei Risultati
print("\n--- Riepilogo Risultati ---")
print(f"Modello Originale (su dati puliti):          Accuratezza={accuracy_orig:.4f}, ROC AUC={roc_auc_orig:.4f}")
print(f"Modello Originale (sotto attacco evasione): Accuratezza={accuracy_evasion_attack:.4f}, ROC AUC={roc_auc_evasion_attack:.4f} (su {n_evasion_samples} campioni)")
print(f"Modello Avvelenato (su dati puliti):         Accuratezza={accuracy_poisoned_model:.4f}, ROC AUC={roc_auc_poisoned_model:.4f}")
print(f"Modello Add. Avversariamente (dati puliti): Accuratezza={accuracy_adv_trained_clean:.4f}, ROC AUC={roc_auc_adv_trained_clean:.4f}")
print(f"Modello Add. Avversariamente (dati avv.):   Accuratezza={accuracy_adv_trained_adv_test:.4f}, ROC AUC={roc_auc_adv_trained_adv_test:.4f} (su {n_evasion_samples} campioni)")

print("\nSimulazione completata.")