# Part 2: Streaming application using Spark Structured Streaming  
In this task, you will implement Spark Structured Streaming to consume the data from task 1 and perform a prediction.    
Important:   
-	This task uses PySpark Structured Streaming with PySpark Dataframe APIs and PySpark ML.  
-	You also need your pipeline model from A2A to make predictions and persist the results.  

1.	Write code to create a SparkSession that 1) uses four cores and has a proper application name; 2) uses the Melbourne timezone; and 3) has a checkpoint location set.


In [1]:
import os
# Pull in the Kafka connectors for Structured Streaming when the JVM starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

# Import SparkConf class into program
from pyspark import SparkConf

# local[*]: run Spark in local mode with as many working processors as logical cores on your machine
# If we want Spark to run locally with 'k' worker threads, we can specify as "local[k]".
master = "local[4]"
# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "Assignment2B"
# Setup configuration parameters for Spark, including the default checkpoint
# root used by streaming queries that do not set their own location.
spark_conf = SparkConf().setMaster(master).setAppName(app_name) \
                        .set("spark.sql.streaming.checkpointLocation", "checkpoints")

# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# Method 1: Using SparkSession
# Use the region-based zone id rather than a fixed "GMT+10" offset: Melbourne
# observes daylight saving (UTC+11 in summer), which a fixed offset ignores.
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .config("spark.sql.session.timeZone", "Australia/Melbourne")
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel('ERROR')

from pyspark.sql import functions as F

2.	Write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. building information) into data frames. (You can reuse your code from 2A.)


In [None]:
# Adapted from GPT
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, StringType, DecimalType, TimestampType
)

# 1. Meters Table
meters_schema = StructType([
    StructField("building_id", IntegerType(), False),
    StructField("meter_type", StringType(), False),   # Char(1) -> StringType
    StructField("ts", TimestampType(), False),
    StructField("value", DecimalType(15, 4), False),
    StructField("row_id", IntegerType(), False)
])

# 2. Buildings Table
buildings_schema = StructType([
    StructField("site_id", IntegerType(), False),
    StructField("building_id", IntegerType(), False),
    StructField("primary_use", StringType(), True),
    StructField("square_feet", IntegerType(), True),
    StructField("floor_count", IntegerType(), True),
    StructField("row_id", IntegerType(), False),
    StructField("year_built", IntegerType(), True),
    StructField("latent_y", DecimalType(6, 4), True),
    StructField("latent_s", DecimalType(6, 4), True),
    StructField("latent_r", DecimalType(6, 4), True)
])

