In [50]:
import os
import json
import random
from datetime import datetime
import pandas as pd
import numpy as np
from progressbar import progressbar
from multiprocessing.pool import ThreadPool

Preparing data from:

https://www.stratosphereips.org/datasets-iot23

In [2]:
pd.set_option('display.max_columns', None)

In [54]:
parsed_files_folder = 'dataset/parsed/'
processed_dfs_folder = 'dataset/processed/'
shuffled_dfs_folder = 'dataset/shuffled/'

In [53]:
files = [parsed_files_folder + i for i in os.listdir(parsed_files_folder)]

In [4]:
columns_to_drop = ['ts', 'uid', 'detailed-label', 'id.orig_h', 'id.orig_p', 'id.resp_h', 'id.resp_p', 'local_orig', 'local_resp', 'tunnel_parents', 'history']
numerical_columns = ['duration', 'orig_bytes', 'resp_bytes', 'missed_bytes', 'orig_pkts', 'orig_ip_bytes', 'resp_pkts', 'resp_ip_bytes']
categorical_columns = ['proto', 'service', 'conn_state']

In [5]:
class NumericalCheck:
    def __init__(self, col_name):
        self.col_name = col_name
        self.min = np.nan
        self.max = np.nan
    
    def update(self, val):
        if np.isnan(self.min) or val < self.min:
            self.min = val
        if np.isnan(self.max) or val > self.max:
            self.max = val
            
    def __str__(self):
        return f'<{self.col_name} -> [{self.min}; {self.max}]>'
    
    def __repr__(self):
        return self.__str__()

In [6]:
class CategoricalCheck:
    def __init__(self, col_name, upper_bound=3000):
        self.col_name = col_name
        self.uniques = set()
        self.valid = True
        self.upper_bound = upper_bound
        self.count = 0
        
    def _check_bound(self):
        self.count = len(self.uniques)
        if self.count > self.upper_bound:
            self.valid = False
            self.uniques = set()
        
    def add_points(self, points: set):
        if self.valid:
            self.uniques.update(points)
            self._check_bound()
        else:
            self.count += len(points)
     
    def __str__(self):
        return f'<{self.col_name}: {self.count} || {"valid" if self.valid else "invalid"}>'
    
    def __repr__(self):
        return self.__str__()

In [7]:
categorical_checks = {col: CategoricalCheck(col) for col in categorical_columns}
numerical_checks = {col: NumericalCheck(col) for col in numerical_columns}

In [8]:
def check_df(file):
    df = pd.read_parquet(file)
    df = df.drop(columns_to_drop, axis=1)
    for col in numerical_columns:
        df[col] = df[col].replace('-', np.nan).map(float)
        numerical_checks[col].update(df[col].max())
        numerical_checks[col].update(df[col].min())
    
    for col in categorical_columns:
        df[col] = df[col].str.lower()
        categorical_checks[col].add_points(set(df[col]))

In [9]:
for file in progressbar(files, max_value=len(files), redirect_stdout=True):
    check_df(file)

100% (342 of 342) |######################| Elapsed Time: 0:23:51 Time:  0:23:51


In [10]:
numerical_checks

{'duration': <duration -> [1e-06; 93280.030966]>,
 'orig_bytes': <orig_bytes -> [0.0; 66205578295.0]>,
 'resp_bytes': <resp_bytes -> [0.0; 31720511878.0]>,
 'missed_bytes': <missed_bytes -> [0.0; 1908819480.0]>,
 'orig_pkts': <orig_pkts -> [0.0; 66027354.0]>,
 'orig_ip_bytes': <orig_ip_bytes -> [0.0; 1914793266.0]>,
 'resp_pkts': <resp_pkts -> [0.0; 239484.0]>,
 'resp_ip_bytes': <resp_ip_bytes -> [0.0; 349618679.0]>}

In [20]:
with open('normalizers.json', 'w+') as file:
    json.dump({k: v.max for k, v in numerical_checks.items()}, file)

In [11]:
categorical_checks

{'proto': <proto: 3 || valid>,
 'service': <service: 7 || valid>,
 'conn_state': <conn_state: 13 || valid>}

In [22]:
with open('encoders.json', 'w+') as file:
    json.dump({k: list(v.uniques) for k, v in categorical_checks.items()}, file)

In [23]:
with open('normalizers.json', 'r') as file:
    numerical_checks = json.load(file)
    
with open('encoders.json', 'r') as file:
    categorical_checks = json.load(file)

- normalize numerical_columns
- one-hot-encode categorical_columns
- df.label -> df.isMalicious (lambda r: r == 'malicious') <- y

