In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import sys
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from imblearn.under_sampling import RandomUnderSampler
from sklearn.ensemble import  RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

from utils.helpers import *
from utils.models import *
from utils.params import *

import warnings
# Silence every warning category for the rest of the session so cell output stays readable.
# NOTE(review): this also hides deprecation/convergence warnings that may matter.
warnings.filterwarnings("ignore")

# No -W option was passed to the interpreter: install a blanket "ignore" filter as well and
# export PYTHONWARNINGS so Python child processes started from this kernel inherit it.
if not sys.warnoptions:
    warnings.simplefilter("ignore")
    os.environ["PYTHONWARNINGS"] = "ignore"

# Not defined in this notebook; it comes from one of the `utils` star imports above.
# Presumably seeds the random number generators for reproducibility — TODO confirm.
setSeed()

## 🤖 Baseline

In [None]:
def evaluate_baseline(vehicle, paths):
    """
    Score the saved FCN, CNN and LSTM multi-class checkpoints of one vehicle.

    The vehicle's dataset is read from ``./dataset/{vehicle}_multi.csv``, the
    ``Flag`` column is renamed to ``Class`` and used as the label, classes are
    balanced with random under-sampling, and a 20% hold-out split is drawn with
    the global ``seed``. Each checkpoint is loaded into a freshly constructed
    model and evaluated on that hold-out split with
    ``evaluate_multi_class_model``; accuracy and F1 are printed.

    Args:
        vehicle: Vehicle name, used for the dataset path and the log prefix.
        paths: Indexable collection of three checkpoint paths, in the order
            FCN, CNN, LSTM.

    Returns:
        None. Results are only printed.
    """
    datasetPath = f'./dataset/{vehicle}_multi.csv'

    df = pd.read_csv(datasetPath)
    df = df.rename(columns={'Flag': 'Class'})

    features = df.drop(['Class'], axis=1).values
    labels = df['Class'].values

    features, labels = RandomUnderSampler(random_state=seed).fit_resample(features, labels)

    # Only the hold-out part is evaluated; the training part of the split is discarded.
    # Keeping the same test_size/random_state reproduces the original test set.
    _, X_test, _, y_test = train_test_split(features, labels, test_size=0.2, random_state=seed)

    # Labels are passed as a float column vector of shape (N, 1), exactly as before.
    # NOTE(review): evaluate_multi_class_model is assumed to handle this label format — confirm.
    X_test_tensor = torch.FloatTensor(X_test)
    y_test_tensor = torch.FloatTensor(y_test).unsqueeze(1)
    test_dataloader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=64, shuffle=False)

    input_size = len(df.columns) - 1  # every column except the label
    hidden_size = 64
    output_size = 4

    # (log label, model factory), in the same order as the entries of `paths`.
    model_specs = (
        ('FCN', lambda: FCNMultiClass(input_size, hidden_size, output_size)),
        ('CNN', lambda: CNNMultiClass(input_size, output_size)),
        ('LSTM', lambda: LSTMMultiClass(input_size, hidden_size, output_size)),
    )

    for index, (label, build_model) in enumerate(model_specs):
        model = build_model().to(device)

        # map_location lets checkpoints saved on a GPU be restored on a CPU-only machine.
        model.load_state_dict(torch.load(paths[index], map_location=device))

        accuracy, f1 = evaluate_multi_class_model(model, test_dataloader, device)

        # A blank line after the last model visually separates consecutive vehicles.
        suffix = '\n' if index == len(model_specs) - 1 else ''
        print(f'[👑 {label} {vehicle}] Accuracy: {accuracy:.3f}, F1: {f1:.3f}{suffix}')

In [None]:
# Evaluate the three baseline checkpoints (FCN, CNN, LSTM) of every vehicle.
for vehicle in vehicles:
    paths = [f'./models/{vehicle}/{arch}_multi.pth' for arch in ('FCN', 'CNN', 'LSTM')]
    evaluate_baseline(vehicle, paths)

