In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('IoT Data Processing') \
    .master('local[*]') \
    .enableHiveSupport() \
    .getOrCreate()

In [2]:
from pyspark.sql.types import *
from datetime import datetime, timedelta

# schema for IoT data
IoT_Schema = StructType([
    StructField("sensor_id", StringType(), False, metadata={"description": "Unique identifier for the sensor"}),
    StructField("record_dt", TimestampType(), False, metadata={"description": "Unique identifier for the sensor"}),
    StructField("temperature", FloatType(), False, metadata={"description": "Temperature reading from the sensor"}),
    StructField("location", StructType([
        StructField("latitude", FloatType(), False, metadata={"description": "Latitude of the sensor location"}),
        StructField("longitude", FloatType(), False, metadata={"description": "Longitude of the sensor location"}),
        StructField("altitude", FloatType(), False, metadata={"description": "Altitude of the sensor location"})
    ])),
    StructField("status", StringType(), False, metadata={"description": "Status of the sensor (e.g., active, inactive)"}),
    StructField("readings", MapType(StringType(), FloatType()), False, metadata={"description": "Map of additional readings from the sensor"}),
    StructField("brake_limit_timestamps", ArrayType(TimestampType()), False, metadata={"description": "List of timestamps when the brake limit was reached"}),
])

In [5]:
import random

base_time = datetime.now()
data = []
for sensor_id in range(0, 10):
    for i in range(0, 100):
        sensor_data = (
            f"sensor_{sensor_id:03d}",
            base_time + timedelta(minutes=i*5),
            round(random.uniform(20, 30), 2),
            (40.7128 + random.uniform(-0.1, 0.1), -74.0060 + random.uniform(-0.1, 0.1), random.uniform(0, 100)),
            random.choice(["active", "inactive", "maintenance"]),
            {
                "humidity": round(random.uniform(40, 60), 2),
                "pressure": round(random.uniform(1000, 1020), 2),
                "battery_lvl": round(random.uniform(0, 100), 2),
            },
            [base_time + timedelta(minutes=(i*5), seconds=-random.randint(0, 5*60)) for _ in range(random.randint(0, 5))]
        )
        data.append(sensor_data)

df = spark.createDataFrame(data, schema=IoT_Schema)
df.show(10, truncate=False)

