# 3. Streaming application using Spark Structured Streaming

### 1. SparkSession is created using a SparkConf object, which would use two local cores with a proper application name, and use UTC as the timezone

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml import PipelineModel


spark = SparkSession \
    .builder \
    .appName("Assignment 2B") \
    .getOrCreate()

### 2. From the Kafka producers in Task 1.1 and 1.2, ingest the streaming data into Spark Streaming for both process and memory activities

In [2]:
topic = "Streaming_Linux_process"
process_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
  .option("subscribe", topic) \
  .load()

In [3]:
process_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
topic = "Streaming_Linux_memory"
memory_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
  .option("subscribe", topic) \
  .load()

In [5]:
memory_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### 3. Then the streaming data format should be transformed into the proper formats following the metadata file schema for both process and memory, similar to assignment 2A

### Memory activity

In [6]:
query = memory_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [7]:
query.stop()

In [8]:
memory_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [9]:
memory_schema = ArrayType(StructType([
    StructField("sequence", StringType(),True),
    StructField("machine", StringType(),True),
    StructField("PID", StringType(),True),
    StructField("MINFLT", StringType(),True),
    StructField("MAJFLT", StringType(),True),
    StructField("VSTEXT", StringType(),True),
    StructField("VSIZE", StringType(),True),
    StructField("RSIZE", StringType(),True),
    StructField("VGROW", StringType(),True),
    StructField("RGROW", StringType(),True),
    StructField("MEM", StringType(),True),
    StructField("CMD", StringType(),True),
    StructField("ts", StringType(),True)
]))


In [10]:
#handling JSON format array
memory_df = memory_df.select(F.from_json(F.col("value").cast("string"), memory_schema).alias('parsed_value'))

In [11]:
memory_df.printSchema()

root
 |-- parsed_value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sequence: string (nullable = true)
 |    |    |-- machine: string (nullable = true)
 |    |    |-- PID: string (nullable = true)
 |    |    |-- MINFLT: string (nullable = true)
 |    |    |-- MAJFLT: string (nullable = true)
 |    |    |-- VSTEXT: string (nullable = true)
 |    |    |-- VSIZE: string (nullable = true)
 |    |    |-- RSIZE: string (nullable = true)
 |    |    |-- VGROW: string (nullable = true)
 |    |    |-- RGROW: string (nullable = true)
 |    |    |-- MEM: string (nullable = true)
 |    |    |-- CMD: string (nullable = true)
 |    |    |-- ts: string (nullable = true)



In [12]:
memory_df = memory_df.select(F.explode(F.col("parsed_value")).alias('unnested_value'))  

In [13]:
memory_df.printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- sequence: string (nullable = true)
 |    |-- machine: string (nullable = true)
 |    |-- PID: string (nullable = true)
 |    |-- MINFLT: string (nullable = true)
 |    |-- MAJFLT: string (nullable = true)
 |    |-- VSTEXT: string (nullable = true)
 |    |-- VSIZE: string (nullable = true)
 |    |-- RSIZE: string (nullable = true)
 |    |-- VGROW: string (nullable = true)
 |    |-- RGROW: string (nullable = true)
 |    |-- MEM: string (nullable = true)
 |    |-- CMD: string (nullable = true)
 |    |-- ts: string (nullable = true)



In [14]:
#change the names of the columns accordingly
memory_df_formatted = memory_df.select(
                    F.col("unnested_value.sequence").alias("sequence"),
                    F.col("unnested_value.machine").alias("machine"),
                    F.col("unnested_value.PID").alias("PID"),
                    F.col("unnested_value.MINFLT").alias("MINFLT"),
                    F.col("unnested_value.MAJFLT").alias("MAJFLT"),
                    F.col("unnested_value.VSTEXT").alias("VSTEXT"),
                    F.col("unnested_value.VSIZE").alias("VSIZE"),
                    F.col("unnested_value.RSIZE").alias("RSIZE"),
                    F.col("unnested_value.VGROW").alias("VGROW"),
                    F.col("unnested_value.RGROW").alias("RGROW"),
                    F.col("unnested_value.MEM").alias("MEM"),
                    F.col("unnested_value.CMD").alias("CMD"),
                    F.col("unnested_value.ts").alias("ts")
                )

