INFO-H-515 Project <br>
2022–2023

# Phase 2 : Consumer
Gianluca Bontempi, Théo Verhelst, Cédric Simar <br>
Computer Science Department, ULB

## Information
Group Number : 5 <br>
Group Members : Rania Baguia (000459242), Hakim Amri (000459153), Julian Cailliau (000459856), Mehdi Jdaoudi (000457507)

## Imports

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, rank, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.window import Window
from pyspark.streaming import StreamingContext
import json
import logging
import math
import numpy as np
import os
import datetime
import pandas as pd
import pickle
import re, ast
import socket
import time

## Configuring the consumer

In [None]:
spark = SparkSession \
    .builder \
    .master("local[10]")\
    .config("spark.executor.instances", "1") \
    .config("spark.executor.cores", "10") \
    .config("spark.executor.memory", "16G") \
    .appName("Consummer") \
    .getOrCreate()

# Let us retrieve the sparkContext object
sc=spark.sparkContext

sc.setLogLevel("ERROR")
logger = spark._jvm.org.apache.log4j
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)


#### Loading the necessary information
We first load the sensors information, essentially sensor names. This allows us to programmatically create states for the following models.

In [None]:
sensors = []
with open("data/bikes_sensors.json", "r") as f:
    sensors = json.load(f)
    sensors = [
        sensor["properties"]["device_name"] for sensor in sensors["features"]
    ]

#### Set up the Stream function
This function will create a streaming context from a network socket.

In [None]:
def getDStream(sc, batch_interval):
    """
    Create a streaming context and a DStream from a network socket.

    Args:
        sc (SparkContext): The Spark context object.
        batch_interval (int): The time interval in seconds at which streaming data will be divided into batches.

    Returns:
        list: A list containing the streaming context (ssc) and the DStream (dstream).

    Raises:
        None

    Example:
        >>> sc = SparkContext(appName="StreamingExample")
        >>> ssc, dstream = getDStream(sc, 5)
    """

    #Create streaming context, with required batch interval
    ssc = StreamingContext(sc, batch_interval)

    #Checkpointing needed for stateful transforms
    ssc.checkpoint("checkpoint")
    
    # Create a DStream that represents streaming data from a network socket
    dstream = ssc.socketTextStream("localhost", 9999)
    
    return [ssc,dstream]

#### Key notebook variables

In [None]:
NSENSORS = 18

## First Persistance Model
A simple naive model returning the last observed value per sensor running concurrently. The equation is like the following :
\begin{equation*}
\hat{y}(t+1, sensor) = y(t, sensor)
\end{equation*}

First, we need to define the state of such a model. This is done programmatically for every available sensor. It is worth noting that, the producer have the ability to send only few sensors. Then, on the consummer side, all the sensors will be available in the states, however, only the one sent will be updated. Those can be differentiated with the `N` state which is the number of forecast. They will have 0 forecasts. It is also worth noting that each sensor have its own record in the RDD with its name as key. This allows for parallel updates of the states across sensors.

In [None]:
PersModelMSE = [(i,{"SSE":0, "Forecast" : 0, "N":0, "N_MSE":0 ,"MSE":0}) for i in sensors]
PersModelRDD = sc.parallelize(PersModelMSE)

Then, we need to create the update function for updating the states. We went one step ahead by wrapping the function into a function that is returning the update function. This pattern allows us to pass custom arguments to update function that is passed to `.updateStateByKey()`. As such, we designed the function to be able to compute the MSE through the whole period, or between a specific time interval (which is usefull for the question 4).

