<p style="text-align:center">
    <a href="https://skills.network" target="_blank">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo"  />
    </a>
</p>


## Leveraging Apache Spark for Smart Building HVAC Monitoring

**Estimated time needed: 30 minutes**

### Objectives

After completing this lab, you will be able to:

- Explain the distributed architecture of Spark in the context of smart building monitoring
- Simulate real-time sensor data for HVAC systems in a building
- Perform SQL queries to detect critical environmental conditions and calculate average readings
- Determine the aggregated results to the console for immediate insights into room conditions


## Background
Smart Building Solutions, Inc. specializes in optimizing HVAC (heating, ventilation, and air conditioning) systems to enhance comfort and energy efficiency in commercial buildings. By monitoring temperature and humidity levels in real-time across various rooms, the company aims to ensure optimal indoor conditions and preemptively address potential HVAC issues.

With a continuous influx of sensor data, Smart Building Solutions needs to process and analyze this data in real-time to maintain the quality of the indoor environment.

## Data set description
The simulated data set comprises:

`room_id`: Unique identifier for each room (e.g., R001, R002).

`temperature`: Current temperature reading from the sensor (in °C).

`humidity`: Current humidity level reading from the sensor (in %).

`timestamp`: Time when the reading was recorded (automatically generated by Spark).
The data is generated at a rate of 5 rows per second, simulating multiple rooms with various environmental conditions.


## Challenges
Monitoring indoor environmental conditions poses several challenges:

**High data velocity**: Continuous data from multiple sensors can overwhelm traditional systems.

**Need for immediate alerts**: Delays in identifying critical conditions can lead to discomfort or system inefficiencies.

**Need for data aggregation and analysis**: Efficiently aggregating and analyzing real-time data for proactive maintenance and optimization is essential.

## Apache Spark with structured streaming
To address these challenges, Apache Spark is employed for its powerful distributed computing capabilities, enabling real-time data processing and analytics.


In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [2]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

#import functions/Classes for sparkml

from pyspark.ml.clustering import KMeans


from pyspark.sql import SparkSession


### Set up the Spark session:


In [3]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Smart Building HVAC Monitoring") \
    .getOrCreate()


25/04/27 05:52:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/27 05:52:54 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Simulate sensor data:

Use Spark’s rate source to generate continuous readings from multiple rooms.


In [4]:
from pyspark.sql.functions import expr, rand,when

# Simulate sensor data with room IDs and readings
sensor_data = spark.readStream.format("rate").option("rowsPerSecond", 5).load() \
    .withColumn("room_id", expr("CAST(value % 10 AS STRING)")) \
    .withColumn("temperature", when(expr("value % 10 == 0"), 15)  # Set temperature to 15 for one specific record
                .otherwise(20 + rand() * 25)) \
    .withColumn("humidity", expr("40 + rand() * 30"))

### Create a temporary SQL view:

Create temporary SQL view to perform SQL queries on the streaming data.


In [9]:
# Create a temporary SQL view for the sensor data
sensor_data.createOrReplaceTempView("sensor_table")

In [20]:
spark.sql("SELECT * FROM sensor_table").writeStream.format("update").outputMode("append").start().awaitTermination()


-------------------------------------------
Batch: 410
-------------------------------------------
+--------------------+-----+-------+------------------+------------------+
|           timestamp|value|room_id|       temperature|          humidity|
+--------------------+-----+-------+------------------+------------------+
|2025-04-27 05:45:...| 2045|      5|24.667459865756122| 66.58159287154736|
|2025-04-27 05:45:...| 2046|      6|25.707231656859477| 58.01330919060051|
|2025-04-27 05:45:...| 2047|      7|21.657909299116987|46.430818108146696|
|2025-04-27 05:45:...| 2048|      8| 24.31009170724244| 41.93848567790332|
|2025-04-27 05:45:...| 2049|      9|32.246884127426235|48.093689968405755|
+--------------------+-----+-------+------------------+------------------+

-------------------------------------------
Batch: 2542
-------------------------------------------
+--------------------+-----+-------+------------------+------------------+
|           timestamp|value|room_id|       tempera

Py4JJavaError: An error occurred while calling o459.start.
: java.lang.ClassNotFoundException: Failed to find data source: update. Please find packages at http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:307)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: update.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
	at scala.util.Try.orElse(Try.scala:84)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
	... 12 more


-------------------------------------------
Batch: 2581
-------------------------------------------
+--------------------+-----+-------+------------------+------------------+
|           timestamp|value|room_id|       temperature|          humidity|
+--------------------+-----+-------+------------------+------------------+
|2025-04-27 05:45:...|12960|      0|              15.0|  49.0849624965287|
|2025-04-27 05:45:...|12961|      1| 26.96058860769233|53.442704677178554|
|2025-04-27 05:45:...|12962|      2|21.533935790438992| 59.83152776966648|
|2025-04-27 05:45:...|12963|      3|38.171550317743986|  41.2330023848423|
|2025-04-27 05:45:...|12964|      4|23.321094209636243|42.232915055711736|
+--------------------+-----+-------+------------------+------------------+

