# Data Preparation

Uses 8-9 GB of RAM, peaks around 12 GB during loading
Takes 

#### Import dependencies

In [30]:
import os
import ipaddress
import glob
import sys

# import re
# from collections import Counter

import pandas as pd
import numpy as np
import geoip2.database
import humanize

# import matplotlib.pyplot as plt
# import seaborn as sns
# sns.set_theme()
# from scipy import stats

In [37]:
output_file_prefix = './data/CTU-IoT-Malware-Capture'
csv_files_to_load = glob.glob('./data/CTU-IoT-Malware-Capture*.labeled.csv')
training_outfile = output_file_prefix + "_train.csv"

ORIGINAL_LABEL_COLUMN_NAME = 'label'
LABEL_COLUMN_NAME = 'label_bool'

NORMALIZE_METHOD = "min_max"

COLUMN_NAMES_CATEGORICAL = [ #'ip_asn', 'ip_dest_country',
                            'id.resp_p', 'id.orig_p',
                            'id.orig_h', 'id.resp_h',
                            'proto', 'service', 'conn_state']

FEATURE_PROPER_DATATYPES = {
    'local_orig':   bool,
    'local_resp':   bool,
    'missed_bytes': int,
    'id.resp_p':    'category',
    'id.orig_p':    'category',
    'id.orig_h':    'category',
    'id.resp_h':    'category',
    'proto':        'category',
    'service':      'category',
    'conn_state':   'category'
    #'orig_bytes':   int,       # has NaN values
    #'resp_bytes':   int        # has NaN values
    }

columns_to_OHE = ['proto', 'service', 'conn_state', 
                  'history', 'ip_dest_country'] 
                    #'id.resp_h', 'id.orig_h']
                    
# TODO: use a third party service/mapping/dictionary/API for these lookups
SERVICE_TO_PROTOCOL_AND_PORT_MAPPINGS = {
  'ssh': {'protocol': 'tcp', 'port': 22},
  'dns': {'protocol': 'udp', 'port': 53},
}

geoip_country = geoip2.database.Reader('./geoip/GeoLite2-Country_20240308/GeoLite2-Country.mmdb')
geoip_asn     = geoip2.database.Reader('./geoip/GeoLite2-ASN_20240308/GeoLite2-ASN.mmdb')

# df.dtypes

### Load the data into a Pandas dataframe
Define the path to the dataset file
Define the name of the label column

In [9]:
# load a SINGLE CSV file:
# rootdir = os.getcwd()
# infile = os.path.join(rootdir, 'data',
#                       'CTU-IoT-Malware-Capture-20-1conn.log.labeled.csv')
# df = pd.read_csv(infile, delimiter='|', na_values='-')

# load a directory of CSV files:

dfs = []
for iter_csv_file in csv_files_to_load:
    # filesize_MB = int(os.stat(iter_csv_file).st_size / (1024 * 1024))
    # if filesize_MB >= 5:
    #     print(f'skipping file {filesize_MB}, too big at {filesize_MB} MB')
    #     continue
    # /var/folders/90/cd8pt9qd43q0svfjsljg9ccr0000gn/T/ipykernel_60137/4122006169.py:16: DtypeWarning: Columns (7,22) have mixed types. Specify dtype option on import or set low_memory=False.

    df_temp = pd.read_csv(iter_csv_file, delimiter='|', na_values='-')
    dfs.append(df_temp)
    del df_temp

# print(sys.getsizeof(dfs))

df = pd.concat(dfs, ignore_index=True)
# print(sys.getsizeof(df))

del dfs

  df_temp = pd.read_csv(iter_csv_file, delimiter='|', na_values='-')
  df_temp = pd.read_csv(iter_csv_file, delimiter='|', na_values='-')
  df_temp = pd.read_csv(iter_csv_file, delimiter='|', na_values='-')
  df_temp = pd.read_csv(iter_csv_file, delimiter='|', na_values='-')
  df_temp = pd.read_csv(iter_csv_file, delimiter='|', na_values='-')


