In [11]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark.ml.pipeline import PipelineModel
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col,isnan, isnull,count,when
from pyspark.sql.functions import regexp_extract,regexp_replace
from pyspark.sql.functions import col,isnan, isnull,count,when
from pyspark.sql.functions import expr
from pytz import timezone
import datetime as dt

from pyspark.sql.types import BooleanType, StructType, StringType, TimestampType

master = 'local[2]'
app_name = 'Application for Streaming'

# Build the Spark configuration first, then a session on top of it whose SQL
# session time zone is pinned to UTC.
config = SparkConf().setAppName(app_name).setMaster(master)
spark = (
    SparkSession.builder
    .config(conf=config)
    .config('spark.sql.session.timeZone', 'UTC')
    .getOrCreate()
)

# Underlying SparkContext, kept for RDD-level APIs.
sc = spark.sparkContext

### Process stream: read and parse the `process` Kafka topic

In [12]:
topic1 = 'process'

# Raw Kafka stream for the `process` topic.
df_process = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic1)
    .load()
)

New_Process_df = df_process.select(F.col('value').cast('string').alias('value'))

# (field name, Spark SQL type it is cast to after parsing), in output column order.
# Every field is first read from the JSON payload as a string.
_process_fields = [
    ('sequence', 'integer'),
    ('machine', 'integer'),
    ('PID', 'integer'),
    ('TRUN', 'integer'),
    ('TSLPI', 'integer'),
    ('TSLPU', 'integer'),
    ('POLI', 'string'),
    ('NICE', 'integer'),
    ('PRI', 'integer'),
    ('RTPR', 'integer'),
    ('CPUNR', 'integer'),
    ('Status', 'string'),
    ('EXC', 'integer'),
    ('State', 'string'),
    ('CPU', 'integer'),
    ('CMD', 'string'),
    ('ts', 'integer'),
]

# Each Kafka value is parsed as a JSON array of objects with the fields above.
Process_schema = ArrayType(StructType(
    [StructField(field_name, StringType(), True) for field_name, _ in _process_fields]
))

New_Process_df = New_Process_df.select(
    F.from_json(F.col('value').cast("string"), Process_schema).alias('parsed_value'))

# One output row per element of the parsed array.
New_Process_df = New_Process_df.select(
    F.explode(F.col("parsed_value")).alias('unnested_value'))

# Flatten the struct into top-level columns.
New_Process_df = New_Process_df.select(
    [F.col('unnested_value.' + field_name).alias(field_name) for field_name, _ in _process_fields])

# NOTE(review): `mapping` is not referenced anywhere in the visible notebook — confirm it is still needed.
mapping = {'0':'0','100':'20','101':'19','109':'11','110':'10','120':'0','121':'1','130':'10','139':'19'}

# Cast each string field to its target type.
New_Process_df = New_Process_df.select(
    [F.col(field_name).cast(sql_type).alias(field_name) for field_name, sql_type in _process_fields])

New_Process_df.printSchema()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- TRUN: integer (nullable = true)
 |-- TSLPI: integer (nullable = true)
 |-- TSLPU: integer (nullable = true)
 |-- POLI: string (nullable = true)
 |-- NICE: integer (nullable = true)
 |-- PRI: integer (nullable = true)
 |-- RTPR: integer (nullable = true)
 |-- CPUNR: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- EXC: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- CPU: integer (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)



In [13]:
def foreach_batch_function(df, epoch_id):
    """Debug sink for ``writeStream.foreachBatch``.

    Prints up to 10 rows of the micro-batch ``df`` without truncating cell
    values. ``epoch_id`` is supplied by ``foreachBatch`` and is not used.
    """
    df.show(n=10, truncate=False)


q1 = (New_Process_df.writeStream
      .foreachBatch(foreach_batch_function)
      .start())

In [14]:
# Print every new parsed process row to the console (default truncation).
process_query = (
    New_Process_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)


