# Week 3 - Spark for Device-Level Aggregation + Insights

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, sum as _sum, avg, max as _max, hour, to_date, desc


# Start Spark session

In [4]:
spark = SparkSession.builder.appName("EnergyAggregation").getOrCreate()

# Load Week 2 cleaned logs

In [10]:
df = spark.read.csv("/content/cleaned_energy_logs.csv", header=True, inferSchema=True)
df.show()

+------+---------+-------+-------------------+----------+
|log_id|device_id|room_id|          timestamp|energy_kwh|
+------+---------+-------+-------------------+----------+
|     1|        1|      1|2025-08-01 08:00:00|      0.05|
|     2|        2|      1|2025-08-01 09:00:00|       0.3|
|     3|        3|      2|2025-08-01 10:00:00|       1.2|
|     4|        4|      2|2025-08-01 11:00:00|      0.15|
|     5|        5|      3|2025-08-01 12:00:00|       0.5|
|     6|        6|      3|2025-08-01 13:00:00|       1.5|
|     7|        7|      1|2025-08-01 14:00:00|       0.4|
|     8|        8|      4|2025-08-01 15:00:00|       0.8|
|     9|        9|      4|2025-08-01 16:00:00|       2.0|
|    10|       10|      1|2025-08-01 17:00:00|       0.1|
|    11|       11|      5|2025-08-01 18:00:00|       0.9|
|    12|       12|      1|2025-08-01 19:00:00|       0.2|
|    13|       13|      5|2025-08-01 20:00:00|      0.05|
|    14|       14|      6|2025-08-01 21:00:00|      0.02|
|    15|      


# Convert timestamp to hour

In [9]:
df = df.withColumn("hour", hour(col("timestamp")))
df.show()


+------+---------+-------+-------------------+----------+----+----------+-------------+
|log_id|device_id|room_id|          timestamp|energy_kwh|hour|peak_usage|offpeak_usage|
+------+---------+-------+-------------------+----------+----+----------+-------------+
|     1|        1|      1|2025-08-01 08:00:00|      0.05|   8|      0.05|          0.0|
|     2|        2|      1|2025-08-01 09:00:00|       0.3|   9|       0.3|          0.0|
|     3|        3|      2|2025-08-01 10:00:00|       1.2|  10|       1.2|          0.0|
|     4|        4|      2|2025-08-01 11:00:00|      0.15|  11|      0.15|          0.0|
|     5|        5|      3|2025-08-01 12:00:00|       0.5|  12|       0.5|          0.0|
|     6|        6|      3|2025-08-01 13:00:00|       1.5|  13|       1.5|          0.0|
|     7|        7|      1|2025-08-01 14:00:00|       0.4|  14|       0.4|          0.0|
|     8|        8|      4|2025-08-01 15:00:00|       0.8|  15|       0.8|          0.0|
|     9|        9|      4|2025-0

# Define peak/off-peak classification

In [8]:
df = df.withColumn(
    "peak_usage",
    when((col("hour") >= 6) & (col("hour") <= 22), col("energy_kwh")).otherwise(0)
).withColumn(
    "offpeak_usage",
    when((col("hour") < 6) | (col("hour") > 22), col("energy_kwh")).otherwise(0)
)
df.show()

+------+---------+-------+-------------------+----------+----+----------+-------------+
|log_id|device_id|room_id|          timestamp|energy_kwh|hour|peak_usage|offpeak_usage|
+------+---------+-------+-------------------+----------+----+----------+-------------+
|     1|        1|      1|2025-08-01 08:00:00|      0.05|   8|      0.05|          0.0|
|     2|        2|      1|2025-08-01 09:00:00|       0.3|   9|       0.3|          0.0|
|     3|        3|      2|2025-08-01 10:00:00|       1.2|  10|       1.2|          0.0|
|     4|        4|      2|2025-08-01 11:00:00|      0.15|  11|      0.15|          0.0|
|     5|        5|      3|2025-08-01 12:00:00|       0.5|  12|       0.5|          0.0|
|     6|        6|      3|2025-08-01 13:00:00|       1.5|  13|       1.5|          0.0|
|     7|        7|      1|2025-08-01 14:00:00|       0.4|  14|       0.4|          0.0|
|     8|        8|      4|2025-08-01 15:00:00|       0.8|  15|       0.8|          0.0|
|     9|        9|      4|2025-0

