# Main Pipeline
Kafka Consumer -> PySpark -> Sketches -> Storing in InfluxDB

## Kafka Consumer receiving rows and storing them Dataframe 

In [8]:
!pip install kafka-python
!pip install mmh3
!pip install pysad
!pip install combo
!pip install influxdb_client



In [2]:
from kafka import KafkaConsumer
import pandas as pd
from pyspark.sql import SparkSession
import ast
import json

In [3]:
spark = SparkSession.builder.appName("Test").getOrCreate()

In [4]:
consumer = KafkaConsumer('swat_elte',bootstrap_servers=['kafka-server:9092'],
                         group_id=None,
                         # auto_offset_reset='earliest', 
                         api_version=(0,10,1))

### Consumer part
You will need to run producer notebook to get the data here

In [5]:
df = pd.DataFrame()
i = 0

for message in consumer:
    message = message.value
    if len(message) < 10:
        break
    try:
        df_row = pd.json_normalize(json.loads(message))
        df = pd.concat([df, df_row])
        if len(df) % 1000 == 0:
            i += 1000
            print("Rows Consumed: ", i)
    except NotImplementedError:
        pass


Rows Consumed:  1000
Rows Consumed:  2000
Rows Consumed:  3000
Rows Consumed:  4000
Rows Consumed:  5000
Rows Consumed:  6000
Rows Consumed:  7000
Rows Consumed:  8000
Rows Consumed:  9000
Rows Consumed:  10000
Rows Consumed:  11000
Rows Consumed:  12000
Rows Consumed:  13000
Rows Consumed:  14000


In [6]:
print(df.shape)
df.head()

(14996, 78)


Unnamed: 0,GMT +0,FIT 101,LIT 101,MV 101,P1_STATE,P101 Status,P102 Status,AIT 201,AIT 202,AIT 203,...,LSH 601,LSH 602,LSH 603,LSL 601,LSL 602,LSL 603,P6 STATE,P601 Status,P602 Status,P603 Status
0,2019-07-20 08:39:59,4.323736,492.896881,2,2,2,1,131.408615,9.313829,257.933868,...,Active,Active,Inactive,Inactive,Inactive,Active,2,1,1,1
0,2019-07-20 08:39:58,4.323736,492.4651,2,2,2,1,131.408615,9.316713,257.703156,...,Active,Active,Inactive,Inactive,Inactive,Active,2,1,1,1
0,2019-07-20 08:39:57,4.303558,492.3081,2,2,2,1,131.408615,9.317354,257.703156,...,Active,Active,Inactive,Inactive,Inactive,Active,2,1,1,1
0,2019-07-20 08:39:56,4.253915,491.405273,2,2,2,1,131.408615,9.317354,257.703156,...,Active,Active,Inactive,Inactive,Inactive,Active,2,1,1,1
0,2019-07-20 08:39:55,4.200429,491.169769,2,2,2,1,131.408615,9.319918,257.703156,...,Active,Active,Inactive,Inactive,Inactive,Active,2,1,1,1


In [7]:
sdf = spark.createDataFrame(df) 
sdf.show(5)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


+-------------------+----------+----------+------+--------+-----------+-----------+----------+--------+----------+----------+--------+--------+--------+--------+-----+--------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+--------+----------+----------+----------+----------+----------+------+------+------+------+--------+-----------+-----------+-------+----------+-----------+----------+--------+--------+-----------+-----------+-----------+-----------+-----+----------+----------+----------+----------+-----------+-----------+-----------+----------+------+------+------+------+--------+-----------+-----------+----------+----------+----------+----------+-------+-------+--------+--------+--------+-------+--------+-----------+-----------+-----------+
|             GMT +0|   FIT 101|   LIT 101|MV 101|P1_STATE|P101 Status|P102 Status|   AIT 201| AIT 202|   AIT 203|   FIT 201|  LS 201|  LS 202| LSL 203|LSLL 203|MV201|P2_STATE|P201 Status|P202 Sta

## Define Sketches

In [9]:
from pysad.evaluation.metrics import AUROCMetric
from pysad.models.loda import LODA
from pysad.utils.data import Data
from pysad.models.integrations import ReferenceWindowModel
from pyod.models.hbos import HBOS
from pysad.transform.postprocessing import RunningAveragePostprocessor
from pysad.transform.preprocessing import InstanceUnitNormScaler
from pysad.transform.probability_calibration import GaussianTailProbabilityCalibrator
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm

In [10]:
"""
wavingCounter unbiased estimation of frequencies
wavingList store k' (k'>k) frequent items
hashFunc to hash element to 1 or -1 increase or decrease wavingCounter
element frequency
If its frequency is larger than the smallest frequency in the list,we exchange them. 

Based on this idea, we use a group of WavingCounters and lists, and add additional fields in the list to achieve
higher accuracy
"""