In [15]:
# Stop both process-stream debug queries started above: the foreachBatch printer
# (`q1`) and the console sink (`process_query`). Leaving `process_query` running
# keeps it consuming the topic and printing for the rest of the session.
q1.stop()
process_query.stop()

KeyboardInterrupt: 

### Memory stream: read, parse and clean the `memory` Kafka topic

In [None]:
topic2 = 'memory'
df_memory = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic2)
    .load()
)

New_Memory_df = df_memory.selectExpr("CAST(value AS STRING)")

# Memory record fields in output order. Each Kafka value is parsed as a JSON array
# of objects whose fields are all read as strings, then cast at the end of this cell.
_memory_fields = ['sequence', 'machine', 'PID', 'MINFLT', 'MAJFLT', 'VSTEXT',
                  'VSIZE', 'RSIZE', 'VGROW', 'RGROW', 'MEM', 'CMD', 'ts']

memory_schema = ArrayType(StructType(
    [StructField(name, StringType(), True) for name in _memory_fields]))

New_Memory_df = New_Memory_df.select(
    F.from_json(F.col('value').cast("string"), memory_schema).alias('parsed_value'))

# One row per element of the parsed array.
New_Memory_df = New_Memory_df.select(F.explode(F.col("parsed_value")).alias('unnested_value'))

New_Memory_df = New_Memory_df.select(
    [F.col('unnested_value.' + name).alias(name) for name in _memory_fields])

# Whole value: optional minus sign, digits, optional fraction, then a K or M suffix,
# e.g. '0K', '12K', '-4K', '1.2M', '123M'. Anchored so '10K' is not mistaken for '0K'.
_SIZE_SUFFIX_PATTERN = r'^(-?\d+(?:\.\d+)?)([KM])$'


def _normalize_size(column_name):
    """Return ``column_name`` as a whitespace-free string with K/M suffixes expanded.

    K multiplies by 1000 and M by 1000000: '12K' -> '12000', '0K' -> '0',
    '-4K' -> '-4000', '1.2M' -> '1200000', '123M' -> '123000000'. Values without a
    recognised suffix are returned with whitespace removed and otherwise unchanged;
    nulls stay null. The result is still a string, so the integer cast below parses
    plain values exactly as before.
    """
    compact = F.regexp_replace(F.col(column_name), r'\s', '')
    # Decimal arithmetic: exact scaling, and no scientific-notation strings that the
    # later string -> integer cast could not parse.
    amount = F.regexp_extract(compact, _SIZE_SUFFIX_PATTERN, 1).cast('decimal(20,3)')
    unit = F.regexp_extract(compact, _SIZE_SUFFIX_PATTERN, 2)
    return (F.when(unit == 'K', (amount * 1000).cast('long').cast('string'))
             .when(unit == 'M', (amount * 1000000).cast('long').cast('string'))
             .otherwise(compact))


# Unit normalisation applies to every field except CMD. CMD is free text; rewriting it
# ('K' -> '000', stripping spaces, ...) would also make the CMD_PID key later built
# from it disagree with the process stream, whose CMD is not rewritten.
# NOTE(review): the previous rules turned '10K'/'20K' into 10/20 and '123M' into
# 12300000. If the memory pipeline model was trained on values produced by those
# rules, confirm or retrain before relying on its predictions.
New_Memory_df = New_Memory_df.select(
    [F.col(name) if name == 'CMD' else _normalize_size(name).alias(name)
     for name in _memory_fields])

# Numeric fields become integers; CMD stays a string.
New_Memory_df = New_Memory_df.select(
    [F.col(name).cast('string' if name == 'CMD' else 'integer').alias(name)
     for name in _memory_fields])

New_Memory_df.printSchema()

In [None]:
def foreach_batch_function(df, epoch_id):
    """Print the first 10 rows of micro-batch ``df`` untruncated (``epoch_id`` unused)."""
    df.show(n=10, truncate=False)


memory_debug_writer = New_Memory_df.writeStream.foreachBatch(foreach_batch_function)
q1 = memory_debug_writer.start()