In [None]:
def createUpdateFunction(MSE_range:tuple=(None, None),)  -> callable:
    """
    Create an update function for stateful streaming calculations.

    Args:
        MSE_range (tuple, optional): A tuple representing the range of 
        MSE (Mean Squared Error) dates in the format ("%Y-%m-%d", "%Y-%m-%d"). During this range,
        the MSE will be computed, considering the past as solely training. 
        Defaults to (None, None).

    Returns:
        function: The update function that calculates and updates the state based on new values.

    Raises:
        None

    Example:
        >>> update_func = createUpdateFunction(("2022-01-01", "2022-12-31"))
    """
    if MSE_range != (None, None):
        LB = datetime.datetime.strptime(MSE_range[0], "%Y-%m-%d")
        HB = datetime.datetime.strptime(MSE_range[1], "%Y-%m-%d")

    def updateFunction(new_values, state) -> dict:
        """
        Update function that calculates and updates the state based on new values.

        Args:
            new_values (list): A list of new values.
            state (dict): The current state.

        Returns:
            dict: The updated state.

        Raises:
            None
        """ 
        L=len(new_values) 
        if (L>0):
            initial_state = state
            Observations = sorted(new_values[0], key = lambda x: x[5], reverse=False)
            ObservationsCounts = [i[2] for i in Observations]
            for i in range(0, len(ObservationsCounts)):
                if initial_state["N"] != 0 :
                    if MSE_range == (None, None):
                        err = ObservationsCounts[i] - initial_state["Forecast"] 
                        initial_state["SSE"] = initial_state["SSE"] + err ** 2
                        initial_state["MSE"] = initial_state["SSE"]/initial_state["N"]
                    else :
                        if LB <= datetime.datetime.strptime(Observations[i][0], "%Y-%m-%d") <= HB:
                            if initial_state["N_MSE"] != 0 :
                                err = ObservationsCounts[i] - initial_state["Forecast"] 
                                initial_state["SSE"] = (initial_state["SSE"] if initial_state["SSE"] != -np.inf else 0) + err ** 2
                                initial_state["MSE"] = initial_state["SSE"]/initial_state["N_MSE"]
                            initial_state["N_MSE"] = initial_state["N_MSE"] + 1
                        else :
                            initial_state["SSE"] = -np.inf
                            initial_state["MSE"] = -np.inf
                initial_state["Forecast"] =  ObservationsCounts[i]
                initial_state["N"] = initial_state["N"] + 1  
                
            return initial_state   
        else:
            return state
        
    return updateFunction

In [None]:
BATCH_INTERVAL = 10
# Get the DStream (discretized stream) using the specified batch interval
[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

# Flatten the input stream and convert each element to a tuple
dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5])))\

# Map each tuple by the sensor ID and partition the data by the number of sensors
dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)

# Create an update state function using the specified MSE range
updateStateFunction = createUpdateFunction(MSE_range=("2022-04-15", "2023-03-31"))

# Update the state of the DStream using the update state function, with an initial RDD provided
FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=PersModelRDD)

# Print the future forecasts to the console
FutureForecasts.pprint()

