In [None]:
"""
Q4 â€“ Structured Streaming on sensor data.
This is your exact script, modified to run in a notebook.
"""

import sys
from pyspark.sql import SparkSession, functions as F, types as T
import os

# Stop any existing Spark session to start fresh
try:
    spark.stop()
except:
    pass

def create_spark(app_name: str = "Q4_Sensor_Streaming") -> SparkSession:
    # Note: `spark` will be a global variable
    global spark 
    spark = (
        SparkSession.builder
        .appName(app_name)
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    return spark


def main(input_dir: str):
    # create_spark() defines the global `spark` variable
    spark = create_spark()

    schema = T.StructType([
        T.StructField("sensor_id", T.StringType(), nullable=False),
        T.StructField("temperature", T.DoubleType(), nullable=True),
        T.StructField("timestamp", T.StringType(), nullable=True),
    ])

    raw_stream = (
        spark.readStream
        .schema(schema)
        .json(input_dir)
    )

    df = (
        raw_stream
        .withColumn("event_time", F.to_timestamp("timestamp"))
        .filter(F.col("event_time").isNotNull())
    )

    # ---------------- Part A: per-sensor aggregates ----------------
    part_a = (
        df.groupBy("sensor_id")
          .agg(
              F.avg("temperature").alias("avg_temp"),
              F.count("*").alias("reading_count"),
              F.min("temperature").alias("min_temp"),
              F.max("temperature").alias("max_temp"),
          )
    )

    query_a = (
        part_a.writeStream
        .outputMode("update")
        .format("console")
        .option("truncate", "false")
        .trigger(processingTime="10 seconds")
        .start()
    )

    # ---------------- Part B: windowed aggregates ----------------
    df_wm = df.withWatermark("event_time", "2 minutes")

    # 5-minute tumbling window: average across all sensors.
    tumbling_5 = (
        df_wm
        .groupBy(F.window("event_time", "5 minutes"))
        .agg(
            F.avg("temperature").alias("avg_temp"),
            F.count("*").alias("count")
        )
        .select(
            F.col("window.start").alias("window_start"),
            F.col("window.end").alias("window_end"),
            F.col("avg_temp"),
            F.col("count")
        )
    )

    query_b1 = (
        tumbling_5.writeStream
        .outputMode("update")
        .format("console")
        .option("truncate", "false")
        .trigger(processingTime="10 seconds")
        .start()
    )

    # 10-minute sliding window (slide every 5 minutes),
    # per-sensor max temp and count of readings.
    sliding_10_5 = (
        df_wm
        .groupBy(F.window("event_time", "10 minutes", "5 minutes"), "sensor_id")
        .agg(
            F.max("temperature").alias("max_temp"),
            F.count("*").alias("count"),
        )
        .select(
            F.col("window.start").alias("window_start"),
            F.col("window.end").alias("window_end"),
            "sensor_id",
            "max_temp",
            "count",
        )
    )
    
    query_b2 = (
        sliding_10_5.writeStream
        .outputMode("update")
        .format("console")
        .option("truncate", "false")
        .trigger(processingTime="10 seconds")
        .start()
    )
    
    # We DO NOT call awaitAnyTermination() here, so the cell doesn't block.

# --- This is the part that was at the bottom of your script ---
input_path = "/home/jovyan/sensor_input"

# Clean the directory first
os.system(f"rm -f {input_path}/*")

# Call the main function to start the streams
main(input_path)

print("Streaming queries started. Run the next cell to copy the data file.")



In [2]:
%%bash

# 2. Copy the entire data file into the folder
echo "Copying data file..."
# Use the full, absolute path to find the file
cp /home/jovyan/shared/midterm/q4_sensor_data.json /home/jovyan/sensor_input/
echo "Data file copied. Check the output of Cell 1."

Copying data file...


cp: cannot stat 'shared/midterm/q4_sensor_data.json': No such file or directory


Data file copied. Check the output of Cell 1.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------------+-------------+--------+--------+
|sensor_id|avg_temp         |reading_count|min_temp|max_temp|
+---------+-----------------+-------------+--------+--------+
|S010     |73.764           |5            |72.59   |74.46   |
|S002     |68.61            |4            |65.23   |74.56   |
|S006     |71.28363636363635|11           |65.48   |79.31   |
|S004     |75.35666666666667|3            |72.14   |79.46   |
|S007     |71.2325          |8            |65.43   |78.78   |
|S001     |76.52666666666666|3            |75.06   |77.46   |
|S003     |73.82333333333332|6            |59.64   |88.17   |
|S008     |69.96            |4            |65.83   |72.92   |
|S009     |74.14166666666667|6            |65.67   |89.67   |
+---------+-----------------+-------------+--------+--------+


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+-------------------+-----------------+-----+
|window_start       |window_end         |avg_temp         |count|
+-------------------+-------------------+-----------------+-----+
|2024-01-16 00:15:00|2024-01-16 00:20:00|73.82318181818182|22   |
|2024-01-16 00:20:00|2024-01-16 00:25:00|70.4845          |20   |
|2024-01-16 00:10:00|2024-01-16 00:15:00|75.30999999999999|6    |
|2024-01-16 00:25:00|2024-01-16 00:30:00|67.43            |2    |
+-------------------+-------------------+-----------------+-----+



25/11/09 16:33:05 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 17957 milliseconds
25/11/09 16:33:05 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 16367 milliseconds
25/11/09 16:33:06 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 17347 milliseconds
                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------------+-------------+--------+--------+
|sensor_id|avg_temp         |reading_count|min_temp|max_temp|
+---------+-----------------+-------------+--------+--------+
|S002     |69.1425          |8            |65.23   |74.56   |
|S010     |74.591           |10           |65.17   |84.82   |
|S005     |71.27555555555556|9            |66.2    |77.09   |
|S004     |73.39999999999999|7            |68.29   |79.46   |
|S007     |72.128           |15           |65.27   |78.99   |
|S006     |72.03466666666667|15           |65.48   |79.73   |
|S001     |74.86307692307695|13           |66.92   |79.71   |
|S003     |72.89666666666666|9            |59.64   |88.17   |
|S009     |72.1175          |8            |65.05   |89.67   |
|S008     |70.78166666666665|6            |65.24   |79.61   |
+---------+-----------------+-------------+--------+--------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+-----------------+-----+
|window_start       |window_end         |avg_temp         |count|
+-------------------+-------------------+-----------------+-----+
|2024-01-16 00:20:00|2024-01-16 00:25:00|71.46707317073171|41   |
|2024-01-16 00:25:00|2024-01-16 00:30:00|72.17360000000001|25   |
|2024-01-16 00:30:00|2024-01-16 00:35:00|72.87166666666667|6    |
+-------------------+-------------------+-----------------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+-------------------+---------+--------+-----+
|window_start       |window_end         |sensor_id|max_temp|count|
+-------------------+-------------------+---------+--------+-----+
|2024-01-16 00:25:00|2024-01-16 00:35:00|S002     |69.17   |3    |
|2024-01-16 00:20:00|2024-01-16 00:30:00|S007     |78.99   |11   |
|2024-01-16 00:20:00|2024-01-16 00:30:00|S006     |79.73   |10   |
|2024-01-16 00:30:00|2024-01-16 00:40:00|S003     |73.71   |1    |
|2024-01-16 00:25:00|2024-01-16 00:35:00|S005     |77.09   |7    |
|2024-01-16 00:20:00|2024-01-16 00:30:00|S002     |74.56   |6    |
|2024-01-16 00:20:00|2024-01-16 00:30:00|S003     |73.38   |4    |
|2024-01-16 00:20:00|2024-01-16 00:30:00|S004     |79.46   |5    |
|2024-01-16 00:15:00|2024-01-16 00:25:00|S004     |79.46   |5    |
|2024-01-16 00:25:00|2024-01-16 00:35:00|S004     |72.82   |1    |
|2024-01-16 00:25:00|2024-01-16 

                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+------------+----------+--------+-----+
|window_start|window_end|avg_temp|count|
+------------+----------+--------+-----+
+------------+----------+--------+-----+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+------------+----------+---------+--------+-----+
|window_start|window_end|sensor_id|max_temp|count|
+------------+----------+---------+--------+-----+
+------------+----------+---------+--------+-----+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------------+-------------------+---------+--------+-----+
|window_start       |window_end         |sensor_id|max_temp|count|
+-------------------+-------------------+---------+--------+-----+
|2024-01-16 00:30:00|2024-01-16 00:40:00|S003     |78.16   |4    |
|2024-01-16 00:25:00|2024-01-16 00:35:00|S002     |79.1    |7    |
|2024-01-16 00:35:00|2024-01-16 00:45:00|S002     |77.04   |1    |
|2024-01-16 00:25:00|2024-01-16 00:35:00|S001     |79.89   |12   |
|2024-01-16 00:25:00|2024-01-16 00:35:00|S004     |74.2    |6    |
|2024-01-16 00:30:00|2024-01-16 00:40:00|S009     |79.14   |6    |
|2024-01-16 00:25:00|2024-01-16 00:35:00|S005     |78.27   |8    |
|2024-01-16 00:30:00|2024-01-16 00:40:00|S006     |77.76   |1    |
|2024-01-16 00:35:00|2024-01-16 00:45:00|S010     |72.64   |1    |
|2024-01-16 00:30:00|2024-01-16 00:40:00|S010     |84.82   |3    |
|2024-01-16 00:25:00|2024-01-16 

                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+---------+-----------------+-------------+--------+--------+
|sensor_id|avg_temp         |reading_count|min_temp|max_temp|
+---------+-----------------+-------------+--------+--------+
|S010     |73.66199999999999|15           |65.17   |84.82   |
|S002     |70.93846153846154|13           |65.23   |79.1    |
|S004     |72.12249999999999|12           |66.58   |79.46   |
|S007     |73.34166666666667|18           |65.27   |79.74   |
|S006     |72.77000000000001|18           |65.48   |79.73   |
|S005     |71.975           |10           |66.2    |78.27   |
|S008     |71.89333333333333|15           |65.24   |79.68   |
|S001     |74.18380952380953|21           |65.73   |79.89   |
|S003     |72.2575          |12           |56.18   |88.17   |
|S009     |72.263125        |16           |65.05   |89.67   |
+---------+-----------------+-------------+--------+--------+

----------------------------------

[Stage 23:==> (2 + 2) / 4][Stage 25:==> (2 + 0) / 4][Stage 26:>   (0 + 0) / 1]4]

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------------+-------------------+---------+--------+-----+
|window_start       |window_end         |sensor_id|max_temp|count|
+-------------------+-------------------+---------+--------+-----+
|2024-01-16 00:30:00|2024-01-16 00:40:00|S003     |78.16   |8    |
|2024-01-16 00:25:00|2024-01-16 00:35:00|S002     |79.14   |8    |
|2024-01-16 00:35:00|2024-01-16 00:45:00|S002     |77.04   |4    |
|2024-01-16 00:30:00|2024-01-16 00:40:00|S010     |84.82   |7    |
|2024-01-16 00:35:00|2024-01-16 00:45:00|S005     |89.64   |5    |
|2024-01-16 00:30:00|2024-01-16 00:40:00|S005     |79.48   |4    |
|2024-01-16 00:35:00|2024-01-16 00:45:00|S008     |76.84   |4    |
|2024-01-16 00:25:00|2024-01-16 00:35:00|S004     |74.2    |7    |
|2024-01-16 00:30:00|2024-01-16 00:40:00|S009     |79.14   |8    |
|2024-01-16 00:30:00|2024-01-16 00:40:00|S006     |77.76   |4    |
|2024-01-16 00:40:00|2024-01-16 

                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+---------+-----------------+-------------+--------+--------+
