In [0]:
%sql
-- Sanity check: how many rows each bronze table holds (one output row per table).
SELECT
  'device_message_raw' AS table_name,
  COUNT(*)             AS rows
FROM workspace.bronze.device_message_raw

UNION ALL

SELECT
  'rapid_step_test_raw' AS table_name,
  COUNT(*)              AS rows
FROM workspace.bronze.rapid_step_test_raw;

In [0]:
%sql
-- The ten devices with the most rows in the raw message table.
WITH per_device AS (
  SELECT
    device_id,
    COUNT(*) AS messages
  FROM workspace.bronze.device_message_raw
  GROUP BY device_id
)
SELECT
  device_id,
  messages
FROM per_device
ORDER BY messages DESC
LIMIT 10;

In [0]:
%sql
-- Session-scoped view over the raw device messages with typed columns:
--   distance_cm : distance with every character other than digits and '.' removed,
--                 then TRY_CAST to DOUBLE (NULL when what is left is not a number)
--   ts_ms       : timestamp cast to BIGINT (name suggests epoch milliseconds; confirm)
--   date_ms     : date cast to BIGINT (name suggests epoch milliseconds; confirm)
CREATE OR REPLACE TEMP VIEW device_messages_clean AS
WITH stripped AS (
  SELECT
    src.*,
    REGEXP_REPLACE(CAST(src.distance AS STRING), '[^0-9.]', '') AS distance_digits
  FROM workspace.bronze.device_message_raw AS src
)
SELECT
  device_id,
  TRY_CAST(distance_digits AS DOUBLE) AS distance_cm,
  CAST(timestamp AS BIGINT)           AS ts_ms,
  CAST(date AS BIGINT)                AS date_ms,
  message_origin,
  sensor_type,
  message
FROM stripped;

-- Eyeball a few rows to confirm the cleaning did what we expect
SELECT *
FROM device_messages_clean
LIMIT 20

In [0]:
%sql
-- Distance summary (mean / min / max) per device from the cleaned view,
-- keeping the ten devices with the largest mean distance.
WITH per_device AS (
  SELECT
    device_id,
    AVG(distance_cm) AS avg_cm,
    MIN(distance_cm) AS min_cm,
    MAX(distance_cm) AS max_cm
  FROM device_messages_clean
  GROUP BY device_id
)
SELECT
  device_id,
  avg_cm,
  min_cm,
  max_cm
FROM per_device
ORDER BY avg_cm DESC
LIMIT 10;

In [0]:
%sql
-- NOTE(review): this cell runs the same per-device distance summary as the cell
-- directly before it; keep only one of them unless the repeat is intentional.
SELECT
  stats.device_id,
  stats.avg_cm,
  stats.min_cm,
  stats.max_cm
FROM (
  SELECT
    device_id,
    AVG(distance_cm) AS avg_cm,
    MIN(distance_cm) AS min_cm,
    MAX(distance_cm) AS max_cm
  FROM device_messages_clean
  GROUP BY device_id
) AS stats
ORDER BY stats.avg_cm DESC
LIMIT 10;

In [0]:
%sql
-- Explode step_points so that each array element becomes its own row
-- (step_index = position in the array, step_ms = the element), then summarise
-- the exploded rows per customer/device. Ten groups with the most steps are kept.
WITH exploded AS (
  SELECT
    t.customer,
    t.device_id,
    t.start_time,
    p.step_index,
    p.step_ms
  FROM workspace.bronze.rapid_step_test_raw AS t
  LATERAL VIEW posexplode(t.step_points) p AS step_index, step_ms
)
SELECT
  customer,
  device_id,
  COUNT(*)        AS steps,
  AVG(step_ms)    AS avg_step_ms,
  STDDEV(step_ms) AS sd_step_ms
FROM exploded
GROUP BY
  customer,
  device_id
ORDER BY steps DESC
LIMIT 10

In [0]:
%sql
-- For every test, summarise the same device's distance readings whose ts_ms falls
-- inside [start_time, stop_time] (both ends inclusive). Readings without a parsed
-- distance are ignored, and because this is an inner join, tests with no matching
-- reading do not appear in the result.
SELECT
  t.customer,
  t.device_id,
  t.start_time,
  t.stop_time,
  COUNT(m.ts_ms)     AS readings_in_window,
  AVG(m.distance_cm) AS avg_cm_in_window,
  MIN(m.distance_cm) AS min_cm_in_window,
  MAX(m.distance_cm) AS max_cm_in_window
FROM workspace.bronze.rapid_step_test_raw AS t
INNER JOIN device_messages_clean AS m
  ON  m.device_id = t.device_id
  AND m.ts_ms >= t.start_time
  AND m.ts_ms <= t.stop_time
WHERE m.distance_cm IS NOT NULL
GROUP BY
  t.customer,
  t.device_id,
  t.start_time,
  t.stop_time
ORDER BY readings_in_window DESC
LIMIT 20;

In [0]:
# Load both bronze tables from the catalog as Spark DataFrames
# (dm = device messages, rt = rapid step tests) and print each schema.
dm, rt = (
    spark.table("workspace.bronze.device_message_raw"),
    spark.table("workspace.bronze.rapid_step_test_raw"),
)
for _frame in (dm, rt):
    _frame.printSchema()

