### Step 1
Initialize Spark Session

In [25]:
from pyspark import SparkConf
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
import matplotlib.pyplot as plt
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import isnan, when, count, col

In [26]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession.builder.appName("Analysis in Spark").master("local[2]").getOrCreate()


spark.conf.set("spark.sql.session.timeZone", "UTC")           # set timezone to UTC

### Step 2
Connection to Kafka Producer/Broker and subscribe to the topic and load data from Kafka topic with <code>readStream</code>

In [27]:
topic = "process"
df_process= spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", topic) \
    .load()

In [28]:
topic = "memory"
df_memory= spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", topic) \
    .load()

In [29]:
df_process.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [30]:
df_memory.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [31]:
df_process = df_process.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [32]:
query_process= df_process \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [33]:
query_process.stop()

In [34]:
df_memory = df_memory.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
query_memory= df_memory \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()


In [35]:
query_memory.stop()

### Define schema for each to remove parse-level

In [36]:
schema_process = StructType([    
    StructField('sequence', IntegerType(), True), 
    StructField('machine', IntegerType(), True),
    StructField('PID', IntegerType(), True),
    StructField('TRUN', IntegerType(), True),
    StructField('TSLPI', IntegerType(), True),
    StructField('TSLPU', IntegerType(), True),
    StructField('POLI', StringType(), True),
    StructField('NICE', IntegerType(), True),
    StructField('PRI', IntegerType(), True),            
    StructField('RTPR', IntegerType(), True),            
    StructField('CPUNR', IntegerType(), True),            
    StructField('Status', StringType(), True),
    StructField('EXC', IntegerType(), True),            
    StructField('State', StringType(), True),            
    StructField('CPU', IntegerType(), True),            
    StructField('CMD', StringType(), True),
    StructField('ts', StringType(), True)
    ,            


])

In [37]:
schema_memory = StructType([    
    StructField('sequence', IntegerType(), True), 
    StructField('machine', IntegerType(), True),
    StructField('PID', IntegerType(), True),
    StructField('MINFLT', IntegerType(), True),
    StructField('MAJFLT', IntegerType(), True),
    StructField('VSTEXT', IntegerType(), True),
    StructField('VSIZE', DoubleType(), True),
    StructField('RSIZE', DoubleType(), True),
    StructField('VGROW', DoubleType(), True),            
    StructField('RGROW', DoubleType(), True),            
    StructField('MEM', DoubleType(), True),            
    StructField('CMD', StringType(), True),
    StructField('ts', StringType(), True)


])

In [38]:
df_process=df_process.select(F.from_json(F.col("value").cast("string"), schema_process).alias('parsed_value'))
df_memory=df_memory.select(F.from_json(F.col("value").cast("string"), schema_memory).alias('parsed_value'))


In [39]:
df_process.printSchema()

root
 |-- parsed_value: struct (nullable = true)
 |    |-- sequence: integer (nullable = true)
 |    |-- machine: integer (nullable = true)
 |    |-- PID: integer (nullable = true)
 |    |-- TRUN: integer (nullable = true)
 |    |-- TSLPI: integer (nullable = true)
 |    |-- TSLPU: integer (nullable = true)
 |    |-- POLI: string (nullable = true)
 |    |-- NICE: integer (nullable = true)
 |    |-- PRI: integer (nullable = true)
 |    |-- RTPR: integer (nullable = true)
 |    |-- CPUNR: integer (nullable = true)
 |    |-- Status: string (nullable = true)
 |    |-- EXC: integer (nullable = true)
 |    |-- State: string (nullable = true)
 |    |-- CPU: integer (nullable = true)
 |    |-- CMD: string (nullable = true)
 |    |-- ts: string (nullable = true)



