# Data Preparation

## Library Imports

In [1]:
## Import necessary libraries here
# Data handling
import os
import glob
import polars as pl
import pandas as pd

## 1. Load Data
First, we will use the function 'glob' because it will return all the files starting with "merged".

In [2]:
# Merged Files Path
path = "../Data/Raw"

# Grabbing the correct path
raw_data_path = os.path.join(path, "Merged*.csv")

# Putting all the chunks in an array
merged_files = sorted(glob.glob(raw_data_path))

print("Found Files: ", len(merged_files))

Found Files:  63


Using Polars to handle big datasets

In [3]:
# Build a single lazy frame from all CSVs
lazy_frames = [pl.scan_csv(f) for f in merged_files]

# Concatenate lazily
raw_lf = pl.concat(lazy_frames)

# Quick sanity check on just a couple of rows
print(raw_lf.limit(2).collect())

shape: (2, 40)
┌────────────┬────────────┬────────────┬────────────┬───┬──────────┬────────┬──────────┬───────────┐
│ Header_Len ┆ Protocol   ┆ Time_To_Li ┆ Rate       ┆ … ┆ IAT      ┆ Number ┆ Variance ┆ Label     │
│ gth        ┆ Type       ┆ ve         ┆ ---        ┆   ┆ ---      ┆ ---    ┆ ---      ┆ ---       │
│ ---        ┆ ---        ┆ ---        ┆ f64        ┆   ┆ f64      ┆ i64    ┆ f64      ┆ str       │
│ f64        ┆ i64        ┆ f64        ┆            ┆   ┆          ┆        ┆          ┆           │
╞════════════╪════════════╪════════════╪════════════╪═══╪══════════╪════════╪══════════╪═══════════╡
│ 19.92      ┆ 6          ┆ 63.36      ┆ 25893.9622 ┆ … ┆ 0.000039 ┆ 100    ┆ 1772.41  ┆ DDOS-PSHA │
│            ┆            ┆            ┆ 18         ┆   ┆          ┆        ┆          ┆ CK_FLOOD  │
│ 0.0        ┆ 47         ┆ 64.0       ┆ 3703.84133 ┆ … ┆ 0.000271 ┆ 100    ┆ 2304.0   ┆ MIRAI-GRE │
│            ┆            ┆            ┆ 1          ┆   ┆          ┆        

### 1.1 Saving Raw dataset
The format will be Parquet because it is faster than working with CSV files.

In [4]:
# Stream the full lazy dataset directly to Parquet without loading everything into RAM
raw_lf.sink_parquet("../Data/Raw/Raw_Dataset")
print("Raw dataset saved to Parquet format.")

Raw dataset saved to Parquet format.


### 1.2 Loading Raw Dataset

In [5]:
# Use scan_parquet to keep things lazy
raw_lf = pl.scan_parquet("../Data/Raw/Raw_Dataset")

## 2. Data Cleaning

Dropping null values in 'Label' and 'Std' columns 

In [6]:
clean_lf = raw_lf.drop_nulls(["Label", "Std"])

Adding in the 'Attack_Family' and 'Binary_Label' columns

In [7]:
# inspect unique attack names
unique_labels = (
    clean_lf
    .select(pl.col("Label").unique())
    .collect()
    .to_pandas()["Label"]
    .tolist()
)

unique_labels

['COMMANDINJECTION',
 'XSS',
 'DDOS-ICMP_FLOOD',
 'DDOS-ACK_FRAGMENTATION',
 'DDOS-SYN_FLOOD',
 'DOS-UDP_FLOOD',
 'VULNERABILITYSCAN',
 'DNS_SPOOFING',
 'DDOS-SLOWLORIS',
 'BENIGN',
 'DOS-HTTP_FLOOD',
 'MITM-ARPSPOOFING',
 'DDOS-PSHACK_FLOOD',
 'RECON-OSSCAN',
 'MIRAI-UDPPLAIN',
 'BACKDOOR_MALWARE',
 'RECON-PINGSWEEP',
 'DDOS-UDP_FRAGMENTATION',
 'RECON-HOSTDISCOVERY',
 'DDOS-UDP_FLOOD',
 'DDOS-RSTFINFLOOD',
 'DDOS-HTTP_FLOOD',
 'BROWSERHIJACKING',
 'DICTIONARYBRUTEFORCE',
 'DDOS-ICMP_FRAGMENTATION',
 'DDOS-SYNONYMOUSIP_FLOOD',
 'MIRAI-GREETH_FLOOD',
 'UPLOADING_ATTACK',
 'DOS-TCP_FLOOD',
 'DDOS-TCP_FLOOD',
 'DOS-SYN_FLOOD',
 'SQLINJECTION',
 'RECON-PORTSCAN',
 'MIRAI-GREIP_FLOOD']

