In [1]:
import pandas as pd
# import vaex
import numpy as np
import glob
import dask.dataframe as dd
import json
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tqdm import tqdm

In [2]:
def fill_flag(sample):
    if not isinstance(sample['Flag'], str):
        col = 'Data' + str(sample['DLC'])
        sample['Flag'] = sample[col]
    return sample

In [3]:
def convert_canid_bits(cid):
    try:
        s = bin(int(str(cid), 16))[2:].zfill(29)
        bits = list(map(int, list(s)))
        return bits
    except:
        return None

In [4]:
# Read by dask first
attributes = ['Timestamp', 'canID', 'DLC', 
                           'Data0', 'Data1', 'Data2', 
                           'Data3', 'Data4', 'Data5', 
                           'Data6', 'Data7', 'Flag']
folder = './Data/Car-Hacking/'
attack_types = ['DoS', 'Fuzzy', 'gear', 'RPM']
# attack = attack_types[1]
# file_name = '{}{}_dataset.csv'.format(folder, attack)
# print(file_name)
# df = dd.read_csv(file_name, header=None, names=attributes)
# for f in files[1]:
#     print('Reading file: ', f)
#     df = df.append(pd.read_csv(f, header=None))

In [5]:
def preprocess(file_name):
    df = dd.read_csv(file_name, header=None, names=attributes)
    print('Reading from {}: DONE'.format(file_name))
    print('Dask processing: -------------')
    df = df.apply(fill_flag, axis=1)
    pd_df = df.compute()
    pd_df = pd_df[['Timestamp', 'canID', 'Flag']]
    pd_df['canBits'] = pd_df.canID.apply(convert_canid_bits)
    print('Dask processing: DONE')
    print('Aggregate data -----------------')
    as_strided = np.lib.stride_tricks.as_strided  
    test_df = pd_df.reset_index()
    win = 29
    v = as_strided(test_df.canBits, (len(test_df) - (win - 1), win), (test_df.canBits.values.strides * 2))
    test_df['Flag'] = test_df['Flag'].apply(lambda x: True if x == 'T' else False)
    test_df['features'] = pd.Series(v.tolist(), index=test_df.index[win - 1:])
    # test_df['features'] = test_df.features.apply(lambda x: np.array(x).ravel().tolist())
    v = as_strided(test_df.Flag, (len(test_df) - (win - 1), win), (test_df.Flag.values.strides * 2))
    test_df['label'] = pd.Series(v.tolist(), index=test_df.index[win - 1:])
    test_df = test_df.iloc[win - 1:]
    test_df['label'] = test_df['label'].apply(lambda x: 1 if any(x) else 0)
    print('Preprocessing: DONE')
    return test_df[['features', 'label']].reset_index().drop(['index'], axis=1)

In [6]:
def create_train_test(df):
    print('Create train - test - val: ')
    train, test = train_test_split(df, test_size=0.3, shuffle=True)
    train, val = train_test_split(train, test_size=0.2, shuffle=True)
    train_ul, train_l = train_test_split(train, test_size=0.1, shuffle=True)
    train_ul = train_ul.reset_index().drop(['index'], axis=1)
    train_l = train_l.reset_index().drop(['index'], axis=1)
    test = test.reset_index().drop(['index'], axis=1)
    val = val.reset_index().drop(['index'], axis=1)
    
    data_info = {
        "train_unlabel": train_ul.shape[0],
        "train_label": train_l.shape[0],
        "validation": val.shape[0],
        "test": test.shape[0]
    }
    
    return data_info, train_ul, train_l, val, test

In [7]:
def serialize_example(x, y):
    """converts x, y to tf.train.Example and serialize"""
    #Need to pay attention to whether it needs to be converted to numpy() form
    input_features = tf.train.Int64List(value = np.array(x).flatten())
    label = tf.train.Int64List(value = np.array([y]))
    features = tf.train.Features(
        feature = {
            "input_features": tf.train.Feature(int64_list = input_features),
            "label" : tf.train.Feature(int64_list = label)
        }
    )
    example = tf.train.Example(features = features)
    return example.SerializeToString()

In [8]:
def write_tfrecord(data, filename):
    tfrecord_writer = tf.io.TFRecordWriter(filename)
    for _, row in tqdm(data.iterrows()):
        tfrecord_writer.write(serialize_example(row['features'], row['label']))
    tfrecord_writer.close()    

In [9]:
%time
for attack in attack_types[1:]:
    file_name = '{}{}_dataset.csv'.format(folder, attack)
    print(file_name + '---------------------------')
    df = preprocess(file_name)
    data_info, train_ul, train_l, val, test = create_train_test(df)
    save_path = './Data/{}/'.format(attack)
    print('Path: ', save_path)
    print('Writing train_unlabel.......................')
    write_tfrecord(train_ul, save_path + "train_unlabel")
    print('Writing train_label.......................')
    write_tfrecord(train_l, save_path + "train_label")
    print('Writing test.......................')
    write_tfrecord(test, save_path + "test")
    print('Writing val.......................')
    write_tfrecord(val, save_path + "val")
    print('Writing data info')
    json.dump(data_info, open(save_path + 'datainfo.txt', 'w'))
    print('==========================================')