In [40]:
df_process_new = df_process.select(
                    F.col("parsed_value.sequence").alias("sequence"),
                    F.col("parsed_value.machine").alias("machine"),
                    F.col("parsed_value.PID").alias("PID"),
                    F.col("parsed_value.TRUN").alias("TRUN"),
                    F.col("parsed_value.TSLPI").alias("TSLPI"),
                    F.col("parsed_value.TSLPU").alias("TSLPU"),
                    F.col("parsed_value.POLI").alias("POLI"),
                    F.col("parsed_value.NICE").alias("NICE"),
                    F.col("parsed_value.PRI").alias("PRI"),
                    F.col("parsed_value.RTPR").alias("RTPR"),
                    F.col("parsed_value.CPUNR").alias("CPUNR"),
                    F.col("parsed_value.Status").alias("Status"),
                    F.col("parsed_value.EXC").alias("EXC"),
                    F.col("parsed_value.State").alias("State"),
                    F.col("parsed_value.CPU").alias("CPU"),
                    F.col("parsed_value.CMD").alias("CMD"),
                    F.col("parsed_value.ts").alias("ts")
                )

In [41]:
df_process_new.printSchema()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- TRUN: integer (nullable = true)
 |-- TSLPI: integer (nullable = true)
 |-- TSLPU: integer (nullable = true)
 |-- POLI: string (nullable = true)
 |-- NICE: integer (nullable = true)
 |-- PRI: integer (nullable = true)
 |-- RTPR: integer (nullable = true)
 |-- CPUNR: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- EXC: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- CPU: integer (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: string (nullable = true)



In [42]:
df_memory.printSchema()

root
 |-- parsed_value: struct (nullable = true)
 |    |-- sequence: integer (nullable = true)
 |    |-- machine: integer (nullable = true)
 |    |-- PID: integer (nullable = true)
 |    |-- MINFLT: integer (nullable = true)
 |    |-- MAJFLT: integer (nullable = true)
 |    |-- VSTEXT: integer (nullable = true)
 |    |-- VSIZE: double (nullable = true)
 |    |-- RSIZE: double (nullable = true)
 |    |-- VGROW: double (nullable = true)
 |    |-- RGROW: double (nullable = true)
 |    |-- MEM: double (nullable = true)
 |    |-- CMD: string (nullable = true)
 |    |-- ts: string (nullable = true)



In [43]:
df_memory_new = df_memory.select(
                    F.col("parsed_value.sequence").alias("sequence"),
                    F.col("parsed_value.machine").alias("machine"),
                    F.col("parsed_value.PID").alias("PID"),
                    F.col("parsed_value.MINFLT").alias("MINFLT"),
                    F.col("parsed_value.MAJFLT").alias("MAJFLT"),
                    F.col("parsed_value.VSTEXT").alias("VSTEXT"),
                    F.col("parsed_value.VSIZE").alias("VSIZE"),
                    F.col("parsed_value.RSIZE").alias("RSIZE"),
                    F.col("parsed_value.VGROW").alias("VGROW"),
                    F.col("parsed_value.RGROW").alias("RGROW"),
                    F.col("parsed_value.MEM").alias("MEM"),
                    F.col("parsed_value.CMD").alias("CMD"),
                    F.col("parsed_value.ts").alias("ts")
                )

In [44]:
df_memory_new.printSchema()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- MINFLT: integer (nullable = true)
 |-- MAJFLT: integer (nullable = true)
 |-- VSTEXT: integer (nullable = true)
 |-- VSIZE: double (nullable = true)
 |-- RSIZE: double (nullable = true)
 |-- VGROW: double (nullable = true)
 |-- RGROW: double (nullable = true)
 |-- MEM: double (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: string (nullable = true)



### Transform columns 

In [45]:
df_memory_new = df_memory_new.withColumn("MINFLT", regexp_replace("MINFLT", 'K', '000'))

df_memory_new = df_memory_new.withColumn("MAJFLT", regexp_replace("MAJFLT", 'K', '000'))
df_memory_new = df_memory_new.withColumn("MAJFLT", regexp_replace("MAJFLT", '224.2M', '224200000'))

df_memory_new = df_memory_new.withColumn("VSTEXT", regexp_replace("VSTEXT", 'K', '000'))
df_memory_new = df_memory_new.withColumn("VSTEXT", regexp_replace("VSTEXT", '224.2M', '224200000'))

df_memory_new = df_memory_new.withColumn("RSIZE", regexp_replace("RSIZE", 'K', '000'))
df_memory_new = df_memory_new.withColumn("RSIZE", regexp_replace("RSIZE", '224.2M', '224200000'))

df_memory_new = df_memory_new.withColumn("VGROW", regexp_replace("VGROW", 'K', '000'))
df_memory_new = df_memory_new.withColumn("VGROW", regexp_replace("VGROW", ' ',''))
df_memory_new = df_memory_new.withColumn("VGROW", regexp_replace("VGROW", '224.2M', '224200000'))


df_memory_new = df_memory_new.withColumn("RGROW", regexp_replace("RGROW", 'K', '000'))
df_memory_new = df_memory_new.withColumn("RGROW", regexp_replace("RGROW", '224.2M', '224200000'))

#### converting the below columns to appropriaye datatype as per metadata

df_memory_new = df_memory_new.withColumn("MINFLT", df_memory_new["MINFLT"].cast(DoubleType()))
df_memory_new = df_memory_new.withColumn("MAJFLT", df_memory_new["MAJFLT"].cast(DoubleType()))
df_memory_new = df_memory_new.withColumn("VSTEXT", df_memory_new["VSTEXT"].cast(DoubleType()))
df_memory_new = df_memory_new.withColumn("RSIZE", df_memory_new["RSIZE"].cast(DoubleType()))
df_memory_new = df_memory_new.withColumn("VGROW",df_memory_new["VGROW"].cast(DoubleType()))
df_memory_new = df_memory_new.withColumn("RGROW",df_memory_new["RGROW"].cast(DoubleType()))


### Define a function to map PRI values on to NICE column using a user defined function

In [46]:
def get_NICE(x):
    if x == 0:
        return 0 
    else:
        val = 19
        diff = 139 - x
        return val - diff
    
udf_NICE = udf(get_NICE, IntegerType())
    

In [47]:
df_process_new = df_process_new.withColumn("NICE", udf_NICE(col("PRI")))

In [None]:
query_process= df_process_new \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [None]:
query_process.stop()

### Creating new columns "CMD-PID"  From  " CMD" and  "PID"  and column "event_time"

In [48]:
df_memory_new = df_memory_new.withColumn("PID", df_memory_new["PID"].cast(StringType()))
df_process_new = df_process_new.withColumn("PID", df_process_new["PID"].cast(StringType()))


In [49]:
def concat(x,y):
        return str(x) + str(y)
    
udf_concat = udf(concat, StringType())
df_memory_new = df_memory_new.withColumn("CMD-PID", udf_concat(col("CMD") , col("PID")))
df_process_new = df_process_new.withColumn("CMD-PID", udf_concat(col("CMD") , col("PID")))


In [50]:
from pyspark.sql.functions import from_unixtime
df_memory_new = df_memory_new.withColumn("ts", df_memory_new["ts"].cast(StringType()))
df_process_new = df_process_new.withColumn("ts", df_process_new["ts"].cast(StringType()))

df_memory_new = df_memory_new.withColumn("event_time",from_unixtime(df_memory_new.ts, "yyyy-MM-dd HH:mm:ss"))
df_process_new = df_process_new.withColumn("event_time",from_unixtime(df_process_new.ts, "yyyy-MM-dd HH:mm:ss"))

df_memory_new = df_memory_new.withColumn("PID", df_memory_new["PID"].cast(IntegerType())) # convert back for prediction
df_process_new = df_process_new.withColumn("PID", df_process_new["PID"].cast(IntegerType())) # convert back for prediction


In [52]:
query_memory.stop()

#### convert unix timestamp to timezone

### Process data in paraquet format

In [None]:
query_file_sink_process = df_process_new.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "process_parquet/clickstream_df")\
        .option("checkpointLocation", "parquet/clickstream_df/checkpoint")\
        .start()

In [None]:
query_file_sink_memory = df_memory_new.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "memory_parquet/clickstream_df")\
        .option("checkpointLocation", "parquet/clickstream_df/checkpoint")\
        .start()

