### Assignment 2 Part B<br>

Name: Manmeet Singh<br>
Id: 30749476

In [None]:
import os
# Ask the PySpark launcher to pull in the Kafka connector packages listed
# below; this is set before the SparkSession is created further down.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from datetime import timezone,datetime 
from itertools import chain
from pyspark.sql import functions as F
from pyspark.sql.types import *

from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import substring, length, col, expr
from pyspark.sql.functions import regexp_replace, concat

from pyspark.ml import PipelineModel

# Spark configuration: local master with two threads, a fixed application
# name, and a UTC session time zone.
spark_conf = (
    SparkConf()
    .setMaster("local[2]")
    .setAppName("SparkStreaming")
    .set("spark.sql.session.timeZone", "UTC")
)

# Obtain the session for this configuration (an existing one is reused).
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)

In [None]:
# Streaming source for process records: subscribe to the Kafka topic below
# on the local broker.
topic = "process_producer"
process = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic)
    .load()
)

In [None]:
# Streaming source for memory records: subscribe to the Kafka topic below
# on the local broker.
topic = "memory_producer"
memory = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic)
    .load()
)

In [None]:
# Process schema: the JSON payload is parsed as an array of records with the
# (name, type) pairs below; every field is nullable.
_process_fields = [
    ("sequence", IntegerType()),
    ("machine", IntegerType()),
    ("PID", IntegerType()),
    ("TRUN", IntegerType()),
    ("TSLPI", IntegerType()),
    ("TSLPU", IntegerType()),
    ("POLI", StringType()),
    ("NICE", IntegerType()),
    ("PRI", IntegerType()),
    ("RTPR", IntegerType()),
    ("CPUNR", IntegerType()),
    ("Status", StringType()),
    ("EXC", IntegerType()),
    ("State", StringType()),
    ("CPU", DoubleType()),
    ("CMD", StringType()),
    ("ts", TimestampType()),
]
process_type = ArrayType(StructType(
    [StructField(field_name, field_type, True)
     for field_name, field_type in _process_fields]
))

# Keep only the Kafka key and value, both cast to strings.
process = process.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

In [None]:
# Memory schema: the JSON payload is parsed as an array of records with the
# (name, type) pairs below; every field is nullable.  Several counters are
# read as strings here and converted to integers in a later cell.
_memory_fields = [
    ("sequence", IntegerType()),
    ("machine", IntegerType()),
    ("PID", DoubleType()),
    ("MINFLT", StringType()),
    ("MAJFLT", StringType()),
    ("VSTEXT", StringType()),
    ("VSIZE", DoubleType()),
    ("RSIZE", StringType()),
    ("VGROW", StringType()),
    ("RGROW", StringType()),
    ("MEM", DoubleType()),
    ("CMD", StringType()),
    ("ts", TimestampType()),
]
memory_type = ArrayType(StructType(
    [StructField(field_name, field_type, True)
     for field_name, field_type in _memory_fields]
))

# Keep only the Kafka key and value, both cast to strings.
memory = memory.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

In [None]:
# Parse the JSON text in `value` with the process schema, then explode the
# resulting array so each record becomes one row in `unnested_value`.
process = (
    process
    .select(
        F.from_json(F.col("value").cast("string"), process_type)
        .alias('parsed_value')
    )
    .select(
        F.explode(F.col("parsed_value")).alias('unnested_value')
    )
)

In [None]:
# Parse the JSON text in `value` with the memory schema, then explode the
# resulting array so each record becomes one row in `unnested_value`.
memory = (
    memory
    .select(
        F.from_json(F.col("value").cast("string"), memory_type)
        .alias('parsed_value')
    )
    .select(
        F.explode(F.col("parsed_value")).alias('unnested_value')
    )
)

In [None]:
# Promote every field of the nested `unnested_value` struct to a top-level
# column carrying the same name, in the order listed.
process = process.select(*[
    F.col("unnested_value." + field_name).alias(field_name)
    for field_name in (
        "sequence", "machine", "PID", "TRUN", "TSLPI", "TSLPU",
        "POLI", "NICE", "PRI", "RTPR", "CPUNR", "Status",
        "EXC", "State", "CPU", "CMD", "ts",
    )
])

In [None]:
# Promote every field of the nested `unnested_value` struct to a top-level
# column carrying the same name, in the order listed.
memory = memory.select(*[
    F.col("unnested_value." + field_name).alias(field_name)
    for field_name in (
        "sequence", "machine", "PID", "MINFLT", "MAJFLT", "VSTEXT",
        "VSIZE", "RSIZE", "VGROW", "RGROW", "MEM", "CMD", "ts",
    )
])

In [None]:
from pyspark.sql.functions import substring,length
# String-typed memory counters that may carry a one-letter unit suffix
# ("K" = x1000, "M" = x1000000) and surrounding whitespace.
colList = ['MINFLT', 'MAJFLT', 'VSTEXT', 'RSIZE', 'VGROW', 'RGROW']