# 3. Weather Table (shared by the static CSV and the Kafka JSON payload)
weather_schema = StructType([
    StructField("site_id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("air_temperature", DecimalType(5, 3), True),
    StructField("cloud_coverage", DecimalType(5, 3), True), # Is an Integer, but ends with a ".0", so read as a DecimalType
    StructField("dew_temperature", DecimalType(5, 3), True),
    StructField("sea_level_pressure", DecimalType(8, 3), True),
    # Is an Integer, but ends with a ".0", so read as a DecimalType.
    # Precision 6 leaves three integer digits: DecimalType(5, 3) tops out at
    # 99.999, so any three-digit bearing would overflow the declared type.
    StructField("wind_direction", DecimalType(6, 3), True),
    StructField("wind_speed", DecimalType(5, 3), True),
    StructField("weather_ts", TimestampType(), False) # new field
])


# Static reference data: building metadata and historical weather readings.
buildings_df = spark.read.csv(
    "data/new_building_information.csv",
    header=True,
    schema=buildings_schema
)

weather_df = spark.read.csv(
    "data/weather.csv",
    header=True,
    schema=weather_schema
)


3.	Using the Kafka topic from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data arrives in String format except for the 'weather_ts' column, which you will receive as an Int type. Load the new building information CSV file into a data frame. Then transform the data frames into the proper formats following the metadata file schema, as in assignment 2A.


In [None]:


# ArrayType lives in pyspark.sql.types; it is not part of the documented
# pyspark.sql.functions API, so import it explicitly instead of using F.ArrayType.
from pyspark.sql.types import ArrayType

#configuration
hostip = "192.168.0.6"
topic = 'weather_data'

# Subscribe to the producer's topic as an unbounded streaming DataFrame.
df_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("subscribe", topic) \
    .load()

# Kafka delivers the payload as bytes; expose it as a JSON string.
df_str = df_raw.selectExpr("CAST(value AS STRING) as json_str")

# Each message is parsed as a JSON array of weather records; explode the array
# so that every record becomes one row with the weather_schema columns.
weather_stream = (
    df_str
    .withColumn("data", F.from_json(F.col("json_str"), ArrayType(weather_schema)))
    .select(F.explode(F.col("data")).alias("r"))
    .select("r.*")
)


4.	Use a watermark on weather_ts: if data points are received more than 5 seconds late, discard them.

In [4]:
# Declare weather_ts as the event-time column with a 5-second lateness threshold.
weather_stream = weather_stream.withWatermark(
    eventTime="weather_ts",
    delayThreshold="5 seconds",
)

5.	Perform the necessary transformation you used in A2A. (note: every student may have used different features, feel free to reuse the code you have written in A2A. If you built an end-to-end pipeline, you can ignore this task.) 

In [None]:

# Reused from A2A (originally GPT-assisted).
# weather_df is the historical (static) data used to derive imputation means;
# weather_stream is the live feed that gets imputed and feature-engineered.

# Columns whose nulls are filled from the historical means
impute_cols = [
    "air_temperature",
    "cloud_coverage",
    "dew_temperature",
    "sea_level_pressure",
    "wind_direction",
    "wind_speed",
]


def _with_calendar_parts(frame):
    """Return `frame` plus `date`, a 6-hour `time` bucket and `month`, all derived from `timestamp`."""
    hour_of_day = F.hour("timestamp")
    six_hour_bucket = (
        F.when(hour_of_day <= 5, "0-6h")
        .when(hour_of_day <= 11, "6-12h")
        .when(hour_of_day <= 17, "12-18h")
        .when(hour_of_day <= 23, "18-24h")
    )
    return (
        frame
        .withColumn("date", F.to_date("timestamp"))
        .withColumn("time", six_hour_bucket)
        .withColumn("month", F.month("timestamp"))
    )


def _fill_from_means(stream, means, keys, suffix):
    """Left-join `means` on `keys`, fill nulls in impute_cols from the `<col><suffix>` columns, then drop them."""
    filled = stream.join(means, on=keys, how="left")
    for name in impute_cols:
        mean_col = f"{name}{suffix}"
        filled = filled.withColumn(name, F.coalesce(name, F.col(mean_col))).drop(mean_col)
    return filled


# Historical side: calendar columns, then the three levels of means
weather_df = _with_calendar_parts(weather_df)

global_row = weather_df.select(*[F.mean(c).alias(c) for c in impute_cols]).first()
global_means = global_row.asDict()

site_month_means = weather_df.groupBy("site_id", "month").agg(
    *[F.mean(c).alias(f"{c}_site_month_mean") for c in impute_cols]
)

site_means = weather_df.groupBy("site_id").agg(
    *[F.mean(c).alias(f"{c}_site_mean") for c in impute_cols]
)

# Streaming side: same calendar columns, then imputation from most to least specific
weather_stream = _with_calendar_parts(weather_stream)

# Step 1: mean for the same site and month
weather_stream = _fill_from_means(
    weather_stream, site_month_means, ["site_id", "month"], "_site_month_mean"
)

# Step 2: mean for the same site
weather_stream = _fill_from_means(weather_stream, site_means, "site_id", "_site_mean")

# Step 3: global mean as the last resort
for name in impute_cols:
    weather_stream = weather_stream.withColumn(
        name, F.coalesce(name, F.lit(global_means[name]))
    )

# Derived features; the two raw temperature columns are dropped once they have been used
dew_depression = F.col("air_temperature") - F.col("dew_temperature")
nonideal_temp = (F.col("air_temperature") - 18)**2
weather_stream = (
    weather_stream
    .withColumn("dew_depression", dew_depression)
    .withColumn("nonideal_temp", nonideal_temp)
    .drop("air_temperature", "dew_temperature")
)

# Median temperature and peak/off-peak are skipped: the pipeline model used later does not need them
feature_df = buildings_df.join(weather_stream, ["site_id"])


6.	Load your pipeline model and perform the following aggregations:  
a)	Print the prediction from your model as a stream comes in.  
b)	Every 7 seconds, print the total energy consumption for each 6-hour interval, aggregated by building, and print 20 records. (Note: This is simulating energy data each day in a week)  
c)	Every 14 seconds, for each site, print the daily total energy consumption.  

In [7]:

# --- 1. Model setup ---
from pyspark.ml import PipelineModel
import time

# Load the persisted pipeline model from disk.
model = PipelineModel.load("models/best_model_rmsle")

# --- 2. Score the streaming features ---
# The model's "prediction" column is exposed as "log_power_usage" downstream.
predictions = model.transform(feature_df)
predictions = predictions.withColumnRenamed("prediction", "log_power_usage")

