In [1]:
# imports
import os

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import pandas as pd
import numpy as np

from utils import dtype

In a future release, Dask DataFrame will use a new implementation that
contains several improvements including a logical query planning.
The user-facing DataFrame API will remain unchanged.

The new implementation is already available and can be enabled by
installing the dask-expr library:

    $ pip install dask-expr

and turning the query planning option on:

    >>> import dask
    >>> dask.config.set({'dataframe.query-planning': True})
    >>> import dask.dataframe as dd

API documentation for the new implementation is available at
https://docs.dask.org/en/stable/dask-expr-api.html

Any feedback can be reported on the Dask issue tracker
https://github.com/dask/dask/issues 


    # via Python

    # via CLI


  import dask.dataframe as dd


In [2]:
# cluster setup
cluster = LocalCluster(n_workers=os.cpu_count())
client = Client(cluster)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 38743 instead


In [3]:
# dataset_part2: dd.DataFrame = dd.read_csv(os.path.join('..', '..', 'data','CSE-CIC-IDS-2018', 'raw_data', 'Tuesday-20-02-2018_TrafficForML_CICFlowMeter.csv'), dtype=dtype)
# dataset_part1: dd.DataFrame = dd.read_csv(os.path.join('..', '..', 'data','CSE-CIC-IDS-2018','raw_data', 'uniform_files', '*.csv'), dtype=dtype)

# extra_cols = dataset_part2.columns.difference(dataset_part1.columns)
# for col in extra_cols:
#     dataset_part1[col] = None
# dataset: dd.DataFrame = dd.concat([dataset_part1, dataset_part2])  # type: ignore
dataset: dd.DataFrame = dd.read_csv(os.path.join('..', '..', 'data','CSE-CIC-IDS-2018', 'meta_labelled_data', '*.part'), dtype=dtype)
dataset['Timestamp'] = dd.to_datetime(dataset['Timestamp'])

def parse_date_time(part):  # datetime disambiguation
    mask = part['Timestamp'].dt.hour < 8
    part.loc[mask, 'Timestamp'] += pd.Timedelta(hours=12)
    return part

dataset = dataset.map_partitions(parse_date_time)

In [6]:
# attack labelling script

# note: this csv was generated from the table provided by the authors at: https://www.unb.ca/cic/datasets/ids-2018.html
attack_list: pd.DataFrame = pd.read_csv(os.path.join('..', '..', 'data','CSE-CIC-IDS-2018', 'attack_list.csv'))  
attack_list['start datetime'] = pd.to_datetime(attack_list['Date'] + ' ' + attack_list['Attack Start Time'] + ':00', dayfirst=True)
attack_list['end datetime'] = pd.to_datetime(attack_list['Date'] + ' ' + attack_list['Attack Finish Time'] + ':59', dayfirst=True)
attack_list['Date'] = pd.to_datetime(attack_list['Date'], dayfirst=True).dt.date

dataset['attack name'] = 'Benign'
dataset['attack category'] = 'Benign'
def label_attacks(partition, add_categories=True):
    for _, attack in attack_list.iterrows():
        if attack['Label'] in ['Infilteration', 'Bot']:
            is_correct_date = np.logical_and(attack['start datetime'] <= partition['Timestamp'], partition['Timestamp'] <= attack['end datetime'])  
        else:
            is_correct_date = attack['Date'] == partition['Timestamp'].dt.date
        is_part_of_attack = np.logical_and(partition['Label'] == attack['Label'], is_correct_date)
        partition.loc[is_part_of_attack, 'attack name'] = attack['Attack Name']  # attack label
        
        if add_categories:
            partition.loc[is_part_of_attack, 'attack category'] = attack['Attack Category']  # add category label
        
    partition.loc[(partition['Label'] != 'Benign') & partition['attack name'] == 'Benign', 'attack name'] = 'Unknown'
    if add_categories:
        partition.loc[(partition['Label'] != 'Benign') & partition['attack name'] == 'Benign', 'attack category'] = 'Unknown'
    return partition
metalabelled_dataset: dd.DataFrame = dataset.map_partitions(label_attacks) # type: ignore
# label_attacks(dataset.partitions[10].compute())

In [7]:
metalabelled_dataset.to_csv(os.path.join('..', '..', 'data', 'CSE-CIC-IDS-2018', 'meta_labelled_data2'), single_file=False, index=False)



['/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/000.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/001.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/002.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/003.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/004.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/005.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/006.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/007.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/008.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/009.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/010.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/011.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/012.part',
 '/home/calvin/FYP/data/CSE-CIC-IDS-2018/meta_labelled_data2/013