## Leveraging Apache Spark for Smart Building HVAC Monitoring


### Objectives

After completing this lab, you will be able to:

- Explain the distributed architecture of Spark in the context of smart building monitoring
- Simulate real-time sensor data for HVAC systems in a building
- Perform SQL queries to detect critical environmental conditions and calculate average readings
- Output the aggregated results to the console for immediate insights into room conditions


## Background
Smart Building Solutions, Inc. specializes in optimizing HVAC (heating, ventilation, and air conditioning) systems to improve comfort and energy efficiency in commercial buildings. By monitoring temperature and humidity in real time across rooms, the company aims to keep indoor conditions within target ranges and address potential HVAC issues before they escalate.

With sensor data arriving continuously, Smart Building Solutions needs to process and analyze it in real time to maintain the quality of the indoor environment.

## Data set description
The simulated data set comprises:

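`room_id`: Identifier for each room. The simulation derives it from the incrementing `value` column as `value % 10`, giving 10 rooms with the IDs "0" through "9".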

`temperature`: Current temperature reading from the sensor (in °C).

`humidity`: Current humidity level reading from the sensor (in %).

`timestamp`: Time when the reading was recorded (generated automatically by Spark's `rate` source, which also supplies an incrementing `value` column).

The data is generated at a rate of 5 rows per second, cycling through the rooms with varying environmental conditions.
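
Because the simulation assigns rooms from `value % 10`, consecutive rows cycle through 10 rooms in turn. A quick calculation, assuming the `rowsPerSecond=5` setting and the 1-minute window used later in this lab, shows how many readings sit behind each per-room average:

```python
# Back-of-the-envelope volumes for the simulated feed
# (assumes rowsPerSecond=5 and room_id = value % 10, as in this lab)
ROWS_PER_SECOND = 5
NUM_ROOMS = 10          # value % 10 yields 10 distinct room IDs
WINDOW_SECONDS = 60     # the 1-minute window used by the average readings query

rows_per_window = ROWS_PER_SECOND * WINDOW_SECONDS        # all rooms combined
rows_per_room_per_window = rows_per_window // NUM_ROOMS   # readings behind each average
seconds_between_room_readings = NUM_ROOMS / ROWS_PER_SECOND

print(rows_per_window, rows_per_room_per_window, seconds_between_room_readings)
# prints: 300 30 2.0
```

The first and last windows of a run are usually partial, because the stream starts and stops partway through a minute.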


## Challenges
Monitoring indoor environmental conditions poses several challenges:

**High data velocity**: Continuous data from multiple sensors can overwhelm traditional systems.

**Need for immediate alerts**: Delays in identifying critical conditions can lead to discomfort or system inefficiencies.

**Need for data aggregation and analysis**: Efficiently aggregating and analyzing real-time data for proactive maintenance and optimization is essential.

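## Apache Spark with Structured Streaming
To address these challenges, this lab uses Apache Spark Structured Streaming. Structured Streaming treats a live data stream as a table that is continuously appended to and processes it incrementally (by default in micro-batches), distributing the work across the cluster's executors. This lets the same DataFrame and SQL APIs used for batch data run continuously on streaming data.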


```python
%pip install pyspark
```


### Set up the Spark session:


```python
import os
import sys

from pyspark.sql import SparkSession

# Windows only: point HADOOP_HOME at a folder containing bin\winutils.exe to avoid
# Hadoop NativeIO errors. On Linux/macOS (or inside WSL/Docker) this block is skipped.
if sys.platform.startswith("win"):
    # Adjust to the folder where you placed winutils.exe (e.g. C:\hadoop).
    # If HADOOP_HOME is already set system-wide, the existing value is kept.
    hadoop_home = r"C:\hadoop"
    os.environ.setdefault("HADOOP_HOME", hadoop_home)
    # Disable Hadoop file locking (works around some Windows permission/native IO issues)
    os.environ.setdefault("HADOOP_CLIENT_DISABLE_FILE_LOCKING", "true")

    winutils_path = os.path.join(os.environ["HADOOP_HOME"], "bin", "winutils.exe")
    if os.path.exists(winutils_path):
        # Put the winutils bin folder on PATH so Spark/Hadoop can locate it
        os.environ["PATH"] = os.path.dirname(winutils_path) + os.pathsep + os.environ.get("PATH", "")
        print(f"Found winutils.exe at: {winutils_path}")
    else:
        print("winutils.exe not found at:", winutils_path)
        print("Download a winutils.exe that matches your Hadoop version and place it in HADOOP_HOME\\bin,")
        print("or run Spark inside WSL/Docker to avoid Windows native IO.")

    print("HADOOP_HOME =", os.environ.get("HADOOP_HOME"))
    print("HADOOP_CLIENT_DISABLE_FILE_LOCKING =", os.environ.get("HADOOP_CLIENT_DISABLE_FILE_LOCKING"))

# Initialize the Spark session (reuses an existing one if the cell is re-run)
spark = SparkSession.builder \
    .appName("Smart Building HVAC Monitoring") \
    .getOrCreate()
```


### Simulate sensor data:

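Use Spark's `rate` source to generate a continuous stream of rows. Each row carries a `timestamp` and an incrementing `value`; the code below derives a room ID, a temperature, and a humidity reading from them.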


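```python
from pyspark.sql.functions import expr, rand, when

# Simulate sensor data: the rate source emits (timestamp, value) rows at 5 rows/second,
# and each row becomes a reading for one of 10 rooms.
sensor_data = (
    spark.readStream.format("rate").option("rowsPerSecond", 5).load()
    # Room IDs "0" to "9", assigned round-robin from the incrementing value
    .withColumn("room_id", expr("CAST(value % 10 AS STRING)"))
    # Room "0" always reports 15 °C (below the critical threshold);
    # the other rooms report a random temperature between 20 and 45 °C
    .withColumn("temperature", when(expr("value % 10 == 0"), 15).otherwise(20 + rand() * 25))
    # Random humidity between 40 and 70 %
    .withColumn("humidity", expr("40 + rand() * 30"))
)
```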
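
To see what values the `withColumn` expressions produce, here is a plain-Python mirror of the same logic applied to the rate source's `value` column. It is an illustration only: `simulate_reading` is a hypothetical helper, and Python's `random` stands in for Spark's `rand()`, which likewise draws uniformly from [0, 1).

```python
import random

def simulate_reading(value: int, rng: random.Random) -> dict:
    """Plain-Python mirror of the withColumn expressions for one rate-source row."""
    room_id = str(value % 10)                    # CAST(value % 10 AS STRING)
    if value % 10 == 0:
        temperature = 15.0                       # room "0" always reads 15 °C
    else:
        temperature = 20 + rng.random() * 25     # uniform in [20, 45)
    humidity = 40 + rng.random() * 30            # uniform in [40, 70)
    return {"value": value, "room_id": room_id,
            "temperature": temperature, "humidity": humidity}

rng = random.Random(42)  # fixed seed so the sketch is repeatable
readings = [simulate_reading(v, rng) for v in range(20)]
for reading in readings[:3]:
    print(reading)
```

Twenty rows cover every room twice, which is why even a few seconds of streaming produce output for all 10 rooms.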

### Create a temporary SQL view:

Register the streaming DataFrame as a temporary view so that it can be queried with `spark.sql()`.


```python
# Create a temporary SQL view for the sensor data
sensor_data.createOrReplaceTempView("sensor_table")
```


### Define SQL queries for aggregation and analysis:

* **Critical temperature query**: Detect readings with critical temperatures (below 18 °C or above 60 °C)
* **Average readings query**: Calculate each room's average temperature and humidity over 1-minute windows
* **Attention needed query**: Count, per room, the readings with out-of-range humidity (below 45 % or above 75 %)
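
The average readings query groups rows with `window(timestamp, '1 minute')`, which creates non-overlapping (tumbling) windows. The sketch below shows how a timestamp maps to its window, assuming the default alignment to the Unix epoch and the start-inclusive, end-exclusive convention Spark documents for `window`; `tumbling_window` is a hypothetical helper, not a Spark API.

```python
from datetime import datetime, timedelta, timezone

def tumbling_window(ts: datetime, width: timedelta = timedelta(minutes=1)):
    """Return the [start, end) tumbling window containing ts, aligned to the Unix epoch."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    offset = (ts - epoch) % width   # time elapsed since the containing window started
    start = ts - offset
    return start, start + width

t = datetime(2024, 5, 1, 10, 15, 42, tzinfo=timezone.utc)
start, end = tumbling_window(t)
print(start.isoformat(), end.isoformat())
# prints: 2024-05-01T10:15:00+00:00 2024-05-01T10:16:00+00:00
```

A reading stamped exactly on a boundary, such as 10:16:00, starts the next window rather than closing the previous one.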


```python
# SQL query to detect rooms with critical temperatures
critical_temperature_query = """
    SELECT
        room_id,
        temperature,
        humidity,
        timestamp
    FROM sensor_table
    WHERE temperature < 18 OR temperature > 60
"""

# SQL query to calculate average readings over a 1-minute window
average_readings_query = """
    SELECT
        room_id,
        AVG(temperature) AS avg_temperature,
        AVG(humidity) AS avg_humidity,
        window.start AS window_start
    FROM sensor_table
    GROUP BY room_id, window(timestamp, '1 minute')
"""

# SQL query to find rooms that need immediate attention based on humidity
attention_needed_query = """
    SELECT
        room_id,
        COUNT(*) AS critical_readings
    FROM sensor_table
    WHERE humidity < 45 OR humidity > 75
    GROUP BY room_id
"""
```

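The filter conditions are easy to sanity-check outside Spark. The sketch below applies the same predicates to a few hand-picked readings (illustrative values, not real sensor data) and mimics the `GROUP BY room_id` count with `collections.Counter`; the function names are hypothetical.

```python
from collections import Counter

def is_critical_temperature(t: float) -> bool:
    # Same predicate as: WHERE temperature < 18 OR temperature > 60
    return t < 18 or t > 60

def needs_attention(h: float) -> bool:
    # Same predicate as: WHERE humidity < 45 OR humidity > 75
    return h < 45 or h > 75

# (room_id, temperature, humidity), hand-picked for illustration
sample = [
    ("0", 15.0, 52.0),
    ("1", 31.2, 43.9),
    ("2", 44.8, 69.5),
    ("1", 22.0, 41.0),
]

critical = [row for row in sample if is_critical_temperature(row[1])]
# Equivalent of GROUP BY room_id with COUNT(*) over the filtered rows
attention_counts = Counter(room for room, _, h in sample if needs_attention(h))

print(critical)                # [('0', 15.0, 52.0)]
print(dict(attention_counts))  # {'1': 2}
```

With the simulated ranges (room "0" fixed at 15 °C, other rooms between 20 and 45 °C, humidity between 40 and 70 %), only room "0" ever trips the critical temperature query, the `> 60` and `> 75` branches never fire, and about one humidity reading in six (those between 40 and 45 %) is counted by the attention query.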

### Execute the SQL queries:

Execute each SQL query to create streaming DataFrames.


```python
# Execute the critical temperature query
critical_temperatures_stream = spark.sql(critical_temperature_query)

# Execute the average readings query
average_readings_stream = spark.sql(average_readings_query)

# Execute the attention needed query
attention_needed_stream = spark.sql(attention_needed_query)
```

### Output the results to the console:

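Start one streaming query per DataFrame that prints its results to the console as they arrive. The critical temperature query has no aggregation, so it uses `append` mode, which writes only the rows that arrived in each micro-batch. The two aggregations use `complete` mode, which rewrites the entire result table on every trigger. Because all three queries share the console, their output batches interleave.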


```python
# Output the results to the console for all queries
critical_query = critical_temperatures_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .queryName("Critical Temperatures") \
    .start()

average_query = average_readings_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Average Readings") \
    .start()

attention_query = attention_needed_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Attention Needed") \
    .start()
```
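
The two output modes differ in what each trigger hands to the sink. The plain-Python sketch below runs two made-up micro-batches through the humidity filter: the append path emits only each batch's new matching rows (as a filter-only query would), while the complete path folds each batch into running per-room counts and re-emits the whole table (as the aggregated attention query does). It is a simplified model with hypothetical names, not Spark's implementation.

```python
from collections import Counter

# Two made-up micro-batches of (room_id, humidity) readings
batches = [
    [("1", 43.0), ("2", 60.0), ("1", 41.5)],
    [("3", 44.2), ("1", 50.0)],
]

def out_of_range(h: float) -> bool:
    # Same humidity predicate as the attention needed query
    return h < 45 or h > 75

state = Counter()        # running counts carried across batches (loosely like Spark's state store)
append_outputs = []      # what an append-mode sink would receive per batch
complete_outputs = []    # what a complete-mode sink would receive per batch

for batch in batches:
    matching = [row for row in batch if out_of_range(row[1])]
    # Append mode: emit only this batch's new matching rows
    append_outputs.append(matching)
    # Complete mode: fold the batch into the state, then emit the whole result table
    state.update(room for room, _ in matching)
    complete_outputs.append(sorted(state.items()))

for batch_id, (appended, complete) in enumerate(zip(append_outputs, complete_outputs)):
    print(f"Batch {batch_id} | append: {appended} | complete: {complete}")
```

Spark enforces a similar split: a streaming aggregation without a watermark cannot use `append` mode, which is why the two aggregated queries in this lab use `complete`.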



### Keep the streams running:

Keep the streaming queries running so they continue to process incoming data. A query's `awaitTermination()` blocks until that query stops, so awaiting the three queries one after another never gets past the first call. The cell below instead waits on all active queries at once with `spark.streams.awaitAnyTermination()`. It passes a timeout (`RUN_SECONDS`, an arbitrary demo value) so the run ends on its own, then reports any query failures and stops the queries.


```python
# Keep the streams running
import traceback

RUN_SECONDS = 120  # how long to let the demo run; use None to run until interrupted

try:
    # Returns when any query terminates or the timeout elapses; raises if a query failed
    spark.streams.awaitAnyTermination(timeout=RUN_SECONDS)
except Exception:
    print("A streaming query terminated with an error:")
    traceback.print_exc()
finally:
    # Report per-query failures, then stop every query
    for query in (critical_query, average_query, attention_query):
        if query.exception() is not None:
            print(f"Query '{query.name}' failed:", query.exception())
        query.stop()
```

```python
# Stop the Spark session and release its resources
spark.stop()
```