1. Create a SparkSession from a SparkConf object that runs on two local cores with a descriptive
application name, and set UTC as the session time zone.

In [None]:
# Import SparkConf class into program
from pyspark import SparkConf
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Kafka connector jars needed by Structured Streaming. PySpark reads
# PYSPARK_SUBMIT_ARGS when it launches the JVM, so this has to be set before
# the SparkSession is created in the next cell.
_kafka_packages = [
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0",
]
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' + ','.join(_kafka_packages) + ' pyspark-shell'


# Run Spark locally with exactly two worker threads
# ("local[*]" would instead use one thread per logical core).
master = "local[2]"
# Application name displayed in the Spark UI.
app_name = "Linux system hacking Detection"
# Spark configuration consumed by the SparkSession builder below.
spark_conf = (
    SparkConf()
    .setMaster(master)
    .setAppName(app_name)
)


In [None]:
# Import SparkContext and SparkSession classes
from pyspark import SparkContext  # Spark core
from pyspark.sql import SparkSession  # Spark SQL

# Build (or reuse) a session from the SparkConf defined above and pin the
# SQL session time zone to UTC so timestamp handling does not depend on the host.
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)
sc = spark.sparkContext
# Only surface errors from Spark's logger in the notebook output.
sc.setLogLevel('ERROR')

2. Ingest the streaming data published by the Kafka producers from Tasks 1.1 and 1.2 into Spark
Structured Streaming, for both the process and the memory activities.

In [None]:
topic = "process"
df_process = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", topic) \
    .load()

In [None]:
topic = "memory"
df_memory = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", topic) \
    .load()

In [None]:
# Inspect the raw Kafka source schema of the process stream before decoding.
df_process.printSchema()

In [None]:
# Inspect the raw Kafka source schema of the memory stream before decoding.
df_memory.printSchema()

In [None]:
# Keep only the Kafka key and value, cast to strings so the JSON payload in
# `value` can be parsed later; every other Kafka column is dropped here.
df_process = df_process.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [None]:
# Same key/value decoding for the memory stream.
df_memory = df_memory.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [None]:
# Check the process schema after the cast.
df_process.printSchema()

In [None]:
# Check the memory schema after the cast.
df_memory.printSchema()

Define the schemas for both activities.

In [None]:
# Layout of a single process record. Each Kafka message value is decoded as a
# JSON array of these records, and every field is nullable.
_process_fields = [
    ('sequence', IntegerType()),
    ('machine', IntegerType()),
    ('PID', IntegerType()),
    ('TRUN', IntegerType()),
    ('TSLPI', IntegerType()),
    ('TSLPU', IntegerType()),
    ('POLI', StringType()),
    ('NICE', IntegerType()),
    ('PRI', IntegerType()),
    ('RTPR', IntegerType()),
    ('CPUNR', IntegerType()),
    ('Status', StringType()),
    ('EXC', IntegerType()),
    ('State', StringType()),
    ('CPU', FloatType()),
    ('CMD', StringType()),
    ('ts', StringType()),
]
schema_process = ArrayType(
    StructType([StructField(field_name, field_type, True) for field_name, field_type in _process_fields])
)

In [None]:
df_process=df_process.select(F.from_json(F.col("value").cast("string"), schema_process).alias('parsed_value'))

In [None]:
df_process.printSchema()

In [None]:
df_process = df_process.select(F.explode(F.col("parsed_value")).alias('unnested_value'))      

In [None]:
df_process.printSchema()

In [None]:
df_process_formatted = df_process.select(
                    F.col("unnested_value.sequence").alias("sequence"),
                    F.col("unnested_value.machine").alias("machine"),
                    F.col("unnested_value.PID").alias("PID"),
                    F.col("unnested_value.TRUN").alias("TRUN"),
                    F.col("unnested_value.TSLPI").alias("TSLPI"),
                    F.col("unnested_value.TSLPU").alias("TSLPU"),
                    F.col("unnested_value.POLI").alias("POLI"),
                    F.col("unnested_value.NICE").alias("NICE"),
                    F.col("unnested_value.PRI").alias("PRI"),
                    F.col("unnested_value.RTPR").alias("RTPR"),
                    F.col("unnested_value.CPUNR").alias("CPUNR"),
                    F.col("unnested_value.Status").alias("Status"),
                    F.col("unnested_value.EXC").alias("EXC"),
                    F.col("unnested_value.State").alias("State"),
                    F.col("unnested_value.CPU").alias("CPU"),
                    F.col("unnested_value.CMD").alias("CMD"),
                    F.col("unnested_value.ts").alias("ts")
                )

