In [2]:
import numpy as np
import pandas as pd
import os
from ipaddress import IPv4Address, IPv6Address
import pyarrow as pa
import pyarrow.parquet as pq
import gc
import os


In [2]:
# The source parquet files carry IP addresses, port numbers and other packet
# metadata next to the payload. The output schema keeps only the payload
# bytes (payload_byte_1 .. payload_byte_1476) plus the two label columns,
# every one of them stored as an unsigned 8-bit integer.

output_schema = pa.schema([
    pa.field(column_name, pa.uint8())
    for column_name in [
        *(f"payload_byte_{i}" for i in range(1, 1477)),
        "attack_label",
        "attack_type",
    ]
])

In [3]:
# Integer class IDs used for multi-class classification.
# The position of each name in the tuple below is its class ID, so
# 'normal' is 0 and every attack category gets a non-zero ID.
label_map = {
    name: class_id
    for class_id, name in enumerate((
        'normal',
        'exploits',
        'dos',
        'fuzzers',
        'generic',
        'reconnaissance',
        'worms',
        'shellcode',
        'backdoor',
        'analysis',
    ))
}

In [4]:
# Convert every raw Payload-Bytes parquet file into a model-ready file:
#   * packet/flow metadata (IDs, IPs, ports, protocol, length) is removed,
#   * missing payload bytes are filled with 0 and stored as uint8,
#   * the textual attack label becomes `attack_type` (class ID from label_map),
#   * `attack_label` becomes a binary flag: 0 = normal, 1 = any attack.
# Files are streamed in batches so a whole file never has to fit in memory.
parquet_dir = '/home/saurav/Desktop/Internship/MLP_Payload/UNSW-NB15/Payload-Bytes'
modified_parquet_dir = '/home/saurav/Desktop/Internship/MLP_Payload/UNSW-NB15/Payload-Bytes-Modified'

# Columns that identify the packet/flow rather than describe its payload.
metadata_columns = [
    'packet_id',
    'flow_id',
    'source_ip',
    'source_port',
    'destination_ip',
    'destination_port',
    'protocol',
    'payload_length',
]
# Number of rows pulled from the input file per iteration.
read_batch_size = 100_000

# The writer cannot create missing parent directories itself.
os.makedirs(modified_parquet_dir, exist_ok=True)

for file in os.listdir(parquet_dir):
    # os.listdir also returns sub-directories and stray files; skip them.
    if not file.endswith(".parquet"):
        continue

    input_file = os.path.join(parquet_dir, file)
    output_file = os.path.join(modified_parquet_dir, file)

    # Open the input first so an unreadable input does not leave an empty
    # output file behind.
    parquet = pq.ParquetFile(input_file, memory_map=True, buffer_size=16384)

    # The context manager guarantees the writer is closed (and the parquet
    # footer written) even if a batch fails; without it the file is only
    # finalised whenever the writer object happens to be garbage-collected.
    with pq.ParquetWriter(output_file, schema=output_schema, compression="zstd", version="2.6", use_byte_stream_split=True, write_batch_size=100_000) as writer:
        for batch in parquet.iter_batches(batch_size=read_batch_size):
            # DataFrame.drop returns a new frame; the result must be kept.
            df = batch.to_pandas().drop(columns=metadata_columns)

            # Map labels BEFORE filling missing values, so a missing label is
            # reported instead of being turned into 0 and then into NaN.
            raw_labels = df.pop("attack_label")
            attack_type = raw_labels.map(label_map)
            unmapped = attack_type.isna()
            if unmapped.any():
                unknown = sorted({str(value) for value in raw_labels[unmapped].unique()})
                raise ValueError(
                    f"{input_file}: attack_label values not present in label_map: {unknown}"
                )

            # Missing payload bytes become 0, then every byte column is
            # narrowed to uint8 to match the output schema.
            df = df.fillna(0)
            byte_column = [col for col in df.columns if col.startswith("payload_byte_")]
            df = df.astype({col: np.uint8 for col in byte_column})

            df = df.assign(
                attack_type=attack_type.astype(np.uint8),
                attack_label=(attack_type != 0).astype(np.uint8),
            )

            # Passing the schema makes Arrow select and order exactly the
            # schema's columns.
            table = pa.Table.from_pandas(df, schema=output_schema)
            writer.write_table(table)

            # Release the batch eagerly; each one holds ~1,500 columns.
            del df, batch, table
            gc.collect()

    print(f"Processed {file} and saved to {output_file}")

Processed Payload_Bytes_File_3.parquet and saved to /home/saurav/Desktop/Internship/MLP_Payload/UNSW-NB15/Payload-Bytes-Modified/Payload_Bytes_File_3.parquet
Processed Payload_Bytes_File_4.parquet and saved to /home/saurav/Desktop/Internship/MLP_Payload/UNSW-NB15/Payload-Bytes-Modified/Payload_Bytes_File_4.parquet
Processed Payload_Bytes_File_18.parquet and saved to /home/saurav/Desktop/Internship/MLP_Payload/UNSW-NB15/Payload-Bytes-Modified/Payload_Bytes_File_18.parquet
Processed Payload_Bytes_File_14.parquet and saved to /home/saurav/Desktop/Internship/MLP_Payload/UNSW-NB15/Payload-Bytes-Modified/Payload_Bytes_File_14.parquet
Processed Payload_Bytes_File_1.parquet and saved to /home/saurav/Desktop/Internship/MLP_Payload/UNSW-NB15/Payload-Bytes-Modified/Payload_Bytes_File_1.parquet
Processed Payload_Bytes_File_15.parquet and saved to /home/saurav/Desktop/Internship/MLP_Payload/UNSW-NB15/Payload-Bytes-Modified/Payload_Bytes_File_15.parquet
Processed Payload_Bytes_File_9.parquet and sav

In [3]:
# Re-open one of the converted files (memory-mapped) for a spot check.
test_file = pq.ParquetFile(
    './UNSW-NB15/Payload-Bytes-Modified/Payload_Bytes_File_1.parquet',
    memory_map=True,
    buffer_size=16384,
)


In [4]:
# Materialise only the first batch (up to 10000 rows) as a DataFrame;
# the rest of the file is never read.
df = next(
    test_file.iter_batches(batch_size=10000)
).to_pandas()