In [1]:
import os, sys
from pathlib import Path

import pandas as pd 
import dask.dataframe as dd

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, Normalizer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.utils import shuffle

import hashlib  
import ipaddress
import json
# Remove pandas' display truncation: show every column and every row when a
# frame is rendered. NOTE(review): with max_rows=None, displaying a large frame
# prints all of its rows — prefer .head()/.sample() when inspecting data.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)


##### FUNC ############
def ip_to_float(ip):
    """Encode an IPv4 address as a small float.

    The address is parsed with ``ipaddress.IPv4Address`` and converted to its
    integer value. Values below 256 (i.e. ``0.0.0.x``) are divided by 1e5; all
    other values are divided by 1e9.

    Parameters
    ----------
    ip : str | int | bytes
        Anything accepted by ``ipaddress.IPv4Address``.

    Returns
    -------
    float
        The scaled address, or ``0.0`` when ``ip`` cannot be parsed as an IPv4
        address (empty string, ``None``, NaN, malformed text, out-of-range int).
    """
    try:
        ret = float(int(ipaddress.IPv4Address(ip)))
    except (ValueError, TypeError):
        # ipaddress.AddressValueError is a ValueError subclass. Only parsing
        # failures are mapped to 0.0; KeyboardInterrupt, SystemExit and
        # unrelated errors now propagate instead of being silently swallowed.
        return 0.0  # for invalid or empty IPs

    if ret < 256:
        return ret / 1e5

    return ret / 1e9
    
def sum_of_squares(partition):
    """Return the sum of the squared values of ``partition`` wrapped in a one-element Series."""
    squared = partition ** 2
    total = squared.sum()
    return pd.Series([total])

def string_to_float(s):
    """Hash a value to a float in [0, 1).

    The value's string form is hashed with SHA-256; the digest, read as an
    integer, is reduced modulo 10**8 and divided by 1e8. Missing values
    (as judged by ``pd.notna``) yield ``0``.
    """
    if not pd.notna(s):
        return 0

    digest_hex = hashlib.sha256(str(s).encode('utf-8')).hexdigest()
    bucket = int(digest_hex, 16) % 10**8
    return bucket / 1e8


def down_ratio(f):
    """Return ``f / (f + 1e-9)``; the 1e-9 offset keeps the denominator non-zero when ``f`` is 0."""
    denominator = f + 1e-9
    return f / denominator

# Bucket port
def bucket_port(port):
    """Map a port number to a coarse range class.

    Returns 0 for ports below 1024, 1 for ports below 49152, and 2 for
    everything else.
    """
    well_known_limit = 1024
    registered_limit = 49152

    if port < well_known_limit:
        return 0  # Well-known
    if port < registered_limit:
        return 1  # Registered
    return 2  # Dynamic/private
##### FUNC ##############

In [None]:
# Input and output CSV paths used by the chunked cleaning cell.
dir_in = "C:/Users/hoang/Documents/Dataset_KLTN/ciciot2023_extracted/merge-processed/merge.csv"
dir_out = "C:/Users/hoang/Documents/Dataset_KLTN/ciciot2023_extracted/merge-processed/merge1.1.csv"

# Note: this opens `dir_out` (merge1.1.csv), not `dir_in`, so the file must
# already exist when this cell runs.
df = dd.read_csv(dir_out, assume_missing=True)

# print(df['Dst IP'].value_counts().compute())

# Collect the names of all columns whose dtype is not "string".
cols = []
for column_name in df.columns:
    if df[column_name].dtype != "string":
        cols.append(column_name)
print(cols)



# print(type(skewnesses))
# for key in skewnesses:
#     print(f"{key}: {skewnesses[key]}")

# sample_cols_to_remove = []
# for col in cols:
#     tmp = df[col].min().compute()
#     print(f"{col}: {tmp}")
    # if (tmp < 0):
        # sample_cols_to_remove.append(tmp)
# print(sample_cols_to_remove)
    
