In [1]:
%pip install findspark pymongo

Note: you may need to restart the kernel to use updated packages.


# Import Packages

In [2]:
import os
import time
import json
import pyspark
import findspark
from gridfs import GridFS
from decimal import Decimal
from datetime import datetime
from pymongo import MongoClient

from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.ml.feature import StandardScalerModel

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from sklearn.metrics import confusion_matrix
from pyspark.ml.feature import VectorAssembler, StandardScaler

# NOTE(review): this call runs after the pyspark imports above have already
# executed; findspark.init() is usually invoked before importing pyspark —
# confirm whether it is still needed at this position.
findspark.init()

# DB Connections

In [3]:
# MongoDB connection URI; it is passed to both the Spark connector's input and
# output URI settings when the SparkSession is built.
# NOTE(review): the path part looks like "<database>.<collection>" — confirm.
mongo_uri = "mongodb://mongodb:27017/mataelanglab.stream_result"

# Spark session & context

In [4]:
# Comma-separated Maven coordinates handed to "spark.jars.packages":
# the MongoDB Spark connector and the Kafka source for Structured Streaming.
jarsPackages = (
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2"
    ",org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1"
)

In [5]:
# Settings applied to the session builder, in this order.
_spark_settings = {
    "spark.executor.memory": "4g",          # optional
    "spark.executor.cores": "1",            # optional
    "spark.jars.packages": jarsPackages,    # connector JARs to download
    "spark.mongodb.input.uri": mongo_uri,
    "spark.mongodb.output.uri": mongo_uri,
}

# Local two-thread session named 'DecisionTree-Stream'.
_builder = SparkSession.builder.master('local[2]').appName('DecisionTree-Stream')
for _key, _value in _spark_settings.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

# Treat column names as case-sensitive in Spark SQL.
spark.conf.set("spark.sql.caseSensitive", "true")

# Keep a handle on the SparkContext so it can be stopped at the end.
sc = spark.sparkContext

# Load Model & Schema

In [6]:
# Base directory that holds the saved model, the saved scaler and the schema
# sample loaded in the following cells.
input_path = "/home/jovyan/output"

In [7]:
# Create a Decision Tree classifier model object
# NOTE(review): `dt` is not referenced anywhere else in the visible notebook —
# confirm whether this line can be dropped.
dt = DecisionTreeClassificationModel()

# Load the saved model from disk (this is the model used for prediction)
dtModel = DecisionTreeClassificationModel.load(input_path+"/spark-model/decission-tree/feature-importance/model")

In [8]:
# Load the saved StandardScaler model stored next to the decision-tree model
scaler = StandardScalerModel.load(input_path+"/spark-model/decission-tree/feature-importance/standard-scaler")

In [9]:
# Read the sample payload JSON (multi-line file) into a DataFrame. Despite the
# name, this is a DataFrame; its `.schema` is what gets used to parse the
# Kafka message payload.
payloadSchema = spark.read.option("multiline", "true").json(input_path+"/schema/data.json")

# Structured Streaming

## Get Kafka Bootstrap Server URI

In [10]:
# The Kafka bootstrap server address comes from the environment; a missing
# BOOTSTRAP_SERVERS variable raises KeyError here.
bootstrapServers = os.environ['BOOTSTRAP_SERVERS']
print(bootstrapServers)

broker:29092


## Structured Streaming Process

In [11]:
# Streaming source: the "netflowmeter" Kafka topic, starting from the latest
# offsets.
streamingDF = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": bootstrapServers,  # kafka server
        "subscribe": "netflowmeter",                  # topic
        "startingOffsets": "latest",
    })
    .load()
)

## Cast Stream result to Dataframe

In [12]:
df = (
    streamingDF
    # Kafka delivers the message value as bytes; read it as a JSON string.
    .selectExpr("cast (value as string) as json")
    # Keep only the "payload" member of the message envelope.
    .select(F.get_json_object("json","$.payload").alias('payload'))
    # Parse the payload with the schema inferred from the sample file.
    .select(F.from_json("payload",payloadSchema.schema).alias("data"))
    # Flatten the parsed struct into top-level columns.
    .select("data.*")
)

# Choose Used Features
(From DecissionTree-FeatureImportance)

In [None]:
# Names of the feature columns selected for the model.
# NOTE(review): per the heading above, presumably the features ranked by the
# decision-tree feature-importance analysis — confirm that this list and its
# order match what the saved scaler and model were fitted on.
featureImportance = [
    'total_fwd_packet',
    'bwd_packets_per_s',
    'down_per_up_ratio',
    'flow_duration',
    'fwd_packets_per_s',
    'fwd_seg_size_min',
    'syn_flag_count',
    'flow_iat_mean',
    'idle_max',
    'idle_mean',
    'packet_length_min',
]

