# Calculating 5-minute moving average for each stock symbol

In [0]:
from pyspark.sql import functions as F

# Read from Silver table
silver_df = df = spark.read.table("kafka.silver.silver_stock_data")

# Example of calculating 5-minute moving average for each stock symbol
gold_df = (silver_df
           .groupBy(F.window("timestamp", "5 minutes"), "symbol")
           .agg(F.avg("high").alias("avg_price")))
           # Calculate 50-period simple moving average for each stock symbol
ohlc_df = silver_df.groupBy(F.window("timestamp", "1 day"), "symbol").agg(
    F.first("open").alias("open_price"),
    F.max("high").alias("high_price"),
    F.min("low").alias("low_price"),
    F.last("close").alias("close_price"),
    F.sum("volume").alias("daily_volume")
)

# Write to the Gold Delta table
#gold_df.write.format("delta").mode("overwrite").saveAsTable("kafka.gold.gold_stock_data")

ohlc_df.write.format("delta").mode("overwrite") \
.option("mergeSchema", "true") \
.saveAsTable("kafka.gold.gold_stock_data") 



In [0]:
%sql
select * from kafka.gold.gold_stock_data

window,symbol,avg_price,open_price,high_price,low_price,close_price,daily_volume
"List(2024-10-04T00:00:00Z, 2024-10-05T00:00:00Z)",MSFT,,415.88,418.18,415.6025,415.655,115084.0
"List(2024-10-04T00:00:00Z, 2024-10-05T00:00:00Z)",GOOGL,,166.95,167.06,164.54,166.85,579984.0
"List(2024-10-04T00:00:00Z, 2024-10-05T00:00:00Z)",AAPL,,226.37,226.8,224.42,226.635,950900.0


#-- Daily stock price aggregates

In [0]:
%sql
-- Daily stock price aggregates
CREATE OR REPLACE TABLE kafka.gold.daily_stock_aggregates AS
SELECT 
  symbol,
  timestamp,
  AVG(open) AS avg_open,
  AVG(close) AS avg_close,
  MAX(high) AS max_high,
  MIN(low) AS min_low,
  SUM(volume) AS total_volume
FROM kafka.silver.silver_stock_data
GROUP BY symbol, timestamp;


num_affected_rows,num_inserted_rows


# Daily stock aggregation

In [0]:
%sql
select * from kafka.gold.daily_stock_aggregates order by symbol

symbol,timestamp,avg_open,avg_close,max_high,min_low,total_volume
AAPL,2024-10-04 19:48:00,226.43,226.48500000000007,226.49,226.42,1530.0
AAPL,2024-10-04 18:59:00,226.565,226.565,226.7,226.56,1290.0
AAPL,2024-10-04 18:55:00,226.54,226.53000000000003,226.54,226.53,550.0
AAPL,2024-10-04 18:39:00,226.53500000000005,226.53000000000003,226.63,226.53,7990.0
AAPL,2024-10-04 19:00:00,226.45,226.575,226.575,226.45,60.0
AAPL,2024-10-04 18:38:00,226.53500000000005,226.53500000000005,226.54,226.53,270.0
AAPL,2024-10-04 19:04:00,226.575,226.45,226.575,226.45,260.0
AAPL,2024-10-04 19:16:00,226.555,226.5,226.555,226.5,280.0
AAPL,2024-10-04 19:58:00,225.29,226.4189000000001,226.46,225.29,1450.0
AAPL,2024-10-04 19:38:00,226.46,226.46,226.46,226.46,360.0


In [0]:
%sql
SELECT * FROM kafka.gold.gold_stock_data ;

window,symbol,avg_price,open_price,high_price,low_price,close_price,daily_volume
"List(2024-10-04T00:00:00Z, 2024-10-05T00:00:00Z)",MSFT,,415.88,418.18,415.6025,415.655,115084.0
"List(2024-10-04T00:00:00Z, 2024-10-05T00:00:00Z)",GOOGL,,166.95,167.06,164.54,166.85,579984.0
"List(2024-10-04T00:00:00Z, 2024-10-05T00:00:00Z)",AAPL,,226.37,226.8,224.42,226.635,950900.0