# Absolute checkpoint root shared by the streaming queries started below.
checkpoint_dir = os.path.abspath("checkpoints/weather_stream")
os.makedirs(checkpoint_dir, exist_ok=True)

In [12]:
# 6a, 7a
def write_and_debug_live(batch_df, batch_id):
    """Append one micro-batch of predictions to Parquet and print a sample.

    Args:
        batch_df: DataFrame for the current micro-batch, as passed by foreachBatch.
        batch_id: Identifier of the micro-batch, used only in the printed header.
    """
    # The batch feeds two actions (write + show). Cache the projected rows so
    # the upstream plan is not evaluated once per action.
    rows = batch_df.select("site_id", "building_id", "log_power_usage").persist()
    try:
        # 1️⃣ Write batch incrementally to Parquet
        rows.write.mode("append").parquet("data/live_predictions")

        # 2️⃣ Print a few rows for debugging
        print(f"\n=== Microbatch {batch_id} ===")
        rows.show(5, truncate=False)
    finally:
        # Always release the cached batch, even if the write or show fails.
        rows.unpersist()

query_live_combined = (
    predictions
        .writeStream
        .foreachBatch(write_and_debug_live)
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir + "/live_predictions")
        .start()
)


In [13]:
# (Task 6b, 7b requires aggregation by 6-hour interval)
# NOTE(review): this sums a column named log_power_usage; if the model predicts
# on a log scale, the sum is not a total on the original scale — confirm.
building_6h = (
    predictions
        .groupBy(
            "building_id",
            "time",  # The 6-hour interval column you created in Task 5
            # 5-second event-time window on weather_ts; the 6-hour bucketing
            # itself comes from the "time" column above, not from this window.
            F.window("weather_ts", "5 seconds")
        )
        .agg(F.sum("log_power_usage").alias("total_power_6h"))
)

def write_and_debug_building(batch_df, batch_id):
    """Append one micro-batch of per-building totals to Parquet and print 20 rows.

    Args:
        batch_df: DataFrame for the current micro-batch, as passed by foreachBatch.
        batch_id: Identifier of the micro-batch, used only in the printed header.
    """
    # The batch feeds two actions (write + show). Cache it so the aggregation
    # is not evaluated once per action.
    batch_df.persist()
    try:
        # 1️⃣ Write batch incrementally to Parquet
        batch_df.write.mode("append").parquet("data/building_6h")

        # 2️⃣ Print the 20 records the task asks for
        print(f"\n=== Microbatch {batch_id} ===")
        batch_df.show(20, truncate=False)
    finally:
        # Always release the cached batch, even if the write or show fails.
        batch_df.unpersist()

query_building_6h = (
    building_6h
        .writeStream
        .foreachBatch(write_and_debug_building)
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir + "/building_6h")
        .trigger(processingTime="7 seconds")
        .start()
)


In [14]:
# (Task 6c, 7c requires daily aggregation by site)
# NOTE(review): this sums a column named log_power_usage; if the model predicts
# on a log scale, the sum is not a total on the original scale — confirm.
site_daily = (
    predictions
        .groupBy(
            "site_id",
            "date", # The date column you created in Task 5
            # 5-second event-time window on weather_ts; the per-day grouping
            # itself comes from the "date" column above, not from this window.
            F.window("weather_ts", "5 seconds")
        )
        .agg(F.sum("log_power_usage").alias("total_power_day"))
)


def write_and_debug_site(batch_df, batch_id):
    """Append one micro-batch of per-site daily totals to Parquet and print a sample.

    Args:
        batch_df: DataFrame for the current micro-batch, as passed by foreachBatch.
        batch_id: Identifier of the micro-batch, used only in the printed header.
    """
    # The batch feeds two actions (write + show). Cache it so the aggregation
    # is not evaluated once per action.
    batch_df.persist()
    try:
        # 1️⃣ Write batch incrementally to Parquet
        batch_df.write.mode("append").parquet("data/site_daily")

        # 2️⃣ Print a few rows for debugging
        print(f"\n=== Microbatch {batch_id} ===")
        batch_df.show(5, truncate=False)
    finally:
        # Always release the cached batch, even if the write or show fails.
        batch_df.unpersist()

query_site_daily = (
    site_daily
        .writeStream
        .foreachBatch(write_and_debug_site)
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir + "/site_daily")
        .trigger(processingTime="14 seconds")
        .start()
)



In [15]:
# #### 6a
# # Show live predictions
# query_live = (
#     predictions
#         .select("site_id", "building_id", "log_power_usage")
#         .writeStream
#         .outputMode("append")
#         .format("memory")
#         .queryName("live_predictions")
#         .start()
# )