# Pre-Processing

In [14]:
# Vector Assembler: pack the selected feature columns into the single vector
# column "SS_features". The original referenced `feature`, a name that is
# never defined in this notebook (NameError on a fresh kernel); the feature
# list defined above is `featureImportance`.
vector_assembler = VectorAssembler(inputCols=featureImportance, outputCol="SS_features")
# Nulls are replaced with 0 (na.fill(0)) before assembling.
df = vector_assembler.transform(df.na.fill(0))

# Standard Scaler: apply the scaler model loaded from disk.
# NOTE(review): the output column is presumably 'scaledFeatures' (that is the
# name dropped after prediction) — confirm against the saved scaler.
df = scaler.transform(df)

# Prediction Process

In [15]:
# Run the loaded decision-tree model on the stream. Later cells read the
# resulting 'prediction' column and drop 'rawPrediction' / 'probability'.
df = dtModel.transform(df)

In [16]:
# Cast 'prediction' to an integer column; the label lookup that follows is
# keyed by the integers 0-4.
df = df.withColumn("prediction", df["prediction"].cast(IntegerType()))

In [17]:
def prediction(x):
    """Translate a numeric class index into its traffic label.

    Indices 0-4 map to "Normal", "Neris", "Rbot", "Virut" and "IRC"
    respectively; every other value (including None) yields "Anomaly".
    """
    labels = dict(enumerate(("Normal", "Neris", "Rbot", "Virut", "IRC")))

    if x in labels:
        return labels[x]
    return "Anomaly"

In [18]:
# Wrap prediction() as a string-returning UDF and use it to add the
# human-readable 'str_label' column derived from 'prediction'.
labeled = F.udf(prediction, StringType())
df = df.withColumn('str_label', labeled(df['prediction']))

## Convert UTC Timestamp to Datetime String

In [19]:
def date_convert(x):
    """Format an epoch timestamp as a "YYYY-MM-DD HH:MM:SS" string.

    Parameters
    ----------
    x : int, numeric str, or None
        Epoch timestamp; it is divided by 1,000,000 before conversion, i.e.
        it is interpreted as microseconds.

    Returns
    -------
    str or None
        The formatted datetime, or None when ``x`` is None, so that a null
        timestamp yields a null result instead of raising.

    Raises
    ------
    ValueError, TypeError
        If ``x`` is not None and cannot be converted with ``int()``.

    Notes
    -----
    ``datetime.fromtimestamp`` without a ``tz`` argument renders the value in
    the local timezone of the process running this function.
    NOTE(review): the section title mentions UTC — confirm whether local time
    is the intended output.
    """
    # A null input would make int() raise TypeError; pass it through instead.
    if x is None:
        return None

    timestamp = int(x)

    return datetime.fromtimestamp(timestamp/1000000).strftime("%Y-%m-%d %H:%M:%S")

In [20]:
# Wrap date_convert() as a string-returning UDF and use it to add the
# 'datetime' column derived from 'timestamp'.
str_date = F.udf(date_convert, StringType())
df = df.withColumn('datetime', str_date(df['timestamp']))

## Remove Intermediate Processing Columns

In [21]:
# Intermediate columns (assembler/scaler vectors, raw model outputs, the raw
# timestamp and the numeric prediction) that are removed before writing.
post_process_feature = [
    'SS_features',
    'scaledFeatures',
    'timestamp',
    'rawPrediction',
    'probability',
    'prediction'
]

# Everything else, including the derived 'str_label' and 'datetime' columns,
# is kept in `result`.
result = df.drop(*post_process_feature)

# Write Result to MongoDB

In [22]:
def write_mongo_row(data, epoch_id):
    """Append one micro-batch to the "stream_result" collection of the
    "mataelanglab" MongoDB database.

    data     -- the batch whose ``write`` interface is used for saving.
    epoch_id -- batch identifier; accepted but not used.
    """
    writer = data.write.format("mongo").mode("append")
    writer = writer.option("database","mataelanglab")
    writer = writer.option("collection", "stream_result")
    writer.save()

In [23]:
# Start the streaming query: every micro-batch of `result` is handed to
# write_mongo_row.
# NOTE(review): no checkpoint location is configured here — confirm that is
# intended.
query = result.writeStream.foreachBatch(write_mongo_row).start()

In [None]:
# Block this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Stop the SparkContext; this cell follows the awaitTermination() cell.
sc.stop()