for column in colList:
    # Trimmed text of the raw value; every branch below reads this ORIGINAL
    # string.  (Previously the K step cast the column to int first, which
    # turned every "...M" value into NULL before the M step could see it.)
    trimmed = F.trim(F.col(column))
    # Numeric part: the trimmed text with its last character (the unit) removed.
    number_part = F.expr('substring(trim({0}),1,length(trim({0}))-1)'.format(column))
    # Scale as double so fractional values such as "1.5K" keep their fraction,
    # then cast to int so the resulting column type is unchanged.
    memory = memory.withColumn(
        column,
        F.when(trimmed.contains('K'), (number_part.cast('double') * 1000).cast('int'))
         .when(trimmed.contains('M'), (number_part.cast('double') * 1000000).cast('int'))
         .otherwise(trimmed.cast('int')))

# Recompute NICE from PRI: 0 when PRI is 0, otherwise PRI - 120.
process = process.withColumn("NICE", F.when(F.col("PRI")==0, 0).otherwise(F.col("PRI")-120))

**4. For process and memory, respectively, create a new column “CMD_PID”
concatenating “CMD” and “PID” columns, and a new column “event_time” as
timestamp format based on the unix time in “ts” column**

In [None]:
# New columns for process and memory data.
# Both frames get BOTH columns: "CMD_PID" (CMD concatenated with PID) and
# "event_time" (the parsed timestamp from "ts").  Previously process only
# received CMD_PID and memory only received event_time.
process = (
    process
    .withColumn("CMD_PID", concat(F.col('CMD'), F.col("PID")))
    .withColumn("event_time", F.col('ts'))
)
# PID is parsed as a double in the memory schema; cast it to int first so
# the key reads "cmd123" rather than "cmd123.0", matching the process side.
memory = (
    memory
    .withColumn("CMD_PID", concat(F.col('CMD'), F.col("PID").cast("int")))
    .withColumn("event_time", F.col('ts'))
)

**5. Persist the transformed streaming data in parquet format for both process and
memory**

In [None]:
# Start a streaming query that appends the transformed process records to a
# parquet directory, with its checkpoint kept under the same directory.
query_file_sink_process = (
    process.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "process.parquet")
    .option("checkpointLocation", "process.parquet/checkpoint")
    .start()
)

In [None]:
# Start a streaming query that appends the transformed memory records to a
# parquet directory, with its checkpoint kept under the same directory.
query_file_sink_memory = (
    memory.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "memory.parquet")
    .option("checkpointLocation", "memory.parquet/checkpoint")
    .start()
)

In [None]:
# Stop the streaming query that writes process records to parquet.
query_file_sink_process.stop()

In [None]:
# Stop the streaming query that writes memory records to parquet.
query_file_sink_memory.stop()

In [None]:
# Reading process Parquet file
# Batch-read the directory written by the process parquet sink, then print
# its schema and display a sample of rows as a sanity check.
query_file_sink_df_process = spark.read.parquet("process.parquet/")
query_file_sink_df_process.printSchema()
query_file_sink_df_process.show()

In [None]:
# Reading memory Parquet file
# Batch-read the directory written by the memory parquet sink, then print
# its schema and display a sample of rows as a sanity check.
query_file_sink_df_memory = spark.read.parquet("memory.parquet/")
query_file_sink_df_memory.printSchema()
query_file_sink_df_memory.show()

**6. Load the given machine learning models, and use the models to predict whether
each process or memory streaming record is an attack event, respectively**

In [None]:
# Loading Pipeline Model
# Load the two saved pipeline models (one for process data, one for memory
# data) from directories relative to the working directory.
PipelineProcess = PipelineModel.load('process_pipeline_model')
PipelineMemory = PipelineModel.load('memory_pipeline_model')

In [None]:
# prediction
# Apply each loaded pipeline to its streaming DataFrame.  Later cells read a
# `prediction` column from these results.
Process_transformed = PipelineProcess.transform(process)
Memory_transformed = PipelineMemory.transform(memory)

**7. Using the prediction results, monitor the data according to the requirements below**

In [None]:
# Count predicted attack events (prediction == 1) per machine in every
# 2-minute window, for process and memory.
#
# The window is built from the "ts" column of the frame being aggregated
# (F.col("ts")) rather than from `process.ts` / `memory.ts`, which belong to
# the pre-transform DataFrames and only resolve as long as the pipeline
# happens to pass that exact attribute through unchanged.
windowedCounts_process = (
    Process_transformed
    .withWatermark("ts", "120 seconds")
    .where('prediction==1')
    .groupBy(F.window(F.col("ts"), "120 seconds"), F.col('machine'))
    # Approximate number of distinct CMD_PID values flagged in the window.
    .agg(F.approx_count_distinct("CMD_PID").alias("total"))
    .select("window", "machine", "total")
)