In [0]:
from pyspark.sql import functions as F
# Python counterpart of the device_messages_clean SQL view: adds typed helper
# columns to the raw device messages (all original columns are kept).
#   distance_str : distance rendered as a string
#   distance_cm  : distance_str with everything except digits and '.' removed,
#                  parsed as DOUBLE (NULL when the remainder is not a number)
#   ts_ms        : timestamp cast to BIGINT
#   date_ms      : date cast to BIGINT
dm_clean = (
    dm
    .withColumn("distance_str", F.col("distance").cast("string"))
    # try_cast (as in the SQL view) yields NULL for unparseable text such as ""
    # or "1.2.3"; a plain cast raises on those when ANSI mode is enabled.
    .withColumn(
        "distance_cm",
        F.expr("try_cast(regexp_replace(distance_str, '[^0-9.]', '') AS DOUBLE)"),
    )
    .withColumn("ts_ms", F.col("timestamp").cast("bigint"))
    .withColumn("date_ms", F.col("date").cast("bigint"))
)
display(dm_clean.limit(10))

In [0]:
# Per-device message count and distance summary, busiest devices first.
dm_stats = (
    dm_clean
    .groupBy("device_id")
    .agg(
        F.count("*").alias("n"),
        F.avg("distance_cm").alias("avg_cm"),
        F.min("distance_cm").alias("min_cm"),
        F.max("distance_cm").alias("max_cm"),
    )
    .orderBy(F.col("n").desc())
)

# Show only the top 20 rows
display(dm_stats.limit(20))

In [0]:
# Explode step_points: each array element becomes its own row, with step_index
# holding the element's position and step_ms holding the element itself.
rt_exploded = rt.select(
    "customer",
    "device_id",
    "start_time",
    "stop_time",
    "test_time",
    "total_steps",
    F.posexplode("step_points").alias("step_index", "step_ms"),
)
display(rt_exploded.limit(20))

# Step timing stats per test (a test is identified here by customer, device and
# its start/stop times), tests with the most steps first.
step_stats = (
    rt_exploded
    .groupBy("customer", "device_id", "start_time", "stop_time")
    .agg(
        F.count("*").alias("steps"),
        F.avg("step_ms").alias("avg_step_ms"),
        F.stddev("step_ms").alias("sd_step_ms"),
    )
    .orderBy(F.col("steps").desc())
)
display(step_stats.limit(20))

In [0]:
# Readings that have a parsed (non-null) distance
msgs = (
    dm_clean
    .where(F.col("distance_cm").isNotNull())
    .select("device_id", "ts_ms", "distance_cm")
)

# One row per test, restricted to the columns used for grouping below
tests = rt.select(
    "customer", "device_id", "start_time", "stop_time", "test_time", "total_steps"
)

# Pair each test ("t") with the readings ("m") from the same device whose ts_ms
# lies inside the test window (between is inclusive on both ends). Inner join:
# tests without any matching reading are dropped.
same_device = F.col("m.device_id") == F.col("t.device_id")
inside_window = F.col("m.ts_ms").between(F.col("t.start_time"), F.col("t.stop_time"))
joined = tests.alias("t").join(msgs.alias("m"), on=same_device & inside_window, how="inner")

# Compact per-test feature table: count / mean / min / max / population variance
# of the in-window distances, tests with the most readings first.
features = (
    joined
    .groupBy(
        "t.customer",
        "t.device_id",
        "t.start_time",
        "t.stop_time",
        "t.test_time",
        "t.total_steps",
    )
    .agg(
        F.count("m.ts_ms").alias("readings_in_window"),
        F.avg("m.distance_cm").alias("avg_cm_in_window"),
        F.min("m.distance_cm").alias("min_cm_in_window"),
        F.max("m.distance_cm").alias("max_cm_in_window"),
        F.var_pop("m.distance_cm").alias("var_cm_in_window"),
    )
    .orderBy(F.col("readings_in_window").desc())
)
display(features.limit(20))

In [0]:

# Small sample for a simple line plot of distances over time for one device
import pandas as pd
import matplotlib.pyplot as plt

# Take the device from the first row of `features`.
# first() returns None when the DataFrame is empty, so fail with a clear message
# instead of an opaque "'NoneType' object is not subscriptable".
first_feature_row = features.select("device_id").first()
if first_feature_row is None:
    raise ValueError(
        "features is empty: no readings fall inside any test window, so there is no device to plot"
    )
sample_device = first_feature_row["device_id"]

# Pull at most the 1000 earliest readings for that device into pandas
pdf = (
    dm_clean
    .filter(F.col("device_id") == sample_device)
    .orderBy("ts_ms")
    .limit(1000)
    .select("ts_ms", "distance_cm")
    .toPandas()
)

fig, ax = plt.subplots()
ax.plot(pdf["ts_ms"], pdf["distance_cm"])
ax.set_title(f"Distance over time (device {sample_device})")
ax.set_xlabel("timestamp (ms)")
ax.set_ylabel("distance (cm)")
plt.show()

This Python section mirrors the SQL
steps, but it also prepares a compact features table (avg/min/max/variance of
distance within each test window). We will reuse this features table in the ML
weeks.

1. What was easy? It was easy making the blocks of code for this assignment. 2. What was confusing? I didn't understand why some of my code blocks have "t." in red. 3. One ethics risk you noticed: It is really easy to just use AI to fix the code, but it is very important that you understand what is going on.