# print("Waiting for first batch...")
# # Wait for the first microbatch to finish
# while query_live.lastProgress is None:
#     time.sleep(1)
# time.sleep(5)
# spark.sql("select * from live_predictions").show()


In [16]:
# #### 6b
# # (Task 6b requires aggregation by 6-hour interval)
# building_6h = (
#     predictions
#         .groupBy(
#             "building_id",
#             "time",  # The 6-hour interval column you created in Task 5
#             F.window("weather_ts", "5 seconds") # Group by 6-hour windows of event-time
#         )
#         .agg(F.sum("log_power_usage").alias("total_power_6h"))
# )

# # --- Print to console every 7 seconds ---
# query_building_6h = (
#     building_6h
#         .writeStream
#         .outputMode("update") # 'update' mode is correct for windowed aggregations
#         .format("memory")
#         .queryName("building_6h")
#         .trigger(processingTime="7 seconds")
#         .start()
# )

# print("Waiting for first batch...")
# # Wait for the first microbatch to finish
# while query_building_6h.lastProgress is None:
#     time.sleep(1)
# time.sleep(5)    
# spark.sql("select * from building_6h").show()

In [18]:
# #### 6c
# # (Task 6c requires daily aggregation by site)
# site_daily = (
#     predictions
#         .groupBy(
#             "site_id", 
#             "date", # The date column you created in Task 5
#             F.window("weather_ts", "5 seconds") # Group by daily windows of event-time
#         )
#         .agg(F.sum("log_power_usage").alias("total_power_day"))
# )

# # --- Print to console every 14 seconds ---
# query_site_daily = (
#     site_daily
#         .writeStream
#         .outputMode("update") # 'update' mode is correct
#         .format("memory")
#         .queryName("site_daily")
#         .trigger(processingTime="14 seconds")
#         .start()
# )
# print("Waiting for first batch...")
# # Wait for the first microbatch to finish
# while query_site_daily.lastProgress is None:
#     time.sleep(1)
# time.sleep(5)
# spark.sql("select * from site_daily").show()

7.	Save the data from 6 to Parquet files as streams. (Hint: Parquet files support streaming writing/reading. The file keeps updating while new batches arrive.)

In [22]:
# # 7a(save 6a)

# # Save predictions to Parquet incrementally
# query_live_parquet = (
#     predictions
#         .select("site_id", "building_id", "time", "log_power_usage")
#         .writeStream
#         .outputMode("append")
#         .format("parquet")
#         .option("path", "data/live_predictions")
#         .option("checkpointLocation", checkpoint_dir + "/live_predictions")
#         .start()
# )
# print("Waiting for Parquet microbatch...")
# while query_live_parquet.lastProgress is None:
#     time.sleep(1)
# time.sleep(5)

In [23]:
# # 7b(save 6b)

# query_building_6h_parquet = (
#     building_6h
#         .writeStream
#         .outputMode("append")
#         .format("parquet")
#         .option("path", "data/building_6h")
#         .option("checkpointLocation", checkpoint_dir + "/building_6h")
#         .start()
# )
# print("Waiting for Parquet microbatch...")
# while query_building_6h_parquet.lastProgress is None:
#     time.sleep(1)
# time.sleep(5)

In [24]:
# # 7c(save 6c)


# query_site_daily_parquet = (
#     site_daily
#         .writeStream
#         .outputMode("append")
#         .format("parquet")
#         .option("path", "data/site_daily")
#         .option("checkpointLocation", checkpoint_dir + "/site_daily")
#         .trigger(processingTime="14 seconds")
#         .start()
# )
# print("Waiting for Parquet microbatch...")
# while query_site_daily_parquet.lastProgress is None:
#     time.sleep(1)
# time.sleep(5)

