### Importing Packages

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark.sql.functions as funcs
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder\
.master("local[4]")\
.appName("ReadFromCsv")\
.config("spark.driver.memory","3g")\
.config("spark.executor.memory", "4g")\
.getOrCreate()

In [4]:
'''logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)'''

'logger = spark.sparkContext._jvm.org.apache.log4j\nlogger.LogManager.getLogger("org"). setLevel(logger.Level.ERROR)\nlogger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)'

## Predict on Stream

# 1. Load Dataset

In [5]:
iris = spark.read \
.format("csv")\
.option("header", True)\
.option("sep", ",")\
.option("inferSchema", "True")\
.load("TrainDf.csv")

In [6]:
iris.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- num_compromised: integer (nullable = true)
 |-- root_shell: integer (nullable = true)
 |-- su_attempted: integer (nullable = true)
 |-- num_root: integer (nullable = true)
 |-- num_file_creations: integer (nullable = true)
 |-- num_shells: integer (nullable = true)
 |-- num_access_files: integer (nullable = true)
 |-- num_outbound_cmds: integer (nullable = true)
 |-- is_host_login: integer (nullable = true)
 |-- is_guest_login: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- srv_count: integer (nullable = true)
 |-- serror_rate: double (nullable = true)
 |-- srv_serror_rate: double (nul

# 2. Data Preparation for Training

In [7]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

In [8]:
#train, test = data.randomSplit([0.70, 0.30])

### 2.1 Data Vectorization and String Indexing

In [9]:
label_indexer = StringIndexer(inputCol = "status", outputCol = "label")

label_indexer_model = label_indexer.fit(iris)
new_df = label_indexer_model.transform(iris)

In [10]:
feature_cols = iris.columns[:-1]

In [11]:
assembler = VectorAssembler(inputCols = feature_cols, outputCol = 'vec_features')
assembler_df = assembler.transform(new_df)

In [12]:
normal = assembler_df.where(funcs.col("status") == "normal")

### 2.2 Feature Reduction using PCA (Principal Component Analysis)

In [13]:
from pyspark.ml.feature import PCA

pca = PCA(k=9, inputCol="vec_features", outputCol="features")
pcaModel = pca.fit(normal)
normal_reduction_df = pcaModel.transform(normal)

In [14]:
#normal_reduction_df.toPandas().head(3)

# 3. Train Model (K-Means Clustering)

### 3.1 Training of Data

In [15]:
from pyspark.ml.clustering import KMeans

k_num = 2
kmeans = KMeans(featuresCol='features',k=k_num, maxIter=100)
model = kmeans.fit(normal_reduction_df)

In [16]:
wssse = model.computeCost(normal_reduction_df)
print("With K= ",k_num)
print("Within Set Sum of Squared Errors = " + str(wssse))
print('--'*30)

With K=  2
Within Set Sum of Squared Errors = 3989379345491703.0
------------------------------------------------------------


### 3.1.1 Prediction Training Dataset

In [17]:
pca = PCA(k=9, inputCol="vec_features", outputCol="features")
pcaModel = pca.fit(assembler_df)
test_reduction_df = pcaModel.transform(assembler_df)

predictions = model.transform(test_reduction_df)
predictions = predictions.select("features","label","prediction")
predictions.toPandas().head()

Unnamed: 0,features,label,prediction
0,"[-490.9999645758216, -0.12919664884781498, 0.1...",0.0,0
1,"[-145.9999684495366, -0.03840430885426629, 0.4...",0.0,0
2,"[4.088872553689712e-05, 3.0127997739169755e-07...",1.0,0
3,"[-234.14534818723268, 8152.938638227429, -1.34...",0.0,0
4,"[-199.11045038797118, 419.94760229094754, -0.8...",0.0,0


### 3.1.2 Calculation of Silhouette Score

In [18]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.9999063686630117


### 3.2 Analysing of Trained model

#### Analysing of how many labels have in a cluster ?

In [19]:
predictions.select("prediction", "label").groupBy("prediction", "label").count()\
.orderBy("prediction", "label", ascending=True).toPandas().head(10)

Unnamed: 0,prediction,label,count
0,0,0.0,67342
1,0,1.0,58624
2,1,0.0,1
3,1,1.0,6


In [20]:
predictions.select("prediction", "label")\
.groupBy("prediction", "label").count()\
.orderBy("prediction", "label", ascending=True).withColumn("status",
funcs.when(funcs.col("label").isin(1), "Anomaly")\
.otherwise("Normal")).toPandas().head()

Unnamed: 0,prediction,label,count,status
0,0,0.0,67342,Normal
1,0,1.0,58624,Anomaly
2,1,0.0,1,Normal
3,1,1.0,6,Anomaly


### 3.3 Calculation of centroids for every cluster

In [21]:
train_clusters = model.clusterCenters()

traind_clusters = {int(i):[float(train_clusters[i][j]) for j in range(len(train_clusters[i]))] 
              for i in range(len(train_clusters))}
train_clusters

[array([-1.20423278e+04,  3.48623574e+03,  1.60883673e+02,  1.94950168e+02,
        -1.24698823e+02,  8.15386326e+01,  1.56155160e+00, -2.08317658e+00,
        -1.04482667e+00]),
 array([-8.98448834e+07,  1.46547928e+06, -9.88035469e+02,  6.53455066e+02,
        -7.73814664e+02,  1.75194282e+02, -1.31331999e+02, -3.31423494e+00,
        -5.18382902e+00])]

In [22]:
train_df_centers = spark.sparkContext.parallelize([(k,)+(v,) for k,v in traind_clusters.items()]).toDF(['prediction','center'])
train_df_centers.toPandas().head()

Unnamed: 0,prediction,center
0,0,"[-12042.327840369986, 3486.2357374678886, 160...."
1,1,"[-89844883.37984084, 1465479.2835174028, -988...."


In [23]:
train_pred_df = predictions.withColumn('prediction',funcs.col('prediction').cast(IntegerType()))
train_pred_df.toPandas().head()

Unnamed: 0,features,label,prediction
0,"[-490.9999645758216, -0.12919664884781498, 0.1...",0.0,0
1,"[-145.9999684495366, -0.03840430885426629, 0.4...",0.0,0
2,"[4.088872553689712e-05, 3.0127997739169755e-07...",1.0,0
3,"[-234.14534818723268, 8152.938638227429, -1.34...",0.0,0
4,"[-199.11045038797118, 419.94760229094754, -0.8...",0.0,0


### Joining of centroid and feature dataframes

In [24]:
train_pred_df = train_pred_df.join(train_df_centers,on='prediction',how='left')
train_pred_df.toPandas().head()

Unnamed: 0,prediction,features,label,center
0,0,"[-490.9999645758216, -0.12919664884781498, 0.1...",0.0,"[-12042.327840369986, 3486.2357374678886, 160...."
1,0,"[-145.9999684495366, -0.03840430885426629, 0.4...",0.0,"[-12042.327840369986, 3486.2357374678886, 160...."
2,0,"[4.088872553689712e-05, 3.0127997739169755e-07...",1.0,"[-12042.327840369986, 3486.2357374678886, 160...."
3,0,"[-234.14534818723268, 8152.938638227429, -1.34...",0.0,"[-12042.327840369986, 3486.2357374678886, 160...."
4,0,"[-199.11045038797118, 419.94760229094754, -0.8...",0.0,"[-12042.327840369986, 3486.2357374678886, 160...."


### 3.4 Finding Anomaly Values

#### Getting distance values

In [25]:
get_dist = funcs.udf(lambda features, center :
                 float(features.squared_distance(center)),FloatType())

#### Sorting the furthest distance values

In [26]:
train_pred_df = train_pred_df.withColumn('dist',get_dist(funcs.col('features'),funcs.col('center')))
train_pred_df.toPandas().sort_values(by="dist",ascending=False).head(10)

Unnamed: 0,prediction,features,label,center,dist
62229,0,"[-344699.28443941544, 1309937355.56201, -18580...",1.0,"[-12042.327840369986, 3486.2357374678886, 160....",1.715927e+18
125972,1,"[-1379963840.675653, -363125.98013465933, -723...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",1.66441e+18
125968,1,"[-1167519457.088633, -307222.9200287654, -2060...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",1.161386e+18
125969,1,"[-693375615.6526163, -182456.42189769473, -217...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",3.642521e+17
125967,1,"[-621568642.3748622, -163560.10928325454, 1875...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",2.827328e+17
95024,0,"[-105334.45555511035, 400291046.93912166, 3082...",1.0,"[-12042.327840369986, 3486.2357374678886, 160....",1.602301e+17
71625,0,"[-105334.45746944736, 400291046.9404984, 30890...",1.0,"[-12042.327840369986, 3486.2357374678886, 160....",1.602301e+17
125966,1,"[-381709077.7164838, -100442.98113029718, 2370...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",8.518716e+16
125970,1,"[-217277332.47841534, -57174.04977907944, 2848...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",1.624135e+16
49320,0,"[-24418806.23856078, 111734.38670165755, -665....",0.0,"[-12042.327840369986, 3486.2357374678886, 160....",595701800000000.0


### 3.5 Calculation of Threshold Value according to distance

### 3.5.1 Average distance for every cluster 

In [27]:
averageDistance = train_pred_df.filter(funcs.col("label") == 0.0).groupBy("prediction")\
.agg(funcs.avg("dist").alias("avgDist"))
averageDistance.toPandas().head()

Unnamed: 0,prediction,avgDist
0,1,30755570000000.0
1,0,59241370000.0


### 3.5.2 Maximum distance for every cluster 

In [28]:
maxDistance = train_pred_df.filter(funcs.col("label") == 0.0).groupBy("prediction")\
.agg(funcs.max("dist").alias("maxDist"))
maxDistance.toPandas().head()

Unnamed: 0,prediction,maxDist
0,1,30755570000000.0
1,0,595701800000000.0


### Predicted Dataframe

In [29]:
train_pred_df.toPandas().head(5)

Unnamed: 0,prediction,features,label,center,dist
0,0,"[-490.9999645758216, -0.12919664884781498, 0.1...",0.0,"[-12042.327840369986, 3486.2357374678886, 160....",145752576.0
1,0,"[-145.9999684495366, -0.03840430885426629, 0.4...",0.0,"[-12042.327840369986, 3486.2357374678886, 160....",153923984.0
2,0,"[4.088872553689712e-05, 3.0127997739169755e-07...",1.0,"[-12042.327840369986, 3486.2357374678886, 160....",157467920.0
3,0,"[-234.14534818723268, 8152.938638227429, -1.34...",0.0,"[-12042.327840369986, 3486.2357374678886, 160....",161380816.0
4,0,"[-199.11045038797118, 419.94760229094754, -0.8...",0.0,"[-12042.327840369986, 3486.2357374678886, 160....",149992128.0


#### Joining of predicted and threshold dataframes

In [30]:
anomalyDetection = train_pred_df.join(maxDistance, maxDistance.prediction == train_pred_df.prediction)

anomalyDetection.toPandas().head(10)

Unnamed: 0,prediction,features,label,center,dist,prediction.1,maxDist
0,1,"[-381709077.7164838, -100442.98113029718, 2370...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",8.518716e+16,1,30755570000000.0
1,1,"[-621568642.3748622, -163560.10928325454, 1875...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",2.827328e+17,1,30755570000000.0
2,1,"[-1167519457.088633, -307222.9200287654, -2060...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",1.161386e+18,1,30755570000000.0
3,1,"[-693375615.6526163, -182456.42189769473, -217...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",3.642521e+17,1,30755570000000.0
4,1,"[-217277332.47841534, -57174.04977907944, 2848...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",1.624135e+16,1,30755570000000.0
5,1,"[-89583366.54655331, 7005079.189098301, 2188.4...",0.0,"[-89844883.37984084, 1465479.2835174028, -988....",30755570000000.0,1,30755570000000.0
6,1,"[-1379963840.675653, -363125.98013465933, -723...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",1.66441e+18,1,30755570000000.0
7,0,"[-490.9999645758216, -0.12919664884781498, 0.1...",0.0,"[-12042.327840369986, 3486.2357374678886, 160....",145752600.0,0,595701800000000.0
8,0,"[-145.9999684495366, -0.03840430885426629, 0.4...",0.0,"[-12042.327840369986, 3486.2357374678886, 160....",153924000.0,0,595701800000000.0
9,0,"[4.088872553689712e-05, 3.0127997739169755e-07...",1.0,"[-12042.327840369986, 3486.2357374678886, 160....",157467900.0,0,595701800000000.0


### 3.6 Assigning of labels as normal or anomaly

In [31]:
detected_df = anomalyDetection.withColumn("detected", funcs.when(anomalyDetection.dist > anomalyDetection.maxDist, "Anomaly").otherwise("Normal"))
detected_df.toPandas().head(20)

Unnamed: 0,prediction,features,label,center,dist,prediction.1,maxDist,detected
0,1,"[-381709077.7164838, -100442.98113029718, 2370...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",8.518716e+16,1,30755570000000.0,Anomaly
1,1,"[-621568642.3748622, -163560.10928325454, 1875...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",2.827328e+17,1,30755570000000.0,Anomaly
2,1,"[-1167519457.088633, -307222.9200287654, -2060...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",1.161386e+18,1,30755570000000.0,Anomaly
3,1,"[-693375615.6526163, -182456.42189769473, -217...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",3.642521e+17,1,30755570000000.0,Anomaly
4,1,"[-217277332.47841534, -57174.04977907944, 2848...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",1.624135e+16,1,30755570000000.0,Anomaly
5,1,"[-89583366.54655331, 7005079.189098301, 2188.4...",0.0,"[-89844883.37984084, 1465479.2835174028, -988....",30755570000000.0,1,30755570000000.0,Normal
6,1,"[-1379963840.675653, -363125.98013465933, -723...",1.0,"[-89844883.37984084, 1465479.2835174028, -988....",1.66441e+18,1,30755570000000.0,Anomaly
7,0,"[-490.9999645758216, -0.12919664884781498, 0.1...",0.0,"[-12042.327840369986, 3486.2357374678886, 160....",145752600.0,0,595701800000000.0,Normal
8,0,"[-145.9999684495366, -0.03840430885426629, 0.4...",0.0,"[-12042.327840369986, 3486.2357374678886, 160....",153924000.0,0,595701800000000.0,Normal
9,0,"[4.088872553689712e-05, 3.0127997739169755e-07...",1.0,"[-12042.327840369986, 3486.2357374678886, 160....",157467900.0,0,595701800000000.0,Normal


### 3.7 Evaluation of result using Confusion Matrix

In [32]:
conf_matrix = detected_df.withColumn("label",funcs.when(funcs.col("label").isin(1), "Anomaly").otherwise("Normal"))\
.groupBy("label","detected").count()
conf_matrix.toPandas().head(20)

Unnamed: 0,label,detected,count
0,Anomaly,Normal,58621
1,Anomaly,Anomaly,9
2,Normal,Normal,67343


### 3.7.1 Calculation of Accuracy 

In [33]:
all_df = detected_df.count()
tptn = conf_matrix.filter(conf_matrix.label == conf_matrix.detected).agg(funcs.sum("count")).select("sum(count)").toPandas().head()
tptn = tptn.at[0, 'sum(count)']

In [34]:
accuracy = tptn / all_df
print("Accuracy: ", accuracy)

Accuracy:  0.53465425130782


### 3.7.2 Calculation of Recall

In [35]:
tp = conf_matrix.filter((conf_matrix.label == "Normal") & ( conf_matrix.detected == "Normal")).select("count").toPandas()
fn = conf_matrix.filter((conf_matrix.label == "Anomaly") & ( conf_matrix.detected == "Normal")).select("count").toPandas()
tp = tp.at[0, 'count']
fn = fn.at[0, 'count']

In [36]:
recall = tp / (tp + fn)
print("Recall: ", recall)

Recall:  0.5346210028262043


# 4. Streaming Process

In [37]:
schema = StructType(
[
    StructField("duration", FloatType(), True),
    StructField("src_bytes", FloatType(), True),
    StructField("dst_bytes", FloatType(), True),
    StructField("land", FloatType(), True),
    StructField("wrong_fragment", FloatType(), True),
    StructField("urgent", FloatType(), True),
    StructField("hot", FloatType(), True),
    StructField("num_failed_logins", FloatType(), True),
    StructField("logged_in", FloatType(), True),
    StructField("num_compromised", FloatType(), True),
    StructField("root_shell", FloatType(), True),
    StructField("su_attempted", FloatType(), True),
    StructField("num_root", FloatType(), True),
    StructField("num_file_creations", FloatType(), True),
    StructField("num_shells", FloatType(), True),
    StructField("num_access_files", FloatType(), True),
    StructField("num_outbound_cmds", FloatType(), True),
    StructField("is_host_login", FloatType(), True),
    StructField("is_guest_login", FloatType(), True),
    StructField("count", FloatType(), True),
    StructField("srv_count", FloatType(), True),
    StructField("serror_rate", FloatType(), True),
    StructField("srv_serror_rate", FloatType(), True),
    StructField("rerror_rate", FloatType(), True),
    StructField("srv_rerror_rate", FloatType(), True),
    StructField("same_srv_rate", FloatType(), True),
    StructField("diff_srv_rate", FloatType(), True),
    StructField("srv_diff_host_rate", FloatType(), True),
    StructField("dst_host_count", FloatType(), True),
    StructField("dst_host_srv_count", FloatType(), True),
    StructField("dst_host_same_srv_rate", FloatType(), True),
    StructField("dst_host_diff_srv_rate", FloatType(), True),
    StructField("dst_host_same_src_port_rate", FloatType(), True),
    StructField("dst_host_srv_diff_host_rate", FloatType(), True),
    StructField("dst_host_serror_rate", FloatType(), True),
    StructField("dst_host_srv_serror_rate", FloatType(), True),
    StructField("dst_host_rerror_rate", FloatType(), True),
    StructField("dst_host_srv_rerror_rate", FloatType(), True),
    
    StructField("status", StringType(), True)
])

In [38]:
iris_data = spark.readStream \
.format("csv")\
.option("header", True)\
.option("sep", ",")\
.schema(schema)\
.load("data")\

In [39]:
iris.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- num_compromised: integer (nullable = true)
 |-- root_shell: integer (nullable = true)
 |-- su_attempted: integer (nullable = true)
 |-- num_root: integer (nullable = true)
 |-- num_file_creations: integer (nullable = true)
 |-- num_shells: integer (nullable = true)
 |-- num_access_files: integer (nullable = true)
 |-- num_outbound_cmds: integer (nullable = true)
 |-- is_host_login: integer (nullable = true)
 |-- is_guest_login: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- srv_count: integer (nullable = true)
 |-- serror_rate: double (nullable = true)
 |-- srv_serror_rate: double (nul

In [40]:
features_array = iris_data.selectExpr("""array(
CAST(duration AS FLOAT),
CAST(src_bytes AS FLOAT), 
CAST(dst_bytes AS FLOAT), 
CAST(land AS FLOAT),
CAST(wrong_fragment AS FLOAT), 
CAST(urgent AS FLOAT), 
CAST(hot AS FLOAT), 
CAST(num_failed_logins AS FLOAT), 
CAST(logged_in AS FLOAT),
CAST(num_compromised AS FLOAT), 
CAST(root_shell AS FLOAT),
CAST(su_attempted AS FLOAT), 
CAST(num_root AS FLOAT),
CAST(num_file_creations AS FLOAT), 
CAST(num_shells  AS FLOAT), 
CAST(num_access_files  AS FLOAT),
CAST(num_outbound_cmds  AS FLOAT), 
CAST(is_host_login  AS FLOAT),
CAST(is_guest_login  AS FLOAT), 
CAST(count AS FLOAT), 
CAST(srv_count AS FLOAT),
CAST(serror_rate AS FLOAT), 
CAST(srv_serror_rate AS FLOAT),
CAST(rerror_rate AS FLOAT), 
CAST(srv_rerror_rate AS FLOAT),
CAST(same_srv_rate AS FLOAT),
CAST(diff_srv_rate AS FLOAT), 
CAST(srv_diff_host_rate AS FLOAT),
CAST(dst_host_count AS FLOAT),
CAST(dst_host_srv_count AS FLOAT), 
CAST(dst_host_same_srv_rate AS FLOAT),
CAST(dst_host_diff_srv_rate AS FLOAT),
CAST(dst_host_same_src_port_rate AS FLOAT),
CAST(dst_host_srv_diff_host_rate AS FLOAT), 
CAST(dst_host_serror_rate AS FLOAT),
CAST(dst_host_srv_serror_rate AS FLOAT),
CAST(dst_host_rerror_rate AS FLOAT), 
CAST(dst_host_srv_rerror_rate AS FLOAT)

) as arr""", 
                                      "status")

## 4.1 Vectorization of data

In [41]:
tovec_udf = funcs.udf(lambda r: Vectors.dense(r), VectorUDT())
data_stream = features_array.withColumn("vec_features", tovec_udf("arr"))

## 4.2 Feature Reduction 

In [42]:
reduced_data_stream = pcaModel.transform(data_stream)

# 5. Prediction Process

### 5.1 Prediction of Streaming Data

In [43]:
streaming_prediction = model.transform(reduced_data_stream)

In [44]:
streaming_prediction.printSchema()

root
 |-- arr: array (nullable = false)
 |    |-- element: float (containsNull = true)
 |-- status: string (nullable = true)
 |-- vec_features: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)



### 5.2 Finding Anomaly

In [45]:
train_df_centers.toPandas().head()

Unnamed: 0,prediction,center
0,0,"[-12042.327840369986, 3486.2357374678886, 160...."
1,1,"[-89844883.37984084, 1465479.2835174028, -988...."


In [46]:
df_pred = streaming_prediction.withColumn('prediction',funcs.col('prediction').cast(IntegerType()))

In [47]:
df_pred = df_pred.join(train_df_centers,on='prediction',how='left')

### 5.2.1 Getting Euclidean Distance for features

In [48]:
get_dist = funcs.udf(lambda features, center : 
                 float(features.squared_distance(center)),FloatType())

df_pred = df_pred.withColumn('dist',get_dist(funcs.col('features'),funcs.col('center')))
df_pred = df_pred.withColumnRenamed("prediction", "cluster")

### 5.2.2 Calculation of Average and Maximum Distance for Streaming

In [49]:
df_pred.printSchema()

root
 |-- cluster: integer (nullable = false)
 |-- arr: array (nullable = false)
 |    |-- element: float (containsNull = true)
 |-- status: string (nullable = true)
 |-- vec_features: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- center: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- dist: float (nullable = true)



In [50]:
maxDist_streaming = df_pred.join(maxDistance, maxDistance.prediction == df_pred.cluster)
avgDist_streaming = df_pred.join(averageDistance, averageDistance.prediction == df_pred.cluster)

In [51]:
detection = maxDist_streaming.withColumn("detected", funcs.when(maxDist_streaming.dist > maxDist_streaming.maxDist, "Anomaly").otherwise("Normal"))
detection = detection.select("features", "prediction", "dist", "maxDist", "status", "detected")

In [52]:
detection.printSchema()

root
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)
 |-- dist: float (nullable = true)
 |-- maxDist: float (nullable = true)
 |-- status: string (nullable = true)
 |-- detected: string (nullable = false)



