In [31]:
import pandas as pd
import tensorflow as tf
import joblib
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score
#from tensorflow.keras.models import load_model
from tensorflow.keras.models import load_model
from art.estimators.classification import KerasClassifier, SklearnClassifier
from art.estimators.classification import XGBoostClassifier
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
import glob
import os

# Path to the folder containing your CSV files
csv_folder = 'road/preprocessed'  # Update this
# Read all CSV files in the folder
csv_files = glob.glob(os.path.join(csv_folder, "*.csv"))
csv_files = [f for f in csv_files if not os.path.basename(f).rstrip(".csv").endswith("m")]
print(csv_files)


['road/preprocessed\\csa1.csv', 'road/preprocessed\\csa2.csv', 'road/preprocessed\\csa3.csv', 'road/preprocessed\\fa1.csv', 'road/preprocessed\\fa2.csv', 'road/preprocessed\\fa3.csv', 'road/preprocessed\\mecta.csv', 'road/preprocessed\\msa1.csv', 'road/preprocessed\\msa2.csv', 'road/preprocessed\\msa3.csv', 'road/preprocessed\\rloffa1.csv', 'road/preprocessed\\rloffa2.csv', 'road/preprocessed\\rloffa3.csv', 'road/preprocessed\\rlona1.csv', 'road/preprocessed\\rlona2.csv', 'road/preprocessed\\rlona3.csv']


In [32]:
# Subsets of files
csa = csv_files[:3]
fa = csv_files[3:6]
mecta = csv_files[6:7]
msa = csv_files[7:10]
rloffa = csv_files[10:13]
rlona = csv_files[13:16]
print(csa)
print(fa)
print(mecta)
print(msa)
print(rloffa)
print(rlona)
# Function to load subset and filter normal samples
def load_dataset(files):
    df = pd.concat((pd.read_csv(f) for f in files), ignore_index=True)
    df=df[df['Flag'] == 1].copy()
    X = df.drop(columns=['Flag'])   # features only
    y = df['Flag'].values           
    X.columns = [c.replace("[", "_").replace("]", "").replace("<", "_") for c in X.columns]
    return X, y

# Load datasets with labels
X_csa, y_csa = load_dataset(csa)
X_fa, y_fa = load_dataset(fa)
X_mecta, y_mecta = load_dataset(mecta)
X_msa, y_msa = load_dataset(msa)
X_rloffa, y_rloffa = load_dataset(rloffa)
X_rlona, y_rlona = load_dataset(rlona)


# Load models
dnn_model = tf.keras.models.load_model("road/models/dnn_model.h5")
dt_model = joblib.load("road/models/dt_model.pkl")
rf_model = joblib.load("road/models/rf_model.pkl")
et_model = joblib.load("road/models/et_model.pkl")
xgb_model = joblib.load("road/models/xgboost_model.pkl")

# Wrap models
dnn_art = KerasClassifier(model=dnn_model, clip_values=(0, 255))
#dt_art = SklearnClassifier(model=dt_model, clip_values=(0, 255))
#rf_art = SklearnClassifier(model=rf_model, clip_values=(0, 255))
#et_art = SklearnClassifier(model=et_model, clip_values=(0, 255))
#xgb_art = XGBoostClassifier(model=xgb_model, clip_values=(0, 255))


['road/preprocessed\\csa1.csv', 'road/preprocessed\\csa2.csv', 'road/preprocessed\\csa3.csv']
['road/preprocessed\\fa1.csv', 'road/preprocessed\\fa2.csv', 'road/preprocessed\\fa3.csv']
['road/preprocessed\\mecta.csv']
['road/preprocessed\\msa1.csv', 'road/preprocessed\\msa2.csv', 'road/preprocessed\\msa3.csv']
['road/preprocessed\\rloffa1.csv', 'road/preprocessed\\rloffa2.csv', 'road/preprocessed\\rloffa3.csv']
['road/preprocessed\\rlona1.csv', 'road/preprocessed\\rlona2.csv', 'road/preprocessed\\rlona3.csv']


