
Challenges

Monitoring indoor environmental conditions poses several challenges:

High data velocity: Continuous data from multiple sensors can overwhelm traditional systems.

Need for immediate alerts: Delays in identifying critical conditions can lead to discomfort or system inefficiencies.

Need for data aggregation and analysis: Efficiently aggregating and analyzing real-time data for proactive maintenance and optimization is essential.

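Apache Spark with Structured Streaming

To address these challenges, this notebook uses Apache Spark Structured Streaming, which treats a live data stream as an unbounded table that keeps growing as new rows arrive. The same DataFrame and SQL operations used on static data are then applied incrementally to the stream, while Spark's distributed engine handles the data volume.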
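The "unbounded table" idea means a streaming aggregation does not recompute its result from scratch on every trigger: Spark keeps partial results as state and folds in only the new rows. The plain-Python sketch below is not Spark code. It illustrates that idea for a per-room average, assuming a hypothetical `RunningAverage` class where each call to `process_batch` stands in for one micro-batch.

```python
from collections import defaultdict


class RunningAverage:
    """Hypothetical per-key running average updated one micro-batch at a time.

    Models the idea behind a streaming aggregation: partial results (sum, count)
    are kept as state between triggers, and each new batch only updates that state.
    """

    def __init__(self):
        # key -> [running sum, running count]
        self.state = defaultdict(lambda: [0.0, 0])

    def process_batch(self, rows):
        """Fold one batch of (key, value) pairs into the state and return the
        current average for every key seen so far."""
        for key, value in rows:
            acc = self.state[key]
            acc[0] += value
            acc[1] += 1
        return {key: total / count for key, (total, count) in self.state.items()}


agg = RunningAverage()
print(agg.process_batch([("room_1", 20.0), ("room_2", 30.0)]))  # {'room_1': 20.0, 'room_2': 30.0}
print(agg.process_batch([("room_1", 24.0)]))                    # {'room_1': 22.0, 'room_2': 30.0}
```

The second batch contains only one new reading, yet the result for `room_1` reflects both readings because the sum and count survived from the first batch.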

In [1]:
!pip install pyspark==3.5 -q
!pip install findspark -q



In [2]:
# Suppress warnings generated by the libraries used in this notebook
import warnings

def warn(*args, **kwargs):
    pass

warnings.warn = warn  # replace warnings.warn with a no-op
warnings.filterwarnings('ignore')

# findspark locates a Spark installation and adds PySpark to sys.path.
# It is optional here, since the pip-installed pyspark is already importable.
import findspark
findspark.init()


In [3]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Smart Building HVAC Monitoring") \
    .getOrCreate()

In [4]:
from pyspark.sql.functions import expr, rand, when

# Simulate sensor data with the built-in "rate" source, which emits `timestamp` and `value` columns.
# room_id cycles through "0".."9"; humidity is uniform in [40, 70).
# Temperature is fixed at 15 for every reading from room "0" (value % 10 == 0)
# and uniform in [20, 45) for the other rooms.
sensor_data = spark.readStream.format("rate").option("rowsPerSecond", 5).load() \
    .withColumn("room_id", expr("CAST(value % 10 AS STRING)")) \
    .withColumn("temperature", when(expr("value % 10 == 0"), 15)
                .otherwise(20 + rand() * 25)) \
    .withColumn("humidity", expr("40 + rand() * 30"))
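To see what the simulated stream contains without starting Spark, the column logic above can be approximated in plain Python. This is a sketch under stated assumptions: `simulate_rate_rows` is a hypothetical helper, Python's `random` stands in for Spark's `rand()`, and timestamps are spaced `1 / rows_per_second` seconds apart from an arbitrary start time.

```python
import random
from datetime import datetime, timedelta


def simulate_rate_rows(n_rows, rows_per_second=5, start=None, seed=None):
    """Hypothetical plain-Python approximation of the simulated sensor stream.

    Mirrors the column logic: room_id = value % 10 as a string, temperature
    fixed at 15 for room "0" and uniform in [20, 45) otherwise, humidity
    uniform in [40, 70).
    """
    rng = random.Random(seed)
    if start is None:
        start = datetime(2024, 1, 1)  # arbitrary start time for illustration
    rows = []
    for value in range(n_rows):
        room_id = str(value % 10)
        temperature = 15.0 if value % 10 == 0 else 20 + rng.random() * 25
        humidity = 40 + rng.random() * 30
        # The rate source emits rows_per_second rows per second
        timestamp = start + timedelta(seconds=value / rows_per_second)
        rows.append({"timestamp": timestamp, "value": value, "room_id": room_id,
                     "temperature": temperature, "humidity": humidity})
    return rows


sample = simulate_rate_rows(20, seed=42)
print(sorted({r["room_id"] for r in sample}))  # ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
```

Because `rand()` returns values in [0, 1), temperatures outside room "0" stay in [20, 45) and humidity stays in [40, 70). With these ranges the critical-temperature condition (`< 18 OR > 60`) only ever matches room "0", and the humidity condition `> 75` never fires; only readings below 45 are counted as critical.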

In [5]:
# Create a temporary SQL view for the sensor data
sensor_data.createOrReplaceTempView("sensor_table")


Define SQL queries for aggregation and analysis:

- Critical temperature query: return readings whose temperature is below 18 or above 60
- Average readings query: compute each room's average temperature and humidity over 1-minute tumbling windows
- Attention needed query: count, for each room, the readings whose humidity is below 45 or above 75

In [6]:
# SQL Query to detect rooms with critical temperatures
critical_temperature_query = """
    SELECT
        room_id,
        temperature,
        humidity,
        timestamp
    FROM sensor_table
    WHERE temperature < 18 OR temperature > 60
"""

# SQL Query to calculate average readings over a 1-minute window
average_readings_query = """
    SELECT
        room_id,
        AVG(temperature) AS avg_temperature,
        AVG(humidity) AS avg_humidity,
        window.start AS window_start
    FROM sensor_table
    GROUP BY room_id, window(timestamp, '1 minute')
"""

# SQL Query to find rooms that need immediate attention based on humidity
attention_needed_query = """
    SELECT
        room_id,
        COUNT(*) AS critical_readings
    FROM sensor_table
    WHERE humidity < 45 OR humidity > 75
    GROUP BY room_id
"""
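To check the logic of the three queries without a running stream, they can be mirrored in plain Python over a finite list of rows. This is a batch approximation for illustration, not how Spark executes them. The function names are hypothetical, rows are dicts keyed by the view's column names, and windows are aligned to the Unix epoch, as `window(timestamp, '1 minute')` does by default (naive datetimes are treated as UTC here).

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_window_start(ts, width=timedelta(minutes=1)):
    """Start of the fixed, non-overlapping window that contains ts."""
    return ts - (ts - EPOCH) % width


def critical_temperatures(rows):
    """Mirror of critical_temperature_query: keep out-of-range temperatures."""
    return [r for r in rows if r["temperature"] < 18 or r["temperature"] > 60]


def average_by_room_and_window(rows, width=timedelta(minutes=1)):
    """Mirror of average_readings_query: (room_id, window_start) -> (avg temp, avg humidity)."""
    sums = defaultdict(lambda: [0.0, 0.0, 0])
    for r in rows:
        acc = sums[(r["room_id"], tumbling_window_start(r["timestamp"], width))]
        acc[0] += r["temperature"]
        acc[1] += r["humidity"]
        acc[2] += 1
    return {key: (t / n, h / n) for key, (t, h, n) in sums.items()}


def attention_needed(rows):
    """Mirror of attention_needed_query: room_id -> number of out-of-range humidity readings."""
    counts = defaultdict(int)
    for r in rows:
        if r["humidity"] < 45 or r["humidity"] > 75:
            counts[r["room_id"]] += 1
    return dict(counts)


print(tumbling_window_start(datetime(2024, 1, 1, 12, 0, 30)))  # 2024-01-01 12:00:00
```

A reading at 12:00:30 falls in the window starting at 12:00:00, and one at 12:01:10 starts a new window, so each room gets one average per minute.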

In [7]:
# Execute the critical temperature query
critical_temperatures_stream = spark.sql(critical_temperature_query)


# Execute the average readings query
average_readings_stream = spark.sql(average_readings_query)

# Execute the attention needed query
attention_needed_stream = spark.sql(attention_needed_query)


In [8]:
# Output the results to the console for all queries
critical_query = critical_temperatures_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .queryName("Critical Temperatures") \
    .start()

average_query = average_readings_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Average Readings") \
    .start()

attention_query = attention_needed_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Attention Needed") \
    .start()
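The two output modes differ in what reaches the sink on each trigger. `append` sends only rows that were never emitted before, which suits the critical-temperature query because its rows never change once produced. `complete` rewrites the whole result table, which the two aggregations need because a room's average or count keeps changing as readings arrive; Spark rejects `append` for streaming aggregations unless a watermark is defined. The plain-Python sketch below (hypothetical helper `rows_to_emit`, result tables modeled as dicts) shows what each mode would emit between two triggers of the attention-needed count.

```python
def rows_to_emit(mode, previous_table, result_table):
    """Return what a sink receives for one trigger under the given output mode.

    previous_table / result_table: the query's result (key -> value) before and
    after the trigger. Simplified model for illustration only.
    """
    if mode == "complete":
        # The entire result table is written out on every trigger.
        return dict(result_table)
    if mode == "append":
        # Only keys that did not exist before; changes to existing keys are not re-sent.
        return {k: v for k, v in result_table.items() if k not in previous_table}
    raise ValueError(f"unsupported output mode: {mode}")


# Attention-needed counts (room_id -> critical_readings) after two consecutive triggers
before = {"1": 2}
after = {"1": 3, "4": 1}
print(rows_to_emit("complete", before, after))  # {'1': 3, '4': 1}
print(rows_to_emit("append", before, after))    # {'4': 1}  (room "1"'s new count is never sent)
```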


In [9]:
# All three queries were started in the previous cell and already run concurrently.
# awaitTermination() without a timeout blocks until that query stops, so awaiting
# them one after another would never reach the second and third queries.
# The console sink prints each micro-batch to the Spark driver's stdout, which may
# not appear in the notebook cell output.
run_seconds = 60  # illustrative duration; adjust as needed

# Blocks for up to run_seconds; raises if the query fails in the meantime
critical_query.awaitTermination(run_seconds)

# Stop all streaming queries so the session is left clean
for query in (critical_query, average_query, attention_query):
    query.stop()