### Bronze Table Loading

This cell loads the bronze-level device messages and rapid step tests tables from the workspace catalog. It prepares the raw data for initial exploration and analysis, providing a foundation for subsequent cleaning and feature engineering steps.

In [0]:
df_device = spark.table("workspace.bronze.device_messages_raw")
df_steps = spark.table("workspace.bronze.rapid_step_tests_raw")
display(df_device)
display(df_steps)

### Device Message Cleaning

This cell extracts the first run of digits from the raw `distance` string of each device message and casts it to an integer. The result is stored in a new column, `distance_cm`. The code applies no unit conversion, so it presumes the raw values are already expressed in centimeters.

In [0]:
from pyspark.sql.functions import regexp_extract, col

df_device = df_device.withColumn(
    "distance_cm", regexp_extract(col("distance"), r"(\d+)", 1).cast("int")
)
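
To illustrate what the `r"(\d+)"` pattern keeps and what it drops, here is a minimal plain-Python sketch of the same extraction. The helper name `extract_distance_cm` and the sample strings are hypothetical and are not taken from the bronze table. Spark's `regexp_extract` returns an empty string when nothing matches, and how the following cast treats that value depends on the session's ANSI setting; the sketch simply returns `None` in that case.

```python
import re
from typing import Optional


def extract_distance_cm(raw: Optional[str]) -> Optional[int]:
    """Return the first run of digits in `raw` as an int, or None if there is none."""
    if raw is None:
        return None
    match = re.search(r"(\d+)", raw)
    return int(match.group(1)) if match else None


# Hypothetical raw values, chosen only to show the edge cases.
samples = ["12cm", "distance: 7 cm", "12.5cm", "n/a", None]
for raw in samples:
    print(repr(raw), "->", extract_distance_cm(raw))
```

Only the first run of digits is captured, so a decimal value such as `"12.5cm"` loses its fractional part. If the raw messages can contain decimals, the pattern and the cast type would need to change.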


### Source Column Addition

This cell adds a new column called `source` to both DataFrames. The column indicates the origin of each row: 'device' for device messages and 'step' for rapid step tests. This helps with downstream analysis, debugging, and tracking data provenance.

In [0]:
from pyspark.sql.functions import lit

df_device = df_device.withColumn("source", lit("device"))
df_steps = df_steps.withColumn("source", lit("step"))


### Step Window Selection

This cell selects the `device_id`, `start_time`, and `stop_time` columns from the rapid step tests DataFrame. These columns define the time windows for each test, which will be used to join sensor readings and analyze device activity during specific test periods.

In [0]:
df_steps_window = df_steps.select(
    "device_id", "start_time", "stop_time"
)


### Step Labeling

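This cell left-joins device messages to step test windows on `device_id` and labels each sensor reading as 'step' when its timestamp falls within a test window (bounds inclusive) and 'no_step' otherwise. Because this is a join, a reading that falls inside more than one window for the same device appears once per matching window. This labeling is essential for supervised ML tasks and feature engineering.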

In [0]:
from pyspark.sql.functions import when

df_labeled = (
    df_device.alias("d")
    .join(
        df_steps_window.alias("s"),
        (col("d.device_id") == col("s.device_id")) &
        (col("d.timestamp").between(col("s.start_time"), col("s.stop_time"))),
        "left"
    )
    .withColumn(
        "step_label",
        when(col("s.start_time").isNotNull(), "step").otherwise("no_step")
    )
)
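
The join-and-label logic above can be sketched in plain Python to make the row semantics explicit. This is an illustration under stated assumptions, not the notebook's implementation: the `Reading` and `Window` types, the `label_readings` helper, and the integer timestamps are all hypothetical. The sketch mirrors a left join with an inclusive range condition, so a reading matched by two windows is emitted twice.

```python
from typing import List, NamedTuple, Tuple


class Window(NamedTuple):
    device_id: str
    start_time: int
    stop_time: int


class Reading(NamedTuple):
    device_id: str
    timestamp: int


def label_readings(
    readings: List[Reading], windows: List[Window]
) -> List[Tuple[Reading, str]]:
    """Mimic a left join on device_id plus an inclusive time-range condition."""
    labeled: List[Tuple[Reading, str]] = []
    for r in readings:
        matches = [
            w for w in windows
            if w.device_id == r.device_id
            and w.start_time <= r.timestamp <= w.stop_time
        ]
        if matches:
            # One output row per matching window, as a join would produce.
            labeled.extend((r, "step") for _ in matches)
        else:
            # Unmatched readings are kept once, like the left side of a left join.
            labeled.append((r, "no_step"))
    return labeled


# Hypothetical data: two overlapping windows for device "A".
windows = [Window("A", 10, 20), Window("A", 15, 25)]
readings = [Reading("A", 5), Reading("A", 12), Reading("A", 18), Reading("B", 18)]
for reading, label in label_readings(readings, windows):
    print(reading, label)
```

With overlapping windows, the four input readings become five labeled rows. If each reading should appear only once, the windows would need to be non-overlapping per device or the result deduplicated after the join.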


### Final Feature Selection

This cell selects the columns intended for the final Silver table: timestamp, sensor type, distance in centimeters, device ID, step label, and source. Because both sides of the join carry a `device_id` column, the bare `"device_id"` reference here is ambiguous and Spark rejects it; the column has to be qualified with its DataFrame alias.

In [0]:
df_final = df_labeled.select(
    "timestamp",
    "sensor_type",
    "distance_cm",
    "device_id",
    "step_label",
    "source"
)


Databricks Free Tier requires selecting the target catalog and schema with `USE` before writing the table:

In [0]:
spark.sql("USE workspace.silver")

### Disambiguate Device ID

This cell resolves the ambiguity caused by both the device messages and the step test windows contributing a `device_id` column to the joined DataFrame. It explicitly selects `d.device_id` and aliases it back to `device_id`, so the final Silver table carries the identifier from the device messages.

In [0]:
from pyspark.sql.functions import col

df_final = df_labeled.select(
    "timestamp",
    "sensor_type",
    "distance_cm",
    col("d.device_id").alias("device_id"),
    "step_label",
    "source"
)


In [0]:
df_final.write.option("overwriteSchema", "true").mode("overwrite").saveAsTable("silver.labeled_step_test")

In [0]:
%sql
SELECT
  step_label,
  COUNT(*) AS row_count
FROM silver.labeled_step_test
GROUP BY step_label;


In [0]:
%sql
SELECT *
FROM silver.labeled_step_test
WHERE step_label NOT IN ('step', 'no_step')
   OR step_label IS NULL
LIMIT 50;


In [0]:
%sql
SELECT
  source,
  COUNT(*) AS row_count
FROM silver.labeled_step_test
GROUP BY source;


In [0]:
%sql
SELECT *
FROM silver.labeled_step_test
WHERE source NOT IN ('device', 'step')
   OR source IS NULL
LIMIT 50;


### Ethics Check

* **Fair Labeling:** Data is labeled using transparent, rule-based logic based on objective sensor readings and test windows. No subjective or biased criteria are applied.
* **Identity Protection:** Only device and customer IDs are used; no personally identifiable information (PII) is exposed or processed. Data access is restricted to authorized users.
* **Avoiding Medical Claims:** This analysis does not interpret or diagnose medical conditions. All results are for research and engineering purposes, not for clinical or health advice.