INFO-H-515 Project <br>
2022–2023

# Phase 1 : Stream Processing - Consumer
Dimitris Sacharidis, Antonios Kontaxakis <br>
EPB, ULB 

## Information
Group Number : 5 <br>
Group Members : Rania Baguia (000459242), Hakim Amri (000459153), Julian Cailliau (000459856), Mehdi Jdaoudi (000457507)

## Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, rank, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.window import Window
from pyspark.streaming import StreamingContext
import json
import logging
import math
import numpy as np
import os
import pandas as pd
import pickle
import re, ast
import socket
import time

## Configuring the consumer

In [2]:
spark = SparkSession \
    .builder \
    .master("local[10]")\
    .config("spark.executor.instances", "1") \
    .config("spark.executor.cores", "10") \
    .config("spark.executor.memory", "16G") \
    .appName("Consummer") \
    .getOrCreate()

# Let us retrieve the sparkContext object
sc=spark.sparkContext

sc.setLogLevel("ERROR")
logger = spark._jvm.org.apache.log4j
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)


23/05/14 18:39:43 WARN Utils: Your hostname, Mehdis-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.205 instead (on interface en0)
23/05/14 18:39:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/14 18:39:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/14 18:39:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### Loading the necessary information
We first load the sensor metadata, essentially the sensor names. This lets us create the per-sensor states used by the following computations programmatically.

In [3]:
sensors = []
with open("data/bikes_sensors.json", "r") as f:
    sensors = json.load(f)
    sensors = [
        sensor["properties"]["device_name"] for sensor in sensors["features"]
    ]

#### Set up the Stream function
This function creates a streaming context and a DStream that reads text lines from a network socket.

In [4]:
def getDStream(sc, batch_interval):
    """
    Create a streaming context and a DStream from a network socket.

    Args:
        sc (SparkContext): The Spark context object.
        batch_interval (int): The time interval in seconds at which streaming data will be divided into batches.

    Returns:
        list: A list containing the streaming context (ssc) and the DStream (dstream).

    Raises:
        None

    Example:
        >>> sc = SparkContext(appName="StreamingExample")
        >>> ssc, dstream = getDStream(sc, 5)
    """

    #Create streaming context, with required batch interval
    ssc = StreamingContext(sc, batch_interval)

    #Checkpointing needed for stateful transforms
    ssc.checkpoint("checkpoint")
    
    # Create a DStream that represents streaming data from a network socket
    dstream = ssc.socketTextStream("localhost", 9999)
    
    return [ssc,dstream]

#### Set up the states and update functions
We keep four states: `state_per_sensor`, the cumulative sum of counts per sensor ($\sum x_i$); `state_per_sensor_squared`, the cumulative sum of squared counts per sensor ($\sum x_i^2$); `state_timestamp`, the latest timestamp index, which gives the number of observations $n$ (this is why a timestamp was added during preprocessing); and `state_covariance_pairs`, the cumulative sum of products for each pair of sensors ($\sum x_i y_i$). The correlation is computed with the following equation because it requires only a single pass over the data, which is in line with streaming principles:
\begin{equation*}
r_{xy} = \dfrac{n\sum x_i y_i - \sum x_i \sum y_i }{\sqrt{n\sum x_i^2-(\sum x_i)^2}\sqrt{n\sum y_i^2-(\sum y_i)^2}}
\end{equation*}
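
As an illustration of why this formula suits streaming, the sketch below keeps only the six running quantities and updates them one observation at a time. The class name `RunningCorrelation` and the sample values are assumptions made for this example; they are not part of the notebook.

```python
import math


class RunningCorrelation:
    """Accumulates the sums needed for Pearson's r in a single pass."""

    def __init__(self):
        self.n = 0
        self.sum_x = 0
        self.sum_y = 0
        self.sum_xx = 0
        self.sum_yy = 0
        self.sum_xy = 0

    def update(self, x, y):
        # Only running totals are stored; past observations are discarded.
        self.n += 1
        self.sum_x += x
        self.sum_y += y
        self.sum_xx += x * x
        self.sum_yy += y * y
        self.sum_xy += x * y

    def value(self):
        # Returns None when the correlation is undefined (zero variance).
        var_x = self.n * self.sum_xx - self.sum_x ** 2
        var_y = self.n * self.sum_yy - self.sum_y ** 2
        if var_x <= 0 or var_y <= 0:
            return None
        numerator = self.n * self.sum_xy - self.sum_x * self.sum_y
        return numerator / (math.sqrt(var_x) * math.sqrt(var_y))


# Hypothetical sample observations for two sensors.
xs = [1, 2, 3, 4, 5]
ys = [2, 4, 5, 4, 5]

running = RunningCorrelation()
for x, y in zip(xs, ys):
    running.update(x, y)

r = running.value()
print(round(r, 4))
```

Returning `None` for the undefined case is a choice made in this sketch; the notebook instead maps a zero denominator to `-inf` so that such pairs sort last.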

In [5]:
state_per_sensor = {i:0 for i in sensors}
state_per_sensor_rdd = sc.parallelize([('state_per_sensor', state_per_sensor)])

state_per_sensor_squared = {i:0 for i in sensors}
state_per_sensor_squared_rdd = sc.parallelize([('state_per_sensor_squared', state_per_sensor_squared)])

state_timestamp = 0
state_timestamp_rdd = sc.parallelize([('state_timestamp', state_timestamp)])

N_SENSORS = len(sensors)
indexes = [(i, j) for i in range(N_SENSORS) for j in range(N_SENSORS) if j > i]
pairs_duplicates = [[row + "-" + col for col in sorted(sensors)] for row in sorted(sensors)]
pairs = [pairs_duplicates[i[0]][i[1]] for i in indexes]
state_covariance_pairs = {i:0 for i in pairs}
state_covariance_pairs_rdd = sc.parallelize([('state_covariance_pairs', state_covariance_pairs)])
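
The pair keys are the upper triangle of the sorted sensor-by-sensor grid. As a cross-check, the sketch below rebuilds them with `itertools.combinations` on a three-sensor sample and compares the two results. The sample list is an assumption for illustration, and the `-` separator presumes that sensor names themselves contain no hyphen.

```python
from itertools import combinations

# Small sample; the notebook loads the full list from the sensors file.
sample_sensors = ["CJM90", "CB2105", "CEK049"]
n = len(sample_sensors)

# Grid construction, following the same steps as the cell above.
indexes = [(i, j) for i in range(n) for j in range(n) if j > i]
grid = [[row + "-" + col for col in sorted(sample_sensors)]
        for row in sorted(sample_sensors)]
pairs_from_grid = [grid[i][j] for i, j in indexes]

# Equivalent construction: every unordered pair of sorted names, once.
pairs_from_combinations = ["-".join(p)
                           for p in combinations(sorted(sample_sensors), 2)]

print(pairs_from_grid)
print(pairs_from_grid == pairs_from_combinations)
```

For N sensors both constructions yield N(N-1)/2 keys, in the same order.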

Every state except the timestamp is a dictionary whose keys are sensors (or sensor pairs) and whose values are the running totals, so a single update function serves all of them. Only the timestamp, which is a single number, needs its own update function.

In [6]:
def updateFunction(new_values, state): 
    """
    Updates the state with new values.

    Args:
        new_values: The new values to be incorporated into the state.
        state: The current state.

    Returns:
        The updated state.
    """
    L=len(new_values) 
    if (L>0):
        initial_state = state
        for l in np.arange(L):
            sensor = new_values[l][0]
            count = new_values[l][1]
            initial_state[sensor] = initial_state[sensor] + count
        return initial_state    
    else:
        return state

def updateFunctionTimeStamp(new_values, state):
    """
    Updates the state with new timestamped values.

    Args:
        new_values: The new timestamped values to be incorporated into the state.
        state: The current state.

    Returns:
        The updated state.
    """ 
    L=len(new_values) 
    if (L>0):
        return new_values[0]
    else:
        return state
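
To make the update semantics concrete without starting a stream, the following plain-Python sketch replays three batches through a dictionary-merging function. The name `merge_counts`, the sensor names `A` and `B`, and the batch contents are hypothetical; the sketch only mimics how a list of `(sensor, value)` tuples is folded into the previous state.

```python
def merge_counts(new_values, state):
    """Fold a batch of (sensor, value) tuples into a dictionary state."""
    updated = dict(state)  # copy, so the previous state is left untouched
    for sensor, value in new_values:
        updated[sensor] = updated.get(sensor, 0) + value
    return updated


# Hypothetical initial state and batches; the second batch is empty.
state = {"A": 0, "B": 0}
batches = [
    [("A", 3), ("B", 1)],
    [],
    [("A", 2)],
]

history = []
for batch in batches:
    state = merge_counts(batch, state)
    history.append(dict(state))

print(history)
```

An empty batch leaves the state unchanged, which matches the repeated values seen in the output when no new data arrives during a batch interval.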

## Running the stream
The following cells compute, for every batch, the five most correlated sensor pairs.
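
As a point of reference, selecting the five largest correlations from an ordinary list can be sketched as follows. The pair names and values are made up for illustration; negating the key mirrors the ordering trick used with `takeOrdered` in the stream and pushes `-inf` entries to the end.

```python
import heapq
import math

# Hypothetical (pair, correlation) tuples.
correlations = [
    ("A-B", 0.42),
    ("A-C", -math.inf),  # undefined correlation, encoded as -inf
    ("B-C", 0.91),
    ("A-D", 0.10),
    ("B-D", 0.77),
    ("C-D", -0.30),
    ("A-E", 0.65),
]

# The five smallest negated values are the five largest correlations.
top_5 = heapq.nsmallest(5, correlations, key=lambda x: -x[1])

for pair, corr in top_5:
    print(pair, corr)
```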

In [7]:
BATCH_INTERVAL = 5
N_SENSORS = len(sensors)

In [8]:
# Get the DStream object containing the streaming data sent by the producer notebook
[ssc,dstream]=getDStream(sc, BATCH_INTERVAL)

dataS = dstream.flatMap(lambda x: [*np.array(ast.literal_eval(x))])

# Obtaining the last timestamp
TimeStamp = dstream\
    .flatMap(lambda x: [("state_timestamp", max([int(i[5]) for i in np.array(ast.literal_eval(x))]))])

# Mapping the stream to its proper format
dataS = dataS\
    .map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), x[4], int(x[5])))

# Group the data by sensor
dataPerSensor = dataS\
    .map(lambda x: (x[4], x))\
    .groupByKey()

# Computing the cumulative sum
cumSums = dataPerSensor\
    .mapValues(lambda x : sum([i[2] for i in x]))\
    .map(lambda x :(x[0], x[1]))\
    .flatMap(lambda x : [("state_per_sensor", x)])

# Computing the squared cumulative sum
cumSums_squared = dataPerSensor\
    .mapValues(lambda x : sum([i[2]**2 for i in x]))\
    .map(lambda x :(x[0], x[1]))\
    .flatMap(lambda x : [("state_per_sensor_squared", x)])

def compute_covariance_at_T(x, indexes):
    """
    Computes the covariance at time T for a given set of indexes.

    Args:
        x: The input data.
        indexes: The indexes indicating which covariances to compute.

    Returns:
        The computed covariances at time T.
    """
    x = sorted(x, key = lambda y : y[4])
    covAtT = [[(row[4] + "-" + col[4], row[2]*col[2]) for col in x] for row in x]
    covAtT = [covAtT[i[0]][i[1]] for i in indexes]
    return covAtT


# Computing the indexes for the covariance matrix
# Grouping the records per timestamp and computing the covariance matrix at time T
indexes = [(i, j) for i in range(N_SENSORS) for j in range(N_SENSORS) if j > i]
Cum_Covariances = dataS\
    .map(lambda x: (x[5], x))\
    .groupByKey()\
    .mapValues(lambda x : compute_covariance_at_T(x, indexes))\
    .flatMap(lambda x : [record for record in x[1]])\
    .groupByKey()\
    .mapValues(lambda x : sum(x))\
    .flatMap(lambda x : [("state_covariance_pairs", x)])
    
# Updating the states
Updated_TimeStamp = TimeStamp.updateStateByKey(updateFunctionTimeStamp, initialRDD=state_timestamp_rdd)
Updated_cumSums = cumSums.updateStateByKey(updateFunction, initialRDD=state_per_sensor_rdd)
Updated_cumSums_squared = cumSums_squared.updateStateByKey(updateFunction, initialRDD=state_per_sensor_squared_rdd)
Updated_covariance = Cum_Covariances.updateStateByKey(updateFunction, initialRDD=state_covariance_pairs_rdd)


Updated_cumSums_flatten = Updated_cumSums.flatMap(lambda x : [(i, x[1].get(i)) for i in x[1]])
Updated_cumSums_squared_flatten = Updated_cumSums_squared.flatMap(lambda x : [(i, x[1].get(i)) for i in x[1]])

def get_correlation(x):
    """
    Computes the correlation using the provided formula.

    Args:
        x: Tuple (sensor_a, sensor_b, sum_ab, sum_a, sum_a_squared, sum_b, sum_b_squared, n).

    Returns:
        Tuple: A tuple containing the pair identifier and the computed correlation.
    """
    try :
        corr = (x[7]*x[2]-x[3]*x[5])/((math.sqrt(x[7]*x[4]-(x[3]**2)))*(math.sqrt(x[7]*x[6]-(x[5]**2))))
    except ZeroDivisionError:
        corr = -np.inf
    return (x[0] + "-" + x[1], corr)

# Joining the states to compute the correlation
Correlation = Updated_covariance\
    .flatMap(lambda x: [(pair, x[1].get(pair)) for pair in x[1]])\
    .map(lambda x : (*x[0].split("-"), x[1]))\
    .map(lambda x : (x[0], x))\
    .join(Updated_cumSums_flatten)\
    .map(lambda x : (x[0], (*x[1][0], x[1][1])))\
    .join(Updated_cumSums_squared_flatten)\
    .map(lambda x : (x[0], (*x[1][0], x[1][1])))\
    .map(lambda x : (x[1][1], x[1]))\
    .join(Updated_cumSums_flatten)\
    .map(lambda x : (x[0], (*x[1][0], x[1][1])))\
    .join(Updated_cumSums_squared_flatten)\
    .map(lambda x : (*x[1][0], x[1][1]))\
    .transformWith(lambda rdd1, rdd2: rdd1.cartesian(rdd2), Updated_TimeStamp)\
    .map(lambda x : (*x[0], x[1][1]))\
    .map(get_correlation)

# Filtering to get the top 5 correlations
Top_5 = Correlation\
    .transform(lambda rdd:rdd.ctx.parallelize(rdd.takeOrdered(5, lambda x: -x[1])))

Updated_TimeStamp.pprint()
Top_5.pprint()
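
The pairwise step of the pipeline can be hard to follow inside the chain of transformations. The sketch below reproduces the idea in plain Python: group records by timestamp, multiply the counts of each sensor pair at that timestamp, and sum the products per pair. The simplified `(sensor, count, timestamp)` layout and the sample values are assumptions for this example; the stream records carry six fields.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical records: (sensor, count, timestamp index).
records = [
    ("A", 2, 0), ("B", 3, 0), ("C", 1, 0),
    ("A", 4, 1), ("B", 1, 1), ("C", 5, 1),
]

# Group the counts observed at each timestamp.
by_time = defaultdict(dict)
for sensor, count, t in records:
    by_time[t][sensor] = count

# Sum the product of counts for every sensor pair across timestamps.
pair_sums = defaultdict(int)
for counts in by_time.values():
    for a, b in combinations(sorted(counts), 2):
        pair_sums[a + "-" + b] += counts[a] * counts[b]

print(dict(pair_sums))
```

Unlike the index-based lookup in `compute_covariance_at_T`, this variant only pairs the sensors that are actually present at a given timestamp, so it does not rely on every sensor reporting at every timestamp.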




In [9]:
ssc.start()

                                                                                

-------------------------------------------
Time: 2023-05-14 18:39:55
-------------------------------------------
('state_timestamp', 0)

-------------------------------------------
Time: 2023-05-14 18:39:55
-------------------------------------------
('CB1101-CJE181', -inf)
('CEK18-CJE181', -inf)
('CEV011-CJE181', -inf)
('CB1143-CJE181', -inf)
('CEK049-CJE181', -inf)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:00
-------------------------------------------
('state_timestamp', 0)

-------------------------------------------
Time: 2023-05-14 18:40:00
-------------------------------------------
('CB1101-CJE181', -inf)
('CEK18-CJE181', -inf)
('CEV011-CJE181', -inf)
('CB1143-CJE181', -inf)
('CEK049-CJE181', -inf)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:05
-------------------------------------------
('state_timestamp', 0)

-------------------------------------------
Time: 2023-05-14 18:40:05
-------------------------------------------
('CB1101-CJE181', -inf)
('CEK18-CJE181', -inf)
('CEV011-CJE181', -inf)
('CB1143-CJE181', -inf)
('CEK049-CJE181', -inf)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:10
-------------------------------------------
('state_timestamp', 2880)

-------------------------------------------
Time: 2023-05-14 18:40:10
-------------------------------------------
('CB02411-CEK049', 0.7657636106436597)
('CEK049-CJM90', 0.7500074623328697)
('CB02411-CJM90', 0.7341503240487527)
('CB2105-CJM90', 0.7071314753542198)
('CB2105-CEK049', 0.6943233481656088)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:15
-------------------------------------------
('state_timestamp', 5760)

-------------------------------------------
Time: 2023-05-14 18:40:15
-------------------------------------------
('CEK049-CJM90', 0.8895591661284046)
('CB02411-CEK049', 0.8806025427740716)
('CB02411-CJM90', 0.8799457993377243)
('CB1143-CB2105', 0.7056776812543091)
('CB1143-CJM90', 0.6933695688222551)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:20
-------------------------------------------
('state_timestamp', 8640)

-------------------------------------------
Time: 2023-05-14 18:40:20
-------------------------------------------
('CEK049-CJM90', 0.8199521335264696)
('CB02411-CJM90', 0.7973627802301151)
('CB02411-CEK049', 0.7969983218815763)
('CB1143-CEK049', 0.6954347469601095)
('CB1143-CJM90', 0.6299576745113024)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:25
-------------------------------------------
('state_timestamp', 8640)

-------------------------------------------
Time: 2023-05-14 18:40:25
-------------------------------------------
('CEK049-CJM90', 0.8199521335264696)
('CB02411-CJM90', 0.7973627802301151)
('CB02411-CEK049', 0.7969983218815763)
('CB1143-CEK049', 0.6954347469601095)
('CB1143-CJM90', 0.6299576745113024)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:30
-------------------------------------------
('state_timestamp', 11520)

-------------------------------------------
Time: 2023-05-14 18:40:30
-------------------------------------------
('CEK049-CJM90', 0.819021704421524)
('CB02411-CJM90', 0.7778404487961231)
('CB02411-CEK049', 0.7759358889102512)
('CB1143-CEK049', 0.7043333216786912)
('CB1599-CLW239', 0.6986806136245722)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:35
-------------------------------------------
('state_timestamp', 14400)

-------------------------------------------
Time: 2023-05-14 18:40:35
-------------------------------------------
('CEK049-CJM90', 0.8037096924896054)
('CB02411-CEK049', 0.7514565342051365)
('CB02411-CJM90', 0.748493056316119)
('CB1143-CEK049', 0.7065300874988342)
('CB1599-CLW239', 0.6979038112695906)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:40
-------------------------------------------
('state_timestamp', 17280)

-------------------------------------------
Time: 2023-05-14 18:40:40
-------------------------------------------
('CEK049-CJM90', 0.8091435174283385)
('CB02411-CEK049', 0.7410423170281322)
('CB02411-CJM90', 0.7376331123400591)
('CB1599-CLW239', 0.6887482374975789)
('CB2105-CEK049', 0.6774306410717481)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:45
-------------------------------------------
('state_timestamp', 17280)

-------------------------------------------
Time: 2023-05-14 18:40:45
-------------------------------------------
('CEK049-CJM90', 0.8091435174283385)
('CB02411-CEK049', 0.7410423170281322)
('CB02411-CJM90', 0.7376331123400591)
('CB1599-CLW239', 0.6887482374975789)
('CB2105-CEK049', 0.6774306410717481)



                                                                                

-------------------------------------------
Time: 2023-05-14 18:40:50
-------------------------------------------
('state_timestamp', 20160)

-------------------------------------------
Time: 2023-05-14 18:40:50
-------------------------------------------
('CEK049-CJM90', 0.8078367575672482)
('CB02411-CJM90', 0.7428091779203823)
('CB1599-CLW239', 0.7118793540768974)
('CB02411-CEK049', 0.7011036801865222)
('CB1599-CB2105', 0.669829283816831)




In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

### Stopping the Spark session

In [11]:
spark.stop()