# Developing a Machine Learning model for detecting Internet of Things Malware
Based off AGUNG PAMBUDI's dataset published on Kaggle: https://www.kaggle.com/datasets/agungpambudi/network-malware-detection-connection-analysis/data

The 12 source CSV files are about 3.2 GB total when uncompressed, and have a total of 25,011,015 lines in them.

## Primary objectives:
* develop a well generalized model with a targeted (TBD Metric) of (TBD metric value)
* learn how to load/process data at gigabyte scale with on-prem hardware
* show my InfoSec expertise by enhancing the existing dataset with valuable new features, and by providing quality data preparation/filtering

## To Do Lists:
### To Do - Optimization / Performance
* switch input file from CSV to Parquet, partition data on disk
* switch output from from CSV to Parquet
* minimize memory utilization - select smallest appropriate datatypes for each feature
* add support for multi-threading and vectorization, especially my custom feature functions
* display progress meter for reading/writing files if possible, maybe ETA too

### To Do - Data processing
* check for bias in data, rebalance as needed
* winsorize selected numerical features
* normalize selected numerical features
* investigate feature correlation with other features and the label, remove features that aren't providing value or that are too correlated with other features
* display counts of missing values as a percentage of the whole
* double check tunnel_parents column for unique values 
* Figure out what to do with missing values - remove or replace
* OHE - get count of # of new columns created during process, display it
* OHE - make sure it is not one-hot encoding Booleans

### To Do - Feature enhancement ideas
* check for service does NOT match the port/protocol because that is something suspicious and implies they're trying to hide something. Initial version: use hardcoded dict to support the 5-6 services listed. Enhanced: use third party service to do the lookup and support multiple port numbers for a single service (like http)
* add support for threat intelligence feeds wrt IPs and other stuff

### To Do - Research:
* investigate other common Intrusion Detection System Indictors of Comprimise for possible feature development
* investigate that latest MITRE ATT&CK framework methodologies and attacker trends for possible feature development

### To Do - Modeling:
* Build a Jupyter Notebook for training & optimizing Logistic Regression models on this data
* Optimize the above model using gradient descent on hyperparameters
* try out other model algorithms like decision tree, and others

### To Do -  Longer term:
* apply the model to my existing data lake of netflow, DNS, endpoint collection, and other data from SIEM.
* implement model training as a pipeline to automatically create and optimize new models, roll them out
* deploy the model to my local network and scan network traffic in near real time
* investigate running in parallel computing, cloud providers, and/or GPU

Misc notes:
* Currently uses 8-9 GB of working set RAM, peaks around 12 GB during loading

In [1]:
# Performance improvements
%pip install --upgrade pip
%pip install cython numba pandas numpy geoip2 humanize pyarrow matplotlib seaborn scipy

%load_ext Cython

import numba
numba.set_num_threads(4)

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


OMP: Info #276: omp_set_nested routine deprecated, please use omp_set_max_active_levels instead.


#### Import dependencies

In [2]:
import os
import ipaddress
import glob
import sys

# import re
# from collections import Counter

import pandas as pd
import numpy as np
import geoip2.database
import humanize

# import matplotlib.pyplot as plt
# import seaborn as sns
# sns.set_theme()
# from scipy import stats

In [3]:
output_file_prefix = './data/CTU-IoT-Malware-Capture'
csv_files_to_load = glob.glob('./data/CTU-IoT-Malware-Capture*.labeled.csv')
training_outfile = output_file_prefix + "_train.csv.gz"

ORIGINAL_LABEL_COLUMN_NAME = 'label'
LABEL_COLUMN_NAME = 'label_bool'

NORMALIZE_METHOD = "min_max"

COLUMN_NAMES_CATEGORICAL = [ #'ip_asn', 'ip_dest_country',
                            'id.resp_p', 'id.orig_p',
                            'id.orig_h', 'id.resp_h',
                            'proto', 'service', 'conn_state']

# https://stackoverflow.com/questions/29245848/what-are-all-the-dtypes-that-pandas-recognizes
FEATURE_PROPER_DATATYPES = {
    'local_orig':   'Int64',
    'local_resp':   'Int64',
    'missed_bytes': 'Int64',
    'id.resp_p':    'category',
    'id.orig_p':    'category',
    'id.orig_h':    'category',
    'id.resp_h':    'category',
    'proto':        'category',
    'service':      'category',
    'conn_state':   'category',
    'tunnel_parents':   'category',
    'duration':     'float32',   # np.float32
    'history':      'category'
    # 'ts': have to pass parameters
    
    #'orig_bytes':   int,       # has NaN values, so have to use float
    #'resp_bytes':   int        # has NaN values
    }