In [33]:
print(y_csa)

[1 1 1 ... 1 1 1]


In [34]:
from art.attacks.evasion import FastGradientMethod, BasicIterativeMethod, ProjectedGradientDescent

def generate_constrained_attack(estimator, X, method="FGSM", eps=1.0, eps_step=0.1, max_iter=10):
    """
    Generate adversarial examples with IVN constraints using FGSM, BIM, or PGD.
    
    Parameters:
    - estimator: ART classifier
    - X: np.ndarray or pd.DataFrame, shape (n_samples, 10)
    - method: str, one of {"FGSM", "BIM", "PGD"}
    - eps: float, total allowed perturbation
    - eps_step: float, step size
    - max_iter: int, only used for iterative attacks (BIM, PGD)
    
    Returns:
    - X_adv: adversarial examples (np.ndarray), clipped to [0, 255] on DATA[0]-[7]
    """
    if isinstance(X, pd.DataFrame):
        X = X.to_numpy()

    # Only allow perturbation on DATA[0]-[7]
    perturbation_mask = np.array([False, False] + [True] * 8)

    # Select the attack method
    if method == "FGSM":
        attack = FastGradientMethod(estimator=estimator, eps=eps, eps_step=eps_step)
    elif method == "BIM":
        attack = BasicIterativeMethod(estimator=estimator, eps=eps, eps_step=eps_step, max_iter=max_iter, verbose=False)
    elif method == "PGD":
        attack = ProjectedGradientDescent(estimator=estimator, eps=eps, eps_step=eps_step, max_iter=max_iter, num_random_init=1, verbose=False)
    else:
        raise ValueError("Unsupported attack method. Choose 'FGSM', 'BIM', or 'PGD'.")

    # Generate adversarial examples
    X_adv = attack.generate(x=X, mask=perturbation_mask)

    # Clip DATA[0] to DATA[7] to valid byte range
    X_adv[:, 2:] = np.clip(X_adv[:, 2:], 0, 255)
    
    # Log attack details
    print(f"[{method}] Generated adversarial examples with eps={eps}, eps_step={eps_step}, max_iter={max_iter if method != 'FGSM' else 'N/A'}")
    
    return X_adv


In [35]:
from sklearn.metrics import confusion_matrix, f1_score

def constraint_compliant(X):
    X_np = X.to_numpy() if isinstance(X, pd.DataFrame) else X
    return (
        (X_np[:, 0] >= 0) & (X_np[:, 0] <= 1068) &  # CAN ID
        (X_np[:, 1] >= 1) & (X_np[:, 1] <= 8) &     # DLC
        np.all((X_np[:, 2:] >= 0) & (X_np[:, 2:] <= 255), axis=1)  # DATA[0]-[7]
    )

from sklearn.metrics import confusion_matrix, f1_score

def evaluate_model_on_adversarial_tabular(
    model,
    X_clean,
    y_clean,          # true labels (0=normal, 1=attack)
    X_adv_dict,
    model_name="RF",
    attack_name="BIM",
    is_probabilistic=False
):
    results = []
    sample_size = len(X_clean)

    # Predict on clean data
    if is_probabilistic:
        y_pred_clean = np.argmax(model.predict(X_clean), axis=1)
    else:
        y_pred_clean = model.predict(X_clean)

    # Confusion matrix (TN, FP, FN, TP)
    cm_clean = confusion_matrix(y_clean, y_pred_clean, labels=[0,1])
    tn, fp, fn, tp = cm_clean.ravel()
    f1_clean = f1_score(y_clean, y_pred_clean, average='weighted')

    for eps_val, X_adv in X_adv_dict.items():
        valid_mask = constraint_compliant(X_adv)
        X_adv_valid = X_adv[valid_mask]
        if len(X_adv_valid) == 0:
            continue

        # Ground truth is same as y_clean for aligned samples
        y_true_adv = y_clean[valid_mask]

        if is_probabilistic:
            y_pred_adv = np.argmax(model.predict(X_adv_valid), axis=1)
        else:
            y_pred_adv = model.predict(X_adv_valid)

        # Binary prediction: 0=normal, 1=attack
        y_pred_adv_binary = (y_pred_adv != 0).astype(int)

        f1_adv = f1_score(y_true_adv, y_pred_adv_binary, average='weighted')

        # Confusion matrix for adversarial data
        cm_adv = confusion_matrix(y_true_adv, y_pred_adv_binary, labels=[0,1])
        tn_adv, fp_adv, fn_adv, tp_adv = cm_adv.ravel()

        # ASR: fraction of adversarial attack samples misclassified as normal
        attack_mask = (y_true_adv == 1)
        asr = fn_adv / max(1, np.sum(attack_mask))

        results.append([
            model_name, sample_size,
            f"{f1_clean*100:.1f}%", fn,
            attack_name, eps_val,
            f"{f1_adv*100:.1f}%", fn_adv, f"{asr:.1%}"
        ])

    columns = ["Model", "Sample Size",
               "F1 Score", "FN", 
               "Attack", "Epsilon",
               "F1 Score (Adv)", "FN (Adv)", "ASR (FN)"]
    
    return pd.DataFrame(results, columns=columns)