# Save each RDD in the DStream as a text file
FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile("FP_final_state_.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Weighted Persistence Model
A weighted persistence model returning the average of the last V values per sensor running concurrently with V = 2, 3, 4. The equation is like the following :

\begin{equation*}
\hat{y}(t+1, sensor, V) = \dfrac{\sum_{n=0}^{V-1} y(t-n, sensor)}{V}
\end{equation*}

The createUpdateFunction is very similar to the simple persistance model, except we add the ability to have an average window for the forecast. This is relevant for changing the parameter and evaluating the model again. Furthermore, a trick we used for evaluating the borders the batch, we just store the last observation for the average computation in the state, and then we add the observation at the begining of the upcomming batch. This allows us to simply "roll" the average computaion and not miss borders.

In [None]:
def createUpdateFunction(average_window:int, MSE_range:tuple=(None, None)) -> callable:
    """
    Create an update function for stateful stream processing.

    Args:
        average_window (int): The size of the moving average window for forecasting.
        MSE_range (tuple, optional): A tuple representing the range of 
        MSE (Mean Squared Error) dates in the format ("%Y-%m-%d", "%Y-%m-%d"). During this range,
        the MSE will be computed, considering the past as solely training.  
        Defaults to (None, None).

    Returns:
        function: The update function that calculates and updates the state based on new values.

    Raises:
        None

    Example:
        >>> update_func = createUpdateFunction(5, ("2022-01-01", "2022-12-31"))
    """
    if MSE_range != (None, None):
        LB = datetime.datetime.strptime(MSE_range[0], "%Y-%m-%d")
        HB = datetime.datetime.strptime(MSE_range[1], "%Y-%m-%d")

    def updateFunction(new_values, state) -> dict: 
        L=len(new_values) 
        if (L>0):
            initial_state = state
            Observations = sorted(initial_state["LastObservations"] + list(new_values[0]), key = lambda x: x[5], reverse=False)
            ObservationsCounts = [i[2] for i in Observations]

            for i in range(average_window-1, len(ObservationsCounts)):
                if initial_state["N"] != 0 :
                    if MSE_range == (None, None):
                        err = ObservationsCounts[i] - initial_state["Forecast"] 
                        initial_state["SSE"] = initial_state["SSE"] + err ** 2
                        initial_state["MSE"] = initial_state["SSE"]/initial_state["N"]
                    else :
                        if LB <= datetime.datetime.strptime(Observations[i][0], "%Y-%m-%d") <= HB:
                            if initial_state["N_MSE"] != 0 :
                                err = ObservationsCounts[i] - initial_state["Forecast"] 
                                initial_state["SSE"] = (initial_state["SSE"] if initial_state["SSE"] != -np.inf else 0) + err ** 2
                                initial_state["MSE"] = initial_state["SSE"]/initial_state["N_MSE"]
                            initial_state["N_MSE"] = initial_state["N_MSE"] + 1
                        else :
                            initial_state["SSE"] = -np.inf
                            initial_state["MSE"] = -np.inf
                
                initial_state["Forecast"] =  np.mean(np.array(ObservationsCounts[i-average_window+1:i+1]))
                initial_state["N"] = initial_state["N"] + 1

            initial_state["LastObservations"] = Observations[-average_window + 1:]
                
            return initial_state   
        else:
            return state
        
    return updateFunction

#### Average Order 2

We can just set a parameter `WEIGHTED_AVG_LEN` which can be passed to the `createUpdateFunction`. The stream processing is similar to the begining. The only difference is the state to save, which adds the previous observations.

In [None]:
WEIGHTED_AVG_LEN = 2

In [None]:
PersModelAverage = [(i,{"SSE":0, "Forecast" : 0, "N":0, "N_MSE":0 ,"MSE":0, "LastObservations":[]}) for i in sensors]
PersModelAverageRDD = sc.parallelize(PersModelAverage)

In [None]:
[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5])))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)
    
updateStateFunction = createUpdateFunction(average_window = WEIGHTED_AVG_LEN, MSE_range=("2022-04-15", "2023-03-31"))

FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=PersModelAverageRDD)

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"WP_final_state_{WEIGHTED_AVG_LEN}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

#### Average Order 3

In [None]:
WEIGHTED_AVG_LEN = 3

In [None]:
PersModelAverage = [(i,{"SSE":0, "Forecast" : 0, "N":0, "N_MSE":0 ,"MSE":0, "LastObservations":[]}) for i in sensors]
PersModelAverageRDD = sc.parallelize(PersModelAverage)

In [None]:
[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5])))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)
    
updateStateFunction = createUpdateFunction(average_window = WEIGHTED_AVG_LEN, MSE_range=("2022-04-15", "2023-03-31"))

FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=PersModelAverageRDD)

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"WP_final_state_{WEIGHTED_AVG_LEN}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

#### Average Order 4

In [None]:
WEIGHTED_AVG_LEN = 4

In [None]:
PersModelAverage = [(i,{"SSE":0, "Forecast" : 0, "N":0, "N_MSE":0 ,"MSE":0, "LastObservations":[]}) for i in sensors]
PersModelAverageRDD = sc.parallelize(PersModelAverage)

In [None]:
[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5])))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)
    
updateStateFunction = createUpdateFunction(average_window = WEIGHTED_AVG_LEN, MSE_range=("2022-04-15", "2023-03-31"))

FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=PersModelAverageRDD)

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"WP_final_state_{WEIGHTED_AVG_LEN}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Recursive Least Squares
The recursive least squares is an online learning method. It can learn while observing passing data to predict following observation in a regression setting.  

