## Recursive Least Square with Kafka and Spark streaming 

This notebook demonstrates how to implement a distributed algorithm for estimating the coefficients
of multiple linear models on streaming data retrieved from a Kafka producer.
Inference is achieved using the Recursive Least Squares (RLS) algorithm, by running many
models in parallel with different forgetting factors.

Massive parallelism enables rapid validation of the optimal value for the only hyper-parameter
of the model, namely the forgetting factor.
The algorithm is scalable both w.r.t. to the number of explained variables and the number of
models run in parallel. Let's note that running multiple models in parallel and fitting
linear models on different output variables are both embarassingly parallel tasks.

Let $B \in \mathbb{R}^{n \times m}$ be the coefficients to be learnt, $x_t \in \mathbb{R}^n$ the input sample
of dimensionality $n$ retrieved at step $t$, $w \in \mathcal{N}(0, 1)$ some Gaussian noise
and $y \in \mathbb{R}^m$ the $m$ output variables to be predicted.
The linear regression fitted by the RLS algorithm can be formally described as such:

$$
y = x^T B + w
$$

However, such vectorized description of the model is not scalable and should be split
w.r.t. to the explained variable, in such a way that each machine in the cluster computes
the coefficients associated to a single explained variable $y_j$:

$$
y_j = x^T B_{\cdot j} + w
$$

Where $B_{\cdot j}$ is the jth column of matrix $B$. Actually, this notebook provides both implementations and 
privileges the vectorized one for small systems, for which the NumPy implementation of the dot-product
has a competitive advantage over Spark.

### General import

Let's import the general-purpose libraries and Spark modules.

In [115]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

import time
import re
import ast
import numpy as np
import matplotlib.pyplot as plt
import os

### Start Spark session and initialize DStream

Let's initialize the Spark session as well as the DStream. It should be noted that everytime the stream is
stopped, the notebook should be re-run from this cell.

In [116]:
# Global constants
TOPIC = 'dataLinearModel'
BATCH_INTERVAL = 1

# Change command line Spark job submission arguments
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 ' + \
                                '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 ' + \
                                '--conf spark.driver.memory=2g  pyspark-shell'

# Initialize Spark session and retrieve context
spark = SparkSession.builder.master('local[2]').appName('KafkaReceive').getOrCreate()
sc = spark.sparkContext

# Create streaming context, with required batch interval
ssc = StreamingContext(sc, BATCH_INTERVAL)

# Checkpointing needed for stateful transforms
ssc.checkpoint('checkpoint')

# Create a DStream that represents streaming data from Kafka, for the required topic
dstream = KafkaUtils.createDirectStream(ssc, [TOPIC], {'metadata.broker.list': 'localhost:9092'})
# IMPORTANT NOTE: The following line may not succeed on Windows, but can be used instead:
# dstream = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {topic: 1})

### 1) RLS algorithm state representation - Vectorized approach

Rather than implementing arbitrary object classes (which Spark can not handle in a stable manner),
let's represent the current state of RLS algorithm execution with tuples.
More formally, let's denote current state as a 7-uple of the following form:

$$
(k, B^{(t)}, V^{(t)}, \nu, sse, t, \text{None})
$$

where $k$ is a string representing the identifier of current model, $B^{(t)} \in \mathbb{R}^{n \times m}$ is
the coefficient matrix at time $t$, $V^{(t)}$ is matrix $V \in \mathbb{R}^{n \times n}$ at time $t$,
$\nu$ is the forgetting factor of current model, $sse$ is the average square error of all predicted samples until step $t$,
and the last element is the index of the explained variable to be fitted.
In the vectorized implementation, all explained variables are fitted and the last element is thus set to None.

In [118]:
def create_state(key, n, m, nu):
    """Creates a new RLS state.
    
    Parameters:
        key (str): Identifier of current model.
        n (int): Number of input/explicative variables.
        m (int): Number of output/explained variables.
        nu (float): Forgetting factor of current model.
    
    Returns:
        tuple: RLS state at the beginning of the algorithm's execution.
    """
    Beta = np.squeeze(np.zeros((n, m)))
    return (key, Beta, np.eye(n), nu, 0., 0., None)

