In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, Window as W
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType
from datetime import datetime

In [2]:
spark = SparkSession.builder.appName("DailyCodingProblem-27-08-2025").getOrCreate()

In [3]:
df = spark.read.format("csv") \
    .options(header=True, inferSchema=True) \
    .load("/home/jupyter/work/data/sources/csv/28-08-2025/iot_sensor_data.csv")

## **Problem Scenario: IoT Sensor Data Monitoring**

### **Background**

You work for a company that operates **smart warehouses** equipped with **IoT temperature and humidity sensors** in multiple locations worldwide. These sensors send readings every few minutes to a central data pipeline.

However, the raw data is messy:

* Some readings are missing or delayed.
* Multiple sensors sometimes report at the same timestamp.
* Occasionally, sensors malfunction and send outlier readings (e.g., -100°C).

Your team needs **daily aggregated metrics** to monitor warehouse conditions and ensure compliance with safety regulations.

---

### **Data Generation Requirements**

* **CSV with at least 1000 rows**
* 50+ unique **warehouse locations**
* Columns:

  * `sensor_id`
  * `warehouse_id`
  * `timestamp`
  * `temperature` (°C)
  * `humidity` (%)
  * `battery_level` (%)
  * `status` (OK, MALFUNCTION)
* Edge cases: missing readings, outlier values, multiple readings at the same timestamp

---

## **Problem Definition**

You need to process the IoT sensor data using **PySpark** to generate:

1. **Daily metrics per warehouse**

   * Average, min, max temperature and humidity
   * Percentage of malfunctioning sensors
   * Count of missing readings (gaps > 15 min)
   * Average battery level

2. **Outlier Detection Table**

   * Identify sensors with extreme values (e.g., temp < -20°C or temp > 60°C, humidity > 90%)

3. **Handle Edge Cases**

   * Missing readings → fill with previous valid value
   * Malfunctioning sensors → exclude from averages

---
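The "fill with previous valid value" rule can be sketched in plain Python as a reference for small test cases. This is only an illustration under assumptions: the `Reading` tuple shape and the `forward_fill` helper are hypothetical names, and only `temperature` is handled.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical row shape for illustration: (sensor_id, timestamp, temperature)
Reading = Tuple[str, str, Optional[float]]


def forward_fill(readings: List[Reading]) -> List[Reading]:
    """Replace a missing temperature with the sensor's last non-missing value."""
    last_seen: Dict[str, float] = {}
    filled: List[Reading] = []
    # ISO-formatted timestamps sort correctly as plain strings.
    for sensor_id, ts, temp in sorted(readings, key=lambda r: (r[0], r[1])):
        if temp is None:
            # Stays None when the sensor has no earlier valid reading.
            temp = last_seen.get(sensor_id)
        else:
            last_seen[sensor_id] = temp
        filled.append((sensor_id, ts, temp))
    return filled


sample = [
    ("S1", "2025-08-25 09:00:00", 25.5),
    ("S1", "2025-08-25 09:15:00", None),
    ("S1", "2025-08-25 09:30:00", None),
    ("S2", "2025-08-25 09:00:00", None),
]
filled = forward_fill(sample)
```

Both consecutive gaps for `S1` receive 25.5, while the lone `S2` reading stays empty because nothing precedes it. Carrying the value across several missing rows is what separates this from looking only one row back.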

## **Sample Input Table (Simplified)**

| sensor\_id | warehouse\_id | timestamp           | temperature | humidity | battery\_level | status      |
| ---------- | ------------- | ------------------- | ----------- | -------- | -------------- | ----------- |
| S1         | W1            | 2025-08-25 09:00:00 | 25.5        | 60       | 85             | OK          |
| S1         | W1            | 2025-08-25 09:15:00 | 26.0        | 59       | 84             | OK          |
| S2         | W1            | 2025-08-25 09:00:00 | -100.0      | 61       | 80             | MALFUNCTION |
| S3         | W2            | 2025-08-25 09:05:00 | 22.3        | 58       | 90             | OK          |
| S3         | W2            | 2025-08-25 09:20:00 | 23.0        | 57       | 89             | OK          |

---

## **Expected Output 1 – Daily Metrics Table**

| warehouse\_id | date       | avg\_temp | min\_temp | max\_temp | avg\_humidity | malfunction\_pct | missing\_readings | avg\_battery |
| ------------- | ---------- | --------- | --------- | --------- | ------------- | ---------------- | ----------------- | ------------ |
| W1            | 2025-08-25 | 25.8      | 25.5      | 26.0      | 59.5          | 10%              | 1                 | 84.5         |
| W2            | 2025-08-25 | 22.6      | 22.3      | 23.0      | 57.5          | 0%               | 0                 | 89.5         |

---
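As a rough cross-check, the temperature, humidity, and battery columns of the daily metrics table can be recomputed from the five sample input rows in plain Python. The `daily_averages` helper is a hypothetical name, and the sketch assumes that readings with status `MALFUNCTION` are left out of every aggregate.

```python
from statistics import mean
from typing import Dict, List, Tuple

# Rows copied from the sample input table:
# (warehouse_id, temperature, humidity, battery_level, status)
sample_rows: List[Tuple[str, float, float, float, str]] = [
    ("W1", 25.5, 60, 85, "OK"),
    ("W1", 26.0, 59, 84, "OK"),
    ("W1", -100.0, 61, 80, "MALFUNCTION"),
    ("W2", 22.3, 58, 90, "OK"),
    ("W2", 23.0, 57, 89, "OK"),
]


def daily_averages(rows) -> Dict[str, Dict[str, float]]:
    """Aggregate OK readings per warehouse (all sample rows share one date)."""
    grouped: Dict[str, list] = {}
    for warehouse_id, temp, hum, batt, status in rows:
        if status == "OK":  # malfunctioning readings are left out
            grouped.setdefault(warehouse_id, []).append((temp, hum, batt))
    result: Dict[str, Dict[str, float]] = {}
    for warehouse_id, vals in grouped.items():
        temps = [v[0] for v in vals]
        result[warehouse_id] = {
            "avg_temp": mean(temps),
            "min_temp": min(temps),
            "max_temp": max(temps),
            "avg_humidity": mean(v[1] for v in vals),
            "avg_battery": mean(v[2] for v in vals),
        }
    return result


metrics = daily_averages(sample_rows)
```

The resulting averages agree with the table up to rounding. `malfunction_pct` and `missing_readings` are not recomputed here.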

## **Expected Output 2 – Outlier Detection Table**

| sensor\_id | warehouse\_id | date       | outlier\_type | value  |
| ---------- | ------------- | ---------- | ------------- | ------ |
| S2         | W1            | 2025-08-25 | Temp Low      | -100.0 |
| S3         | W2            | 2025-08-25 | Humidity High | 95.0   |

---
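The outlier rules can be expressed as a small, threshold-driven function. The sketch below is plain Python rather than PySpark, the `classify` helper is a hypothetical name, and the threshold values are the examples from the problem definition, so treat them as configurable assumptions.

```python
from typing import Dict, List, Optional, Tuple

# Example thresholds from the problem definition; meant to be overridden.
THRESHOLDS: Dict[str, float] = {
    "temp_low": -20.0,
    "temp_high": 60.0,
    "humidity_high": 90.0,
}


def classify(
    temperature: Optional[float],
    humidity: Optional[float],
    thresholds: Dict[str, float] = THRESHOLDS,
) -> List[Tuple[str, float]]:
    """Return every (outlier_type, value) pair that a single reading triggers."""
    found: List[Tuple[str, float]] = []
    if temperature is not None:
        if temperature < thresholds["temp_low"]:
            found.append(("Temp Low", temperature))
        elif temperature > thresholds["temp_high"]:
            found.append(("Temp High", temperature))
    if humidity is not None and humidity > thresholds["humidity_high"]:
        found.append(("Humidity High", humidity))
    return found


low_temp = classify(-100.0, 61.0)
high_humidity = classify(22.3, 95.0)
normal = classify(25.5, 60.0)
```

Returning a list lets one reading report both a temperature and a humidity outlier, which a single "first match wins" label would hide.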

## **Scalability & Realism**

* Use **window functions** for gap detection
* **Partition by date & warehouse** for big data scalability
* Outlier detection logic should be configurable (e.g., thresholds from a config table)
* Could be extended to **streaming pipelines** with Spark Structured Streaming

---
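Gap detection with a window function amounts to comparing each reading with the previous one from the same sensor. A plain-Python sketch of that comparison follows. The `count_gaps` helper is a hypothetical name, and the 09:40 row is invented for illustration and is not part of the sample input.

```python
from datetime import datetime
from typing import Dict, List, Tuple

GAP_MINUTES = 15  # threshold from the problem definition


def count_gaps(
    readings: List[Tuple[str, str]], gap_minutes: int = GAP_MINUTES
) -> Dict[str, int]:
    """Count, per sensor, consecutive readings more than gap_minutes apart."""
    by_sensor: Dict[str, List[datetime]] = {}
    for sensor_id, ts in readings:
        parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
        by_sensor.setdefault(sensor_id, []).append(parsed)
    gaps: Dict[str, int] = {}
    for sensor_id, stamps in by_sensor.items():
        stamps.sort()
        # Pair each timestamp with its predecessor, like lag() over an ordered window.
        gaps[sensor_id] = sum(
            1
            for prev, cur in zip(stamps, stamps[1:])
            if (cur - prev).total_seconds() / 60 > gap_minutes
        )
    return gaps


rows = [
    ("S1", "2025-08-25 09:00:00"),
    ("S1", "2025-08-25 09:15:00"),  # exactly 15 minutes: not counted
    ("S1", "2025-08-25 09:40:00"),  # hypothetical row, 25 minutes later: counted
    ("S3", "2025-08-25 09:05:00"),
    ("S3", "2025-08-25 09:20:00"),
]
gap_counts = count_gaps(rows)
```

The comparison is strict, so a reading that arrives exactly 15 minutes after the previous one does not count as a gap.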

In [4]:
df.show()

+---------+------------+-------------------+-----------+--------+-------------+-----------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|     status|
+---------+------------+-------------------+-----------+--------+-------------+-----------+
|  S_W37_2|         W37|2025-08-25 01:00:00|       27.3|    56.6|         71.8|         OK|
|   S_W1_5|          W1|2025-08-25 02:20:00|       34.3|    62.1|         50.9|         OK|
|  S_W10_5|         W10|2025-08-25 01:30:00|       31.6|    66.3|         60.9|         OK|
|  S_W20_2|         W20|2025-08-25 02:30:00|       27.5|    57.2|         77.1|         OK|
|  S_W43_5|         W43|2025-08-25 02:40:00|       20.5|     0.0|         87.6|         OK|
|  S_W14_5|         W14|2025-08-25 02:55:00|       25.1|    56.3|         62.9|         OK|
|  S_W39_2|         W39|2025-08-25 00:30:00|       21.8|    59.2|         68.0|         OK|
|  S_W41_5|         W41|2025-08-25 02:25:00|       32.5|    61.8|         93.2| 

In [5]:
df = df.withColumn(
    "date",
    F.to_date("timestamp")
).withColumn(
    "time",
    F.date_format("timestamp", "HH:mm:ss")
)

df.show()

+---------+------------+-------------------+-----------+--------+-------------+-----------+----------+--------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|     status|      date|    time|
+---------+------------+-------------------+-----------+--------+-------------+-----------+----------+--------+
|  S_W37_2|         W37|2025-08-25 01:00:00|       27.3|    56.6|         71.8|         OK|2025-08-25|01:00:00|
|   S_W1_5|          W1|2025-08-25 02:20:00|       34.3|    62.1|         50.9|         OK|2025-08-25|02:20:00|
|  S_W10_5|         W10|2025-08-25 01:30:00|       31.6|    66.3|         60.9|         OK|2025-08-25|01:30:00|
|  S_W20_2|         W20|2025-08-25 02:30:00|       27.5|    57.2|         77.1|         OK|2025-08-25|02:30:00|
|  S_W43_5|         W43|2025-08-25 02:40:00|       20.5|     0.0|         87.6|         OK|2025-08-25|02:40:00|
|  S_W14_5|         W14|2025-08-25 02:55:00|       25.1|    56.3|         62.9|         OK|2025-08-25|02

In [6]:
window_spec = W.partitionBy("sensor_id").orderBy("timestamp")

In [7]:
# Forward-fill nulls with the sensor's most recent non-null value.
# lag(1) alone would leave a null in place whenever the previous row is also null.
fill_window = window_spec.rowsBetween(W.unboundedPreceding, W.currentRow)
for col_name in ["temperature", "humidity", "battery_level"]:
    df = df.withColumn(col_name, F.last(col_name, ignorenulls=True).over(fill_window))
df.show()

+---------+------------+-------------------+-----------+--------+-------------+------+----------+--------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|status|      date|    time|
+---------+------------+-------------------+-----------+--------+-------------+------+----------+--------+
|  S_W10_1|         W10|2025-08-25 00:20:00|       26.0|    59.8|         82.5|    OK|2025-08-25|00:20:00|
|  S_W10_1|         W10|2025-08-25 00:35:00|       25.4|    60.1|         60.5|    OK|2025-08-25|00:35:00|
|  S_W10_1|         W10|2025-08-25 00:40:00|       24.5|    62.5|         92.1|    OK|2025-08-25|00:40:00|
|  S_W10_1|         W10|2025-08-25 00:50:00|       29.0|    60.9|         76.0|    OK|2025-08-25|00:50:00|
|  S_W10_1|         W10|2025-08-25 01:10:00|       28.2|    56.8|         84.0|    OK|2025-08-25|01:10:00|
|  S_W10_1|         W10|2025-08-25 01:25:00|       26.4|    62.8|         83.7|    OK|2025-08-25|01:25:00|
|  S_W10_1|         W10|2025-08-25 01

In [8]:
# Previous reading's timestamp for the same sensor.
# The first reading of a sensor falls back to its own timestamp, giving a gap of 0.
df = df.withColumn(
    "prev_ts",
    F.coalesce(F.lag("timestamp").over(window_spec), F.col("timestamp"))
)

df.show()

+---------+------------+-------------------+-----------+--------+-------------+------+----------+--------+-------------------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|status|      date|    time|            prev_ts|
+---------+------------+-------------------+-----------+--------+-------------+------+----------+--------+-------------------+
|  S_W10_1|         W10|2025-08-25 00:20:00|       26.0|    59.8|         82.5|    OK|2025-08-25|00:20:00|2025-08-25 00:20:00|
|  S_W10_1|         W10|2025-08-25 00:35:00|       25.4|    60.1|         60.5|    OK|2025-08-25|00:35:00|2025-08-25 00:20:00|
|  S_W10_1|         W10|2025-08-25 00:40:00|       24.5|    62.5|         92.1|    OK|2025-08-25|00:40:00|2025-08-25 00:35:00|
|  S_W10_1|         W10|2025-08-25 00:50:00|       29.0|    60.9|         76.0|    OK|2025-08-25|00:50:00|2025-08-25 00:40:00|
|  S_W10_1|         W10|2025-08-25 01:10:00|       28.2|    56.8|         84.0|    OK|2025-08-25|01:10:00|2025-

In [9]:
df = df.withColumn(
    "gap_in_minutes",
    (F.unix_timestamp("timestamp") - F.unix_timestamp("prev_ts")) / 60
)

In [10]:
df.show()

+---------+------------+-------------------+-----------+--------+-------------+------+----------+--------+-------------------+--------------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|status|      date|    time|            prev_ts|gap_in_minutes|
+---------+------------+-------------------+-----------+--------+-------------+------+----------+--------+-------------------+--------------+
|  S_W10_1|         W10|2025-08-25 00:20:00|       26.0|    59.8|         82.5|    OK|2025-08-25|00:20:00|2025-08-25 00:20:00|           0.0|
|  S_W10_1|         W10|2025-08-25 00:35:00|       25.4|    60.1|         60.5|    OK|2025-08-25|00:35:00|2025-08-25 00:20:00|          15.0|
|  S_W10_1|         W10|2025-08-25 00:40:00|       24.5|    62.5|         92.1|    OK|2025-08-25|00:40:00|2025-08-25 00:35:00|           5.0|
|  S_W10_1|         W10|2025-08-25 00:50:00|       29.0|    60.9|         76.0|    OK|2025-08-25|00:50:00|2025-08-25 00:40:00|          10.0|
|  S_W

In [11]:
df = df.withColumn("missing_reading", F.when(F.col("gap_in_minutes") > 15, 1).otherwise(0))
df.show()

+---------+------------+-------------------+-----------+--------+-------------+------+----------+--------+-------------------+--------------+---------------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|status|      date|    time|            prev_ts|gap_in_minutes|missing_reading|
+---------+------------+-------------------+-----------+--------+-------------+------+----------+--------+-------------------+--------------+---------------+
|  S_W10_1|         W10|2025-08-25 00:20:00|       26.0|    59.8|         82.5|    OK|2025-08-25|00:20:00|2025-08-25 00:20:00|           0.0|              0|
|  S_W10_1|         W10|2025-08-25 00:35:00|       25.4|    60.1|         60.5|    OK|2025-08-25|00:35:00|2025-08-25 00:20:00|          15.0|              0|
|  S_W10_1|         W10|2025-08-25 00:40:00|       24.5|    62.5|         92.1|    OK|2025-08-25|00:40:00|2025-08-25 00:35:00|           5.0|              0|
|  S_W10_1|         W10|2025-08-25 00:50:00|       2

In [12]:
# Only readings with status OK feed the averages and min/max.
# The "date" column already exists from the earlier cell, so it is not recomputed.
is_ok = F.col("status") == "OK"
df_metrics = df.groupBy("warehouse_id", "date") \
    .agg(
        F.round(F.avg(F.when(is_ok, F.col("temperature"))), 2).alias("avg_temp"),
        F.round(F.min(F.when(is_ok, F.col("temperature"))), 2).alias("min_temp"),
        F.round(F.max(F.when(is_ok, F.col("temperature"))), 2).alias("max_temp"),
        F.round(F.avg(F.when(is_ok, F.col("humidity"))), 2).alias("avg_humidity"),
        F.round(F.avg(F.when(is_ok, F.col("battery_level"))), 2).alias("avg_battery"),
        # Share of readings (rows) flagged MALFUNCTION, not of distinct sensors.
        (100 * F.sum(F.when(F.col("status") == "MALFUNCTION", 1).otherwise(0)) / F.count("*")).alias("malfunction_pct"),
        F.sum("missing_reading").alias("missing_readings")
    )
df_metrics.show()

+------------+----------+--------+--------+--------+------------+-----------+------------------+----------------+
|warehouse_id|      date|avg_temp|min_temp|max_temp|avg_humidity|avg_battery|   malfunction_pct|missing_readings|
+------------+----------+--------+--------+--------+------------+-----------+------------------+----------------+
|         W45|2025-08-25|   24.47|    16.4|    31.2|       60.42|      75.93| 3.061224489795918|              18|
|         W44|2025-08-25|   24.86|    17.9|    31.3|       59.07|      73.81|1.0101010101010102|              22|
|         W29|2025-08-25|   25.28|    16.8|    34.4|       60.11|      73.28|               0.0|              28|
|          W7|2025-08-25|   24.47|    16.8|    34.1|       59.49|      75.84|2.0408163265306123|              21|
|         W34|2025-08-25|    25.5|    17.2|    35.1|        59.7|       74.2| 4.123711340206185|              26|
|         W17|2025-08-25|   25.14|    18.9|    32.5|       59.75|      75.33|3.191489361

In [13]:
thresholds = {
    "temp_low": -20,
    "temp_high": 60,
    "humidity_high": 90
}

In [19]:
# Label each reading with the first threshold it breaches.
# Temperature checks take precedence over humidity, so a row gets at most one label.
is_temp_low = F.col("temperature") < thresholds["temp_low"]
is_temp_high = F.col("temperature") > thresholds["temp_high"]
is_humidity_high = F.col("humidity") > thresholds["humidity_high"]

df_outliers = df \
    .withColumn("outlier_type", F.when(is_temp_low, "Temp Low")
                                 .when(is_temp_high, "Temp High")
                                 .when(is_humidity_high, "Humidity High")) \
    .withColumn("value", F.when(is_temp_low | is_temp_high, F.col("temperature"))
                          .when(is_humidity_high, F.col("humidity"))) \
    .filter(F.col("outlier_type").isNotNull()) \
    .select("sensor_id", "warehouse_id", "date", "outlier_type", "value")

In [20]:
df_outliers.show()

+---------+------------+----------+-------------+------+
|sensor_id|warehouse_id|      date| outlier_type| value|
+---------+------------+----------+-------------+------+
|  S_W10_2|         W10|2025-08-25|     Temp Low|-100.0|
|  S_W10_5|         W10|2025-08-25|Humidity High|  95.0|
|  S_W11_1|         W11|2025-08-25|Humidity High|  95.0|
|  S_W11_2|         W11|2025-08-25|     Temp Low|-100.0|
|  S_W11_3|         W11|2025-08-25|    Temp High| 100.0|
|  S_W11_5|         W11|2025-08-25|     Temp Low|-100.0|
|  S_W12_1|         W12|2025-08-25|Humidity High|  95.0|
|  S_W12_1|         W12|2025-08-25|     Temp Low|-100.0|
|  S_W12_1|         W12|2025-08-25|    Temp High| 100.0|
|  S_W12_2|         W12|2025-08-25|Humidity High|  95.0|
|  S_W12_2|         W12|2025-08-25|Humidity High|  95.0|
|  S_W12_2|         W12|2025-08-25|Humidity High|  95.0|
|  S_W13_5|         W13|2025-08-25|     Temp Low|-100.0|
|  S_W13_5|         W13|2025-08-25|Humidity High|  95.0|
|  S_W14_1|         W14|2025-08