In [None]:
df_process_formatted.printSchema()

Now let's do the same for the memory activity.

In [None]:
# Layout of a single memory record. Each Kafka message value is decoded as a
# JSON array of these records, and every field is nullable. The size/fault
# columns stay strings here because they may carry "K"/"M"/"G" suffixes that
# are converted further down.
_memory_fields = [
    ('sequence', IntegerType()),
    ('machine', IntegerType()),
    ('PID', LongType()),
    ('MINFLT', StringType()),
    ('MAJFLT', StringType()),
    ('VSTEXT', StringType()),
    ('VSIZE', DoubleType()),
    ('RSIZE', StringType()),
    ('VGROW', StringType()),
    ('RGROW', StringType()),
    ('MEM', FloatType()),
    ('CMD', StringType()),
    ('ts', StringType()),
]
schema_memory = ArrayType(
    StructType([StructField(field_name, field_type, True) for field_name, field_type in _memory_fields])
)

In [None]:
df_memory=df_memory.select(F.from_json(F.col("value").cast("string"), schema_memory).alias('parsed_value'))

In [None]:
df_memory.printSchema()

In [None]:
df_memory = df_memory.select(F.explode(F.col("parsed_value")).alias('unnested_value'))      

In [None]:
df_memory.printSchema()

In [None]:
df_memory_formatted = df_memory.select(
                    F.col("unnested_value.sequence").alias("sequence"),
                    F.col("unnested_value.machine").alias("machine"),
                    F.col("unnested_value.PID").alias("PID"),
                    F.col("unnested_value.MINFLT").alias("MINFLT"),
                    F.col("unnested_value.MAJFLT").alias("MAJFLT"),
                    F.col("unnested_value.VSTEXT").alias("VSTEXT"),
                    F.col("unnested_value.VSIZE").alias("VSIZE"),
                    F.col("unnested_value.RSIZE").alias("RSIZE"),
                    F.col("unnested_value.VGROW").alias("VGROW"),
                    F.col("unnested_value.RGROW").alias("RGROW"),
                    F.col("unnested_value.MEM").alias("MEM"),
                    F.col("unnested_value.CMD").alias("CMD"),
                    F.col("unnested_value.ts").alias("ts")
                )

In [None]:
df_memory_formatted.printSchema()

In [None]:
# Debug callback for writeStream.foreachBatch.
def foreach_batch_function(df, epoch_id):
    """Print up to 20 rows of a micro-batch without truncating cell contents.

    ``epoch_id`` is passed in by ``foreachBatch`` and is not used.
    """
    row_limit, truncate_cells = 20, False
    df.show(row_limit, truncate_cells)

In [None]:
query1 = df_memory_formatted.writeStream.outputMode("append")\
        .foreachBatch(foreach_batch_function)\
        .trigger(processingTime='5 seconds')\
        .start()

In [None]:
query1.stop()

3. Then transform the streaming data into the proper formats following the metadata file schema
for both process and memory, similar to Assignment 2A (3%):
- Numeric values with extra spaces or a "K" / "M" / "G" suffix should be
transformed into their correct numeric values.
- The NICE value should also be restored from the PRI value using their
relationship, i.e. $\text{NICE} = \text{PRI} - 120$ for a process that is still running:
- Hint - There is a mapping between PRI (priority) and NICE, as long as
the process has not yet finished during the last interval. For example,
- PRI 100 maps to NICE -20
- PRI 101 maps to NICE -19
- …
- PRI 139 maps to NICE 19
- Hint - If the process has finished, PRI and NICE are both 0.

In [None]:
# Multipliers for the unit suffixes a value may carry (e.g. "12K", "1.5M").
_UNIT_MULTIPLIERS = {'K': 1000, 'M': 1000000, 'G': 1000000000}