\begin{equation*}
\begin{cases}
V_{(t)}&=\frac{1}{\nu} \left(V_{(t-1)}
-\frac{V_{(t-1)} x^T_{t} x_{t} V_{(t-1)}}{1+ x_{t} V_{(t-1)} x^T_{t}} \right)\\[3pt]
\alpha_{(t)}&= V_{(t)} x^T_{t} \\[3pt]
e&= y_{t}- x_{t} \hat{\beta}_{(t-1)}  \\[3pt]
\hat{\beta}_{(t)}&=\hat{\beta}_{(t-1)}+ \alpha_{(t)} e \\[3pt]
\end{cases}
\end{equation*}

where $V$ is the covariance matrix and $\beta$ is the set of parameters of the linear model.

We first tried this approach using only the last $n$ observation (embedding order). Then we added numerical features to the observation so it can better fine tune the prediction.

### Embedded RLS

The process is very similar to the weighted average. The difference is the forecasting strategy. In fact, we first initialize the parameters (beta, V and the forgetting factor) per sensor. Then we can use those values and make a forecast. After the first step, we can load those values, correct them with the forecasting error, and then making a forecast for the next data and update the states. 

In [None]:
def RLSstep(y: np.ndarray, yhat: np.ndarray, x: np.ndarray, Beta: np.ndarray, Var: np.ndarray, nu: float) -> tuple:
    """
    Performs a Recursive Least Squares (RLS) step for parameter estimation.

    Args:
        y (np.ndarray): The observed output values.
        yhat (np.ndarray): The predicted output values.
        x (np.ndarray): The input feature values.
        Beta (np.ndarray): The estimated model parameters.
        Var (np.ndarray): The estimated parameter covariance matrix.
        nu (float): A forgetting factor.

    Returns:
        tuple: A tuple containing updated values of yhat (predicted output), err (prediction error),
               Var (updated parameter covariance matrix), and Beta (updated model parameters).

    Raises:
        None

    Example:
        >>> y = np.array([1, 2, 3])
        >>> yhat = np.array([0.5, 1.5, 2.5])
        >>> x = np.array([[1, 2], [2, 3], [3, 4]])
        >>> Beta = np.array([0.1, 0.2])
        >>> Var = np.eye(2)
        >>> nu = 0.5
        >>> result = RLSstep(y, yhat, x, Beta, Var, nu)
    """
    Var = (1/nu)*(Var - ((Var.dot(x.T).dot(x).dot(Var))/(1+float(x.dot(Var).dot(x.T)))))
    alpha = Var.dot(x.T)
    err = y - yhat
    Beta = Beta + alpha*err
    yhat = x.dot(Beta)
    return (yhat, err, Var, Beta)

