# [INFO-H515 - Big Data Scalable Analytics](https://uv.ulb.ac.be/course/view.php?id=85246?username=guest)

## TP 3 - Streaming forecasting (RLS and ML) with Kafka and Spark Streaming

#### *Gianluca Bontempi, Jacopo De Stefani and Theo Verhelst*

####  29/04/2020


## Streaming analytics with Kafka and Spark streaming 

This notebook provides an example for performing machine learning analytics on streaming data coming from a Kafka producer. 

The first part contains examples of simple __stateless__ processing scripts.

The second part introduces __stateful__ streaming processing in order to perform more complex operations, like the cumulative sum or the sequential estimation of mean and variance.

Predictive analytics is also implemented thanks to stateful processing.

Two main __streaming learning__ strategies are proposed: 
* an online linear learning strategy where the coefficient estimation is achieved using the __recursive least squares (RLS)__ algorithm
* a __mini-batch strategy__ where a learning model is fit to most recent data

A __racing__ strategy is implemented as well where two different models (e.g. two linear models with different forgetting factors or a linear and a nonlinear regressor) are fitted to the same data.

The data are produced in the notebook `KafkaTimeSeriesProducer`: two producers are considered, a linear one and a nonlinear one.

This notebook uses:
* the [Python client for the Apache Kafka distributed stream processing system](http://kafka-python.readthedocs.io/en/master/index.html) to receive messages from a Kafka cluster. 
* [Spark streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html) for processing the streaming data



## General imports

In [None]:
import time
import re, ast
import numpy as np
import os
import glob


from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor 
from sklearn import linear_model
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from sklearn.linear_model import LinearRegression

### Start Spark session


In [None]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4050 '+\
                        '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 '+\
        '--conf spark.driver.memory=2g  pyspark-shell'

spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("KafkaReceive") \
    .getOrCreate()

### Function to connect Spark Streaming and Kafka on a given topic

In [None]:
#This function creates a connection to a Kafka stream
#You may change the topic, or batch interval
#The Zookeeper server is assumed to be running at 127.0.0.1:2181
#The function returns the Spark context, Spark streaming context, and DStream object
def getKafkaDStream(spark,topic='persistence',batch_interval=10):

    #Get Spark context
    sc=spark.sparkContext

    #Create streaming context, with required batch interval
    ssc = StreamingContext(sc, batch_interval)

    #Checkpointing needed for stateful transforms
    ssc.checkpoint("checkpoint")
    
    #Create a DStream that represents streaming data from Kafka, for the required topic 
    dstream = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {topic: 1})
    
    return [sc,ssc,dstream]


## Raw data consumption

First, let's make sure that Kafka is running and ready to receive messages:

1. Launch `kafka_startup_script.sh` from a terminal.
2. Check the `kafka.log` and `zookeeper.log` files in the `/home/guest` folder for the correct startup of the service.

Let's have a look at the standard procedure when dealing with a Producer-Consumer architecture:

1. Look at the data format in the producer (`KafkaTimeSeriesProducer`) notebook. 
2. Start the Data production by running the corresponding cell in the producer notebook. 
3. Move to the consumer notebook (i.e. this notebook).

**N.B.** Check that the names of the producer and consumer topics are the __same__!

In [None]:
# Get the DStream object containing the streaming data sent on the dataLinearModel topic
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=1)
# Pretty print the content of dstream
dstream.pprint()

To start data consumption you have to start the streaming context. This is equivalent to opening a data tap on the receiving side.

**Exercise:**

* Look at the time label: is it related to the batch interval you set? What about the data format?

In [None]:
ssc.start()

To stop data reception you have to stop the streaming context (close the tap).

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

**Tip:** Clear all outputs before starting the Streaming again with ssc.start()! 

**Exercise:**

* Change the batch interval value (in the consumer) and observe the impact on the buffer. What can you deduce?

# Stateless processing

The first examples in this notebook will introduce the concept of **stateless** processing.

In this case, the Spark transformations are applied directly to the Discretized Stream (Dstream) produced by Kafka.

A Discretized Stream (cf. [Spark Streaming Programming guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html)) is simply a continuous sequence of Spark RDDs.

Applying a transformation to a Dstream basically boils down to applying the same transformation to each underlying RDD.
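
This idea can be mimicked without Spark. The sketch below is only an illustration under simplifying assumptions: the stream is represented by a plain Python list of batches (each inner list standing in for one RDD), and the hypothetical helper `map_stream` plays the role of a `map` transformation on the Dstream.

```python
# Toy stand-in for a Dstream: one inner list per batch interval (one "RDD" each).
batches = [
    [1.0, -2.0, 3.0],   # records received during the first interval
    [],                 # an interval in which nothing arrived
    [4.0, -5.0],        # records received during the third interval
]

def map_stream(func, stream):
    """Apply `func` to every record of every batch, batch by batch."""
    return [[func(record) for record in batch] for batch in stream]

# Stateless: each output batch depends only on the matching input batch.
abs_batches = map_stream(abs, batches)
print(abs_batches)
```

No information is carried over from one batch to the next, which is what makes the processing stateless.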

## Converting the data to a suitable format

Before being able to actually perform a transformation, we need to de-serialize the data obtained from Kafka.
Since we want to do numerical processing, the raw data format is not suitable. 
Let's see an example of data conversion to a suitable numeric format.

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=10)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
dataS.pprint()
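
The conversion step can be tried on a single record outside Spark. The following is a minimal sketch, assuming that each Kafka record reaches Spark as a `(key, value)` pair whose value is the text representation of a list `[counter, y, x_1, ..., x_n]`; the record below is made up for illustration.

```python
import ast

# Hypothetical Kafka record: a (key, value) pair whose value is text.
# The layout [counter, y, x_1, ..., x_n] is an assumption for this sketch.
record = (None, "[7, 0.5, 1.2, -0.3]")

raw_value = record[1]                  # still a string
values = ast.literal_eval(raw_value)   # safely parsed into a Python list

counter, y, x = values[0], values[1], values[2:]
print(type(raw_value).__name__, "->", type(values).__name__)
print(counter, y, x)
```

In the notebook the parsed list is additionally wrapped in `np.array` so that numerical operations can be applied to it.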

In [None]:
ssc.start()

**Exercise:**

* Compare the suitable data format with the previous format. What is the main difference?


In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Applying a map transformation to a Dstream

The examples below apply a map transformation to each element of the batch without storing any state information.

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
dataS.pprint()
maxS=dataS.map(lambda x: np.max(x[1:]))
maxS.pprint()

In [None]:
ssc.start()

**Exercise:**

* Compare the content of the Dstream before and after the transformation. What is the applied transformation?


In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

**Exercise:**

* Adapt the code of the previous exercise in order to compute the mean of the $x_i$ values at each time step.


In [None]:
## Your solution
## ...

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Custom presentation of the received information 

Here you can find an example of using a `map` transformation to add some descriptive text to identify the received values.

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
dataS.map(lambda x: "Stream counter : "+ str(x[0])).pprint()

In [None]:
ssc.start()

As the first element of the transmitted data is the counter value, we can see which line is currently being sent by the producer. If you do not stop the producer, this value will keep growing. Given the stationary nature of our data set, this does not have much of an impact on the following analysis.

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Output to the disk

To save the content of the Dstream to the disk you can use the Spark function `saveAsTextFiles(prefix, [suffix])`.

The file name at each batch interval is generated based on the prefix and suffix parameters, as well as the current time in milliseconds: `prefix-<TIME_IN_MS>[.suffix]`.

**Warning:** This command creates a new file for each batch: if the `batch_interval` value is low this can fill your directory rapidly.
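
To get a feeling for how quickly the outputs accumulate, the naming pattern can be reproduced in plain Python. This is only a sketch: `batch_output_name` is a hypothetical helper that follows the `prefix-<TIME_IN_MS>[.suffix]` pattern quoted above, and the starting timestamp is arbitrary.

```python
def batch_output_name(prefix, time_ms, suffix=None):
    """Build the per-batch name following the prefix-<TIME_IN_MS>[.suffix] pattern."""
    name = "{}-{}".format(prefix, time_ms)
    if suffix:
        name += "." + suffix
    return name

batch_interval = 10            # seconds, as in getKafkaDStream(..., batch_interval=10)
start_ms = 1588150000000       # arbitrary illustrative timestamp in milliseconds

# One new output per batch interval
names = [batch_output_name("test", start_ms + k * batch_interval * 1000)
         for k in range(3)]
print(names)
```

With a batch interval of 1 second, the same loop would produce 60 new outputs per minute.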

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=10)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
dataS.map(lambda x: "stream counter="+ str(x[0])).pprint()
prefix='test'
dataS.map(lambda x: "stream counter="+ str(x[0])).saveAsTextFiles(prefix)


Let the streaming run for two batch intervals only and then look at your current directory.

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

Here is an example of how to list the directories containing the content of the processed batches:

In [None]:
search_dir = "./"
files = list(filter(os.path.isdir, glob.glob(search_dir + prefix +"*")))
files.sort(key=lambda x: os.path.getmtime(x))
files

Note that each directory is split into several parts.

In [None]:
fyhat=files[0]+"/part-00000"
F=open(fyhat, "r")
F.read().replace("\n",", ")

In [None]:
fyhat=files[0]+"/part-00001"
F=open(fyhat, "r")
F.read().replace("\n",", ")

# Stateful processing

As opposed to **stateless** processing, **stateful** processing implies the existence of a state.

According to the [Spark Streaming Programming guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html), the key concept for **stateful** processing is the `updateStateByKey` operation:

>The `updateStateByKey` operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.
>
>    1. *Define the state* - The state can be an arbitrary data type.
>    2. *Define the state update function* - Specify with a function how to update the state using the previous state and the new values from an input stream.

The following examples will clarify how to create and initialize the state and how to perform updates of the state through the combination of `updateStateByKey` and a user-defined `updateFunction`.
In each example, the stateful processing is obtained with an `updateFunction` called by the `updateStateByKey` operation.
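
Before moving to Spark, the mechanism can be emulated in a few lines of plain Python. The sketch below is a simplified, sequential stand-in and not Spark's implementation: `update_state_by_key` and `running_count` are hypothetical names, and a stream is again represented by a list of batches of `(key, value)` pairs.

```python
def update_state_by_key(batches, update_function, initial_state):
    """Sequentially emulate a keyed state update on a list of batches.

    Each batch is a list of (key, value) pairs; `initial_state` maps keys to states.
    Returns the list of state dictionaries observed after each batch.
    """
    state = dict(initial_state)
    history = []
    for batch in batches:
        # Group the new values of this batch by key.
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        # The update function is called for every known key, even without new values.
        for key in set(state) | set(grouped):
            state[key] = update_function(grouped.get(key, []), state.get(key))
        history.append(dict(state))
    return history

def running_count(new_values, state):
    """State = number of records seen so far."""
    if len(new_values) > 0:
        return (state or 0) + len(new_values)
    return state

batches = [[("state", 1.0), ("state", 2.0)], [], [("state", 3.0)]]
history = update_state_by_key(batches, running_count, {"state": 0})
print([h["state"] for h in history])
```

The update function receives the buffer of new values for a key together with the previous state of that key, which is the same contract used by all the update functions in this notebook.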

## Single state value - No memory of the state

In this first example, you will learn how to define the state as well as to code a simple `updateFunction` which takes into account only the first value in the streaming buffer and which does not perform a state update.

Let's start by defining the state:

In [None]:
# Initialize an empty vector with n elements
n=11
v=np.zeros((1,n)) 

state1=(v,0) # State is a 2 element tuple: (vector of size n,integer)

# Transform the state into an RDD
initialStateRDD = sc.parallelize([('state', state1)]) 

Let's continue by defining the user defined `updateFunction`.

Note that all the update functions in the following cells share the same structure:

```python 
def updateFunction(new_values, state): 
    L=len(new_values) # Size of the message buffer
    if (L>0): # If there are new values in the buffer,
        (...) = state # Unpacking the state
        # Perform state update
        # ...
        # ...
        return new_state  # Then return a new value for the state
    else: # If there are no new values
        return state # then return the unmodified state
```

In [None]:
def updateFunction(new_values, state):
    L=len(new_values) # size buffer
    if (L>0):
        (d, N) = state # Unpacking the state
        value=new_values[0] ## take only the first value
        d=np.abs(value[1:]).reshape(1,-1) ## d does NOT depend on the 'state' input
        return (d,L)  ## return the value and size buffer
        
    else:
        return state

Finally, let's conclude by applying the `updateFunction` on the Spark Streaming Dstream by means of the `updateStateByKey` operation.

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
dataS.pprint()

dataS=dataS.flatMap(lambda x: [('state',1.0*np.array(x))])

updatedS=dataS.updateStateByKey(updateFunction,initialRDD=initialStateRDD)
updatedS.pprint()


In [None]:
ssc.start()

**Exercise:**

* Compare the raw data and the content of the state. What can you observe?


In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Single state value - With memory of the state

In this case the output of the function does not depend only on the current values in the buffer but also on the previous state. 

Let's start with defining the state and the `updateFunction`:

In [None]:
# Initialize an empty vector with n elements
n=11
v=np.zeros((1,n)) 

state1=(v,0) # State is a 2 element tuple: (vector of size n,integer)

# Transform the state into an RDD
initialStateRDD = sc.parallelize([('state', state1)]) 

In [None]:
def updateFunction(new_values, state): 
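    ## Add the absolute values of a buffer vector to the previous state
    ## (d is recomputed from state[0] at each iteration, so only the last vector of the buffer is retained)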
    L=len(new_values)
    if (L>0):
        for l in np.arange(L):
            value=new_values[l]
            d=state[0]+np.abs(value[1:]).reshape(1,-1) ### d DOES depend  on state !!
        return (d,L)  
        
    else:
        return state

**Exercise:**

* Compare this `updateFunction` with the one in the section before. What can you observe?
* What operation is being performed inside this `updateFunction`?
* Run the code below to check your answers.

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
dataS.pprint()

dataS=dataS.flatMap(lambda x: [('state',1.0*np.array(x))])

updatedS=dataS.updateStateByKey(updateFunction,initialRDD=initialStateRDD)
updatedS.pprint()

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Multiple values - with memory of the state

Let's see here an example of usage of multiple values in the buffer (as opposed to the previous examples, which use only a single value from the buffer).

Note that the structure of the `updateFunction` remains the same as before, with the addition of a loop for iterating through the different elements of the buffer `new_values`.

```python 
def updateFunction(new_values, state): 
    L=len(new_values) # Size of the message buffer
    if (L>0): # If there are new values in the buffer,
        (...) = state # Unpacking the state
        for l in np.arange(L):
            value=new_values[l]
            # Perform state update
            # ...
            # ...
        return new_state  # Then return the updated state
    else: # If there are no new values
        return state # then return the unmodified state
```

In this case, all the different values of the state are stacked in a matrix.
Beware that, with this implementation, the size of this matrix keeps growing indefinitely.
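
One possible way to avoid this unbounded growth is to keep only the most recent vectors. The sketch below is an assumption-laden illustration in plain Python: `window_size` and `update_window` are hypothetical names, and the bounded buffer is obtained with `collections.deque` and its `maxlen` argument.

```python
from collections import deque

window_size = 3  # hypothetical: number of most recent vectors to keep

def update_window(new_values, state):
    """Same structure as the update functions above, but with a bounded state."""
    if len(new_values) > 0:
        window = deque(state, maxlen=window_size)  # copy with drop-oldest behaviour
        for value in new_values:
            window.append(value[1:])               # skip the counter, keep [y, x...]
        return list(window)
    return state

state = []
state = update_window([[0, 1.0, 2.0], [1, 3.0, 4.0]], state)
state = update_window([[2, 5.0, 6.0], [3, 7.0, 8.0]], state)
print(state)
```

After the second call, the oldest vector has been discarded and the state holds exactly `window_size` rows.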

In [None]:
# Initialize an empty vector with n elements
n=11
v=np.zeros((1,n)) 

state1=(v,0) # State is a 2 element tuple: (vector of size n,integer)

# Transform the state into an RDD
initialStateRDD = sc.parallelize([('state', state1)]) 

In [None]:
def updateFunction(new_values, state):
    L=len(new_values)  ## size of the buffer
    if (L>0):
        D=state[0]
        for l in np.arange(L): ## loop over all the values of the buffer
            value=new_values[l]
            D=np.vstack((D,value[1:].reshape(1,-1)))
        return (D,D.shape,L)  
        
    else:
        return state

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
dataS.pprint()

dataS=dataS.flatMap(lambda x: [('state',1.0*np.array(x))])

updatedS=dataS.updateStateByKey(updateFunction,initialRDD=initialStateRDD)
updatedS.pprint()

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Sequential estimation of sampled means and variances by state update


A more complex example of stateful processing is given by the sequential estimation of the mean of a random variable.

Average batch formulation: $\mu_{(N)}=\frac{1}{N} \sum_{i=1}^N z_i $

Average sequential formulation:  $\mu_{(N)}=\mu_{(N-1)} +\frac{1}{N} (z_N -\mu_{(N-1)} ) $
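
The equivalence of the two formulations can be checked on a small example. The following sketch uses plain Python and made-up samples; `sequential_mean` is a hypothetical helper that keeps only the pair `(muhat, N)` in memory, like the state used below.

```python
def sequential_mean(samples):
    """Update the mean one sample at a time, keeping only (muhat, N) in memory."""
    muhat, N = 0.0, 0
    for z in samples:
        N += 1
        muhat = muhat + (z - muhat) / N
    return muhat, N

samples = [2.0, 4.0, 6.0, 8.0]
muhat, N = sequential_mean(samples)
batch_mean = sum(samples) / len(samples)
print(muhat, batch_mean)
```

Both formulations return the same value, but the sequential one never needs to store the past samples.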


In [None]:
muhat=0
N=0
state1=(muhat,N)
## state with two components: estimate of mu and number of samples

initialStateRDD = sc.parallelize([('state', state1)])

In [None]:
def updateFunction(new_values, state): 
    ## Update the sequential estimate of the sample mean of x[0] 
    ## (i.e. third element of the value vector, the first being the counter and the second being y)
    L=len(new_values) ## size of the buffer
    if (L>0 ):
        (muhat,N) = state # Unpacking the state
        for l in np.arange(L):
            N=N+1
            value=new_values[l]
            muhatold=muhat
            muhat=muhat+1.0/N*(value[2]-muhat)
        return (muhat,N)          
    else:
        return state

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
dataS.map(lambda x: "stream counter="+ str(x[0])).pprint()

dataS=dataS.flatMap(lambda x: [('state',1.0*np.array(x))])
dataS.map(lambda x: "Input="+ str(x)).pprint()

updatedS=dataS.updateStateByKey(updateFunction,initialRDD=initialStateRDD)
updatedS.map(lambda x : 'After '+str(x[1][1])+' samples: muhat='+str(x[1][0])).pprint()

In [None]:
ssc.start()

**N.B.**: With the dataLinearModel topic, mean estimation must converge to 0.

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

**Exercise**

Modify the state and the update function to perform the sequential computation of the variance. 

The state should contain two additional variables:  $\sigma^2_{(N)}$ and $S_{(N)}$.

The formulas defining the state update are the following:

- Sample variance batch formulation: $\sigma^2_{(N)}=\frac{1}{N} \sum_{i=1}^N (z_i -\mu_{(N)})^2= 
\left(\frac{1}{N} \sum_{i=1}^N z_i^2 \right) -  \left(\frac{1}{N} \sum_{i=1}^N z_i \right) ^2 $

- Sample variance sequential formulation:  $\sigma^2_{(N)} ={S_{(N)}/N}$ where $$S_{(N)}= S_{(N-1)}+N(N-1) (\mu_{(N)}- \mu_{(N-1)})^2 $$ 
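
The recursion for $S_{(N)}$ can be justified in a few lines. The following is a sketch of the argument, taking $S_{(N)}=\sum_{i=1}^N (z_i-\mu_{(N)})^2$ and using the sequential formulation of the mean, which gives $z_N-\mu_{(N-1)}=N(\mu_{(N)}-\mu_{(N-1)})$ and therefore $z_N-\mu_{(N)}=(N-1)(\mu_{(N)}-\mu_{(N-1)})$:

\begin{equation*}
\begin{aligned}
S_{(N)} &= \sum_{i=1}^{N-1} \left(z_i-\mu_{(N-1)}+\mu_{(N-1)}-\mu_{(N)}\right)^2 + \left(z_N-\mu_{(N)}\right)^2 \\
&= S_{(N-1)} + (N-1)\left(\mu_{(N)}-\mu_{(N-1)}\right)^2 + (N-1)^2\left(\mu_{(N)}-\mu_{(N-1)}\right)^2 \\
&= S_{(N-1)} + N(N-1)\left(\mu_{(N)}-\mu_{(N-1)}\right)^2
\end{aligned}
\end{equation*}

The cross term disappears in the second line because $\sum_{i=1}^{N-1}(z_i-\mu_{(N-1)})=0$.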

**Solution**

In [None]:
## Your solution - State
## ...

In [None]:
def updateFunction(new_values, state): 
    ## Your solution
    ## ...

In [None]:
## Your solution
## ...

In [None]:
ssc.start()

**N.B.**: With the dataLinearModel topic, mean estimation must converge to 0. Variance estimation must converge to 1.

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Recursive Least Squares (streaming - single model)

The following equations define a single step of the Recursive Least Squares (with forgetting factor $\nu$).

\begin{equation*}
\begin{cases}
V_{(t)}&=\frac{1}{\nu} \left(V_{(t-1)}
-\frac{V_{(t-1)} x^T_{t} x_{t} V_{(t-1)}}{1+ x_{t} V_{(t-1)} x^T_{t}} \right)\\[3pt]
\alpha_{(t)}&= V_{(t)} x^T_{t} \\[3pt]
e&= y_{t}- x_{t} \hat{\beta}_{(t-1)}  \\[3pt]
\hat{\beta}_{(t)}&=\hat{\beta}_{(t-1)}+ \alpha_{(t)} e \\[3pt]
\end{cases}
\end{equation*}

where $V$ is the covariance matrix and $\beta$ is the set of parameters of the linear model.
Details in the Streaming Analytics slides.


**Exercise**

The implementation of the state and an `updateFunction` to implement RLS in a streaming fashion is given below.

* Write the implementation of `RLSstep` function in order to perform the update of the state of the RLS model according to the equations given above.


In [None]:
def RLSstep(y,x,beta,V,nu):
    ## Your solution
    ## ...

In [None]:
n=10 # number of features
beta1=np.zeros(n+1)  ## initial parameter vector for model 1
v0=10 ## initialization covariance
V1=np.diag(np.zeros(n+1)+v0) ## initial covariance matrix for model 1
nu1=1.0 # forgetting factor for model 1

recentSize=10
D=np.zeros((recentSize,n+1)) #np.random.rand(recentSize,n+1) #np.zeros((20,n+1))+1
E=np.zeros((1,1))
mse=0
N=0
state1=(beta1,V1,nu1,mse,N,D,E)
initialStateRDD = sc.parallelize([('rls', state1)])

In [None]:
def updateFunction(new_values, state): 
    ## RLS update function
    ## Only the first value of the buffer is used for the update.
    L=len(new_values)
    
    if (L>0):
        # Extract the data from state variable
        (beta,V,nu,mse,N,DD,E) = state
        #beta=state[0] ## Beta value
        #V=state[1]   ## V matrix
        #nu=state[2]   ## Forgetting factor
        #mse=state[3]  ## Mean squared error
        #N=state[4]   ## Number of treated samples
        #DD=state[5]  ## Analyzed values
        #E=state[6]   ## Error measure
        
        # Extract the values from the new_values variable
        yx=new_values[0] #Only using the first element in the batch
        i=yx[0] 
        y=yx[1]
        x=yx[2:]
        n=len(x)
        beta.shape=(n+1,1)
        
        # Compute RLS state update
        RL=RLSstep(y,x,beta,V,nu)
        
        # Update the state values using results from RLSstep
        # Update beta and V values
        beta=RL[0]
        V=RL[1]
        E=np.append(E,RL[2])
        
        # Sequential update of the MSE (the state stores the MSE, not the sum of squares)
        N=N+1
        mse=mse+1.0/N*(pow(RL[2],2.0)-mse)
        
        # Insert analyzed values at the top of the DD matrix, dropping the oldest row
        d=yx[1:]
        d.shape=(1,n+1)
        DD=np.vstack((d,DD[:-1,:]))
        
        return (beta,V,nu,mse,N,DD,E)  ## update formula mod1
        
    else:
        return state

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))

dataS=dataS.flatMap(lambda x: [('rls',1.0*np.array(x))])

updatedS=dataS.updateStateByKey(updateFunction,initialRDD=initialStateRDD)

## printing out updated values of the state
outbetaS=updatedS.map(lambda x: ": beta="+ np.array2string(x[1][0])).pprint()
outmseS=updatedS.map(lambda x: 'mse=' + str(x[1][3])).pprint()
outNS=updatedS.map(lambda x: x[1][4]).pprint()


In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

**Exercise**

If you look at the code above you will see that there is no guarantee that all samples are used. 
For instance, if the length `L` of the buffer in `new_values` is bigger than one (due to an excessive latency of the reception), the algorithm above will consider only the first vector `yx`. 

* How should the code be modified in order to consider all the elements of the buffer 'new_values'?

In [None]:
def RLSstep(y,x,beta,V,nu):
    ## Your solution
    ## ...

In [None]:
## Your solution - State
## ...

In [None]:
def updateFunction(new_values, state): 
    ## Your solution
    ## ...

In [None]:
## Your solution
## ...

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Recursive Least Squares (streaming - multiple models in parallel)

The code in the previous section can be extended to support multiple models running in parallel.

* The *state* is modified in order to include the states of the two different models.
    Each state contains a set of variables describing the model, as well as variables keeping track of its MSE estimate. 
    The core of a state consists of 5 elements:
    * The first three are beta, V and nu, and define the state of the model (see the RLS formulas above)
    * The last two are an estimate of the MSE of the model, and the number of treated samples
   In the code below, each state additionally stores a model identifier, the matrix `D` of recent data, the error history `E`, and the last observed and predicted values.
   The two states are combined in a key-value RDD in the following way:
   ```python
    initialStateRDD = sc.parallelize([(u'rls1', state1),(u'rls2', state2)])
   ```
   with `rlsX` being the key and the value being the corresponding state.

* The original Dstream is duplicated using a `flatMap` operation in order to create two key-value streams, containing the key corresponding to the model, as well as a copy of the data in the original Dstream (`dataS` here).
```python
dataS2=dataS.flatMap(lambda x: [('rls1',('rls1',1.0*np.array(x))),
                            ('rls2',('rls2',1.0*np.array(x)))])
```

* The *update function* is modified to handle the new format of the stream (by means of the `key` variable), by storing this value from the incoming state and re-transmitting it in the updated state.

* Note that there is only a single `updateFunction`, since the `updateStateByKey` operation applies the update function in parallel to all the different keys (in this case, each key corresponds to a model).
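
The duplication step described in the list above can be illustrated in plain Python. This is a sketch under simplifying assumptions: `flat_map` and `duplicate_per_model` are hypothetical helpers, a batch is a list of made-up `[counter, y, x]` records, and the grouping by key is done by hand instead of by Spark.

```python
def flat_map(func, batch):
    """Apply `func` to each record and concatenate the returned lists."""
    return [item for record in batch for item in func(record)]

model_keys = ["rls1", "rls2"]  # one key per model, as in initialStateRDD

def duplicate_per_model(record):
    """Emit one (key, (key, record)) pair per model for a single input record."""
    return [(key, (key, record)) for key in model_keys]

batch = [[0, 0.1, 1.0], [1, 0.2, 2.0]]       # two hypothetical [counter, y, x] records
keyed = flat_map(duplicate_per_model, batch)

# Group by key: this is the buffer each model receives in its update function.
buffers = {}
for key, value in keyed:
    buffers.setdefault(key, []).append(value)
print(buffers["rls1"])
```

Each model therefore sees its own copy of every record, tagged with the model key, and a single update function can serve all the models.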


In [None]:
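def RLSstep(y,x,n,beta,V,nu):
    # Build the regressor row vector [1, x_1, ..., x_n] (the leading 1 is the intercept)
    x.shape=(1,n)
    x1=np.append(1,x)
    x1.shape=(1,n+1)
    
    # Update of the matrix V with forgetting factor nu
    V=1.0/nu*(V-V.dot(x1.T).dot(x1).dot(V)/(1.0+x1.dot(V).dot(x1.T).item()))
    # Gain vector
    alpha=V.dot(x1.T)
    # Prediction with the previous parameters and corresponding error
    yhat=x1.dot(beta)
    err=y-yhat
    # Parameter update
    beta=beta+alpha*err
    
    return(beta,V,err,yhat)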
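
The behaviour of the RLS recursion can be checked outside Spark on synthetic data. The sketch below is only an illustration: `rls_step` is a hypothetical standalone variant of the equations above, the true coefficients `beta_true` are made up, and the targets are generated without noise so that the estimate should approach `beta_true` closely.

```python
import numpy as np

def rls_step(y, x1, beta, V, nu):
    """One RLS update for a row vector x1 (1, p), parameters beta (p, 1), matrix V (p, p)."""
    V = (V - V @ x1.T @ x1 @ V / (1.0 + (x1 @ V @ x1.T).item())) / nu
    alpha = V @ x1.T                  # gain vector, shape (p, 1)
    err = y - (x1 @ beta).item()      # error of the prediction made before the update
    beta = beta + alpha * err
    return beta, V, err

rng = np.random.default_rng(0)
beta_true = np.array([[0.5], [1.0], [-2.0], [3.0]])  # intercept + 3 coefficients
p = beta_true.shape[0]

beta_hat = np.zeros((p, 1))
V = 10.0 * np.eye(p)                  # same initialisation value v0=10 as in the notebook
for t in range(200):
    x1 = np.append(1.0, rng.normal(size=p - 1)).reshape(1, p)  # prepend the intercept
    y = (x1 @ beta_true).item()       # noise-free target, for illustration only
    beta_hat, V, err = rls_step(y, x1, beta_hat, V, nu=1.0)

print(np.round(beta_hat.ravel(), 3))
```

Running such a check before plugging the step into `updateStateByKey` makes it easier to separate errors in the formulas from errors in the streaming code.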

In [None]:
n=10 # number of features
beta1=np.zeros(n+1)  ## initial parameter vector for model 1
v0=10 ## initialization covariance
V1=np.diag(np.zeros(n+1)+v0) ## initial covariance matrix for model 1
nu1=1.0 # forgetting factor for model 1

batchSize=10
D=np.zeros((batchSize,n+1)) ## this is not used here 
E=np.zeros((1,1))
mse=0
N=0
state1=('mod1',beta1,V1,nu1,mse,N,D,E,0,0)

nu2=0.99 # forgetting factor for model 2

## Note that the only difference is the forgetting value
state2=('mod2',beta1,V1,nu2,mse,N,D,E,0,0)

initialStateRDD = sc.parallelize([(u'rls1', state1),
                                  (u'rls2', state2)])

In [None]:
def updateFunction(new_values, state): 
    ## RLS update function
    ## Only the first value of the buffer is used for the update.
    L=len(new_values)  ## size of the buffer
    if (L>0):
        # Extract the data from state variable
        (model_key,beta,V,nu,mse,N,DD,E,y,yhat) = state
        #model_key=state[0] ## Unique identifier for the model
        #beta=state[1] ## Beta value
        #V=state[2]   ## V matrix
        #nu=state[3]   ## Forgetting factor
        #mse=state[4]  ## Mean squared error
        #N=state[5]   ## Number of treated samples
        #DD=state[6]  ## Analyzed values
        #E=state[7]   ## Error measure
        #y=state[8]   ## True value for the last value in the batch
        #yhat=state[9] ## Predicted value for the last value in the batch
        
        # Extract data from the new_values buffer
        # new_values = [(key,[counter,y,x]), ...]
        key=new_values[0][0]
        yx=new_values[0][1]   ### this is only the first value in the buffer
        i=yx[0]
        y=yx[1]
        x=yx[2:]
        n=len(x)
        beta.shape=(n+1,1)
        
        # Compute RLS state update
        RL=RLSstep(y,x,n,beta,V,nu)
        
        # Update the state values using results from RLSstep
        beta=RL[0]
        V=RL[1]
        err=RL[2]
        E=np.append(E,err)
        yhat=RL[3]
        
        # Sequential update of the MSE (the state stores the MSE, not the sum of squares)
        N=N+1
        mse=mse+1.0/N*(pow(err,2.0)-mse)
         
        # Insert analyzed values at the top of the DD matrix, dropping the oldest row
        d=yx[1:]
        d.shape=(1,n+1)
        DD=np.vstack((d,DD[:-1,:]))
        
        return (key,beta,V,nu,mse,N,DD,E,y,yhat)  
        
    else:
        return state

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))

dataS.map(lambda x: "stream counter="+ str(x[0])).pprint()

dataS2=dataS.flatMap(lambda x: [('rls1',('rls1',1.0*np.array(x))),
                            ('rls2',('rls2',1.0*np.array(x)))])

updatedS=dataS2.updateStateByKey(updateFunction,initialRDD=initialStateRDD)#.pprint()

## printing out updated values of the states of the two models
## str() and np.ravel() also handle the scalar values stored in the initial state
outbetaS=updatedS.map(lambda x: x[1][0]+": beta="+ np.array2string(x[1][1])).pprint()
outmseS=updatedS.map(lambda x: x[1][0]+": mse="+ str(x[1][4])).pprint()
outNS=updatedS.map(lambda x: x[1][0]+": N="+ str(x[1][5])).pprint()
outyS=updatedS.map(lambda x: x[1][0]+": y="+ str(x[1][8])+ " yhat="+ str(np.ravel(x[1][9])[0])).pprint()


In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

**Exercise**

If you look at the code above you will see that there is no guarantee that all samples are used. 
For instance, if the length `L` of the buffer in `new_values` is bigger than one (due to an excessive latency of the reception), the algorithm above will consider only the first vector `yx`. 

* How should the code be modified in order to consider all the elements of the buffer `new_values`?

In [None]:
def RLSstep(y,x,beta,V,nu):
    ## Your solution
    ## ...

In [None]:
## Your solution - State
## ...

In [None]:
def updateFunction(new_values, state): 
    ## Your solution
    ## ...

In [None]:
## Your solution
## ...

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

# ML batch predictor (racing of several models) 

To conclude this session, below you can find an example of the implementation of the parallel racing of multiple machine learning predictors (here *Random Forests* - `rf`, *Linear Model* - `lin`, *Gradient boosting* - `gb`, respectively).

As you can observe, the general structure of the state and the `updateFunction` is the same as in the previous section, with the following changes:

* The *state* is modified in order to include the states of the three models.
    The core of a state consists of 4 elements:
    * `D` - The batch of recent data on which the predictor is trained
    * `mse` - Current value for the MSE
    * `N` - Counter of the values 
    * `E` - Vector of historical error measures
   In the code below, each state additionally stores a model identifier and the last observed and predicted values.
   The state of each model is then combined in a key-value RDD with a key identifying the type of model (cf. `state1,state2,state3`).
   
* The original Dstream is duplicated using a `flatMap` operation in order to create one key-value stream for each considered model.

* The *update function* is modified to employ the predictors from Scikit-learn to predict the last value for the batch (cf. `predict` function definition), in a leave-one-out fashion, with the training of the predictor being performed on the batch of previously collected data (`DD` variable).

* Note that there is only a single `updateFunction`, since the `updateStateByKey` operation applies the update function in parallel to all the different keys (in this case, each key corresponds to a model).
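
The racing principle itself does not depend on Spark or Scikit-learn. The sketch below is a deliberately simplified illustration in plain Python: the two predictors (`predict_last` and `predict_mean`) are hypothetical toy models standing in for the regressors used below, and the stream is a made-up sequence of target values. Each model keeps its own state and its MSE is updated sequentially, as in the update function of this section.

```python
def predict_last(history):
    """Hypothetical model 1: predict the most recent observed value."""
    return history[-1] if history else 0.0

def predict_mean(history):
    """Hypothetical model 2: predict the mean of the stored window."""
    return sum(history) / len(history) if history else 0.0

models = {"last": predict_last, "mean": predict_mean}
# One state per model key: (window of recent targets, running MSE, sample count)
states = {key: ([], 0.0, 0) for key in models}
window_size = 5

stream = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]  # illustrative targets with a trend
for y in stream:
    for key, predictor in models.items():
        window, mse, N = states[key]
        err = y - predictor(window)          # predict before seeing y
        N += 1
        mse = mse + (err ** 2 - mse) / N     # sequential MSE update
        window = (window + [y])[-window_size:]
        states[key] = (window, mse, N)

best = min(states, key=lambda k: states[k][1])
print({k: round(s[1], 3) for k, s in states.items()}, "best:", best)
```

All models are evaluated on exactly the same samples, so their running MSE values are directly comparable at any time.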


In [None]:
n=10 # number of features

batchSize=50
D=np.zeros((batchSize,n+1)) 
## batch data size
E=np.zeros((1,1))
mse=0
N=0
state1=('rf',D,mse,N,E,0,0)
state2=('lin',D,mse,N,E,0,0)
state3=('gb',D,mse,N,E,0,0)


initialStateRDD = sc.parallelize([(u'rf', state1),
                                  (u'lin', state2),
                                 (u'gb', state3)])

In [None]:
def predict(mod,YX,q):
    N=YX.shape[0]
    Ntr=int(np.round(N/2))
    Ytr=YX[:Ntr,0]
    Xtr=YX[:Ntr,1:]
    Yts=YX[Ntr:,0]
    Xts=YX[Ntr:,1:]
    Nts=Xts.shape[0]
    if mod=="rf":
        regr = RandomForestRegressor(random_state=0)
    if mod=="gb":    
        regr = GradientBoostingRegressor(random_state=0)
    if mod=="lin":
        regr=LinearRegression()
    regr.fit(Xtr, Ytr)
    Yhat=regr.predict(q)
   

    return(Yhat)

In [None]:
def updateFunction(new_values, state): 
    L=len(new_values)
    
    if (L>0):
        # Extract the data from state variable
        (model_key,DD,mse,N,E,y,yhat) = state
        #model_key=state[0] ## Unique identifier for the model
        #DD=state[1]  ## Analyzed values
        #mse=state[2]  ## Mean squared errors
        #N=state[3]   ## Number of treated samples
        #E=state[4]   ## Error measure
        #y=state[5]   ## True value for the last value in the batch
        #yhat=state[6] ## Predicted value for the last value in the batch

        key=new_values[0][0]
          
        ## prediction of the output of the most recent value
        yx=new_values[-1][1]      
        i=yx[0]
        y=yx[1]
        x=yx[2:]
        x.shape=(1,n)
        N=N+1
        
        yhat=predict(key,DD,x) 
        err=y-yhat  ## prediction error for the latest values
        mse=mse+1.0/N*(pow(err,2.0)-mse) ## sequential update of MSE
        E=np.append(E,err)        
        
        ## batch update
        for l in np.arange(L):
            yx=new_values[l][1]      
            d=yx[1:]
            d.shape=(1,n+1)
            DD=np.vstack((d,DD[:-1,:]))
        return (key,DD,mse,N,E,y,yhat)  
        
    else:
        return state

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataNonLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))

dataS.map(lambda x: "stream counter="+ str(x[0])).pprint()

dataS2=dataS.flatMap(lambda x: [('rf',('rf',1.0*np.array(x))),
                            ('lin',('lin',1.0*np.array(x))),
                            ('gb',('gb',1.0*np.array(x)))
                               ])
updatedS=dataS2.updateStateByKey(updateFunction,initialRDD=initialStateRDD)

## printing out updated values of the states of the three models
outmseS=updatedS.map(lambda x: x[1][0]+": mse="+ str(x[1][2])).pprint()
outNS=updatedS.map(lambda x: x[1][0]+": N="+ str(x[1][3])).pprint()
outyS=updatedS.map(lambda x: x[1][0]+": y="+ str(x[1][5])+ " yhat="+ str(x[1][6])).pprint()


In [None]:
ssc.start()

Note that the batch is empty (all zeros) at the beginning, so the prediction becomes more accurate (and the MSE smaller) only once the batch dataset is filled.
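
The two mechanisms behind this remark can be checked offline with NumPy alone. The sketch below uses synthetic errors and a tiny buffer (`batch_size=5`, `n=2` are arbitrary values chosen for the illustration): it verifies that the recursion used in `updateFunction` returns the ordinary mean of the squared errors, and it counts how many placeholder rows of zeros are still in the buffer after each new sample.

```python
import numpy as np

# 1) Sequential MSE: same recursion as in updateFunction
rng = np.random.default_rng(0)
errors = rng.normal(size=20)          # synthetic prediction errors
mse, N = 0.0, 0
for err in errors:
    N += 1
    mse = mse + 1.0 / N * (err ** 2 - mse)
# The recursive estimate coincides with the batch mean of squared errors
print(np.isclose(mse, np.mean(errors ** 2)))

# 2) Sliding buffer: new rows enter at the top, the oldest row is dropped
batch_size, n = 5, 2
DD = np.zeros((batch_size, n + 1))
zero_rows = []
for t in range(1, 8):
    d = np.full((1, n + 1), float(t))  # fake sample [y, x1, x2]
    DD = np.vstack((d, DD[:-1, :]))
    zero_rows.append(int(np.sum(~DD.any(axis=1))))
print(zero_rows)  # placeholder rows left after each insertion
```

As long as some rows are still zero, the models are partly trained on placeholder data, which is why the first predictions are poor.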

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

# Last but not least: debugging 

The main difficulty in programming streaming analytics code is debugging: errors raised inside the update function do not show up in the notebook. Let's see an example.

In [None]:
def WRONGupdateFunction(new_values, state): 
    L=len(new_values)
    ## size buffer
    if (L>0):
        ## value=new_values[0] ## CORRECT: take the first value
        value=new_values   ## WRONG
        d=np.abs(value[1:]).reshape(1,-1)+state[0] 
        return (d,L)  
        
    else:
        return state

In [None]:
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='dataLinearModel',batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
dataS.map(lambda x: "stream counter="+ str(x[0])).pprint()

dataS=dataS.flatMap(lambda x: [('state',1.0*np.array(x))])

n=11
v=np.zeros((1,n))

state1=(v,0)
## state with two components: vector of size 11 and an integer

initialStateRDD = sc.parallelize([('state', state1)])

updatedS=dataS.updateStateByKey(WRONGupdateFunction,initialRDD=initialStateRDD)
updatedS.pprint()

In [None]:
ssc.start()

If you run the wrong code, the **notebook output** will show only the output of the first `pprint()` command (the stream counter) but not that of the second one (the updated state).

At the same time, if you go to the **terminal window** you will see a long series of messages like:

```
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:100)
	at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:99)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
```

Most of those messages are very _low level_ and of no help, but some refer to your actual code, like:

**`File "<ipython-input-141-72bbbd66e1c1>", line 7, in WRONGupdateFunction`**

**`ValueError: operands could not be broadcast together with shapes (1,372) (1,11)`**

```
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
```

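The problem was indeed in line 7 of your update function: `new_values` is the list of all the arrays received for the key during the batch interval, so `value[1:]` stacks several arrays (here 31 arrays of 12 values, hence the shape (1,372)) instead of slicing a single one, and the result cannot be added to the state vector of size 11.
Correct it (take `new_values[0]`) and launch again.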
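
A practical way to avoid digging through the terminal log is to call the update function directly, outside Spark, on hand-made inputs. The sketch below is only an illustration: the fake messages (three arrays of 12 values, i.e. a counter followed by 11 values) are an assumption about the message layout, and `FIXEDupdateFunction` is a hypothetical name for the corrected version.

```python
import numpy as np

def WRONGupdateFunction(new_values, state):
    L = len(new_values)
    if L > 0:
        value = new_values   ## WRONG: the whole list of arrays
        d = np.abs(value[1:]).reshape(1, -1) + state[0]
        return (d, L)
    else:
        return state

def FIXEDupdateFunction(new_values, state):
    L = len(new_values)
    if L > 0:
        value = new_values[0]   ## CORRECT: take the first value
        d = np.abs(value[1:]).reshape(1, -1) + state[0]
        return (d, L)
    else:
        return state

n = 11
state = (np.zeros((1, n)), 0)
# Three fake messages for one micro-batch (assumed layout: counter + 11 values)
new_values = [np.arange(n + 1, dtype=float) for _ in range(3)]

try:
    WRONGupdateFunction(new_values, state)
    error_message = None
except ValueError as exc:
    error_message = str(exc)   # the error now appears directly in the notebook
print(error_message)

new_state = FIXEDupdateFunction(new_values, state)
print(new_state[0].shape, new_state[1])
```

With three fake messages the wrong version tries to add 24 columns to 11, which reproduces the same kind of broadcasting error as in the Spark log, this time with a readable Python traceback.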

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)