In [None]:
import sklearn as sk
import pandas as pd
import os 
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
from sklearn.metrics import accuracy_score, confusion_matrix
from xgboost import XGBClassifier
from sklearn.model_selection import cross_val_score
from imblearn.over_sampling import SMOTE
from sklearn.metrics import f1_score
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import StackingClassifier


# Mount Google Drive at /content/drive so later cells can read and write files there.
# NOTE(review): google.colab is presumably only importable inside a Colab runtime,
# so this cell is expected to fail elsewhere — confirm before running locally.
from google.colab import drive
drive.mount('/content/drive')

def read_CAN_trace(file):
    """Parse one comma-separated CAN trace into a DataFrame.

    Parameters
    ----------
    file : str, path-like or file-like
        Anything ``pandas.read_csv`` accepts as its first argument.

    Returns
    -------
    pandas.DataFrame
        The parsed trace, one row per CSV record.
    """
    return pd.read_csv(file, sep=',')

def load_dataset(folder):
    """Load every CAN trace found in ``folder`` into a single DataFrame.

    Each directory entry is read with ``read_CAN_trace`` and the resulting
    frames are stacked row-wise with a fresh 0..n-1 index.

    Parameters
    ----------
    folder : str or path-like
        Directory whose entries are all CAN trace CSV files.

    Returns
    -------
    pandas.DataFrame
        All traces concatenated, in file-name order.

    Raises
    ------
    ValueError
        If ``folder`` contains no entries.
    """
    # os.listdir returns entries in arbitrary order; sorting makes the row
    # order (and therefore any seeded split downstream) reproducible.
    frames = [
        read_CAN_trace(os.path.join(folder, filename))
        for filename in sorted(os.listdir(folder))
    ]
    if not frames:
        raise ValueError(f"no CAN trace files found in {folder!r}")

    # A single concat avoids the quadratic copy of growing a frame in a loop
    # and avoids seeding the result with an empty object-dtype frame.
    return pd.concat(frames, ignore_index=True)

def from_hex_to_float(val):
  """Convert a hexadecimal string to a float using ``float.fromhex``.

  Parameters
  ----------
  val : str
      Hexadecimal text, with or without a ``0x`` prefix.

  Returns
  -------
  float
      The numeric value of ``val``.
  """
  return float.fromhex(val)

def normalize_dataset(dataset, id_min=None, id_max=None):
  """Min-max scale the ``CAN_ID`` column of ``dataset``.

  The column is rewritten on the frame that was passed in, and that same
  frame is returned.

  Parameters
  ----------
  dataset : pandas.DataFrame
      Frame with a numeric ``CAN_ID`` column.
  id_min, id_max : float, optional
      Bounds used for scaling. When omitted, the column's own minimum and
      maximum are used. Pass the bounds observed on the training data to
      scale another frame consistently with it.

  Returns
  -------
  pandas.DataFrame
      ``dataset`` with ``CAN_ID`` replaced by ``(CAN_ID - min) / (max - min)``.
      If the bounds are equal the column becomes all zeros instead of
      dividing by zero; an empty frame is returned unchanged.
  """
  if dataset['CAN_ID'].empty:
    # Nothing to scale, and min()/max() of an empty column are undefined.
    return dataset

  ids = dataset['CAN_ID'].astype(float)
  lower = ids.min() if id_min is None else id_min
  upper = ids.max() if id_max is None else id_max
  span = upper - lower

  shifted = ids - lower
  # Degenerate range (e.g. a single distinct CAN ID): map everything to 0.0.
  dataset['CAN_ID'] = shifted / span if span != 0 else shifted * 0.0

  return dataset