columns_to_OHE = ['proto', 'service', 'conn_state', 
                  'history', 'ip_dest_country'] 
                    #'id.resp_h', 'id.orig_h']
                    
SERVICE_TO_PROTOCOL_AND_PORT_MAPPINGS = {
  'ssh': {'protocol': 'tcp', 'port': 22},
  'dns': {'protocol': 'udp', 'port': 53},
}

geoip_country = geoip2.database.Reader('./geoip/GeoLite2-Country_20240308/GeoLite2-Country.mmdb')
geoip_asn     = geoip2.database.Reader('./geoip/GeoLite2-ASN_20240308/GeoLite2-ASN.mmdb')

def get_human_friendly_mem_size(dfx):
    return humanize.naturalsize(dfx.memory_usage(index=True, deep=True))
    #  sys.getsizeof(objname))

# df.dtypes

### Load the data into a Pandas dataframe
Define the path to the dataset file
Define the name of the label column

performance
|Runtime|Parser Engine|specified datatypes|specified index|specified chunksize|memory_map|df.memory_usage|
|---|---|---|---|---|---|---|
|96 sec|unspecifed|unspecifed|unspecified|unspecified|unspecified (Default: False)|?|
|3.5 min|unspecifed|almost all|uid|unspecified|unspecified (Default: False)|?|
|4 min 6 sec|unspecifed|almost all|uid|unspecified|True|16.9 GB|
|2 min 7 sec|pyarrow|almost all|uid|unspecified|True|14.3 GB|

In [4]:
# load a directory of CSV files:
# chunksizeint
dfs = []
for iter_csv_file in csv_files_to_load:
    # filesize_MB = int(os.stat(iter_csv_file).st_size / (1024 * 1024))
    # if filesize_MB >= 5:
    #     print(f'skipping file {filesize_MB}, too big at {filesize_MB} MB')
    #     continue
    # /var/folders/90/cd8pt9qd43q0svfjsljg9ccr0000gn/T/ipykernel_60137/4122006169.py:16: DtypeWarning: Columns (7,22) have mixed types. Specify dtype option on import or set low_memory=False.

    # https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-chunking
    # https://stackoverflow.com/questions/66346343/can-i-load-multiple-csv-files-using-pyarrow
    # https://arrow.apache.org/docs/2.0/python/generated/pyarrow.csv.ReadOptions.html#pyarrow.csv.ReadOptions
    print(iter_csv_file)
    df_temp = pd.read_csv(iter_csv_file,
                          index_col='uid',
                          #engine='pyarrow',        # causes issues wiith delimiters sometimes
                          dtype=FEATURE_PROPER_DATATYPES,
                          delimiter='|',
                          na_values='-'
    )
    dfs.append(df_temp)
    del df_temp

df = pd.concat(dfs, ignore_index=True)
del dfs

./data/CTU-IoT-Malware-Capture-3-1conn.log.labeled.csv
./data/CTU-IoT-Malware-Capture-21-1conn.log.labeled.csv
./data/CTU-IoT-Malware-Capture-34-1conn.log.labeled.csv
./data/CTU-IoT-Malware-Capture-44-1conn.log.labeled.csv
./data/CTU-IoT-Malware-Capture-20-1conn.log.labeled.csv
./data/CTU-IoT-Malware-Capture-8-1conn.log.labeled.csv
./data/CTU-IoT-Malware-Capture-42-1conn.log.labeled.csv


In [5]:
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.memory_usage.html
# get_human_friendly_mem_size(df)
df.memory_usage(deep=True)

Index                  132
ts                 1606472
id.orig_h         13780568
id.orig_p         12476827
id.resp_h         14163874
id.resp_p         11993825
proto             12049748
service            6761894
duration            803236
orig_bytes         1606472
resp_bytes         1606472
conn_state        11866311
local_orig         1807281
local_resp         1807281
missed_bytes       1807281
history           11687799
orig_pkts          1606472
orig_ip_bytes      1606472
resp_pkts          1606472
resp_ip_bytes      1606472
tunnel_parents      200809
label             13338996
detailed-label    14122418
dtype: int64

#### Customized variables for this dataset