In [36]:
import numpy as np
import pandas as pd
from tqdm import tqdm

def generate_dt_attack(model, X, target_class, epsilon=1.0, max_attempts=10, batch_size=1024):
    X_adv = X.copy().astype(float)

    # Auto-detect the correct DATA column format
    if f"DATA[0]" in X.columns:
        data_features = [f"DATA[{i}]" for i in range(8)]
    elif f"DATA_0" in X.columns:
        data_features = [f"DATA_{i}" for i in range(8)]
    else:
        raise ValueError("Expected columns like 'DATA[0]' or 'DATA_0' not found.")

    for batch_start in tqdm(range(0, len(X_adv), batch_size), desc="Generating DT attack"):
        batch_end = min(batch_start + batch_size, len(X_adv))
        batch = X_adv.iloc[batch_start:batch_end].copy()

        for attempt in range(max_attempts):
            perturbations = np.random.uniform(-epsilon, epsilon, size=(len(batch),))
            features_to_perturb = np.random.choice(data_features, size=len(batch))

            for i, feat in enumerate(features_to_perturb):
                batch.at[batch.index[i], feat] += perturbations[i]
                batch.at[batch.index[i], feat] = np.clip(batch.at[batch.index[i], feat], 0, 255)

            preds = model.predict(batch)
            if len(preds.shape) > 1 and preds.shape[1] > 1:
                preds_classes = np.argmax(preds, axis=1)
            else:
                preds_classes = preds

            success_mask = preds_classes == target_class
            if success_mask.all():
                break

        X_adv.iloc[batch_start:batch_end] = batch

    return X_adv