In [26]:
def encode_df(df):
    for col in categorical_columns:
        for val in categorical_checks[col]:
            if val == '-':
                continue
            df[col] = df[col].str.lower()
            df[f'{col}_{val}'] = (df[col] == val).map(lambda v: 1 if v else 0)
    return df.drop(categorical_columns, axis=1)

In [27]:
def normalize_df(df):
    for col in numerical_columns:
        df[col] = df[col].replace('-', np.nan).map(float)
        df[col] /= numerical_checks[col]
    return df

In [28]:
def process_file(file):
    print(f'processing file {file}')
    df = pd.read_parquet(file)
    df = df.drop(columns_to_drop, axis=1)
    df['label'] = (df['label'].str.lower() == 'malicious').map(lambda v: 1 if v else 0)
    df = encode_df(df)
    df = normalize_df(df)
    df.to_parquet(file.replace('parsed', 'processed'))
    print(f'file {file} finished')

In [32]:
with ThreadPool(4) as pool:
    pool.map(process_file, files)

processing file dataset/parsed/part_1
processing file dataset/parsed/part_119
processing file dataset/parsed/part_139
processing file dataset/parsed/part_159
file dataset/parsed/part_1 finished
processing file dataset/parsed/part_10
file dataset/parsed/part_159 finished
processing file dataset/parsed/part_16
file dataset/parsed/part_139 finished
processing file dataset/parsed/part_14
file dataset/parsed/part_10 finished
processing file dataset/parsed/part_100
file dataset/parsed/part_119 finished
processing file dataset/parsed/part_12
file dataset/parsed/part_16 finished
processing file dataset/parsed/part_160
file dataset/parsed/part_100 finished
processing file dataset/parsed/part_101
file dataset/parsed/part_14 finished
processing file dataset/parsed/part_140
file dataset/parsed/part_12 finished
processing file dataset/parsed/part_120
file dataset/parsed/part_101 finished
processing file dataset/parsed/part_102
file dataset/parsed/part_140 finished
processing file dataset/parsed/par

In [36]:
pd.read_parquet('dataset/processed/part_50')

Unnamed: 0,duration,orig_bytes,resp_bytes,missed_bytes,orig_pkts,orig_ip_bytes,resp_pkts,resp_ip_bytes,label,proto_tcp,proto_icmp,proto_udp,service_dns,service_ssl,service_ssh,service_dhcp,service_http,service_irc,conn_state_rstos0,conn_state_sf,conn_state_rstr,conn_state_sh,conn_state_oth,conn_state_shr,conn_state_s3,conn_state_rstrh,conn_state_s1,conn_state_rsto,conn_state_rej,conn_state_s0,conn_state_s2
0,5.360204e-11,0.0,0.0,0.0,3.029048e-08,4.177997e-08,0.0,0.0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
1,2.144082e-11,0.0,0.0,0.0,3.029048e-08,4.177997e-08,0.0,0.0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
2,3.216122e-11,0.0,0.0,0.0,3.029048e-08,4.177997e-08,0.0,0.0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
3,2.144082e-11,0.0,0.0,0.0,3.029048e-08,4.177997e-08,0.0,0.0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
4,5.360204e-11,0.0,0.0,0.0,3.029048e-08,4.177997e-08,0.0,0.0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
999996,2.144082e-11,0.0,0.0,0.0,3.029048e-08,4.177997e-08,0.0,0.0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
999997,2.144082e-11,0.0,0.0,0.0,3.029048e-08,4.177997e-08,0.0,0.0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
999998,2.144082e-11,0.0,0.0,0.0,3.029048e-08,4.177997e-08,0.0,0.0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
999999,1.072041e-11,0.0,0.0,0.0,3.029048e-08,4.177997e-08,0.0,0.0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0


In [37]:
processed_files = [processed_dfs_folder + file for file in os.listdir(processed_dfs_folder)]

In [39]:
double_check = {file: len(pd.read_parquet(file)) == len(pd.read_parquet(file.replace('processed', 'parsed'))) for file in processed_files}

In [41]:
{k: v for k, v in double_check.items() if not v}

{}

## Sorting files and rows

In [47]:
batch_size = 10 # files

In [51]:
random.shuffle(processed_files)

In [60]:
len(processed_files)

342

In [67]:
def count_shuffled_files():
    return len(os.listdir(shuffled_dfs_folder))

In [70]:
for i in progressbar(range(0, len(processed_files), batch_size), max_value=round(len(processed_files) / batch_size), redirect_stdout=True):
    df = pd.DataFrame()
    for file in processed_files[i:i+batch_size]:
        df = df.append(pd.read_parquet(file))
    
    df = df.sample(frac=1).reset_index(drop=True)
    df.to_parquet(f'{shuffled_dfs_folder}part_{count_shuffled_files() + 1}')

100% (34.2 of 34.2) |####################| Elapsed Time: 0:08:19 Time:  0:08:19
