In [50]:
from math import ceil
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import mean_squared_error, accuracy_score, precision_score, recall_score, \
    f1_score, ConfusionMatrixDisplay, roc_auc_score, RocCurveDisplay

In [51]:
def transform_dataset(dataset: pd.DataFrame):
    df = dataset.copy()
    df['CAN ID'] = df['CAN ID'].apply(lambda i: int(i, 16))
    df['Flag'] = df['Flag'].apply(lambda x: 0 if x == 'R' else 1)

    for i in range(8):
        df[f'DATA{i}'] = df[f'DATA{i}'].apply(lambda i: int(i, 16))

    for col in df.columns:
        df[col] = pd.to_numeric(df[col])
    
    return df

In [52]:
# Training data for baseline model and GridSearch
def splitting_data(dataset: pd.DataFrame):
    data_size = len(dataset)
    train_size = ceil(data_size * .8)
    test_size = ceil((data_size - train_size) * .5)
    print(train_size, test_size)
    train = dataset.iloc[:train_size]
    vali = dataset.iloc[train_size:train_size+test_size]
    test = dataset.iloc[train_size+test_size:]

    X_train = train.drop(columns=['Flag'])
    y_train = train['Flag']

    X_val = vali.drop(columns=['Flag'])
    y_val = vali['Flag']

    X_test = test.drop(columns=['Flag'])
    y_test = test['Flag']

    return X_train, X_val, X_test, y_train, y_val, y_test

In [53]:
# Splitting the whole dataset into 10 chunks for K-fold cross validation
def splitting_to_k_chunks(dataset, k=10):
    return np.array_split(dataset, k)

In [54]:
# def k_fold_splitting(dataset, k=10):
#     average_size = ceil(len(dataset) / k)
#     k_folds_dataset = []
#     for i in range(k):
#         train_set = 

In [55]:
folder_path = '/Users/gabrielchung/Documents/Data 245/Final Presentation/Car Hacking Dataset/9) Car-Hacking Dataset/{0}_dataset.csv'
column_names = ['Timestamp', 'CAN ID', 'DLC', 'DATA0', 'DATA1', 'DATA2', 'DATA3', 'DATA4', 'DATA5', 'DATA6', 'DATA7', 'Flag']
attacks = {}
for attack in ['DoS', 'Fuzzy', 'gear', 'RPM']:
    data = pd.read_csv(folder_path.format(attack), names=column_names)
    data = data.dropna()
    attacks[attack] = data

In [56]:
for att, data in attacks.items():
    print(att)
    attacks[att] = transform_dataset(data)

DoS
Fuzzy
gear
RPM


In [57]:
model_data = {
    "X_train": [],
    "X_val": [],
    "X_test": [],
    "y_train": [],
    "y_test": [],
    "y_val": [],
}
for att, data in attacks.items():
    x_tr, x_val, x_te, y_tr, y_val, y_te = splitting_data(data)
    model_data["X_train"].append(x_tr)
    model_data["X_val"].append(x_val)
    model_data["X_test"].append(x_te)
    model_data["y_train"].append(y_tr)
    model_data["y_val"].append(y_val)
    model_data["y_test"].append(y_te)

2907667 363458
3000820 375102
3522382 440298
3664181 458023


In [58]:
X_train = pd.concat(model_data["X_train"]).values
y_train = pd.concat(model_data["y_train"]).values

In [59]:
train_mses = []
val_mses = {
    'DoS': [],
    'Fuzzy': [],
    'gear': [],
    'RPM': [],
}
# evaluations = pd.DataFrame(columns=['Attack', 'Params', 'Accuracy', 'Precision', 'Recall', 'F1-Score'])
evaluations = []
dtc = DecisionTreeClassifier()

dtc.fit(X_train, y_train)

y_train_pred = dtc.predict(X_train)
train_mse = mean_squared_error(y_train, y_train_pred)
train_mses.append(train_mse)

for i, att in enumerate(['DoS', 'Fuzzy', 'gear', 'RPM']):
    # evaluate the accuracy on the validation set
    y_val_pred = dtc.predict(model_data["X_val"][i])
    val_mse = mean_squared_error(model_data["y_val"][i], y_val_pred)
    val_mses[att].append(val_mse)


    y_pred = dtc.predict(model_data["X_test"][i])
    evaluations.append({
        "Attack": att,
        "Params": None,
        "Accuracy": accuracy_score(model_data["y_test"][i], y_pred),
        "Precision": precision_score(model_data["y_test"][i], y_pred),
        "Recall": recall_score(model_data["y_test"][i], y_pred),
        "F1-Score": f1_score(model_data["y_test"][i], y_pred)
    })

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))


In [60]:
max_depths = [3, 5, 7, 9]
min_samples_split = [2, 4, 6]
criterion = ["entropy", "log_loss"]
params_grid = [{ "max_depth": md, "min_samples_split": mss } for mss in min_samples_split for md in max_depths]
for cr in criterion:
    for pg in params_grid:
        pg["criterion"] = cr

for param_grid in params_grid:
    dtc = DecisionTreeClassifier(**param_grid)
    dtc.fit(X_train, y_train)

    y_train_pred = dtc.predict(X_train)
    train_mse = mean_squared_error(y_train, y_train_pred)
    train_mses.append(train_mse)
    
    for i, att in enumerate(['DoS', 'Fuzzy', 'gear', 'RPM']):
        # evaluate the accuracy on the validation set
        y_val_pred = dtc.predict(model_data["X_val"][i])
        val_mse = mean_squared_error(model_data["y_val"][i], y_val_pred)
        val_mses[att].append(val_mse)

        y_pred = dtc.predict(model_data["X_test"][i])
        evaluations.append({
            "Attack": att,
            "Params": f"max_depth: {param_grid['max_depth']}, min_samples_split: {param_grid['min_samples_split']}, criterion: {param_grid['criterion']}",
            "Accuracy": accuracy_score(model_data["y_test"][i], y_pred),
            "Precision": precision_score(model_data["y_test"][i], y_pred),
            "Recall": recall_score(model_data["y_test"][i], y_pred),
            "F1-Score": f1_score(model_data["y_test"][i], y_pred)
        })


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, "true nor predicted", "F-score is", len(true_sum))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_

In [None]:
evaluations = pd.concat([pd.DataFrame([e]) for e in evaluations])

In [None]:

print(evaluations)
print(evaluations.columns)
print(train_mses)
# plot the results
# plt.plot(depths, train_accs, label='Training Accuracy')
# plt.plot(depths, val_accs, label='Validation Accuracy')
# plt.xlabel('Tree Depth')
# plt.ylabel('Accuracy')
# plt.legend()
# plt.show()

           0
0     Attack
0     Params
0   Accuracy
0  Precision
0     Recall
0   F1-Score
RangeIndex(start=0, stop=1, step=1)
[0.0, 0.08301701788080229, 0.08301701788080229, 0.008123603957220477, 0.001972882883226868, 0.0003899946926510399, 0.08301701788080229, 0.008123603957220477, 0.001972882883226868, 0.0003899946926510399, 0.08301701788080229, 0.008123603957220477, 0.001972882883226868, 0.0003899946926510399]
