## Persistence model with Kafka and Spark streaming 

This notebook provides an example of a persistence model applied to streaming data coming from a Kafka producer. 

This notebook uses 
* the [Python client for the Apache Kafka distributed stream processing system](http://kafka-python.readthedocs.io/en/master/index.html) to receive messages from a Kafka cluster. 
* [Spark streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html) for processing the streaming data


### General import

In [None]:
import time
import re, ast
import numpy as np
import os
import padasip as pa

### Start Spark session


In [None]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

#Options handed to spark-submit when pyspark starts: the Kafka 0.8 assembly jar and the driver memory
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar '
    '--conf spark.driver.memory=2g  pyspark-shell'
)

#Build (or reuse) the session on the local[3] master under the application name KafkaReceiveMB
session_builder = SparkSession.builder.master("local[3]").appName("KafkaReceiveMB")
spark = session_builder.getOrCreate()

### Connect to Kafka server on topic persistence

In [None]:
def getKafkaDStream(spark,topic='persistence',batch_interval=10,
                    zk_quorum="127.0.0.1:2181",
                    group_id="spark-streaming-consumer",
                    checkpoint_dir="checkpoint"):
    """Create a streaming context and a DStream connected to a Kafka topic.

    Parameters
    ----------
    spark : SparkSession
        Session whose Spark context backs the streaming context.
    topic : str
        Kafka topic to consume.
    batch_interval : int
        Batch interval handed to ``StreamingContext``.
    zk_quorum : str
        Address of the Zookeeper server; defaults to a local server on
        port 2181.
    group_id : str
        Consumer group id passed to ``KafkaUtils.createStream``.
    checkpoint_dir : str
        Directory used for checkpointing.

    Returns
    -------
    list
        ``[sc, ssc, dstream]``: the Spark context, the Spark streaming
        context and the DStream object.
    """
    #Get Spark context
    sc = spark.sparkContext

    #Create streaming context, with required batch interval
    ssc = StreamingContext(sc, batch_interval)

    #Checkpointing needed for stateful transforms
    ssc.checkpoint(checkpoint_dir)

    #Create a DStream that represents streaming data from Kafka, for the required topic
    dstream = KafkaUtils.createStream(ssc, zk_quorum, group_id, {topic: 1})

    return [sc, ssc, dstream]


## Parameters for the Exponentially Weighted Average

$ v_t = \beta \cdot v_{t-1} + (1-\beta) \cdot \theta_t $

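Where $v_t$ is the prediction at time $t$, $\beta$ is the smoothing factor (between 0 and 1: the weight given to the previous prediction $v_{t-1}$) and $\theta_t$ is the observation at time $t$.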

In [None]:
#Weight applied to the previous estimate v in exponentially_weighted_average
#(the new observation theta receives the weight 1-beta)
beta = 0.9

In [None]:
def exponentially_weighted_average(v, theta):
    """Blend the previous estimate ``v`` with the observation ``theta``.

    Reads the module-level ``beta`` and returns
    ``beta*v + (1-beta)*theta``.
    """
    carried_over = beta*v
    fresh_share = (1-beta)*theta
    return carried_over + fresh_share

In [None]:
#Update function of the exponentially weighted average (EWA) model
def update_ewa(new_values, state):
    """Update the EWA state with the values received during one batch interval.

    Parameters
    ----------
    new_values : list
        Measurements received during the batch interval. Each one is indexed
        as ``[measurement, seconds, sensor id, ...]``.
    state : list
        ``[last_temperature, sensorToPredict, output_day8, pred_mod]`` where
        ``last_temperature`` is the list of temperatures stored from the last
        training batch (or None), ``sensorToPredict`` is the sensor id to
        predict, ``output_day8`` is ``[predictions, truth, seconds]`` (or
        None) and ``pred_mod`` is the model tag, passed through untouched.

    Returns
    -------
    list
        A new state list with the same layout.

    The day of a batch is read from its first measurement
    (``floor(seconds / 86400)``). Before day 8 (day index below 7) the
    temperatures of the batch are stored; on day 8 (day index 7) predictions
    are computed for the sensor to predict; later batches leave the state
    unchanged.
    """
    last_temperature = state[0]
    sensorToPredict = state[1]
    output_day8 = state[2]
    pred_mod = state[3]

    if len(new_values) > 0:
        #Transforms list of values into an array
        array_values = np.array(new_values)

        #Zero-based index of the day the batch belongs to
        day = np.floor(array_values[0][1] / 86400)

        if day == 7:
            #Day 8: predict, do not adapt
            predictions = []
            truth = []
            seconds = []

            #Nothing may have been stored yet if day 8 is the first batch received
            stored = [] if last_temperature is None else last_temperature

            #The running average restarts from 0 for every batch
            v = 0
            for i, row in enumerate(array_values):
                if row[2] == sensorToPredict:
                    #Temperature stored at the same position, else the last
                    #stored one, else 0 (the placeholder also used when storing)
                    if i < len(stored):
                        t = stored[i]
                    elif len(stored) > 0:
                        t = stored[-1]
                    else:
                        t = 0

                    #vt = B*v(t-1) + (1-B)*t
                    prediction = exponentially_weighted_average(v, t)
                    predictions.append(prediction)

                    #Set the v for the next computation of EWA
                    v = prediction

                    truth.append(row[0])
                    seconds.append(row[1])

            #Store data in state
            output_day8 = [predictions, truth, seconds]

        elif day < 7:
            #Before day 8: store the temperature of every measurement of the
            #sensor to predict, 0 for the measurements of other sensors.
            #float() replaces np.float, which no longer exists in recent NumPy.
            last_temperature = [
                float(row[0]) if row[2] == sensorToPredict else 0
                for row in array_values
            ]

    #Updated state
    return [last_temperature, sensorToPredict, output_day8, pred_mod]

In [None]:
def update_rls(new_values, state):
    """Update the RLS state with the values received during one batch interval.

    Parameters
    ----------
    new_values : list
        Measurements received during the batch interval. Each one is indexed
        as ``[measurement, seconds, sensor id, ...]``.
    state : list
        ``[filt, sensorToPredict, output_day8, pred_mod]`` where ``filt`` is
        the filter object (it must provide ``adapt(d, x)`` and
        ``predict(x)``), ``sensorToPredict`` is the sensor id to predict,
        ``output_day8`` is ``[predictions, truth, seconds]`` (or None) and
        ``pred_mod`` is the model tag, passed through untouched.

    Returns
    -------
    list
        A new state list with the same layout.

    The day of a batch is read from its first measurement
    (``floor(seconds / 86400)``). Before day 8 (day index below 7) the filter
    is adapted on the measurements of the sensor to predict; on day 8 (day
    index 7) it is only used to predict; later batches leave the state
    unchanged. The input of the filter is the timestamp in seconds, both when
    adapting and when predicting.
    """
    filt = state[0]
    sensorToPredict = state[1]
    output_day8 = state[2]
    pred_mod = state[3]

    #Only look at the batch if something was received
    if len(new_values) > 0:
        #Transforms list of values into an array
        array_values = np.array(new_values)

        #Zero-based index of the day the batch belongs to
        day = np.floor(array_values[0][1] / 86400)

        if day == 7:
            #Day 8: predict, do not adapt
            predictions = []
            truth = []
            seconds = []

            for row in array_values:
                if row[2] == sensorToPredict:
                    #Same one-element input as the one used for adapting
                    x = np.array([row[1]], dtype=float)
                    predictions.append(filt.predict(x))

                    truth.append(row[0])
                    seconds.append(row[1])

            #Store data in state
            output_day8 = [predictions, truth, seconds]

        elif day < 7:
            #Before day 8: adapt the filter, desired value = measurement
            for row in array_values:
                if row[2] == sensorToPredict:
                    x = np.array([row[1]], dtype=float)
                    filt.adapt(row[0], x)

    #Updated state
    return [filt, sensorToPredict, output_day8, pred_mod]

In [None]:
#Update function handed to updateStateByKey
def updateFunction(new_values, state):
    """Dispatch one state update to the prediction model named in the state.

    Parameters
    ----------
    new_values : list
        Values received for the key during the batch interval.
    state : list or None
        Current state of the key; its element at index 3 is the model tag.
        None when the key has no state.

    Returns
    -------
    list or None
        The state returned by ``update_ewa`` when the tag is ``"EWA"``, by
        ``update_rls`` for any other tag, and None when there is no state.
    """
    #A key without any state has no model attached: there is nothing to
    #dispatch to, and indexing None would raise
    if state is None:
        return None

    pred_mod = state[3]
    if pred_mod == "EWA":
        return update_ewa(new_values, state)
    return update_rls(new_values, state)

### Define streaming pipeline

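* We define one state per predicted sensor, which is a list of four elements:
    * The model data (the last stored measurements for EWA, the filter object for RLS)
    * The ID of the sensor to predict
    * The output of predictions for that sensor for day 8 (predictions, truth, seconds)
    * The tag of the prediction model ("EWA"; any other tag selects RLS)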
* We create a DStream, flat map with the sensor ID as key, update state for the stream, and save MSE

In [None]:
#Helper functions

#Report the number of partitions and the number of records of an RDD
def printInfoRDD(rdd):
    """Print the partition count and the element count of ``rdd``.

    Prints a placeholder message instead when ``rdd`` is None.
    """
    if rdd is None:
        print("No info to provide")
        return

    partition_count = rdd.getNumPartitions()
    print("The RDD has "+str(partition_count)+" partitions")
    record_count = rdd.count()
    print("The RDD has "+str(record_count)+" elements")
        
#Keep the content of an RDD in the global Python variable state_global
def saveState(rdd):
    """Collect ``rdd`` and store the result in the global ``state_global``.

    Does nothing when ``rdd`` is None.
    """
    global state_global
    if rdd is None:
        return
    state_global = rdd.collect()


In [None]:
#Initial state for EWA, on sensor 1
#State layout: [model data, sensor to predict, day-8 output, model tag]
last_measurement=None
sensorToPredict=1
output_day8=None
pred_mod = "EWA"

state1=[last_measurement, sensorToPredict, output_day8, pred_mod]

#State needed for the second model, on sensor 24 (see the assignment statement)
#The original note calls this model SVM; the code below builds a padasip RLS filter
#NOTE(review): the tag is spelled "LRS"; updateFunction sends every tag other than "EWA" to update_rls
filt = pa.filters.FilterRLS(n=1, mu=0.98)
sensorToPredict_2=24
output_day8_2=None
pred_mod_2 = "LRS"
state2=[filt, sensorToPredict_2, output_day8_2, pred_mod_2]


#Batch interval (to be synchronized with KafkaSend)
interval=10

#This variable is used to retrieve state data (through saveState function)
state_global=None

#Create dstream
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='mean_neighbor2',batch_interval=interval)

#Parse the message content x[1] (a string holding a list) and turn it into a NumPy array
dstream = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
#Use this for debugging
#dstream.pprint()

#Key every record by sensor id. x[2] is the sensorId (for example '1'), and x holds the sensor measurement, seconds, sensorId and sensor type
dstream=dstream.flatMap(lambda x: [(x[2],x)])
dstream.foreachRDD(printInfoRDD)
#Use this for debugging
#dstream.pprint()

#Single-model variant, kept for testing one model at a time:
#initialStateRDD = sc.parallelize([(sensorToPredict,state1)])

#Initial states for the 2 models; to test a single model, use the line above with the wanted state instead
#At the end of the project, both models must run together (EWA on sensor 1, the second model on sensor 24)
#and both plots must be printed
initialStateRDD = sc.parallelize([(sensorToPredict,state1), (sensorToPredict_2, state2)])
print("Number of partitions for StateRDD: "+str(initialStateRDD.getNumPartitions()))

#Stateful transform: updateFunction is called with the new values and the current state of each key
state_stream=dstream.updateStateByKey(updateFunction,initialRDD=initialStateRDD)

state_stream.foreachRDD(printInfoRDD)
#Copy the states to state_global so they can be read from the notebook
state_stream.foreachRDD(saveState)
#Use this for debugging
#dstream.pprint()


### Start streaming application

In [None]:
#For synchronization with the sender (for the sake of the simulation): start when the wall-clock time
#in seconds is a multiple of the batch interval
current_time=time.time()
#Seconds left until the next multiple of interval
time_to_wait=interval-current_time%interval
time.sleep(time_to_wait)

#Start the streaming computation
ssc.start()

### Stop streaming

In [None]:
#Run this cell only once all data up to day 8 has been received
#Stops the streaming context without a graceful stop and without stopping the Spark context
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Collect results, plot predictions, compute MSE

In [None]:
#For plots
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
#Prepare plotly's offline mode for the notebook (presumably required before iplot is called - confirm against the plotly documentation)
init_notebook_mode()


In [None]:
# Display the first entry stored by saveState
# Original note: 0 = EWA, 1 = RLS
# NOTE(review): nothing in this notebook enforces the order of the collected entries - check the model tag of the entry before relying on the index
state_global[0]

In [None]:
#From the first stored entry, take element 1 (presumably the state of a (key, state) pair - confirm), then element 2 of it:
#the day-8 output [predictions, truth, seconds]
[predictions,truth,seconds]=state_global[0][1][2]

In [None]:
#Mean of the squared differences between truth and predictions
squared_errors = (np.array(truth)-np.array(predictions))**2
MSE = np.mean(squared_errors)
print("MSE of model on day 8: "+str(MSE))

In [None]:
#One curve for the measured values, one for the predicted values, both against time
trace_truth = go.Scatter(x=seconds, y=truth, name="Truth")
trace_predictions = go.Scatter(x=seconds, y=predictions, name="Predictions")

#NOTE(review): the title always mentions sensor 1 and the persistence model, whichever state entry was unpacked
plot_title = ('Truth and predictions for sensor 1, day 8<br>Persistence model <br>'
              'Mean square error: ' + str(MSE))

layout = go.Layout(
    title=plot_title,
    xaxis={'title': 'Time (seconds)'},
    yaxis={'title': 'Temperature'},
    showlegend=True,
)

fig = go.Figure(data=[trace_truth, trace_predictions], layout=layout)
iplot(fig)