States are being updated by Spark by repeated calls to a function which has to
be defined beforehand. Such function should be able to handle multiple
input examples (sample size greater than one) at once. Therefore, let's define
a decorator for the automatically adapts the update function by calling the later
one time per input example. Such decorator is useful since it can be reused for
the parallel implementation as well.

In [120]:
def stochastic(func):
    """Decorator for state update functions.
    
    Iteratively calls the update function for each
    input example received from Kafka.
    
    Parameters:
        func (callable): State update function.
    
    Returns:
        callable: A new state update function.
    """
    def new_func(new_values, state):
        if len(new_values) > 0:
            # Call the update function on each example,
            # in the order in which they have been received.
            for i in range(len(new_values)):
                # Update current state
                new_state = func(new_values[i], state)
                state = new_state
            return new_state
        else:
            return state
    new_func.__name__ = func.__name__ # Preserve the name
    return new_func

Now let's implement the actual state update function that will perform one step of the RLS algorithm.
Let $V \in \mathbb{R}^{n \times n}$ be a matrix that is required to be keft up-to-date at each
step of the algorithm in order to compute $\alpha \in \mathbb{R}^n$, a vector representing the lengths
of the steps performed in the parameter space.
Let $x_t \in \mathbb{R}^n$ be the vector of input variables observed at time $t$ and $y^{(t)}$ the corresponding
vector of output variables at time $t$.

