
# Sistemas para Processamento de Big Data
## TP2 - Energy Meter Live Monitoring


The sensor data corresponds to regular readings from 11 residential energy meters. The data covers the month of February 2024.

Each data sample has the following schema:

timestamp | sensor_id | energy
----------|-------------|-----------
timestamp | string  | float

Each energy value (kWh) is the meter's accumulated reading at the time of measurement. As such,
each meter is expected to produce a monotonically increasing series of (timestamp, energy consumed up to that moment) pairs.

The meters do not start at zero or at the same value.
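Because readings are cumulative and meters start at arbitrary values, the energy consumed over a period is the difference between the latest and the first reading of that period, not the reading itself. The sketch below is plain Python and only illustrates the idea. The helper `consumed_since_first` is hypothetical and is not part of the assignment or of Spark. It also rejects a series that breaks the monotonicity assumption.

```python
from typing import List, Tuple

def consumed_since_first(readings: List[Tuple[str, float]]) -> float:
    """Hypothetical helper: kWh consumed between the first and latest reading of ONE meter.

    `readings` holds (timestamp, cumulative_kwh) pairs; timestamps are
    "YYYY-MM-DD HH:MM:SS" strings, which sort chronologically as text.
    """
    if not readings:
        return 0.0
    # Order by timestamp before differencing: arrival order may differ.
    energies = [kwh for _, kwh in sorted(readings)]
    # A cumulative meter should never go backwards; fail loudly if it does.
    for previous, current in zip(energies, energies[1:]):
        if current < previous:
            raise ValueError(f"non-monotonic readings: {previous} -> {current}")
    # The meter's starting value is not consumption; only the increase is.
    return energies[-1] - energies[0]


# Two readings of sensor C from the stream sample, given deliberately out of order
print(consumed_since_first([("2024-10-01 00:14:30", 2790.19),
                            ("2024-10-01 00:04:21", 2790.18)]))
```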


## Questions

For all the sensors combined:

1. For the current month and current day, compute the running total energy consumed so far. The values should be updated every 5 minutes.

2. For the current month and current day, compute the running total energy consumed so far, **as a percentage**, **compared to the same periods in February 2024**. The values should be updated every 5 minutes.
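One possible reading of "the same periods in February 2024" is an interpretation, not something the statement spells out. Under this reading, a current timestamp maps to the instant with the same day of month and time of day in February 2024. The current value is then compared against February consumption up to that instant. February 2024 has 29 days, so the 30th and 31st have no counterpart. The helper names in the sketch below are hypothetical.

```python
from datetime import datetime
from typing import List, Optional, Tuple

Reading = Tuple[datetime, str, float]  # (timestamp, sensor, cumulative kWh)

def february_equivalent(ts: datetime) -> Optional[datetime]:
    """Hypothetical helper: same day-of-month and time-of-day in February 2024, or None."""
    try:
        return ts.replace(year=2024, month=2)
    except ValueError:
        # e.g. the 30th or 31st: February 2024 only has 29 days
        return None

def february_rows_until(rows: List[Reading], ts: datetime) -> List[Reading]:
    """Hypothetical helper: readings (assumed February 2024 only) up to the instant equivalent to `ts`."""
    cutoff = february_equivalent(ts)
    if cutoff is None:
        return []
    return [row for row in rows if row[0] <= cutoff]
```

You can then reduce the filtered rows month-to-date, taking each sensor's latest reading minus its first reading of the month. For the day-to-date comparison, restrict them to the same day first.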

For each sensor, separately:

3. For the current month and current day, compute the running total energy consumed so far, as a percentage, **comparing the value of each individual sensor, relative to the same results for all the sensors together (as in #1)**. The values should be updated every 5 minutes. (Sorted in descending order by value and sensor.)
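Question 3's ratio can be sketched as each sensor's consumption divided by the combined consumption (the quantity from Question 1), sorted by percentage in descending order. Breaking ties by ascending sensor id is an assumption, since the statement only says "by value and sensor". The function name and input shape are hypothetical.

```python
from typing import Dict, List, Tuple

def sensor_shares(consumed_kwh: Dict[str, float]) -> List[Tuple[str, float]]:
    """Hypothetical helper: percentage of the combined consumption per sensor.

    `consumed_kwh` maps sensor id -> energy consumed in the period (latest minus
    first reading), NOT the raw cumulative meter value.
    """
    total = sum(consumed_kwh.values())
    shares = [(sensor, 100.0 * kwh / total if total > 0 else 0.0)
              for sensor, kwh in consumed_kwh.items()]
    # Descending by percentage; ties broken by ascending sensor id (assumption).
    return sorted(shares, key=lambda item: (-item[1], item[0]))
```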

**Note:** For simplicity, it is fine to assume the first reading of each day can be used to start counting how much energy has been consumed so far. There is no need to interpolate/estimate the value of the meters at midnight.
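Under this simplification, a sensor's consumption for the day is its latest reading minus its first reading of that day, and the same holds for the month. The plain-Python sketch below shows the state a `foreachBatch` handler could keep. The class and method names are hypothetical. The sketch assumes rows are fed in timestamp order and that timestamps are `YYYY-MM-DD HH:MM:SS` strings, as in the stream.

```python
from typing import Dict, Tuple

class BaselineTracker:
    """Hypothetical helper: day- and month-to-date consumption from first-reading baselines."""

    def __init__(self) -> None:
        self.day_base: Dict[Tuple[str, str], float] = {}    # (sensor, "YYYY-MM-DD") -> first reading
        self.month_base: Dict[Tuple[str, str], float] = {}  # (sensor, "YYYY-MM") -> first reading
        self.latest: Dict[str, Tuple[str, float]] = {}      # sensor -> (timestamp, reading)

    def update(self, ts: str, sensor: str, energy: float) -> None:
        # The first reading seen for a sensor in a new day/month becomes its baseline.
        self.day_base.setdefault((sensor, ts[:10]), energy)
        self.month_base.setdefault((sensor, ts[:7]), energy)
        previous = self.latest.get(sensor)
        if previous is None or ts >= previous[0]:
            self.latest[sensor] = (ts, energy)

    def totals(self) -> Tuple[float, float]:
        """(day_total, month_total) in kWh for the current day and month, all sensors combined."""
        if not self.latest:
            return 0.0, 0.0
        # "Current" = the newest timestamp seen across all sensors.
        now = max(ts for ts, _ in self.latest.values())
        day_total = month_total = 0.0
        for sensor, (ts, energy) in self.latest.items():
            if ts[:7] == now[:7]:
                month_total += energy - self.month_base[(sensor, ts[:7])]
            if ts[:10] == now[:10]:
                day_total += energy - self.day_base[(sensor, ts[:10])]
        return day_total, month_total
```

A sensor whose newest reading is from an earlier day adds nothing to the day total. This way, yesterday's consumption is not counted as today's.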




## Requirements

Solve each question using Structured Spark Streaming.

## Other Grading Criteria

+ Grading will also take into account the general clarity of the programming and of the presentation report (notebook).




### Deadline

December 6.

Penalty of 0.25 grade points per day late.

Penalty accumulates until the grade of the assignment reaches 8.0.

---
### Colab Setup


In [None]:
#@title Install PySpark
!pip install pyspark --quiet

In [1]:
#@title Download Archived February Energy Readings
!wget -q -O /tmp/readings.csv https://raw.githubusercontent.com/smduarte/spbd-2425/refs/heads/main/docs/labs/projs/energy-readings.csv
!grep "2024-02" /tmp/readings.csv > february-energy-readings.csv
!head -2 february-energy-readings.csv


2024-02-01 00:00:00;D;2615.0
2024-02-01 00:00:18;C;1098.8
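As the two lines above show, the archived file has no header row, uses `;` as the field separator, and holds `timestamp;sensor;energy` lines. The stdlib sketch below can sanity-check the file before it is handed to Spark. The `parse_readings` helper is hypothetical.

```python
import csv
import io
from datetime import datetime
from typing import List, Tuple

def parse_readings(text: str) -> List[Tuple[datetime, str, float]]:
    """Hypothetical helper: parse 'timestamp;sensor;energy' lines (no header) into typed tuples."""
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter=";"):
        if not fields:  # skip blank lines
            continue
        ts, sensor, energy = fields
        rows.append((datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"), sensor, float(energy)))
    return rows


sample = "2024-02-01 00:00:00;D;2615.0\n2024-02-01 00:00:18;C;1098.8\n"
for row in parse_readings(sample):
    print(row)
```

On the real file this would be `parse_readings(open("february-energy-readings.csv").read())`.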


In [1]:
#@title Start the Structured Source
!wget -q -O - https://github.com/smduarte/spbd-2425/raw/main/scripts/json_energy_sender.tgz  | tar xfz - 2> /dev/null

!nohup python json_energy_sender/server.py --filename json_energy_sender/energy-readings.csv --speedup 60 > /dev/null 2> /dev/null &


Note: `--speedup 60` means the stream is replayed 60x faster than real time. Therefore, 1 second of real time corresponds to 1 minute of stream data.
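With this replay factor, you convert stream time to wall-clock time by dividing by the speedup. This gives the `processingTime` trigger that matches the required 5-minute update interval, since 5 minutes of data arrive in 5 real seconds. It also gives how much data an `awaitTermination` timeout covers. The helper names in this sketch of the arithmetic are hypothetical.

```python
def wallclock_seconds(stream_seconds: float, speedup: float = 60.0) -> float:
    """Hypothetical helper: real seconds needed to replay `stream_seconds` of stream data."""
    return stream_seconds / speedup

def stream_minutes(real_seconds: float, speedup: float = 60.0) -> float:
    """Hypothetical helper: minutes of stream data replayed during `real_seconds` of real time."""
    return real_seconds * speedup / 60.0

def trigger_for(update_minutes: float, speedup: float = 60.0) -> str:
    """Hypothetical helper: processingTime string whose period equals `update_minutes` of stream time."""
    return f"{wallclock_seconds(update_minutes * 60.0, speedup):g} seconds"


print(trigger_for(5))       # trigger for a 5-minute update interval in stream time
print(stream_minutes(25))   # stream minutes per 25-second trigger
print(stream_minutes(100))  # stream minutes covered by awaitTermination(100)
```

By this arithmetic, the 25- and 30-second triggers used in the question cells correspond to 25 and 30 minutes of stream data, not 5.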


In [None]:
#@title Sample code to process the structured stream...
from pyspark.sql import *
from pyspark.sql.functions import *

spark = SparkSession \
    .builder \
    .appName("StructuredWebLogExample") \
    .getOrCreate()


# Define a sample JSON record and infer the schema from it
sample_json = '{"date": "2024-02-01 00:00:00", "sensor": "D", "energy": 2615.0}'
inferred_schema = schema_of_json(sample_json)


# Create a DataFrame representing the stream of input
# lines read from the socket source (the energy sender) on localhost:7777
try:
  json_lines = spark.readStream.format("socket") \
      .option("host", "localhost") \
      .option("port", 7777) \
      .load()

  # Parse the JSON using the inferred schema
  json_lines = json_lines.withColumn("json_data", from_json(col("value"), inferred_schema)) \
    .select("json_data.*")  # Expand the JSON fields into columns


  query = json_lines \
    .writeStream \
    .outputMode("append") \
    .trigger(processingTime='1 seconds') \
    .foreachBatch(lambda df, epoch: df.show(10, False)) \
    .start()

  query.awaitTermination(60)
except Exception as err:
  print(err)
  # query may not exist if the error happened before start()
  if 'query' in locals():
    query.stop()

+----+------+------+
|date|energy|sensor|
+----+------+------+
+----+------+------+

+-------------------+-------+------+
|date               |energy |sensor|
+-------------------+-------+------+
|2024-10-01 00:04:21|2790.18|C     |
|2024-10-01 00:04:27|5949.0 |D     |
|2024-10-01 00:04:36|2162.37|J     |
|2024-10-01 00:04:52|2682.69|I     |
|2024-10-01 00:04:24|3993.9 |H     |
|2024-10-01 00:04:33|3481.07|E     |
|2024-10-01 00:04:43|1597.49|F     |
+-------------------+-------+------+

+-------------------+-------+------+
|date               |energy |sensor|
+-------------------+-------+------+
|2024-10-01 00:14:30|2790.19|C     |
|2024-10-01 00:14:42|3481.08|E     |
|2024-10-01 00:14:48|1668.96|B     |
|2024-10-01 00:14:54|1649.25|A     |
|2024-10-01 00:14:36|5949.1 |D     |
|2024-10-01 00:14:45|2162.5 |J     |
|2024-10-01 00:14:51|1597.5 |F     |
+-------------------+-------+------+

+-------------------+-------+------+
|date               |energy |sensor|
+-------------------+----


An error occurred while calling o47.awaitTermination
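Each stream record is a JSON object with `date`, `sensor` and `energy` fields, not the `timestamp`/`sensor_id` names from the schema table at the top. In the JSON, `date` is a string in `YYYY-MM-DD HH:MM:SS` form. The plain-Python sketch below mirrors the parse step for one record. In Spark, the analogous conversion of the column would be `to_timestamp(col("date"), "yyyy-MM-dd HH:mm:ss")`. The `parse_record` helper is hypothetical.

```python
import json
from datetime import datetime
from typing import Tuple

def parse_record(line: str) -> Tuple[datetime, str, float]:
    """Hypothetical helper: turn one JSON line from the energy sender into (timestamp, sensor, kWh)."""
    record = json.loads(line)
    ts = datetime.strptime(record["date"], "%Y-%m-%d %H:%M:%S")
    return ts, record["sensor"], float(record["energy"])


sample_json = '{"date": "2024-02-01 00:00:00", "sensor": "D", "energy": 2615.0}'
print(parse_record(sample_json))
```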


In [2]:
#@title QUESTION 1

# While working on this project, we found that before running each cell we had to restart the Spark session and the stream source; otherwise the queries from different cells would get mixed up and the output would be confusing

from pyspark.sql import *
from pyspark.sql.functions import *

#creates the spark session
spark = SparkSession \
    .builder \
    .appName("StructuredWebLogExample") \
    .getOrCreate()

# defines a sample JSON string representing a single data record
sample_json = '{"date": "2024-02-01 00:00:00", "sensor": "D", "energy": 2615.0}'
inferred_schema = schema_of_json(sample_json)


try:
    # creates a data source to read the streamed data flow
    json_lines = spark.readStream.format("socket") \
        .option("host", "localhost") \
        .option("port", 7777) \
        .load()

    # parses the JSON string in the "value" column with the schema inferred from sample_json and expands its fields into columns
    json_lines = json_lines.withColumn("json_data", from_json(col("value"), inferred_schema)) \
        .select("json_data.*")

    # Module-level variables track each sensor's last reading and the accumulated total.
    # Without them, each micro-batch would be processed independently and the running total would not be preserved between batches.
    sensor_states = {}
    running_total = 0.0

    # creating a function that computes the running total for the group of sensors
    def calculate_running_total(batch_df, batch_id):
        global sensor_states, running_total

        # collect() brings the micro-batch to the driver as a list of Rows; the loop extracts each row's sensor id and energy value
        for row in batch_df.collect():
            sensor = row["sensor"]
            energy = row["energy"]

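            # If the sensor is not yet in the sensor_states dictionary, it is added and its reading is added to the running total.
            # NOTE: meters do not start at zero, so this adds the meter's absolute reading; the total therefore
            # includes each meter's starting value, not only the energy consumed since its first reading.
            if sensor not in sensor_states:
                sensor_states[sensor] = energy
                running_total += energy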
            else:
                # if the sensor is already in the dictionary, the energy difference between that reading and the one before is added to the running total energy value
                energy_difference = energy - sensor_states[sensor]
                if energy_difference > 0:
                    running_total += energy_difference
                sensor_states[sensor] = energy

        # print the running total energy for every mini-batch received
        print(f"Batch {batch_id} - Running Total Energy: {running_total}")

    # Define the query. Append mode is used because the running state is kept in the global variables, not in a Spark aggregation
    query = json_lines \
        .writeStream \
        .outputMode("append") \
        .trigger(processingTime='25 seconds') \
        .foreachBatch(calculate_running_total) \
        .start()

    query.awaitTermination(100)
except Exception as err:
    print(err)
    if 'query' in locals():
        query.stop()


# This output was obtained with a 25-second trigger and awaitTermination(100).
# Depending on the trigger interval, early batches may not contain all 11 sensors (e.g., the first reading of sensor K only appears at 1 am), so the running total can jump noticeably between batches (see batches 2 and 3).

Batch 0 - Running Total Energy: 0.0
Batch 1 - Running Total Energy: 28056.309999999998
Batch 2 - Running Total Energy: 28056.619999999984
Batch 3 - Running Total Energy: 30204.40999999998
Batch 4 - Running Total Energy: 30204.94999999998
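Rows inside a micro-batch are not guaranteed to arrive in timestamp order. The sample stream output earlier in the notebook, for example, lists 00:04:24 after 00:04:52. A batch can hold several readings of the same sensor. If those readings are differenced in arrival order, the `energy_difference > 0` filter can skip a decrease and later count part of the same increase again. The sketch below sorts first. Following the assignment's note, it also uses each sensor's first reading as a baseline instead of adding it to the total. In PySpark, `batch_df.orderBy("date").collect()` would give the same ordering. The `apply_batch` name is hypothetical.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, str, float]  # ("YYYY-MM-DD HH:MM:SS", sensor, cumulative kWh)

def apply_batch(last_reading: Dict[str, float], rows: List[Row]) -> float:
    """Hypothetical helper: energy consumed within one micro-batch; updates `last_reading` in place."""
    consumed = 0.0
    # Sort by timestamp so each sensor's readings are differenced chronologically.
    for _ts, sensor, energy in sorted(rows):
        previous = last_reading.get(sensor)
        if previous is not None:
            consumed += energy - previous
        # The first reading of a sensor only sets its baseline.
        last_reading[sensor] = energy
    return consumed


state = {"C": 2790.17}
batch = [("2024-10-01 00:14:30", "C", 2790.19),  # arrives first
         ("2024-10-01 00:04:21", "C", 2790.18)]  # older reading arrives later
print(apply_batch(state, batch), state)
```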


In [2]:
#@title QUESTION 2 - First Part

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, desc, row_number, sum, round
from pyspark.sql.window import Window

# In this first part of the second question, as an intermediate step, we reapplied the code from the first question to compute the running total energy for each available day of February

#initiates a new spark session
spark = SparkSession.builder.appName("ProcessFebruaryData").getOrCreate()

# loads the February readings CSV, specifying ";" as the field delimiter
readings_february = spark.read.option("header", "false") \
    .option("inferSchema", "true") \
    .option("delimiter", ";") \
    .csv("february-energy-readings.csv")

# names the columns manually (the February CSV has no header row)
readings_february = readings_february.toDF("timestamp", "sensor", "energy")

# derives a 'date' column (DateType) from the 'timestamp' column
readings_february = readings_february.withColumn("date", to_date(col("timestamp"), "yyyy-MM-dd"))

# select the last reading for each day of the month and for each sensor
window_spec = Window.partitionBy("sensor", "date").orderBy(desc("timestamp"))
daily_last_reading = readings_february.withColumn("row_number", row_number().over(window_spec)) \
    .filter(col("row_number") == 1) \
    .drop("row_number")

# Compute the daily total for all sensors combined.
# NOTE: this sums each sensor's last meter reading of the day (an absolute accumulated value), not the energy consumed during the day
daily_running_total_february = daily_last_reading.groupBy("date").agg(
    round(sum("energy"), 2).alias("running_total_energy_february")
).orderBy("date")

# Declare the global variable (currently commented out)

#february_totals = {row["date"]: row["running_total_energy_february"] for row in daily_running_total_february.collect()}

daily_running_total_february.show()



+----------+-----------------------------+
|      date|running_total_energy_february|
+----------+-----------------------------+
|2024-02-01|                      13447.7|
|2024-02-02|                      13517.9|
|2024-02-09|                      14432.9|
|2024-02-10|                      14547.4|
|2024-02-11|                      14665.3|
|2024-02-12|                      14776.1|
|2024-02-13|                      14888.8|
|2024-02-14|                      14982.2|
|2024-02-15|                      15063.4|
|2024-02-16|                      15111.2|
|2024-02-18|                      15351.4|
|2024-02-19|                      15431.1|
|2024-02-20|                      15515.4|
|2024-02-21|                      15598.4|
|2024-02-22|                      15675.0|
|2024-02-23|                      15759.8|
|2024-02-24|                     15903.35|
|2024-02-25|                     16003.15|
|2024-02-26|                     16095.66|
|2024-02-27|                     16188.81|
+----------
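The `running_total_energy_february` column above sums each sensor's last reading of the day. Those are absolute meter values, which is why the column sits around 13,000 to 16,000 kWh. A consumption-based February reference would instead use, per day, the latest minus the first reading of that day. Month-to-date, it would use the latest reading minus the sensor's first reading in February. The plain-Python sketch below works over `(timestamp, sensor, kWh)` rows. The function name is hypothetical, and the sketch assumes every sensor reports at least once on each day present in the file.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, str, float]  # ("YYYY-MM-DD HH:MM:SS", sensor, cumulative kWh)

def february_reference(rows: List[Row]) -> Dict[str, Tuple[float, float]]:
    """Hypothetical helper: day -> (day_kwh, month_to_date_kwh) at that day's last readings."""
    month_first: Dict[str, float] = {}
    day_first: Dict[Tuple[str, str], float] = {}
    day_last: Dict[Tuple[str, str], float] = {}
    for ts, sensor, energy in sorted(rows):  # chronological order
        day = ts[:10]
        month_first.setdefault(sensor, energy)
        day_first.setdefault((day, sensor), energy)
        day_last[(day, sensor)] = energy
    reference: Dict[str, Tuple[float, float]] = {}
    for (day, sensor), energy in day_last.items():
        day_kwh, month_kwh = reference.get(day, (0.0, 0.0))
        reference[day] = (day_kwh + energy - day_first[(day, sensor)],
                          month_kwh + energy - month_first[sensor])
    return reference
```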

In [None]:
#@title QUESTION 2 - Second Part

from pyspark.sql import *
from pyspark.sql.functions import *

# Sample JSON record used to infer the schema
sample_json = '{"date": "2024-02-01 00:00:00", "sensor": "D", "energy": 2615.0}'
inferred_schema = schema_of_json(sample_json)

# Global variables that track the sensors' state and the accumulated total
sensor_states = {}
running_total = 0.0

# Function that computes the running total and compares it with the February daily totals
def calculate_daily_comparison(batch_df, epoch_id):
    global sensor_states, running_total, daily_running_total_february

    # Update the state and compute the incremental sum for each sensor
    for row in batch_df.collect():
        sensor = row["sensor"]
        energy = row["energy"]

        # Update each sensor's state
        if sensor not in sensor_states:
            sensor_states[sensor] = energy
            running_total += energy
        else:
            # Compute the energy difference and update the accumulated total
            energy_difference = energy - sensor_states[sensor]
            if energy_difference > 0:  # only add increasing values
                running_total += energy_difference
            sensor_states[sensor] = energy

    # Prepare the comparison with February:
    # reduce the date to the day of the month only (ignoring month and year)
    batch_df = batch_df.withColumn("day", dayofmonth("date"))
    february_data = daily_running_total_february.withColumn("day", dayofmonth("date"))

    # Join with the February DataFrame (the inner join drops days that are missing from the February data)
    comparison = february_data.join(
        batch_df.groupBy("day").agg(lit(running_total).alias("running_total_stream")),
        "day",
        "inner"
    ).withColumn(
        "percentage",
        (col("running_total_stream") / col("running_total_energy_february")) * 100
    )

    # Show the results
    comparison.select("day", "running_total_stream", "running_total_energy_february", "percentage").show(truncate=False)

# Create the Spark session
spark = SparkSession.builder.appName("StructuredWebLogExample").getOrCreate()

# Create a DataFrame representing the input stream
try:
    # Read the stream from the socket
    json_lines = spark.readStream.format("socket") \
        .option("host", "localhost") \
        .option("port", 7777) \
        .load()

    # Parse the JSON and expand its fields into columns
    json_lines = json_lines.withColumn("json_data", from_json(col("value"), inferred_schema)) \
        .select("json_data.*")

    # Define the query that processes the stream
    query = json_lines \
        .writeStream \
        .outputMode("append") \
        .trigger(processingTime='25 seconds') \
        .foreachBatch(calculate_daily_comparison) \
        .start()

    query.awaitTermination(100)
except Exception as err:
    print(err)
    if 'query' in locals():
        query.stop()


+---+--------------------+-----------------------------+----------+
|day|running_total_stream|running_total_energy_february|percentage|
+---+--------------------+-----------------------------+----------+
+---+--------------------+-----------------------------+----------+

+---+--------------------+-----------------------------+-----------------+
|day|running_total_stream|running_total_energy_february|percentage       |
+---+--------------------+-----------------------------+-----------------+
|1  |28056.18            |13447.7                      |208.6318106441994|
+---+--------------------+-----------------------------+-----------------+

+---+--------------------+-----------------------------+------------------+
|day|running_total_stream|running_total_energy_february|percentage        |
+---+--------------------+-----------------------------+------------------+
|1  |28056.41999999999   |13447.7                      |208.63359533600533|
+---+--------------------+---------------------
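The inner join in `calculate_daily_comparison` silently drops any streamed day that has no February row. The February output earlier has such gaps: days 3 to 8 and 17 are absent among the rows displayed. The commented-out `february_totals` dictionary in the first part suggests an alternative. You collect the small February table once and look each day up, which makes the missing case explicit. The sketch below is keyed by day of month, and its function name is hypothetical.

```python
from typing import Dict, Optional

def compare_to_february(day_of_month: int, current_kwh: float,
                        february_by_day: Dict[int, float]) -> Optional[float]:
    """Hypothetical helper: current value as a percentage of February's value for the same day.

    Returns None when February has no (or a zero) reference for that day,
    instead of dropping the row as an inner join would.
    """
    reference = february_by_day.get(day_of_month)
    if not reference:
        return None
    return 100.0 * current_kwh / reference


february_by_day = {1: 200.0, 2: 180.0}  # illustrative values, not real data
print(compare_to_february(1, 50.0, february_by_day))
print(compare_to_february(5, 50.0, february_by_day))
```

In the notebook, you could build `february_by_day` once, outside `foreachBatch`, from `daily_running_total_february.collect()`, keyed by `row["date"].day`.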


In [None]:
#@title QUESTION 3


from pyspark.sql import *
from pyspark.sql.functions import *

# Create the Spark session
spark = SparkSession \
    .builder \
    .appName("StructuredWebLogExample") \
    .getOrCreate()

# Sample JSON record used to infer the schema
sample_json = '{"date": "2024-02-01 00:00:00", "sensor": "D", "energy": 2615.0}'
inferred_schema = schema_of_json(sample_json)

# Create a DataFrame representing the input stream
try:
    # Read the stream from the socket
    json_lines = spark.readStream.format("socket") \
        .option("host", "localhost") \
        .option("port", 7777) \
        .load()

    # Parse the JSON and expand its fields into columns
    json_lines = json_lines.withColumn("json_data", from_json(col("value"), inferred_schema)) \
        .select("json_data.*")

    # Module-level variables that track the sensors' state and the accumulated total
    sensor_states = {}
    running_total = 0.0

    # Function that computes and prints the running total and the percentage per sensor
    def calculate_running_total_and_percentage(batch_df, epoch_id):
        global sensor_states, running_total

        # Update the state and compute the incremental sum
        for row in batch_df.collect():
            sensor = row["sensor"]
            energy = row["energy"]

            # If the sensor is not in the state yet, initialize it with its first reading
            if sensor not in sensor_states:
                sensor_states[sensor] = energy
                running_total += energy
            else:
                # Compute the difference between the current reading and the last known one
                energy_difference = energy - sensor_states[sensor]

                # Update the accumulated total and the sensor's state
                if energy_difference > 0:  # only add increasing values
                    running_total += energy_difference
                sensor_states[sensor] = energy

        # Compute the percentage per sensor.
        # NOTE: `energy` is each sensor's latest absolute meter reading, so the percentage reflects
        # each meter's share of the combined accumulated readings rather than of the energy consumed.
        sensor_percentages = []
        for sensor, energy in sensor_states.items():
            percentage = (energy / running_total) * 100 if running_total > 0 else 0
            sensor_percentages.append((sensor, percentage))

        # Sort by percentage in descending order, then by sensor name
        sorted_percentages = sorted(sensor_percentages, key=lambda x: (-x[1], x[0]))

        # Print the results
        print(f"Batch {epoch_id} - Running Total Energy: {running_total}")
        print("Sensor Percentages (sorted):")
        for sensor, percentage in sorted_percentages:
            print(f"  Sensor {sensor}: {percentage:.2f}%")

    # Define the query that processes the stream
    query = json_lines \
        .writeStream \
        .outputMode("append") \
        .trigger(processingTime='30 seconds') \
        .foreachBatch(calculate_running_total_and_percentage) \
        .start()

    query.awaitTermination(500)
except Exception as err:
    print(err)
    if 'query' in locals():
        query.stop()


Batch 0 - Running Total Energy: 0.0
Sensor Percentages (sorted):
Batch 1 - Running Total Energy: 28056.18
Sensor Percentages (sorted):
  Sensor D: 21.20%
  Sensor H: 14.24%
  Sensor E: 12.41%
  Sensor C: 9.95%
  Sensor I: 9.56%
  Sensor J: 7.71%
  Sensor G: 7.42%
  Sensor B: 5.95%
  Sensor A: 5.88%
  Sensor F: 5.69%
Batch 2 - Running Total Energy: 28056.60999999999
Sensor Percentages (sorted):
  Sensor D: 21.20%
  Sensor H: 14.24%
  Sensor E: 12.41%
  Sensor C: 9.94%
  Sensor I: 9.56%
  Sensor J: 7.71%
  Sensor G: 7.42%
  Sensor B: 5.95%
  Sensor A: 5.88%
  Sensor F: 5.69%
Batch 3 - Running Total Energy: 30204.559999999983
Sensor Percentages (sorted):
  Sensor D: 19.70%
  Sensor H: 13.22%
  Sensor E: 11.53%
  Sensor C: 9.24%
  Sensor I: 8.88%
  Sensor J: 7.16%
  Sensor K: 7.11%
  Sensor G: 6.89%
  Sensor B: 5.53%
  Sensor A: 5.46%
  Sensor F: 5.29%
Batch 4 - Running Total Energy: 30205.079999999973
Sensor Percentages (sorted):
  Sensor D: 19.70%
  Sensor H: 13.22%
  Sensor E: 11.53%
  


An error occurred while calling o47.awaitTermination