|sensor_id|avg_temp         |reading_count|min_temp|max_temp|
+---------+-----------------+-------------+--------+--------+
|S010     |73.59863636363636|22           |65.17   |84.82   |
|S002     |71.3029411764706 |17           |65.23   |79.14   |
|S007     |73.55956521739131|23           |65.27   |79.74   |
|S005     |72.88466666666666|15           |65.18   |89.64   |
|S004     |71.734           |15           |65.19   |79.46   |
|S006     |71.65565217391305|23           |65.26   |79.73   |
|S003     |72.08894736842106|19           |56.18   |88.17   |
|S001     |73.5341379310345 |29           |65.73   |79.89   |
|S008     |71.79263157894736|19           |65.24   |79.68   |
|S009     |72.87166666666667|18           |65.05   |89.67   |
+---------+-----------------+-------------+--------+--------+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------------+-------------------+-----------------+-----+
|window_start       |window_end         |avg_temp         |count|
+-------------------+-------------------+-----------------+-----+
|2024-01-16 00:35:00|2024-01-16 00:40:00|72.7427027027027 |37   |
|2024-01-16 00:30:00|2024-01-16 00:35:00|72.5463888888889 |36   |
|2024-01-16 00:40:00|2024-01-16 00:45:00|71.82571428571428|14   |
+-------------------+-------------------+-----------------+-----+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+------------+----------+---------+--------+-----+
|window_start|window_end|sensor_id|max_temp|count|
+------------+----------+---------+--------+-----+
+------------+----------+---------+--------+-----+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+------------+----------+--------+-----+
|window_start|window_end|avg_temp|count|
+------------+----------+--------+-----+
+------------+----------+--------+-----+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+---------+-----------------+-------------+--------+--------+
|sensor_id|avg_temp         |reading_count|min_temp|max_temp|
+---------+-----------------+-------------+--------+--------+
|S002     |72.37363636363638|22           |65.23   |82.95   |
|S010     |73.43275862068964|29           |65.17   |84.82   |
|S006     |71.50606060606061|33           |61.25   |79.73   |
|S004     |72.37333333333333|18           |65.19   |79.46   |
|S007     |73.30875         |24           |65.27   |79.74   |
|S005     |72.90631578947368|19           |62.66   |89.64   |
|S001     |73.3171875       |32           |65.73   |79.89   |
|S003     |71.1096          |25           |56.18   |88.17   |
|S008     |71.73583333333333|24           |65.24   |79.68   |
|S009     |72.17083333333333|24           |65.05   |89.67   |
+---------+-----------------+-------------+--------+--------+



                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+-------------------+-------------------+---------+--------+-----+
