In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.appName("Daily-Day10").getOrCreate()

25/09/01 19:38:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [10]:
df = spark.read.option("header","true").option("inferSchema","true").csv("iot_sensor_data.csv")
df.printSchema()


root
 |-- sensor_id: string (nullable = true)
 |-- warehouse_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- battery_level: double (nullable = true)
 |-- status: string (nullable = true)



In [29]:
window_spec_gap = Window.partitionBy("warehouse_id", "sensor_id").orderBy("timestamp")
df_with_gaps = df.withColumn(
    "time_diff_minutes",
    (col("timestamp").cast("long") - lag("timestamp", 1).over(window_spec_gap).cast("long")) / 60
)
df_with_gaps.show()

+---------+------------+-------------------+-----------+--------+-------------+------+-----------------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|status|time_diff_minutes|
+---------+------------+-------------------+-----------+--------+-------------+------+-----------------+
|   S_W1_1|          W1|2025-08-25 00:10:00|       27.7|    54.0|         57.3|    OK|             NULL|
|   S_W1_1|          W1|2025-08-25 00:25:00|       28.9|    60.7|         67.0|    OK|             15.0|
|   S_W1_1|          W1|2025-08-25 00:40:00|       25.0|    60.7|         94.1|    OK|             15.0|
|   S_W1_1|          W1|2025-08-25 00:50:00|       22.6|    58.6|         93.6|    OK|             10.0|
|   S_W1_1|          W1|2025-08-25 01:00:00|       27.5|    51.5|         61.0|    OK|             10.0|
|   S_W1_1|          W1|2025-08-25 01:20:00|       30.7|    57.5|         82.4|    OK|             20.0|
|   S_W1_1|          W1|2025-08-25 01:30:00|       20.3

In [30]:
gaps_df = df_with_gaps.filter(
    (col("time_diff_minutes").isNotNull()) & (col("time_diff_minutes") > 15.1)
).groupBy("warehouse_id", to_date("timestamp").alias("date")).count().alias("missing_readings_count")
gaps_df.show(truncate=False)

+------------+----------+-----+
|warehouse_id|date      |count|
+------------+----------+-----+
|W45         |2025-08-25|18   |
|W44         |2025-08-25|22   |
|W29         |2025-08-25|28   |
|W7          |2025-08-25|21   |
|W34         |2025-08-25|26   |
|W17         |2025-08-25|15   |
|W21         |2025-08-25|30   |
|W46         |2025-08-25|31   |
|W1          |2025-08-25|18   |
|W36         |2025-08-25|26   |
|W27         |2025-08-25|26   |
|W15         |2025-08-25|21   |
|W37         |2025-08-25|24   |
|W48         |2025-08-25|28   |
|W12         |2025-08-25|19   |
|W3          |2025-08-25|19   |
|W5          |2025-08-25|28   |
|W31         |2025-08-25|23   |
|W26         |2025-08-25|25   |
|W41         |2025-08-25|25   |
+------------+----------+-----+
only showing top 20 rows



In [32]:
daily_metrics_df = df.groupBy("warehouse_id", to_date("timestamp").alias("date")).agg(
    avg(when(col("status") == "OK", col("temperature"))).alias("avg_temp"),
    min(when(col("status") == "OK", col("temperature"))).alias("min_temp"),
    max(when(col("status") == "OK", col("temperature"))).alias("max_temp"),
    avg(when(col("status") == "OK", col("humidity"))).alias("avg_humidity"),
    min(when(col("status") == "OK", col("humidity"))).alias("min_humidity"),
    max(when(col("status") == "OK", col("humidity"))).alias("max_humidity"),
    (sum(when(col("status") == "MALFUNCTION", 1).otherwise(0)) / count("*") * 100).alias("malfunction_pct"),
    avg(col("battery_level")).alias("avg_battery")
).orderBy("date", "warehouse_id")

daily_metrics_df.show(truncate=False)