In [8]:
# mapping attack labels to their attack family
label_to_family = {
    # BENIGN
    "BENIGN": "BENIGN",

    # DDoS Family
    "DDOS-ACK_FRAGMENTATION": "DDOS",
    "DDOS-UDP_FLOOD": "DDOS",
    "DDOS-SLOWLORIS": "DDOS",
    "DDOS-ICMP_FLOOD": "DDOS",
    "DDOS-RSTFINFLOOD": "DDOS",
    "DDOS-PSHACK_FLOOD": "DDOS",
    "DDOS-HTTP_FLOOD": "DDOS",
    "DDOS-UDP_FRAGMENTATION": "DDOS",
    "DDOS-TCP_FLOOD": "DDOS",
    "DDOS-SYN_FLOOD": "DDOS",
    "DDOS-SYNONYMOUSIP_FLOOD": "DDOS",
    "DDOS-ICMP_FRAGMENTATION": "DDOS",

    # DoS Family
    "DOS-TCP_FLOOD": "DOS",
    "DOS-HTTP_FLOOD": "DOS",
    "DOS-SYN_FLOOD": "DOS",
    "DOS-UDP_FLOOD": "DOS",

    # Recon Family
    "RECON-PINGSWEEP": "RECON",
    "RECON-OSSCAN": "RECON",
    "RECON-PORTSCAN": "RECON",
    "RECON-HOSTDISCOVERY": "RECON",
    "VULNERABILITYSCAN": "RECON",

    # Brute Force
    "DICTIONARYBRUTEFORCE": "BRUTE_FORCE",

    # Spoofing
    "MITM-ARPSPOOFING": "SPOOFING",
    "DNS_SPOOFING": "SPOOFING",

    # Web-based
    "SQLINJECTION": "WEB",
    "COMMANDINJECTION": "WEB",
    "BACKDOOR_MALWARE": "WEB",
    "UPLOADING_ATTACK": "WEB",
    "XSS": "WEB",
    "BROWSERHIJACKING": "WEB",

    # Mirai
    "MIRAI-GREIP_FLOOD": "MIRAI",
    "MIRAI-GREETH_FLOOD": "MIRAI",
    "MIRAI-UDPPLAIN": "MIRAI",
}

# adding the attack family column
clean_lf = clean_lf.with_columns(
    pl.col("Label").replace(label_to_family).alias("Label_Family")
)
# inspect unique attack families
print("The unique attack families are:", clean_lf.select(pl.col("Label_Family").unique()).collect())

The unique attack families are: shape: (8, 1)
┌──────────────┐
│ Label_Family │
│ ---          │
│ str          │
╞══════════════╡
│ BENIGN       │
│ DOS          │
│ RECON        │
│ DDOS         │
│ BRUTE_FORCE  │
│ WEB          │
│ SPOOFING     │
│ MIRAI        │
└──────────────┘


In [9]:
# creation of the binary attack label column from the attack family
clean_lf = clean_lf.with_columns(
    pl.when(pl.col("Label_Family") == "BENIGN")
      .then(0)
      .otherwise(1)
      .alias("Label_Binary")
)

# inspect unique binary labels
print("The unique binary labels are:", clean_lf.select(pl.col("Label_Binary").unique()).collect())

The unique binary labels are: shape: (2, 1)
┌──────────────┐
│ Label_Binary │
│ ---          │
│ i32          │
╞══════════════╡
│ 0            │
│ 1            │
└──────────────┘


Removing Duplicates
This can take a while due to the dataset size and will eat a lot of RAM. Please make sure to have 16GB of RAM available for processing.

In [10]:
clean_lf = clean_lf.unique()

In [11]:
rows_after_dedupe = clean_lf.select(pl.len()).collect().item()
print("Rows after dedupe:", rows_after_dedupe)

Rows after dedupe: 21005417


Dropping Infinite Values

In [12]:
clean_lf = clean_lf.filter(~pl.col("Rate").is_infinite())

Cleaning up column names with consistent formatting

In [13]:
# Standardize column names: replace spaces and capitalization inconsistencies
clean_lf = clean_lf.rename({
    col: col.strip().replace(" ", "_").replace("-", "_")
    for col in clean_lf.collect_schema().names()
})

In [14]:
# Print all column names without collecting the dataset
column_names = clean_lf.collect_schema().names()
print(column_names)