$$
\begin{align}
\begin{cases}
    V^{(t)} & = \frac{1}{\nu} \Bigg( V^{(t-1)} - \frac{V^{(t-1)} x_t^T x_t V^{(t-1)}}{1 + x_t V^{(t-1)} (x_t^T} \Bigg) \\
    \alpha_t & = V^{(t)} x_t^T \\
    e^{(t)} & = y^{(t)} - x_t \hat{B}^{(t-1)} \\
    \hat{B}^{(t)} & = \hat{B}^{(t-1)} + \alpha_t^T e^{(t)}
\end{cases}
\end{align}
$$

$e^{(t)}$ are the errors given current model parameters on the example $(x_t, y^{(t)})$,
and $e^{(t)}_i$ is the error on variable $y_i$ specifically. Given the current state:

$$
\big(k, B^{(t)}, V^{(t)}, \nu, sse^{(t)}, t, \text{None}\big)
$$

the update function computes the next step as:

$$
\big(k, B^{(t+1)}, V^{(t+1)}, \nu, sse^{(t+1)}, t+1, \text{None})
$$

where $sse^{(t+1)} = \frac{(t \cdot sse^{(t)} + e^{(t+1)})}{t + 1}$.

In [None]:
@stochastic
def full_state_update(new_example, state):
    """Perform one RLS step and return the new state.
    
    Parameters:
        new_example (tuple): Tuple containing the model
            identifier and the input example itself.
            Input example is a NumPy array
            of size (1 + m + n), where the first element
            is the example identifier, the m next elements
            are the values of output variables and the n last
            elements are the values of input variables.
        state (tuple): Current state of the algorithm.
    
    Returns:
        tuple: New state after performing one RLS step.
    """
    yx = new_example[1] # Input example
    key, Beta, V, nu, sse, N, _ = state # Unpack state
    n_inputs, n_outputs = Beta.shape
    
    # i is the example identifier, y the output variables
    # and x the input variables
    i, y, x = yx[0], yx[1:n_outputs+1], yx[n_outputs+1:]

    # Compute error
    err = y - np.dot(x, Beta)
    sse = (N * sse + np.mean(err ** 2.)) / (N + 1)

    # Update V matrix
    xtx = np.outer(x, x)
    V = (V - (np.dot(V, np.dot(xtx, V))) / (1. + np.dot(x, np.dot(V, x)))) / nu

    # Update coefficients
    alpha = np.dot(V, x)
    new_Beta = Beta + np.outer(alpha, err)
    return (key, new_Beta, V, nu, sse, N + 1, None)

### 2) RLS algorithm state representation - Parallel approach

In the parallel approach, the RLS agorithm is scalable not only w.r.t. to the number of models,
**but also w.r.t. the number of output variables**. Therefore, RLS states should be redefined in such a way that
only necessary coefficients are represented:

$$
(k, B_{\cdot j}^{(t)}, V^{(t)}, \nu, sse, t, j)
$$

$B_{\cdot j}$ is the jth column of matrix $B$ and $j$ is the index of the explained variable to be fitted.
It must be noted that the model is split into $m$ parts and matrix $V$ is computed for each varaible $y_j$:
each node of the cluster can thus compute matrix $V$ itself instead of waiting for another machine to do it.

First, a function for splitting a state into multiple partial states should be defined.
Each partial state contains a single column of the coefficient matrix $B$.
It should be highlighted that such function is **called only once** per model before starting
streaming data.

In [121]:
def split_state(state):
    """Splits a full state into multiple partial states.
    
    Each partial state is associated to a single output variable
    and shares the same key with the original (full) state.
    
    Parameters:
        state (tuple): Full state.
    
    Returns:
        list: A list of `m` tuples (states) where each is associated
            to a different output variables.
    """
    m = state[1].shape[1]
    partial_states = list()
    for j in range(m):
        key = state[0]
        beta = state[1][:, j] # jth column of Beta
        V, nu, sse, N, _ = state[2:]
        new_state = (key, beta, np.copy(V), nu, sse, N, j)
        partial_states.append(new_state)
    return partial_states

In order to retrieve the coefficients of the whole matrix $B$ as well as the error across all output variables,
another function has to be defined. Such function necessarily needs to be run in parallel w.r.t. the number of models.
For each model, the partial states are concatenated in order to obtain the full state, where the later is of the same
form as the states used in the vectorized approach.

The implementation is quite simple, as $k, V, \nu, t$ are assumed to be equal across all partial states of a same model.
The only operation to be performed is initializing the values of $B_{\cdot j}$ with the vector of partial state $j$,
for each partial state.

In [122]:
def combine_partial_states(states):
    """Combines partial states of a same model into a single state.
    
    Parameters:
        states (list): List of tuples where each tuple is
            a partial state.
    
    Returns:
        tuple: Full state.
    """
    # All the following info is retrieved from
    # the first state of the list but is supposed
    # to be the same everywhere
    n_inputs = len(states[0][1])
    n_outputs = len(states)
    key = states[0][0]
    V = states[0][2]
    nu = states[0][3]
    N = states[0][5]
    
    # Errors will be averaged across partial states.
    sses = list()
    
    # Initialize Beta column by column
    Beta = np.zeros((n_inputs, n_outputs))
    for state in states:
        j = state[6] # Index of the output variable
        Beta[:, j] = state[1]
        sses.append(state[4])
    sse = np.mean(sses)
    return (key, Beta, V, nu, sse, N, None)

Each RLS step is performed slightly differently than in the vectorized approach.
Matrix $V^{(t)}$ is updated exactly the same way. However, only the jth column 
of coefficient matrix $B$ are updated, based on the error on variable $y_j$ solely.

$$
\begin{align}
\begin{cases}
    \alpha_t & = V^{(t)} x_t^T \\
    e^{(t)} & = y_j^{(t)} - x_t \hat{B}_{\cdot j}^{(t-1)} \\
    \hat{B}_{\cdot j}^{(t)} & = \hat{B}_{\cdot j}^{(t-1)} + \alpha_t^T e^{(t)}
\end{cases}
\end{align}
$$

In [107]:
@stochastic
def partial_state_update(new_example, state):
    yx = new_example[1]
    key, beta, V, nu, sse, N, j = state
    n_inputs = len(beta)
    n_outputs = len(yx) - n_inputs - 1
    i, y, x = yx[0], yx[j+1], yx[n_outputs+1:]
    
    # Compute error
    err = y - np.dot(x, beta)
    sse = (N * sse + err ** 2.) / (N + 1)
    
    # Update V matrix
    xtx = np.outer(x, x)
    V = (V - (np.dot(V, np.dot(xtx, V))) / (1. + np.dot(x, np.dot(V, x)))) / nu

    # Update Beta matrix
    alpha = np.dot(V, x)
    new_beta = beta + alpha * err
    return (key, new_beta, V, nu, sse, N + 1, j)

### Global constants - RLS

The following constants are dedicated to the learning process itself.

In [108]:
# Number of input/explanatory variables
n = 10

# Number of output/explained variables
m = 8

# Threshold for using the parallel approach.
# When the number of output variables exceeds
# this threshold, the parallel approach is used
# instead of the vectorized one.
tau_local = 5

# Number of models to train in parallel
# for future validation of `nu`
n_models = 3

### Testing the state update functions

Let's quickly test both implementations of the RLS update function:

In [129]:
values = np.asarray([0] + [1.] * (n + m))
assert(full_state_update([('-', values)], create_state('-', n, m, 1.)))
state = create_state('-', n, 1, 1.)
assert(partial_state_update([('-', values)], split_state(create_state('-', n, m, 1.))[0]))

## Define streaming pipeline

---

<img src="report/imgs/lineage-graph.png">

In [128]:
# Creating one initial state per model
# Each model is assigned a different forgetting parameter
nus = np.linspace(0.5, 1.0, n_models)
initial_states = [create_state('mod%i' % (i + 1), n, m, nus[i]) for i in range(n_models)]

# Evaluate inputs to lists and convert them to arrays
array_dstream = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))
#dstream.pprint()