def _parse_unit_value(raw):
    """Convert strings such as " 12K", "1.5M " or "42" to a float.

    All spaces are removed first, so padding before or after a suffix is tolerated.
    A single trailing K/M/G suffix scales the number.

    Returns None for null or blank input. Also returns None for anything still
    non-numeric after cleanup, which is the same null that ``cast('double')``
    produces for an unparseable string. A single bad value therefore becomes
    null instead of failing the whole streaming query.
    """
    if raw is None:
        return None
    text = raw.replace(" ", "")
    if not text:
        return None
    multiplier = _UNIT_MULTIPLIERS.get(text[-1])
    if multiplier is None:
        multiplier = 1
    else:
        text = text[:-1]
    try:
        return float(text) * multiplier
    except ValueError:
        return None


# Declaring DoubleType avoids the float -> string -> double round trip that the
# default StringType UDF return type forced.
correct_data = udf(_parse_unit_value, DoubleType())

cols = ['MINFLT', 'MAJFLT', 'VSTEXT', 'RSIZE', 'VGROW', 'RGROW']
for column in cols:
    # The cast is now a no-op but keeps the resulting column type explicit.
    df_memory_formatted = df_memory_formatted.withColumn(column, correct_data(col(column)).cast('double'))

In [None]:
# Restore NICE from PRI as described above: a running process has
# NICE = PRI - 120 (PRI 100 -> -20, PRI 139 -> 19), and a finished process
# reports PRI 0 and NICE 0.
# This is a native Spark expression rather than a Python UDF. A null PRI
# therefore yields a null NICE instead of raising TypeError inside the UDF.
# It also no longer rebinds `correct_data`.
df_process_formatted = df_process_formatted.withColumn(
    "NICE",
    F.when(F.col("PRI") == 0, F.lit(0))
    .otherwise(F.col("PRI") - 120)
    .cast("int"),
)


In [None]:
# Key each memory record as "<CMD>_<PID>" and derive an event-time column from `ts`.
# NOTE(review): the string -> timestamp cast yields null unless `ts` is a
# recognised date/time string; confirm the producer's `ts` format.
df_memory_formatted = (
    df_memory_formatted
    .withColumn('CMD_PID', F.concat(F.col('CMD'), F.lit('_'), F.col('PID')))
    .withColumn("event_time", F.col('ts').cast('timestamp'))
)

# Tolerate up to 20 seconds of event-time lateness.
# NOTE(review): the prediction/aggregation cells below use df_memory_formatted,
# not memory_watermark, so this watermark is not applied there.
memory_watermark = df_memory_formatted.withWatermark("event_time", "20 seconds")

In [None]:
query_process_formatted = df_process_formatted \
    .writeStream \
    .format("memory") \
    .queryName("process_formatted_sql") \
    .trigger(processingTime='5 seconds') \
    .start()

In [None]:
spark.sql("select * from process_formatted_sql").show()


In [None]:
# Key each process record as "<CMD>_<PID>" and derive an event-time column from `ts`.
# NOTE(review): the string -> timestamp cast yields null unless `ts` is a
# recognised date/time string; confirm the producer's `ts` format.
df_process_formatted = (
    df_process_formatted
    .withColumn('CMD_PID', F.concat(F.col('CMD'), F.lit('_'), F.col('PID')))
    .withColumn("event_time", F.col('ts').cast('timestamp'))
)

# Tolerate up to 20 seconds of event-time lateness.
# NOTE(review): the prediction/aggregation cells below use df_process_formatted,
# not process_watermark, so this watermark is not applied there.
process_watermark = df_process_formatted.withWatermark("event_time", "20 seconds")

In [None]:
query_memory_sink = df_memory_formatted.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "process.parquet")\
        .option("checkpointLocation", "process.parquet/checkpoint")\
        .start()

In [None]:
query_memory_sink.stop()

In [None]:
query_process_sink = df_process_formatted.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "memory.parquet")\
        .option("checkpointLocation", "./checkpoint")\
        .start()

In [None]:
query_process_sink.stop()

In [None]:
mPath =  "../process_pipeline_model"

from pyspark.ml.pipeline import PipelineModel
pipelineModel = PipelineModel.load(mPath)

df_process_prediction = pipelineModel.transform(df_process_formatted)

In [None]:
mPath =  "../memory_pipeline_model"

from pyspark.ml.pipeline import PipelineModel
pipelineModel = PipelineModel.load(mPath)