In [None]:
def wait_for_first_batches(queries, label="", timeout=None, grace_seconds=3):
    """Block until every query has reported progress for its first micro-batch.

    Args:
        queries: Iterable of streaming queries exposing `name`, `lastProgress`,
            `isActive` and `exception()`.
        label: Free-text tag included in the printed status messages.
        timeout: Optional overall limit in seconds across all queries;
            None (the default) waits indefinitely.
        grace_seconds: Pause after the last query is ready, giving the
            filesystem time to flush Parquet files.

    Raises:
        RuntimeError: A query stopped before reporting any progress.
        TimeoutError: `timeout` elapsed while a query still had no progress.
    """
    print(f"⏳ Waiting for first microbatches {label}...")
    deadline = None if timeout is None else time.monotonic() + timeout
    for q in queries:
        print(f"  ↳ Waiting for {q.name or 'unnamed query'}...")
        while q.lastProgress is None:
            # A query that has terminated will never report progress; polling
            # it would spin forever, so surface the failure instead.
            if not q.isActive:
                raise RuntimeError(
                    f"{q.name or 'unnamed query'} stopped before its first "
                    f"batch: {q.exception()}"
                )
            if deadline is not None and time.monotonic() > deadline:
                raise TimeoutError(
                    f"{q.name or 'unnamed query'} reported no progress "
                    f"within {timeout} seconds"
                )
            time.sleep(1)
        print(f"  ✅ {q.name or 'query'} has processed its first batch.")
    # small grace period for filesystem to flush parquet files
    time.sleep(grace_seconds)
    print(f"✅ All prerequisite batches for {label} completed.\n")
    
# Hold here until all three Task 6/7 queries have completed a first micro-batch.
upstream_queries = [query_live_combined, query_building_6h, query_site_daily]
wait_for_first_batches(upstream_queries, label="pre-Kafka publication")


8.	Read the parquet files from task 7 as data streams and send them to Kafka topics with appropriate names.
(Note: You shall read the parquet files as a streaming data frame and send messages to the Kafka topic when new data appears in the parquet file.)

In [None]:
import json
kafka_ip = hostip + ":9092"
# Stream 1
# The Parquet files under data/live_predictions hold only the three columns
# written by the foreachBatch sink, so read them back with that projection's
# schema rather than the full predictions schema.
live_predictions_schema = predictions.select(
    "site_id", "building_id", "log_power_usage"
).schema

live_predictions = (
    spark.readStream
         .format("parquet")
         .schema(live_predictions_schema)
         .load("data/live_predictions")
)

# send predictions to Kafka: constant key, each row serialised to JSON as the value
kafka_live_predictions = (
    live_predictions
        .selectExpr("\"predictions\" AS key", 
                    "to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_ip)
        .option("topic", "live_predictions")
        .option("checkpointLocation", checkpoint_dir + "/kafka/live_predictions")
        .outputMode("append")
        .start()
)


In [None]:
# Stream 2
# Read the files back with the schema of the aggregate that produced them
# (building_id, time, window, total_power_6h). predictions.schema has no
# window/total_power_6h columns, so the aggregates would never reach Kafka.
# Capture the schema before the name building_6h is rebound to the reader.
building_6h_schema = building_6h.schema

building_6h = (
    spark.readStream
         .format("parquet")
         .schema(building_6h_schema)
         .load("data/building_6h")
)

# send the aggregates to Kafka: constant key, each row serialised to JSON as the value
kafka_building_6h = (
    building_6h
        .selectExpr("\"predictions\" AS key", 
                        "to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_ip)
        .option("topic", "building_6h")
        .option("checkpointLocation", checkpoint_dir + "/kafka/building_6h")
        .outputMode("append")
        .start()
)


In [None]:
# Stream 3
# Read the files back with the schema of the aggregate that produced them
# (site_id, date, window, total_power_day). predictions.schema has no
# window/total_power_day columns, so the aggregates would never reach Kafka.
# Capture the schema before the name site_daily is rebound to the reader.
site_daily_schema = site_daily.schema

site_daily = (
    spark.readStream
         .format("parquet")
         .schema(site_daily_schema)
         .load("data/site_daily")
)

# send the aggregates to Kafka: constant key, each row serialised to JSON as the value
kafka_site_daily = (
    site_daily
        .selectExpr("\"predictions\" AS key", 
                        "to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_ip)
        .option("topic", "site_daily")
        .option("checkpointLocation", checkpoint_dir + "/kafka/site_daily")
        .outputMode("append")
        .start()
)

In [None]:
# Dump the latest progress report and the current status of the site_daily Kafka sink.
print(kafka_site_daily.lastProgress, kafka_site_daily.status, sep="\n")
# Observed when run: the query is reported as inactive

In [None]:
import time
import json

# Sample the live_predictions Kafka sink's progress ten times, three seconds apart.
for _ in range(10):
    time.sleep(3)
    latest_progress = kafka_live_predictions.lastProgress
    print(json.dumps(latest_progress, indent=2))
# Observed when run: every sample is null

In [None]:
# Batch-read each Parquet output directory and print its first five rows.
for parquet_dir in ("data/live_predictions", "data/building_6h", "data/site_daily"):
    spark.read.parquet(parquet_dir).show(5)
# Observed when run: all three outputs exist