### Anomaly Detection in Server Logs
#### CS 5614: Big Data Engineering
#### By: Vanessa Eichensehr and Bradley Freedman


**Project Objective:**  

Build a machine learning-based model that detects anomalies on a high volume high velocity log base. 

In [1]:
# Start Spark session

from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-kafka-streaming").\
        master("spark://spark-master:7077").\
        config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0"). \
        config("spark.executor.memory", "512m").\
        getOrCreate()

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f874452a-c159-4902-b295-75125162178f;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 565ms :: artifacts dl 20

**Step 1:** Load labeled training data

In [2]:
training_df = spark.read.csv("/data/synthetic_with_anomalies_NEW.csv", header=True, inferSchema=True)
training_df.show(5)

                                                                                

+-------------+--------------------+------+--------------------+------+------+--------------------+--------------------+---------+-------------+
|           ip|                time|method|                path|status|  size|            referrer|          user_agent|anomalous|     category|
+-------------+--------------------+------+--------------------+------+------+--------------------+--------------------+---------+-------------+
| 54.36.149.86|2019-01-22 03:59:...|   GET|/image/8739/speci...|   200|179495|https://www.zanbi...|Mozilla/5.0 (X11;...|        0|not anomalous|
| 5.62.206.249|2019-01-22 03:59:...|   GET|      /settings/logo|   302|159434|https://www.zanbi...|Mozilla/5.0 (Wind...|        0|not anomalous|
| 54.36.149.16|2019-01-22 03:59:...|   GET|/product/4031/20/...|   302| 74066|https://www.zanbi...|Dalvik/2.1.0 (Lin...|        0|not anomalous|
|66.249.66.195|2019-01-22 03:59:...|   GET|/image/65274/prod...|   302| 66812|https://www-zanbi...|Mozilla/5.0 (Linu...|        0|

**Step 2:** Pre-process features 

In [3]:
!pip install numpy

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [4]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import unix_timestamp

# use UNIX time so it can be an integer
training_df = training_df.withColumn(
    "unix_time", 
    unix_timestamp("time", "yyyy-MM-dd HH:mm:ss.SSS")
)

# Define all categorical fields to index
categorical_cols = ["ip", "method", "path", "referrer", "user_agent"]
indexers = [StringIndexer(inputCol=col, outputCol=col + "_idx", handleInvalid="keep") for col in categorical_cols]

# Update VectorAssembler to include it
assembler = VectorAssembler(
    inputCols=["status", "size", "unix_time"] + [col + "_idx" for col in ["ip", "method", "path", "referrer", "user_agent"]],
    outputCol="features"
)

# note that REQUEST_TYPE = method
# note that REQUEST ARGUMENT = path 

label_indexer = StringIndexer(inputCol="anomalous", outputCol="label")  # for binary classification
# or: StringIndexer(inputCol="category", outputCol="label") for multi-class

print(training_df.columns)

# Combine steps
pipeline = Pipeline(stages=indexers + [assembler, label_indexer])
pipeline_model = pipeline.fit(training_df)
print("Pipeline fit completed")
pipeline_model.save("/data/pipeline_model_2")
print("Pipeline saved")

log_transformed = pipeline_model.transform(training_df)

# show whole dataframe
log_transformed.show(5)

# show features only 
log_transformed.select("features").show(truncate=False)

['ip', 'time', 'method', 'path', 'status', 'size', 'referrer', 'user_agent', 'anomalous', 'category', 'unix_time']


                                                                                

Pipeline fit completed


                                                                                

Pipeline saved
+-------------+--------------------+------+--------------------+------+------+--------------------+--------------------+---------+-------------+----------+------+----------+--------+------------+--------------+--------------------+-----+
|           ip|                time|method|                path|status|  size|            referrer|          user_agent|anomalous|     category| unix_time|ip_idx|method_idx|path_idx|referrer_idx|user_agent_idx|            features|label|
+-------------+--------------------+------+--------------------+------+------+--------------------+--------------------+---------+-------------+----------+------+----------+--------+------------+--------------+--------------------+-----+
| 54.36.149.86|2019-01-22 03:59:...|   GET|/image/8739/speci...|   200|179495|https://www.zanbi...|Mozilla/5.0 (X11;...|        0|not anomalous|1548129551|  79.0|       0.0|    59.0|        11.0|          25.0|[200.0,179495.0,1...|  0.0|
| 5.62.206.249|2019-01-22 03:59:.

**Step 3:** Split into train and test sets (use random seed to get consistent results)

In [6]:
train_data, test_data = log_transformed.randomSplit([0.8, 0.2], seed=42)

**Step 4:** Train a random forest classifier to do binary classification