class WavingSketch:
    def __init__(self, size):
        self.arr = np.zeros((size,),dtype='f,f,f,f')
        self.count = 0
    def update(self,elem):
        round(elem, 7)
        # bucket[0]-->count bucket[1]-->elem bucket[2]-->frequency bucket[3]-->flag
        bucket = self.arr[self.hashnum(elem)]
        # estimated frequency calc
        efreq = bucket[0]*self.hash1(elem)
        if elem == bucket[1]:
            bucket[2]+=1
            if bucket[3]==False:
                bucket[0]=bucket[0]+self.hash1(elem)
        elif bucket[1] == 0:
            bucket[1]=elem
            bucket[2]=1
            bucket[3]=True
        else:
            bucket[0]=bucket[0]+self.hash1(elem)
            if efreq >= bucket[0]:
                if bucket[3]==True:
                    bucket[0]=bucket[0]+efreq*self.hash1(elem)
                bucket[1]=elem
                bucket[2]=efreq+1
                bucket[3]=False
            
    def hash1(self,elem):
        val = -1
        if self.hashnum(elem)%2==0:
            val = 1
        return val
    def hashnum(self,elem):
        return hash(elem.__str__()) % len(self.arr)
    
    def ret(self):
        return self.arr
    
def findkfreq(arr,k):
    if k > len(arr):
        return -1
    freq=[]
    for i in arr:
        freq.append((i[0],i[1]))
    freq.sort(reverse = True)
    freq = freq[0:k]
    return freq
    #return [i[1] for i in freq]

def findChange(arr1,arr2,k):
    freq1=[]
    freq2=[]
    for i in arr1:
        freq1.append((i[0],i[1]))
    for j in arr2:
        freq2.append((j[0],j[1]))
    freqres=[]
    for i in range(len(freq1)):
        freqres.append(((freq2[i][0]-freq1[i][0],freq1[i][1])))
    freqres.sort(reverse = True)
    freqres = freqres[0:k]
    return freqres
    #return [i[1] for i in freqres]
    

## Defining DB credentials

In [11]:
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS

token = 'Qd6R1JJWOOKfcKmBn0s50wHLqA-2LjLjEGJiUAGp1_RCes_JRe_bkELLYL5jqCXRQ1Haww3_mrHk_69yN88L0A=='
org = 'ELTE'
bucket = 'ELTE'

### Creating the streams, scaling, WavingSketch and define Anomaly Detector

In [12]:
# Create stream
stream = np.array(sdf.select("LIT 301", "FIT 401").collect())
# Stream for wavingsketch
attarr = np.array(sdf.select("FIT 401").collect())

# Scaled
min_max_scale = MinMaxScaler()
scaled = min_max_scale.fit_transform(stream)

# Define Anomaly Detection sketch
model = ReferenceWindowModel(model_cls=HBOS,  window_size=500, sliding_size=200,)
preprocessor = InstanceUnitNormScaler()  # Normalizer
postprocessor = RunningAveragePostprocessor(window_size=100)  # Running average postprocessor
calibrator = GaussianTailProbabilityCalibrator(running_statistics=True, window_size=500)
y_pred = []

# Define the WavingSketch 
sketch= WavingSketch(30)

## Save In InfluxDB

In [15]:
with influxdb_client.InfluxDBClient(url="http://influxdb:8086", token=token, org=org) as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)

    for i in tqdm(range(stream.shape[0])):
        
        # HBOS
        X = preprocessor.fit_transform_partial(stream[i])
        anomaly_score = model.fit_score_partial(X)
        anomaly_score = postprocessor.fit_transform_partial(anomaly_score)
        calibrated_score = calibrator.fit_transform_partial(anomaly_score)
        p = influxdb_client.Point("anomaly_score_hbos").field("Anomaly Score", calibrated_score)
        write_api.write(bucket, org, p) 
        
        # WavingSketch
        sketch.update(attarr[i][0])
        samples=sketch.ret()
        freq5 = findkfreq(samples,5)
        for freqs in freq5:
            p = influxdb_client.Point("WavingFreq FIT 401").field(str(freqs[1]), freqs[0])
            write_api.write(bucket, org, p)    
            
        # Save scaled data
        p = influxdb_client.Point("scaled_sensor_lit_301").field("Scaled Values", scaled[i][0])
        write_api.write(bucket, org, p) 
        p = influxdb_client.Point("scaled_sensor_fit_401").field("Scaled Values", scaled[i][1])
        write_api.write(bucket, org, p)    

100%|██████████| 14996/14996 [31:00<00:00,  8.06it/s] 