df_memory_prediction = pipelineModel.transform(df_memory_formatted)

In [None]:
df_memory_prediction = pipelineModel.transform(df_memory_formatted)

In [None]:
df_memory_attack_count = df_memory_prediction \
    .filter(df_memory_prediction['prediction'] == 1.0)\
    .groupBy(window(df_memory_prediction.event_time, "120 seconds"), df_memory_prediction['machine'].alias("machine_id"), df_memory_prediction['CMD_PID'].alias("CMD_PID"))\
    .agg(F.sum("prediction").alias("count"))\
    .select("machine_id", "window", "count")\
    .sort("window")

In [None]:
query_memory_attack_count = df_memory_attack_count \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("memory_attack_count_sql") \
    .trigger(processingTime='5 seconds') \
    .start()

In [None]:
spark.sql("select * from memory_attack_count_sql").show()


In [None]:
df_process_attack_count = df_process_prediction \
    .filter(df_process_prediction['prediction'] == 1.0)\
    .groupBy(window(df_process_prediction.event_time, "120 seconds"), df_process_prediction['machine'].alias("machine_id"), df_process_prediction['CMD_PID'].alias("CMD_PID"))\
    .agg(F.sum("prediction").alias("count"))\
    .select("machine_id", "window", "count")\
    .sort("window")

In [None]:
query_process_attack_count = df_process_attack_count \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("process_attack_count_sql") \
    .trigger(processingTime='5 seconds') \
    .start()

In [None]:
spark.sql("select * from process_attack_count_sql")

#### 3.7.2

In [None]:
# Displays only the streaming DataFrame's repr (its column schema), not data.
# The counts are materialised in the process_attack_count_sql memory table.
df_process_attack_count 

In [None]:
def init_plots():
    """Create the figure and single axes used by the live attack-count plots.

    Returns:
        (fig, ax): the matplotlib Figure and its one Axes.

    Raises:
        Any matplotlib/backend error propagates to the caller. The previous
        ``except Exception: print(...)`` returned None, which made the caller's
        ``fig, ax = init_plots()`` fail with an unrelated unpacking error.
    """
    # Imported here so the helper does not depend on a later cell having
    # imported pyplot first.
    import matplotlib.pyplot as plt

    width = 9.5
    height = 6
    fig = plt.figure(figsize=(width, height))  # create new figure
    fig.subplots_adjust(hspace=0.8)
    ax = fig.add_subplot(111)  # single subplot filling the figure
    ax.set_xlabel('Time')
    ax.set_ylabel('Value')
    ax.title.set_text('Time Vs Value')
    fig.suptitle('Real-time uniform stream data visualization')  # figure title
    fig.show()  # display the figure
    fig.canvas.draw()  # draw on the canvas
    return fig, ax

In [None]:
import time
import matplotlib.pyplot as plt
%matplotlib notebook

fig, ax = init_plots()

while True:
    df_all = spark.sql("select * from memory_attack_count_sql order by machine_id").toPandas()
    # Get starting timestamp to plot both graphs
    start_time = df_all['window'][len(df_all)-1]
    df_reduced = spark.sql("select * from reduced_values where end_time>='"+str(start_time)+"' order by end_time desc").toPandas()
    
    x_all = df_all['window'].to_list()
    y_all = df_all['value'].to_list()
    x_reduced = df_reduced['end_time'].to_list()
    y_reduced = df_reduced['avg_value'].to_list()
    ax.clear()
    ax.plot(x_all, y_all, '-b', label='Original')
    ax.plot(x_reduced, y_reduced, '--r', label='Reduced')
    ax.set_xlabel('Time')
    ax.set_ylabel('Value')
    leg = ax.legend()
    fig.canvas.draw()
    
    time.sleep(5)

In [None]:
import time
import matplotlib.pyplot as plt
%matplotlib notebook

fig, ax = init_plots()

while True:
    df = spark.sql("select * from process_attack_count_sql").toPandas()
    
    if(len(df)>0):        
        x = df['word'].to_list()
        y = df['count'].to_list() 
        ax.clear()
        ax.plot(x, y)
        ax.set_xlabel('Time')
        ax.set_ylabel('Value')
        fig.canvas.draw()
        
    time.sleep(10)