## ⚔️ Attacks

In [None]:
# Per-vehicle transferability result tables live under ./results/transferability.
resultsFolder = './results/'
transferabilityFolder = os.path.join(resultsFolder, 'transferability')

sonata_t, soul_t, spark_t = [
    os.path.join(transferabilityFolder, csv_name)
    for csv_name in ('sonata.csv', 'soul.csv', 'spark.csv')
]

# Read the three tables in the same order and stack them into a single frame.
dfs = [pd.read_csv(csv_path) for csv_path in (sonata_t, soul_t, spark_t)]
sonata_df, soul_df, spark_df = dfs

df = pd.concat(dfs)

In [None]:
def attacks_evaluation(df, bw='white', models=None, attacks=None):
    """
    Average Accuracy and F1 of the result rows belonging to one threat model.

    Rows are selected by comparing the ``Source_*`` and ``Target_*`` columns:

    * ``'white'``: same vehicle and same model.
    * ``'gray'``: exactly one of vehicle / model is the same.
    * ``'black'``: both vehicle and model differ.

    Args:
        df: A single DataFrame with the columns ``Target_Vehicle``,
            ``Source_Vehicle``, ``Target_Model``, ``Source_Model``, ``Attack``,
            ``Accuracy`` and ``F1``.
        bw: One of ``'white'``, ``'gray'`` or ``'black'``.
        models: Optional iterable of ``Target_Model`` values to report on.
            Defaults to the global ``model_names``.
        attacks: Optional iterable of ``Attack`` values to report on.
            Defaults to the global ``attack_names``.

    Returns:
        Tuple ``(models_acc, models_f1, attacks_acc, attacks_f1, accuracy, f1)``:
        the first four are lists with one mean per model / attack (NaN when no
        row matches), the last two are the means over all selected rows.

    Raises:
        ValueError: If ``bw`` is not one of the three supported values.
    """
    # Validate first so a bad `bw` is reported before any column is touched.
    if bw not in ('white', 'gray', 'black'):
        raise ValueError("bw must be one of 'white', 'gray' or 'black'")

    same_vehicle = df['Target_Vehicle'] == df['Source_Vehicle']
    same_model = df['Target_Model'] == df['Source_Model']

    if bw == 'white':
        mask = same_vehicle & same_model
    elif bw == 'gray':
        # XOR: exactly one of the two properties matches.
        mask = same_vehicle ^ same_model
    else:
        mask = ~same_vehicle & ~same_model
    sub = df[mask]

    if models is None:
        models = model_names
    if attacks is None:
        attacks = attack_names

    models_acc, models_f1 = [], []
    for model_name in models:
        model_df = sub[sub['Target_Model'] == model_name]
        models_acc.append(model_df['Accuracy'].mean())
        models_f1.append(model_df['F1'].mean())

    attacks_acc, attacks_f1 = [], []
    for attack_name in attacks:
        attack_df = sub[sub['Attack'] == attack_name]
        attacks_acc.append(attack_df['Accuracy'].mean())
        attacks_f1.append(attack_df['F1'].mean())

    return models_acc, models_f1, attacks_acc, attacks_f1, sub['Accuracy'].mean(), sub['F1'].mean()

In [None]:
# Aggregate the transferability results once per threat model.
(models_acc_wb, models_f1_wb, attacks_acc_wb,
 attacks_f1_wb, accuracy_wb, f1_wb) = attacks_evaluation(df, bw='white')
(models_acc_gb, models_f1_gb, attacks_acc_gb,
 attacks_f1_gb, accuracy_gb, f1_gb) = attacks_evaluation(df, bw='gray')
(models_acc_bb, models_f1_bb, attacks_acc_bb,
 attacks_f1_bb, accuracy_bb, f1_bb) = attacks_evaluation(df, bw='black')

