In [1]:
import pandas as pd
# import vaex
import numpy as np
import glob
import dask.dataframe as dd
import json
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tqdm import tqdm

In [2]:
print('Tensorflow version', tf.__version__)

Tensorflow version 2.11.0


In [3]:
def fill_flag(sample):
    if not isinstance(sample['Flag'], str):
        col = 'Data' + str(sample['DLC'])
        sample['Flag'] = sample[col]
    return sample

In [4]:
def convert_canid_bits(cid):
    try:
        s = bin(int(str(cid), 16))[2:].zfill(29)
        bits = list(map(int, list(s)))
        return bits
    except:
        return None

In [5]:
# Read by dask first
attributes = ['Timestamp', 'canID', 'DLC',
                           'Data0', 'Data1', 'Data2',
                           'Data3', 'Data4', 'Data5',
                           'Data6', 'Data7', 'Flag']
dataset_path  = '../data/car-hacking/'
attack_types = ['DoS', 'Fuzzy', 'gear', 'RPM']
attack = attack_types[0]
file_name = '{}{}_dataset.csv'.format(dataset_path, attack)
print(file_name)
# df = pd.read_csv(file_name, header=None, names=attributes)
# for f in files[1]:
#     print('Reading file: ', f)
#     df = df.append(pd.read_csv(f, header=None))

../data/car-hacking/Fuzzy_dataset.csv


In [38]:
df2 = dd.read_csv(file_name, header=None)
print(df2)

Dask DataFrame Structure:
                    0       1      2       3       4      5       6       7       8        9       10      11
npartitions=3                                                                                                
               float64  object  int64  object  object  int64  object  object  object  float64  object  object
                   ...     ...    ...     ...     ...    ...     ...     ...     ...      ...     ...     ...
                   ...     ...    ...     ...     ...    ...     ...     ...     ...      ...     ...     ...
                   ...     ...    ...     ...     ...    ...     ...     ...     ...      ...     ...     ...
Dask Name: read-csv, 1 graph layer


