In [98]:
pip install pyspark



In [99]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark=SparkSession.builder.appName("Day 10").getOrCreate()

In [100]:
df=spark.read.csv("/content/sample_data/iot_sensor_data.csv",
inferSchema=True,
header=True)

In [101]:
df.show()

+---------+------------+-------------------+-----------+--------+-------------+-----------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|     status|
+---------+------------+-------------------+-----------+--------+-------------+-----------+
|  S_W37_2|         W37|2025-08-25 01:00:00|       27.3|    56.6|         71.8|         OK|
|   S_W1_5|          W1|2025-08-25 02:20:00|       34.3|    62.1|         50.9|         OK|
|  S_W10_5|         W10|2025-08-25 01:30:00|       31.6|    66.3|         60.9|         OK|
|  S_W20_2|         W20|2025-08-25 02:30:00|       27.5|    57.2|         77.1|         OK|
|  S_W43_5|         W43|2025-08-25 02:40:00|       20.5|     0.0|         87.6|         OK|
|  S_W14_5|         W14|2025-08-25 02:55:00|       25.1|    56.3|         62.9|         OK|
|  S_W39_2|         W39|2025-08-25 00:30:00|       21.8|    59.2|         68.0|         OK|
|  S_W41_5|         W41|2025-08-25 02:25:00|       32.5|    61.8|         93.2| 

## **Problem Scenario: IoT Sensor Data Monitoring**

### **Background**

You work for a company that operates **smart warehouses** equipped with **IoT temperature and humidity sensors** in multiple locations worldwide. These sensors send readings every few minutes to a central data pipeline.

However, the raw data is messy:

* Some readings are missing or delayed.
* Multiple sensors sometimes report at the same timestamp.
* Occasionally, sensors malfunction and send outlier readings (e.g., -100°C).

Your team needs **daily aggregated metrics** to monitor warehouse conditions and ensure compliance with safety regulations.

---

### **Data Generation Requirements**

* **CSV with at least 1000 rows**
* 50+ unique **warehouse locations**
* Columns:

  * `sensor_id`
  * `warehouse_id`
  * `timestamp`
  * `temperature` (°C)
  * `humidity` (%)
  * `battery_level` (%)
  * `status` (OK, MALFUNCTION)
* Edge cases: missing readings, outlier values, multiple readings at same timestamp

---

## **Problem Definition**

You need to process the IoT sensor data using **PySpark** to generate:

1. **Daily metrics per warehouse**

   * Average, min, max temperature and humidity
   * Percentage of malfunctioning sensors
   * Count of missing readings (gaps > 15 min)
   * Average battery level

2. **Outlier Detection Table**

   * Identify sensors with extreme values (e.g., temp < -20°C or temp > 60°C, humidity > 90%)

3. **Handle Edge Cases**

   * Missing readings → fill with previous valid value
   * Malfunctioning sensors → exclude from averages

---

## **Sample Input Table (Simplified)**

| sensor\_id | warehouse\_id | timestamp           | temperature | humidity | battery\_level | status      |
| ---------- | ------------- | ------------------- | ----------- | -------- | -------------- | ----------- |
| S1         | W1            | 2025-08-25 09:00:00 | 25.5        | 60       | 85             | OK          |
| S1         | W1            | 2025-08-25 09:15:00 | 26.0        | 59       | 84             | OK          |
| S2         | W1            | 2025-08-25 09:00:00 | -100.0      | 61       | 80             | MALFUNCTION |
| S3         | W2            | 2025-08-25 09:05:00 | 22.3        | 58       | 90             | OK          |
| S3         | W2            | 2025-08-25 09:20:00 | 23.0        | 57       | 89             | OK          |

---

## **Expected Output 1 – Daily Metrics Table**