def dataset_preprocessing(dataset):
  """Turn a raw CAN trace frame into the feature/label frame used for training.

  Steps, in order:
    1. drop every row containing a missing value;
    2. parse ``CAN_ID`` and ``PAYLOAD_HEX`` from hexadecimal text to floats
       (``PAYLOAD_HEX`` is stored as float32);
    3. cast ``ANOMALY`` to bool;
    4. drop the ``PAYLOAD_BIN``, ``DLC`` and ``timestamp`` columns;
    5. min-max scale ``CAN_ID`` via ``normalize_dataset``.

  Parameters
  ----------
  dataset : pandas.DataFrame
      Raw trace with at least the columns ``CAN_ID``, ``PAYLOAD_HEX``,
      ``PAYLOAD_BIN``, ``DLC``, ``timestamp`` and ``ANOMALY``.

  Returns
  -------
  pandas.DataFrame
      A new frame; the input frame is not modified.
  """
  from numpy import float32

  # Work on an explicit copy of the filtered rows so the column assignments
  # below never target a view of the caller's frame (this also avoids the
  # SettingWithCopyWarning some pandas versions emit here).
  dataset = dataset.dropna().copy()

  dataset['CAN_ID'] = dataset['CAN_ID'].apply(from_hex_to_float)
  dataset['PAYLOAD_HEX'] = dataset['PAYLOAD_HEX'].apply(from_hex_to_float).astype(float32)
  # NOTE(review): astype(bool) maps any non-empty string to True — assumes
  # ANOMALY was already parsed as bool/0-1 by read_csv; confirm.
  dataset['ANOMALY'] = dataset['ANOMALY'].astype(bool)

  # PAYLOAD_BIN is discarded, so it is no longer cast to float32 first.
  dataset = dataset.drop(columns=['PAYLOAD_BIN', 'DLC', 'timestamp'])

  dataset = normalize_dataset(dataset)
  return dataset

def print_results(y_true, predicted):
  """Print accuracy, confusion matrix and F1 score for a set of predictions.

  Parameters
  ----------
  y_true : array-like
      Ground-truth labels.
  predicted : array-like
      Labels produced by the classifier, aligned with ``y_true``.
  """
  # Each metric is computed right before it is printed, one after the other.
  acc = accuracy_score(y_true, predicted)
  print(f'accuracy  --> {acc}')

  matrix = confusion_matrix(y_true, predicted)
  print(f'Confusion Matrix --> \n {matrix}')

  f1 = f1_score(y_true, predicted)
  print(f'F-1 Score --> {f1}')


def make_dataset_id(paths):
  """Build one DataFrame from several CAN trace files.

  Parameters
  ----------
  paths : iterable of str
      Paths of the CAN txt traces to read, in the order they should appear
      in the result.

  Returns
  -------
  pandas.DataFrame
      All traces stacked row-wise with a fresh 0..n-1 index.

  Raises
  ------
  ValueError
      If ``paths`` is empty.
  """
  frames = [read_CAN_trace(p) for p in paths]
  if not frames:
    raise ValueError("make_dataset_id needs at least one trace path")

  # One concat call instead of growing a frame inside a loop: linear instead
  # of quadratic copying, and no empty object-dtype seed frame in the mix.
  return pd.concat(frames, ignore_index=True)


def make_train_and_test(paths, random_state=0):
  """Train and evaluate a stacking classifier on the given CAN traces.

  The traces are loaded and preprocessed, split 75/25 into train and test,
  the training part is oversampled with SMOTE (via ``make_SMOTE``), and a
  ``StackingClassifier`` over a decision tree, a random forest, extra trees
  and XGBoost is fitted. Test metrics are printed with ``print_results``.

  Parameters
  ----------
  paths : iterable of str
      Paths of the CAN trace files to train on.
  random_state : int, default 0
      Seed for the train/test split and for every base estimator, so that
      repeated runs produce the same model. The default keeps the split
      identical to the previous hard-coded ``random_state=0``.

  Returns
  -------
  sklearn.ensemble.StackingClassifier
      The fitted stacking model.
  """
  dataset = dataset_preprocessing(make_dataset_id(paths))

  features = dataset.drop(columns=['ANOMALY'])
  labels = dataset['ANOMALY']
  # NOTE(review): the split is not stratified; with a rare ANOMALY class
  # consider stratify=labels — left unchanged to keep the existing split.
  x_train, x_test, y_train, y_test = train_test_split(
      features, labels, random_state=random_state, train_size=(3/4), shuffle=True)

  # Oversample the training part only; the test part keeps its original
  # class balance.
  x_train_res, y_train_res = make_SMOTE(x_train, y_train)

  # Hyper-parameters shared by the three scikit-learn tree models.
  tree_params = dict(max_depth=8, min_samples_split=8, min_samples_leaf=3,
                     random_state=random_state)
  forest_params = dict(tree_params, n_estimators=200, verbose=2, n_jobs=-1)

  estimators = [
      ('dt', DecisionTreeClassifier(**tree_params)),
      ('rf', RandomForestClassifier(**forest_params)),
      ('ext', ExtraTreesClassifier(**forest_params)),
      ('xgbc', XGBClassifier(n_estimators=200, tree_method='exact',
                             random_state=random_state)),
  ]

  stackClassifier = StackingClassifier(estimators=estimators, verbose=3)
  stackClassifier.fit(x_train_res, y_train_res)

  pred = stackClassifier.predict(x_test)
  print_results(y_test, pred)

  return stackClassifier