In [None]:
# Print every new cleaned memory row to the console without truncating values.
memory_query = (
    New_Memory_df.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
)

In [None]:
# Stop both memory-stream debug queries started above: the console sink
# (`memory_query`) and the foreachBatch printer (`q1`). Without the second call the
# printer keeps consuming the topic for the rest of the session.
memory_query.stop()
q1.stop()

In [None]:
def _add_event_columns(stream_df):
    """Add the derived columns shared by both streams.

    CMD_PID    -- CMD concatenated with PID (as text), with no separator.
    event_time -- the integer ``ts`` cast to a timestamp.

    NOTE(review): without a separator, ('python3', 12) and ('python', 312) both give
    'python312'. Consider a delimiter if the pipeline models don't depend on this format.
    """
    return (stream_df
            .withColumn('CMD_PID', F.concat(F.col('CMD'), F.col('PID').cast('string')))
            .withColumn('event_time', F.col('ts').cast('timestamp')))


# Process: derived columns, then a 20-second watermark on event_time.
New_Process_df = _add_event_columns(New_Process_df)
Window_Process_df = New_Process_df.withWatermark('event_time', '20 seconds')

# Memory: same treatment.
New_Memory_df = _add_event_columns(New_Memory_df)
Window_Memory_df = New_Memory_df.withWatermark('event_time', '20 seconds')


In [None]:
# Root directory for streaming checkpoints. Every streaming query gets its own
# sub-directory: a checkpoint location holds one query's offsets and metadata
# (including its query id). Two queries pointed at the same location interfere,
# and Spark will not start a second active query with the same id.
CHECKPOINT_ROOT = 'checkpoints'

if os.path.isdir(CHECKPOINT_ROOT):
    print('Checkpoint folder exists')
else:
    os.makedirs(CHECKPOINT_ROOT)

# Keep the handles so these sinks can be inspected or stopped later.
process_parquet_query = (
    Window_Process_df.writeStream
    .format("parquet")
    .option("checkpointLocation", os.path.join(CHECKPOINT_ROOT, "process"))
    .option("path", "process/process.parquet")
    .start()
)

memory_parquet_query = (
    Window_Memory_df.writeStream
    .format("parquet")
    .option("checkpointLocation", os.path.join(CHECKPOINT_ROOT, "memory"))
    .option("path", "process/memory.parquet")
    .start()
)


In [None]:
# Pre-trained Spark ML pipelines, loaded (process first, then memory) from local directories.
ProcessModel, MemoryModel = [
    PipelineModel.load(model_dir)
    for model_dir in ('process_pipeline_model', 'memory_pipeline_model')
]

In [None]:
# Columns kept after scoring: the parsed fields, the derived event_time / CMD_PID
# columns, and the pipeline's `prediction` column.
_process_scored_cols = [
    'sequence', 'machine', 'PID', 'TRUN', 'TSLPI', 'TSLPU', 'POLI', 'NICE', 'PRI', 'RTPR',
    'CPUNR', 'Status', 'EXC', 'State', 'CPU', 'CMD', 'ts', 'event_time', 'CMD_PID', 'prediction',
]
_memory_scored_cols = [
    'sequence', 'machine', 'PID', 'MINFLT', 'MAJFLT', 'VSTEXT', 'VSIZE', 'RSIZE', 'VGROW',
    'RGROW', 'MEM', 'CMD', 'ts', 'event_time', 'CMD_PID', 'prediction',
]

# Process
Process_df = ProcessModel.transform(Window_Process_df).select(
    [F.col(name) for name in _process_scored_cols])

# Memory
Memory_df = MemoryModel.transform(Window_Memory_df).select(
    [F.col(name) for name in _memory_scored_cols])

In [None]:
def foreach_batch_function(df, epoch_id):
    """foreachBatch debug hook: show 10 rows of ``df`` in full; ``epoch_id`` is ignored."""
    df.show(n=10, truncate=False)