Feature description from documentation: https://www.kaggle.com/datasets/agungpambudi/network-malware-detection-connection-analysis/data

|Field Name|Description|Type|
| ----------- | ----------- | ----------- |
|ts|The timestamp of the connection event.|time|
|uid|A unique identifier for the connection.|string|
|id.orig_h|The source IP address.|addr|
|id.orig_p|The source port.|port|
|id.resp_h|The destination IP address.|addr|
|id.resp_p|The destination port.|port|
|proto|The network protocol used (e.g., 'tcp').|enum|
|service|The service associated with the connection.|string|
|duration|The duration of the connection.|interval|
|orig_bytes|The number of bytes sent from the source to the destination.|count|
|resp_bytes|The number of bytes sent from the destination to the source.|count|
|conn_state|The state of the connection.|string|
|local_orig|Indicates whether the connection is considered local or not.|bool|
|local_resp|Indicates whether the connection is considered local or not.|bool|
|missed_bytes|The number of missed bytes in the connection.|count|
|history|A history of connection states.|string|
|orig_pkts|The number of packets sent from the source to the destination.|count|
|orig_ip_bytes|The number of IP bytes sent from the source to the destination.|count|
|resp_pkts|The number of packets sent from the destination to the source.|count|
|resp_ip_bytes|The number of IP bytes sent from the destination to the source.|count|
|tunnel_parents|Indicates if this connection is part of a tunnel.|set[string]|
|label|A label associated with the connection (e.g., 'Malicious' or 'Benign').|string|
|detailed-label|A more detailed description or label for the connection.|string|

In [6]:
df['service'].unique()

array([nan, 'irc', 'ssh', 'dns', 'dhcp', 'http', 'ssl'], dtype=object)

# Transforms

More transform ideas:
* service vs port/protcol mismatch
* first time contact between client/server
* receiving end high port
* total last 24 hour bandwidth between client/server

In [7]:
# Setting the label as boolean
df[LABEL_COLUMN_NAME] = df[ORIGINAL_LABEL_COLUMN_NAME].isin(['Malicious   C&C']).astype(int).astype(bool)

for colname, newdatatype in FEATURE_PROPER_DATATYPES.items():
    df[colname] = df[colname].astype(newdatatype)

# for iter_colname in COLUMN_NAMES_CATEGORICAL:
#     df[iter_colname] = df[iter_colname].astype('category')

df['is_tunneled'] = not(df['tunnel_parents'].isna)

# converting the date to timestamp,
# need the unit='s' to convert Unix time
df['ts_converted'] = pd.to_datetime(
    df['ts'], errors="raise",
    unit='s'
)

# df.set_index('uid', inplace=True)     # causes issues sometimes
# IP_ADDRESS_COLUMN_NAMES = ['id.orig_h', 'id.resp_h']
# for iter_colname in IP_ADDRESS_COLUMN_NAMES:
#     df[iter_colname] = df[iter_colname].apply(ipaddress.ip_address)
df.dtypes

ts                       float64
id.orig_h               category
id.orig_p               category
id.resp_h               category
id.resp_p               category
proto                   category
service                 category
duration                 float32
orig_bytes               float64
resp_bytes               float64
conn_state              category
local_orig                 Int64
local_resp                 Int64
missed_bytes               Int64
history                 category
orig_pkts                float64
orig_ip_bytes            float64
resp_pkts                float64
resp_ip_bytes            float64
tunnel_parents          category
label                     object
detailed-label            object
label_bool                  bool
is_tunneled                 bool
ts_converted      datetime64[ns]
dtype: object

In [8]:
# Show some sample data after the transforms
df.head(3)

Unnamed: 0,ts,id.orig_h,id.orig_p,id.resp_h,id.resp_p,proto,service,duration,orig_bytes,resp_bytes,...,orig_pkts,orig_ip_bytes,resp_pkts,resp_ip_bytes,tunnel_parents,label,detailed-label,label_bool,is_tunneled,ts_converted
0,1526756000.0,192.168.2.5,38792,200.168.87.203,59353,tcp,,2.998333,0.0,0.0,...,3.0,180.0,0.0,0.0,,Malicious,PartOfAHorizontalPortScan,False,False,2018-05-19 18:57:41.866499901
1,1526756000.0,192.168.2.5,38792,200.168.87.203,59353,tcp,,,,,...,1.0,60.0,0.0,0.0,,Malicious,PartOfAHorizontalPortScan,False,False,2018-05-19 18:57:48.874876022
2,1526756000.0,192.168.2.5,38793,200.168.87.203,59353,tcp,,2.997182,0.0,0.0,...,3.0,180.0,0.0,0.0,,Malicious,PartOfAHorizontalPortScan,False,False,2018-05-19 18:57:52.877722025