In [6]:
def preprocess(file_name):
    # Read CSV file
    df = dd.read_csv(file_name, header=None, names=attributes, dtype={'Data1': 'object', 'Data2': 'object', 'Data4': 'object',
       'Data6': 'object'}) # Data2 & Data 6 is object - something field in hex can't convert to int or float
    print('Reading from {}: DONE'.format(file_name))
    print('Dask processing: -------------')
    df = df.apply(fill_flag, axis=1, meta={'Timestamp': 'float64', 'canID': 'object', 'DLC': 'int64', 'Data0': 'object', 'Data1': 'object', 'Data2': 'int64', 'Data3': 'object', 'Data4': 'object', 'Data5': 'object', 'Data6': 'float64', 'Data7': 'object', 'Flag': 'object'})
    pd_df = df.compute()
    pd_df = pd_df[['Timestamp', 'canID', 'Flag']].sort_values('Timestamp',  ascending=True)
    pd_df['canBits'] = pd_df.canID.apply(convert_canid_bits)
    pd_df['Flag'] = pd_df['Flag'].apply(lambda x: True if x == 'T' else False)
    print('Dask processing: DONE')
    print('Aggregate data -----------------')
    as_strided = np.lib.stride_tricks.as_strided
    win = 29
    s = 29
    feature = as_strided(pd_df.canBits, ((len(pd_df) - win) // s + 1, win), (8*s, 8)) #Stride is counted by bytes
    label = as_strided(pd_df.Flag, ((len(pd_df) - win) // s + 1, win), (1*s, 1))
    df = pd.DataFrame({
        'features': pd.Series(feature.tolist()),
        'label': pd.Series(label.tolist())
    }, index= range(len(feature)))

    df['label'] = df['label'].apply(lambda x: 1 if any(x) else 0)
    print('Preprocessing: DONE')
    print('#Normal: ', df[df['label'] == 0].shape[0])
    print('#Attack: ', df[df['label'] == 1].shape[0])
    return df[['features', 'label']].reset_index().drop(['index'], axis=1)

In [14]:
df = dd.read_csv(file_name, header=None, names=attributes, dtype={'Data2': 'object',
       'Data6': 'object'})
print('Reading from {}: DONE'.format(file_name))
print('Dask processing: -------------')
df = df.apply(fill_flag, axis=1, meta={'Timestamp': 'float64', 'canID': 'object', 'DLC': 'int64', 'Data0': 'object', 'Data1': 'object', 'Data2': 'int64', 'Data3': 'object', 'Data4': 'object', 'Data5': 'object', 'Data6': 'float64', 'Data7': 'object', 'Flag': 'object'})
pd_df = df.compute()
pd_df = pd_df[['Timestamp', 'canID', 'Flag', 'DLC']].sort_values('Timestamp',  ascending=True)
#pd_df['canBits'] = pd_df.canID.apply(convert_canid_bits)
pd_df['Flag'] = pd_df['Flag'].apply(lambda x: True if x == 'T' else False)
filtered_df = pd_df[pd_df['Flag'] == False]

canIDs = []
for index, row in pd_df.iterrows():
    canIDs.append(row['canID'])

print(canIDs)

Reading from ../data/car-hacking/Fuzzy_dataset.csv: DONE
Dask processing: -------------


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [8]:
df = preprocess(file_name)

Reading from ../data/car-hacking/Fuzzy_dataset.csv: DONE
Dask processing: -------------
Dask processing: DONE
Aggregate data -----------------
Preprocessing: DONE
#Normal:  87888
#Attack:  44486


In [9]:
pd_df[pd_df['canID'] == '02b0']['Timestamp'].diff()

1              NaN
19        0.009996
42        0.010007
60        0.009996
81        0.010007
            ...   
338341    1.865040
341957    2.013793
353435    6.547768
363092    5.330678
367266    3.116163
Name: Timestamp, Length: 53477, dtype: float64

In [10]:
print('#Normal:', pd_df[pd_df['Flag'] == False].shape[0])
print('#Attack:', pd_df[pd_df['Flag'] == True].shape[0])

#Normal: 3347013
#Attack: 491847


In [11]:
as_strided = np.lib.stride_tricks.as_strided
win = 29
s = 29
feature = as_strided(pd_df.canBits, ((len(pd_df) - win) // s + 1, win), (8*s, 8))
label = as_strided(pd_df.Flag, ((len(pd_df) - win) // s + 1, win), (1*s, 1))
df = pd.DataFrame({
    'features': pd.Series(feature.tolist()),
    'label': pd.Series(label.tolist())
}, index= range(len(feature)))

df['label'] = df['label'].apply(lambda x: 1 if any(x) else 0)

In [12]:
print('#Normal: ', df[df['label'] == 0].shape[0])
print('#Attack: ', df[df['label'] == 1].shape[0])

#Normal:  87888
#Attack:  44486


In [13]:
def serialize_example(x, y):
    """converts x, y to tf.train.Example and serialize"""
    #Need to pay attention to whether it needs to be converted to numpy() form
    input_features = tf.train.Int64List(value = np.array(x).flatten())
    label = tf.train.Int64List(value = np.array([y]))
    features = tf.train.Features(
        feature = {
            "input_features": tf.train.Feature(int64_list = input_features),
            "label" : tf.train.Feature(int64_list = label)
        }
    )
    example = tf.train.Example(features = features)
    return example.SerializeToString()

In [14]:
def read_tfrecord(example):
    input_dim = 841
    feature_description = {
    'input_features': tf.io.FixedLenFeature([input_dim], tf.int64),
    'label': tf.io.FixedLenFeature([1], tf.int64)
    }
    return tf.io.parse_single_example(example, feature_description)

In [15]:
def data_from_tfrecord(tf_filepath, batch_size, repeat_time):
    data = tf.data.TFRecordDataset(tf_filepath)
    data = data.map(read_tfrecord)
    data = data.shuffle(2)
    data = data.repeat(repeat_time + 1)
    data = data.batch(batch_size)
    # print(tf.data.experimental.cardinality(data))
    iterator = data.make_one_shot_iterator()
    return iterator.get_next()

In [16]:
def data_helper(data_tf, sess):
    n_labels = 2
    data = sess.run(data_tf)
    x, y = data['input_features'], data['label']
    size = x.shape[0]
    y_one_hot = np.eye(n_labels)[y].reshape([size, n_labels])
    return x, y_one_hot

In [17]:
def get_size(file_path):
    dataset = data_from_tfrecord(file_path, 1000, 0)
    # print(tf.data.experimental.cardinality(dataset).numpy())
    init = tf.global_variables_initializer()
    size = 0
    with tf.Session() as sess:
        sess.run(init)
        while True:
            try:
                x_l, y_l = data_helper(dataset, sess)
                size += x_l.shape[0]
            except Exception as e:
                print(type(e).__name__)
                break

    return size

In [35]:
def write_tfrecord(data, filename):
    print('Writing {}================= '.format(filename))
    print("Debug=================")
    print(data)
    tfrecord_writer = tf.io.TFRecordWriter(filename)
    for _, row in tqdm(data.iterrows()):
        tfrecord_writer.write(serialize_example(row['features'], row['label']))
    tfrecord_writer.close()

In [19]:
def train_test_split(source_path, dest_path, DATASET_SIZE,\
                     train_size = 500 * 1000, train_label_size = 100 * 1000):
    # dataset = data_from_tfrecord('./Data/TFRecord/DoS', 1000, 0)
    #DATASET_SIZE = data_info['./Data/TFRecord/DoS']
    #train_size = 500 * 1000
    #train_label_size = 100 * 1000
    val_size = int((DATASET_SIZE - train_size) * 0.2)
    test_size = DATASET_SIZE - train_size - val_size

    print(train_size, val_size, test_size)

    dataset = tf.data.TFRecordDataset(source_path)
    dataset = dataset.shuffle(1000000)
    dataset = dataset.map(read_tfrecord)

    train = dataset.take(train_size)
    train_label = train.take(train_label_size)
    train_unlabel = train.skip(train_label_size)

    val = dataset.skip(train_size)
    test = val.skip(val_size)
    val = val.take(val_size)

    batch_size = 10000
    train_label = train_label.batch(batch_size)
    train_unlabel = train_unlabel.batch(batch_size)
    test = test.batch(batch_size)
    val = val.batch(batch_size)

    train_test_info = {
        "train_unlabel": train_size - train_label_size,
        "train_label": train_label_size,
        "validation": val_size,
        "test": test_size
    }
    json.dump(train_test_info, open(dest_path + 'datainfo.txt', 'w'))
    write_tfrecord(train_label, dest_path + 'train_label')
    write_tfrecord(train_unlabel, dest_path + 'train_unlabel')
    write_tfrecord(test, dest_path + 'test')
    write_tfrecord(val, dest_path + 'val')

In [20]:
def create_train_test(df):
    print('Create train - test - val: ')
    train, test = train_test_split(df, test_size=0.3, shuffle=True)
    train, val = train_test_split(train, test_size=0.2, shuffle=True)
    train_ul, train_l = train_test_split(train, test_size=0.1, shuffle=True)
    train_ul = train_ul.reset_index().drop(['index'], axis=1)
    train_l = train_l.reset_index().drop(['index'], axis=1)
    test = test.reset_index().drop(['index'], axis=1)
    val = val.reset_index().drop(['index'], axis=1)

    data_info = {
        "train_unlabel": train_ul.shape[0],
        "train_label": train_l.shape[0],
        "validation": val.shape[0],
        "test": test.shape[0]
    }

    return data_info, train_ul, train_l, val, test

In [21]:
def main(indir, outdir, attacks):
    data_info = {}
    for attack in attacks:
        print('Attack: {} ==============='.format(attack))
        finput = '{}/{}_dataset.csv'.format(indir, attack)
        df = preprocess(finput)
        print("Writing...................")
        foutput_attack = '{}/{}'.format(outdir, attack)
        foutput_normal = '{}/Normal_{}'.format(outdir, attack)
        df_attack = df[df['label'] == 1]
        df_normal = df[df['label'] == 0]
        write_tfrecord(df_attack, foutput_attack)
        write_tfrecord(df_normal, foutput_normal)

        data_info[foutput_attack] = df_attack.shape[0]
        data_info[foutput_normal] = df_normal.shape[0]

    json.dump(data_info, open('{}/datainfo.txt'.format(outdir), 'w'))
    print("DONE!")

In [24]:
%time
main("../data/car-hacking", "../data/TFRecord/", attack_types)

CPU times: user 1e+03 ns, sys: 0 ns, total: 1e+03 ns
Wall time: 4.29 µs
Reading from ../data/car-hacking/DoS_dataset.csv: DONE
Dask processing: -------------
Dask processing: DONE
Aggregate data -----------------
Preprocessing: DONE
#Normal:  88954
#Attack:  37451
Writing...................


37451it [00:40, 926.33it/s]




88954it [01:34, 938.14it/s]


Reading from ../data/car-hacking/Fuzzy_dataset.csv: DONE
Dask processing: -------------
Dask processing: DONE
Aggregate data -----------------
Preprocessing: DONE
#Normal:  87888
#Attack:  44486
Writing...................


44486it [00:48, 913.29it/s]




87888it [01:37, 904.95it/s]


Reading from ../data/car-hacking/gear_dataset.csv: DONE
Dask processing: -------------
Dask processing: DONE
Aggregate data -----------------
Preprocessing: DONE
#Normal:  87928
#Attack:  65283
Writing...................


65283it [01:13, 892.04it/s]




87928it [01:35, 924.26it/s]


Reading from ../data/car-hacking/RPM_dataset.csv: DONE
Dask processing: -------------
Dask processing: DONE
Aggregate data -----------------
Preprocessing: DONE
#Normal:  87997
#Attack:  71372
Writing...................


71372it [01:18, 913.72it/s]




87997it [01:35, 924.55it/s]

DONE!





In [29]:
data_info = json.load(open('../data/TFRecord/datainfo.txt'))
data_info

{'../data/TFRecord//DoS': 37451,
 '../data/TFRecord//Normal_DoS': 88954,
 '../data/TFRecord//Fuzzy': 44486,
 '../data/TFRecord//Normal_Fuzzy': 87888,
 '../data/TFRecord//gear': 65283,
 '../data/TFRecord//Normal_gear': 87928,
 '../data/TFRecord//RPM': 71372,
 '../data/TFRecord//Normal_RPM': 87997}

In [47]:
def write_tfrecord_split(data, filename):
    print('Writing {}================= '.format(filename))
    tfrecord_writer = tf.io.TFRecordWriter(filename)
    for batch_data in iter(data):
        for x, y in zip(batch_data['input_features'], batch_data['label']):
            tfrecord_writer.write(serialize_example(x, y))

In [48]:
# dataset = data_from_tfrecord('./Data/TFRecord/DoS', 1000, 0)
DATASET_SIZE = data_info['../data/TFRecord//DoS']
train_size = 500 * 1000
train_label_size = 100 * 1000
val_size = int((DATASET_SIZE - train_size) * 0.2)
test_size = DATASET_SIZE - train_size - val_size
print(train_size, val_size, test_size)
dataset = tf.data.TFRecordDataset('../data/TFRecord/DoS')
dataset = dataset.map(read_tfrecord)
dataset = dataset.shuffle(2)
train = dataset.take(train_size)
train_label = train.take(train_label_size)
train_unlabel = train.skip(train_label_size)
val = dataset.skip(train_size)
test = val.skip(val_size)
val = val.take(val_size)
batch_size = 10000
train = train.batch(batch_size)
test = test.batch(batch_size)
val = val.batch(batch_size)

write_tfrecord_split(train_label, '../data/DoS/train_label')
write_tfrecord_split(train_unlabel, '../data/DoS/train_unlabel')
write_tfrecord_split(test, '../data/DoS/test')
write_tfrecord_split(val, '../data/DoS/val')

500000 -92509 -370040


In [44]:
# Create training test
%time
for attack in attack_types[1:]:
    file_name = '{}{}_dataset.csv'.format(dataset_path, attack)
    print(file_name + '---------------------------')
    df = preprocess(file_name)
    data_info, train_ul, train_l, val, test = create_train_test(df)
    save_path = '../data/{}/'.format(attack)
    print('Path: ', save_path)
    print('Writing train_unlabel.......................')
    write_tfrecord(train_ul, save_path + "train_unlabel")
    print('Writing train_label.......................')
    write_tfrecord(train_l, save_path + "train_label")
    print('Writing test.......................')
    write_tfrecord(test, save_path + "test")
    print('Writing val.......................')
    write_tfrecord(val, save_path + "val")
    print('Writing data info')
    json.dump(data_info, open(save_path + 'datainfo.txt', 'w'))
    print('==========================================')

CPU times: user 1 µs, sys: 1e+03 ns, total: 2 µs
Wall time: 17.2 µs
../data/car-hacking/Fuzzy_dataset.csv---------------------------
Reading from ../data/car-hacking/Fuzzy_dataset.csv: DONE
Dask processing: -------------
Dask processing: DONE
Aggregate data -----------------
Preprocessing: DONE
#Normal:  87888
#Attack:  44486
Create train - test - val: 


TypeError: train_test_split() got an unexpected keyword argument 'test_size'

In [46]:
%%time
data_info = json.load(open('../data/TFRecord/datainfo.txt'))
attack_types = ['DoS', 'Fuzzy', 'gear', 'RPM']
for attack in attack_types:
    print("Attack: {} ==============".format(attack))
    source = '../data/TFRecord//{}'.format(attack)
    dest = '../data/{}/'.format(attack)
    train_test_split(source, dest, data_info[source])

500000 -92509 -370040
<BatchDataset element_spec={'input_features': TensorSpec(shape=(None, 841), dtype=tf.int64, name=None), 'label': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None)}>


AttributeError: 'BatchDataset' object has no attribute 'iterrows'

In [22]:
train_test_info = {
        "train_unlabel": train_size - train_label_size,
        "train_label": train_label_size,
        "validation": val_size,
        "test": test_size
}
json.dump(data_info, open(save_path + 'datainfo.txt', 'w'))

NameError: name 'train_size' is not defined

In [23]:
%%time
normal_size = 0
data_info = json.load(open('./Data/TFRecord/datainfo.txt'))
attack_types = ['DoS', 'Fuzzy', 'gear', 'RPM']
for attack in attack_types:
    normal_size += data_info['./Data/TFRecord/Normal_{}'.format(attack)]
sources = ['./Data/TFRecord/Normal_{}'.format(a) for a in attack_types]
dest = './Data/Normal/'
train_test_split(sources, dest, normal_size, train_size=500*1000*4*3, train_label_size=100*1000*4*3)

FileNotFoundError: [Errno 2] No such file or directory: './Data/TFRecord/datainfo.txt'

In [24]:
train_test_info = {
        "train_unlabel": train_size - train_label_size,
        "train_label": train_label_size,
        "validation": val_size,
        "test": test_size
}
json.dump(data_info, open(save_path + 'datainfo.txt', 'w'))

NameError: name 'train_size' is not defined

In [None]:
for attack in attack_types[1:]:
    print('Attack: {} ==============='.format(attack))
    file_name = '{}{}_dataset.csv'.format(dataset_path, attack)
    df = preprocess(file_name)
    write_tfrecord(df[df['label'] == 1], './Data/TFRecord/{}'.format(attack))
    write_tfrecord(df[df['label'] == 0], './Data/TFRecord/Normal_{}'.format(attack))