In [1]:
import findspark
findspark.init()
import json
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [2]:
# Spark session & context.
# Runs with a single local worker thread ('local') and pulls in the MongoDB
# connector and the Kafka structured-streaming source as runtime packages; both
# the Mongo input and output URIs point at the RizkyApp database on localhost.
# NOTE(review): the package coordinates are pinned to Scala 2.12 / Spark 3.1.1 —
# confirm they match the installed Spark version.
spark = (SparkSession
         .builder
         .master('local')
         .appName('OutlierDetection')
         # Add the MongoDB connector and the Kafka source packages
         .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .config("spark.mongodb.input.uri","mongodb://127.0.0.1/RizkyApp")
         .config("spark.mongodb.output.uri","mongodb://127.0.0.1/RizkyApp")
         .getOrCreate())
# Resolve column names case-sensitively for the rest of the session.
# NOTE(review): presumably needed because the parsed JSON has fields differing
# only by case — confirm against schema/schema.json.
spark.conf.set("spark.sql.caseSensitive", "true")
sc = spark.sparkContext

In [3]:
# Sample JSON record read only so that its inferred `.schema` can drive from_json.
jsonFormatSchema = spark.read.json("schema/schema.json")

# Unbounded stream of records from the `netflowmeter` Kafka topic on the local
# broker; a fresh query begins at the newest offsets.
rawData = (spark.readStream
           .format("kafka")
           .options(**{
               "kafka.bootstrap.servers": "127.0.0.1:9092",
               "subscribe": "netflowmeter",
               "startingOffsets": "latest",
           })
           .load())

In [4]:
# Kafka delivers the payload in the binary `value` column: decode it to text,
# parse it with the sample-derived schema and promote the parsed fields to
# top-level columns.
parsedData = (rawData
              .select(F.col("value").cast("string").alias("json"))
              .select(F.from_json(F.col("json"), jsonFormatSchema.schema).alias("data"))
              .select("data.*"))

# Keep the flow identifiers plus every field of the `extractFeature` struct.
featureExtraction = parsedData.select(
    "flow_id", "src_ip", "src_port", "dst_ip", "dst_port", "protocol", "timestamp",
    "extractFeature.*",
)

In [5]:
def flatten_df(nested_df):
    """Expand every top-level struct column into one column per struct field.

    Non-struct columns are kept unchanged and come first. Each struct column ``s``
    with field ``f`` is replaced by a column named ``s_f``. Only one level is
    expanded: a struct nested inside a struct stays struct-typed.

    Args:
        nested_df: Spark DataFrame (batch or streaming) to flatten.

    Returns:
        A new DataFrame holding the flattened projection.
    """
    plain_names = []
    expanded = []
    for col_name, type_str in nested_df.dtypes:
        if type_str.startswith('struct'):
            # Let Spark enumerate the struct's field names through `struct.*`.
            for field in nested_df.select(col_name + '.*').columns:
                expanded.append(F.col(col_name + '.' + field).alias(col_name + '_' + field))
        else:
            plain_names.append(col_name)
    return nested_df.select(plain_names + expanded)

In [6]:
# Flattened Active/Idle packet statistic columns discarded before vectorisation.
# NOTE(review): presumably duplicates of other extracted features — confirm
# against the feature extractor.
duplicate_columns = [
    'ActivePacket_max', 'ActivePacket_mean', 'ActivePacket_min', 'ActivePacket_std',
    'IdlePacket_max', 'IdlePacket_mean', 'IdlePacket_min', 'IdlePacket_std',
]
data_flat = flatten_df(featureExtraction).drop(*duplicate_columns)

In [7]:
# Only the header of the benign training CSV is needed: its column names define
# the feature order handed to the VectorAssembler. Schema inference is skipped
# because it costs an extra full pass over the file and the inferred types are
# never used.
df_column = spark.read.csv('data/Benign_tok.csv', header=True)

In [8]:
# Pack the feature columns, in the benign CSV's column order (the same file the
# sklearn scaler is fitted on), into a single `vector_feature` column.
# A `vector_feature` left over from a previous run of this cell is dropped first;
# without that, re-running the cell fails with "Output column ... already exists".
vector_assembler = VectorAssembler(inputCols=df_column.columns, outputCol="vector_feature")
data_flat = vector_assembler.transform(data_flat.drop("vector_feature"))