CPU times: user 0 ns, sys: 9 µs, total: 9 µs
Wall time: 16.7 µs
./Data/Car-Hacking/Fuzzy_dataset.csv---------------------------
Reading from ./Data/Car-Hacking/Fuzzy_dataset.csv: DONE
Dask processing: -------------


You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta={'Timestamp': 'float64', 'canID': 'object', 'DLC': 'int64', 'Data0': 'object', 'Data1': 'object', 'Data2': 'object', 'Data3': 'object', 'Data4': 'object', 'Data5': 'object', 'Data6': 'object', 'Data7': 'object', 'Flag': 'object'})



Dask processing: DONE
Aggregate data -----------------
Preprocessing: DONE
Create train - test - val: 


1it [00:00,  7.08it/s]

Path:  ./Data/Fuzzy/
Writing train_unlabel.......................


1934770it [10:54, 2954.10it/s]
570it [00:00, 2896.08it/s]

Writing train_label.......................


214975it [01:11, 2997.53it/s]
64it [00:00, 638.36it/s]

Writing test.......................


1151650it [06:20, 3026.12it/s]
184it [00:00, 1837.91it/s]

Writing val.......................


537437it [03:06, 2876.34it/s]


Writing data info
./Data/Car-Hacking/gear_dataset.csv---------------------------
Reading from ./Data/Car-Hacking/gear_dataset.csv: DONE
Dask processing: -------------


You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta={'Timestamp': 'float64', 'canID': 'object', 'DLC': 'int64', 'Data0': 'object', 'Data1': 'object', 'Data2': 'object', 'Data3': 'object', 'Data4': 'object', 'Data5': 'object', 'Data6': 'object', 'Data7': 'object', 'Flag': 'object'})

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  after removing the cwd from sys.path.


Dask processing: DONE
Aggregate data -----------------
Preprocessing: DONE
Create train - test - val: 


1it [00:00,  5.94it/s]

Path:  ./Data/gear/
Writing train_unlabel.......................


2239328it [12:46, 2921.27it/s]
238it [00:00, 2379.68it/s]

Writing train_label.......................


248815it [01:29, 2769.91it/s]
315it [00:00, 1836.83it/s]

Writing test.......................


1332935it [07:37, 2912.73it/s]
145it [00:00, 1448.68it/s]

Writing val.......................


622036it [03:25, 3025.76it/s]
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta={'Timestamp': 'float64', 'canID': 'object', 'DLC': 'int64', 'Data0': 'object', 'Data1': 'object', 'Data2': 'object', 'Data3': 'object', 'Data4': 'object', 'Data5': 'object', 'Data6': 'object', 'Data7': 'object', 'Flag': 'object'})



Writing data info
./Data/Car-Hacking/RPM_dataset.csv---------------------------
Reading from ./Data/Car-Hacking/RPM_dataset.csv: DONE
Dask processing: -------------
Dask processing: DONE
Aggregate data -----------------
Preprocessing: DONE
Create train - test - val: 


1it [00:00,  5.68it/s]

Path:  ./Data/RPM/
Writing train_unlabel.......................


2329322it [12:51, 3019.52it/s]
252it [00:00, 2514.30it/s]

Writing train_label.......................


258814it [01:23, 3106.57it/s]
1it [00:00,  9.79it/s]

Writing test.......................


1386503it [07:30, 3074.51it/s]
463it [00:00, 2450.96it/s]

Writing val.......................


647035it [03:27, 3111.81it/s]


Writing data info


In [42]:
# with open(save_path + 'datainfo.txt', 'w') as f:
#     f.write('Train Unlabel: {}\n'.format(train_ul.shape[0]))
#     f.write('Train Label: {}\n'.format(train_l.shape[0]))
#     f.write('Test: {}\n'.format(test.shape[0]))
#     f.write('Validation: {}\n'.format(val.shape[0]))
#     f.close()

In [None]:
# raw_data = tf.data.TFRecordDataset('data')
# feature_description = {
#     'input_features': tf.io.FixedLenFeature([29*29], tf.int64),
#     'label': tf.io.FixedLenFeature([1], tf.int64)
# }

# def _parse_image_function(example_proto):
#   # Parse the input tf.train.Example proto using the dictionary above.
#   return tf.io.parse_single_example(example_proto, feature_description)

# parsed_image_dataset = raw_data.map(_parse_image_function)
# parsed_image_dataset

In [38]:
# for image_features in parsed_image_dataset:
#     image_raw = image_features['input_features'].numpy()