In [37]:
def run_attacks(dataset_name, X_normal, y_clean, models, attack_methods):
    results = {}
    cols = X_normal.columns
    for attack in attack_methods:
        X_adv_dict_all = {}  # holds adversarial sets per model

        # === Generate adversarial examples ===
        if attack in ["FGSM", "BIM", "PGD"]:
            # These only use the DNN
            if attack == "FGSM":
                X_adv_dict = {
                    1: generate_constrained_attack(dnn_art, X_normal, method="FGSM", eps=1.0, eps_step=0.1),
                    5: generate_constrained_attack(dnn_art, X_normal, method="FGSM", eps=5.0, eps_step=0.1)
                }
            elif attack == "BIM":
                X_adv_dict = {
                    1: generate_constrained_attack(dnn_art, X_normal, method="BIM", eps=1.0, eps_step=0.1, max_iter=10),
                    5: generate_constrained_attack(dnn_art, X_normal, method="BIM", eps=5.0, eps_step=0.1, max_iter=10)
                }
            elif attack == "PGD":
                X_adv_dict = {
                    1: generate_constrained_attack(dnn_art, X_normal, method="PGD", eps=1.0, eps_step=0.1, max_iter=20),
                    5: generate_constrained_attack(dnn_art, X_normal, method="PGD", eps=5.0, eps_step=0.1, max_iter=20)
                }

            # store once, reused by all models
            X_adv_dict_all = {"DNN": X_adv_dict}

        elif attack == "DT":
            # Generate DT attack for *every* model in models
            for model_name, model_obj in models.items():
                if model_name == "DNN":
                    base_model = dnn_model
                else:
                    base_model = model_obj

                X_adv_dict_all[model_name] = {
                    1: generate_dt_attack(base_model, X_normal, target_class=1, epsilon=1.0, batch_size=1024),
                    5: generate_dt_attack(base_model, X_normal, target_class=1, epsilon=5.0, batch_size=1024)
                }

        else:
            continue
    
        # === Evaluate across all models ===
        for model_name, model_obj in models.items():
            X_fixed = pd.DataFrame(X_normal.values, columns=cols)

            # choose the right adversarial examples
            if attack == "DT":
                X_adv_fixed = {eps: pd.DataFrame(X_adv, columns=cols)
                               for eps, X_adv in X_adv_dict_all[model_name].items()}
            else:
                X_adv_fixed = {eps: pd.DataFrame(X_adv, columns=cols)
                               for eps, X_adv in X_adv_dict_all["DNN"].items()}

            result_key = f"{dataset_name}_{model_name}_{attack}"
            results[result_key] = evaluate_model_on_adversarial_tabular(
                model_obj,
                X_fixed,
                y_clean,
                X_adv_fixed,
                model_name=model_name,
                attack_name=attack,
                is_probabilistic=(model_name == "DNN")
            )

    return results


In [38]:
models = {"DNN": dnn_art, "DT": dt_model, "RF": rf_model, "ET": et_model, "XGBoost": xgb_model}

datasets = {"rlona": (X_rlona, y_rlona), "rloffa": (X_rloffa, y_rloffa), "msa": (X_msa, y_msa), "mecta": (X_mecta, y_mecta), "fa": (X_fa, y_fa)}

attack_methods = ["FGSM", "BIM", "PGD"]


In [39]:
# mapping dataset keys to pretty names
dataset_labels = {
    "csa": "Correlated Signal Attack",
    "fa": "Fuzzing Attack",
    "mecta": "Max Engine Coolant Temp Attack",
    "msa": "max_speedometer_attack",  
    "rloffa": "Reverse Light Off Attack",
    "rlona": "Reverse Light On Attack"
}

# Run attacks for all datasets
all_results = {}
for dataset_key, (X,y) in datasets.items():
    dataset_results = run_attacks(dataset_key, X, y, models, attack_methods)
    # merge all model/attack results into one big DataFrame for this dataset
    merged_df = pd.concat(dataset_results.values(), ignore_index=True)

    # store under dataset key
    all_results[dataset_key] = merged_df

# === Print nicely ===
for dataset_key, df in all_results.items():
    dataset_label = dataset_labels.get(dataset_key, dataset_key)
    print(f"\n=== {dataset_label} ===")
    display(df)
   # Jupyter; if terminal: print(df.to_string())


  updates=self.state_updates,


[FGSM] Generated adversarial examples with eps=1.0, eps_step=0.1, max_iter=N/A
[FGSM] Generated adversarial examples with eps=5.0, eps_step=0.1, max_iter=N/A
[BIM] Generated adversarial examples with eps=1.0, eps_step=0.1, max_iter=10
[BIM] Generated adversarial examples with eps=5.0, eps_step=0.1, max_iter=10
[PGD] Generated adversarial examples with eps=1.0, eps_step=0.1, max_iter=20
[PGD] Generated adversarial examples with eps=5.0, eps_step=0.1, max_iter=20
[FGSM] Generated adversarial examples with eps=1.0, eps_step=0.1, max_iter=N/A
[FGSM] Generated adversarial examples with eps=5.0, eps_step=0.1, max_iter=N/A
[BIM] Generated adversarial examples with eps=1.0, eps_step=0.1, max_iter=10
[BIM] Generated adversarial examples with eps=5.0, eps_step=0.1, max_iter=10
[PGD] Generated adversarial examples with eps=1.0, eps_step=0.1, max_iter=20
[PGD] Generated adversarial examples with eps=5.0, eps_step=0.1, max_iter=20
[FGSM] Generated adversarial examples with eps=1.0, eps_step=0.1, ma

