In [0]:
df = spark.read.table("stockprices.02_silver.daily_prices_cleaned")


In [0]:
from pyspark.sql.functions import stddev, avg, col, current_date, lit, round as spark_round, max as spark_max, min as spark_min
from pyspark.sql.window import Window

window_7day = (
    Window
    .partitionBy("ticker")
    .orderBy("timestamp")
    .rowsBetween(-6, 0)
)

window_30day = (
    Window
    .partitionBy("ticker")
    .orderBy("timestamp")
    .rowsBetween(-29, 0)
)


In [0]:
latest_date = df.agg(spark_max("timestamp")).collect()[0][0]

windows = df.withColumn("high_7day", spark_max("high").over(window_7day))\
    .withColumn("low_7day", spark_min("low").over(window_7day))\
    .withColumn("vol_7day", spark_round(stddev("daily_return").over(window_7day), 4))\
    .withColumn("avg_price_range_7day", spark_round(avg("price_range").over(window_7day), 2))\
    .withColumn("high_30day", spark_max("high").over(window_30day))\
    .withColumn("low_30day", spark_min("low").over(window_30day))\
    .withColumn("vol_30day", spark_round(stddev("daily_return").over(window_30day), 4))\
    .withColumn("avg_price_range_30day", spark_round(avg("price_range").over(window_30day), 2))\
    .filter(col("`timestamp`") == lit(latest_date))

windows.write.mode("overwrite").saveAsTable("stockprices.03_gold.stock_summary")