### Loading pipeline models for process and memory ad predicting real time stream


In [None]:
import zipfile

a = zipfile.ZipFile('process_pipeline_model.zip')
a.extractall()


In [None]:

b = zipfile.ZipFile('memory_pipeline_model.zip')
b.extractall()


In [53]:
#Loading the Pipeline Model From the filesystem
from pyspark.ml import PipelineModel
pipelineModel_process = PipelineModel.load("process_pipeline_model")
pipelineModel_memory = PipelineModel.load("memory_pipeline_model")


In [54]:
prediction_memory = pipelineModel_memory.transform(df_memory_new)
prediction_process = pipelineModel_process.transform(df_process_new)

In [57]:
# Create function to show values received from input dataframe
def foreach_batch_function(df, epoch_id):
    df.show(20,False)

In [62]:
final_pred_memory = prediction_memory.select("machine","prediction").groupby('machine','prediction').agg(F.count('prediction').alias('Count'))
query1 = final_pred_memory.writeStream.outputMode("complete")\
        .foreachBatch(foreach_batch_function)\
        .trigger(processingTime='2 minutes')\
        .start()

+-------+----------+-----+
|machine|prediction|Count|
+-------+----------+-----+
|4      |1.0       |676  |
|7      |0.0       |603  |
|8      |1.0       |423  |
|5      |1.0       |854  |
|4      |0.0       |582  |
|8      |0.0       |845  |
|7      |1.0       |644  |
|6      |1.0       |546  |
|6      |0.0       |618  |
|5      |0.0       |321  |
+-------+----------+-----+