In [None]:
def createUpdateFunction(embeddingOrder: int, MSE_range: tuple=(None, None)) -> callable:
    """
    Create an update function for stateful stream processing using Recursive Least Squares (RLS) algorithm.

    Args:
        embeddingOrder (int): The embedding order for RLS algorithm.
        MSE_range (tuple, optional): A tuple representing the range of MSE (Mean Squared Error)
        dates in the format ("%Y-%m-%d", "%Y-%m-%d"). During this range,
        the MSE will be computed, considering the past as solely training. 
        Defaults to (None, None).

    Returns:
        callable: The update function that calculates and updates the state based on new values.

    Raises:
        None

    Example:
        >>> update_func = createUpdateFunction(3, ("2022-01-01", "2022-12-31"))
    """
    if MSE_range != (None, None):
        LB = datetime.datetime.strptime(MSE_range[0], "%Y-%m-%d")
        HB = datetime.datetime.strptime(MSE_range[1], "%Y-%m-%d")

    def UpdateFunction(new_values, state) -> dict:
        """
        Performs an update step for stateful stream processing using Recursive Least Squares (RLS) algorithm.

        Args:
            new_values (list): The new values to be processed.
            state (dict): The current state.

        Returns:
            dict: The updated state.

        Raises:
            None
        """
        if len(new_values) > 0:
            initial_state = state
            OBS = sorted((initial_state["LastObservations"]+new_values[0]), key= lambda x: x[5], reverse= False)
            observations = [i[2] for i in OBS]
            beta = initial_state["BetaVector"]
            beta.shape = (len(beta),1)
            for i in range(embeddingOrder-1, len(observations)): 
                x = np.array(observations[i-embeddingOrder+1:i+1])
                x = np.append(1,x)
                x.shape= (1,len(x))
                y = observations[i]
                RLS_results = RLSstep(y,initial_state["Forecast"], x, beta, initial_state["Variance"], initial_state["ForgettingFactor"] )
                initial_state["Forecast"] = RLS_results[0]
                beta = RLS_results[3]
                initial_state["Variance"] = RLS_results[2]
                initial_state["N"] = initial_state["N"] + 1
                err = RLS_results[1]
                if MSE_range == (None, None):
                    
                    initial_state["SSE"] = initial_state["SSE"] + err**2 if initial_state["N_MSE"] != 0 else 0                     
                    initial_state["MSE"] =  initial_state["SSE"]/initial_state["N_MSE"] if initial_state["N_MSE"] != 0 else 0 
                    initial_state["N_MSE"] = initial_state["N_MSE"] + 1
                else: 
                    if LB <= datetime.datetime.strptime(OBS[i][0], "%Y-%m-%d") <= HB:
                        initial_state["SSE"] = initial_state["SSE"] if initial_state["SSE"] != -np.inf else 0
                        initial_state["SSE"] = initial_state["SSE"] + err**2
                        initial_state["N_MSE"] = initial_state["N_MSE"] + 1
                        initial_state["MSE"] =  initial_state["SSE"]/initial_state["N_MSE"] 
                    else: 
                        initial_state["MSE"] = -np.inf
                        initial_state["SSE"] = -np.inf
                
            initial_state["BetaVector"] = beta  
            initial_state["LastObservations"] =  OBS[-embeddingOrder+1:] if embeddingOrder != 1 else []

            return initial_state
        else:
            return state

    return UpdateFunction

#### Embedding Order 1

The process is similar to the last models, except the states and the update function.

In [None]:
EMBEDDING_ORDER =  1
INITIALIZATION_VARIANCE = 10
RANGE = ("2022-04-15","2023-03-31")

In [None]:
RLS = [
    (i,{
        "BetaVector": np.zeros(EMBEDDING_ORDER+1),
        "Variance": np.diag(np.zeros(EMBEDDING_ORDER+1)+INITIALIZATION_VARIANCE),
        "ForgettingFactor":1,
        "SSE":0, 
        "Forecast" : 0, 
        "N":0, 
        "MSE":0,
        "N_MSE":0,
        "LastObservations":[]
        }) 
    for i in sensors
    ]
RLSRDD = sc.parallelize(RLS)

In [None]:
BATCH_INTERVAL = 10

[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5])))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)

updateStateFunction = createUpdateFunction(embeddingOrder= EMBEDDING_ORDER, MSE_range=RANGE)


FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=RLSRDD)

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"RLS_final_state_{EMBEDDING_ORDER}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

#### Embedding Order 2

In [None]:
EMBEDDING_ORDER =  2
INITIALIZATION_VARIANCE = 10
RANGE = ("2022-04-15","2023-03-31")

In [None]:
RLS = [
    (i,{
        "BetaVector": np.zeros(EMBEDDING_ORDER+1),
        "Variance": np.diag(np.zeros(EMBEDDING_ORDER+1)+INITIALIZATION_VARIANCE),
        "ForgettingFactor":1,
        "SSE":0, 
        "Forecast" : 0, 
        "N":0, 
        "MSE":0,
        "N_MSE":0,
        "LastObservations":[]
        }) 
    for i in sensors
    ]
RLSRDD = sc.parallelize(RLS)

In [None]:
BATCH_INTERVAL = 20

[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5])))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)

updateStateFunction = createUpdateFunction(embeddingOrder= EMBEDDING_ORDER, MSE_range=RANGE)

FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=RLSRDD)

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"RLS_final_state_{EMBEDDING_ORDER}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

#### Embedding Order 3

In [None]:
EMBEDDING_ORDER =  3
INITIALIZATION_VARIANCE = 10
RANGE = ("2022-04-15","2023-03-31")

