# Stream Processing
## Goal
The producer should first download all historical measurements between 2018-12-06 and 2023-03-31, and then send batches of measurements at regular time intervals. Specifically, every $∆$ seconds, the producer sends a batch of measurements corresponding to all timestamps within a time period $Π$. For example, if $Π$ = 30 days and $∆$ = 5 seconds, the producer sends 30 days' worth of data for each sensor every 5 seconds. The producer should be parameterizable by $∆$ and $Π$.

In addition, the consumer uses a sliding window to compute the correlation. The sliding window has length $W$ and sliding interval $∆$, where $W$ is a multiple of $∆$.
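
As a rough illustration of how the time period $Π$ splits the historical range into batches, the sketch below cuts the interval from 2018-12-06 to 2023-03-31 into consecutive periods of $Π$ = 30 days. The helper name `period_bounds` is hypothetical and is not taken from the `producer` notebook, which may slice the data differently.

```python
from datetime import date, timedelta


def period_bounds(start, end, period_days):
    """Yield (first_day, last_day) pairs covering [start, end] in chunks of period_days."""
    step = timedelta(days=period_days)
    current = start
    while current <= end:
        # The last period is truncated so that it never runs past `end`
        last = min(current + step - timedelta(days=1), end)
        yield current, last
        current += step


# Assumption: the period is 30 days, as in the example above
batches = list(period_bounds(date(2018, 12, 6), date(2023, 3, 31), 30))
print(len(batches), batches[0], batches[-1])
```

Under this reading, each pair of dates would delimit one batch, and the producer would send one such batch every $∆$ seconds.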

## Common imports and Environment variables

In [2]:
pip install tqdm

Defaulting to user installation because normal site-packages is not writeable
Collecting tqdm
  Using cached tqdm-4.65.0-py3-none-any.whl (77 kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.65.0
Note: you may need to restart the kernel to use updated packages.


In [1]:
import pandas as pd
import numpy as np
import ast
from tqdm import tqdm
from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from itertools import combinations
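# StructType and StringType are used in process_rdd, so import them explicitly
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType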
import os
import sys

os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 ' + \
                                    '--conf spark.driver.memory=4g  pyspark-shell '
streaming_path = "streaming"
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

np.set_printoptions(threshold=sys.maxsize)

## Spark session construction

In [2]:
spark = SparkSession.builder.master("local[*]").appName("stream_processing").getOrCreate()

23/05/15 21:59:18 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.129.3 instead (on interface enp38s0)
23/05/15 21:59:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/15 21:59:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/15 21:59:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Data Streaming construction
We connect a `StreamingContext` object to the `producer` notebook on `localhost`, port 9999. The `producer` notebook has to be launched first.

### Method definitions
Since the producer sends each message as text, this function converts each value back to its original numeric type where possible.

In [3]:
# Convert a text value to int, then to float, falling back to the original string
def convert_to_number(value):
    try:
        return int(value)
    except ValueError:
        try:
            return float(value)
        except ValueError:
            return value
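
A short usage sketch may help to see which values are converted. The function is repeated here only so that the snippet runs on its own, and the sample values are made up for illustration.

```python
def convert_to_number(value):
    # Same logic as the function above, repeated so this snippet is self-contained
    try:
        return int(value)
    except ValueError:
        try:
            return float(value)
        except ValueError:
            return value


# Hypothetical field values, shaped like one row of the message
samples = ["1", "0.0", "-1.0", "CAT17", "2018-12-06"]
converted = [convert_to_number(s) for s in samples]
print(converted)  # [1, 0.0, -1.0, 'CAT17', '2018-12-06']
```

Strings that are neither integers nor floats, such as sensor names and dates, are returned unchanged.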

The next function loads all the streamed data saved so far (each batch contains only the most recent data) and pivots the dataframe.
It returns the pivoted dataframe, with one column per sensor, and a list of all pairwise combinations of sensors.

In [4]:
def load_and_process_main_df(spk):
    # Read the CSV file into a Spark DataFrame
    batch_data_at_t = spk.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(streaming_path)

    # pivot the dataframe
    pivot_df = batch_data_at_t.groupBy("Date", "Time gap").pivot("Sensor").agg(first("Count").alias("Count"),
                                                                               first("Average speed").alias(
                                                                                   "Average speed"))

    #Sort by date and time gap
    pivot_df = pivot_df.withColumn("Time gap", col("Time gap").cast("int")).sort("Date", "Time gap")

    # Store all columns names
    columns = pivot_df.columns

    # Keep only the per-sensor count columns (the timestamp columns are added below)
    count_columns = [c for c in columns if c.endswith("_Count")]
    # Create all possible combinations of sensors
    pairs = list(combinations(count_columns, 2))

    # Keep timestamp
    count_columns.append("Date")
    count_columns.append("Time gap")

    # Filter out the dataframe to only keep those
    return pivot_df.select(count_columns), pairs
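
The pairing step can be tried in isolation. The sketch below uses three made-up column names; with $n$ count columns, `combinations` yields $n(n-1)/2$ unordered pairs, so the 153 pairs shown by the progress bars in the output further down would correspond to 18 count columns.

```python
from itertools import combinations
from math import comb

# Hypothetical column names, following the "<sensor>_Count" pattern of the pivoted dataframe
count_columns = ["CAT17_Count", "CB02411_Count", "CJM90_Count"]

# Each unordered pair appears once; a column is never paired with itself
pairs = list(combinations(count_columns, 2))
print(pairs)

# Number of pairs for 18 count columns
print(comb(18, 2))  # 153
```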

This method is used to calculate the Pearson coefficient between two sensors.

In [5]:
# Define the function to calculate Pearson correlation coefficient with time component
def pearson_corr_coeff_time(count_df, i, j):
    # Assuming i and j are the names of two columns
    col_i = col(i)
    col_j = col(j)

    # Window over all rows from the first one up to the current row.
    # Ordering by timestamp makes "preceding" rows well defined.
    w = Window.orderBy("Date", "Time gap").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    # Compute the window function count for column i and j
    ci_count = count(i).over(w)
    cj_count = count(j).over(w)

    # Use a sub-query to calculate the columns needed for the Pearson coefficient
    # The sensor columns are renamed to i and j
    subquery = count_df.select('Date', 'Time gap', col_i.alias('i'), col_j.alias('j'),
                               ci_count.alias('ci_count'),
                               cj_count.alias('cj_count'))
    # We define columns that are the sum of the passage of bikes of the sensor i/j
    subquery = subquery.select('Date', 'Time gap', 'i', 'j', sum('i').over(w).alias('ci_total'),
                               sum('j').over(w).alias('cj_total'), 'ci_count', 'cj_count')
    # We define columns that are the mean of the sensor i/j
    subquery = subquery.select('Date', 'Time gap', 'i', 'j', 'ci_total', 'cj_total', 'ci_count', 'cj_count',
                               (col('ci_total') / col('ci_count')).alias('ci_mean'),
                               (col('cj_total') / col('cj_count')).alias('cj_mean'))
    # We define columns for the numerator and the two parts of the denominator of the Pearson coefficient formula
    subquery = subquery.select('Date', 'Time gap', 'i', 'j', 'ci_total', 'cj_total', 'ci_count', 'cj_count', 'ci_mean',
                               'cj_mean',
                               (sum((col('i') - col('ci_mean')) * (col('j') - col('cj_mean'))).over(w)).alias(
                                   'numerator'),
                               ((sum((col('i') - col('ci_mean')) ** 2).over(w)) ** 0.5).alias('den1'),
                               ((sum((col('j') - col('cj_mean')) ** 2).over(w)) ** 0.5).alias('den2'))

    # Compute the Pearson correlation coefficient for each date and time gap
    rij = subquery.select("Date", "Time gap",
                          when((col("den1") == 0) | (col("den2") == 0), 0).otherwise(
                              col("numerator") / (col("den1") * col("den2"))))

    # Renaming columns for clarity
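    # Drop the "_Count" suffix. str.rstrip would strip a set of characters, not a suffix,
    # and could eat into sensor names ending in one of those characters (requires Python 3.9+).
    i = i.removesuffix("_Count")
    j = j.removesuffix("_Count")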
    rij = rij.withColumnRenamed(rij.columns[2], i + "_" + j)

    return rij
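
As a cross-check for the function above, the Pearson coefficient of two count series can also be computed on plain Python lists. With $c_i(t)$ the count of sensor $i$ at time $t$ and $\bar{c}_i$ its mean (notation introduced here for illustration), the usual definition is $r_{ij} = \frac{\sum_t (c_i(t) - \bar{c}_i)(c_j(t) - \bar{c}_j)}{\sqrt{\sum_t (c_i(t) - \bar{c}_i)^2}\,\sqrt{\sum_t (c_j(t) - \bar{c}_j)^2}}$. The sketch below is a minimal reference implementation under that definition, not the notebook's method: the Spark version uses running means inside the window, so its value on the last row may differ from this whole-sample result. `math.fsum` is used because `sum` is shadowed by the `pyspark.sql.functions` star import in this notebook.

```python
import math


def pearson(xs, ys):
    """Whole-sample Pearson coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = math.fsum(xs) / n
    mean_y = math.fsum(ys) / n
    numerator = math.fsum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    den_x = math.sqrt(math.fsum((x - mean_x) ** 2 for x in xs))
    den_y = math.sqrt(math.fsum((y - mean_y) ** 2 for y in ys))
    # Same convention as the Spark function: return 0 when a series is constant
    if den_x == 0 or den_y == 0:
        return 0.0
    return numerator / (den_x * den_y)


# Made-up counts for two sensors
print(pearson([1, 2, 3, 4], [2, 4, 6, 8]))
print(pearson([1, 1, 1, 1], [2, 4, 6, 8]))
```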

This function handles each received batch: it parses the message into a list of rows, builds a dataframe, and appends it to the CSV files.
All saved data is then reloaded to compute the Pearson coefficient of every sensor pair, and for each batch received the five most correlated sensor pairs at time $t$ are printed.

In [6]:
# Define a function to process each RDD
def process_rdd(_, rdd):
    if not rdd.isEmpty():
        # Get spark session
        spk = SparkSession.builder.master("local[*]").appName("stream_processing").getOrCreate()

        # Define the schema for the DataFrame
        schema = StructType([
            StructField("Date", StringType(), True),
            StructField("Time gap", IntegerType(), True),
            StructField("Sensor", StringType(), True),
            StructField("Count", FloatType(), True),
            StructField("Average speed", FloatType(), True)
        ])

        data_list = []
        # Loop through each row of the message
        for i, item in tqdm(enumerate(rdd.collect())):
            # Each row is an array, and we need to parse it to get it
            # for this we are getting rid of the characters that interfere with the parsing
            # ex. [['0','2018-12-06','1','CAT17','0.0','-1.0'], -> ['0','2018-12-06','1','CAT17','0.0','-1.0']
            if i == 0:
                single_row = ast.literal_eval(item[1:-1])
            else:
                single_row = ast.literal_eval(item[:-1])
            # We iterate through the single row from the second element because the first one is not needed.
            # We also convert the elements of the array to int or float if needed
            data_list.append(Row(*[convert_to_number(element) for element in single_row[1:]]))

        rows_df = spk.createDataFrame(data_list, schema)
        # Write csv so it is accessible to everyone
        rows_df.write.option("header", "true").mode("append").csv(streaming_path)
        # Load all the data
        main_df, pairs = load_and_process_main_df(spk)
        rij_dict = {}
        # Calculate the Pearson coefficient for each sensor pair
        for pair in tqdm(pairs):
            rij = pearson_corr_coeff_time(main_df, pair[0], pair[1])
            # Keep only the last value, i.e. the coefficient at the most recent timestamp
            last_row = rij.tail(1)[0]
            rij_dict[rij.columns[2]] = last_row[2]
        # Create a spark Dataframe with all the values collected
        last_rj_df = spk.createDataFrame(Row(rij_dict))
        pdf = last_rj_df.toPandas()
        print("Top5 most correlated sensors:")
        # Transpose the DataFrame and keep only the top 5 rows, so it is more readable
        print(pdf.T.nlargest(5, 0))
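
The bracket handling in the parsing loop can be tried without Spark. The sketch below assumes that the message arrives as one text line per row, shaped like the example in the code comment above; the three lines are made up, and the helper name `parse_line` is hypothetical.

```python
import ast

# Hypothetical three-line message: the first line opens the outer list,
# the last line closes it, and every other line ends with a comma
message = [
    "[['0','2018-12-06','1','CAT17','0.0','-1.0'],",
    "['1','2018-12-06','1','CB02411','3.0','17.5'],",
    "['2','2018-12-06','1','CJM90','1.0','12.0']]",
]


def parse_line(index, line):
    # First line: drop the leading "[" and the trailing ",".
    # Other lines: drop only the trailing "," (or the final "]").
    text = line[1:-1] if index == 0 else line[:-1]
    return ast.literal_eval(text)


rows = [parse_line(i, line) for i, line in enumerate(message)]
print(rows[0])
```

After this step every row is a list of strings, which is why the values still need to be converted to numbers before the dataframe is built.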

### Socket connection
This code block sets up a Spark Streaming application that reads data from a network socket and processes it in batches. The `batch_interval` variable specifies the time interval (in seconds) of each batch. Its value must be less than or equal to the $∆$ value set in the `producer`.

A StreamingContext is then created using the Spark context and the batch interval. Checkpointing is enabled to support stateful transformations.

Next, a DStream is created from a network socket using the `socketTextStream()` function. This creates a stream of data that is divided into RDDs, which are then processed using the `process_rdd()` function.

In [7]:
batch_interval = 5
w = 10*batch_interval

# Get Spark context
sc = spark.sparkContext
sc.setLogLevel("ERROR")

#Create streaming context, with required batch interval
ssc = StreamingContext(sc, batch_interval)

#Checkpointing needed for stateful transforms
ssc.checkpoint("checkpoint")

# Create a DStream that represents streaming data from a network socket
# See https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
dstream = ssc.socketTextStream("localhost", 9999)

# Return a new DStream of windowed batches:
# window length w and sliding interval 5 * batch_interval, both in seconds
dstream = dstream.window(w, 5 * batch_interval)

# Apply the process_rdd function to each RDD in the DStream
dstream.foreachRDD(process_rdd)
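
Spark Streaming requires the window length and the sliding interval to be multiples of the batch interval of the source DStream. The sketch below checks the values used above (batch interval 5, window length 50, sliding interval 25, all in seconds) and reports how many batches each window spans. The helper name `describe_window` is hypothetical and is not part of the Spark API.

```python
def describe_window(batch_interval, window_length, slide_interval):
    """Validate the durations (in seconds) and summarise the resulting window."""
    for name, value in (("window_length", window_length), ("slide_interval", slide_interval)):
        if value <= 0 or value % batch_interval != 0:
            raise ValueError(f"{name}={value} is not a positive multiple of {batch_interval}")
    return {
        "batches_per_window": window_length // batch_interval,
        "batches_per_slide": slide_interval // batch_interval,
        # Seconds of data shared by two consecutive windows
        "overlap_seconds": max(window_length - slide_interval, 0),
    }


info = describe_window(batch_interval=5, window_length=50, slide_interval=25)
print(info)
```

With these values, `process_rdd` would be called once every 25 seconds on the last 50 seconds of received data, so consecutive windows share half of their batches.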

Then, we start the data consumption. At each `batch_interval`, the data sent by the producer is collected.

In [8]:
ssc.start()

# Await termination
ssc.awaitTermination()

51840it [00:00, 63833.70it/s]                                       (0 + 1) / 1]
100%|█████████████████████████████████████████| 153/153 [01:24<00:00,  1.80it/s]


Top5 most correlated sensors:
                       0
CB02411_CEK049  0.765724
CEK049_CJM90    0.750014
CB02411_CJM90   0.734156
CB2105_CJM90    0.707106
CB2105_CEK049   0.694289


51840it [00:00, 59838.03it/s]                                       (0 + 1) / 1]
100%|█████████████████████████████████████████| 153/153 [01:22<00:00,  1.85it/s]


Top5 most correlated sensors:
                       0
CB02411_CEK049  0.765724
CEK049_CJM90    0.750014
CB02411_CJM90   0.734156
CB2105_CJM90    0.707106
CB2105_CEK049   0.694289


51840it [00:00, 63186.17it/s]                                       (0 + 1) / 1]
100%|█████████████████████████████████████████| 153/153 [01:25<00:00,  1.79it/s]


Top5 most correlated sensors:
                       0
CEK049_CJM90    0.889583
CB02411_CEK049  0.880615
CB02411_CJM90   0.879970
CB1143_CB2105   0.705698
CB1143_CJM90    0.693394


51840it [00:00, 64189.38it/s]                                       (0 + 1) / 1]
100%|█████████████████████████████████████████| 153/153 [01:23<00:00,  1.83it/s]


Top5 most correlated sensors:
                       0
CEK049_CJM90    0.889583
CB02411_CEK049  0.880615
CB02411_CJM90   0.879970
CB1143_CB2105   0.705698
CB1143_CJM90    0.693394


51840it [00:00, 61495.73it/s]                                       (0 + 1) / 1]
100%|█████████████████████████████████████████| 153/153 [01:36<00:00,  1.59it/s]


Top5 most correlated sensors:
                       0
CEK049_CJM90    0.819957
CB02411_CJM90   0.797368
CB02411_CEK049  0.796997
CB1143_CEK049   0.695441
CB1143_CJM90    0.629960


51840it [00:00, 61507.17it/s]                                       (0 + 1) / 1]
  7%|██▋                                       | 10/153 [00:06<01:32,  1.54it/s]ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/lquivron/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/lquivron/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

To stop the data consumption, we stop the streaming context while keeping the Spark context alive.

In [None]:
ssc.stop(stopSparkContext=False, stopGraceFully=False)