In [15]:
memory_df_formatted.printSchema()

root
 |-- sequence: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- PID: string (nullable = true)
 |-- MINFLT: string (nullable = true)
 |-- MAJFLT: string (nullable = true)
 |-- VSTEXT: string (nullable = true)
 |-- VSIZE: string (nullable = true)
 |-- RSIZE: string (nullable = true)
 |-- VGROW: string (nullable = true)
 |-- RGROW: string (nullable = true)
 |-- MEM: string (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: string (nullable = true)



In [16]:
query2 = memory_df_formatted \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [17]:
query2.stop()

In [18]:
### Cleaning and transforming the columns into proper format
memory_df_formatted = memory_df_formatted.withColumn('MINFLT', regexp_replace('MINFLT', 'K', '000'))
memory_df_formatted = memory_df_formatted.withColumn('MAJFLT', regexp_replace('MAJFLT', 'M', '00000'))
memory_df_formatted = memory_df_formatted.withColumn('VSTEXT', regexp_replace('VSTEXT', 'K', '000'))
memory_df_formatted = memory_df_formatted.withColumn('RSIZE', regexp_replace('RSIZE', 'K', '000'))
memory_df_formatted = memory_df_formatted.withColumn('RSIZE', regexp_replace('RSIZE', 'M', '00000'))
memory_df_formatted = memory_df_formatted.withColumn('VGROW', regexp_replace('VGROW', 'K', '000'))
memory_df_formatted = memory_df_formatted.withColumn('RGROW', regexp_replace('RGROW', 'K', '000'))

In [19]:
memory_df_formatted.printSchema()

root
 |-- sequence: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- PID: string (nullable = true)
 |-- MINFLT: string (nullable = true)
 |-- MAJFLT: string (nullable = true)
 |-- VSTEXT: string (nullable = true)
 |-- VSIZE: string (nullable = true)
 |-- RSIZE: string (nullable = true)
 |-- VGROW: string (nullable = true)
 |-- RGROW: string (nullable = true)
 |-- MEM: string (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: string (nullable = true)



In [20]:
#cast the columns to their proper datatypes
memory_df_formatted = memory_df_formatted.withColumn("sequence", memory_df_formatted["sequence"].cast(IntegerType()))
memory_df_formatted = memory_df_formatted.withColumn("machine", memory_df_formatted["machine"].cast(IntegerType()))
memory_df_formatted = memory_df_formatted.withColumn("PID", memory_df_formatted["PID"].cast(IntegerType()))
memory_df_formatted = memory_df_formatted.withColumn("MINFLT", memory_df_formatted["MINFLT"].cast(IntegerType()))
memory_df_formatted = memory_df_formatted.withColumn("MAJFLT", memory_df_formatted["MAJFLT"].cast(IntegerType()))
memory_df_formatted = memory_df_formatted.withColumn("VSTEXT", memory_df_formatted["VSTEXT"].cast(IntegerType()))
memory_df_formatted = memory_df_formatted.withColumn("VSIZE", memory_df_formatted["VSIZE"].cast(DoubleType()))
memory_df_formatted = memory_df_formatted.withColumn("RSIZE", memory_df_formatted["RSIZE"].cast(DoubleType()))
memory_df_formatted = memory_df_formatted.withColumn("RGROW", memory_df_formatted["RGROW"].cast(DoubleType()))
memory_df_formatted = memory_df_formatted.withColumn("RGROW", memory_df_formatted["MEM"].cast(DoubleType()))
memory_df_formatted = memory_df_formatted.withColumn("ts", memory_df_formatted["ts"].cast(IntegerType()))
memory_df_formatted = memory_df_formatted.withColumn("MEM", memory_df_formatted["MEM"].cast(IntegerType()))

In [21]:
memory_df_formatted = memory_df_formatted.withColumn('VGROW',regexp_replace('VGROW',' ',''))
memory_df_formatted = memory_df_formatted.withColumn("VGROW", memory_df_formatted["VGROW"].cast(DoubleType()))

In [22]:
memory_df_formatted.printSchema()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- MINFLT: integer (nullable = true)
 |-- MAJFLT: integer (nullable = true)
 |-- VSTEXT: integer (nullable = true)
 |-- VSIZE: double (nullable = true)
 |-- RSIZE: double (nullable = true)
 |-- VGROW: double (nullable = true)
 |-- RGROW: double (nullable = true)
 |-- MEM: integer (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)



### Process activity

In [23]:
query = process_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [24]:
query.stop()

In [25]:
#Define the schema for the structured datastream received
schema = ArrayType(StructType([
    StructField("sequence", StringType(),True),
    StructField("machine", StringType(),True),
    StructField("PID", StringType(),True),
    StructField("TRUN", StringType(),True),
    StructField("TSLPI", StringType(),True),
    StructField("TSLPU", StringType(),True),
    StructField("POLI", StringType(),True),
    StructField("NICE", StringType(),True),
    StructField("PRI", StringType(),True),
    StructField("RTPR", StringType(),True),
    StructField("CPUNR", StringType(),True),
    StructField("Status", StringType(),True),
    StructField("EXC", StringType(),True),
    StructField("State", StringType(),True),
    StructField("CPU", StringType(),True),
    StructField("CMD", StringType(),True),
    StructField("ts", StringType(),True)
]))

In [26]:
#handling JSON format array
process_df = process_df.select(F.from_json(F.col("value").cast("string"), schema).alias('parsed_value'))

In [27]:
process_df.printSchema()

root
 |-- parsed_value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sequence: string (nullable = true)
 |    |    |-- machine: string (nullable = true)
 |    |    |-- PID: string (nullable = true)
 |    |    |-- TRUN: string (nullable = true)
 |    |    |-- TSLPI: string (nullable = true)
 |    |    |-- TSLPU: string (nullable = true)
 |    |    |-- POLI: string (nullable = true)
 |    |    |-- NICE: string (nullable = true)
 |    |    |-- PRI: string (nullable = true)
 |    |    |-- RTPR: string (nullable = true)
 |    |    |-- CPUNR: string (nullable = true)
 |    |    |-- Status: string (nullable = true)
 |    |    |-- EXC: string (nullable = true)
 |    |    |-- State: string (nullable = true)
 |    |    |-- CPU: string (nullable = true)
 |    |    |-- CMD: string (nullable = true)
 |    |    |-- ts: string (nullable = true)



In [28]:
process_df = process_df.select(F.explode(F.col("parsed_value")).alias('unnested_value'))      

In [29]:
process_df.printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- sequence: string (nullable = true)
 |    |-- machine: string (nullable = true)
 |    |-- PID: string (nullable = true)
 |    |-- TRUN: string (nullable = true)
 |    |-- TSLPI: string (nullable = true)
 |    |-- TSLPU: string (nullable = true)
 |    |-- POLI: string (nullable = true)
 |    |-- NICE: string (nullable = true)
 |    |-- PRI: string (nullable = true)
 |    |-- RTPR: string (nullable = true)
 |    |-- CPUNR: string (nullable = true)
 |    |-- Status: string (nullable = true)
 |    |-- EXC: string (nullable = true)
 |    |-- State: string (nullable = true)
 |    |-- CPU: string (nullable = true)
 |    |-- CMD: string (nullable = true)
 |    |-- ts: string (nullable = true)



In [30]:
#change the names of the columns accordingly
process_df_formatted = process_df.select(
                    F.col("unnested_value.sequence").alias("sequence"),
                    F.col("unnested_value.machine").alias("machine"),
                    F.col("unnested_value.PID").alias("PID"),
                    F.col("unnested_value.TRUN").alias("TRUN"),
                    F.col("unnested_value.TSLPI").alias("TSLPI"),
                    F.col("unnested_value.TSLPU").alias("TSLPU"),
                    F.col("unnested_value.POLI").alias("POLI"),
                    F.col("unnested_value.NICE").alias("NICE"),
                    F.col("unnested_value.PRI").alias("PRI"),
                    F.col("unnested_value.RTPR").alias("RTPR"),
                    F.col("unnested_value.CPUNR").alias("CPUNR"),
                    F.col("unnested_value.Status").alias("Status"),
                    F.col("unnested_value.EXC").alias("EXC"),
                    F.col("unnested_value.State").alias("State"),
                    F.col("unnested_value.CPU").alias("CPU"),
                    F.col("unnested_value.CMD").alias("CMD"),
                    F.col("unnested_value.ts").alias("ts")
                )

In [31]:
process_df_formatted.printSchema()

root
 |-- sequence: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- PID: string (nullable = true)
 |-- TRUN: string (nullable = true)
 |-- TSLPI: string (nullable = true)
 |-- TSLPU: string (nullable = true)
 |-- POLI: string (nullable = true)
 |-- NICE: string (nullable = true)
 |-- PRI: string (nullable = true)
 |-- RTPR: string (nullable = true)
 |-- CPUNR: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- EXC: string (nullable = true)
 |-- State: string (nullable = true)
 |-- CPU: string (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: string (nullable = true)



In [32]:
query = process_df_formatted \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [33]:
query.stop()

In [34]:
#cast the columns to their proper datatypes
process_df_formatted = process_df_formatted.withColumn("sequence", process_df_formatted["sequence"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("machine", process_df_formatted["machine"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("PID", process_df_formatted["PID"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("TRUN", process_df_formatted["TRUN"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("TSLPI", process_df_formatted["TSLPI"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("TSLPU", process_df_formatted["TSLPU"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("NICE", process_df_formatted["NICE"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("PRI", process_df_formatted["PRI"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("RTPR", process_df_formatted["RTPR"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("CPUNR", process_df_formatted["CPUNR"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("EXC", process_df_formatted["EXC"].cast(IntegerType()))
process_df_formatted = process_df_formatted.withColumn("CPU", process_df_formatted["CPU"].cast(DoubleType()))
process_df_formatted = process_df_formatted.withColumn("ts", process_df_formatted["ts"].cast(IntegerType()))

In [35]:
#NICE value is restored based on the PRI values
process_df_formatted = process_df_formatted.withColumn("NICE", F.when(F.col("PRI")==0, 0).otherwise(F.col("PRI")-120))

In [36]:
query = process_df_formatted \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [37]:
query.stop()

In [38]:
process_df_formatted.printSchema()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- TRUN: integer (nullable = true)
 |-- TSLPI: integer (nullable = true)
 |-- TSLPU: integer (nullable = true)
 |-- POLI: string (nullable = true)
 |-- NICE: integer (nullable = true)
 |-- PRI: integer (nullable = true)
 |-- RTPR: integer (nullable = true)
 |-- CPUNR: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- EXC: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- CPU: double (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)



### 4. For process and memory, respectively, create a new column “CMD_PID” concatenating “CMD” and “PID” columns, and a new column “event_time” as timestamp format based on the unix time in “ts” column (5%)
   ### - Allow 20-second tolerance for possible data delay on “event_time” using watermarking

### Process activity

In [39]:
#concatenate CMD and PID and make a new column CMD_PID
process_df_formatted = process_df_formatted.withColumn('CMD_PID', F.concat(F.col('CMD'),F.lit('_'), F.col('PID')))

In [40]:
#“event_time” as timestamp format based on the unix time in “ts” column is created
process_df_formatted=process_df_formatted.withColumn('event_time', F.current_timestamp()).withWatermark("event_time", "20 seconds")

In [41]:
process_df_formatted.printSchema()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- TRUN: integer (nullable = true)
 |-- TSLPI: integer (nullable = true)
 |-- TSLPU: integer (nullable = true)
 |-- POLI: string (nullable = true)
 |-- NICE: integer (nullable = true)
 |-- PRI: integer (nullable = true)
 |-- RTPR: integer (nullable = true)
 |-- CPUNR: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- EXC: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- CPU: double (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)
 |-- CMD_PID: string (nullable = true)
 |-- event_time: timestamp (nullable = false)



### Memory activity

In [42]:
#concatenate CMD and PID and make a new column CMD_PID
memory_df_formatted = memory_df_formatted.withColumn('CMD_PID', F.concat(F.col('CMD'),F.lit('_'), F.col('PID')))

In [43]:
#“event_time” as timestamp format based on the unix time in “ts” column is created
memory_df_formatted=memory_df_formatted.withColumn('event_time', F.current_timestamp()).withWatermark("event_time", "20 seconds")

In [44]:
memory_df_formatted.printSchema()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- MINFLT: integer (nullable = true)
 |-- MAJFLT: integer (nullable = true)
 |-- VSTEXT: integer (nullable = true)
 |-- VSIZE: double (nullable = true)
 |-- RSIZE: double (nullable = true)
 |-- VGROW: double (nullable = true)
 |-- RGROW: double (nullable = true)
 |-- MEM: integer (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)
 |-- CMD_PID: string (nullable = true)
 |-- event_time: timestamp (nullable = false)



### 5. Persist the transformed streaming data in parquet format for both process and memory (5%)
####    - The process data should be stored in “process.parquet” in the same folder of your notebook, and the memory data should be stored in “memory.parquet” in the same folder of your notebook.

### Process activity

In [45]:
# Write into parquet files the unsuccessful requests partitioned by status code
process_query_file_sink = process_df_formatted.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "process.parquet")\
        .option("checkpointLocation", "process.parquet/checkpoint_process")\
        .start()

In [46]:
#Stop the file_sink query
process_query_file_sink.stop()

### Memory activity

In [47]:
# Write into parquet files the unsuccessful requests partitioned by status code
memory_query_file_sink = memory_df_formatted.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "memory.parquet")\
        .option("checkpointLocation", "memory.parquet/checkpoint_memory")\
        .start()

In [48]:
#Stop the file_sink query
memory_query_file_sink.stop()

### 6. Load the machine learning models given, and use the models to predict whether each process or memory streaming record is an attack event, respectively

### Process activity

In [49]:
#load the process model given to us 
process_model = PipelineModel.load("process_pipeline_model")

In [50]:
#predict whether each process streaming record is an attack event
process_predictions = process_model.transform(process_df_formatted)

In [51]:
query = process_predictions \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [52]:
query.stop()

### Memory activity

In [53]:
#load the memory model given to us
memory_model = PipelineModel.load("memory_pipeline_model")

In [54]:
#predict whether each memory streaming record is an attack event
memory_predictions = memory_model.transform(memory_df_formatted)

In [55]:
query = memory_predictions \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [56]:
query.stop()

## 7. Using the prediction result, and monitor the data following the requirements below
 ###   a. If any program in one machine is predicted as an attack in EITHER process or memory activity prediction, it could be a false alarm or a potential attack. Keep track of the approximate count of such events in every 2-min window for each machine for process and memory, respectively, and write the stream into Spark Memory sink using complete mode.

### Process activity

In [57]:
#filter the attack events
process_attacks = process_predictions.filter(process_predictions.prediction == 1)

In [58]:
query = process_attacks \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [59]:
query.stop()

Reference: https://stackoverflow.com/questions/44033037/adding-constant-value-column-to-spark-dataframe/44033401

In [60]:
#filter attacks for machine no. 4
machine_4_filtered = process_attacks.filter(F.col('machine') == "4")
#aggregated result including the machine ID, the time window, and the counts is presented
process_machine4 = machine_4_filtered.groupBy(window(machine_4_filtered.event_time, "120 seconds"),machine_4_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(4))

In [61]:
query = process_machine4 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [62]:
query.stop()

In [63]:
#filter attacks for machine no. 5
machine_5_filtered = process_attacks.filter(F.col('machine') == "5")
process_machine5 = machine_5_filtered.groupBy(window(machine_5_filtered.event_time, "120 seconds"),machine_5_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(5))

In [64]:
query = process_machine5 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [65]:
query.stop()

In [66]:
#filter attacks for machine no. 6
machine_6_filtered = process_attacks.filter(F.col('machine') == "6")
#aggregated result including the machine ID, the time window, and the counts is presented
process_machine6 = machine_6_filtered.groupBy(window(machine_6_filtered.event_time, "120 seconds"),machine_6_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(6))

In [67]:
query = process_machine6 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [68]:
query.stop()

In [69]:
#filter attacks for machine no. 7
machine_7_filtered = process_attacks.filter(F.col('machine') == "7")
#aggregated result including the machine ID, the time window, and the counts is presented
process_machine7 = machine_7_filtered.groupBy(window(machine_7_filtered.event_time, "120 seconds"),machine_7_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(7))

In [70]:
query = process_machine7 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [71]:
query.stop()

In [72]:
#filter attacks for machine no. 8
machine_8_filtered = process_attacks.filter(F.col('machine') == "8")
#aggregated result including the machine ID, the time window, and the counts is presented
process_machine8 = machine_8_filtered.groupBy(window(machine_8_filtered.event_time, "120 seconds"),machine_8_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(8))

In [73]:
query = process_machine8 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [74]:
query.stop()

### Memory activity

In [75]:
#filter the attack events
memory_attacks = memory_predictions.filter(memory_predictions.prediction == 1)

In [76]:
query = memory_attacks \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [77]:
query.stop()

Reference: https://stackoverflow.com/questions/44033037/adding-constant-value-column-to-spark-dataframe/44033401

In [78]:
#filter attacks for machine no. 4
machine_4_filtered = memory_attacks.filter(F.col('machine') == "4")
#aggregated result including the machine ID, the time window, and the counts is presented
memory_machine4 = machine_4_filtered.groupBy(window(machine_4_filtered.event_time, "120 seconds"),machine_4_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(4))

In [79]:
query = memory_machine4 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [80]:
query.stop()

In [81]:
#filter attacks for machine no. 5
machine_5_filtered = memory_attacks.filter(F.col('machine') == "5")
memory_machine5 = machine_5_filtered.groupBy(window(machine_5_filtered.event_time, "120 seconds"),machine_5_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(5))

In [82]:
query = memory_machine5 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [83]:
query.stop()

In [84]:
#filter attacks for machine no. 6
machine_6_filtered = memory_attacks.filter(F.col('machine') == "6")
#aggregated result including the machine ID, the time window, and the counts is presented
memory_machine6 = machine_6_filtered.groupBy(window(machine_6_filtered.event_time, "120 seconds"),machine_6_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(6))


In [85]:
query = memory_machine6 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [86]:
query.stop()

In [87]:
#filter attacks for machine no. 7
machine_7_filtered = memory_attacks.filter(F.col('machine') == "7")
#aggregated result including the machine ID, the time window, and the counts is presented
memory_machine7 = machine_7_filtered.groupBy(window(machine_7_filtered.event_time, "120 seconds"),machine_7_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(7))


In [88]:
query = memory_machine7 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [89]:
query.stop()

In [90]:
#filter attacks for machine no. 8
machine_8_filtered = memory_attacks.filter(F.col('machine') == "8")
#aggregated result including the machine ID, the time window, and the counts is presented
memory_machine8 = machine_8_filtered.groupBy(window(machine_8_filtered.event_time, "120 seconds"),machine_8_filtered.prediction) \
                                .count()\
                                .select("window","count").withColumn("machineID", lit(8))

In [91]:
query = memory_machine8 \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [92]:
query.stop()

### b. If a program in one machine, having the same “CMD” and “PID” in both process and memory streaming data, is predicted as an attack in BOTH process and memory activity prediction, then this is considered as an attack event. Find the streaming events fulfilling the criteria, create a new column to record the processing time 8 and persist them in parquet.
#### - Note the program with the same “CMD” and “PID” might not be generated at the exact same event time. If the difference between the event times in process and memory is less than 30 seconds and the program fulfills the criteria of matching “CMD” and “PID”, then you should include them for the above checking.

In [104]:
#filter the attack events
process_attacks = process_predictions.filter(process_predictions.prediction == 1)

In [105]:
#filter the attack events
memory_attacks = memory_predictions.filter(memory_predictions.prediction == 1)

In [106]:
predicted_memory_df = memory_attacks.selectExpr("CAST(sequence AS STRING) AS key_memory",
                                                "CAST(machine AS STRING) AS machine_memory",
                                                "CAST(CMD AS STRING) AS CMD_memory", 
                                                "CAST(PID AS STRING) AS PID_memory",
                                                "CAST(prediction AS STRING) AS prediction_memory", 
                                                "CAST(CMD_PID AS STRING) AS CMD_PID_memory", 
                                                "CAST(event_time AS timestamp) AS event_time_memory")

In [107]:
predicted_process_df = process_attacks.selectExpr("CAST(sequence AS STRING) AS key_process", 
                                                  "CAST(machine AS STRING) AS machine_process", 
                                                  "CAST(CMD AS STRING) AS CMD_process", 
                                                  "CAST(PID AS STRING) AS PID_process",
                                                  "CAST(prediction AS STRING) AS prediction_process", 
                                                  "CAST(CMD_PID AS STRING) AS CMD_PID_process", 
                                                  "CAST(event_time AS timestamp) AS event_time_process")

In [112]:
#join the dataframes according to the sequence and filter the records where CMD and PID is same for process and memory
joined_df = predicted_memory_df.join(predicted_process_df,expr("""key_memory == key_process"""),"inner")\
                .filter("machine_memory == machine_process AND CMD_process == CMD_memory AND PID_process == PID_process") 

In [136]:
#make a new column to store the difference between the event time for process and memory activity
joined_df2 = joined_df.withColumn('processing_time', abs(joined_df['event_time_memory'].cast("long") - joined_df["event_time_process"].cast("long")))

In [129]:
joined_df2.printSchema()

root
 |-- key_memory: string (nullable = true)
 |-- machine_memory: string (nullable = true)
 |-- CMD_memory: string (nullable = true)
 |-- PID_memory: string (nullable = true)
 |-- prediction_memory: string (nullable = false)
 |-- CMD_PID_memory: string (nullable = true)
 |-- event_time_memory: timestamp (nullable = false)
 |-- key_process: string (nullable = true)
 |-- machine_process: string (nullable = true)
 |-- CMD_process: string (nullable = true)
 |-- PID_process: string (nullable = true)
 |-- prediction_process: string (nullable = false)
 |-- CMD_PID_process: string (nullable = true)
 |-- event_time_process: timestamp (nullable = false)
 |-- processing_time: double (nullable = true)



In [138]:
#filter records with processing time less than 30 seconds
joined_df3 = joined_df2.filter("processing_time < 30")

In [139]:
# Start running the query 
query = joined_df3 \
    .writeStream \
    .outputMode("Append")\
    .format("console") \
    .trigger(processingTime='10 seconds') \
    .start()

In [140]:
query.stop()

In [None]:
# Write into parquet files 
process_memory_attack_parquet = joined_df3.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "process_memory_attack.parquet")\
        .option("checkpointLocation", "process_memory_attack.parquet/checkpoint")\
        .start()


In [None]:
#Stop the file_sink query
process_memory_attack_parquet.stop()