### 5.3 Sliding Windows Time

In [53]:
currentTimeDf = detection.withColumn("processingTime",funcs.current_timestamp())

In [54]:
currentTimeDf.printSchema()

root
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)
 |-- dist: float (nullable = true)
 |-- maxDist: float (nullable = true)
 |-- status: string (nullable = true)
 |-- detected: string (nullable = false)
 |-- processingTime: timestamp (nullable = false)



In [55]:
windowedCount = currentTimeDf.groupBy(funcs.window("processingTime", "4 seconds", "2 seconds"), "prediction", 
                                      "status", "detected", "dist", "maxDist")\
.avg("dist").orderBy("window")

# 6. Start Streaming

### 6.1 Option 1 - Using Sliding Window (Confusion Matrix)

In [56]:
confusion_matrix = detection.groupBy("status","detected").count().select("status", "detected","count")

In [57]:
confusion_matrix = currentTimeDf.groupBy(funcs.window("processingTime", "3 seconds", "1 seconds"),"status", "detected")\
.count().orderBy("window")

In [58]:
q = confusion_matrix.writeStream\
.outputMode("complete")\
.format("console")\
.option("truncate", "false")\
.start()

In [None]:
q.awaitTermination()

### 6.2 Option 2 - Using Append

In [84]:
q = windowedCount.writeStream\
.outputMode("complete")\
.format("console")\
.option("truncate", "false")\
.start()

In [None]:
q.awaitTermination()

### 6.3 Option 3 - Using Complete and Aggregate Function

In [72]:
group = df_pred.groupBy(["cluster","status"]).count()\
.orderBy("cluster", "status", ascending=True)

In [73]:
q = group.writeStream\
.outputMode("complete")\
.format("console")\
.start()

In [None]:
q.awaitTermination()