Unnamed: 0,Model,Sample Size,F1 Score,FN,Attack,Epsilon,F1 Score (Adv),FN (Adv),ASR (FN)
0,DNN,15195,0.0%,15195,FGSM,1,0.0%,15186,100.0%
1,DNN,15195,0.0%,15195,FGSM,5,0.0%,15186,100.0%
2,DT,15195,98.9%,324,FGSM,1,13.2%,14112,92.9%
3,DT,15195,98.9%,324,FGSM,5,20.1%,13488,88.8%
4,RF,15195,99.0%,309,FGSM,1,38.1%,11607,76.4%
5,RF,15195,99.0%,309,FGSM,5,0.7%,15135,99.7%
6,ET,15195,98.9%,324,FGSM,1,84.8%,4014,26.4%
7,ET,15195,98.9%,324,FGSM,5,1.4%,15081,99.3%
8,XGBoost,15195,99.0%,312,FGSM,1,0.7%,15135,99.7%
9,XGBoost,15195,99.0%,312,FGSM,5,0.7%,15135,99.7%



=== Reverse Light Off Attack ===


Unnamed: 0,Model,Sample Size,F1 Score,FN,Attack,Epsilon,F1 Score (Adv),FN (Adv),ASR (FN)
0,DNN,10605,0.0%,10605,FGSM,1,0.0%,6387,100.0%
1,DNN,10605,0.0%,10605,FGSM,5,0.0%,6387,100.0%
2,DT,10605,72.4%,4590,FGSM,1,25.8%,5442,85.2%
3,DT,10605,72.4%,4590,FGSM,5,25.8%,5442,85.2%
4,RF,10605,71.9%,4647,FGSM,1,24.3%,5502,86.1%
5,RF,10605,71.9%,4647,FGSM,5,24.3%,5502,86.1%
6,ET,10605,72.4%,4590,FGSM,1,25.9%,5436,85.1%
7,ET,10605,72.4%,4590,FGSM,5,25.8%,5442,85.2%
8,XGBoost,10605,72.4%,4593,FGSM,1,25.8%,5442,85.2%
9,XGBoost,10605,72.4%,4593,FGSM,5,25.8%,5442,85.2%



=== max_speedometer_attack ===


Unnamed: 0,Model,Sample Size,F1 Score,FN,Attack,Epsilon,F1 Score (Adv),FN (Adv),ASR (FN)
0,DNN,17221,0.0%,17221,FGSM,1,0.0%,14846,100.0%
1,DNN,17221,0.0%,17221,FGSM,5,0.0%,14846,100.0%
2,DT,17221,98.0%,659,FGSM,1,7.0%,14310,96.4%
3,DT,17221,98.0%,659,FGSM,5,24.8%,12745,85.8%
4,RF,17221,98.3%,571,FGSM,1,3.4%,14592,98.3%
5,RF,17221,98.3%,571,FGSM,5,3.4%,14592,98.3%
6,ET,17221,98.0%,667,FGSM,1,88.1%,3151,21.2%
7,ET,17221,98.0%,667,FGSM,5,88.1%,3151,21.2%
8,XGBoost,17221,98.4%,526,FGSM,1,3.4%,14592,98.3%
9,XGBoost,17221,98.4%,526,FGSM,5,3.4%,14592,98.3%



=== Max Engine Coolant Temp Attack ===