In [7]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    maxBins=1024  # default is 32, need to override it
)
rf_model = rf.fit(train_data)

                                                                                

**Step 5:** Make predictions on test set

In [8]:
predictions = rf_model.transform(test_data)

**Step 6:** Evaluate model's performance

In [9]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col, when

# Compute AUC
auc_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = auc_evaluator.evaluate(predictions)

# Get predicted label and actual
predicted = predictions.select(
    col("label").cast("integer"),
    col("prediction").cast("integer")
)

# Calculate confusion matrix components
tp = predicted.filter("label = 1 AND prediction = 1").count()
tn = predicted.filter("label = 0 AND prediction = 0").count()
fp = predicted.filter("label = 0 AND prediction = 1").count()
fn = predicted.filter("label = 1 AND prediction = 0").count()

# Derived metrics
accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) else 0
precision = tp / (tp + fp) if (tp + fp) else 0
recall = tp / (tp + fn) if (tp + fn) else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0

# Print all metrics
print(f"AUC:       {auc:.3f}")
print(f"Accuracy:  {accuracy:.3f}")
print(f"Precision: {precision:.3f}")
print(f"Recall:    {recall:.3f}")
print(f"F1 Score:  {f1:.3f}")

                                                                                

AUC:       0.926
Accuracy:  0.978
Precision: 0.811
Recall:    0.730
F1 Score:  0.768


**Step 8:** Save model for future use

In [10]:
rf_model.save('/data/random_forest_binary_classifier')

**Step 9:** 
Create a streaming DataFrame in Spark that reads data from a Kafka topic named "topic_test" and starts
processing from the beginning of the topic's log using the earliest available offset. Use kafka:9093 as the bootstrap
server.

In [11]:
df_streamed_raw = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9093").option("subscribe", "topic_test").load())

In [12]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

# convert byte stream to string
df_streamed_kv = (df_streamed_raw
    .withColumn("key", df_streamed_raw["key"].cast(StringType()))
    .withColumn("value", df_streamed_raw["value"].cast(StringType())))

test_query = (df_streamed_kv 
              .writeStream \
              .format("memory") # output to memory \
              .outputMode("update") # only write updated rows to the sink \
              .queryName("test_query_table")  # Name of the in memory table \
              .start())

25/05/06 02:28:54 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9f21a094-9c0d-4255-a544-c99cfe4a38e7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

The following cell should display a table populated with values being streamed from you Kafka producer.