#  Aggregate per device


In [12]:
df = df.withColumn("hour", hour(col("timestamp")))
df = df.withColumn(
    "peak_usage",
    when((col("hour") >= 6) & (col("hour") <= 22), col("energy_kwh")).otherwise(0)
).withColumn(
    "offpeak_usage",
    when((col("hour") < 6) | (col("hour") > 22), col("energy_kwh")).otherwise(0)
)

agg_df = df.groupBy("device_id").agg(
    _sum("energy_kwh").alias("total_energy_kwh"),
    _sum("peak_usage").alias("peak_usage_kwh"),
    _sum("offpeak_usage").alias("offpeak_usage_kwh"),
    avg("energy_kwh").alias("avg_energy_kwh"),
    _max("energy_kwh").alias("max_energy_kwh")
)


# Save aggregate results

In [13]:
agg_df.toPandas().to_csv("device_aggregates.csv", index=False)
print("✅ device_aggregates.csv saved")

✅ device_aggregates.csv saved


# 2. Daily usage per device

In [15]:
df_daily = df.withColumn("date", to_date("timestamp"))
daily_usage = df_daily.groupBy("device_id","date").agg(
    _sum("energy_kwh").alias("daily_energy_kwh")
)
daily_usage.show()

+---------+----------+----------------+
|device_id|      date|daily_energy_kwh|
+---------+----------+----------------+
|        2|2025-08-01|             0.3|
|       12|2025-08-01|             0.2|
|        5|2025-08-01|             0.5|
|       17|2025-08-02|             0.3|
|       10|2025-08-01|             0.1|
|       13|2025-08-01|            0.05|
|       16|2025-08-01|             1.0|
|       15|2025-08-01|            0.15|
|        1|2025-08-01|            0.05|
|        3|2025-08-01|             1.2|
|        8|2025-08-01|             0.8|
|        7|2025-08-01|             0.4|
|        4|2025-08-01|            0.15|
|       20|2025-08-02|            0.05|
|        6|2025-08-01|             1.5|
|       11|2025-08-01|             0.9|
|        9|2025-08-01|             2.0|
|       19|2025-08-02|            0.25|
|       18|2025-08-02|             0.7|
|       14|2025-08-01|            0.02|
+---------+----------+----------------+



# Devices exceeding 10 kWh/day

In [16]:
alerts = daily_usage.filter(col("daily_energy_kwh") > 10)
alerts.toPandas().to_csv("alerts.csv", index=False)
print("✅ alerts.csv saved (devices > 10 kWh/day)")

✅ alerts.csv saved (devices > 10 kWh/day)


#3. Top 5 energy consuming devices

In [17]:
top_devices = agg_df.orderBy(desc("total_energy_kwh")).limit(5)
top_devices.toPandas().to_csv("top_devices.csv", index=False)
print("✅ top_devices.csv saved")

✅ top_devices.csv saved


#Show results in console

In [18]:
print("📊 Aggregates:")
agg_df.show(truncate=False)

print("📊 Daily Usage (Alerts > 10 kWh):")
alerts.show(truncate=False)

print("📊 Top Devices:")
top_devices.show(truncate=False)

📊 Aggregates:
+---------+----------------+--------------+-----------------+--------------+--------------+
|device_id|total_energy_kwh|peak_usage_kwh|offpeak_usage_kwh|avg_energy_kwh|max_energy_kwh|
+---------+----------------+--------------+-----------------+--------------+--------------+
|12       |0.2             |0.2           |0.0              |0.2           |0.2           |
|1        |0.05            |0.05          |0.0              |0.05          |0.05          |
|13       |0.05            |0.05          |0.0              |0.05          |0.05          |
|6        |1.5             |1.5           |0.0              |1.5           |1.5           |
|16       |1.0             |0.0           |1.0              |1.0           |1.0           |
|3        |1.2             |1.2           |0.0              |1.2           |1.2           |
|20       |0.05            |0.05          |0.0              |0.05          |0.05          |
|5        |0.5             |0.5           |0.0              |0.5  