|window_start       |window_end         |sensor_id|max_temp|count|
+-------------------+-------------------+---------+--------+-----+
|2024-01-16 00:40:00|2024-01-16 00:50:00|S004     |78.34   |2    |
|2024-01-16 00:35:00|2024-01-16 00:45:00|S002     |82.95   |8    |
|2024-01-16 00:45:00|2024-01-16 00:55:00|S006     |79.04   |4    |
|2024-01-16 00:40:00|2024-01-16 00:50:00|S009     |75.74   |5    |
|2024-01-16 00:45:00|2024-01-16 00:55:00|S008     |67.86   |1    |
|2024-01-16 00:45:00|2024-01-16 00:55:00|S004     |78.34   |2    |
|2024-01-16 00:40:00|2024-01-16 00:50:00|S003     |75.45   |9    |
|2024-01-16 00:35:00|2024-01-16 00:45:00|S008     |76.84   |8    |
|2024-01-16 00:40:00|2024-01-16 00:50:00|S010     |79.27   |10   |
|2024-01-16 00:35:00|2024-01-16 00:45:00|S010     |79.58   |11   |
|2024-01-16 00:35:00|2024-01-16 

                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+---------+-----------------+-------------+--------+--------+
|sensor_id|avg_temp         |reading_count|min_temp|max_temp|
+---------+-----------------+-------------+--------+--------+
|S002     |72.17793103448277|29           |65.23   |82.95   |
|S010     |73.52199999999999|30           |65.17   |84.82   |
|S005     |72.06071428571428|28           |62.66   |89.64   |
|S007     |73.1907142857143 |28           |65.02   |79.74   |
|S006     |70.90769230769232|39           |55.18   |79.73   |
|S004     |72.78153846153846|26           |65.19   |79.46   |
|S001     |73.34297297297297|37           |65.73   |79.89   |
|S009     |71.67892857142859|28           |65.05   |89.67   |
|S008     |71.4275          |28           |65.24   |79.68   |
|S003     |71.32777777777777|27           |56.18   |88.17   |
+---------+-----------------+-------------+--------+--------+



                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+-------------------+-------------------+-----------------+-----+