In [None]:
def _positive_counts_per_window(scored_df):
    """Approximate distinct CMD_PID per machine per 2-minute window among rows with prediction == 1.0.

    Presumably prediction 1.0 is the suspicious/attack class — confirm against the models.
    """
    positives = scored_df.filter(F.col('prediction') == 1.0)
    return (positives
            .select(F.col('machine'), F.col('event_time'), F.col('CMD_PID'))
            .groupBy('machine', F.window('event_time', "2 Minutes"))
            .agg(F.approx_count_distinct('CMD_PID').alias('count')))


# NOTE(review): in "complete" output mode Spark keeps every window's aggregate, so
# the watermark does not bound this state; "update"/"append" would let it be dropped.

# Process: results go to the in-memory table "Process_sink".
windowed_Process_df = _positive_counts_per_window(Process_df)
Process_sink = (windowed_Process_df.writeStream
                .queryName("Process_sink")
                .outputMode("complete")
                .format("memory")
                .start())

# Memory: results go to the in-memory table "Memory_sink".
windowed_Memory_df = _positive_counts_per_window(Memory_df)
Memory_sink = (windowed_Memory_df.writeStream
               .queryName("Memory_sink")
               .outputMode("complete")
               .format("memory")
               .start())

In [None]:
def foreach_batch_function(df, epoch_id):
    """foreachBatch debug hook: show up to 20 rows of ``df`` untruncated; ``epoch_id`` unused."""
    df.show(n=20, truncate=False)


windowed_memory_debug_writer = windowed_Memory_df.writeStream.foreachBatch(foreach_batch_function)
q1 = windowed_memory_debug_writer.start()

In [None]:
def _prefix_columns(df, prefix):
    """Return ``df`` with every column renamed to ``prefix + name``, column order preserved."""
    return df.select([F.col(name).alias(prefix + name) for name in df.columns])


# Both sides share most column names (sequence, machine, PID, CMD, ts, event_time,
# CMD_PID, prediction). Prefix every column per side ('wp_' = process, 'wm_' = memory).
# Otherwise the joined frame has duplicate column names, which are ambiguous to
# reference and rejected by Spark's parquet source.
Window_process = _prefix_columns(Process_df, 'wp_').withWatermark('wp_event_time', '30 seconds')
Window_memory = _prefix_columns(Memory_df, 'wm_').withWatermark('wm_event_time', '30 seconds')

# NOTE(review): the condition only matches CMD_PID. Without an additional event-time
# range condition between wp_event_time and wm_event_time, Spark cannot evict old rows
# from the stream-stream join state, so the state grows for as long as the query runs.
Join_df = Window_process.join(Window_memory, expr("""wp_CMD_PID == wm_CMD_PID"""), "inner")

# Time of the micro-batch that produced each output row. It is evaluated per batch;
# a literal datetime.now() would be frozen at the moment this cell runs. Assign the
# result back, otherwise the column is never added.
Join_df = Join_df.withColumn('processing_time', F.current_timestamp())

In [None]:
# Dedicated checkpoint directory for this query. A checkpoint location holds one
# query's offsets and metadata, so it must not be shared with the other parquet sinks.
join_parquet_query = (
    Join_df.writeStream
    .format("parquet")
    .option("checkpointLocation", os.path.join("checkpoints", "process_memory_attack"))
    .option("path", "process/process_memory_attack.parquet")
    .start()
)

In [None]:
# Debug printer for the joined stream; uses whichever foreach_batch_function was defined last.
q1 = (Join_df.writeStream
      .foreachBatch(foreach_batch_function)
      .start())

In [None]:
def init_plots(width=9.5, height=6):
    """Create the figure and single Axes used by the live plot.

    Parameters
    ----------
    width, height : float
        Figure size in inches (defaults 9.5 x 6).

    Returns
    -------
    tuple
        ``(fig, ax)`` -- the matplotlib Figure and its only Axes.

    Errors raised by matplotlib propagate to the caller. They are not printed and
    swallowed: swallowing them made ``fig, ax = init_plots()`` fail with an unrelated
    "cannot unpack NoneType" error.
    """
    # Local import so this function does not depend on a later cell having run.
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(width, height))
    ax.set_xlabel('Time')
    ax.set_ylabel('Counts')
    ax.set_title('Time Vs Value')
    fig.suptitle('Suspected_Attacks')
    fig.show()
    fig.canvas.draw()
    return fig, ax