In [None]:
RLS = [
    (i,{
        "BetaVector": np.zeros(EMBEDDING_ORDER+1),
        "Variance": np.diag(np.zeros(EMBEDDING_ORDER+1)+INITIALIZATION_VARIANCE),
        "ForgettingFactor":1,
        "SSE":0, 
        "Forecast" : 0, 
        "N":0, 
        "MSE":0,
        "N_MSE":0,
        "LastObservations":[]
        }) 
    for i in sensors
    ]
RLSRDD = sc.parallelize(RLS)

In [None]:
BATCH_INTERVAL = 20

[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5])))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)

updateStateFunction = createUpdateFunction(embeddingOrder= EMBEDDING_ORDER, MSE_range=RANGE)


FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=RLSRDD)

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"RLS_final_state_{EMBEDDING_ORDER}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

#### Embedding Order 4

In [None]:
EMBEDDING_ORDER =  4
INITIALIZATION_VARIANCE = 10
RANGE = ("2022-04-15","2023-03-31")

In [None]:
RLS = [
    (i,{
        "BetaVector": np.zeros(EMBEDDING_ORDER+1),
        "Variance": np.diag(np.zeros(EMBEDDING_ORDER+1)+INITIALIZATION_VARIANCE),
        "ForgettingFactor":1,
        "SSE":0, 
        "Forecast" : 0, 
        "N":0, 
        "MSE":0,
        "N_MSE":0,
        "LastObservations":[]
        }) 
    for i in sensors
    ]
RLSRDD = sc.parallelize(RLS)

In [None]:
BATCH_INTERVAL = 20

[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5])))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)

updateStateFunction = createUpdateFunction(embeddingOrder= EMBEDDING_ORDER, MSE_range=RANGE)


FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=RLSRDD)

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"RLS_final_state_{EMBEDDING_ORDER}.txt"))


In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

### Addding numerical time features
Finally, we add the following time numerical features to the modelling:
- Hour of the day 
- Day of the week 
- Month of the year

In [None]:
def RLSstep(y: np.ndarray, yhat: np.ndarray, x: np.ndarray, Beta: np.ndarray, Var: np.ndarray, nu: float) -> tuple:
    """
    Performs a Recursive Least Squares (RLS) step for parameter estimation.

    Args:
        y (np.ndarray): The observed output values.
        yhat (np.ndarray): The predicted output values.
        x (np.ndarray): The input feature values.
        Beta (np.ndarray): The estimated model parameters.
        Var (np.ndarray): The estimated parameter covariance matrix.
        nu (float): A forgetting factor.

    Returns:
        tuple: A tuple containing updated values of yhat (predicted output), err (prediction error),
               Var (updated parameter covariance matrix), and Beta (updated model parameters).

    Raises:
        None

    Example:
        >>> y = np.array([1, 2, 3])
        >>> yhat = np.array([0.5, 1.5, 2.5])
        >>> x = np.array([[1, 2], [2, 3], [3, 4]])
        >>> Beta = np.array([0.1, 0.2])
        >>> Var = np.eye(2)
        >>> nu = 0.5
        >>> result = RLSstep(y, yhat, x, Beta, Var, nu)
    """
    Var = (1/nu)*(Var - ((Var.dot(x.T).dot(x).dot(Var))/(1+float(x.dot(Var).dot(x.T)))))
    alpha = Var.dot(x.T)
    err = y - yhat
    Beta = Beta + alpha*err
    yhat = x.dot(Beta)
    return (yhat, err, Var, Beta)