# Locating missing values

In [9]:
# Locating missing values:
nan_count = np.sum(df.isnull(), axis=0).sort_values(ascending=False)

# display just columns that have at least 1 missing value:
nan_count[nan_count > 0]

local_resp        200809
tunnel_parents    200809
local_orig        200809
service           188811
duration          101072
orig_bytes        101072
resp_bytes        101072
detailed-label     40976
history             1208
dtype: int64

## Removing columns that the model doesn't use
TBD

In [10]:
df.drop(
    columns=[
        ORIGINAL_LABEL_COLUMN_NAME,     # was replaced
        "detailed-label",               # will be used in future version of this Notebook
        "ts",                           # was converted to a new column
        # "uid",                           # unique identifier, not used by model
        "tunnel_parents"                # documentation isn't clear enough on what this is or how it is formatted or why to be useful
    ],
    inplace=True,
)

df.dtypes

id.orig_h              category
id.orig_p              category
id.resp_h              category
id.resp_p              category
proto                  category
service                category
duration                float32
orig_bytes              float64
resp_bytes              float64
conn_state             category
local_orig                Int64
local_resp                Int64
missed_bytes              Int64
history                category
orig_pkts               float64
orig_ip_bytes           float64
resp_pkts               float64
resp_ip_bytes           float64
label_bool                 bool
is_tunneled                bool
ts_converted     datetime64[ns]
dtype: object

# Transformations - Data Enrichment via adding features

This step takes the longest - about 8 minutes on a MacBook air w/o vectorizing via numba or specifying # of threads via 

In [11]:
# configure and load the GeoIP databases
# %pip install geoip2
# restart the kernel

# https://dev.maxmind.com/geoip/geolite2-free-geolocation-data?lang=en  
# https://www.maxmind.com/en/accounts/985797/geoip/downloads
# https://github.com/maxmind/GeoIP2-python?tab=readme-ov-file#database-usage

# @numba.vectorize
def ip_to_country(ip_as_str):
    try:
        ip = ipaddress.ip_address(ip_as_str)
        if ip.is_global:
            return geoip_country.country(ip).country.name
    finally:
        return None

# @numba.vectorize
def ip_to_asn(ip_as_str):
    try:
        ip = ipaddress.ip_address(ip_as_str)
        if ip.is_global:
            return geoip_asn.asn(ip).autonomous_system_number
    finally:
        return None

# GeoIP
df['ip_dest_country'] = df['id.resp_h'].apply(ip_to_country)
df['ip_asn']          = df['id.resp_h'].apply(ip_to_asn)

print(df['ip_dest_country'].unique().tolist())
print(df['ip_asn'].unique().tolist())

df.dtypes

[None]
[None]


id.orig_h                category
id.orig_p                category
id.resp_h                category
id.resp_p                category
proto                    category
service                  category
duration                  float32
orig_bytes                float64
resp_bytes                float64
conn_state               category
local_orig                  Int64
local_resp                  Int64
missed_bytes                Int64
history                  category
orig_pkts                 float64
orig_ip_bytes             float64
resp_pkts                 float64
resp_ip_bytes             float64
label_bool                   bool
is_tunneled                  bool
ts_converted       datetime64[ns]
ip_dest_country            object
ip_asn                     object
dtype: object

## Converting strings to one-hot encoded columns
Locate string columns that have a small number of unique values and replace them with one-hot encoded versions, then remove the original column.

Runtime: 3 min on Macbook air

In [12]:
for iter_column_name in columns_to_OHE:
    # define a new column name
    new_column_prefix = iter_column_name # + '_onehot_'
    
    # create a one-hot encoded version in a new dataframe
    temp_df = pd.get_dummies(df[iter_column_name], prefix=new_column_prefix)

    # merge the new dataframe into the existing one
    df = df.join(temp_df)

    # remove the original column now that it has been encoded 
    # into the existing dataframe
    df.drop(columns=iter_column_name, inplace=True)
    
    print(f'One-hot encoded: {iter_column_name} into {new_column_prefix}*')