| warehouse\_id | date       | avg\_temp | min\_temp | max\_temp | avg\_humidity | malfunction\_pct | missing\_readings | avg\_battery |
| ------------- | ---------- | --------- | --------- | --------- | ------------- | ---------------- | ----------------- | ------------ |
| W1            | 2025-08-25 | 25.8      | 25.5      | 26.0      | 59.5          | 10%              | 1                 | 84.5         |
| W2            | 2025-08-25 | 22.6      | 22.3      | 23.0      | 57.5          | 0%               | 0                 | 89.5         |

---

## **Expected Output 2 – Outlier Detection Table**

| sensor\_id | warehouse\_id | date       | outlier\_type | value  |
| ---------- | ------------- | ---------- | ------------- | ------ |
| S2         | W1            | 2025-08-25 | Temp Low      | -100.0 |
| S3         | W2            | 2025-08-25 | Humidity High | 95.0   |

---

## **Scalability & Realism**

* Use **window functions** for gap detection
* **Partition by date & warehouse** for big data scalability
* Outlier detection logic should be configurable (e.g., thresholds from config table)
* Could be extended to **streaming pipelines** with Spark Structured Streaming

---

In [102]:
df=df.withColumn("timestamp",to_timestamp("timestamp")).withColumn("date",to_date("timestamp").alias("date"))
df.show()

+---------+------------+-------------------+-----------+--------+-------------+-----------+----------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|     status|      date|
+---------+------------+-------------------+-----------+--------+-------------+-----------+----------+
|  S_W37_2|         W37|2025-08-25 01:00:00|       27.3|    56.6|         71.8|         OK|2025-08-25|
|   S_W1_5|          W1|2025-08-25 02:20:00|       34.3|    62.1|         50.9|         OK|2025-08-25|
|  S_W10_5|         W10|2025-08-25 01:30:00|       31.6|    66.3|         60.9|         OK|2025-08-25|
|  S_W20_2|         W20|2025-08-25 02:30:00|       27.5|    57.2|         77.1|         OK|2025-08-25|
|  S_W43_5|         W43|2025-08-25 02:40:00|       20.5|     0.0|         87.6|         OK|2025-08-25|
|  S_W14_5|         W14|2025-08-25 02:55:00|       25.1|    56.3|         62.9|         OK|2025-08-25|
|  S_W39_2|         W39|2025-08-25 00:30:00|       21.8|    59.2|        

In [103]:
valid_df=df.filter(col("status")=="OK")
valid_df.show()

+---------+------------+-------------------+-----------+--------+-------------+------+----------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|status|      date|
+---------+------------+-------------------+-----------+--------+-------------+------+----------+
|  S_W37_2|         W37|2025-08-25 01:00:00|       27.3|    56.6|         71.8|    OK|2025-08-25|
|   S_W1_5|          W1|2025-08-25 02:20:00|       34.3|    62.1|         50.9|    OK|2025-08-25|
|  S_W10_5|         W10|2025-08-25 01:30:00|       31.6|    66.3|         60.9|    OK|2025-08-25|
|  S_W20_2|         W20|2025-08-25 02:30:00|       27.5|    57.2|         77.1|    OK|2025-08-25|
|  S_W43_5|         W43|2025-08-25 02:40:00|       20.5|     0.0|         87.6|    OK|2025-08-25|
|  S_W14_5|         W14|2025-08-25 02:55:00|       25.1|    56.3|         62.9|    OK|2025-08-25|
|  S_W39_2|         W39|2025-08-25 00:30:00|       21.8|    59.2|         68.0|    OK|2025-08-25|
|  S_W41_5|         

Fill missing readings with previous valid value (forward fill)

In [104]:
window_ffill=Window.partitionBy("sensor_id").orderBy("timestamp")
filled_df=valid_df.withColumn("temperature",when(col("temperature").isNull(),lag("temperature",1).over(window_ffill)).otherwise(col("temperature")))\
          .withColumn("humidity",when(col("humidity").isNull(),lag("humidity",1).over(window_ffill)).otherwise(col("humidity")))
filled_df.show()

