# Gold Layer – Aggregated Metrics and Distance Calculation

This layer builds the **final analytical datasets** from the Silver layer: per-command metrics and per-step robot movement distances.

## 1. `gold_command_metrics_mv`
- **Type:** Materialized View  
- **Source:** `silver_combined_events`
- **Purpose:**  
  - Aggregates robot command events by **robot ID**, **date**, and **command name**. Rows without a `command_name` are excluded.
  - Calculates:
    - `command_count` → number of executed commands in each group
    - `avg_duration_ms` → average execution time per command, in milliseconds

## 2. `gold_robot_distance`
- **Type:** Table  
- **Source:** `silver_combined_events`
- **Purpose:**  
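  - Calculates **robot movement distance** from position changes over time.
  - Uses a window function (`lag`, partitioned by `robot_id` and ordered by `timestamp`) to compare each robot’s current and previous position.
  - Computes `step_distance` with the Euclidean distance formula: `sqrt((x - prev_x)^2 + (y - prev_y)^2)`.
  - The first row per robot has no previous position, so its `step_distance` defaults to 0.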
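
The window-and-distance logic above can be sketched in plain Python to make the per-row behavior concrete. This is an illustration under assumptions, not the pipeline code: `step_distances` and the sample rows are hypothetical, and timestamps are plain integers to keep the example short.

```python
import math
from collections import defaultdict


def step_distances(rows):
    """Return (robot_id, timestamp, x, y, step_distance) for each input row.

    Hypothetical helper: rows are (robot_id, timestamp, x, y) tuples.
    Grouping by robot and sorting by timestamp plays the role of the
    window; the previous point plays the role of lag().
    """
    by_robot = defaultdict(list)
    for robot_id, ts, x, y in rows:
        by_robot[robot_id].append((ts, x, y))

    result = []
    for robot_id, points in by_robot.items():
        points.sort(key=lambda p: p[0])  # chronological order per robot
        prev = None
        for ts, x, y in points:
            if prev is None:
                dist = 0.0  # first row per robot: no previous position
            else:
                dist = math.hypot(x - prev[0], y - prev[1])  # Euclidean distance
            result.append((robot_id, ts, x, y, dist))
            prev = (x, y)
    return result


# Made-up positions, deliberately out of order, for illustration only
sample_rows = [
    ("r1", 2, 3.0, 4.0),
    ("r1", 1, 0.0, 0.0),
    ("r2", 1, 1.0, 1.0),
    ("r1", 3, 3.0, 4.0),
]
sample_steps = step_distances(sample_rows)
```

Here robot `r1` moves from (0, 0) to (3, 4), a step of 5.0, and then stays put, a step of 0.0. Robot `r2` has a single row, so its only step distance is 0.0.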


In [0]:
import dlt
from pyspark.sql.functions import col, avg, count, to_date

CATALOG = "haley_b_demo"
SILVER = "silver"
GOLD = "gold"

@dlt.materialized_view(name=f"{CATALOG}.{GOLD}.gold_command_metrics_mv")
def gold_command_metrics_mv():
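    df = dlt.read(f"{CATALOG}.{SILVER}.silver_combined_events")

    # Keep only command events; rows without a command name are not commands
    df = df.filter(col("command_name").isNotNull())

    # Truncate the event timestamp to a calendar date for daily grouping
    df = df.withColumn("date", to_date("timestamp"))

    # One row per robot, date, and command, with its count and mean duration
    return df.groupBy("robot_id", "date", "command_name").agg(
        count("*").alias("command_count"),
        avg("duration_ms").alias("avg_duration_ms")
    )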

In [0]:
import dlt
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, sqrt, pow, col

CATALOG = "haley_b_demo"
SILVER = "silver"
GOLD = "gold"

@dlt.table(
    name=f"{CATALOG}.{GOLD}.gold_robot_distance",
    comment="Calculate robot movement distance based on position changes (window function)"
)
def gold_robot_distance():
    df = dlt.read(f"{CATALOG}.{SILVER}.silver_combined_events")

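    # Keep only rows that carry a position. A null x or y would otherwise
    # produce a null distance for that row and for the row that follows it.
    df = df.filter(col("x").isNotNull() & col("y").isNotNull())

    # Order each robot's positions chronologically
    w = Window.partitionBy("robot_id").orderBy("timestamp")

    # Previous position of the same robot (null on the robot's first row)
    df = df.withColumn("prev_x", lag("x").over(w))
    df = df.withColumn("prev_y", lag("y").over(w))

    # Euclidean distance between the previous and current position
    df = df.withColumn(
        "step_distance",
        sqrt(
            pow(col("x") - col("prev_x"), 2) +
            pow(col("y") - col("prev_y"), 2)
        )
    )

    # The first row per robot has no previous position, so its distance is 0
    df = df.fillna({"step_distance": 0})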

    return df.select(
        "robot_id",
        "timestamp",
        "x", "y",
        "step_distance"
    )