windowedCounts_memory = (
    Memory_transformed
    .withWatermark("ts", "120 seconds")
    .where('prediction==1')
    .groupBy(F.window(F.col("ts"), "120 seconds"), F.col('machine'))
    # Number of flagged memory rows in the window (exact row count).
    .agg(F.count("machine").alias("total"))
    .select("window", "machine", "total")
)

In [None]:
# Print the full windowed-count tables to the console every 5 seconds,
# without truncating cell contents.
query_process = (
    windowedCounts_process.writeStream
    .outputMode("complete")
    .format("console")
    .trigger(processingTime='5 seconds')
    .option("truncate", "false")
    .start()
)

query_memory = (
    windowedCounts_memory.writeStream
    .outputMode("complete")
    .format("console")
    .trigger(processingTime='5 seconds')
    .option("truncate", "false")
    .start()
)

In [None]:
# Stop the console query that prints the process windowed counts.
query_process.stop()

In [None]:
# Stop the console query that prints the memory windowed counts.
query_memory.stop()

In [None]:
# Project the process predictions for the join: columns whose names also
# exist on the memory side are renamed with a process-specific name, the
# rest are passed through unchanged.
processFormatted = Process_transformed.selectExpr(*[
    "sequence as processSequence",
    "machine as processMachine",
    "CMD as CMD_process",
    "PID AS PID_process",
    "TRUN",
    "TSLPI",
    "TSLPU",
    "POLI",
    "NICE",
    "PRI",
    "RTPR",
    "CPUNR",
    "Status",
    "EXC",
    "State",
    "CPU",
    "ts as process_ts",
    "CMD_PID",
    "prediction as processPrediction",
])


In [None]:
# Project the memory predictions for the join: columns whose names also
# exist on the process side are renamed with a memory-specific name, the
# rest are passed through unchanged.
memoryFormatted = Memory_transformed.selectExpr(*[
    "sequence as memorySequence",
    "machine as memoryMachine",
    "CMD as CMD_memory",
    "PID as PID_memory",
    "MINFLT",
    "MAJFLT",
    "VSTEXT",
    "VSIZE",
    "RSIZE",
    "VGROW",
    "RGROW",
    "MEM",
    "ts as memory_ts",
    "event_time",
    'prediction as memoryPrediction',
])

In [None]:
# Merging process and memory data where attack is 1.
# Inner stream-stream join: same CMD and PID, both sides predicted as attack,
# and the two timestamps within 30 seconds of each other.
# NOTE(review): neither side carries a watermark on its renamed timestamp, so
# Spark cannot expire join state; consider adding watermarks if this query is
# meant to run for a long time.
data_merged = processFormatted.join(memoryFormatted,expr("""
    CMD_process = CMD_memory AND PID_process = PID_memory AND
    processPrediction = 1 AND
    memoryPrediction = 1 AND
    process_ts <= memory_ts + interval 30 seconds AND memory_ts <= process_ts + interval 30 seconds
"""))
# Stamp rows with the time they are processed, in epoch seconds.
# unix_timestamp() is evaluated by Spark while the query runs, whereas the
# previous F.lit(datetime.now()...) was computed once when this cell ran (so
# every row got the same value) and relabelled local wall-clock time as UTC,
# which is wrong on any host whose time zone is not UTC.
# The cast keeps the column an integer, as the literal was.
data_merged = data_merged.withColumn("processingTime", F.unix_timestamp().cast("int"))

In [None]:
# Persist the joined attack records: pick the columns to keep, then append
# them to a parquet directory with its checkpoint under the same directory.
# NOTE(review): MINFLT and CMD_PID are not among the persisted columns —
# confirm that this is intentional.
_attack_columns = [
    "processSequence", "processMachine",
    "CMD_process", "PID_process",
    "TRUN", "TSLPI", "TSLPU", "POLI", "NICE",
    "PRI", "RTPR", "CPUNR", "Status", "EXC", "State", "CPU",
    "process_ts", "processPrediction",
    "memorySequence", "memoryMachine",
    "CMD_memory", "PID_memory",
    "MAJFLT", "VSTEXT", "VSIZE", "RSIZE", "VGROW", "RGROW", "MEM",
    "memory_ts", "event_time", "memoryPrediction", "processingTime",
]
attack_data = (
    data_merged
    .select(*_attack_columns)
    .writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "process_memory_attack.parquet")
    .option("checkpointLocation", "process_memory_attack.parquet/checkpoint")
    .start()
)

In [None]:
# Stop the streaming query that writes the joined attack records to parquet.
attack_data.stop()

In [None]:
# Reading Parquet file
# Batch-read the directory written by the attack parquet sink, then print
# its schema and display a sample of rows as a sanity check.
query_file_sink_attack_data = spark.read.parquet("process_memory_attack.parquet/")
query_file_sink_attack_data.printSchema()
query_file_sink_attack_data.show()