print('[MODELS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for model_name, white_f1, gray_f1, black_f1 in zip(model_names, models_f1_wb, models_f1_gb, models_f1_bb):
    print(f'[{model_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
print('[ATTACKS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for attack_name, white_f1, gray_f1, black_f1 in zip(attack_names, attacks_f1_wb, attacks_f1_gb, attacks_f1_bb):
    # Two tabs after the name, like every other report row in this notebook,
    # so the values line up under the header columns.
    print(f'[{attack_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')

In [None]:
def attacks_evaluation_eps(df, eps, models=None, attacks=None):
    """
    Average Accuracy and F1 of the result rows obtained with one epsilon value.

    Args:
        df: A single DataFrame with the columns ``Epsilon``, ``Target_Model``,
            ``Attack``, ``Accuracy`` and ``F1``.
        eps: Epsilon to select; must be (numerically close to) 0.1, 0.2 or 0.3.
        models: Optional iterable of ``Target_Model`` values to report on.
            Defaults to the global ``model_names``.
        attacks: Optional iterable of ``Attack`` values to report on.
            Defaults to the global ``attack_names``.

    Returns:
        Tuple ``(models_acc, models_f1, attacks_acc, attacks_f1, accuracy, f1)``:
        the first four are lists with one mean per model / attack (NaN when no
        row matches), the last two are the means over all selected rows.

    Raises:
        ValueError: If ``eps`` is not one of the supported values.
    """
    supported_eps = (0.1, 0.2, 0.3)

    # Compare with a tolerance: values such as 0.1 + 0.2 are not exactly 0.3.
    try:
        is_supported = bool(np.isclose(eps, supported_eps).any())
    except TypeError:
        # Non-numeric eps: report it the same way as any other unsupported value.
        is_supported = False
    if not is_supported:
        raise ValueError('eps must be either 0.1, 0.2 or 0.3')

    sub = df[np.isclose(df['Epsilon'], eps)]

    if models is None:
        models = model_names
    if attacks is None:
        attacks = attack_names

    models_acc, models_f1 = [], []
    for model_name in models:
        model_df = sub[sub['Target_Model'] == model_name]
        models_acc.append(model_df['Accuracy'].mean())
        models_f1.append(model_df['F1'].mean())

    attacks_acc, attacks_f1 = [], []
    for attack_name in attacks:
        attack_df = sub[sub['Attack'] == attack_name]
        attacks_acc.append(attack_df['Accuracy'].mean())
        attacks_f1.append(attack_df['F1'].mean())

    return models_acc, models_f1, attacks_acc, attacks_f1, sub['Accuracy'].mean(), sub['F1'].mean()

In [None]:
# Same report as the threat-model one, but grouped by epsilon.
# The *_wb / *_gb / *_bb names are reused here for eps = 0.1 / 0.2 / 0.3 respectively.
(models_acc_wb, models_f1_wb, attacks_acc_wb,
 attacks_f1_wb, accuracy_wb, f1_wb) = attacks_evaluation_eps(df, eps=0.1)
(models_acc_gb, models_f1_gb, attacks_acc_gb,
 attacks_f1_gb, accuracy_gb, f1_gb) = attacks_evaluation_eps(df, eps=0.2)
(models_acc_bb, models_f1_bb, attacks_acc_bb,
 attacks_f1_bb, accuracy_bb, f1_bb) = attacks_evaluation_eps(df, eps=0.3)

print('[MODELS F1]\t[0.1]\t[0.2]\t[0.3]')
for model_name, f1_eps1, f1_eps2, f1_eps3 in zip(model_names, models_f1_wb, models_f1_gb, models_f1_bb):
    print(f'[{model_name}]\t\t{f1_eps1:.3f}\t{f1_eps2:.3f}\t{f1_eps3:.3f}')
print()
print('[ATTACKS F1]\t[0.1]\t[0.2]\t[0.3]')
for attack_name, f1_eps1, f1_eps2, f1_eps3 in zip(attack_names, attacks_f1_wb, attacks_f1_gb, attacks_f1_bb):
    print(f'[{attack_name}]\t\t{f1_eps1:.3f}\t{f1_eps2:.3f}\t{f1_eps3:.3f}')

In [None]:
# Average F1 per attack, in the order of the x tick labels below, one list per epsilon.
# NOTE(review): these look hand-copied from the epsilon report printed above —
# confirm they are still current after re-running that cell.
e1 = [0.367, 0.337, 0.329, 0.348]
e2 = [0.367, 0.276, 0.272, 0.348]
e3 = [0.367, 0.199, 0.237, 0.348]

num_groups = 4

bar_width = 0.2

x = np.arange(num_groups)

fig, ax = plt.subplots()

fig.set_size_inches(5, 3.5)

# One bar series per epsilon, labelled directly, so the legend no longer depends
# on the creation order of individually drawn bars.
ax.bar(x - bar_width, e1, bar_width, label='ε = 0.1', color='tab:blue', zorder=3)
ax.bar(x, e2, bar_width, label='ε = 0.2', color='tab:orange', zorder=3)
ax.bar(x + bar_width, e3, bar_width, label='ε = 0.3', color='tab:green', zorder=3)

ax.set_xlabel('Attacks')
ax.set_ylabel('Average F1 Score')
ax.set_ylim([0, 0.5])
ax.set_xticks(x)
ax.set_xticklabels(['BIM', 'FGSM', 'PGD', 'RFGSM'])
ax.legend()

plt.tight_layout()
# zorder=0 keeps the grid lines behind the bars (drawn with zorder=3).
ax.grid(axis='y', linestyle='-', zorder=0)

# savefig does not create missing directories.
os.makedirs('./figures', exist_ok=True)
plt.savefig('./figures/Attacks.pdf', format='pdf')
plt.show()

## 💪 Adversarial Training

### ⚙️ Fine Tuning

#### All Attacks

In [None]:
# Fine-tuning results, "all-attacks" setting: one CSV per vehicle.
advFolder = os.path.join(resultsFolder, 'adversarial-training/all-attacks')

sonata_t, soul_t, spark_t = [
    os.path.join(advFolder, csv_name)
    for csv_name in ('sonata.csv', 'soul.csv', 'spark.csv')
]

# Read the three tables in the same order and stack them into a single frame.
dfs = [pd.read_csv(csv_path) for csv_path in (sonata_t, soul_t, spark_t)]
sonata_df, soul_df, spark_df = dfs
df = pd.concat(dfs)

In [None]:
# Threat-model report on the currently loaded results, followed by the accuracy
# of the "all-attacks" fine-tuned checkpoints on each vehicle's test split.
(models_acc_wb, models_f1_wb, attacks_acc_wb,
 attacks_f1_wb, accuracy_wb, f1_wb) = attacks_evaluation(df, bw='white')
(models_acc_gb, models_f1_gb, attacks_acc_gb,
 attacks_f1_gb, accuracy_gb, f1_gb) = attacks_evaluation(df, bw='gray')
(models_acc_bb, models_f1_bb, attacks_acc_bb,
 attacks_f1_bb, accuracy_bb, f1_bb) = attacks_evaluation(df, bw='black')

print('[MODELS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for model_name, white_f1, gray_f1, black_f1 in zip(model_names, models_f1_wb, models_f1_gb, models_f1_bb):
    print(f'[{model_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
print('[ATTACKS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for attack_name, white_f1, gray_f1, black_f1 in zip(attack_names, attacks_f1_wb, attacks_f1_gb, attacks_f1_bb):
    print(f'[{attack_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
for vehicle in vehicles:
    paths = [
        f'./models/{vehicle}/adversarial-training/all-attacks/{arch}.pth'
        for arch in ('FCN', 'CNN', 'LSTM')
    ]
    evaluate_baseline(vehicle, paths)

#### All Models

In [None]:
# Fine-tuning results, "all-models" setting: one CSV per vehicle.
advFolder = os.path.join(resultsFolder, 'adversarial-training/all-models')

sonata_t, soul_t, spark_t = [
    os.path.join(advFolder, csv_name)
    for csv_name in ('sonata.csv', 'soul.csv', 'spark.csv')
]

# Read the three tables in the same order and stack them into a single frame.
dfs = [pd.read_csv(csv_path) for csv_path in (sonata_t, soul_t, spark_t)]
sonata_df, soul_df, spark_df = dfs
df = pd.concat(dfs)

In [None]:
# Threat-model report on the currently loaded results, followed by the accuracy
# of the "all-models" fine-tuned checkpoints on each vehicle's test split.
(models_acc_wb, models_f1_wb, attacks_acc_wb,
 attacks_f1_wb, accuracy_wb, f1_wb) = attacks_evaluation(df, bw='white')
(models_acc_gb, models_f1_gb, attacks_acc_gb,
 attacks_f1_gb, accuracy_gb, f1_gb) = attacks_evaluation(df, bw='gray')
(models_acc_bb, models_f1_bb, attacks_acc_bb,
 attacks_f1_bb, accuracy_bb, f1_bb) = attacks_evaluation(df, bw='black')

print('[MODELS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for model_name, white_f1, gray_f1, black_f1 in zip(model_names, models_f1_wb, models_f1_gb, models_f1_bb):
    print(f'[{model_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
print('[ATTACKS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for attack_name, white_f1, gray_f1, black_f1 in zip(attack_names, attacks_f1_wb, attacks_f1_gb, attacks_f1_bb):
    print(f'[{attack_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
for vehicle in vehicles:
    paths = [
        f'./models/{vehicle}/adversarial-training/all-models/{arch}.pth'
        for arch in ('FCN', 'CNN', 'LSTM')
    ]
    evaluate_baseline(vehicle, paths)

#### All Vehicles

In [None]:
# Fine-tuning results, "all-vehicles" setting: one CSV per vehicle.
advFolder = os.path.join(resultsFolder, 'adversarial-training/all-vehicles')

sonata_t, soul_t, spark_t = [
    os.path.join(advFolder, csv_name)
    for csv_name in ('sonata.csv', 'soul.csv', 'spark.csv')
]

# Read the three tables in the same order and stack them into a single frame.
dfs = [pd.read_csv(csv_path) for csv_path in (sonata_t, soul_t, spark_t)]
sonata_df, soul_df, spark_df = dfs
df = pd.concat(dfs)

In [None]:
# Threat-model report on the currently loaded results, followed by the accuracy
# of the "all-vehicles" fine-tuned checkpoints on each vehicle's test split.
(models_acc_wb, models_f1_wb, attacks_acc_wb,
 attacks_f1_wb, accuracy_wb, f1_wb) = attacks_evaluation(df, bw='white')
(models_acc_gb, models_f1_gb, attacks_acc_gb,
 attacks_f1_gb, accuracy_gb, f1_gb) = attacks_evaluation(df, bw='gray')
(models_acc_bb, models_f1_bb, attacks_acc_bb,
 attacks_f1_bb, accuracy_bb, f1_bb) = attacks_evaluation(df, bw='black')

print('[MODELS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for model_name, white_f1, gray_f1, black_f1 in zip(model_names, models_f1_wb, models_f1_gb, models_f1_bb):
    print(f'[{model_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
print('[ATTACKS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for attack_name, white_f1, gray_f1, black_f1 in zip(attack_names, attacks_f1_wb, attacks_f1_gb, attacks_f1_bb):
    print(f'[{attack_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
for vehicle in vehicles:
    paths = [
        f'./models/{vehicle}/adversarial-training/all-vehicles/{arch}.pth'
        for arch in ('FCN', 'CNN', 'LSTM')
    ]
    evaluate_baseline(vehicle, paths)

### 🕸️ Online Adversarial Training

#### All Attacks

In [None]:
# Online adversarial training results, "all-attacks" setting: one CSV per vehicle.
advFolder = os.path.join(resultsFolder, 'adversarial-training/all-attacks')

sonata_t, soul_t, spark_t = [
    os.path.join(advFolder, csv_name)
    for csv_name in ('sonata_online.csv', 'soul_online.csv', 'spark_online.csv')
]

# Read the three tables in the same order and stack them into a single frame.
dfs = [pd.read_csv(csv_path) for csv_path in (sonata_t, soul_t, spark_t)]
sonata_df, soul_df, spark_df = dfs
df = pd.concat(dfs)

In [None]:
# Threat-model report on the currently loaded results, followed by the accuracy
# of the "all-attacks" online-trained checkpoints on each vehicle's test split.
(models_acc_wb, models_f1_wb, attacks_acc_wb,
 attacks_f1_wb, accuracy_wb, f1_wb) = attacks_evaluation(df, bw='white')
(models_acc_gb, models_f1_gb, attacks_acc_gb,
 attacks_f1_gb, accuracy_gb, f1_gb) = attacks_evaluation(df, bw='gray')
(models_acc_bb, models_f1_bb, attacks_acc_bb,
 attacks_f1_bb, accuracy_bb, f1_bb) = attacks_evaluation(df, bw='black')

print('[MODELS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for model_name, white_f1, gray_f1, black_f1 in zip(model_names, models_f1_wb, models_f1_gb, models_f1_bb):
    print(f'[{model_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
print('[ATTACKS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for attack_name, white_f1, gray_f1, black_f1 in zip(attack_names, attacks_f1_wb, attacks_f1_gb, attacks_f1_bb):
    print(f'[{attack_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
for vehicle in vehicles:
    paths = [
        f'./models/{vehicle}/adversarial-training/all-attacks/{arch}_online.pth'
        for arch in ('FCN', 'CNN', 'LSTM')
    ]
    evaluate_baseline(vehicle, paths)

#### All Models

In [None]:
# Online adversarial training results, "all-models" setting: one CSV per vehicle.
advFolder = os.path.join(resultsFolder, 'adversarial-training/all-models')

sonata_t, soul_t, spark_t = [
    os.path.join(advFolder, csv_name)
    for csv_name in ('sonata_online.csv', 'soul_online.csv', 'spark_online.csv')
]

# Read the three tables in the same order and stack them into a single frame.
dfs = [pd.read_csv(csv_path) for csv_path in (sonata_t, soul_t, spark_t)]
sonata_df, soul_df, spark_df = dfs
df = pd.concat(dfs)

In [None]:
# Threat-model report on the currently loaded results, followed by the accuracy
# of the "all-models" online-trained checkpoints on each vehicle's test split.
(models_acc_wb, models_f1_wb, attacks_acc_wb,
 attacks_f1_wb, accuracy_wb, f1_wb) = attacks_evaluation(df, bw='white')
(models_acc_gb, models_f1_gb, attacks_acc_gb,
 attacks_f1_gb, accuracy_gb, f1_gb) = attacks_evaluation(df, bw='gray')
(models_acc_bb, models_f1_bb, attacks_acc_bb,
 attacks_f1_bb, accuracy_bb, f1_bb) = attacks_evaluation(df, bw='black')

print('[MODELS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for model_name, white_f1, gray_f1, black_f1 in zip(model_names, models_f1_wb, models_f1_gb, models_f1_bb):
    print(f'[{model_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
print('[ATTACKS F1]\t[WHITE]\t[GRAY]\t[BLACK]')
for attack_name, white_f1, gray_f1, black_f1 in zip(attack_names, attacks_f1_wb, attacks_f1_gb, attacks_f1_bb):
    print(f'[{attack_name}]\t\t{white_f1:.3f}\t{gray_f1:.3f}\t{black_f1:.3f}')
print()
for vehicle in vehicles:
    paths = [
        f'./models/{vehicle}/adversarial-training/all-models/{arch}_online.pth'
        for arch in ('FCN', 'CNN', 'LSTM')
    ]
    evaluate_baseline(vehicle, paths)