dstream = array_stream.flatMap(lambda x: [('mod%i' % (i + 1), ('mod%i' % (i + 1), x)) for i in range(n_models)])
#dstream.pprint()

full_state_RDD = sc.parallelize([(u'mod%i' % (i + 1), initial_states[i]) for i in range(n_models)])

if m <= tau_local:
    dstream = dstream.updateStateByKey(full_state_update, initialRDD=full_state_RDD)
else:
    partial_state_RDD = full_state_RDD.flatMap(lambda x: [(x[0], s) for s in split_state(x[1])])
    dstream = dstream.updateStateByKey(partial_state_update, initialRDD=partial_state_RDD)
    dstream = dstream.groupByKey().map(lambda x: (x[0], merge_partial_states(x[1])))
dstream.pprint()

dstream = dstream.map(lambda x: x[1][4])

# Only display error
#dstream.pprint()

<class 'pyspark.streaming.dstream.DStream'>


### Start streaming application

In [113]:
ssc.start()

-------------------------------------------
Time: 2019-08-25 14:23:25
-------------------------------------------
('mod1', ('mod1', array([ 0.30314386,  1.37409928,  0.41483387, -0.22854956,  2.01558654,
       -0.42718012,  1.90076558,  1.70822599, -0.95328089, -0.21190103]), array([[ 2.40698215e+01,  6.88396275e+00, -2.44011056e+01,
        -1.99218865e+01,  2.01201011e+01,  1.23746046e+00,
        -2.19737576e+01,  7.47240577e+00, -1.96805243e+01,
         2.51218080e+01],
       [ 6.88396275e+00,  9.22265953e+01, -5.49862729e+00,
        -5.71660070e+01, -3.29028004e+00, -2.45761784e+01,
        -5.67749165e+01, -4.60046321e+01, -4.96475832e-02,
         2.44523027e+01],
       [-2.44011056e+01, -5.49862729e+00,  1.23935067e+02,
        -2.47156256e+01, -1.84198652e+01,  2.24622868e+01,
        -2.73243498e+00, -5.40831446e+01,  2.09166340e+01,
        -6.08751354e+01],
       [-1.99218865e+01, -5.71660070e+01, -2.47156256e+01,
         1.22493429e+02, -1.69741747e+01, -3.59706977e

### Stop streaming

In [114]:
ssc.stop(stopSparkContext=True, stopGraceFully=False)