## Feature Engineering with PySpark.
This tutorial uses the output files generated in the EDA script

The code is copied from the video: https://www.youtube.com/watch?v=TlXqsL4ysB0&t=178s

In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

In [2]:
spark = (
    SparkSession.builder.appName("iot")
    .master("local[*]")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

In [3]:
output_dir = r"C:\Users\gabyl\spark_outputs\preprocessing"

df = spark.read.parquet(output_dir)
df.show()

+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+---------+----------+----------+----------+--------+---------+-------------+---------+-------------+---------+--------------------+-------------------+-------------------+-------------------+-------------------+----------+
|                 ts|               uid|      source_ip|source_port|        dest_ip|dest_port|proto|service| duration|orig_bytes|resp_bytes|conn_state| history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|    label|      detailed-label|                 dt|               hour|             minute|             second|       day|
+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+---------+----------+----------+----------+--------+---------+-------------+---------+-------------+---------+--------------------+-------------------+-------------------+-------------------+-------------------+----------+
|1.5261

In [4]:
df = df.withColumn("is_bad", F.when(F.col("label") != "Benign", 1).otherwise(0))

## Feature Engineering
Let's add some time-series features

In [5]:
# Example of rolling feature generation
df.withColumn(
    "activity_count_last_5m",
    F.count('source_ip').over(Window().partitionBy('source_ip').orderBy(F.col('dt').cast('long')).rangeBetween(-5*60,-1))
).show()

+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+---------+----------+----------+----------+-------+---------+-------------+---------+-------------+------+--------------+-------------------+-------------------+-------------------+-------------------+----------+------+----------------------+
|                 ts|               uid|      source_ip|source_port|        dest_ip|dest_port|proto|service| duration|orig_bytes|resp_bytes|conn_state|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes| label|detailed-label|                 dt|               hour|             minute|             second|       day|is_bad|activity_count_last_5m|
+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+---------+----------+----------+----------+-------+---------+-------------+---------+-------------+------+--------------+-------------------+-------------------+-------------------+---------

In [6]:
# Lets create some custom functions
def create_custom_window(
    partition_by: str,
    timestamp_col: str,
    window_in_minutes: int,
):
    window = (
        Window()
        .partitionBy(partition_by)
        .orderBy(F.col(timestamp_col).cast('long'))
        .rangeBetween(-window_in_minutes*60, -1)
    )
    
    return window

def generate_rolling_aggregate(
    col: str,
    partition_by: str = None,
    operation: str = "count",
    timestamp_col: str = "dt",
    window_in_minutes: int = 1,
):
    if partition_by is None:
        partition_by = col
    
    if operation == "count":
        return F.count(col).over(
            create_custom_window(
                partition_by=partition_by,
                timestamp_col=timestamp_col,
                window_in_minutes=window_in_minutes
            )
        )
    elif operation == "sum":
        return F.sum(col).over(
            create_custom_window(
                partition_by=partition_by,
                timestamp_col=timestamp_col,
                window_in_minutes=window_in_minutes
            )
        )
    elif operation == "avg":
        return F.avg(col).over(
            create_custom_window(
                partition_by=partition_by,
                timestamp_col=timestamp_col,
                window_in_minutes=window_in_minutes
            )
        )
    else:
        raise ValueError(f"Operation '{operation}' is not defined.")

In [7]:
# Now we apply the custom feature engineering to create several new features.
# This cell won't take any time because it doesn't really apply to the dataframe. It just defines the calculations to take place.
# We'de need to apply a df.show() for example for it to really compute
df = df.withColumns({
    "source_ip_count_last_5m" : generate_rolling_aggregate(col='source_ip', partition_by='source_ip', operation='count', window_in_minutes=5),
    "source_ip_count_last_30m" : generate_rolling_aggregate(col='source_ip', partition_by='source_ip', operation='count', window_in_minutes=30),
    "source_port_count_last_5m" : generate_rolling_aggregate(col='source_port', partition_by='source_port', operation='count', window_in_minutes=5),
    "source_port_count_last_30m" : generate_rolling_aggregate(col='source_port', partition_by='source_port', operation='count', window_in_minutes=30),
    "dest_ip_count_last_5m" : generate_rolling_aggregate(col='dest_ip', partition_by='dest_ip', operation='count', window_in_minutes=5),
    "dest_ip_count_last_30m" : generate_rolling_aggregate(col='dest_ip', partition_by='dest_ip', operation='count', window_in_minutes=30),
    "dest_port_count_last_5m" : generate_rolling_aggregate(col='dest_port', partition_by='dest_port', operation='count', window_in_minutes=5),
    "dest_port_count_last_30m" : generate_rolling_aggregate(col='dest_port', partition_by='dest_port', operation='count', window_in_minutes=30),
    "source_ip_avg_pkts_last_5m": generate_rolling_aggregate(col='orig_pkts', partition_by='source_ip', operation='avg', window_in_minutes=5),
    "source_ip_avg_pkts_last_30m": generate_rolling_aggregate(col='orig_pkts', partition_by='source_ip', operation='avg', window_in_minutes=30),
    "source_ip_avg_bytes_last_5m": generate_rolling_aggregate(col='orig_ip_bytes', partition_by='source_ip', operation='avg', window_in_minutes=5),
    "source_ip_avg_pkts_last_30m": generate_rolling_aggregate(col='orig_ip_bytes', partition_by='source_ip', operation='avg', window_in_minutes=30),
})

In [8]:
output_dir_fe = r"C:\Users\gabyl\spark_outputs\feature_engineering"

df.write.mode("overwrite").partitionBy("day").parquet(output_dir)

In [48]:
df_feat_eng = spark.read.parquet(output_dir)
del df
df_feat_eng.show()

NameError: name 'df' is not defined

In [49]:
df_feat_eng.dtypes

[('ts', 'double'),
 ('uid', 'string'),
 ('source_ip', 'string'),
 ('source_port', 'double'),
 ('dest_ip', 'string'),
 ('dest_port', 'double'),
 ('proto', 'string'),
 ('service', 'string'),
 ('duration', 'double'),
 ('orig_bytes', 'double'),
 ('resp_bytes', 'double'),
 ('conn_state', 'string'),
 ('history', 'string'),
 ('orig_pkts', 'double'),
 ('orig_ip_bytes', 'double'),
 ('resp_pkts', 'double'),
 ('resp_ip_bytes', 'double'),
 ('label', 'string'),
 ('detailed-label', 'string'),
 ('dt', 'timestamp'),
 ('hour', 'timestamp'),
 ('minute', 'timestamp'),
 ('second', 'timestamp'),
 ('is_bad', 'int'),
 ('source_ip_count_last_5m', 'bigint'),
 ('source_ip_count_last_30m', 'bigint'),
 ('source_port_count_last_5m', 'bigint'),
 ('source_port_count_last_30m', 'bigint'),
 ('dest_ip_count_last_5m', 'bigint'),
 ('dest_ip_count_last_30m', 'bigint'),
 ('dest_port_count_last_5m', 'bigint'),
 ('dest_port_count_last_30m', 'bigint'),
 ('source_ip_avg_pkts_last_5m', 'double'),
 ('source_ip_avg_pkts_last_30m'

In [50]:
# Selecting the numerical and categorical feature names
numerical_cols = []
categorical_cols = []
for col, dtype in df_feat_eng.dtypes:
    if dtype in ['int', 'bigint', 'float', 'double', 'decimal']:
        numerical_cols.append(col)
    elif dtype == 'string':
        categorical_cols.append(col)
    

In [51]:
# Lets see what features have a small amount of unique values, because we don't want to process one-hot encode really long unique values
df_feat_eng.select([F.count_distinct(col) for col in categorical_cols]).show()

+-------------------+-------------------------+-----------------------+---------------------+-----------------------+--------------------------+-----------------------+---------------------+------------------------------+
|count(DISTINCT uid)|count(DISTINCT source_ip)|count(DISTINCT dest_ip)|count(DISTINCT proto)|count(DISTINCT service)|count(DISTINCT conn_state)|count(DISTINCT history)|count(DISTINCT label)|count(DISTINCT detailed-label)|
+-------------------+-------------------------+-----------------------+---------------------+-----------------------+--------------------------+-----------------------+---------------------+------------------------------+
|            1008748|                    15004|                 597107|                    3|                      5|                        11|                    126|                    2|                             2|
+-------------------+-------------------------+-----------------------+---------------------+-------------------

In [None]:
# Lets apply some innitial filering:
categorical_cols_to_drop = ['uid','source_ip','dest_ip','label', 'detailed-label']
categorical_cols = [col for col in categorical_cols if col not in categorical_cols_to_drop]

In [54]:
# Maybe we can reduce the number of unique values in each feature by filtering out the least frequent ones:
categorical_valid_values = {}
for col in categorical_cols:
    # Save all the classes that appear more than 100 times
    categorical_valid_values[col] = (
        df_feat_eng.groupby(col)
        .count()
        .filter(F.col("count") > 100)
        .select(col)
        .toPandas()
        .values.ravel()
    )
    
    # Transform all the classes that are NOT in the previous saved classes, to the class: 'Other'
    df_feat_eng = df_feat_eng.withColumn(
        col,
        F.when(F.col(col).isin(list(categorical_valid_values[col])), F.col(col)).otherwise(
            F.lit("Other").alias(col)
        )
    )

In [55]:
# Read again the count of classes in each feature, to see the effect of the previous transformation
df_feat_eng.select([F.count_distinct(col) for col in categorical_cols]).show()

+---------------------+-----------------------+--------------------------+-----------------------+
|count(DISTINCT proto)|count(DISTINCT service)|count(DISTINCT conn_state)|count(DISTINCT history)|
+---------------------+-----------------------+--------------------------+-----------------------+
|                    3|                      3|                         7|                     22|
+---------------------+-----------------------+--------------------------+-----------------------+



## Train-Test Split
We could do something like: df.randomSplit() But we want to need to split it by source IP adress to avoid data leakage

In [None]:
# Lets see the counts of malicious data for every source_ip in descending order
df_feat_eng.groupby("source_ip").agg(F.sum(F.col("is_bad")).alias("bad_sum")).orderBy("bad_sum", ascending=False).show(5)

+---------------+-------+
|      source_ip|bad_sum|
+---------------+-------+
|192.168.100.103| 539473|
|  219.250.49.64|      0|
|109.239.172.131|      0|
| 119.67.116.218|      0|
| 27.158.202.208|      0|
+---------------+-------+
only showing top 5 rows


In [None]:
# Training non-malicious IPs on 80%
train_ips = (
    df_feat_eng.where(
        ~F.col('source_ip').isin(['192.168.100.103'])
    )
    .select(F.col('source_ip'), F.lit(1).alias("is_train"))
    .dropDuplicates()
    .sample(0.8)
)

df_feat_eng = df_feat_eng.join(train_ips, on="source_ip", how="left")

+---------------+--------+
|      source_ip|is_train|
+---------------+--------+
|  219.250.49.64|       1|
| 183.216.15.199|       1|
|  190.156.17.15|       1|
|  189.162.19.47|       1|
|  97.124.43.191|       1|
| 197.46.229.113|       1|
|211.177.163.234|       1|
|  181.55.249.55|       1|
|173.243.119.238|       1|
|  59.127.248.68|       1|
|176.178.163.243|       1|
|154.202.106.183|       1|
| 92.210.138.127|       1|
|  91.186.56.243|       1|
| 91.248.210.207|       1|
|   2.203.249.61|       1|
| 23.245.244.115|       1|
|  95.230.57.222|       1|
|  106.92.73.245|       1|
|  88.22.220.215|       1|
+---------------+--------+
only showing top 20 rows


In [None]:
# We need to create the training and testing dataframes. We only have malicious data on the Source IP 192.168.100.103 (apparently)
# How can we separate the train and test dataframes so that both have an equal balance of malicious and non-malicious classes?
# We have to ensure we avoid data leakage by placing the same ip_address on both training and testing dataframes

In [70]:
malicious_ip_df = df_feat_eng.where(F.col("source_ip") == '192.168.100.103')
benign_ip_df = df_feat_eng.where(F.col("source_ip") != '192.168.100.103')

# Now split each df 20/80 for test/train
malicious_train_df, malicious_test_df = malicious_ip_df.randomSplit([0.8, 0.2], seed=42)
benign_train_df, benign_test_df = benign_ip_df.randomSplit([0.8, 0.2], seed=42)

# Now join the 
train_df = malicious_train_df.unionByName(benign_train_df)
test_df = malicious_test_df.unionByName(benign_test_df)

print("Training Data Distribution:")
train_df.groupBy('label').count().show()

print("Testing Data Distribution:")
test_df.groupBy('label').count().show()

Training Data Distribution:
+---------+------+
|    label| count|
+---------+------+
|Malicious|431605|
|   Benign|375811|
+---------+------+

Testing Data Distribution:
+---------+------+
|    label| count|
+---------+------+
|Malicious|107868|
|   Benign| 93464|
+---------+------+