#### Customized variables for this dataset

Feature description from documentation: https://www.kaggle.com/datasets/agungpambudi/network-malware-detection-connection-analysis/data

|Field Name|Description|Type|
| ----------- | ----------- | ----------- |
|ts|The timestamp of the connection event.|time|
|uid|A unique identifier for the connection.|string|
|id.orig_h|The source IP address.|addr|
|id.orig_p|The source port.|port|
|id.resp_h|The destination IP address.|addr|
|id.resp_p|The destination port.|port|
|proto|The network protocol used (e.g., 'tcp').|enum|
|service|The service associated with the connection.|string|
|duration|The duration of the connection.|interval|
|orig_bytes|The number of bytes sent from the source to the destination.|count|
|resp_bytes|The number of bytes sent from the destination to the source.|count|
|conn_state|The state of the connection.|string|
|local_orig|Indicates whether the connection is considered local or not.|bool|
|local_resp|Indicates whether the connection is considered local or not.|bool|
|missed_bytes|The number of missed bytes in the connection.|count|
|history|A history of connection states.|string|
|orig_pkts|The number of packets sent from the source to the destination.|count|
|orig_ip_bytes|The number of IP bytes sent from the source to the destination.|count|
|resp_pkts|The number of packets sent from the destination to the source.|count|
|resp_ip_bytes|The number of IP bytes sent from the destination to the source.|count|
|tunnel_parents|Indicates if this connection is part of a tunnel.|set[string]|
|label|A label associated with the connection (e.g., 'Malicious' or 'Benign').|string|
|detailed-label|A more detailed description or label for the connection.|string|

In [11]:
df['service'].unique()
# array([nan, 'dns', 'http', 'dhcp', 'ssl', 'irc', 'ssh'], dtype=object)


array([nan, 'dns', 'http', 'dhcp', 'ssl', 'irc', 'ssh'], dtype=object)

# Transforms

More transform ideas:
* service vs port/protcol mismatch
* first time contact between client/server
* receiving end high port
* total last 24 hour bandwidth between client/server

In [12]:
# Setting the label as boolean
df[LABEL_COLUMN_NAME] = df[ORIGINAL_LABEL_COLUMN_NAME].isin(['Malicious   C&C']).astype(int).astype(bool)

for colname, newdatatype in FEATURE_PROPER_DATATYPES.items():
    df[colname] = df[colname].astype(newdatatype)

# for iter_colname in COLUMN_NAMES_CATEGORICAL:
#     df[iter_colname] = df[iter_colname].astype('category')
   
# TODO: double check this w/ unique values 
df['is_tunneled'] = not(df['tunnel_parents'].isna)

# converting the date to timestamp,
# need the unit='s' to convert Unix time
df["ts_converted"] = pd.to_datetime(
    df["ts"], errors="raise",
    unit='s'
)

df.set_index('uid', inplace=True)
# IP_ADDRESS_COLUMN_NAMES = ['id.orig_h', 'id.resp_h']
# for iter_colname in IP_ADDRESS_COLUMN_NAMES:
#     df[iter_colname] = df[iter_colname].apply(ipaddress.ip_address)
df.dtypes

ts                       float64
id.orig_h               category
id.orig_p               category
id.resp_h               category
id.resp_p               category
proto                   category
service                 category
duration                 float64
orig_bytes               float64
resp_bytes               float64
conn_state              category
local_orig                  bool
local_resp                  bool
missed_bytes               int64
history                   object
orig_pkts                float64
orig_ip_bytes            float64
resp_pkts                float64
resp_ip_bytes            float64
tunnel_parents           float64
label                     object
detailed-label            object
label_bool                  bool
is_tunneled                 bool
ts_converted      datetime64[ns]
dtype: object

In [13]:
# Show some sample data after the transforms
df.head(3)