In [None]:
def createUpdateFunction(embeddingOrder: int, MSE_range: tuple=(None, None)) -> callable:
    """
    Create an update function for stateful stream processing using Recursive Least Squares (RLS) algorithm.

    Args:
        embeddingOrder (int): The embedding order for RLS algorithm.
        MSE_range (tuple, optional): A tuple representing the range of MSE (Mean Squared Error)
        dates in the format ("%Y-%m-%d", "%Y-%m-%d"). During this range,
        the MSE will be computed, considering the past as solely training. 
        Defaults to (None, None).

    Returns:
        callable: The update function that calculates and updates the state based on new values.

    Raises:
        None

    Example:
        >>> update_func = createUpdateFunction(3, ("2022-01-01", "2022-12-31"))
    """
    if MSE_range != (None, None):
        LB = datetime.datetime.strptime(MSE_range[0], "%Y-%m-%d")
        HB = datetime.datetime.strptime(MSE_range[1], "%Y-%m-%d")

    def UpdateFunction(new_values, state) -> dict:
        """
        Performs an update step for stateful stream processing using Recursive Least Squares (RLS) algorithm.

        Args:
            new_values (list): The new values to be processed.
            state (dict): The current state.

        Returns:
            dict: The updated state.

        Raises:
            None
        """
        if len(new_values) > 0:
            initial_state = state
            OBS = sorted((initial_state["LastObservations"]+new_values[0]), key= lambda x: x[5], reverse= False)
            observations = [i[2] for i in OBS]
            beta = initial_state["BetaVector"]
            beta.shape = (len(beta),1)
            for i in range(embeddingOrder-1, len(observations)): 
                x = np.array(observations[i-embeddingOrder+1:i+1]+list(OBS[i][6:]))
                x = np.append(1,x)
                x.shape= (1,len(x))
                y = observations[i]
                RLS_results = RLSstep(y,initial_state["Forecast"], x, beta, initial_state["Variance"], initial_state["ForgettingFactor"] )
                initial_state["Forecast"] = RLS_results[0]
                beta = RLS_results[3]
                initial_state["Variance"] = RLS_results[2]
                initial_state["N"] = initial_state["N"] + 1
                err = RLS_results[1]
                if MSE_range == (None, None):
                    
                    initial_state["SSE"] = initial_state["SSE"] + err**2 if initial_state["N_MSE"] != 0 else 0                     
                    initial_state["MSE"] =  initial_state["SSE"]/initial_state["N_MSE"] if initial_state["N_MSE"] != 0 else 0 
                    initial_state["N_MSE"] = initial_state["N_MSE"] + 1
                else: 
                    if LB <= datetime.datetime.strptime(OBS[i][0], "%Y-%m-%d") <= HB:
                        
                        initial_state["SSE"] = initial_state["SSE"] if initial_state["SSE"] != -np.inf else 0
                        initial_state["SSE"] = initial_state["SSE"] + err**2
                        initial_state["N_MSE"] = initial_state["N_MSE"] + 1
                        initial_state["MSE"] =  initial_state["SSE"]/initial_state["N_MSE"] 
                    else: 
                        initial_state["MSE"] = -np.inf
                        initial_state["SSE"] = -np.inf
                
            initial_state["BetaVector"] = beta  
            initial_state["LastObservations"] =  OBS[-embeddingOrder+1:] if embeddingOrder != 1 else []

            return initial_state
        else:
            return state

    return UpdateFunction

#### Embedding Order 1

In [None]:
EMBEDDING_ORDER = 1
NUM_FEATURES = 3
INITIALIZATION_VARIANCE = 10

In [None]:
RLS = [
    (i,{
        "BetaVector": np.zeros(EMBEDDING_ORDER+NUM_FEATURES+1),
        "Variance": np.diag(np.zeros(EMBEDDING_ORDER+NUM_FEATURES+1)+INITIALIZATION_VARIANCE),
        "ForgettingFactor":1,
        "SSE":0, 
        "Forecast" : 0, 
        "N":0, 
        "MSE":0,
        "N_MSE":0,
        "LastObservations":[]
        }) 
    for i in sensors
    ]
RLSRDD = sc.parallelize(RLS)

In [None]:
BATCH_INTERVAL = 10

[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

# Adding the numerical features to the data
def get_num_features(x):
    x = (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5]))
    date = datetime.datetime.strptime(x[0], "%Y-%m-%d")
    hour =(x[1]*15)//60
    x = x + (hour, date.weekday(), date.month)
    return x

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: get_num_features(x))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)


updateStateFunction = createUpdateFunction(embeddingOrder= EMBEDDING_ORDER)


FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=RLSRDD)\
    

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"RLS_num_final_state_{EMBEDDING_ORDER}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

#### Embedding Order 2

In [None]:
EMBEDDING_ORDER = 2
NUM_FEATURES = 3
INITIALIZATION_VARIANCE = 10