|window_start       |window_end         |avg_temp         |count|
+-------------------+-------------------+-----------------+-----+
|2024-01-16 00:45:00|2024-01-16 00:50:00|71.02866666666665|45   |
|2024-01-16 00:55:00|2024-01-16 01:00:00|69.71            |1    |
|2024-01-16 00:50:00|2024-01-16 00:55:00|72.46769230769232|26   |
+-------------------+-------------------+-----------------+-----+



                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+-------------------+-------------------+---------+--------+-----+
|window_start       |window_end         |sensor_id|max_temp|count|
+-------------------+-------------------+---------+--------+-----+
|2024-01-16 00:40:00|2024-01-16 00:50:00|S009     |75.74   |8    |
|2024-01-16 00:45:00|2024-01-16 00:55:00|S008     |73.81   |5    |
|2024-01-16 00:45:00|2024-01-16 00:55:00|S006     |79.7    |10   |
|2024-01-16 00:45:00|2024-01-16 00:55:00|S004     |79.4    |10   |
|2024-01-16 00:40:00|2024-01-16 00:50:00|S004     |79.27   |5    |
|2024-01-16 00:50:00|2024-01-16 01:00:00|S003     |69.99   |1    |
|2024-01-16 00:45:00|2024-01-16 00:55:00|S005     |78.15   |12   |
|2024-01-16 00:45:00|2024-01-16 00:55:00|S009     |71.49   |5    |
|2024-01-16 00:50:00|2024-01-16 01:00:00|S006     |79.7    |4    |
|2024-01-16 00:50:00|2024-01-16 01:00:00|S007     |77.41   |2    |
|2024-01-16 00:40:00|2024-01-16 

                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+------------+----------+--------+-----+