Unnamed: 0_level_0,ts,id.orig_h,id.orig_p,id.resp_h,id.resp_p,proto,service,duration,orig_bytes,resp_bytes,...,orig_pkts,orig_ip_bytes,resp_pkts,resp_ip_bytes,tunnel_parents,label,detailed-label,label_bool,is_tunneled,ts_converted
uid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
CdNmOg26ZIaBRzPvWj,1545403000.0,192.168.1.196,59932.0,104.248.160.24,80.0,tcp,,3.097754,0.0,0.0,...,3.0,180.0,0.0,0.0,,Malicious C&C,,True,False,2018-12-21 14:34:02.863611904
CgzGV333k9WCximeu8,1545403000.0,192.168.1.196,59932.0,104.248.160.24,80.0,tcp,,,,,...,1.0,60.0,0.0,0.0,,Malicious C&C,,True,False,2018-12-21 14:34:10.041294080
CLm5Pd3ZnqmYVjrZ44,1545403000.0,192.168.1.196,59932.0,104.248.160.24,80.0,tcp,,,,,...,1.0,60.0,0.0,0.0,,Malicious C&C,,True,False,2018-12-21 14:34:18.441478912


# Locating missing values

In [14]:
# TODO: show this as a percentage of the whole

# Locating missing values:
nan_count = np.sum(df.isnull(), axis=0).sort_values(ascending=False)

# display just columns that have at least 1 missing value:
nan_count[nan_count > 0]

tunnel_parents    25011003
service           24993006
detailed-label    17954112
duration          15272073
orig_bytes        15272073
resp_bytes        15272073
history              25116
dtype: int64

## Removing columns that the model doesn't use
TBD

In [15]:
df.drop(
    columns=[
        ORIGINAL_LABEL_COLUMN_NAME,     # was replaced
        "detailed-label",               # will be used in future version of this Notebook
        "ts",                           # was converted to a new column
        # "uid",                           # unique identifier, not used by model
        "tunnel_parents"                # documentation isn't clear enough on what this is or how it is formatted or why to be useful
    ],
    inplace=True,
)

df.dtypes

id.orig_h              category
id.orig_p              category
id.resp_h              category
id.resp_p              category
proto                  category
service                category
duration                float64
orig_bytes              float64
resp_bytes              float64
conn_state             category
local_orig                 bool
local_resp                 bool
missed_bytes              int64
history                  object
orig_pkts               float64
orig_ip_bytes           float64
resp_pkts               float64
resp_ip_bytes           float64
label_bool                 bool
is_tunneled                bool
ts_converted     datetime64[ns]
dtype: object

# Transformations - Data Enrichment via adding features

This step takes the longest - about 8 minutes on a MacBook air

TODO: see if it can be multithreaded

In [16]:
# configure and load the GeoIP databases
# %pip install geoip2
# restart the kernel

# https://dev.maxmind.com/geoip/geolite2-free-geolocation-data?lang=en  
# https://www.maxmind.com/en/accounts/985797/geoip/downloads
# https://github.com/maxmind/GeoIP2-python?tab=readme-ov-file#database-usage


def ip_to_country(ip_as_str):
    try:
        ip = ipaddress.ip_address(ip_as_str)
        if ip.is_global:
            return geoip_country.country(ip).country.name
    finally:
        return None

def ip_to_asn(ip_as_str):
    try:
        ip = ipaddress.ip_address(ip_as_str)
        if ip.is_global:
            return geoip_asn.asn(ip).autonomous_system_number
    finally:
        return None

# GeoIP
df['ip_dest_country'] = df['id.resp_h'].apply(ip_to_country)
df['ip_asn']          = df['id.resp_h'].apply(ip_to_asn)

print(df['ip_dest_country'].unique().tolist())
print(df['ip_asn'].unique().tolist())

#TODO: add feature where service does NOT match the port/protocol
# because that is something suspicious and implies they're trying to
# hide something

df.dtypes

[None]
[None]


