**Part 1 — Load the Data**

In this step, we load the two raw STEDI datasets from the bronze layer into Spark DataFrames.
These tables contain the original, unprocessed data that will be cleaned and curated in later steps.

**device_message_raw** contains continuous sensor readings

**rapid_step_test_raw** contains step test sessions with start and stop times

Previewing the data helps verify that the tables loaded correctly and that the schemas match expectations.

In [0]:
df_device = spark.table("workspace.bronze.device_messages")
df_steps = spark.table("workspace.bronze.rapid_step_tests")

# Preview tables
display(df_device)
display(df_steps)
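
One lightweight way to check that the schemas match expectations is to compare each DataFrame's column list with the columns the later cells reference. The sketch below is plain Python and needs no Spark session. The helper name missing_columns is hypothetical, and the required column lists are inferred from this notebook's later cells, not taken from a documented schema.

```python
# Columns the later cells reference (inferred from this notebook, not an official schema).
REQUIRED_DEVICE_COLUMNS = ["timestamp", "sensor_type", "distance", "device_id"]
REQUIRED_STEP_COLUMNS = ["device_id", "start_time", "stop_time"]


def missing_columns(actual_columns, required_columns):
    """Return the required columns that are absent, in the order they were required."""
    actual = set(actual_columns)
    return [name for name in required_columns if name not in actual]


# Hand-written column list for illustration only.
sample_columns = ["timestamp", "sensor_type", "device_id"]
print(missing_columns(sample_columns, REQUIRED_DEVICE_COLUMNS))  # prints ['distance']
```

In the notebook, the same helper could be called with df_device.columns and df_steps.columns; an empty list would mean every column used later is present.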

**Part 2 — Prepare Each Table**

Before aligning the datasets, we need to clean and enrich them so they are easier to work with and clearly documented.

**A. Clean / Convert Distance**

The distance column in the device messages table is stored as a string.
Machine learning models require numeric values, so we extract the numeric portion and convert it to an integer.
This creates a new column called **distance_cm** that can be used for analysis and modeling.


In [0]:
from pyspark.sql.functions import regexp_extract, col, lit

# Extract numeric distance from string (e.g., "1cm" → 1)
df_device = df_device.withColumn(
    "distance_cm", regexp_extract(col("distance"), r"(\d+)", 1).cast("int")
)
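
To see what the extraction pattern does to individual values, the following plain-Python sketch applies the same regular expression outside Spark. The function name parse_distance_cm and the sample strings are made up for illustration. Mapping non-matching strings to None is an assumption of this sketch; how the Spark cast treats them can depend on the cluster's configuration.

```python
import re

# Same pattern as the Spark cell: the first run of digits in the string.
_DIGITS = re.compile(r"(\d+)")


def parse_distance_cm(raw):
    """Return the first run of digits in `raw` as an int, or None if there is none."""
    if raw is None:
        return None
    match = _DIGITS.search(raw)
    return int(match.group(1)) if match else None


# Made-up sample values, including ones the pattern handles in surprising ways.
for value in ["1cm", "25cm", "1.5cm", "n/a", None]:
    print(repr(value), "->", parse_distance_cm(value))
```

Note that the pattern keeps only the first run of digits, so a value such as "1.5cm" would become 1. If the raw data can contain decimals or signs, the pattern would need to be widened.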

**B. Add Source Labels**

To maintain data traceability, we add a source column to both datasets.

Rows from **device_message_raw** are labeled as "**device**"

Rows from **rapid_step_test_raw** are labeled as "**step**"

This makes it clear where each record originated and helps with debugging and validation later in the pipeline.

In [0]:
df_device = df_device.withColumn("source", lit("device"))
df_steps = df_steps.withColumn("source", lit("step"))

**Part 3 — Label Each Sensor Reading**

This is the most important step of the ETL process.
Here, we determine whether each sensor reading occurred during a real stepping session.

**A. Extract Step Windows**

Each Rapid Step Test includes a **start_time** and **stop_time** that define when the user was actively stepping.
We extract only the necessary columns (**device_id**, **start_time**, and **stop_time**) to create clean step windows that will be used for timestamp alignment.

In [0]:
df_steps_window = df_steps.select(
    col("device_id"),
    col("start_time"),
    col("stop_time")
)
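
A window with a missing bound, or with a stop time earlier than its start time, can never match a reading in a between-style comparison, so it may be worth checking for such windows before the join. The sketch below is plain Python over tuples. The function name invalid_windows, the integer timestamps, and the sample rows are assumptions for illustration, not part of the STEDI data.

```python
def invalid_windows(windows):
    """Return windows with a missing bound or with stop_time earlier than start_time."""
    bad = []
    for device_id, start_time, stop_time in windows:
        if start_time is None or stop_time is None or stop_time < start_time:
            bad.append((device_id, start_time, stop_time))
    return bad


# Made-up sample rows: (device_id, start_time, stop_time).
sample_windows = [
    ("dev-1", 100, 200),   # valid
    ("dev-1", 300, 250),   # stop before start
    ("dev-2", None, 400),  # missing start
]
print(invalid_windows(sample_windows))
```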

**B. Join Device Messages with Step Windows**

We perform a **left join** between device messages and step windows using:

Matching **device_id**

A timestamp condition where the sensor timestamp falls between the step test start and stop times

If a sensor reading falls inside a step window, it is labeled as "**step**".
If it does not, it is labeled as "**no_step**".
This produces a labeled dataset suitable for supervised machine learning.

In [0]:
from pyspark.sql.functions import when

df_labeled = (
    df_device.alias("d")
    .join(
        df_steps_window.alias("s"),
        (col("d.device_id") == col("s.device_id")) &
        (col("d.timestamp").between(col("s.start_time"), col("s.stop_time"))),
        "left"
    )
    .withColumn(
        "step_label",
        when(col("s.start_time").isNotNull(), "step").otherwise("no_step")
    )
)
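
The labeling rule itself can be illustrated without Spark. The sketch below applies the same condition (matching device_id, timestamp between start and stop, both bounds inclusive) to a single reading. The function name label_reading, the integer timestamps, and the sample window are made up for illustration.

```python
def label_reading(device_id, timestamp, windows):
    """Return "step" if the reading falls inside any window for its device, else "no_step"."""
    for window_device, start_time, stop_time in windows:
        # Both bounds are inclusive, like Column.between in the Spark cell.
        if window_device == device_id and start_time <= timestamp <= stop_time:
            return "step"
    return "no_step"


# Made-up sample data: (device_id, start_time, stop_time) with integer timestamps.
windows = [("dev-1", 100, 200)]

print(label_reading("dev-1", 150, windows))  # prints step
print(label_reading("dev-1", 200, windows))  # prints step (upper bound is inclusive)
print(label_reading("dev-1", 250, windows))  # prints no_step
print(label_reading("dev-2", 150, windows))  # prints no_step (different device)
```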

**Part 4 — Keep Only the Required Columns**

After labeling the data, we select only the columns required for the curated (silver layer) dataset:

**timestamp**

**sensor_type**

**distance_cm**

**device_id**

**step_label**

**source**

Removing unnecessary columns keeps the dataset clean, efficient, and focused on modeling needs.

In [0]:
df_final = df_labeled.select(
    "timestamp",
    "sensor_type",
    "distance_cm",
    "d.device_id",
    "step_label",
    "source"
)

display(df_final)
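
One thing to watch for: a left join emits one row per matching step window, so a reading that falls inside two overlapping windows for the same device would appear twice in the result. The plain-Python sketch below counts the rows such a join would produce. The function name left_join_row_count and the sample data are hypothetical, and whether overlapping windows actually occur in the STEDI data is not something this notebook establishes.

```python
def left_join_row_count(readings, windows):
    """Count rows a left join would emit: one per matching window, or one if none match."""
    total = 0
    for device_id, timestamp in readings:
        matches = sum(
            1
            for window_device, start_time, stop_time in windows
            if window_device == device_id and start_time <= timestamp <= stop_time
        )
        # An unmatched reading is still kept once by a left join.
        total += max(matches, 1)
    return total


# Made-up sample data with two overlapping windows for the same device.
readings = [("dev-1", 5), ("dev-1", 50)]
windows = [("dev-1", 0, 10), ("dev-1", 3, 8)]
print(left_join_row_count(readings, windows))  # prints 3: the first reading matches twice
```

In the notebook, comparing df_labeled.count() with df_device.count() gives the same signal: if the labeled count is larger, some readings matched more than one window, and deduplicating (for example with dropDuplicates) may be worth considering.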

**Part 5 — Save Curated Dataset**

In this step, we save the curated dataset as a managed table in the Databricks catalog.
This table represents the **silver layer** of the data pipeline and will be used later for machine-learning tasks.
The dataset is saved in overwrite mode so that rerunning the notebook replaces the table instead of appending duplicate rows.

In [0]:
spark.sql("USE workspace.silver")

# Save as a table for ML, allowing schema overwrite
df_final.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("labeled_step_test")

**Part 6 — Verify Labels (SQL Queries)**

These checks help confirm that the ETL process worked as intended and that the labels are clean and consistent.

**Check Step vs No Step Counts**

This query counts how many rows are labeled as **step** versus **no_step**.
Both values should be present, and the counts should make logical sense based on the dataset.

In [0]:
%sql
SELECT
  step_label,
  COUNT(*) AS row_count
FROM labeled_step_test
GROUP BY step_label;
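
To judge whether the counts "make sense", it can help to turn them into proportions and confirm that both labels are present. The sketch below does this in plain Python on a dictionary of counts. The function name summarize_label_counts and the sample numbers are made up; no real query results are shown here.

```python
EXPECTED_LABELS = {"step", "no_step"}


def summarize_label_counts(counts):
    """Return (share per label, sorted list of expected labels that are missing)."""
    total = sum(counts.values())
    shares = {label: n / total for label, n in counts.items()} if total else {}
    missing = sorted(EXPECTED_LABELS - set(counts))
    return shares, missing


# Made-up counts for illustration only.
shares, missing = summarize_label_counts({"step": 25, "no_step": 75})
print(shares)   # prints {'step': 0.25, 'no_step': 0.75}
print(missing)  # prints []
```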

**Check for Invalid Step Labels**

This query checks for any rows with missing or invalid **step_label** values.
A correct dataset should return zero rows.

In [0]:
%sql
SELECT *
FROM labeled_step_test
WHERE step_label NOT IN ('step', 'no_step')
   OR step_label IS NULL
LIMIT 50;

**Check Source Labels**

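This query shows which values appear in the **source** column. Because the curated table is built from the device-message side of the join, and the step windows carry only **device_id**, **start_time**, and **stop_time**, every row is expected to show **device**; a **step** value here would be unexpected.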

In [0]:
%sql
SELECT
  source,
  COUNT(*) AS row_count
FROM labeled_step_test
GROUP BY source;

**Check for Invalid Source Labels**

This final check looks for null or unexpected values in the **source** column.
Zero rows returned means every row carries a recognized source label.

In [0]:
%sql
SELECT *
FROM labeled_step_test
WHERE source NOT IN ('device', 'step')
   OR source IS NULL
LIMIT 50;

**Final Step — Persist the Curated Silver Dataset**

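As in Part 5, we save the fully curated and labeled dataset as a managed table in Databricks; the unqualified table name resolves to **workspace.silver** because of the USE statement run in Part 5.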
The dataset is written using **overwrite mode** to ensure the table can be recreated consistently if the ETL process is rerun.
Saving the data as a table allows it to be easily queried using SQL and reused in future assignments, including the machine-learning project later in the course.
This table represents the silver layer of the STEDI data pipeline, containing clean, structured, and labeled data ready for modeling and analysis.

In [0]:
df_final.write.mode("overwrite").saveAsTable("labeled_step_test")

**Ethics Check**

**Are we labeling data fairly?**
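Every sensor reading is labeled by the same rule: 'step' if its timestamp falls inside a step test window for the same device, and 'no_step' otherwise. Applying one rule to all samples keeps the labels consistent and limits labeling bias.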

**Are we protecting identity?**
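The curated table keeps only timestamp, sensor_type, distance_cm, device_id, step_label, and source, so it contains no names or contact details; device_id is the only identifier that remains, and any other sensitive data is excluded to protect participant privacy.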

**Are we avoiding medical claims?**
The dataset and analysis do not make medical diagnoses or claims. All results are intended for research and educational purposes only.