Unnamed: 0,Model,Sample Size,F1 Score,FN,Attack,Epsilon,F1 Score (Adv),FN (Adv),ASR (FN)
0,DNN,88,0.0%,88,FGSM,1,0.0%,6,100.0%
1,DNN,88,0.0%,88,FGSM,5,0.0%,6,100.0%
2,DT,88,80.3%,29,FGSM,1,0.0%,6,100.0%
3,DT,88,80.3%,29,FGSM,5,0.0%,6,100.0%
4,RF,88,81.9%,27,FGSM,1,0.0%,6,100.0%
5,RF,88,81.9%,27,FGSM,5,0.0%,6,100.0%
6,ET,88,80.3%,29,FGSM,1,0.0%,6,100.0%
7,ET,88,80.3%,29,FGSM,5,0.0%,6,100.0%
8,XGBoost,88,82.7%,26,FGSM,1,0.0%,6,100.0%
9,XGBoost,88,82.7%,26,FGSM,5,0.0%,6,100.0%



=== Fuzzing Attack ===


Unnamed: 0,Model,Sample Size,F1 Score,FN,Attack,Epsilon,F1 Score (Adv),FN (Adv),ASR (FN)
0,DNN,1061,0.0%,1061,FGSM,1,0.0%,625,100.0%
1,DNN,1061,0.0%,1061,FGSM,5,0.0%,627,100.0%
2,DT,1061,100.0%,0,FGSM,1,99.2%,10,1.6%
3,DT,1061,100.0%,0,FGSM,5,54.7%,391,62.4%
4,RF,1061,100.0%,0,FGSM,1,99.2%,10,1.6%
5,RF,1061,100.0%,0,FGSM,5,95.2%,57,9.1%
6,ET,1061,100.0%,0,FGSM,1,100.0%,0,0.0%
7,ET,1061,100.0%,0,FGSM,5,100.0%,0,0.0%
8,XGBoost,1061,100.0%,0,FGSM,1,99.2%,10,1.6%
9,XGBoost,1061,100.0%,0,FGSM,5,99.2%,10,1.6%


In [40]:
import matplotlib.pyplot as plt

for dataset_key, df in all_results.items():
    dataset_label = dataset_labels.get(dataset_key, dataset_key)
    
    # Plot as table
    fig, ax = plt.subplots(figsize=(12, len(df)*0.3 + 2))  # adjust height
    ax.axis('tight')
    ax.axis('off')
    table = ax.table(cellText=df.values,
                     colLabels=df.columns,
                     cellLoc='center',
                     loc='center')
    
    # Save as PDF
    pdf_path = f"road/{dataset_key}_fn.pdf"
    plt.title(dataset_label)
    plt.savefig(pdf_path, bbox_inches='tight')
    plt.close(fig)

    print(f"Saved PDF for {dataset_label} at {pdf_path}")


Saved PDF for Reverse Light On Attack at road/rlona_fn.pdf
Saved PDF for Reverse Light Off Attack at road/rloffa_fn.pdf
Saved PDF for max_speedometer_attack at road/msa_fn.pdf
Saved PDF for Max Engine Coolant Temp Attack at road/mecta_fn.pdf
Saved PDF for Fuzzing Attack at road/fa_fn.pdf


In [46]:
X_normal= X_csa
X_adv_dict = {
                    1: generate_constrained_attack(dnn_art, X_normal, method="FGSM", eps=1.0, eps_step=0.1),
                    5: generate_constrained_attack(dnn_art, X_normal, method="FGSM", eps=5.0, eps_step=0.1)
                }

for eps_val, X_adv in X_adv_dict.items():
        valid_mask = constraint_compliant(X_csa)
        X_adv_valid = X_adv[valid_mask]
        if len(X_adv_valid) == 0:
            continue
#print(X_csa.head(2))
#print(X_adv_valid)

valid_mask = constraint_compliant(X_csa)
X_adv_valid = X_csa[valid_mask]
print(X_adv_valid)

[FGSM] Generated adversarial examples with eps=1.0, eps_step=0.1, max_iter=N/A
[FGSM] Generated adversarial examples with eps=5.0, eps_step=0.1, max_iter=N/A
Empty DataFrame
Columns: [CAN ID, DLC, DATA_0, DATA_1, DATA_2, DATA_3, DATA_4, DATA_5, DATA_6, DATA_7]
Index: []