id.orig_h                category
id.orig_p                category
id.resp_h                category
id.resp_p                category
proto                    category
service                  category
duration                  float64
orig_bytes                float64
resp_bytes                float64
conn_state               category
local_orig                   bool
local_resp                   bool
missed_bytes                int64
history                    object
orig_pkts                 float64
orig_ip_bytes             float64
resp_pkts                 float64
resp_ip_bytes             float64
label_bool                   bool
is_tunneled                  bool
ts_converted       datetime64[ns]
ip_dest_country            object
ip_asn                     object
dtype: object

## Converting strings to one-hot encoded columns
Locate string columns that have a small number of unique values and replace them with one-hot encoded versions, then remove the original column.

Runtime: 3 min on Macbook air

In [17]:
for iter_column_name in columns_to_OHE:
    # define a new column name
    new_column_prefix = iter_column_name # + '_onehot_'
    
    # create a one-hot encoded version in a new dataframe
    temp_df = pd.get_dummies(df[iter_column_name], prefix=new_column_prefix)

    # merge the new dataframe into the existing one
    df = df.join(temp_df)

    # remove the original column now that it has been encoded 
    # into the existing dataframe
    df.drop(columns=iter_column_name, inplace=True)
    
    # TODO: get count of # of new columns
    # TODO: make sure it is not one-hot encoding Booleans
    print(f'One-hot encoded: {iter_column_name} into {new_column_prefix}*')

One-hot encoded: proto into proto*
One-hot encoded: service into service*
One-hot encoded: conn_state into conn_state*
One-hot encoded: history into history*
One-hot encoded: ip_dest_country into ip_dest_country*


In [18]:
# Everything should be reduced to numbers at this point

list_of_string_columns = df.select_dtypes(include=object).columns.tolist()

# create a Pandas Series that lists the string columns by ascending counts
df_unique_string_vals = df[list_of_string_columns].nunique().sort_values(ascending=True)
df_unique_string_vals

ip_asn    0
dtype: int64

# Re-order the columns
Sort the column names alphabetically, but make sure the 'label' column is always last.
AWS Sagemaker cares about the order and having the label be last.

In [19]:
# alphabetically sort the column names, but leave the label as the last column
column_order = sorted(df.columns)
column_order.remove(LABEL_COLUMN_NAME)
column_order.append(LABEL_COLUMN_NAME)
df = df.reindex(column_order, axis=1)

# Final tests

In [None]:
# Runtime: 1 min on Macbook Air
# check for missing values
# check for any remaining strings
#df.describe(include="all")

pd.describe?

Unnamed: 0,conn_state_OTH,conn_state_REJ,conn_state_RSTO,conn_state_RSTOS0,conn_state_RSTR,conn_state_RSTRH,conn_state_S0,conn_state_S1,conn_state_S2,conn_state_S3,...,resp_ip_bytes,resp_pkts,service_dhcp,service_dns,service_http,service_irc,service_ssh,service_ssl,ts_converted,label_bool
count,25011003,25011003,25011003,25011003,25011003,25011003,25011003,25011003,25011003,25011003,...,25011000.0,25011000.0,25011003,25011003,25011003,25011003,25011003,25011003,25011003,25011003
unique,2,2,2,2,2,2,2,2,2,2,...,,,2,2,2,2,2,2,,2
top,False,False,False,False,False,False,True,False,False,False,...,,,False,False,False,False,False,False,,False
freq,21325667,24994931,25010489,22891151,25009086,25010925,19151121,25010872,25010867,25008545,...,,,25010992,25003970,25007607,25009356,25005104,25010992,,25002318
mean,,,,,,,,,,,...,1.754076,0.01465307,,,,,,,2018-12-22 00:47:11.382609408,
min,,,,,,,,,,,...,0.0,0.0,,,,,,,2018-05-09 15:30:31.015073024,
25%,,,,,,,,,,,...,0.0,0.0,,,,,,,2018-07-26 04:43:56.001054976,
50%,,,,,,,,,,,...,0.0,0.0,,,,,,,2018-12-22 04:10:37.901381888,
75%,,,,,,,,,,,...,0.0,0.0,,,,,,,2019-02-28 20:20:26.340016896,
max,,,,,,,,,,,...,520116.0,9613.0,,,,,,,2019-09-21 00:39:50.773122048,