In [0]:
from pyspark.sql.functions import max as spark_max, min as spark_min, sum as spark_sum

# Monthly Aggregates
monthly_aggregates = silver_df.groupBy("symbol", "timestamp").agg(
    spark_max("high").alias("max_high"),
    spark_min("low").alias("min_low"),
    spark_sum("volume").alias("total_volume")
)

# Write the monthly aggregates to Delta table
monthly_aggregates.write.format("delta").mode("overwrite").saveAsTable("kafka.gold.monthly_aggregates")

In [0]:
%sql
select * from kafka.gold.monthly_aggregates order by symbol

symbol,timestamp,max_high,min_low,total_volume
AAPL,2024-10-04 19:48:00,226.49,226.42,1530.0
AAPL,2024-10-04 18:59:00,226.7,226.56,1290.0
AAPL,2024-10-04 18:55:00,226.54,226.53,550.0
AAPL,2024-10-04 18:39:00,226.63,226.53,7990.0
AAPL,2024-10-04 19:00:00,226.575,226.45,60.0
AAPL,2024-10-04 18:38:00,226.54,226.53,270.0
AAPL,2024-10-04 19:04:00,226.575,226.45,260.0
AAPL,2024-10-04 19:16:00,226.555,226.5,280.0
AAPL,2024-10-04 19:58:00,226.46,225.29,1450.0
AAPL,2024-10-04 19:38:00,226.46,226.46,360.0


# Optimize Delta Tables

In [0]:
%sql
OPTIMIZE kafka.gold.daily_stock_aggregates ZORDER BY (symbol)


path,metrics
abfss://data@adbmathew01.dfs.core.windows.net/ext_tables/gold/__unitystorage/schemas/1b6382ee-4640-4e7b-bc3c-4bf9b5517b79/tables/6966fa29-10e5-44d2-9036-92ae8b369be9,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(1, 8051), 0, List(0, 0), 0, null), null, 0, 0, 1, 1, false, 0, 0, 1728317014422, 1728317015709, 4, 0, null, List(0, 0), 7, 7, 0, 0, null)"


In [0]:
%sql
SELECT * FROM kafka.gold.gold_stock_data ;

window,symbol,avg_price,open_price,high_price,low_price,close_price,daily_volume
"List(2024-10-04T00:00:00Z, 2024-10-05T00:00:00Z)",MSFT,,415.88,418.18,415.6025,415.655,115084.0
"List(2024-10-04T00:00:00Z, 2024-10-05T00:00:00Z)",GOOGL,,166.95,167.06,164.54,166.85,579984.0
"List(2024-10-04T00:00:00Z, 2024-10-05T00:00:00Z)",AAPL,,226.37,226.8,224.42,226.635,665630.0


In [0]:
%sql
select * from kafka.gold.daily_stock_aggregates

symbol,timestamp,avg_open,avg_close,max_high,min_low,total_volume
MSFT,2024-10-04 18:45:00,415.95,415.84,415.95,415.8,305.0
GOOGL,2024-10-04 18:33:00,166.86000000000004,166.86999999999998,166.87,166.86,552.0
AAPL,2024-10-04 19:58:00,225.29,226.4189000000001,226.46,225.29,1015.0
GOOGL,2024-10-04 19:57:00,166.93000000000004,166.98,166.98,166.88,5736.0
GOOGL,2024-10-04 19:51:00,166.87799999999996,166.80400000000003,166.88,166.804,264.0
GOOGL,2024-10-04 19:00:00,166.86999999999998,166.88000000000002,166.88,166.87,60.0
AAPL,2024-10-04 18:31:00,226.6,226.59,226.64,226.54,665.0
MSFT,2024-10-04 19:38:00,415.85,415.84,415.85,415.83,1206.0
MSFT,2024-10-04 19:53:00,415.88000000000005,415.88000000000005,415.88,415.825,258.0
MSFT,2024-10-04 18:32:00,415.7,415.69,415.7,415.61,1925.0
