In [9]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql import Window
from pyspark.sql import functions as F 
from typing import Union
import os

In [10]:
# Driver memory has to be requested before the driver JVM is launched, so it is
# configured on the builder instead of on the live SparkContext afterwards.
# getOrCreate() reuses an existing session, so this only takes effect on a
# fresh kernel.
spark = (
    SparkSession.builder
    .appName("anomaly")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("ERROR")  # only ERROR-level Spark log lines reach the cell output

In [3]:
# Show every (key, value) pair in the running SparkContext's configuration.
sc.getConf().getAll()

[('spark.driver.port', '57993'),
 ('spark.app.name', 'anomaly'),
 ('spark.executor.id', 'driver'),
 ('spark.app.startTime', '1706958770693'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.rdd

In [4]:
from pyspark.conf import SparkConf
#sc.setSystemProperty("spark.executor.memory", "2g")
# NOTE(review): PySpark documents setSystemProperty as something to invoke before a
# SparkContext is instantiated. `sc` already exists at this point, so this call
# probably does not change the running driver's memory — confirm, and prefer
# setting spark.driver.memory when the session is built.
sc.setSystemProperty("spark.driver.memory", "4g")

In [5]:
def preproc(filepath, output_path, save = True):
    """Read the pipe-delimited log files in ``filepath`` and return a cleaned DataFrame.

    Processing steps, in order:
      * read every directory entry as CSV with ``|`` as delimiter and a header row;
      * derive ``dt`` (timestamp) from the ``ts`` column;
      * rename the ``id.orig_*`` / ``id.resp_*`` columns to source/destination names;
      * add ``day``, ``hour``, ``minute`` and ``second`` truncations of ``dt``;
      * drop the columns listed in ``static_cols``;
      * turn the placeholder value ``"-"`` into null;
      * cast the numerical columns to double;
      * fill nulls with ``'missing'`` (categorical) and ``-999999`` (numerical).

    Args:
        filepath: directory whose entries are the input files.
        output_path: parquet destination, used only when ``save`` is truthy.
        save: when truthy, write the result to ``output_path`` (overwriting).

    Returns:
        The cleaned Spark DataFrame.

    Raises:
        FileNotFoundError: if ``filepath`` contains no entries to read.
    """
    # Sorted so the debug print below is deterministic across runs.
    files = sorted(os.listdir(filepath))
    if not files:
        raise FileNotFoundError(f"No input files found in {filepath}")
    filepaths = [os.path.join(filepath, x) for x in files]
    # Resolve the joined path: a bare listdir() name would be resolved against
    # the current working directory rather than against `filepath`.
    print(os.path.realpath(filepaths[0]))

    spark = SparkSession.builder.appName("anomaly_detection").getOrCreate()
    preproc_df = spark.read.option('delimiter', "|").csv(filepaths, header = True)

    static_cols = ["local_orig", "local_resp", "missed_bytes", "tunnel_parents"]
    numerical_cols = [
        "duration",
        "orig_bytes",
        "resp_bytes",
        "orig_pkts",
        "orig_ip_bytes",
        "resp_pkts",
        "resp_ip_bytes"]

    categorical_cols = ["proto", "service", "conn_state","label"]

    preproc_df = (
        preproc_df
        .withColumn("dt", F.from_unixtime("ts"))
        .withColumn("dt", F.to_timestamp("dt"))
        .withColumnsRenamed(
            {
                "id.orig_h": "source_ip",
                "id.orig_p": "source_port",
                "id.resp_h": "destination_ip",
                "id.resp_p": "destination_port",
            })
        .withColumns(
            {
                "day": F.date_trunc("day", F.col("dt")),
                "hour": F.date_trunc("hour", F.col("dt")),
                "minute": F.date_trunc("minute", F.col("dt")),
                "second": F.date_trunc("second", F.col("dt"))
            })
        .drop(*static_cols)
        # "-" is the placeholder for an absent value; make it a real null so the
        # casts and fills below treat it as missing.
        .replace("-", None)
        .withColumns({x: F.col(x).cast("double") for x in numerical_cols})
        .fillna({x: 'missing' for x in categorical_cols})
        # -999999 is the sentinel for a missing numeric value.
        .fillna({x: -999999 for x in numerical_cols})
    )
    if save:
        preproc_df.write.parquet(output_path, mode = 'overwrite')
    return preproc_df

def mult_60(mins):
    """
    Scale a value by a factor of 60.

    Handy for stepping one time unit down, e.g. minutes -> seconds or
    hours -> minutes.
    """
    factor = 60
    scaled = mins * factor
    return scaled

def generate_window(window_in_minutes:int,
                    partition_by:str,
                    timestamp_col:str):
    """Build a look-back window specification.

    Rows are partitioned by ``partition_by`` and ordered by ``timestamp_col``
    cast to long. The frame is a range over that ordering value, running from
    ``mult_60(window_in_minutes)`` before the current row's value up to 1
    before it, so the current row itself lies outside the frame.
    """
    partitioned = Window.partitionBy(F.col(partition_by))
    ordered = partitioned.orderBy(F.col(timestamp_col).cast("long"))
    lookback = mult_60(window_in_minutes)
    return ordered.rangeBetween(-lookback, -1)

# Difference between partitionBy and groupBy:
# - partitionBy (window) adds a column and keeps every row; groupBy collapses
#   the rows of each group into one aggregated row
# - partitionBy allows finer-grained frames through rowsBetween and rangeBetween

def generate_rolling_aggregate(col:str,
                                partition_by: Union[str,None] = None,
                                operation: str = "count", 
                                timestamp_col:str = "dt",
                                window_in_minutes:int = 1,):
    """Return a Column that aggregates ``col`` over a look-back window.

    Args:
        col: column to aggregate.
        partition_by: column that defines the window partitions; when None,
            ``col`` itself is used.
        operation: one of ``"count"``, ``"sum"`` or ``"avg"``.
        timestamp_col: column the window is ordered by.
        window_in_minutes: length of the look-back window.

    Raises:
        ValueError: if ``operation`` is not one of the supported names.
    """
    if operation not in ("count", "sum", "avg"):
        raise ValueError(f"Operation {operation} is not defined")

    # With no explicit partition column, rows are grouped by `col` itself.
    group_col = col if partition_by is None else partition_by

    # The supported operation names match the functions of the same name in F.
    aggregate = getattr(F, operation)
    lookback = generate_window(window_in_minutes=window_in_minutes,
                               partition_by=group_col,
                               timestamp_col=timestamp_col)
    return aggregate(F.col(col)).over(lookback)

## Before features were saved

In [6]:
# Load the cleaned dataset from parquet and preview the first 5 rows.
train = spark.read.parquet("../data/cleaned/cleaned.pq")
train.show(5)

                                                                                

+-----------------+------------------+-------------+-----------+---------------+----------------+-----+-------+--------+----------+----------+----------+-------+---------+-------------+---------+-------------+--------------------+--------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|               ts|               uid|    source_ip|source_port| destination_ip|destination_port|proto|service|duration|orig_bytes|resp_bytes|conn_state|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|               label|detailed-label|                 dt|                day|               hour|             minute|             second|
+-----------------+------------------+-------------+-----------+---------------+----------------+-----+-------+--------+----------+----------+----------+-------+---------+-------------+---------+-------------+--------------------+--------------+-------------------+-------------------+-------------------+---

In [4]:
# List the column names of the loaded dataset.
train.columns

['ts',
 'uid',
 'source_ip',
 'source_port',
 'destination_ip',
 'destination_port',
 'proto',
 'service',
 'duration',
 'orig_bytes',
 'resp_bytes',
 'conn_state',
 'history',
 'orig_pkts',
 'orig_ip_bytes',
 'resp_pkts',
 'resp_ip_bytes',
 'label',
 'detailed-label',
 'dt',
 'day',
 'hour',
 'minute',
 'second']

### Train data

In [7]:
# Rolling connection counts per source IP over two look-back horizons
# (column-name suffix -> window length in minutes).
count_horizons = {"last_min": 1, "last_30_min": 30}
train = train.withColumns(
    {
        f"source_ip_count_{suffix}": generate_rolling_aggregate(
            col="source_ip",
            operation="count",
            timestamp_col="dt",
            window_in_minutes=minutes,
        )
        for suffix, minutes in count_horizons.items()
    }
)

In [5]:
# Preview the rows together with the two new rolling-count columns.
train.show()

24/02/02 15:47:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 4:>                                                          (0 + 1) / 1]

+-----------------+------------------+---------------+-----------+--------------+----------------+-----+-------+--------+----------+----------+----------+-------+---------+-------------+---------+-------------+------+--------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------------+---------------------------+
|               ts|               uid|      source_ip|source_port|destination_ip|destination_port|proto|service|duration|orig_bytes|resp_bytes|conn_state|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes| label|detailed-label|                 dt|                day|               hour|             minute|             second|source_ip_count_last_min|source_ip_count_last_30_min|
+-----------------+------------------+---------------+-----------+--------------+----------------+-----+-------+--------+----------+----------+----------+-------+---------+-------------+---------+-------------+------+-----------

                                                                                

In [8]:
# Persist the engineered training features, replacing any earlier output at this path.
train.write.parquet("../data/features/train_features.pq", mode = "overwrite")

                                                                                

In [1]:
import gc

# Force a garbage-collection pass in the Python process; the displayed value is
# the number of unreachable objects it found.
gc.collect()

3

### Test data

In [8]:
# Load the cleaned test dataset from parquet.
test = spark.read.parquet("../data/cleaned/test.pq")

In [9]:
# Count of connections per source IP over the preceding minute, then persist the
# result so the next feature can be computed from the saved parquet.
last_min_count = generate_rolling_aggregate(
    col="source_ip",
    operation="count",
    timestamp_col="dt",
    window_in_minutes=1,
)
test = test.withColumn("source_ip_count_last_min", last_min_count)

test.write.parquet("../data/features/test_features.pq", mode="overwrite")

                                                                                

In [10]:
# Reload the saved test features so the next step starts from the parquet file.
test = spark.read.parquet("../data/features/test_features.pq")

In [None]:
# `test` is lazily backed by ../data/features/test_features.pq, and Spark refuses
# to overwrite a path that the same query is still reading from. The extended
# feature set is therefore written to a separate path.
test = test.withColumns(
    {
        "source_ip_count_last_30_min": generate_rolling_aggregate(col ="source_ip", operation="count",timestamp_col="dt",window_in_minutes=30),
    }
)
test.write.parquet("../data/features/test_features_30_min.pq", mode="overwrite")

I was unable to load fresh data as a test set due to memory limitations. My workaround was to filter the train dataset by certain IPs and location, and to use the filtered rows as the test set.

## Reading in features

In [8]:
# Reload the previously saved training features instead of recomputing them.
train = spark.read.parquet("../data/features/train_features.pq")

In [None]:
# partition_by must be given explicitly here: when it is omitted the helper
# partitions by the aggregated column (orig_ip_bytes) itself, which would not
# produce the per-source-IP averages that the feature names describe.
train = train.withColumns(
    {
        "source_ip_avg_bytes_last_min": generate_rolling_aggregate(col="orig_ip_bytes", partition_by="source_ip", operation="avg",timestamp_col="dt",window_in_minutes=1),
        "source_ip_avg_bytes_last_30_min": generate_rolling_aggregate(col="orig_ip_bytes", partition_by="source_ip", operation="avg",timestamp_col="dt",window_in_minutes=30)
    }
)
train.write.parquet("../data/cleaned/engineered_feature_2.pq", mode = "overwrite")
train.show()

In [None]:
# Inspect rows whose one-minute rolling source-IP count exceeds 1000.
train.filter(F.col("source_ip_count_last_min") > 1000).show()

Limited disk space and memory on my machine led to an out-of-memory (Java heap space) error. My fix was to update my Java version and set the executable environment variables. Additionally, I computed the features in batches to reduce memory use, and wrote to parquet after each batch of features was engineered. An alternative is to use a bigger machine in the cloud and practise working with a cluster.

I also learnt about `cache` and `persist`.

Reference: https://medium.com/illumination/managing-memory-and-disk-resources-in-pyspark-with-cache-and-persist-aa08026929e2#:~:text=uncache()%20%3A%20This%20method%20is,if%20it%20is%20needed%20again.

## Other uncomputed features --

In [None]:
# Other uncomputed features: rolling-aggregate expressions that are defined here
# but not yet applied. Apply them with `train.withColumns(uncomputed_features)`.
# Each dest_ip_* feature counts over destination_ip and each dest_port_* feature
# counts over destination_port.
uncomputed_features = {
    "dest_ip_count_last_min": generate_rolling_aggregate(col="destination_ip", operation="count",timestamp_col="dt",window_in_minutes=1),
    "dest_ip_count_last_30_min": generate_rolling_aggregate(col="destination_ip", operation="count",timestamp_col="dt",window_in_minutes=30),
    "dest_port_count_last_min": generate_rolling_aggregate(col="destination_port", operation="count",timestamp_col="dt",window_in_minutes=1),
    "dest_port_count_last_30_min": generate_rolling_aggregate(col="destination_port", operation="count",timestamp_col="dt",window_in_minutes=30),
    "source_ip_avg_pkts_last_min": generate_rolling_aggregate(col="orig_pkts",partition_by="source_ip", operation="avg",timestamp_col="dt",window_in_minutes=1),
    "source_ip_avg_pkts_last_30_min": generate_rolling_aggregate(col="orig_pkts",partition_by="source_ip",operation="avg",timestamp_col="dt",window_in_minutes=30),
}

In [5]:
# Show the column names and data types of the training DataFrame.
train.schema

StructType([StructField('ts', StringType(), True), StructField('uid', StringType(), True), StructField('source_ip', StringType(), True), StructField('source_port', StringType(), True), StructField('destination_ip', StringType(), True), StructField('destination_port', StringType(), True), StructField('proto', StringType(), True), StructField('service', StringType(), True), StructField('duration', DoubleType(), True), StructField('orig_bytes', DoubleType(), True), StructField('resp_bytes', DoubleType(), True), StructField('conn_state', StringType(), True), StructField('history', StringType(), True), StructField('orig_pkts', DoubleType(), True), StructField('orig_ip_bytes', DoubleType(), True), StructField('resp_pkts', DoubleType(), True), StructField('resp_ip_bytes', DoubleType(), True), StructField('label', StringType(), True), StructField('detailed-label', StringType(), True), StructField('dt', TimestampType(), True), StructField('day', TimestampType(), True), StructField('hour', Times

In [11]:
# Keep only the label column; displaying the DataFrame shows just its schema.
label = train.select("label")
label

DataFrame[label: string]

## Pipeline

In [16]:
# Numeric columns taken directly from each connection record.
base_numerical_features = [
    'duration',
    'orig_bytes',
    'resp_bytes',
    'orig_pkts',
    'orig_ip_bytes',
    'resp_pkts',
    'resp_ip_bytes',
]
# Engineered rolling counts per source IP.
rolling_count_features = [
    'source_ip_count_last_min',
    'source_ip_count_last_30_min',
]
numerical_features = base_numerical_features + rolling_count_features

categorical_features = ["proto", "service", "conn_state"]

In [17]:
# The IP columns are deliberately left out so that the model has to decide from
# the traffic features rather than from the addresses themselves.

categorical_features_indexed = [f"{name}_index" for name in categorical_features]
input_features = [*numerical_features, *categorical_features_indexed]

In [12]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

In [62]:
# Row count per label value.
# NOTE(review): the saved output under this cell is grouped by orig_bytes, not by
# label, so it does not come from this statement — re-run the cell to refresh it.
train.groupby("label").count().show()



+----------+-------+
|orig_bytes|  count|
+----------+-------+
|     147.0|      1|
|     576.0|      3|
|     160.0|      2|
|       0.0|3390652|
|     112.0|      4|
|     280.0|      7|
|      88.0|      3|
|    2364.0|      1|
|      29.0|      1|
|      96.0|   1375|
|      80.0|      2|
|     120.0|    936|
|     272.0|      1|
|     104.0|     38|
|       1.0|      2|
|     421.0|     10|
|      25.0|      7|
|     992.0|      1|
|     149.0|      7|
|    1237.0|      1|
+----------+-------+
only showing top 20 rows



                                                                                

24/02/03 02:07:08 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE

On Memory management with Spark 

https://www.clairvoyant.ai/blog/apache-spark-out-of-memory-issue

In [13]:
# Binary target column: 1 where the label equals 'Benign', 0 otherwise.
is_benign = F.col("label") == 'Benign'
train = train.withColumn("good", F.when(is_benign, 1).otherwise(0))

In [41]:
# Number of distinct values in each categorical feature column.
# NOTE(review): the saved output under this cell also has a `history` column, which
# is not in categorical_features as defined in this notebook — the output looks
# stale; re-run to refresh it.
train.select([F.countDistinct(F.col(c)) for c in categorical_features]).show()



+---------------------+-----------------------+--------------------------+-----------------------+
|count(DISTINCT proto)|count(DISTINCT service)|count(DISTINCT conn_state)|count(DISTINCT history)|
+---------------------+-----------------------+--------------------------+-----------------------+
|                    3|                      3|                        10|                     53|
+---------------------+-----------------------+--------------------------+-----------------------+



                                                                                

In [18]:
# Map each categorical string column to a numeric category index. With
# handleInvalid='skip', rows whose value was not seen during fit are dropped.
indexer = StringIndexer(inputCols= categorical_features, outputCols=categorical_features_indexed, handleInvalid='skip')

# The encoder is not one of the pipeline stages below; it is kept so it can be
# added later. It writes to its own "<name>_ohe" columns (reusing the input
# column names as outputs would collide with the existing index columns) and
# uses a handleInvalid value that OneHotEncoder supports ('error' or 'keep').
categorical_features_encoded = [c + "_ohe" for c in categorical_features]
one_hot_encoder = OneHotEncoder(inputCols= categorical_features_indexed, outputCols = categorical_features_encoded, handleInvalid='error')

# Assemble the numeric and indexed categorical columns into a single vector column.
assembler = VectorAssembler(inputCols=input_features, outputCol="features", handleInvalid="skip")

randomforest = RandomForestClassifier(featuresCol="features", labelCol = "good", numTrees=100)
pipeline = Pipeline(stages=[indexer, assembler, randomforest])

In [19]:
# Hold-out split by source address: rows whose source_ip matches LIKE '10%' form
# the test set and all other rows are used for fitting.
# NOTE(review): '10%' matches any address starting with "10" (e.g. 100.x as well
# as 10.x) — confirm this is the intended selection.
starts_with_10 = F.col("source_ip").like("10%")
t_train = train.where(~starts_with_10)
test = train.where(starts_with_10)

# From here on `pipeline` refers to the fitted model, which the later cells use.
pipeline = pipeline.fit(t_train)

                                                                                

In [20]:
# Score the held-out rows with the fitted pipeline.
pred = pipeline.transform(test)

In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area-under-curve metrics computed against the binary `good` label.
roc = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="good")
roc_auc = roc.evaluate(pred)
print("ROC AUC", roc_auc)

pr = BinaryClassificationEvaluator(metricName="areaUnderPR", labelCol="good")
pr_auc = pr.evaluate(pred)
print("PR AUC", pr_auc)

pred

ROC AUC 1.0
PR AUC 1.0


DataFrame[ts: string, uid: string, source_ip: string, source_port: string, destination_ip: string, destination_port: string, proto: string, service: string, duration: double, orig_bytes: double, resp_bytes: double, conn_state: string, history: string, orig_pkts: double, orig_ip_bytes: double, resp_pkts: double, resp_ip_bytes: double, label: string, detailed-label: string, dt: timestamp, day: timestamp, hour: timestamp, minute: timestamp, second: timestamp, source_ip_count_last_min: bigint, source_ip_count_last_30_min: bigint, good: int, proto_index: double, service_index: double, conn_state_index: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

The result is very good, probably because of the limited number of datapoints in the test dataset. We will test on more datapoints when they can be loaded into memory.

In [23]:
import pandas as pd

# Pair each input column of the assembler stage with the importance score
# reported by the last pipeline stage, most important first.
forest_model = pipeline.stages[-1]
assembler_stage = pipeline.stages[-2]

importance_table = pd.DataFrame(
    {
        "importance": list(forest_model.featureImportances),
        "feature": assembler_stage.getInputCols(),
    }
)
importance_table.sort_values("importance", ascending=False)

Unnamed: 0,importance,feature
9,0.354916,proto_index
1,0.321276,orig_bytes
11,0.219281,conn_state_index
2,0.041969,resp_bytes
4,0.039945,orig_ip_bytes
6,0.015788,resp_ip_bytes
5,0.003505,resp_pkts
0,0.001123,duration
3,0.000976,orig_pkts
8,0.00075,source_ip_count_last_30_min


24/02/03 16:33:36 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:123)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:688)
	at org.apache.spark.storage.BlockManagerMasterE