# Gold Layer Processing for Stock Quotes

In [1]:
# This notebook takes our clean, transactional data from the Silver layer
# and builds a business-level summary table in the Gold layer.
# We’re calculating 5-minute price aggregations to help spot short-term trends and patterns.

from pyspark.sql.functions import col, window, avg, max, min
from pyspark.sql.types import DecimalType

# Step 1: Let’s load the clean, structured data from the Silver Delta table.
# This gives us a solid foundation to build our business-level insights.
silver_table_name = "silver_stock_trades"
print(f"Reading data from Silver table: {silver_table_name}")

try:
    df_silver = spark.read.table(silver_table_name)

# Step 2: Let’s break the data into 5-minute chunks using Spark’s window function.
# These “tumbling” windows help us calculate price stats over short time intervals—perfect for spotting quick market shifts.
    print("Performing 5-minute window aggregations...")
    df_gold = df_silver.groupBy(
        col("stock_symbol"),
        window(col("timestamp_utc"), "5 minutes")
    ).agg(
        avg("price_current").alias("avg_price_5min"),
        max("price_high_day").alias("high_price_5min"),
        min("price_low_day").alias("low_price_5min")
    )

# Step 3: Let’s tidy up the final aggregated columns—renaming, reordering, or dropping anything we don’t need.
# This helps keep our Gold table clean, readable, and ready for business use.
    df_gold_final = df_gold.select(
        col("stock_symbol"),
        col("window.start").alias("window_start_utc"),
        col("window.end").alias("window_end_utc"),
        col("avg_price_5min").cast(DecimalType(10, 2)),
        col("high_price_5min").cast(DecimalType(10, 2)),
        col("low_price_5min").cast(DecimalType(10, 2))
    )
    
    print("Aggregations complete. Showing a preview of the Gold data:")
    df_gold_final.orderBy("window_start_utc", ascending=False).show(10, truncate=False)

# Step 4: Let’s write our final aggregated DataFrame to the Gold Delta table.
# We’re using 'overwrite' mode here since this table always reflects the latest, complete snapshot of our 5-minute aggregations.
    gold_table_name = "gold_stock_trades_5min"
    df_gold_final.write.format("delta").mode("overwrite").saveAsTable(gold_table_name)

    print(f"SUCCESS: Aggregated data has been saved to the Gold table: '{gold_table_name}'")

except Exception as e:
    print(f"An error occurred: {e}")


StatementMeta(, a3b10f93-498b-40a6-a724-76b6c4221f11, 3, Finished, Available, Finished)

Reading data from Silver table: silver_stock_trades
Performing 5-minute window aggregations...
Aggregations complete. Showing a preview of the Gold data:
+------------+-------------------+-------------------+--------------+---------------+--------------+
|stock_symbol|window_start_utc   |window_end_utc     |avg_price_5min|high_price_5min|low_price_5min|
+------------+-------------------+-------------------+--------------+---------------+--------------+
|AAPL        |2025-08-22 16:05:00|2025-08-22 16:10:00|227.36        |228.72         |225.35        |
|AAPL        |2025-08-19 20:00:00|2025-08-19 20:05:00|230.56        |232.87         |229.35        |
+------------+-------------------+-------------------+--------------+---------------+--------------+

SUCCESS: Aggregated data has been saved to the Gold table: 'gold_stock_trades_5min'