# dict_dtypes = {}
# for col in df.columns:
#     if("Port" or "Protocol" or "Total Fwd Packet" or "Total Bwd packets" or "Flag" or "Subflow Fwd Packets") in col:
#         dict_dtypes[col] = "int32"
#     elif col.equals("Flow ID") :
#         dict_dtypes[col]= "string"
#     else:
#         dict_dtypes[col] = "float64"

# with open("cic_dtypes.json", "w") as f: #  encoding='utf-8'
#     json.dump(dict_dtypes, f, indent=4)

['Src IP', 'Src Port', 'Dst IP', 'Dst Port', 'Protocol', 'Flow Duration', 'Total Fwd Packet', 'Total Bwd packets', 'Total Length of Fwd Packet', 'Total Length of Bwd Packet', 'Fwd Packet Length Max', 'Fwd Packet Length Min', 'Fwd Packet Length Mean', 'Fwd Packet Length Std', 'Bwd Packet Length Max', 'Bwd Packet Length Min', 'Bwd Packet Length Mean', 'Bwd Packet Length Std', 'Flow Bytes/s', 'Flow Packets/s', 'Flow IAT Mean', 'Flow IAT Std', 'Flow IAT Max', 'Flow IAT Min', 'Fwd IAT Total', 'Fwd IAT Mean', 'Fwd IAT Std', 'Fwd IAT Max', 'Fwd IAT Min', 'Bwd IAT Total', 'Bwd IAT Mean', 'Bwd IAT Std', 'Bwd IAT Max', 'Bwd IAT Min', 'Fwd PSH Flags', 'Bwd PSH Flags', 'Fwd URG Flags', 'Bwd URG Flags', 'Fwd Header Length', 'Bwd Header Length', 'Fwd Packets/s', 'Bwd Packets/s', 'Packet Length Min', 'Packet Length Max', 'Packet Length Mean', 'Packet Length Std', 'Packet Length Variance', 'FIN Flag Count', 'SYN Flag Count', 'RST Flag Count', 'PSH Flag Count', 'ACK Flag Count', 'URG Flag Count', 'CWR 

In [10]:
# Print one "<column>: <skewness>" line per entry of `skewnesses`.
# `skewnesses` is not created in this cell; it is computed in the scaling cell.
for column_name in skewnesses.index:
    skew_value = skewnesses[column_name]
    print(f"{column_name}: {skew_value}")

Src IP: -11.85681679850071
Src Port: 0.015213355831179637
Dst IP: -4.967303819322464
Dst Port: 0.09876885999094136
Protocol: 2.118618792060651
Flow Duration: 0.30019469732670484
Total Fwd Packet: 429.0949915638777
Total Bwd packets: 1409.5336398232519
Total Length of Fwd Packet: 286.6088684541981
Total Length of Bwd Packet: 1601.5047436185903
Fwd Packet Length Max: 12.627625954398045
Fwd Packet Length Min: 4.700604984503514
Fwd Packet Length Mean: 4.335824219378202
Fwd Packet Length Std: 9.855103870151606
Bwd Packet Length Max: 7.45366273602255
Bwd Packet Length Min: 6.99705146758592
Bwd Packet Length Mean: 6.60842154603048
Bwd Packet Length Std: 12.57926471226421
Flow Bytes/s: 202.66317622042916
Flow Packets/s: 21.83056570256306
Flow IAT Mean: 5.4085986782333535
Flow IAT Std: 6.312092908701854
Flow IAT Max: 3.0509258914754738
Flow IAT Min: 6.851115116258727
Fwd IAT Total: 0.3157038699218383
Fwd IAT Mean: 5.353639979115954
Fwd IAT Std: 5.22723198580291
Fwd IAT Max: 3.075270883326676
Fw

In [None]:
# Cleaning pass: stream the CSV at `dir_in` in chunks, filter and encode each
# chunk, and write the result to the CSV at `dir_out`.
# NOTE(review): `dir_in` / `dir_out` are not assigned in this cell. Two other cells
# assign them to different files, so which files are read and overwritten here
# depends on which of those cells ran last — confirm before re-running.
first_chunk = True  # True until the first chunk has been written; selects mode/header below
# Rows whose "Label" is one of these values are dropped.
labels_to_remove = ["Command_Injection", "Backdoor_Malware", "Dictionary_BruteForce", "Browser_Hijacking"]
# Columns checked for negative values: rows with a negative value in any of them
# are dropped (the columns themselves are kept, despite the variable name).
sample_cols_to_remove = ['Flow Duration', 'Flow Bytes/s', 'Flow Packets/s', 'Flow IAT Mean', 'Flow IAT Max', 'Flow IAT Min', 'Fwd IAT Total', 'Fwd IAT Mean', 'Fwd IAT Max', 'Fwd IAT Min', 'Bwd IAT Total', 'Bwd IAT Mean', 'Bwd IAT Max', 'Bwd IAT Min', 'Bwd Bulk Rate Avg']
# ["Flow Duration","Flow Bytes/s", "Flow Packets/s", "Flow IAT Mean", "Flow IAT Max", "Flow IAT Min", "Fwd IAT Total", "Fwd IAT Mean", "Fwd IAT Max", "Fwd IAT Min", "Bwd IAT Total", "Bwd IAT Mean", "Bwd IAT Max","Bwd IAT Min"]
# Columns removed from every chunk.
cols_to_drop = ["Flow ID", "Timestamp"]

chunk_size = 200000    # number of rows per chunk requested from pd.read_csv
for index, chunk in enumerate(pd.read_csv(dir_in, chunksize=chunk_size)):
    # dropna / drop_duplicates see one chunk at a time, so duplicate rows that
    # land in different chunks are not removed. Duplicates are also judged
    # before "Flow ID" and "Timestamp" are dropped, i.e. including those columns.
    chunk = chunk.dropna()
    chunk = chunk.drop_duplicates()
    chunk = chunk.drop(columns=cols_to_drop)
    
    # Keep only rows that are >= 0 in every listed column.
    for col in sample_cols_to_remove:
        chunk = chunk[chunk[col]>=0]
    
    # print(chunk.dtypes)
    # Replace infinities and several placeholder strings with 0.
    # NOTE(review): replace() is called without regex=True, so the
    # r'[N|n][a|A][N|n]' entry is presumably compared as a literal string rather
    # than used as a pattern — confirm against the pandas documentation.
    chunk = chunk.replace([np.inf, -np.inf, "inf", "-inf", "Infinity", "-Infinity", r'[N|n][a|A][N|n]', "(empty)"], 0)
    # NOTE(review): dropna() above already removed rows containing missing values,
    # so this fillna looks like a safeguard only.
    chunk = chunk.fillna(0)
    
    # Collapse port numbers into the classes returned by bucket_port (0, 1 or 2).
    chunk["Src Port"] = chunk["Src Port"].apply(bucket_port)
    chunk["Dst Port"] = chunk["Dst Port"].apply(bucket_port)
    
    # chunk['Flow ID'] = chunk['Flow ID'].apply(string_to_float)

    # Encode IP addresses as floats via ip_to_float (0.0 for unparsable values).
    chunk['Src IP'] = chunk['Src IP'].apply(ip_to_float)
    chunk['Dst IP'] = chunk['Dst IP'].apply(ip_to_float)

    # Drop the excluded labels.
    chunk = chunk[~chunk["Label"].isin(labels_to_remove)]

    # First chunk overwrites the file and writes the header; later chunks append without one.
    chunk.to_csv(dir_out, mode='w' if first_chunk else 'a', header=first_chunk, index=False)
    # chunk.to_parquet(output_file, engine='pyarrow', index=False)
    first_chunk = False
    print(f"Index: {index}") #Chunktail: {chunk.tail()}

In [2]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import gc
import numpy as np
import joblib
import dask.dataframe as dd
import pandas as pd

# Scaling stage: read the cleaned merge1.1 CSV, write the scaled merge1.2 CSV.
dir_in = "C:/Users/hoang/Documents/Dataset_KLTN/ciciot2023_extracted/merge-processed/merge1.1.csv"
dir_out = "C:/Users/hoang/Documents/Dataset_KLTN/ciciot2023_extracted/merge-processed/merge1.2.csv"

df = dd.read_csv(dir_in, assume_missing=True)

# Build the label -> integer mapping. The sort key puts "Benign" first (it is the
# only value for which `x != "Benign"` is False), then the remaining labels in
# ascending string order, so "Benign" maps to 0 when it is present.
labels = df["Label"].unique().compute().tolist()
labels = sorted(labels, key=lambda x: (x != "Benign", x))
label_mapping = {label: idx for idx, label in enumerate(labels)}

# Write the mapping as "<label>: <index>" lines. The `with` block closes the
# file on exit, so no explicit close() call is needed.
with open("pkl/label_mapping.txt", "w") as f:
    for k, v in label_mapping.items():
        f.write(f"{k}: {v}\n")

gc.collect()

standard_scaler = StandardScaler()
minmax_scaler = MinMaxScaler()

# Skewness of every column whose dtype is not "string".
cols = [name for name in df.columns if df[name].dtype != "string"]
skewnesses = df[cols].skew().compute()
# Columns that are never scaled.
do_not_touch_cols = ['Src IP', 'Src Port', 'Dst IP', 'Dst Port', 'Protocol']

# The dask frame is not used again in this cell; release it.
del df
gc.collect()

# Route each remaining column by skewness: |skew| <= 1.0 goes to the min-max
# list, everything else to the standard list.
minmax_cols = []
standard_cols = []
for col in skewnesses.index:
    if col in do_not_touch_cols:
        continue
    target = minmax_cols if abs(skewnesses[col]) <= 1.0 else standard_cols
    target.append(col)

print(f"MinMax Scaler Columns: {minmax_cols}")
print(f"StandardScaler Columns: {standard_cols}")

chunk_size = 500000
DECIMAL_BIN = 6

# Pass 1 of 2 over `dir_in`: fit both scalers incrementally, one chunk at a time.
# The min-max scaler sees the raw values of `minmax_cols`; the standard scaler
# sees log1p of `standard_cols`.
for index, chunk in enumerate(pd.read_csv(dir_in, chunksize=chunk_size)):
    print(f"Partial fit chunk: {index}")

    raw_minmax = chunk[minmax_cols]
    # (not applied) minmax_chunk = np.log1p(chunk[minmax_cols])
    log_standard = np.log1p(chunk[standard_cols])

    minmax_scaler.partial_fit(raw_minmax)
    standard_scaler.partial_fit(log_standard)

    gc.collect()

# Pass 2 of 2 over `dir_in`: transform each chunk with the fitted scalers and
# write it to `dir_out`.
first_chunk = True
for index, chunk in enumerate(pd.read_csv(dir_in, chunksize=chunk_size)):
    print(f"Transform chunk: {index}")

    # Shuffle the rows of this chunk only; every chunk uses the same seed.
    chunk = chunk.sample(frac=1.0, random_state=42)

    # Same preprocessing as in the fitting pass: log1p before the standard scaler.
    chunk[standard_cols] = np.log1p(chunk[standard_cols])
    chunk[minmax_cols] = minmax_scaler.transform(chunk[minmax_cols])
    chunk[standard_cols] = standard_scaler.transform(chunk[standard_cols])

    # Round all scaled columns to DECIMAL_BIN decimal places.
    scaled_cols = minmax_cols + standard_cols
    chunk[scaled_cols] = chunk[scaled_cols].round(DECIMAL_BIN)

    # Replace label strings with their integer codes.
    chunk["Label"] = chunk["Label"].map(label_mapping)

    # The first chunk overwrites the file and writes the header; later chunks append.
    if first_chunk:
        chunk.to_csv(dir_out, mode='w', header=True, index=False)
        first_chunk = False
    else:
        chunk.to_csv(dir_out, mode='a', header=False, index=False)

    gc.collect()

MinMax Scaler Columns: ['Flow Duration', 'Fwd IAT Total', 'Bwd PSH Flags', 'Bwd URG Flags', 'Fwd Bytes/Bulk Avg', 'Fwd Packet/Bulk Avg', 'Fwd Bulk Rate Avg', 'Fwd Seg Size Min']
StandardScaler Columns: ['Total Fwd Packet', 'Total Bwd packets', 'Total Length of Fwd Packet', 'Total Length of Bwd Packet', 'Fwd Packet Length Max', 'Fwd Packet Length Min', 'Fwd Packet Length Mean', 'Fwd Packet Length Std', 'Bwd Packet Length Max', 'Bwd Packet Length Min', 'Bwd Packet Length Mean', 'Bwd Packet Length Std', 'Flow Bytes/s', 'Flow Packets/s', 'Flow IAT Mean', 'Flow IAT Std', 'Flow IAT Max', 'Flow IAT Min', 'Fwd IAT Mean', 'Fwd IAT Std', 'Fwd IAT Max', 'Fwd IAT Min', 'Bwd IAT Total', 'Bwd IAT Mean', 'Bwd IAT Std', 'Bwd IAT Max', 'Bwd IAT Min', 'Fwd PSH Flags', 'Fwd URG Flags', 'Fwd Header Length', 'Bwd Header Length', 'Fwd Packets/s', 'Bwd Packets/s', 'Packet Length Min', 'Packet Length Max', 'Packet Length Mean', 'Packet Length Std', 'Packet Length Variance', 'FIN Flag Count', 'SYN Flag Count',

In [2]:
import joblib
# Write the two scaler objects to files under pkl/.
# NOTE(review): `standard_scaler` and `minmax_scaler` are created in the scaling
# cell, not here. The output saved with this cell is a NameError for
# `standard_scaler`, i.e. it was last run in a kernel where the scaling cell had
# not been executed — run both in the same session.
joblib.dump(standard_scaler, "pkl/standardScaler.pkl")
joblib.dump(minmax_scaler, "pkl/minmaxScaler.pkl")

NameError: name 'standard_scaler' is not defined

In [1]:
import dask.dataframe as dd
import polars as pl
# Path of the scaled CSV (the file the scaling cell writes).
dir_out = "C:/Users/hoang/Documents/Dataset_KLTN/ciciot2023_extracted/merge-processed/merge1.2.csv"

# Lazily read the CSV in 750MB blocks and take a seeded full-fraction sample.
# NOTE(review): presumably dask samples within each partition, so this may not
# be a global shuffle across the whole file — confirm against the dask docs.
df = dd.read_csv(dir_out, blocksize="750MB").sample(frac=1.0, random_state=42)
# print(df[cols].skew().compute())

In [2]:

# Fetch the first ten rows of the sampled frame and print them as text.
preview = df.head(10)
print(preview)

          Src IP  Src Port    Dst IP  Dst Port  Protocol  Flow Duration  \
215017  3.232240         1  3.232236         0        17       0.000014   
517800  3.232271         0  3.232271         1         6       0.000060   
275053  3.232271         1  3.232271         0        17       0.000148   
652084  3.232271         0  3.232271         2         6       0.000084   
113204  3.232271         1  2.650362         0         6       0.000792   
503457  3.232271         1  3.232271         1         6       0.767013   
197416  3.232271         0  3.232271         2         6       0.133569   
669740  3.232271         0  3.232271         1         6       0.747819   
933518  3.232271         1  3.232271         1         6       0.885966   
294342  3.232271         1  3.232271         0        17       0.033408   

        Total Fwd Packet  Total Bwd packets  Total Length of Fwd Packet  \
215017         -1.112147           0.562077                    0.932964   
517800         -1.112147

In [3]:
# Write the sampled frame in Parquet format to the merge1.2 path.
parquet_dir = "C:/Users/hoang/Documents/Dataset_KLTN/ciciot2023_extracted/merge-processed/merge1.2"
df.to_parquet(parquet_dir)