-------------------------------------------
Batch: 411
-------------------------------------------
+--------------------+-----+-------+------------------+------------------+
|           timestamp|value|room_id|       tempera

### Define SQL queries for aggregation and analysis:

* **Critical temperature query**: Detect rooms with critical temperature levels
* **Average readings query**: Calculate average readings over a 1-minute window
* **Attention needed query**: Identify rooms that need immediate attention based on humidity levels


In [22]:
# SQL Query to detect rooms with critical temperatures
critical_temperature_query = """
    SELECT 
        room_id, 
        temperature, 
        humidity, 
        timestamp 
    FROM sensor_table 
    WHERE temperature < 18 OR temperature > 60
"""

# SQL Query to calculate average readings over a 1-minute window
average_readings_query = """
    SELECT 
        room_id, 
        AVG(temperature) AS avg_temperature, 
        AVG(humidity) AS avg_humidity, 
        window.start AS window_start 
    FROM sensor_table
    GROUP BY room_id, window(timestamp, '1 minute')
"""

# SQL Query to find rooms that need immediate attention based on humidity
attention_needed_query = """
    SELECT 
        room_id, 
        COUNT(*) AS critical_readings 
    FROM sensor_table 
    WHERE humidity < 45 OR humidity > 75
    GROUP BY room_id
"""


-------------------------------------------
Batch: 2743
-------------------------------------------
+--------------------+-----+-------+-----------------+-----------------+
|           timestamp|value|room_id|      temperature|         humidity|
+--------------------+-----+-------+-----------------+-----------------+
|2025-04-27 05:47:...|13780|      0|             15.0|50.28417754981318|
|2025-04-27 05:47:...|13781|      1|33.32359881828869|55.36488195675908|
|2025-04-27 05:47:...|13782|      2|33.86518099735714|51.70887922999009|
|2025-04-27 05:47:...|13783|      3|21.17241910071797|41.20304856008298|
|2025-04-27 05:47:...|13784|      4|41.46064207831772|46.54506539864521|
+--------------------+-----+-------+-----------------+-----------------+

-------------------------------------------
Batch: 2704
-------------------------------------------
+--------------------+-----+-------+------------------+------------------+
|           timestamp|value|room_id|       temperature|          hu

### Execute the SQL queries:

Execute each SQL query to create streaming DataFrames.


In [7]:
# Execute the critical temperature query
critical_temperatures_stream = spark.sql(critical_temperature_query)


# Execute the average readings query
average_readings_stream = spark.sql(average_readings_query)

# Execute the attention needed query
attention_needed_stream = spark.sql(attention_needed_query)






### Output the results to the console:

Display the results from each query in real-time.


In [23]:
# Output the results to the console for all queries
critical_query = critical_temperatures_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .queryName("Critical Temperatures") \
    .start()

average_query = average_readings_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Average Readings") \
    .start()

attention_query = attention_needed_stream.writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("Attention Needed") \
    .start()



-------------------------------------------
Batch: 2801
-------------------------------------------


NameError: name 'critical_temperatures_stream' is not defined

+--------------------+-----+-------+------------------+------------------+
|           timestamp|value|room_id|       temperature|          humidity|
+--------------------+-----+-------+------------------+------------------+
|2025-04-27 05:49:...|14065|      5| 40.25737891784146| 51.65630901307872|
|2025-04-27 05:49:...|14066|      6|  33.7655797392339|42.706725530131784|
|2025-04-27 05:49:...|14067|      7| 24.44941960635122| 44.89584283756724|
|2025-04-27 05:49:...|14068|      8| 42.35643562165517| 42.82010746983632|
|2025-04-27 05:49:...|14069|      9|27.498204767499484| 56.36166305299828|
+--------------------+-----+-------+------------------+------------------+

-------------------------------------------
Batch: 676
-------------------------------------------
+--------------------+-----+-------+------------------+------------------+
|           timestamp|value|room_id|       temperature|          humidity|
+--------------------+-----+-------+------------------+------------------+


### Keep the streams running:

Ensure that the streaming queries continue to run to process incoming data.


In [None]:
# Keep the streams running

print("********Critical Temperature Values*******")
critical_query.awaitTermination()

print("********Average Readings Values********")
average_query.awaitTermination()
print("********Attention Needed Values********")
attention_query.awaitTermination()


### Conclusion
In this lab, you explored the use of Apache Spark in smart building monitoring, particularly focusing on HVAC (heating, ventilation, and air conditioning) systems. You now understand the Spark's distributed architecture. You also understand how to simulate real-time sensor data for temperature and humidity, execute SQL queries to identify critical environmental conditions, and output aggregated results for immediate insights.


## Author(s)

Lakshmi Holla

## Other Contributors
Malika Singla