In [9]:
# model
# Fit a scaler on the benign training data. The scaled matrix X is what the
# outlier detector is built from, and the fitted `scaler` is reused by the
# scoring UDF.
import pandas as pd
import numpy as np
# Aliased so it does not shadow pyspark.ml.feature.StandardScaler, which is
# imported in the first cell under the plain name.
from sklearn.preprocessing import StandardScaler as SkStandardScaler


benign_df = pd.read_csv('data/Benign_tok.csv')

scaler = SkStandardScaler()
X = scaler.fit_transform(benign_df)


In [10]:
from outlier import OutlierDetection

# Project-local outlier detector constructed from the scaled benign matrix X.
# NOTE(review): cluster_n=80 and percentile=95 presumably set the number of
# clusters and the percentile used for the outlier threshold — confirm in
# outlier.py and consider moving them to a config cell.
clf = OutlierDetection(benign=X, cluster_n=80, percentile=95)

In [11]:
def model_udf(x):
  """Score one flow's feature vector with the pre-fitted scaler and detector.

  Args:
    x: the row's ``vector_feature`` value (a pyspark.ml Vector, dense or
      sparse), or None.

  Returns:
    The detector's prediction for the row as an int, or None when ``x`` is None.
  """
  if x is None:
    return None
  # toArray() works for both DenseVector and SparseVector; reshape gives the
  # single-row 2-D input that sklearn expects.
  row = x.toArray().reshape(1, -1)
  # `scaler` is already fitted on the benign data; re-fitting it here would
  # repeat that work for every streamed record without changing the result.
  normalized = scaler.transform(row)
  prediction = clf.predict(normalized)
  return int(prediction[0])

# Wrap the scorer as a Spark UDF and attach its integer prediction as `Label`.
label_udf = F.udf(model_udf, returnType=IntegerType())
data = data_flat.withColumn('Label', label_udf(F.col('vector_feature')))

In [12]:
from datetime import datetime 
from decimal import Decimal

def date_udf(x):
  """Render an epoch timestamp as a 'YYYY-MM-DD HH:MM:SS' string.

  The value is parsed with Decimal, truncated to an integer, divided by 1000
  (so it is presumably milliseconds since the Unix epoch) and converted with
  datetime.fromtimestamp. The result is therefore in the local time zone of the
  Python process evaluating the UDF.

  Args:
    x: the timestamp (any value Decimal accepts, e.g. int or numeric string),
      or None.

  Returns:
    The formatted date-time string, or None for a null input. A null no longer
    raises TypeError, which would otherwise fail the whole streaming batch.
  """
  if x is None:
    return None
  millis = int(Decimal(x))
  return datetime.fromtimestamp(millis/1000).strftime("%Y-%m-%d %H:%M:%S")

# Add a human-readable `datetime` string column derived from `timestamp`.
datetime_udf = F.udf(date_udf, returnType=StringType())
pred_df = data.withColumn('datetime', datetime_udf(F.col('timestamp')))

In [13]:
# Remove columns that are not persisted to MongoDB: the model's input vector and
# the raw timestamp (its formatted form lives in the `datetime` column).
kolom = ['vector_feature','timestamp']  # "kolom" = columns
pred_df = pred_df.drop(*kolom)

In [14]:
def write_mongo_row(df, epoch_id):
    """foreachBatch sink: append one micro-batch to RizkyApp.density_pcap in MongoDB.

    Args:
        df: the micro-batch DataFrame supplied by Structured Streaming.
        epoch_id: the micro-batch id; not used here.
            NOTE(review): because it is ignored, a replayed batch would be
            appended again — confirm duplicates are acceptable.
    """
    writer = df.write.format("mongo").mode("append")
    writer = writer.option("database", "RizkyApp").option("collection", "density_pcap")
    writer.save()

# Start the streaming query, handing each micro-batch to write_mongo_row, and
# block this cell until the query terminates.
# NOTE(review): interrupting the kernel only aborts the wait (KeyboardInterrupt);
# the query may keep running in the background — call query.stop() to end it.
# NOTE(review): no checkpointLocation is set, so offsets are presumably not
# persisted across restarts (combined with startingOffsets=latest, records
# produced while stopped would be skipped) — confirm this is intended.
query=pred_df.writeStream.foreachBatch(write_mongo_row).start()
query.awaitTermination()

KeyboardInterrupt: 