## Part 2: Python (PySpark/Pandas) Basics

#### 2.1 Load tables

In [0]:
dm = spark.table("workspace.bronze.device_messages_raw")
rt = spark.table("workspace.bronze.rapid_step_tests_raw")

dm.printSchema()
rt.printSchema()

#### 2.2 Clean distance to numeric centimeters

In [0]:
%python
from pyspark.sql import functions as F

dm_clean = (
    dm
    .withColumn("distance_str", F.col("distance").cast("string"))
    .withColumn("distance_cm", F.regexp_replace("distance_str", "[^0-9.]", "").cast("double"))
    .withColumn("ts_ms", F.col("timestamp").cast("bigint"))
    .withColumn("date_ms", F.col("date").cast("bigint"))
)

display(dm_clean.limit(10))
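
To make the cleaning rule in 2.2 easier to reason about, here is a minimal plain-Python sketch of what the pattern `[^0-9.]` followed by a cast to double does to a few raw values. The sample strings and the helper name `clean_distance` are hypothetical and not taken from the STEDI tables. Returning `None` on a failed conversion is an assumption meant to mirror Spark producing NULL when ANSI mode is off; Python's `float` and Spark's cast are not guaranteed to agree on every input.

```python
import re

def clean_distance(raw):
    """Approximate regexp_replace(col, "[^0-9.]", "") followed by a cast to double."""
    if raw is None:
        return None
    # Keep only digits and dots, exactly like the pattern used in 2.2.
    kept = re.sub(r"[^0-9.]", "", str(raw))
    try:
        return float(kept)
    except ValueError:
        # Assumption: stands in for Spark returning NULL on a failed cast.
        return None

# Hypothetical raw values, chosen to show the edge cases.
samples = ["12cm", " 7.5 cm", "n/a", "-3cm", "1.2.3", None]
cleaned = {s: clean_distance(s) for s in samples}

for raw, value in cleaned.items():
    print(repr(raw), "->", value)
```

Two cases deserve attention: the pattern strips the minus sign, so `"-3cm"` becomes `3.0`, and a value with two dots such as `"1.2.3"` cannot be converted at all. If either can occur in the real data, it is worth counting nulls in `distance_cm` before trusting the per-device statistics.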

#### 2.3 Per-device descriptive stats

In [0]:

dm_stats = (
    dm_clean
    .groupBy("device_id")
    .agg(
        F.count("*").alias("n"),
        F.avg("distance_cm").alias("avg_cm"),
        F.min("distance_cm").alias("min_cm"),
        F.max("distance_cm").alias("max_cm")
    )
    .orderBy(F.desc("n"))
)

display(dm_stats.limit(20))


#### 2.4 Explode step_points to long form

In [0]:

rt_exploded = (
    rt
    .select(
        "customer", "device_id", "start_time", "stop_time",
        "test_time", "total_steps",
        F.posexplode("step_points").alias("step_index", "step_ms")
    )
)

display(rt_exploded.limit(20))

# Step timing stats per test

step_stats = (
    rt_exploded
    .groupBy("customer", "device_id", "start_time", "stop_time")
    .agg(
        F.count("*").alias("steps"),
        F.avg("step_ms").alias("avg_step_ms"),
        F.stddev("step_ms").alias("sd_step_ms")
    )
    .orderBy(F.desc("steps"))
)

display(step_stats.limit(20))
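
The reshaping in 2.4 can be pictured with a small plain-Python sketch: `posexplode` emits one output row per array element together with its 0-based position, and rows whose array is empty or null produce no output. The data and the helper names `posexplode_rows` and `step_stats_py` are hypothetical. The sketch uses the sample standard deviation, which is what `F.stddev` computes; returning `None` for a single-step test is a choice made here, not a statement about Spark's behavior.

```python
from statistics import mean, stdev

# Hypothetical tests; step_points holds one timing value per step.
tests = [
    {"device_id": "dev-a", "start_time": 1000, "step_points": [410, 390, 400]},
    {"device_id": "dev-b", "start_time": 5000, "step_points": [500, 520]},
    {"device_id": "dev-c", "start_time": 9000, "step_points": []},
]

def posexplode_rows(rows):
    """Yield one long-form row per array element with its 0-based index."""
    for row in rows:
        # Empty or missing arrays contribute no rows.
        for step_index, step_ms in enumerate(row["step_points"] or []):
            yield {
                "device_id": row["device_id"],
                "start_time": row["start_time"],
                "step_index": step_index,
                "step_ms": step_ms,
            }

def step_stats_py(rows):
    """Group long-form rows per test and compute count, mean and sample stddev."""
    grouped = {}
    for r in rows:
        grouped.setdefault((r["device_id"], r["start_time"]), []).append(r["step_ms"])
    return {
        key: {
            "steps": len(vals),
            "avg_step_ms": mean(vals),
            # Sample standard deviation needs at least two values.
            "sd_step_ms": stdev(vals) if len(vals) > 1 else None,
        }
        for key, vals in grouped.items()
    }

long_rows = list(posexplode_rows(tests))
stats = step_stats_py(long_rows)
print(len(long_rows), stats)
```

Note that `dev-c` disappears from the long form entirely. If tests with no recorded steps should stay visible, `F.posexplode_outer` is the variant that keeps them, with nulls in the exploded columns.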




#### 2.5 Windowed join: messages within each test (feature prep)
This mirrors the SQL in 1.7: join on device_id, keeping only messages whose timestamp falls within [start_time, stop_time].


In [0]:
%python
msgs = (
    dm_clean
    .select("device_id", "ts_ms", "distance_cm")
    .where(F.col("distance_cm").isNotNull())
)

tests = rt.select("customer", "device_id", "start_time", "stop_time", "test_time", "total_steps")

joined = (
    tests.alias("t")
    .join(
        msgs.alias("m"),
        (F.col("m.device_id") == F.col("t.device_id")) &
        (F.col("m.ts_ms").between(F.col("t.start_time"), F.col("t.stop_time"))),
        how="inner"
    )
)

features = (
    joined
    .groupBy("t.customer", "t.device_id", "t.start_time", "t.stop_time", "t.test_time", "t.total_steps")
    .agg(
        F.count("m.ts_ms").alias("readings_in_window"),
        F.avg("m.distance_cm").alias("avg_cm_in_window"),
        F.min("m.distance_cm").alias("min_cm_in_window"),
        F.max("m.distance_cm").alias("max_cm_in_window"),
        F.var_pop("m.distance_cm").alias("var_cm_in_window")
    )
    .orderBy(F.desc("readings_in_window"))
)

display(features.limit(20))
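
The join-then-aggregate logic in 2.5 can be checked by hand on a tiny example. The sketch below is plain Python with hypothetical data and a hypothetical helper `window_features`; it assumes that message timestamps and test boundaries are in the same unit (milliseconds). It reproduces three properties of the Spark code: `between` is inclusive at both ends, an inner join drops tests that have no readings, and `var_pop` is the population variance.

```python
from statistics import mean, pvariance

# Hypothetical test windows, boundaries in milliseconds.
tests = [
    {"device_id": "dev-a", "start_time": 1000, "stop_time": 2000},
    {"device_id": "dev-a", "start_time": 5000, "stop_time": 6000},
]

# Hypothetical messages: (device_id, ts_ms, distance_cm).
msgs = [
    ("dev-a", 1000, 10.0),  # on the lower boundary: included
    ("dev-a", 1500, 14.0),
    ("dev-a", 2000, 12.0),  # on the upper boundary: included
    ("dev-a", 2001, 99.0),  # just outside the window: excluded
    ("dev-b", 1500, 50.0),  # different device: excluded
]

def window_features(tests, msgs):
    """Aggregate the readings that fall inside each test window."""
    out = []
    for t in tests:
        vals = [
            cm for dev, ts, cm in msgs
            if dev == t["device_id"] and t["start_time"] <= ts <= t["stop_time"]
        ]
        if not vals:
            continue  # inner-join semantics: no matching readings, no row
        out.append({
            "device_id": t["device_id"],
            "start_time": t["start_time"],
            "readings_in_window": len(vals),
            "avg_cm_in_window": mean(vals),
            "min_cm_in_window": min(vals),
            "max_cm_in_window": max(vals),
            "var_cm_in_window": pvariance(vals),  # population variance
        })
    return out

features_py = window_features(tests, msgs)
print(features_py)
```

The second test window produces no row at all. That matters for later modeling: with `how="inner"`, tests without any device messages silently vanish from `features`, so comparing `features.count()` against `tests.count()` is a cheap sanity check.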


#### 2.6 Quick visual check

In [0]:
# Small sample for a simple line plot of distances over time for one device
import pandas as pd
import matplotlib.pyplot as plt
sample_device = features.select("device_id").first()["device_id"]
pdf = (
    dm_clean
    .filter(F.col("device_id") == sample_device)
    .orderBy("ts_ms")
    .limit(1000)
    .select("ts_ms", "distance_cm")
    .toPandas()
)
plt.figure()
plt.plot(pdf["ts_ms"], pdf["distance_cm"])
plt.title(f"Distance over time (device {sample_device})")
plt.xlabel("timestamp (ms)")
plt.ylabel("distance (cm)")
plt.show()


This Python section mirrors the SQL steps, but it also prepares a compact features table (count, avg/min/max, and variance of distance within each test window). We will reuse this features table in the ML weeks.

Cleaning and querying the STEDI data was straightforward once the schemas were understood, but time-based joins required careful attention to timestamps. One ethics risk is incorrect data labeling or casting (such as distance values), which could lead to misleading features and harmful conclusions in health-related models. Honesty and accuracy in data preparation reflect our responsibility as disciples to seek truth.