In [0]:
 
### Load raw tables, normalize columns, label step events, and build final_df ###
from pyspark.sql.functions import col, lit, when, regexp_extract, length, expr
# Raw inputs, read by fully qualified table name through the ambient `spark` session:
# device sensor messages and rapid step test records.
df_device, df_steps = (
    spark.table("workspace.bronze.device_messages_raw"),
    spark.table("workspace.bronze.rapid_step_tests_raw"),
)
def rename_if_exists(df, old, new):
    """Rename column ``old`` to ``new`` if ``df`` has it; otherwise return ``df`` as is.

    Args:
        df: Object exposing ``columns`` and ``withColumnRenamed`` (a Spark
            DataFrame in this script).
        old: Column name to look for in ``df.columns``.
        new: Name the column should carry afterwards.

    Returns:
        ``df.withColumnRenamed(old, new)`` when ``old`` is listed in
        ``df.columns``; the very same ``df`` object when it is not.
    """
    if old not in df.columns:
        return df
    return df.withColumnRenamed(old, new)
# Device messages: switch the camelCase columns (when present) to snake_case.
d = df_device
for raw_name, clean_name in (("deviceId", "device_id"), ("sensorType", "sensor_type")):
    d = rename_if_exists(d, raw_name, clean_name)

# Replace `timestamp` with the result of Spark SQL's timestamp_millis().
# NOTE(review): presumes the raw column holds integral epoch milliseconds - confirm.
d = d.withColumn("timestamp", expr("timestamp_millis(timestamp)"))

# distance_cm = first run of digits found in `distance`, as an int.
# regexp_extract yields an empty string when nothing matches, so the length
# check routes those rows to an explicit NULL instead of casting "".
dist_digits = regexp_extract(col("distance"), r"(\d+)", 1)
has_digits = length(dist_digits) > 0
null_int = lit(None).cast("int")
d = d.withColumn("distance_cm", when(has_digits, dist_digits.cast("int")).otherwise(null_int))
# Step tests: switch the camelCase columns (when present) to snake_case.
s = df_steps
for raw_name, clean_name in (
    ("deviceId", "device_id"),
    ("startTime", "start_time"),
    ("stopTime", "stop_time"),
):
    s = rename_if_exists(s, raw_name, clean_name)

# Replace both window bounds with the result of Spark SQL's timestamp_millis().
# NOTE(review): presumes both columns hold integral epoch milliseconds - confirm.
s = (
    s.withColumn("start_time", expr("timestamp_millis(start_time)"))
     .withColumn("stop_time", expr("timestamp_millis(stop_time)"))
)

# Step windows for labeling: only the device key and the two bounds, keeping
# rows where all three are present. The key is renamed to `step_device_id`
# so it does not collide with `device_id` on the device-message side of the join.
window_cols = ["device_id", "start_time", "stop_time"]
s_win = (
    s.select(*window_cols)
     .na.drop(how="any", subset=window_cols)
     .withColumnRenamed("device_id", "step_device_id")
)
# Label each device message by whether it falls inside a step window.
# A message matches a window when the device ids are equal and its timestamp
# lies within [start_time, stop_time], both bounds inclusive.
same_device = col("d.device_id") == col("s.step_device_id")
inside_window = (
    (col("d.timestamp") >= col("s.start_time")) &
    (col("d.timestamp") <= col("s.stop_time"))
)

# Left join keeps every device message, matched or not.
# NOTE(review): a message covered by several windows of its device comes out
# once per matching window - confirm windows never overlap or repeat.
messages_with_windows = d.alias("d").join(
    s_win.alias("s"),
    on=same_device & inside_window,
    how="left",
)

# s_win contains no NULL start_time, so a NULL here means "no window matched".
step_label_col = when(col("s.start_time").isNull(), lit("no_step")).otherwise(lit("step"))

labeled = (
    messages_with_windows
    .withColumn("step_label", step_label_col)
    .withColumn("source_label", lit("device"))
)
# Curated dataframe expected by the assignment queries: exactly these six
# columns, in this order.
final_columns = [
    "timestamp",
    "sensor_type",
    "distance_cm",
    "device_id",
    "step_label",
    "source_label",
]
final_df = labeled.select(*final_columns)

# Expose the result to SQL under the name `final_df`, as the assignment
# instructions require; an existing temp view of that name is replaced.
final_df.createOrReplaceTempView("final_df")
 
 