+---------+------------+-------------------+-----------+--------+-------------+------+----------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|status|      date|
+---------+------------+-------------------+-----------+--------+-------------+------+----------+
|  S_W10_1|         W10|2025-08-25 00:20:00|       26.0|    59.8|         82.5|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 00:35:00|       25.4|    60.1|         60.5|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 00:40:00|       24.5|    62.5|         92.1|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 00:50:00|       29.0|    60.9|         76.0|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 01:10:00|       28.2|    56.8|         84.0|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 01:25:00|       26.4|    62.8|         83.7|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 01:45:00|       24.7|    54.3|         79.2|    OK|2025-08-25|
|  S_W10_1|         

Detect Missing Readings

In [105]:
window=Window.partitionBy("sensor_id").orderBy("timestamp")
gap_df=df.withColumn("prev_time",lag(col("timestamp"),1).over(window))\
          .withColumn("gap_minutes",col("timestamp").cast("long")-col("prev_time").cast("long")/60)\
          .withColumn("is_missing",when(col("gap_minutes")>15,1).otherwise(0))

cnt_missing=gap_df.groupBy("warehouse_id","date").agg(sum(col("is_missing")).alias("missing_readings"))
cnt_missing.show()

+------------+----------+----------------+
|warehouse_id|      date|missing_readings|
+------------+----------+----------------+
|         W45|2025-08-25|              93|
|         W44|2025-08-25|              94|
|         W29|2025-08-25|              90|
|          W7|2025-08-25|              93|
|         W34|2025-08-25|              92|
|         W17|2025-08-25|              89|
|         W21|2025-08-25|              91|
|         W46|2025-08-25|              92|
|          W1|2025-08-25|              91|
|         W36|2025-08-25|              90|
|         W27|2025-08-25|              88|
|         W15|2025-08-25|              91|
|         W37|2025-08-25|              91|
|         W48|2025-08-25|              89|
|         W12|2025-08-25|              89|
|          W3|2025-08-25|              89|
|          W5|2025-08-25|              93|
|         W31|2025-08-25|              92|
|         W26|2025-08-25|              91|
|         W41|2025-08-25|              91|
+----------

 Daily Metrics per Warehouse

In [106]:
  daily_metrics=filled_df.groupBy("warehouse_id","date")\
                .agg(
                    round(avg(col("temperature")),2).alias("avg_temp"),
                    min(col("temperature")).alias("min_temp"),
                    max(col("temperature")).alias("max_temp"),
                    round(avg(col("humidity")),2).alias("avg_humidity"),
                    round(avg(col("battery_level")),2).alias("avg_battery")
                )
daily_metrics.show()

+------------+----------+--------+--------+--------+------------+-----------+
|warehouse_id|      date|avg_temp|min_temp|max_temp|avg_humidity|avg_battery|
+------------+----------+--------+--------+--------+------------+-----------+
|         W45|2025-08-25|   24.47|    16.4|    31.2|       60.42|      75.93|
|         W44|2025-08-25|   24.86|    17.9|    31.3|       59.07|      73.81|
|         W29|2025-08-25|   25.28|    16.8|    34.4|       60.11|      73.28|
|          W7|2025-08-25|   24.47|    16.8|    34.1|       59.49|      75.84|
|         W34|2025-08-25|    25.5|    17.2|    35.1|        59.7|       74.2|
|         W17|2025-08-25|   25.14|    18.9|    32.5|       59.75|      75.33|
|         W21|2025-08-25|    25.0|    18.6|    30.9|       59.59|      72.27|
|         W46|2025-08-25|   24.97|    16.6|    32.9|       59.77|      73.91|
|          W1|2025-08-25|   25.19|    17.0|    34.3|       60.14|      73.49|
|         W36|2025-08-25|   25.21|    17.0|    31.4|       60.15

Add malfunction percentage


In [107]:
malfunction_pct=df.groupBy("warehouse_id","date")\
              .agg(round(100*sum(when(col("status")=="MALFUNCTION",2).otherwise(0))/count("*"),1).alias("malfunction_pct"))