In [None]:
# Live plot of per-machine counts over time.
# NOTE(review): this cell is incomplete as exported. The lines from
# `msg = msg.append(data)` down to `plt.close('all')` are indented under a
# loop / `try:` header that is missing (the `except` below has no matching `try`),
# and `msg` / `data` are never defined in the visible notebook, so the cell cannot
# run as-is. Presumably the missing header polls new rows into a pandas DataFrame
# `data` (perhaps from the "Process_sink" / "Memory_sink" memory tables) — TODO restore it.
# NOTE(review): `time` is not used in the visible lines; presumably the missing loop uses it.
import time
import matplotlib.pyplot as plt
%matplotlib notebook

fig, ax = init_plots()

            # NOTE(review): assumes `msg` and `data` are pandas DataFrames. `DataFrame.append`
            # was removed in pandas 2.0 — use `pd.concat([msg, data])` on newer pandas.
            msg = msg.append(data)
            
            # Always true: the block below runs on every pass (looks like a placeholder condition).
            if True:
                
                # Minute-of-hour ('%M') of the first and last buffered `event_ts` values.
                # NOTE(review): comparing minute-of-hour wraps at the hour (59 -> 01 gives 58).
                t_0 = dt.datetime.utcfromtimestamp(int(list(msg['event_ts'])[0])).strftime('%M')
                t_1 = dt.datetime.utcfromtimestamp(int(list(msg['event_ts'])[-1])).strftime('%M')
               
                ax.clear()
                # After groupby().count(), every non-key column of `temp` holds the number of
                # rows in that (machine, event_ts) group, not the original values.
                temp = msg.groupby(['machine','event_ts']).count().reset_index()
                
                # NOTE(review): `temp['event_time']` (if present) is a per-group count here, not a
                # timestamp. The plots below use `event_ts`, so this loop does not affect the figure.
                for i in range(0,len(temp)):
                    temp.loc[i,'event_time'] = str(dt.datetime.utcfromtimestamp(int(temp.loc[i,'event_time'])).strftime('%H:%M:%S'))
                 
                
                # NOTE(review): `machine` is compared with strings; if these rows come from the Spark
                # frames above, `machine` was cast to integer there and these filters match nothing.
                mach4 = temp[temp['machine']=='4']
                mach5 = temp[temp['machine']=='5']
                mach6 = temp[temp['machine']=='6']
                mach7 = temp[temp['machine']=='7']
                mach8 = temp[temp['machine']=='8']
                # y = `sequence` column of `temp`, i.e. the row count per (machine, event_ts) group.
                plt.plot(mach4['event_ts'],mach4['sequence'], label = '4', c='blue')
                plt.plot(mach5['event_ts'],mach5['sequence'], label = '5', c='green')
                plt.plot(mach6['event_ts'],mach6['sequence'], label = '6', c='black')
                plt.plot(mach7['event_ts'],mach7['sequence'], label = '7', c='red')
                plt.plot(mach8['event_ts'],mach8['sequence'], label = '8', c='purple')
                plt.xticks(rotation=45)
              
                ax.legend()
                fig.show()
                fig.canvas.draw()
                # Once the buffered span reaches 2+ minutes, drop rows with the oldest `ts` value.
                # NOTE(review): this filters on `ts` while the rest of the block uses `event_ts` — confirm.
                if abs(int(t_1)-int(t_0)) >= 2:
                    msg = msg[msg['ts'] !=list(msg['ts'])[0]].copy()
        plt.close('all')

    # NOTE(review): prints and swallows every exception, which hides failures in the loop above.
    except Exception as ex:
        print(str(ex))