def make_SMOTE(x_train, y_train):
  """Oversample a training set with SMOTE (fixed ``random_state=2``).

  Parameters
  ----------
  x_train : array-like or DataFrame
      Training features.
  y_train : array-like or Series
      Training labels.

  Returns
  -------
  tuple
      ``(x_resampled, y_resampled)`` as returned by ``SMOTE.fit_resample``.
  """
  oversampler = SMOTE(random_state = 2)
  balanced_x, balanced_y = oversampler.fit_resample(x_train, y_train)
  return balanced_x, balanced_y

def save_pickle_file(file_name, var_to_dump ):
  """Serialize ``var_to_dump`` to ``file_name`` with pickle.

  Parameters
  ----------
  file_name : str or path-like
      Destination file; it is created or overwritten.
  var_to_dump : object
      Any picklable object.
  """
  # Imported here because the notebook's import cell does not import pickle;
  # without this the helper only works after a later cell has imported it.
  import pickle

  with open(file_name, "wb") as open_file:
    pickle.dump(var_to_dump, open_file)
def load_pickle_file(file_name):
  """Load and return the object pickled in ``file_name``.

  Parameters
  ----------
  file_name : str or path-like
      Path of a file previously written with pickle.

  Returns
  -------
  object
      The unpickled object.
  """
  # Imported here because the notebook's import cell does not import pickle;
  # without this the helper only works after a later cell has imported it.
  import pickle

  # WARNING: unpickling can execute arbitrary code — only load files that
  # were produced by this notebook or another trusted source.
  with open(file_name, "rb") as open_file:
    return pickle.load(open_file)

Mounted at /content/drive


In [None]:
"""
n_7 OSR
"""

# Seven "n_7_V40" trace files located under infected/OrderedSequenceReplay.
path_file_1 = "/Dataset_DAGA/infected/OrderedSequenceReplay/n_7_V40_01.can.txt"
path_file_2 = "/Dataset_DAGA/infected/OrderedSequenceReplay/n_7_V40_02.can.txt"
path_file_3 = "/Dataset_DAGA/infected/OrderedSequenceReplay/n_7_V40_03.can.txt"
path_file_4 = "/Dataset_DAGA/infected/OrderedSequenceReplay/n_7_V40_04.can.txt"
path_file_5 = "/Dataset_DAGA/infected/OrderedSequenceReplay/n_7_V40_05.can.txt"
path_file_6 = "/Dataset_DAGA/infected/OrderedSequenceReplay/n_7_V40_06.can.txt"
path_file_7 = "/Dataset_DAGA/infected/OrderedSequenceReplay/n_7_V40_07.can.txt"

# Only traces 01-04 go into `paths` (used for building the dataset and training);
# trace 05 is read on its own in the final evaluation cell.
paths = [path_file_1, path_file_2, path_file_3, path_file_4]

In [None]:
# Combine the selected traces, preprocess them, and show the resulting
# frame as the cell output.
dataset = dataset_preprocessing(make_dataset_id(paths))
dataset

In [None]:
# Train the stacking classifier on the selected traces (test metrics are
# printed by make_train_and_test).
classifier = make_train_and_test(paths)

# Saving models
import pickle

file_name = "/DAGA_classifier/OSR_n_7_StackClassifier.pkl"
save_pickle_file(file_name, classifier)

In [None]:
# Evaluate the trained classifier on trace 05, which is not part of `paths`.
# NOTE(review): dataset_preprocessing rescales CAN_ID using this file's own
# min/max, which may differ from the range seen during training — confirm
# this is intended.
dataset = dataset_preprocessing(read_CAN_trace(path_file_5))

x = dataset.drop(columns=['ANOMALY'])
Y = dataset['ANOMALY']

pred = classifier.predict(x)
print("results: .......")
print_results(Y, pred)