+-------+-----+
|machine|Count|
+-------+-----+
|6      |2227 |
|5      |2308 |
|4      |2338 |
|8      |2445 |
|7      |2413 |
+-------+-----+

+-------+----------+-----+
|machine|prediction|Count|
+-------+----------+-----+
|8      |1.0       |17   |
|8      |0.0       |32   |
+-------+----------+-----+

+-------+----------+-----+
|machine|prediction|Count|
+-------+----------+-----+
|4      |1.0       |792  |
|7      |0.0       |659  |
|8      |1.0       |494  |
|5      |1.0       |980  |
|4      |0.0       |668  |
|8      |0.0       |976  |
|7      |1.0       |737  |
|6      |1.0       |620  |
|6      |0.0    

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/student/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/student/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/home/student/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 46634)
ERROR:py4j.java_gateway:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent cal

In [60]:
query_1.stop()

NameError: name 'query_1' is not defined

+-------+-----+
|machine|Count|
+-------+-----+
|6      |1199 |
|5      |1224 |
|4      |1231 |
|8      |1366 |
|7      |1365 |
+-------+-----+

+-------+----------+-----+
|machine|prediction|Count|
+-------+----------+-----+
|4      |1.0       |233  |
|7      |0.0       |215  |
|8      |1.0       |150  |
|5      |1.0       |227  |
|4      |0.0       |163  |
|8      |0.0       |232  |
|7      |1.0       |223  |
|6      |1.0       |149  |
|6      |0.0       |188  |
|5      |0.0       |87   |
+-------+----------+-----+

+-------+-----+
|machine|Count|
+-------+-----+
|6      |1408 |
|5      |1474 |
|4      |1484 |
|8      |1552 |
|7      |1587 |
+-------+-----+

+-------+----------+-----+
|machine|prediction|Count|
+-------+----------+-----+
|4      |1.0       |383  |
|7      |0.0       |307  |
|8      |1.0       |233  |
|5      |1.0       |379  |
|4      |0.0       |284  |
|8      |0.0       |387  |
|7      |1.0       |338  |
|6      |1.0       |270  |
|6      |0.0       |293  |
|5     

In [None]:
query_memory.stop()

In [None]:
query_process= prediction_process\
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [None]:
query_process.stop()