In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, max, min, sum, approx_count_distinct, col, rand
import time

# Step 1: Obtain the SparkSession, requesting 4 GB for the driver and for each executor
spark = (
    SparkSession.builder
    .appName("StructuredStreaming")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Halt every streaming query that is still active on this session so that
# the queries started further down do not run alongside older ones
for query in spark.streams.active:
    query.stop()

# Step 2: Column layout of the incoming CSV files, written as a DDL string
schema = """
    ts STRING,
    asset INT,
    status STRING,
    alarm_time INT,
    loading_time INT,
    tooling_time INT,
    maintenance_time INT,
    support_time INT,
    power_avg FLOAT,
    power_min FLOAT,
    power_max FLOAT
"""

# Only these columns are carried into the analyses; the *_time columns are dropped
selected_columns = ["ts", "asset", "status", "power_avg", "power_max", "power_min"]

# Step 3: Streaming source over the "FILES" directory, narrowed to the selected
# columns and randomly sampled: a row is kept when rand() < 0.05 (~5% of the data)
streaming_df = (
    spark.readStream
    .schema(schema)
    .option("header", "true")
    .csv("FILES")
    .select(*selected_columns)
    .where(rand() < 0.05)
)

# 1. Mean of power_avg for every non-null asset, largest mean first
average_power = (
    streaming_df
    .where(col("asset").isNotNull())
    .groupBy("asset")
    .agg(avg("power_avg").alias("average_power"))
    .orderBy("average_power", ascending=False)
)

# 2. Largest power_max value over all rows received
max_power = streaming_df.groupBy().agg(max("power_max").alias("max_power_usage"))

# 3. Per-row spread between the maximum and minimum power readings
power_fluctuation = streaming_df.withColumn(
    "power_fluctuation",
    col("power_max") - col("power_min"),
)

# 4. Running sum of power_avg over all rows received
total_power = streaming_df.groupBy().agg(
    sum("power_avg").alias("total_power_consumption")
)

# 5. Approximate number of distinct asset values
unique_assets = streaming_df.groupBy().agg(
    approx_count_distinct("asset").alias("approx_unique_assets")
)

# Writing outputs to console
def _start_console_query(result_df, query_name, output_mode, checkpoint_name):
    """Start a console-sink streaming query and return its handle.

    Args:
        result_df: Streaming DataFrame whose rows are printed for each batch.
        query_name: Name under which the query is registered.
        output_mode: Structured Streaming output mode ("complete" or "append").
        checkpoint_name: Prefix of the checkpoint directory created under
            /tmp/checkpoints for this run.

    Returns:
        The started StreamingQuery.
    """
    # The timestamp suffix gives every run its own checkpoint directory, so a
    # re-run starts from scratch instead of resuming earlier state.
    checkpoint_dir = f"/tmp/checkpoints/{checkpoint_name}_{int(time.time())}/"
    return (
        result_df.writeStream
        .queryName(query_name)
        .outputMode(output_mode)
        .format("console")
        .option("truncate", "false")
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )


# Aggregated results are re-emitted in full on every batch ("complete");
# the per-row fluctuation stream has no aggregation and is emitted in "append" mode.
query_avg = _start_console_query(
    average_power, "avg_power_query", "complete", "average_power"
)
query_fluctuation = _start_console_query(
    power_fluctuation, "fluctuation_query", "append", "power_fluctuation"
)
query_max_power = _start_console_query(
    max_power, "max_power_query", "complete", "max_power"
)
query_total_power = _start_console_query(
    total_power, "total_power_query", "complete", "total_power"
)
# The approximate distinct-asset count was defined but never started; emit it too.
query_unique_assets = _start_console_query(
    unique_assets, "unique_assets_query", "complete", "unique_assets"
)

# Wait for the streaming to finish
# Awaiting the handles one after another would block on the first query even
# when a later one has already failed. Polling each query with a short timeout
# surfaces a failure from any of them within a few seconds.
_POLL_SECONDS = 1
_pending = {
    q.id: q
    for q in (
        query_avg,
        query_fluctuation,
        query_max_power,
        query_total_power,
        *spark.streams.active,  # also covers any other query still active on this session
    )
}
try:
    while _pending:
        for _query_id, _query in list(_pending.items()):
            # Returns True once the query has ended and raises
            # StreamingQueryException if it ended because of an error.
            if _query.awaitTermination(timeout=_POLL_SECONDS):
                del _pending[_query_id]
finally:
    # On failure or interrupt, do not leave the remaining queries running
    # (and printing to the console) in the background.
    for _query in _pending.values():
        _query.stop()


25/03/30 11:41:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/03/30 11:41:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/30 11:41:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/30 11:41:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/30 11:41:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------


[Stage 2:>    (0 + 0) / 1][Stage 3:>    (0 + 4) / 4][Stage 5:>    (0 + 0) / 4]4]

+-------------------------+-----+----------+---------+---------+---------+-----------------+
|ts                       |asset|status    |power_avg|power_max|power_min|power_fluctuation|
+-------------------------+-----+----------+---------+---------+---------+-----------------+
|2021-11-16 15:48:00+00:00|5    |Standby   |9.1762   |17.9793  |5.76693  |12.21237         |
|2021-11-16 15:49:00+00:00|14   |MachineOn |5.060667 |8.732685 |3.6945975|5.038088         |
|2021-11-16 15:51:00+00:00|4    |Production|330.53748|386.2534 |46.014534|340.23886        |
|2021-11-16 15:51:00+00:00|6    |Standby   |16.27782 |22.503458|12.427282|10.076176        |
|2021-11-16 15:54:00+00:00|5    |Production|8.55993  |17.64    |5.08847  |12.551529        |
|2021-11-16 15:56:00+00:00|11   |Production|7.465372 |11.755537|5.0380874|6.7174497        |
|2021-11-16 15:57:00+00:00|14   |MachineOn |8.151258 |95.05192 |3.358725 |91.69319         |
|2021-11-16 15:58:00+00:00|12   |Standby   |1.0492017|1.34349  |1.0076

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------------------+
|total_power_consumption|
+-----------------------+
|3666703.984284345      |
+-----------------------+

-------------------------------------------
Batch: 0
-------------------------------------------
+---------------+
|max_power_usage|
+---------------+
|806.06         |
+---------------+



                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+------------------+
|asset|average_power     |
+-----+------------------+
|3    |139.84429432709726|
|4    |102.06913520106617|
|9    |101.29514047516223|
|5    |86.63100026217131 |
|13   |79.91346556293301 |
|1    |62.413434229726604|
|10   |53.1344742992425  |
|8    |45.702913488545875|
|7    |22.897871970659885|
|6    |17.08529790577849 |
|15   |14.468509786303606|
|16   |12.474804667094462|
|17   |10.277148821121868|
|12   |6.1009717636178165|
|14   |5.810286658517822 |
|11   |5.767543048855085 |
|2    |5.468842835756808 |
|0    |3.398475297367126 |
+-----+------------------+

