<p style="text-align:center">
    <a href="https://skills.network" target="_blank">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo"  />
    </a>
</p>


## Leveraging Apache Spark for Smart Building HVAC Monitoring

**Estimated time needed: 30 minutes**

### Objectives

After completing this lab, you will be able to:

- Explain the distributed architecture of Spark in the context of smart building monitoring
- Simulate real-time sensor data for HVAC systems in a building
- Perform SQL queries to detect critical environmental conditions and calculate average readings
- Output the aggregated results to the console for immediate insights into room conditions


## Background
Smart Building Solutions, Inc. specializes in optimizing HVAC (heating, ventilation, and air conditioning) systems to enhance comfort and energy efficiency in commercial buildings. By monitoring temperature and humidity levels in real time across various rooms, the company aims to ensure optimal indoor conditions and preemptively address potential HVAC issues.

With a continuous influx of sensor data, Smart Building Solutions needs to process and analyze this data in real time to maintain the quality of the indoor environment.

## Data set description
The simulated data set comprises:

`room_id`: Unique identifier for each room. In this simulation, the IDs are the strings `0` through `9`, derived from the row counter of the rate source.

`temperature`: Current temperature reading from the sensor (in °C).

`humidity`: Current humidity level reading from the sensor (in %).

`timestamp`: Time when the reading was recorded (automatically generated by Spark).

The data is generated at a rate of 5 rows per second, simulating multiple rooms with various environmental conditions.


## Challenges
Monitoring indoor environmental conditions poses several challenges:

**High data velocity**: Continuous data from multiple sensors can overwhelm traditional systems.

**Need for immediate alerts**: Delays in identifying critical conditions can lead to discomfort or system inefficiencies.

**Need for data aggregation and analysis**: Efficiently aggregating and analyzing real-time data for proactive maintenance and optimization is essential.

## Apache Spark with structured streaming
To address these challenges, Apache Spark is employed for its powerful distributed computing capabilities, enabling real-time data processing and analytics.


In [1]:
# !pip install pyspark==3.1.2 -q
# !pip install findspark -q

In [2]:
# # You can also use this section to suppress warnings generated by your code:
# def warn(*args, **kwargs):
#     pass
# import warnings
# warnings.warn = warn
# warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

# Import the Spark session and configuration classes used in this lab

from pyspark.sql import SparkSession
from pyspark import SparkConf

### Set up the Spark session:


In [3]:
session_config = SparkConf()
session_config.setMaster('local[3]') \
              .setAll([
                ("spark.sql.legacy.timeParserPolicy", "LEGACY"),
                ("spark.executor.memory", "12g"),
                ("spark.driver.memory", "1g")
                # ("spark.executor.cores", "3")
            ])
# Initialize Spark Session
spark = SparkSession.builder.appName("Smart Building HVAC Monitoring").config(conf=session_config).getOrCreate()

25/12/11 12:27:42 WARN Utils: Your hostname, MICKYXPS15 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/11 12:27:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/11 12:27:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark.active()

In [5]:
# Use the public API rather than the private _getActiveSessionOrCreate()
SparkSession.getActiveSession()

### Simulate sensor data:

Use Spark’s rate source to generate continuous readings from multiple rooms.


In [6]:
from pyspark.sql.functions import expr, rand, when

# Simulate sensor data with room IDs and readings
sensor_data = spark.readStream.format("rate").option("rowsPerSecond", 5).load() \
    .withColumn("room_id", expr("CAST(value % 10 AS STRING)")) \
    .withColumn("temperature", when(expr("value % 10 == 0"), 15)  # Fixed 15 °C for every row of room "0"
                .otherwise(20 + rand() * 25)) \
    .withColumn("humidity", expr("40 + rand() * 30"))
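
To preview what each generated row looks like without starting a stream, the column expressions above can be mirrored in plain Python. This is only a sketch: `simulate_reading` is a hypothetical helper, not part of the lab, and it assumes Python's `random.random()` is an acceptable stand-in for Spark's `rand()` (both draw uniformly from [0, 1)).

```python
import random


def simulate_reading(value, rng=random):
    """Mirror the Spark column expressions for one rate-source row."""
    room_id = str(value % 10)                 # CAST(value % 10 AS STRING)
    if value % 10 == 0:
        temperature = 15.0                    # fixed low reading for room "0"
    else:
        temperature = 20 + rng.random() * 25  # uniform in [20, 45)
    humidity = 40 + rng.random() * 30         # uniform in [40, 70)
    return {"room_id": room_id, "temperature": temperature, "humidity": humidity}


# The rate source emits value = 0, 1, 2, ...; preview the first ten rows.
for value in range(10):
    print(simulate_reading(value))
```

Only room `0` ever reports 15 °C; every other room stays between 20 °C and 45 °C.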

### Create a temporary SQL view:

Create a temporary SQL view to run SQL queries on the streaming data.


In [7]:
# Create a temporary SQL view for the sensor data
sensor_data.createOrReplaceTempView("sensor_table")

### Define SQL queries for aggregation and analysis:

* **Critical temperature query**: Detect rooms with critical temperature levels
* **Average readings query**: Calculate average readings over a 1-minute window
* **Attention needed query**: Identify rooms that need immediate attention based on humidity levels
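
The thresholds used by the queries in the next cell (temperature below 18 or above 60, humidity below 45 or above 75) can be written as plain Python predicates to check which readings they would flag. This is a sketch for reasoning about the conditions; the function names are hypothetical.

```python
def is_critical_temperature(temperature):
    """Same condition as the WHERE clause of the critical temperature query."""
    return temperature < 18 or temperature > 60


def needs_humidity_attention(humidity):
    """Same condition as the WHERE clause of the attention needed query."""
    return humidity < 45 or humidity > 75


print(is_critical_temperature(15.0))   # True
print(is_critical_temperature(32.5))   # False
print(needs_humidity_attention(42.0))  # True
print(needs_humidity_attention(70.0))  # False
```

With the simulated ranges in this lab (temperature of 15 or between 20 and 45, humidity between 40 and 70), only the lower bounds can trigger; the upper bounds would matter with real sensor data.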


In [8]:
# SQL Query to detect rooms with critical temperatures
critical_temperature_query = """
    SELECT 
        room_id, 
        temperature, 
        humidity, 
        timestamp 
    FROM sensor_table 
    WHERE temperature < 18 OR temperature > 60
"""

# SQL Query to calculate average readings over a 1-minute window
average_readings_query = """
    SELECT 
        room_id, 
        AVG(temperature) AS avg_temperature, 
        AVG(humidity) AS avg_humidity, 
        window.start AS window_start 
    FROM sensor_table
    GROUP BY room_id, window(timestamp, '1 minute')
"""

# SQL Query to find rooms that need immediate attention based on humidity
attention_needed_query = """
    SELECT 
        room_id, 
        COUNT(*) AS critical_readings 
    FROM sensor_table 
    WHERE humidity < 45 OR humidity > 75
    GROUP BY room_id
"""


### Execute the SQL queries:

Execute each SQL query to create streaming DataFrames.
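
Before running the queries, it may help to see what the 1-minute window in `average_readings_query` computes. The sketch below groups a few made-up readings by room and by the minute that contains each timestamp, then averages them. It is plain Python rather than Spark, the sample values are illustrative only, and the helper names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def window_start(ts):
    """Start of the 1-minute tumbling window that contains ts."""
    return ts.replace(second=0, microsecond=0)


def average_by_room_and_window(readings):
    """readings: iterable of (room_id, temperature, humidity, timestamp)."""
    sums = defaultdict(lambda: [0.0, 0.0, 0])  # temperature sum, humidity sum, count
    for room_id, temperature, humidity, ts in readings:
        acc = sums[(room_id, window_start(ts))]
        acc[0] += temperature
        acc[1] += humidity
        acc[2] += 1
    return {key: (t / n, h / n) for key, (t, h, n) in sums.items()}


# Illustrative readings: two fall in the 12:00 window, one in the 12:01 window.
readings = [
    ("1", 22.0, 50.0, datetime(2025, 1, 1, 12, 0, 5)),
    ("1", 24.0, 60.0, datetime(2025, 1, 1, 12, 0, 45)),
    ("1", 30.0, 40.0, datetime(2025, 1, 1, 12, 1, 10)),
]
for (room_id, start), (avg_t, avg_h) in average_by_room_and_window(readings).items():
    print(room_id, start, avg_t, avg_h)
```

Each (room, window) pair produces one output row, which is why the streaming version groups by both `room_id` and `window(timestamp, '1 minute')`.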


In [9]:
# Execute the critical temperature query
critical_temperatures_stream = spark.sql(critical_temperature_query)
# Create a temporary SQL view for the filtered critical temperature stream
critical_temperatures_stream.createOrReplaceTempView("critical_temp_view")

# SQL Query to format the critical data into a single String column for the 'text' sink
sql_format_critical_data = """
    SELECT 
        -- Concatenate columns into a single string, separated by a comma (or another delimiter)
        CONCAT_WS(',', 
            room_id, 
            CAST(temperature AS STRING), -- Explicitly cast DOUBLE to STRING
            CAST(humidity AS STRING),    -- humidity is also DOUBLE
            CAST(timestamp AS STRING)    -- Cast TIMESTAMP explicitly as well
        ) AS output_record
    FROM 
        critical_temp_view
"""

# Execute the SQL query to create the valid streaming DataFrame
formatted_critical_stream = spark.sql(sql_format_critical_data)


# Execute the average readings query
# average_readings_stream = spark.sql(average_readings_query)
# average_readings_stream.createOrReplaceTempView("average_readings_view")

# # Execute the attention needed query
# attention_needed_stream = spark.sql(attention_needed_query)
# attention_needed_stream.createOrReplaceTempView("attention_needed_view")
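
The text sink writes one string per row, which is why the columns are joined with `CONCAT_WS` above. A rough Python equivalent shows the shape of each output line. This is a sketch: the `concat_ws` function here is a local stand-in, and the sample values are made up.

```python
def concat_ws(separator, *values):
    """Join values with separator, skipping None the way CONCAT_WS skips NULL."""
    return separator.join(str(v) for v in values if v is not None)


# room_id, temperature, humidity, timestamp (illustrative values)
record = concat_ws(",", "0", 15.0, 52.3, "2025-01-01 12:00:00")
print(record)  # 0,15.0,52.3,2025-01-01 12:00:00
```

Because skipped values shift the remaining fields left, a consumer that splits on the comma should not assume a fixed number of fields if any column can be NULL.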

### Output the results to the console:

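Write the results of each query to a sink as they arrive. The active cell below writes to text files; the console variant is kept in a commented-out cell after it.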


In [10]:
output_path_1 = "tmp/critical_query"
output_path_2 = "tmp/average_query"
output_path_3 = "tmp/attention_query"

In [None]:
# Write the critical temperature records to text files.
# The text sink accepts only a single string column, so write
# formatted_critical_stream. Writing critical_temperatures_stream directly
# fails with UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE because `temperature` is a DOUBLE.
critical_query = formatted_critical_stream.writeStream \
    .outputMode("append") \
    .format("text") \
    .option("path", output_path_1) \
    .option("checkpointLocation", output_path_1 + "/checkpoint") \
    .queryName("Critical Temperatures") \
    .start()

# Note: the file sink supports only append mode, and the text format needs a
# single string column, so the two aggregation queries below would not run
# as written with format("text").

# average_query = average_readings_stream.writeStream \
#     .outputMode("complete") \
#     .format("text") \
#     .option("path", output_path_2) \
#     .option("CheckpointLocation", output_path_2 +  "/checkpoint") \
#     .queryName("Average Readings") \
#     .start()

# attention_query = attention_needed_stream.writeStream \
#     .outputMode("complete") \
#     .format("text") \
#     .option("path", output_path_3) \
#     .option("CheckpointLocation", output_path_3 +  "/checkpoint") \
#     .queryName("Attention Needed") \
#     .start()

25/12/11 12:30:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

In [None]:
# # Output the results to the console for all queries
# critical_query = critical_temperatures_stream.writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .queryName("Critical Temperatures") \
#     .start()

# average_query = average_readings_stream.writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .queryName("Average Readings") \
#     .start()

# attention_query = attention_needed_stream.writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .queryName("Attention Needed") \
#     .start()

### Keep the streams running:

Ensure that the streaming queries continue to run to process incoming data.


In [None]:
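# Keep the stream running.
# awaitTermination() blocks until the query stops or fails, so code placed
# after it does not run while the stream is active.

print("********Critical Temperature Values*******")
critical_query.awaitTermination()

# average_query and attention_query are commented out above. If you enable
# them, wait on all active queries with spark.streams.awaitAnyTermination()
# instead of calling awaitTermination() on each query in turn.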
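
Because `awaitTermination()` blocks the cell, it can be convenient to have a small helper for shutting everything down when you are finished, for example after interrupting the cell. The sketch below relies only on `spark.streams.active`, `query.name`, and `query.stop()`; the helper name `stop_all_queries` is hypothetical and not part of the lab.

```python
def stop_all_queries(session):
    """Stop every active streaming query on the session and return their names."""
    stopped = []
    # Copy the list first, since stopping a query changes which queries are active.
    for query in list(session.streams.active):
        query.stop()
        stopped.append(query.name)
    return stopped


# Usage in the notebook, assuming `spark` is the session created earlier:
# stop_all_queries(spark)
```

Stopping the queries releases the rate source and the file sink but leaves the Spark session itself running.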

### Conclusion
In this lab, you explored the use of Apache Spark in smart building monitoring, focusing on HVAC (heating, ventilation, and air conditioning) systems. You now understand Spark's distributed architecture. You also know how to simulate real-time sensor data for temperature and humidity, execute SQL queries to identify critical environmental conditions, and output aggregated results for immediate insights.


## Author(s)

Lakshmi Holla

## Other Contributors
Malika Singla
