In [2]:
from pyspark.sql import SparkSession

### Reading Data From Kafka

In [3]:
# Create (or reuse) a Spark session on a local[*] master; the Kafka SQL
# connector is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Test")
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1",
    )
    .getOrCreate()
)

# Streaming source: subscribe to the "smart-meter-data" topic on the broker at
# localhost:9092, starting from the earliest available offsets.
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "smart-meter-data",
        "startingoffsets": "earliest",
    })
    .load()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/17 21:23:00 WARN Utils: Your hostname, yegane, resolves to a loopback address: 127.0.1.1; using 172.22.201.234 instead (on interface wlp0s20f3)
25/11/17 21:23:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/yegane/Documents/smart-meter-simulation/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/yegane/.ivy2.5.2/cache
The jars for the packages stored in: /home/yegane/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6a17117f-b444-464c-8477-387a5b9e8ea2;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.0.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.0.1 in central
	found org.apache.kafka#kafka-clie

### Create Schema

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType

# Schema of one smart-meter reading; every field is declared non-nullable.
meter_schema = (
    StructType()
    .add("meter_id", IntegerType(), nullable=False)
    .add("building_id", IntegerType(), nullable=False)
    .add("timestamp", TimestampType(), nullable=False)
    .add("power_kw", FloatType(), nullable=False)
    .add("voltage_v", FloatType(), nullable=False)
    .add("status", IntegerType(), nullable=False)
)

### Parse Raw DataFrame

In [5]:
from pyspark.sql.functions import col, expr, from_json

# Project the Kafka record columns, casting `value` and `key` to strings.
raw_df = df.selectExpr(
    "CAST(value AS STRING)",
    "CAST(key AS STRING)",
    "topic",
    "partition",
    "offset",
    "timestamp",
)

In [6]:
# Parse the JSON text in `value` using meter_schema, then flatten the parsed
# struct so each schema field becomes a top-level column.
df = (
    raw_df
    .select(from_json(col("value"), meter_schema).alias("data"))
    .select("data.*")
)

In [7]:
# Keep only rows whose status code lies in the inclusive range 0..4.
df = df.filter(col("status").between(0, 4))

### Inspect DataFrame

In [8]:

# query = df.writeStream.outputMode("append").format("console").start()

# query.awaitTermination()

### House Hourly Power Consumption

In [None]:
from pyspark.sql.functions import window, col, max, min, avg, sum

# Per-meter power statistics (avg / max / min / sum of power_kw) over
# 1-hour tumbling windows on the event timestamp.
hourly_house_power_consumption = (
    df
    .groupBy("meter_id", window("timestamp", "1 hour"))
    .agg(
        avg("power_kw").alias("avg_power"),
        max("power_kw").alias("max_power"),
        min("power_kw").alias("min_power"),
        sum("power_kw").alias("sum_power"),
    )
)

### Building Hourly Power Consumption

In [None]:
# Per-building power statistics (avg / max / min / sum of power_kw) over
# 1-hour tumbling windows on the event timestamp.
hourly_building_power_consumption = (
    df
    .groupBy("building_id", window("timestamp", "1 hour"))
    .agg(
        avg("power_kw").alias("avg_power"),
        max("power_kw").alias("max_power"),
        min("power_kw").alias("min_power"),
        sum("power_kw").alias("sum_power"),
    )
)

### Power Consumption Trend - Dashboard

In [9]:
from pyspark.sql.functions import window, col, max, min, avg, sum, count

# Dashboard feed: per-meter, per-minute averages of power and voltage plus the
# number of events in each window. A 5-minute watermark is set on `timestamp`.
dashboard = (
    df
    .withWatermark("timestamp", "5 minutes")
    .groupBy(window("timestamp", "1 minute"), col("meter_id"))
    .agg(
        avg("power_kw").alias("avg_power_min"),
        avg("voltage_v").alias("avg_voltage_min"),
        count("*").alias("event_count"),
    )
)

# (
#     dashboard.writeStream
#     .format("console")  # in production → ClickHouse
#     .outputMode("complete")
#     .option("truncate", False)
#     .start()
# )


In [10]:
# spark.sql("SELECT * FROM dashboard_table").show()

### Power Consumption Anomaly Detection

In [None]:
# Per-meter 30-second windows (1-minute watermark) with avg / max / min power.
# A window is flagged when its peak power exceeds 1.8x the window average.
anomaly = (
    df
    .withWatermark("timestamp", "1 minute")
    .groupBy(window("timestamp", "30 seconds"), col("meter_id"))
    .agg(
        avg("power_kw").alias("avg_power"),
        max("power_kw").alias("max_power"),
        min("power_kw").alias("min_power"),
    )
    .withColumn("is_anomaly", col("max_power") > col("avg_power") * 1.8)
)

# (
#     anomaly.writeStream
#     .format("console")
#     .outputMode("update")
#     .option("truncate", False)
#     .start()
# )

### Power Consumption - Prediction

In [None]:
# `stddev` is not imported by any earlier cell, so bring it into scope here;
# without it this cell raises NameError.
from pyspark.sql.functions import stddev

# Feature table for prediction: per-meter 5-minute windows (10-minute
# watermark) with mean and standard deviation of power and voltage, plus the
# highest status code observed in the window.
prediction = (
    df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window("timestamp", "5 minutes"),
        col("meter_id")
    )
    .agg(
        avg("power_kw").alias("power_5m_avg"),
        stddev("power_kw").alias("power_5m_std"),
        avg("voltage_v").alias("voltage_5m_avg"),
        stddev("voltage_v").alias("voltage_5m_std"),
        max("status").alias("max_status"),
    )
)