In [13]:
spark.sql("select * from test_query_table").show()

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|1255|{"ip_address": "5...|topic_test|        0|117856|2025-05-06 02:28:...|            0|
|1256|{"ip_address": "5...|topic_test|        0|117857|2025-05-06 02:28:...|            0|
|1257|{"ip_address": "5...|topic_test|        0|117858|2025-05-06 02:28:...|            0|
|1258|{"ip_address": "1...|topic_test|        0|117859|2025-05-06 02:28:...|            0|
|1259|{"ip_address": "5...|topic_test|        0|117860|2025-05-06 02:28:...|            0|
|1260|{"ip_address": "5...|topic_test|        0|117861|2025-05-06 02:28:...|            0|
|1261|{"ip_address": "6...|topic_test|        0|117862|2025-05-06 02:28:...|            0|
|1262|{"ip_address": "6...|topic_test|        0|117863|2025-05-06 02:28:...|            0|

In [14]:
test_query.stop()

25/05/06 02:29:19 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5a9657fe is aborting.
25/05/06 02:29:19 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5a9657fe aborted.
25/05/06 02:29:19 WARN TaskSetManager: Lost task 0.0 in stage 272.0 (TID 272, 172.18.0.7, executor 1): TaskKilled (Stage cancelled)


The following cells contain code that take the streamed dataframe and formats it properly into a table. If any of the given cells fails, there might be a formatting issue with one of your previous solutions. 

In [15]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType

event_schema = StructType([
    StructField("ip_address", StringType()),
    StructField("date_time", StringType()),
    StructField("request_type", StringType()),
    StructField("request_arg", StringType()),
    StructField("status_code", StringType()),
    StructField("response_size", StringType()),
    StructField("referrer", StringType()),
    StructField("user_agent", StringType())
])

# Parse the events from JSON format
df_parsed = (df_streamed_kv
           # Sets schema for event data
           .withColumn("value", from_json("value", event_schema))
          )

In [16]:
df_formatted = (df_parsed.select(
    col("key").alias("event_key")
    ,col("topic").alias("event_topic")
    ,col("timestamp").alias("event_timestamp")
    ,col("value.ip_address").alias("ip_address")
    ,col("value.date_time").alias("date_time")
    ,col("value.request_type").alias("request_type")
    ,col("value.request_arg").alias("request_arg")
    ,col("value.status_code").alias("status_code")
    ,col("value.response_size").cast(IntegerType()).alias("response_size")
    ,col("value.referrer").alias("referrer")
    ,col("value.user_agent").alias("user_agent")
))

In [17]:
# Write the parsed data to console
query = (df_formatted.writeStream.format("console").outputMode("append").trigger(processingTime='5 seconds').start())

25/05/06 02:29:33 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b91db047-3f1d-4342-9d6c-9095cefb26e1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+----------+---------+------------+-----------+-----------+-------------+--------+----------+
|event_key|event_topic|event_timestamp|ip_address|date_time|request_type|request_arg|status_code|response_size|referrer|user_agent|
+---------+-----------+---------------+----------+---------+------------+-----------+-----------+-------------+--------+----------+
+---------+-----------+---------------+----------+---------+------------+-----------+-----------+-------------+--------+----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------+--------------------+-------------+--------------------+------------+--------------------+-----------+-------------+--------------------+--------------------+
|event_key|event_topic|     event_timestamp|   ip_address|           date_time|request_type|   

In [18]:
# Print the name of active streams (This may be useful during debugging)
for s in spark.streams.active:
    print(f"ID:{s.id} | NAME:{s.name}")

ID:8fdffd46-7106-4170-b487-8212e479fa32 | NAME:None


In [19]:
query.stop()

**Step 10:** Batch/Rolling window incoming data, make predictions on it 

In [20]:
from pyspark.ml import PipelineModel
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.sql.functions import *
from pyspark.sql.functions import unix_timestamp

kafka_timestamp_format = "dd/MMM/yyyy:HH:mm:ss Z"

# Convert date_time from string to timestamp using the correct format
df_formatted = df_formatted.withColumn(
    "event_time",
    coalesce(
        to_timestamp(col("date_time"), kafka_timestamp_format),
    )
)

# Filter out rows where timestamp parsing failed (resulted in null)
df_formatted = df_formatted.filter(col("event_time").isNotNull())

# create windows
windowed_df = df_formatted.withWatermark("event_time", "10 seconds")

# adjust names to match the dataset used for training
renamed_df = windowed_df.selectExpr(
    "ip_address as ip",
    "request_type as method",
    "request_arg as path",
    "status_code as status",
    "response_size as size",
    "referrer",
    "user_agent",
    "event_time"
)

# convert data to ints and calculate unix_time
renamed_df = renamed_df \
    .withColumn("status", coalesce(col("status").cast("int"), lit(0))) \
    .withColumn("size", coalesce(col("size").cast("int"), lit(0))) \
    .withColumn("unix_time", unix_timestamp("event_time"))

# Filter again for safety before applying models, ensuring unix_time is not null
renamed_df = renamed_df.filter(col("unix_time").isNotNull())

# Load transformation pipeline and classifier
pipeline_model = PipelineModel.load("/data/pipeline_model_1")
rf_model = RandomForestClassificationModel.load("/data/random_forester_binary_classifier")
transformed_df = pipeline_model.transform(renamed_df)

# make predictions
predictions = rf_model.transform(transformed_df)

# Get only anomalies
anomalies_df = predictions.filter(col("prediction") == 1.0)

# Print full rows if anomalies are detected
query = anomalies_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()

25/05/06 02:30:07 WARN StringIndexerModel: Input column anomalous does not exist during transformation. Skip StringIndexerModel for this column.
25/05/06 02:30:07 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4dd22472-17c4-43dd-92a6-d6a862c3e7a1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+----+------+----+--------+----------+----------+---------+------+----------+--------+------------+--------------+--------+-------------+-----------+----------+
|ip |method|path|status|size|referrer|user_agent|event_time|unix_time|ip_idx|method_idx|path_idx|referrer_idx|user_agent_idx|features|rawPrediction|probability|prediction|
+---+------+----+------+----+--------+----------+----------+---------+------+----------+--------+------------+--------------+--------+-------------+-----------+----------+
+---+------+----+------+----+--------+----------+----------+---------+------+----------+--------+------------+--------------+--------+-------------+-----------+----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---+------+----+------+----+--------+----------+----------+---------+------+----------+--------+------------+-------

In [21]:
query.stop()

25/05/06 02:30:31 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@2d967e5 is aborting.
25/05/06 02:30:31 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@2d967e5 aborted.
25/05/06 02:30:31 WARN TaskSetManager: Lost task 0.0 in stage 454.0 (TID 460, 172.18.0.6, executor 0): TaskKilled (Stage cancelled)