In [None]:
RLS = [
    (i,{
        "BetaVector": np.zeros(EMBEDDING_ORDER+NUM_FEATURES+1),
        "Variance": np.diag(np.zeros(EMBEDDING_ORDER+NUM_FEATURES+1)+INITIALIZATION_VARIANCE),
        "ForgettingFactor":1,
        "SSE":0, 
        "Forecast" : 0, 
        "N":0, 
        "MSE":0,
        "N_MSE":0,
        "LastObservations":[]
        }) 
    for i in sensors
    ]
RLSRDD = sc.parallelize(RLS)

In [None]:
BATCH_INTERVAL = 30

[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

def get_num_features(x):
    x = (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5]))
    date = datetime.datetime.strptime(x[0], "%Y-%m-%d")
    hour =(x[1]*15)//60
    x = x + (hour, date.weekday(), date.month)
    return x

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: get_num_features(x))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)


updateStateFunction = createUpdateFunction(embeddingOrder= EMBEDDING_ORDER)


FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=RLSRDD)\
    

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"RLS_num_final_state_{EMBEDDING_ORDER}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

#### Embedding Order 3

In [None]:
EMBEDDING_ORDER = 3
NUM_FEATURES = 3
INITIALIZATION_VARIANCE = 10

In [None]:
RLS = [
    (i,{
        "BetaVector": np.zeros(EMBEDDING_ORDER+NUM_FEATURES+1),
        "Variance": np.diag(np.zeros(EMBEDDING_ORDER+NUM_FEATURES+1)+INITIALIZATION_VARIANCE),
        "ForgettingFactor":1,
        "SSE":0, 
        "Forecast" : 0, 
        "N":0, 
        "MSE":0,
        "N_MSE":0,
        "LastObservations":[]
        }) 
    for i in sensors
    ]
RLSRDD = sc.parallelize(RLS)

In [None]:
BATCH_INTERVAL = 20

[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

def get_num_features(x):
    x = (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5]))
    date = datetime.datetime.strptime(x[0], "%Y-%m-%d")
    hour =(x[1]*15)//60
    x = x + (hour, date.weekday(), date.month)
    return x

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: get_num_features(x))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)


updateStateFunction = createUpdateFunction(embeddingOrder= EMBEDDING_ORDER)


FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=RLSRDD)\
    

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"RLS_num_final_state_{EMBEDDING_ORDER}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

#### Embedding Order 4

In [None]:
EMBEDDING_ORDER = 4
NUM_FEATURES = 3
INITIALIZATION_VARIANCE = 10

In [None]:
RLS = [
    (i,{
        "BetaVector": np.zeros(EMBEDDING_ORDER+NUM_FEATURES+1),
        "Variance": np.diag(np.zeros(EMBEDDING_ORDER+NUM_FEATURES+1)+INITIALIZATION_VARIANCE),
        "ForgettingFactor":1,
        "SSE":0, 
        "Forecast" : 0, 
        "N":0, 
        "MSE":0,
        "N_MSE":0,
        "LastObservations":[]
        }) 
    for i in sensors
    ]
RLSRDD = sc.parallelize(RLS)

In [None]:
BATCH_INTERVAL = 20

[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

def get_num_features(x):
    x = (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5]))
    date = datetime.datetime.strptime(x[0], "%Y-%m-%d")
    hour =(x[1]*15)//60
    x = x + (hour, date.weekday(), date.month)
    return x

dataS = dstream\
    .flatMap(lambda x: [*np.array(ast.literal_eval(x))])\
    .map(lambda x: get_num_features(x))\

dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .partitionBy(NSENSORS)\
    .groupByKey()\
    .mapValues(list)


updateStateFunction = createUpdateFunction(embeddingOrder= EMBEDDING_ORDER)


FutureForecasts = dataPerSensor\
    .updateStateByKey(updateStateFunction, initialRDD=RLSRDD)\
    

FutureForecasts.pprint()

FutureForecasts.foreachRDD(lambda rdd: rdd.saveAsTextFile(f"RLS_num_final_state_{EMBEDDING_ORDER}.txt"))

In [None]:
ssc.start()

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Closing the spark session

In [119]:
spark.stop()