# Example sink (disabled). Uses `prediction`, the frame defined above.
# (
#     prediction.writeStream
#     .format("console")  # in production → write to ClickHouse/Parquet
#     .outputMode("update")
#     .option("truncate", False)
#     .start()
# )


### Count Anomalies / Spikes / Failures

In [None]:
# `when` is not imported by any earlier cell; `col` and `sum` are re-imported so
# this cell does not silently fall back to Python's builtin `sum`.
from pyspark.sql.functions import col, sum, when

# Per-meter event counts by status code. Each count is a conditional sum:
# 1 for rows matching the code, 0 otherwise. All four aggregates go into a
# single agg() call, and each alias is applied to its own column expression.
anomaly_count = (
    df
    .groupBy("meter_id")
    .agg(
        sum(when(col("status") == 1, 1).otherwise(0)).alias("spike_count"),
        sum(when(col("status") == 2, 1).otherwise(0)).alias("dropout_count"),
        sum(when(col("status") == 3, 1).otherwise(0)).alias("low_consumption_count"),
        # NOTE(review): the original compared against 1 here, duplicating
        # spike_count. Status 4 is assumed to be the voltage-anomaly code
        # (statuses are filtered to 0..4 upstream) — TODO confirm with producer.
        sum(when(col("status") == 4, 1).otherwise(0)).alias("voltage_anomaly_count"),
    )
)


### Hourly House Voltage Monitoring

In [None]:
# Per-meter voltage statistics (avg / max / min / sum of voltage_v) over
# 1-hour tumbling windows. The result is bound to both `house_agg` and
# `hourly_house_voltage`.
house_agg = (
    df
    .groupBy("meter_id", window("timestamp", "1 hour"))
    .agg(
        avg("voltage_v").alias("avg_voltage"),
        max("voltage_v").alias("max_voltage"),
        min("voltage_v").alias("min_voltage"),
        sum("voltage_v").alias("sum_voltage"),
    )
)
hourly_house_voltage = house_agg

### Hourly Building Voltage Monitoring

In [None]:
# Per-building voltage statistics (avg / max / min / sum of voltage_v) over
# 1-hour tumbling windows on the event timestamp.
hourly_building_voltage = (
    df
    .groupBy("building_id", window("timestamp", "1 hour"))
    .agg(
        avg("voltage_v").alias("avg_voltage"),
        max("voltage_v").alias("max_voltage"),
        min("voltage_v").alias("min_voltage"),
        sum("voltage_v").alias("sum_voltage"),
    )
)

### Peak Hours / Minimum

In [None]:
# Highest and lowest power reading across all meters for each 1-hour window,
# with a 2-hour watermark on the event timestamp.
power_min_max = (
    df.withWatermark("timestamp", "2 hours")
    .groupBy(
        window(col("timestamp"), "1 hour")
    ).agg(
        # The comma between the two aggregates was missing (SyntaxError).
        max("power_kw").alias("max_power_kw"),
        min("power_kw").alias("min_power_kw"),
    )
)

# max_value = power_min_max.agg(max("max_power_kw")).collect()[0]["max_power"]
# min_value = power_min_max.agg(min("min_power_kw")).collect()[1]["min_power"]

# The hourly aggregate is `power_min_max`; `df_hourly_max` was never defined.
# NOTE(review): sorting a streaming aggregate is only supported in "complete"
# output mode, so these frames need that mode if written out — confirm against
# the Structured Streaming guide for the Spark version in use.

# Hour with the highest peak power.
peak_hour = power_min_max.orderBy(col("max_power_kw").desc()).limit(1)
# Hour with the lowest minimum power. The original sorted on max_power_kw,
# which selects the hour with the smallest peak instead — TODO confirm intent.
min_hour = power_min_max.orderBy(col("min_power_kw").asc()).limit(1)

# Start a console sink for the hourly min/max aggregate in "update" mode,
# printing full (untruncated) column values.
(
    power_min_max.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .start()
)

### Write Data To Clickhouse

In [None]:
import os

# Connection settings are read from the environment so credentials are not
# committed with the notebook. The defaults are the previously hardcoded values.
CLICKHOUSE_JDBC_URL = os.environ.get("CLICKHOUSE_JDBC_URL", "jdbc:clickhouse://host:8123/db")
CLICKHOUSE_USER = os.environ.get("CLICKHOUSE_USER", "default")
CLICKHOUSE_PASSWORD = os.environ.get("CLICKHOUSE_PASSWORD", "")


def _write_batch_to_clickhouse(batch_df, batch_id):
    """Append one micro-batch to the ClickHouse table ``iot_max_power`` over JDBC.

    Parameters
    ----------
    batch_df : DataFrame
        The rows of the current micro-batch, as passed in by ``foreachBatch``.
    batch_id : int
        Identifier of the micro-batch, as passed in by ``foreachBatch``; unused here.
    """
    (
        batch_df.write
        .format("jdbc")
        .option("url", CLICKHOUSE_JDBC_URL)
        .option("dbtable", "iot_max_power")
        .option("user", CLICKHOUSE_USER)
        .option("password", CLICKHOUSE_PASSWORD)
        .mode("append")
        .save()
    )


# Stream the parsed readings into ClickHouse one micro-batch at a time,
# using the checkpoint location configured below.
query = (
    df.writeStream
    .foreachBatch(_write_batch_to_clickhouse)
    .option("checkpointLocation", "/tmp/checkpoints/iot_clickhouse")
    .start()
)