+------------+----------+------------------+--------+--------+------------------+------------+------------+------------------+-----------------+
|warehouse_id|date      |avg_temp          |min_temp|max_temp|avg_humidity      |min_humidity|max_humidity|malfunction_pct   |avg_battery      |
+------------+----------+------------------+--------+--------+------------------+------------+------------+------------------+-----------------+
|W1          |2025-08-25|25.19032258064516 |17.0    |34.3    |60.13763440860213 |47.6        |95.0        |3.125             |73.84687499999998|
|W10         |2025-08-25|25.182795698924732|17.6    |31.8    |60.25376344086022 |49.7        |95.0        |1.0638297872340425|72.44255319148935|
|W11         |2025-08-25|25.4586956521739  |19.0    |32.4    |59.77065217391308 |0.0         |95.0        |3.1578947368421053|73.94631578947366|
|W12         |2025-08-25|25.04021739130435 |16.4    |32.6    |60.464130434782625|0.0         |95.0        |2.127659574468085 |75.2

In [33]:
final_daily_metrics_df = daily_metrics_df.join(
    gaps_df.select(col("warehouse_id"), col("date"), col("count").alias("missing_readings")),
    on=["warehouse_id", "date"],
    how="left"
).na.fill(0, subset=["missing_readings"])
final_daily_metrics_df.show(truncate=False)

+------------+----------+------------------+--------+--------+------------------+------------+------------+------------------+-----------------+----------------+
|warehouse_id|date      |avg_temp          |min_temp|max_temp|avg_humidity      |min_humidity|max_humidity|malfunction_pct   |avg_battery      |missing_readings|
+------------+----------+------------------+--------+--------+------------------+------------+------------+------------------+-----------------+----------------+
|W45         |2025-08-25|24.472631578947365|16.4    |31.2    |60.419999999999995|46.9        |95.0        |3.061224489795918 |76.36020408163263|18              |
|W44         |2025-08-25|24.864285714285717|17.9    |31.3    |59.06836734693876 |0.0         |95.0        |1.0101010101010102|73.75555555555555|22              |
|W29         |2025-08-25|25.278947368421054|16.8    |34.4    |60.11157894736844 |0.0         |75.7        |0.0               |73.27684210526316|28              |
|W7          |2025-08-25|24.

In [None]:

outliers_df = df.filter(
    (col("temperature") < -20) | (col("temperature") > 60) | (col("humidity") > 90)
).withColumn(
    "outlier_type",
    when(col("temperature") < -20, "Temp Low")
    .when(col("temperature") > 60, "Temp High")
    .when(col("humidity") > 90, "Humidity High")
    .otherwise("Other")
).select(
    "sensor_id",
    "warehouse_id",
    to_date("timestamp").alias("date"),
    "outlier_type",
    when(col("outlier_type").contains("Temp"), col("temperature"))
    .when(col("outlier_type").contains("Humidity"), col("humidity"))
    .alias("value")
).orderBy("date", "warehouse_id", "sensor_id")

In [25]:
outliers_df.show(truncate=False)

+---------+------------+----------+-------------+------+
|sensor_id|warehouse_id|date      |outlier_type |value |
+---------+------------+----------+-------------+------+
|S_W1_3   |W1          |2025-08-25|Temp High    |100.0 |
|S_W1_3   |W1          |2025-08-25|Temp Low     |-100.0|
|S_W1_4   |W1          |2025-08-25|Humidity High|95.0  |
|S_W1_5   |W1          |2025-08-25|Temp Low     |-100.0|
|S_W10_2  |W10         |2025-08-25|Temp Low     |-100.0|
|S_W10_5  |W10         |2025-08-25|Humidity High|95.0  |
|S_W11_1  |W11         |2025-08-25|Humidity High|95.0  |
|S_W11_2  |W11         |2025-08-25|Temp Low     |-100.0|
|S_W11_3  |W11         |2025-08-25|Temp High    |100.0 |
|S_W11_5  |W11         |2025-08-25|Temp Low     |-100.0|
|S_W12_1  |W12         |2025-08-25|Humidity High|95.0  |
|S_W12_1  |W12         |2025-08-25|Temp High    |100.0 |
|S_W12_1  |W12         |2025-08-25|Temp Low     |-100.0|
|S_W12_2  |W12         |2025-08-25|Humidity High|95.0  |
|S_W12_2  |W12         |2025-08