In [1]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Load Silver minutes table
df = spark.table("silver_stock_minutes")

# Derive the calendar date of every bar from its timestamp
df = df.withColumn("date", F.to_date("timestamp"))

# 1-minute simple returns, used downstream for intraday volatility.
# The window is partitioned by ticker AND date: with a ticker-only partition
# the first bar of each day would be compared against the previous day's last
# close, injecting the overnight gap into a metric that is meant to be
# strictly intraday. With this window the first bar of a day has no previous
# row, so lag() yields NULL and return_1m is NULL for that bar.
intraday_window = Window.partitionBy("ticker", "date").orderBy("timestamp")
df = df.withColumn(
    "return_1m",
    F.col("close") / F.lag("close").over(intraday_window) - 1,
)

# Aggregate by ticker + date
# Open and close are selected by bar timestamp (min_by / max_by) instead of
# first() / last(): row order inside a group is not guaranteed after the
# groupBy shuffle, so first()/last() may return the price of an arbitrary bar
# rather than the earliest/latest one.
agg_df = (
    df.groupBy("ticker", "date")
    .agg(
        # Open of the earliest bar of the day
        F.min_by("open", "timestamp").alias("open_minutes"),
        F.max("high").alias("high_minutes"),
        F.min("low").alias("low_minutes"),
        # Close of the latest bar of the day
        F.max_by("close", "timestamp").alias("close_minutes"),
        F.sum("volume").alias("volume_minutes"),
        # Sample standard deviation of the 1-minute returns
        F.stddev("return_1m").alias("intraday_volatility"),
        # Volume-weighted average price, using each bar's close as its price
        (F.sum(F.col("close") * F.col("volume")) / F.sum("volume")).alias("vwap_daily"),
        # Number of bars that closed above / below their own open
        F.sum(F.when(F.col("close") > F.col("open"), 1).otherwise(0)).alias("uptrend_minutes"),
        F.sum(F.when(F.col("close") < F.col("open"), 1).otherwise(0)).alias("downtrend_minutes"),
    )
)

# Output schema of the Gold table, in final column order
final_cols = [
    "ticker",
    "date",
    "open_minutes",
    "high_minutes",
    "low_minutes",
    "close_minutes",
    "volume_minutes",
    "vwap_daily",
    "intraday_volatility",
    "uptrend_minutes",
    "downtrend_minutes",
    "intraday_range",
    "ingestion_time",
]

# Add the derived columns, then project to the final schema:
#   intraday_range = daily high minus daily low
#   ingestion_time = timestamp at which this batch is evaluated
agg_df = (
    agg_df
    .withColumn("intraday_range", F.col("high_minutes") - F.col("low_minutes"))
    .withColumn("ingestion_time", F.current_timestamp())
    .select(final_cols)
)

# Persist the daily aggregates: create the Gold table on the first run,
# otherwise upsert into it keyed on (ticker, date).
gold_table = "gold_stock_daily_aggregated_from_minutes"

if not spark.catalog.tableExists(gold_table):
    # Table does not exist yet: write the aggregates as a new Delta table
    agg_df.write.format("delta").saveAsTable(gold_table)
else:
    delta_table = DeltaTable.forName(spark, gold_table)
    merge_condition = "target.ticker = source.ticker AND target.date = source.date"
    upsert = (
        delta_table.alias("target")
        .merge(agg_df.alias("source"), merge_condition)
        .whenMatchedUpdateAll()       # existing (ticker, date): overwrite all columns
        .whenNotMatchedInsertAll()    # new (ticker, date): insert the row
    )
    upsert.execute()

StatementMeta(, 18c62998-8f41-4922-9584-370876016f22, 3, Finished, Available, Finished)