+----------+--------------------------+-----------+----------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+
|sensor_id |record_dt                 |temperature|location                          |status     |readings                                                      |brake_limit_timestamps                                                                                          |
+----------+--------------------------+-----------+----------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+
|sensor_000|2025-06-25 14:35:14.468232|20.49      |{40.81259, -74.07255, 9.40678}    |inactive   |{humidity -> 45.04, pressure -> 1017.19, battery_lvl -> 96.48}|[2025-06-25 14

In [14]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [7]:
df.select('*', 
          col('location.latitude').alias('latitude'),
          col('location.longitude').alias('longitude'),
          col('readings.humidity').alias('humidity'),
          col('readings.pressure').alias('pressure'),
).show(10, truncate=False)

+----------+--------------------------+-----------+----------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+---------+----------+--------+--------+
|sensor_id |record_dt                 |temperature|location                          |status     |readings                                                      |brake_limit_timestamps                                                                                          |latitude |longitude |humidity|pressure|
+----------+--------------------------+-----------+----------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+---------+----------+--------+--------+
|sensor_000|2025-06-25 14:35:14.468232|20.49      |{40.812

In [24]:
df_dt_adj = df.withColumn('adjust_date', lit(timedelta(0, 0, 2, 12, 30, 0)))
df_dt_adj = df_dt_adj.withColumn('new_date', col('record_dt') - lit(timedelta(0, 0, 2, 12, 30, 0)))
df_dt_adj.show(10, truncate=False)

+----------+--------------------------+-----------+----------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+------------------------------------------+-------------------------+
|sensor_id |record_dt                 |temperature|location                          |status     |readings                                                      |brake_limit_timestamps                                                                                          |adjust_date                               |new_date                 |
+----------+--------------------------+-----------+----------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+--------------------------------------

In [25]:
df_dt_adj.columns
df_dt_adj.dtypes

[('sensor_id', 'string'),
 ('record_dt', 'timestamp'),
 ('temperature', 'float'),
 ('location', 'struct<latitude:float,longitude:float,altitude:float>'),
 ('status', 'string'),
 ('readings', 'map<string,float>'),
 ('brake_limit_timestamps', 'array<timestamp>'),
 ('adjust_date', 'interval day to second'),
 ('new_date', 'timestamp')]

## Filters
Tips watch the "()" are surrounding the now function commands most involving ==, =!, >, <=, etc

In [None]:
cnt = df.filter(col('status') == 'active').count()
print(type(cnt))
print(cnt)


<class 'int'>
362


In [39]:
df.filter((col('status') == 'active') & col('location.latitude').between(40.7,40.8) | col('location.longitude').between(-74.1, -74.0)).count()

574

In [53]:
df.filter(size(col('brake_limit_timestamps')) == 0) \
    .withColumn('2nd_brake', coalesce(element_at(col('brake_limit_timestamps'), 1), lit(datetime.now()))) \
        .show(10, truncate=False)

+----------+--------------------------+-----------+---------------------------------+-----------+--------------------------------------------------------------+----------------------+--------------------------+
|sensor_id |record_dt                 |temperature|location                         |status     |readings                                                      |brake_limit_timestamps|2nd_brake                 |
+----------+--------------------------+-----------+---------------------------------+-----------+--------------------------------------------------------------+----------------------+--------------------------+
|sensor_000|2025-06-25 14:45:14.468232|23.66      |{40.65895, -73.96245, 50.49276}  |active     |{humidity -> 51.71, pressure -> 1019.16, battery_lvl -> 50.12}|[]                    |2025-06-25 15:37:12.013388|
|sensor_000|2025-06-25 15:30:14.468232|27.15      |{40.72172, -73.994385, 49.673744}|inactive   |{humidity -> 43.25, pressure -> 1008.63, battery_lvl -> 8.9

In [55]:
df.select('sensor_id', 'record_dt', 'readings', explode('readings')).show(10, truncate=False)

+----------+--------------------------+--------------------------------------------------------------+-----------+-------+
|sensor_id |record_dt                 |readings                                                      |key        |value  |
+----------+--------------------------+--------------------------------------------------------------+-----------+-------+
|sensor_000|2025-06-25 14:35:14.468232|{humidity -> 45.04, pressure -> 1017.19, battery_lvl -> 96.48}|humidity   |45.04  |
|sensor_000|2025-06-25 14:35:14.468232|{humidity -> 45.04, pressure -> 1017.19, battery_lvl -> 96.48}|pressure   |1017.19|
|sensor_000|2025-06-25 14:35:14.468232|{humidity -> 45.04, pressure -> 1017.19, battery_lvl -> 96.48}|battery_lvl|96.48  |
|sensor_000|2025-06-25 14:40:14.468232|{humidity -> 49.66, pressure -> 1001.45, battery_lvl -> 81.58}|humidity   |49.66  |
|sensor_000|2025-06-25 14:40:14.468232|{humidity -> 49.66, pressure -> 1001.45, battery_lvl -> 81.58}|pressure   |1001.45|
|sensor_000|2025

In [59]:
# to keep null values in the exploded array
df.select('sensor_id', 'record_dt', 'brake_limit_timestamps', explode_outer('brake_limit_timestamps')).show(10, truncate=False)


+----------+--------------------------+----------------------------------------------------------------------------------------------------------------+--------------------------+
|sensor_id |record_dt                 |brake_limit_timestamps                                                                                          |col                       |
+----------+--------------------------+----------------------------------------------------------------------------------------------------------------+--------------------------+
|sensor_000|2025-06-25 14:35:14.468232|[2025-06-25 14:33:42.468232, 2025-06-25 14:34:25.468232, 2025-06-25 14:34:21.468232, 2025-06-25 14:30:50.468232]|2025-06-25 14:33:42.468232|
|sensor_000|2025-06-25 14:35:14.468232|[2025-06-25 14:33:42.468232, 2025-06-25 14:34:25.468232, 2025-06-25 14:34:21.468232, 2025-06-25 14:30:50.468232]|2025-06-25 14:34:25.468232|
|sensor_000|2025-06-25 14:35:14.468232|[2025-06-25 14:33:42.468232, 2025-06-25 14:34:25.468232, 2025

# The Tables In SQL
Create a temporary view to use the data frame in SQL.
The explode function only works for Map and Array Types.

In [65]:
df.createOrReplaceTempView("iot_data")

df_sql = spark.sql("""
select *
from iot_data
lateral view explode(readings) as reading_key, reading_value
""")
df_sql.show(10, truncate=False)

+----------+--------------------------+-----------+---------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+-----------+-------------+
|sensor_id |record_dt                 |temperature|location                         |status     |readings                                                      |brake_limit_timestamps                                                                                          |reading_key|reading_value|
+----------+--------------------------+-----------+---------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+-----------+-------------+
|sensor_000|2025-06-25 14:35:14.468232|20.49      |{40.81259, -74.07255, 9.40678}   |inactive   |{hu

In [67]:
df.createOrReplaceTempView("iot_data")

df_sql = spark.sql("""
select *
    ,location.latitude as latitude
    ,location.longitude as longitude
from iot_data
""")
df_sql.show(10, truncate=False)

+----------+--------------------------+-----------+----------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+---------+----------+
|sensor_id |record_dt                 |temperature|location                          |status     |readings                                                      |brake_limit_timestamps                                                                                          |latitude |longitude |
+----------+--------------------------+-----------+----------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+---------+----------+
|sensor_000|2025-06-25 14:35:14.468232|20.49      |{40.81259, -74.07255, 9.40678}    |inactive   |{humidity -> 4

In [86]:
df.createOrReplaceTempView("iot_data")

df_sql = spark.sql("""
select iot.*
    , brake_pos
    , brake_pos_timestamp
from iot_data as iot
lateral view posexplode(brake_limit_timestamps) as brake_pos, brake_pos_timestamp
""")
df_sql.show(10, truncate=False)

+----------+--------------------------+-----------+---------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+---------+--------------------------+
|sensor_id |record_dt                 |temperature|location                         |status     |readings                                                      |brake_limit_timestamps                                                                                          |brake_pos|brake_pos_timestamp       |
+----------+--------------------------+-----------+---------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+---------+--------------------------+
|sensor_000|2025-06-25 14:35:14.468232|20.49      |{40.81259, -74.0

In [88]:
df_win_prep = df.withColumn('brake_limits_final', array_sort(col('brake_limit_timestamps'))) \
    .withColumn('brake_limit_min', array_min(col('brake_limit_timestamps'))) \
    .withColumn('brake_limit_max', array_min(col('brake_limit_timestamps'))) \
    .select('*', posexplode(col('brake_limits_final')).alias('brake_pos', 'brake_pos_timestamp'))

df_win_prep.show(10, truncate=False)

+----------+--------------------------+-----------+---------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+---------+--------------------------+
|sensor_id |record_dt                 |temperature|location                         |status     |readings                                                      |brake_limit_timestamps                                                                                          |brake_limits_final                                                                                              |brake_limit_min           |brake_limit_max           |brake_pos|brake_pos_timestamp       |
+----------+--------------------------+-----

## Window Functions
Rank and cut the time series data

In [None]:
# running max temperature per sensor
window_temp = Window.partitionBy("sensor_id").orderBy("record_dt").rowsBetween(Window.unboundedPreceding, Window.currentRow)
window_temp_rev = Window.partitionBy("sensor_id").orderBy(desc("record_dt")).rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_temp_max = df.withColumn("running_max_temp", max("temperature").over(window_temp)) \
    .withColumn("sensor_reading_order", row_number().over(window_temp)) \
    .withColumn("running_max_temp_rev", max("temperature").over(window_temp_rev)) \
    .select("sensor_id", "record_dt", "temperature", "sensor_reading_order", "running_max_temp", "running_max_temp_rev").orderBy("sensor_id", asc("record_dt"))

df_temp_max.show(50, truncate=False)

## Lead Lag

In [105]:
window_brakes_delta = Window.partitionBy("sensor_id", "record_dt").orderBy("brake_limit_dt")

# Split out array, and add conditions of time pre recording
df_delta = df.withColumn("isActive", col("status") == "active") \
    .select('*', explode_outer(array_sort('brake_limit_timestamps')).alias('brake_limit_dt')) \
    .withColumn("brake_recording_bucket", when(col("brake_limit_dt").isNull(), None).when(col("brake_limit_dt") - col("record_dt") < timedelta(seconds=-180), "EARLY") \
                .when(col("brake_limit_dt") - col("record_dt") < timedelta(seconds=-90), "MIDDLE") \
                    .otherwise("LATE"))
    # .show(10, truncate=False) 

df_delta = df_delta.withColumn("brake_delta", col("brake_limit_dt") - lag(col("brake_limit_dt")).over(window_brakes_delta))

df_delta.show(10, truncate=False)


+----------+--------------------------+-----------+---------------------------------+-----------+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+--------+--------------------------+----------------------+-----------------------------------+
|sensor_id |record_dt                 |temperature|location                         |status     |readings                                                      |brake_limit_timestamps                                                                                          |isActive|brake_limit_dt            |brake_recording_bucket|brake_delta                        |
+----------+--------------------------+-----------+---------------------------------+-----------+--------------------------------------------------------------+------------------------------------------------------------------------------------------------------

In [109]:
df_delta.printSchema()
for dtype in df_delta.schema:
    print(f"{dtype.name}: {dtype.dataType}")

root
 |-- sensor_id: string (nullable = false)
 |-- record_dt: timestamp (nullable = false)
 |-- temperature: float (nullable = false)
 |-- location: struct (nullable = true)
 |    |-- latitude: float (nullable = false)
 |    |-- longitude: float (nullable = false)
 |    |-- altitude: float (nullable = false)
 |-- status: string (nullable = false)
 |-- readings: map (nullable = false)
 |    |-- key: string
 |    |-- value: float (valueContainsNull = true)
 |-- brake_limit_timestamps: array (nullable = false)
 |    |-- element: timestamp (containsNull = true)
 |-- isActive: boolean (nullable = false)
 |-- brake_limit_dt: timestamp (nullable = true)
 |-- brake_recording_bucket: string (nullable = true)
 |-- brake_delta: interval day to second (nullable = true)

sensor_id: StringType()
record_dt: TimestampType()
temperature: FloatType()
location: StructType([StructField('latitude', FloatType(), False), StructField('longitude', FloatType(), False), StructField('altitude', FloatType(), Fals

In [120]:
df_delta.select("sensor_id", "brake_delta").selectExpr("*", "extract(minute from brake_delta) as brake_delta_minutes",
                "cast(extract(second from brake_delta) as INT) as brake_delta_seconds") \
    .show(10, truncate=False)

+----------+-----------------------------------+-------------------+-------------------+
|sensor_id |brake_delta                        |brake_delta_minutes|brake_delta_seconds|
+----------+-----------------------------------+-------------------+-------------------+
|sensor_000|NULL                               |NULL               |NULL               |
|sensor_000|INTERVAL '0 00:02:52' DAY TO SECOND|2                  |52                 |
|sensor_000|INTERVAL '0 00:00:39' DAY TO SECOND|0                  |39                 |
|sensor_000|INTERVAL '0 00:00:04' DAY TO SECOND|0                  |4                  |
|sensor_000|NULL                               |NULL               |NULL               |
|sensor_000|NULL                               |NULL               |NULL               |
|sensor_000|NULL                               |NULL               |NULL               |
|sensor_000|INTERVAL '0 00:01:28' DAY TO SECOND|1                  |28                 |
|sensor_000|INTERVAL 

# Make timedelta and pandas datetime UDFs

In [124]:
import pandas as pd
from pyspark.sql.functions import udf, pandas_udf

@udf(returnType=IntegerType())
def to_seconds(delta):
    if delta is None:
        return None
    return int(delta.total_seconds())


In [129]:
@pandas_udf(returnType=IntegerType())
def to_seconds_pandas(delta: pd.Series) -> pd.Series:
    return delta.dt.total_seconds()

In [135]:
# register the UDF later 
def to_seconds_postreg(delta):
    if delta is None:
        return None
    return int(delta.total_seconds())


In [136]:
to_seconds_postreg_udf = udf(to_seconds_postreg, returnType=IntegerType())
df_delta.select("sensor_id", "record_dt", "brake_limit_dt", "brake_recording_bucket", "brake_delta", 
                to_seconds(col("brake_delta")).alias("brake_delta_seconds"),
                to_seconds_postreg_udf(col("brake_delta")).alias("brake_delta_seconds_postreg"),
                to_seconds_pandas(col("brake_delta")).alias("brake_delta_seconds_pandas")
                ).show(10, truncate=False)

+----------+--------------------------+--------------------------+----------------------+-----------------------------------+-------------------+---------------------------+--------------------------+
|sensor_id |record_dt                 |brake_limit_dt            |brake_recording_bucket|brake_delta                        |brake_delta_seconds|brake_delta_seconds_postreg|brake_delta_seconds_pandas|
+----------+--------------------------+--------------------------+----------------------+-----------------------------------+-------------------+---------------------------+--------------------------+
|sensor_000|2025-06-25 14:35:14.468232|2025-06-25 14:30:50.468232|EARLY                 |NULL                               |NULL               |NULL                       |NULL                      |
|sensor_000|2025-06-25 14:35:14.468232|2025-06-25 14:33:42.468232|MIDDLE                |INTERVAL '0 00:02:52' DAY TO SECOND|172                |172                        |172                    

In [138]:
df_delta.createOrReplaceTempView("iot_data_delta")
spark.udf.register("to_seconds", to_seconds)
spark.udf.register("to_seconds_postreg", to_seconds_postreg, IntegerType())
spark.udf.register("to_seconds_pandas", to_seconds_pandas)

df_sql = spark.sql("""
     SELECT sensor_id, record_dt, brake_limit_dt, brake_recording_bucket, brake_delta,
       to_seconds(brake_delta) AS brake_delta_seconds,
         to_seconds_postreg(brake_delta) AS brake_delta_seconds_postreg,
       to_seconds_pandas(brake_delta) AS brake_delta_seconds_pandas
    FROM iot_data_delta
""")
df_sql.show(10, truncate=False)

+----------+--------------------------+--------------------------+----------------------+-----------------------------------+-------------------+---------------------------+--------------------------+
|sensor_id |record_dt                 |brake_limit_dt            |brake_recording_bucket|brake_delta                        |brake_delta_seconds|brake_delta_seconds_postreg|brake_delta_seconds_pandas|
+----------+--------------------------+--------------------------+----------------------+-----------------------------------+-------------------+---------------------------+--------------------------+
|sensor_000|2025-06-25 14:35:14.468232|2025-06-25 14:30:50.468232|EARLY                 |NULL                               |NULL               |NULL                       |NULL                      |
|sensor_000|2025-06-25 14:35:14.468232|2025-06-25 14:33:42.468232|MIDDLE                |INTERVAL '0 00:02:52' DAY TO SECOND|172                |172                        |172                    

In [None]:
df_delta.withColumn("brake_too_hot", (extract(col("brake_delta"), "minutes") *  < timedelta(seconds=60)).over(window_brakes_delta)) \


In [None]:
# Find time since last brake limit was reached, in terms of actual time as well as record_dt window

## Higher Order Functions
They are faster than UDFs and increasingly preferred, but bad for complex use cases.
Work similar to python  lambda functions.

In [141]:
# Process sensor readings with higher-order functions
sensor_data = spark.table("iot_data_delta")

cleaned_data = sensor_data.select(
    "sensor_id",
    "record_dt",
    # Remove outliers and null values
    expr("""
        filter(
            sensor_readings, 
            reading -> reading.value IS NOT NULL 
                AND reading.value BETWEEN -50 AND 150
        )
    """).alias("valid_readings"),
    
    # Calculate average temperature
    expr("""
        aggregate(
            filter(sensor_readings, r -> r.type = 'temperature'),
            0.0,
            (acc, reading) -> acc + reading.value,
            acc -> acc / size(filter(sensor_readings, r -> r.type = 'temperature'))
        )
    """).alias("avg_temperature")
)

cleaned_data.write.format("delta").mode("overwrite").saveAsTable("processed_sensors")

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `sensor_readings` cannot be resolved. Did you mean one of the following? [`readings`, `sensor_id`, `location`, `record_dt`, `brake_delta`].; line 3 pos 12;
'Project [sensor_id#0, record_dt#1, 'filter('sensor_readings, lambdafunction((isnotnull(lambda 'reading.value) AND ((lambda 'reading.value >= -50) AND (lambda 'reading.value <= 150))), lambda 'reading, false)) AS valid_readings#2900, 'aggregate('filter('sensor_readings, lambdafunction((lambda 'r.type = temperature), lambda 'r, false)), 0.0, lambdafunction((lambda 'acc + lambda 'reading.value), lambda 'acc, lambda 'reading, false), lambdafunction((lambda 'acc / 'size('filter(lambda 'sensor_readings, lambdafunction((lambda 'r.type = temperature), lambda 'r, false)))), lambda 'acc, false)) AS avg_temperature#2901]
+- SubqueryAlias iot_data_delta
   +- View (`iot_data_delta`, [sensor_id#0,record_dt#1,temperature#2,location#3,status#4,readings#5,brake_limit_timestamps#6,isActive#2457,brake_limit_dt#2469,brake_recording_bucket#2479,brake_delta#2490])
      +- Project [sensor_id#0, record_dt#1, temperature#2, location#3, status#4, readings#5, brake_limit_timestamps#6, isActive#2457, brake_limit_dt#2469, brake_recording_bucket#2479, brake_delta#2490]
         +- Project [sensor_id#0, record_dt#1, temperature#2, location#3, status#4, readings#5, brake_limit_timestamps#6, isActive#2457, brake_limit_dt#2469, brake_recording_bucket#2479, _we0#2491, (brake_limit_dt#2469 - _we0#2491) AS brake_delta#2490]
            +- Window [lag(brake_limit_dt#2469, -1, null) windowspecdefinition(sensor_id#0, record_dt#1, brake_limit_dt#2469 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#2491], [sensor_id#0, record_dt#1], [brake_limit_dt#2469 ASC NULLS FIRST]
               +- Project [sensor_id#0, record_dt#1, temperature#2, location#3, status#4, readings#5, brake_limit_timestamps#6, isActive#2457, brake_limit_dt#2469, brake_recording_bucket#2479]
                  +- Project [sensor_id#0, record_dt#1, temperature#2, location#3, status#4, readings#5, brake_limit_timestamps#6, isActive#2457, brake_limit_dt#2469, CASE WHEN isnull(brake_limit_dt#2469) THEN cast(null as string) WHEN ((brake_limit_dt#2469 - record_dt#1) < INTERVAL '-0 00:03:00' DAY TO SECOND) THEN EARLY WHEN ((brake_limit_dt#2469 - record_dt#1) < INTERVAL '-0 00:01:30' DAY TO SECOND) THEN MIDDLE ELSE LATE END AS brake_recording_bucket#2479]
                     +- Project [sensor_id#0, record_dt#1, temperature#2, location#3, status#4, readings#5, brake_limit_timestamps#6, isActive#2457, brake_limit_dt#2469]
                        +- Generate explode(array_sort(brake_limit_timestamps#6, lambdafunction(if ((isnull(lambda left#2467) AND isnull(lambda right#2468))) 0 else if (isnull(lambda left#2467)) 1 else if (isnull(lambda right#2468)) -1 else if ((lambda left#2467 < lambda right#2468)) -1 else if ((lambda left#2467 > lambda right#2468)) 1 else 0, lambda left#2467, lambda right#2468, false), false)), true, [brake_limit_dt#2469]
                           +- Project [sensor_id#0, record_dt#1, temperature#2, location#3, status#4, readings#5, brake_limit_timestamps#6, (status#4 = active) AS isActive#2457]
                              +- LogicalRDD [sensor_id#0, record_dt#1, temperature#2, location#3, status#4, readings#5, brake_limit_timestamps#6], false