In [21]:
# show the final datatypes before exporting to CSV
df.dtypes

conn_state_OTH                 bool
conn_state_REJ                 bool
conn_state_RSTO                bool
conn_state_RSTOS0              bool
conn_state_RSTR                bool
                          ...      
service_irc                    bool
service_ssh                    bool
service_ssl                    bool
ts_converted         datetime64[ns]
label_bool                     bool
Length: 303, dtype: object

In [22]:
df.head()

Unnamed: 0_level_0,conn_state_OTH,conn_state_REJ,conn_state_RSTO,conn_state_RSTOS0,conn_state_RSTR,conn_state_RSTRH,conn_state_S0,conn_state_S1,conn_state_S2,conn_state_S3,...,resp_ip_bytes,resp_pkts,service_dhcp,service_dns,service_http,service_irc,service_ssh,service_ssl,ts_converted,label_bool
uid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
CdNmOg26ZIaBRzPvWj,False,False,False,False,False,False,True,False,False,False,...,0.0,0.0,False,False,False,False,False,False,2018-12-21 14:34:02.863611904,True
CgzGV333k9WCximeu8,False,False,False,False,False,False,True,False,False,False,...,0.0,0.0,False,False,False,False,False,False,2018-12-21 14:34:10.041294080,True
CLm5Pd3ZnqmYVjrZ44,False,False,False,False,False,False,True,False,False,False,...,0.0,0.0,False,False,False,False,False,False,2018-12-21 14:34:18.441478912,True
CDn2pd1rDD1lCMXAia,False,False,False,False,False,False,True,False,False,False,...,0.0,0.0,False,True,False,False,False,False,2018-12-21 14:34:13.913069056,False
C1NKkV3tB4rImzbpDj,False,False,False,False,False,False,True,False,False,False,...,0.0,0.0,False,True,False,False,False,False,2018-12-21 14:34:03.902540032,False


In [23]:
# close the readers
geoip_country.close()
geoip_asn.close()

In [36]:
# size output

# print(df.size) # total number of cells (rows times columns)
print(df.shape[0])

# print(humanize.naturalsize(sys.getsizeof(df)))

25011003
13.5 GB


# Storing training and prediction data into CSV files

full dataframe has 25011003 rows

|file type|# of rows|size|runtime|filename|expanded file size (MB)|compression ratio (%)|% of data exported|Est runtime for full data|Est full file size|
|---|---|---|---|---|---|---|---|---|---|
|CSV|509191|898 MB|?|CTU-IoT-Malware-Capture_train.csv|same|N/A|2.0%|?|43 GB|
|XZ|379831|12 MB|10 min 36 sec|CTU-IoT-Malware-Capture_train.xz|703 MB|98%|1.5186%|11.5 hours|790 MB|
|CSV|40261|74.5 MB|60 sec|CTU-IoT-Malware-Capture_train.csv|same|N/A|0.001609731524961|10 hours |45 GB|

In [38]:
# Runtime on Macbook air with full dataset to uncompressed file: at least 8 min
# TODO: find faster way to write file to disk
#   CSV is mariginally faster than XZ, but takes up way more space
#   https://dask.pydata.org/en/latest/diagnostics-local.html
#   increase # of rows/block size
#   https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html
# TODO: progress meter for saving file

# Create a training/test dataset and output to CSV

df.to_csv(training_outfile)
print(f"Training data saved to new CSV file:\n{training_outfile}")

output_filesize = humanize.naturalsize(os.stat(training_outfile).st_size)
print(f'output file size: {output_filesize}')

KeyboardInterrupt: 