malfunction_pct.show()

+------------+----------+---------------+
|warehouse_id|      date|malfunction_pct|
+------------+----------+---------------+
|         W45|2025-08-25|            6.1|
|         W44|2025-08-25|            2.0|
|         W29|2025-08-25|            0.0|
|          W7|2025-08-25|            4.1|
|         W34|2025-08-25|            8.2|
|         W17|2025-08-25|            6.4|
|         W21|2025-08-25|            8.3|
|         W46|2025-08-25|            2.1|
|          W1|2025-08-25|            6.3|
|         W36|2025-08-25|            6.3|
|         W27|2025-08-25|            0.0|
|         W15|2025-08-25|            2.1|
|         W37|2025-08-25|            6.3|
|         W48|2025-08-25|            6.4|
|         W12|2025-08-25|            4.3|
|          W3|2025-08-25|            8.5|
|          W5|2025-08-25|            6.1|
|         W31|2025-08-25|            6.2|
|         W26|2025-08-25|            2.1|
|         W41|2025-08-25|            4.2|
+------------+----------+---------

In [108]:
daily_report=daily_metrics.join(malfunction_pct,on=["warehouse_id","date"],how="inner")\
                           .join(cnt_missing,on=["warehouse_id","date"],how="inner")
daily_report.show()

+------------+----------+--------+--------+--------+------------+-----------+---------------+----------------+
|warehouse_id|      date|avg_temp|min_temp|max_temp|avg_humidity|avg_battery|malfunction_pct|missing_readings|
+------------+----------+--------+--------+--------+------------+-----------+---------------+----------------+
|         W45|2025-08-25|   24.47|    16.4|    31.2|       60.42|      75.93|            6.1|              93|
|         W44|2025-08-25|   24.86|    17.9|    31.3|       59.07|      73.81|            2.0|              94|
|         W29|2025-08-25|   25.28|    16.8|    34.4|       60.11|      73.28|            0.0|              90|
|          W7|2025-08-25|   24.47|    16.8|    34.1|       59.49|      75.84|            4.1|              93|
|         W34|2025-08-25|    25.5|    17.2|    35.1|        59.7|       74.2|            8.2|              92|
|         W17|2025-08-25|   25.14|    18.9|    32.5|       59.75|      75.33|            6.4|              89|
|

Outlier Detection Table

In [111]:
outliers=df.withColumn("outlier_type",
                       when(col("temperature")<-20,lit("Temp low"))
                       .when(col("temperature")>60,lit("Temp high"))
                       .when(col("humidity")>90,lit("High humidity")))\
                       .filter(col("outlier_type").isNotNull())\
                       .select("sensor_id","warehouse_id","date","outlier_type","temperature","humidity")
outliers.show()

+---------+------------+----------+-------------+-----------+--------+
|sensor_id|warehouse_id|      date| outlier_type|temperature|humidity|
+---------+------------+----------+-------------+-----------+--------+
|  S_W34_3|         W34|2025-08-25|     Temp low|     -100.0|    49.9|
|   S_W8_1|          W8|2025-08-25|High humidity|       22.1|    95.0|
|   S_W1_5|          W1|2025-08-25|     Temp low|     -100.0|    61.6|
|  S_W17_2|         W17|2025-08-25|High humidity|       21.5|    95.0|
|  S_W34_4|         W34|2025-08-25|High humidity|       22.9|    95.0|
|  S_W15_4|         W15|2025-08-25|     Temp low|     -100.0|    56.1|
|  S_W28_2|         W28|2025-08-25|    Temp high|      100.0|    61.9|
|  S_W39_2|         W39|2025-08-25|     Temp low|     -100.0|    52.4|
|  S_W41_5|         W41|2025-08-25|    Temp high|      100.0|    57.0|
|  S_W14_1|         W14|2025-08-25|High humidity|       29.3|    95.0|
|  S_W34_2|         W34|2025-08-25|    Temp high|      100.0|    63.2|
|  S_W