One-hot encoded: proto into proto*
One-hot encoded: service into service*
One-hot encoded: conn_state into conn_state*
One-hot encoded: history into history*
One-hot encoded: ip_dest_country into ip_dest_country*


In [13]:
# Everything should be reduced to numbers at this point

list_of_string_columns = df.select_dtypes(include=object).columns.tolist()

# create a Pandas Series that lists the string columns by ascending counts
df_unique_string_vals = df[list_of_string_columns].nunique().sort_values(ascending=True)
df_unique_string_vals

ip_asn    0
dtype: int64

# Re-order the columns
Sort the column names alphabetically, but make sure the 'label' column is always last.
AWS Sagemaker cares about the order and having the label be last.

In [14]:
# alphabetically sort the column names, but leave the label as the last column
column_order = sorted(df.columns)
column_order.remove(LABEL_COLUMN_NAME)
column_order.append(LABEL_COLUMN_NAME)
df = df.reindex(column_order, axis=1)

# Final tests

In [15]:
# show the final datatypes before exporting to CSV
df.dtypes

conn_state_OTH                bool
conn_state_REJ                bool
conn_state_RSTO               bool
conn_state_RSTR               bool
conn_state_RSTRH              bool
                         ...      
service_irc                   bool
service_ssh                   bool
service_ssl                   bool
ts_converted        datetime64[ns]
label_bool                    bool
Length: 147, dtype: object

In [16]:
df.head()

Unnamed: 0,conn_state_OTH,conn_state_REJ,conn_state_RSTO,conn_state_RSTR,conn_state_RSTRH,conn_state_S0,conn_state_S1,conn_state_S2,conn_state_S3,conn_state_SF,...,resp_ip_bytes,resp_pkts,service_dhcp,service_dns,service_http,service_irc,service_ssh,service_ssl,ts_converted,label_bool
0,False,False,False,False,False,True,False,False,False,False,...,0.0,0.0,False,False,False,False,False,False,2018-05-19 18:57:41.866499901,False
1,False,False,False,False,False,True,False,False,False,False,...,0.0,0.0,False,False,False,False,False,False,2018-05-19 18:57:48.874876022,False
2,False,False,False,False,False,True,False,False,False,False,...,0.0,0.0,False,False,False,False,False,False,2018-05-19 18:57:52.877722025,False
3,False,False,False,False,False,True,False,False,False,False,...,0.0,0.0,False,False,False,False,False,False,2018-05-19 18:57:59.884958982,False
4,False,False,False,False,False,True,False,False,False,False,...,0.0,0.0,False,False,False,False,False,False,2018-05-19 18:58:03.888751030,False


In [17]:
# close the readers
geoip_country.close()
geoip_asn.close()

In [18]:
# size output

# print(df.size) # total number of cells (rows times columns)
print(df.shape[0])

# print(humanize.naturalsize(sys.getsizeof(df)))

200809


# Storing training and prediction data into CSV files

full dataframe has 25011003 rows

|file type|# of rows|size|runtime|filename|expanded file size (MB)|compression ratio (%)|% of data exported|Est runtime for full data|Est full file size|
|---|---|---|---|---|---|---|---|---|---|
|CSV|509191|898 MB|?|CTU-IoT-Malware-Capture_train.csv|same|N/A|2.0%|?|43 GB|
|XZ|379831|12 MB|10 min 36 sec|CTU-IoT-Malware-Capture_train.xz|703 MB|98%|1.5186%|11.5 hours|790 MB|
|CSV|40261|74.5 MB|60 sec|CTU-IoT-Malware-Capture_train.csv|same|N/A|0.001609731524961|10 hours |45 GB|

In [19]:
# Runtime on Macbook air with full dataset to uncompressed file: at least 8 min
#   CSV is mariginally faster than XZ, but takes up way more space
#   https://dask.pydata.org/en/latest/diagnostics-local.html
#   increase # of rows/block size
#   https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html

# Create a training/test dataset and output to CSV
df.to_csv(training_outfile,
        sep='|',
        header=True,
        #index=False,
        chunksize=1000000,
        compression='gzip',
        encoding='utf-8')

print(f"Training data saved to new file:\n{training_outfile}")

output_filesize = humanize.naturalsize(os.stat(training_outfile).st_size)
print(f'output file size: {output_filesize}')

Training data saved to new file:
./data/CTU-IoT-Malware-Capture_train.csv.gz
output file size: 4.7 MB