['Header_Length', 'Protocol_Type', 'Time_To_Live', 'Rate', 'fin_flag_number', 'syn_flag_number', 'rst_flag_number', 'psh_flag_number', 'ack_flag_number', 'ece_flag_number', 'cwr_flag_number', 'ack_count', 'syn_count', 'fin_count', 'rst_count', 'HTTP', 'HTTPS', 'DNS', 'Telnet', 'SMTP', 'SSH', 'IRC', 'TCP', 'UDP', 'DHCP', 'ARP', 'ICMP', 'IGMP', 'IPv', 'LLC', 'Tot_sum', 'Min', 'Max', 'AVG', 'Std', 'Tot_size', 'IAT', 'Number', 'Variance', 'Label', 'Label_Family', 'Label_Binary']


Inspect the data types of each column in the lazy df

In [15]:
schema = clean_lf.collect_schema()

for col, dtype in schema.items():
    print(f"{col}: {dtype}")

Header_Length: Float64
Protocol_Type: Int64
Time_To_Live: Float64
Rate: Float64
fin_flag_number: Float64
syn_flag_number: Float64
rst_flag_number: Float64
psh_flag_number: Float64
ack_flag_number: Float64
ece_flag_number: Float64
cwr_flag_number: Float64
ack_count: Int64
syn_count: Int64
fin_count: Int64
rst_count: Int64
HTTP: Float64
HTTPS: Float64
DNS: Float64
Telnet: Float64
SMTP: Float64
SSH: Float64
IRC: Float64
TCP: Float64
UDP: Float64
DHCP: Float64
ARP: Float64
ICMP: Float64
IGMP: Float64
IPv: Float64
LLC: Float64
Tot_sum: Int64
Min: Int64
Max: Int64
AVG: Float64
Std: Float64
Tot_size: Float64
IAT: Float64
Number: Int64
Variance: Float64
Label: String
Label_Family: String
Label_Binary: Int32


Setting the correct column types for each feature and converting to Int32 and Float32.

** Please note that converting to categorical causes null values to appear using Polars. To handle this we will keep the categorical columns as strings to ensure downstream compatibility and we will convert to categorical if needed later.

In [16]:
int_cols = ["ack_count", "syn_count", "fin_count", "rst_count", "Tot_sum", "Min", "Max", "Number"]

float_cols = ["Header_Length", "Time_To_Live", "Rate", "fin_flag_number", "syn_flag_number", "rst_flag_number",
              "psh_flag_number", "ack_flag_number", "ece_flag_number", "cwr_flag_number", "HTTP", "HTTPS", "DNS",
              "Telnet", "SMTP", "SSH", "IRC", "TCP", "UDP", "DHCP", "ARP", "ICMP", "IGMP", "IPv", "LLC", "AVG", 
              "Std", "Tot_size", "IAT", "Variance"]

In [18]:
clean_lf = clean_lf.with_columns(
    # cast integers to Int32
    [pl.col(c).cast(pl.Int32) for c in int_cols] +
    # cast continuous features to Float32
    [pl.col(c).cast(pl.Float32) for c in float_cols]
)

In [19]:
# confirm data type changes
for name, dtype in clean_lf.collect_schema().items():
    print(f"{name}: {dtype}")

Header_Length: Float32
Protocol_Type: Int64
Time_To_Live: Float32
Rate: Float32
fin_flag_number: Float32
syn_flag_number: Float32
rst_flag_number: Float32
psh_flag_number: Float32
ack_flag_number: Float32
ece_flag_number: Float32
cwr_flag_number: Float32
ack_count: Int32
syn_count: Int32
fin_count: Int32
rst_count: Int32
HTTP: Float32
HTTPS: Float32
DNS: Float32
Telnet: Float32
SMTP: Float32
SSH: Float32
IRC: Float32
TCP: Float32
UDP: Float32
DHCP: Float32
ARP: Float32
ICMP: Float32
IGMP: Float32
IPv: Float32
LLC: Float32
Tot_sum: Int32
Min: Int32
Max: Int32
AVG: Float32
Std: Float32
Tot_size: Float32
IAT: Float32
Number: Int32
Variance: Float32
Label: String
Label_Family: String
Label_Binary: Int32


Examine the first few rows of the cleaned dataset

In [20]:
# cleaned dataset to Parquet (streaming, memory-safe)
output_path = "../Data/Cleaned/Cleaned_Dataset.parquet"

clean_lf.sink_parquet(output_path)

print(f"Cleaned dataset written to: {output_path}")

Cleaned dataset written to: ../Data/Cleaned/Cleaned_Dataset.parquet