|window_start|window_end|avg_temp|count|
+------------+----------+--------+-----+
+------------+----------+--------+-----+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------+----------+---------+--------+-----+
|window_start|window_end|sensor_id|max_temp|count|
+------------+----------+---------+--------+-----+
+------------+----------+---------+--------+-----+



In [3]:
# 3. Run this cell when you are finished to stop all queries

print("Stopping all active streaming queries...")
for q in spark.streams.active:
    try:
        q.stop()
        print(f"Stopped query: {q.id}")
    except Exception as e:
        print(f"Could not stop query: {e}")

print("All queries stopped.")

Stopping all active streaming queries...
Stopped query: ef6ff464-66b1-4b2a-b2b5-e5a80651b68d
Stopped query: 63a3565a-c9ae-42f5-be79-9911d52b188d
Stopped query: 57e14cd6-76ad-4729-9ac4-6ed92eb61056
All queries stopped.


25/11/09 16:34:14 WARN DAGScheduler: Failed to cancel job group 15400968-aba6-44d3-924d-9cf20d91193a. Cannot find active jobs for it.
25/11/09 16:34:14 WARN DAGScheduler: Failed to cancel job group 15400968-aba6-44d3-924d-9cf20d91193a. Cannot find active jobs for it.
25/11/09 16:34:14 WARN DAGScheduler: Failed to cancel job group d0293b77-0a32-4086-ac24-f6daaac9c4a4. Cannot find active jobs for it.
25/11/09 16:34:14 WARN DAGScheduler: Failed to cancel job group d0293b77-0a32-4086-ac24-f6daaac9c4a4. Cannot find active jobs for it.
25/11/09 16:34:14 WARN DAGScheduler: Failed to cancel job group 8a2ef6b3-37b8-4190-8e2c-bc9367e2f79d. Cannot find active jobs for it.
25/11/09 16:34:14 WARN DAGScheduler: Failed to cancel job group 8a2ef6b3-37b8-4190-8e2c-bc9367e2f79d. Cannot find active jobs for it.
