In [0]:
from datetime import datetime
import json
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
# Select The Necessary Columns For Trade Records
# NOTE(review): `trade_common` is not created in this part of the notebook (the quote
# path below loads `quote_common` from "/output_dir/partition=Q"); presumably it holds the
# "/output_dir/partition=T" records — make sure it is defined before this cell runs.
trade = trade_common.select(
    "trade_dt", "rec_type", "symbol", "exchange",
    "event_tm", "event_seq_nb", "arrival_tm", "trade_pr"
)

# Apply Data Correction
def applyLatest(df):
    """Keep, per record group, the row(s) carrying the latest ``event_tm``.

    The record type is read from the ``rec_type`` of the first row:

    - ``"T"`` selects the trade layout (value column ``trade_pr``).
    - ``"Q"`` selects the quote layout (``bid_pr``, ``bid_size``, ``ask_pr``, ``ask_size``).

    Rows are grouped by trade_dt, rec_type, symbol, arrival_tm and event_seq_nb, and the
    maximum event_tm of each group is found. The source rows with that event_tm *in the
    same group* are then joined back to recover ``exchange`` and the value columns.

    NOTE(review): the grouping key includes arrival_tm. Two records that differ only in
    arrival_tm therefore land in different groups and are both kept. If "latest" is meant
    to pick the most recently arrived correction of an event, the key and aggregate need
    revisiting — confirm the intended correction rule.

    Args:
        df: Spark DataFrame of a single record type. It must contain the grouping columns
            above plus ``event_tm``, ``exchange`` and the value columns for that type.

    Returns:
        DataFrame ordered by trade_dt, stock_symbol, event_seq_nb, with these columns:

        - trade_dt, rec_type, stock_symbol, stock_exchange
        - latest_trade (trades) or latest_quote (quotes)
        - event_seq_nb, arrival_tm and the value columns

    Raises:
        ValueError: if ``df`` is empty or its first ``rec_type`` is neither "T" nor "Q".
    """
    group_keys = ["trade_dt", "rec_type", "symbol", "arrival_tm", "event_seq_nb"]

    # Read the record type with a single Spark action.
    first_row = df.select("rec_type").first()
    if first_row is None:
        raise ValueError("applyLatest: input DataFrame is empty")
    rec_type = first_row["rec_type"]
    if rec_type == "T":
        latest_col, value_cols = "latest_trade", ["trade_pr"]
    elif rec_type == "Q":
        latest_col, value_cols = "latest_quote", ["bid_pr", "bid_size", "ask_pr", "ask_size"]
    else:
        raise ValueError("applyLatest: unsupported rec_type {!r}".format(rec_type))

    # `max` is pyspark.sql.functions.max (star import), not the Python builtin.
    df_grouped = df.groupBy(*group_keys).agg(max("event_tm").alias(latest_col))

    # Join back on the full group key as well as the winning event_tm. Matching on
    # event_tm alone would pair a group with rows of other symbols/dates sharing that time.
    df_source = df.select(*group_keys, col("event_tm").alias(latest_col), "exchange", *value_cols)
    df_joined = df_grouped.join(df_source, on=group_keys + [latest_col], how="inner")

    return (
        df_joined
        .select(
            "trade_dt", "rec_type",
            col("symbol").alias("stock_symbol"),
            col("exchange").alias("stock_exchange"),
            latest_col, "event_seq_nb", "arrival_tm", *value_cols
        )
        .orderBy("trade_dt", "stock_symbol", "event_seq_nb")
    )

trade_corrected = applyLatest(trade)

# One DataFrame of corrected trades per trade date.
trades_080520 = trade_corrected.filter(col("trade_dt") == "2020-08-05")
trades_080620 = trade_corrected.filter(col("trade_dt") == "2020-08-06")

# Write each trade date into its own partition directory. The trade_dt column stays in
# the files; the staging cells below read one directory and filter on that column.
for day, day_trades in (("2020-08-05", trades_080520), ("2020-08-06", trades_080620)):
    day_trades.write.mode("overwrite").parquet("/trade/trade_dt={}".format(day))

# ****************** REPEAT FOR QUOTES ******************
# Load the quote records and run them through the same correction as the trades.
quote_common = spark.read.parquet("/output_dir/partition=Q")

# Keep only the columns the quote correction needs.
quote = quote_common.select(
    "trade_dt", "rec_type", "symbol", "exchange", "event_tm", "event_seq_nb",
    "arrival_tm", "bid_pr", "bid_size", "ask_pr", "ask_size"
)

# Apply Data Correction
quote_corrected = applyLatest(quote)

# One DataFrame of corrected quotes per trade date.
quotes_080520 = quote_corrected.filter(col("trade_dt") == "2020-08-05")
quotes_080620 = quote_corrected.filter(col("trade_dt") == "2020-08-06")

# Write each trade date into its own partition directory (trade_dt stays in the files).
for day, day_quotes in (("2020-08-05", quotes_080520), ("2020-08-06", quotes_080620)):
    day_quotes.write.mode("overwrite").parquet("/quote/trade_dt={}".format(day))


In [0]:
# 4.1 Read Parquet Files From Azure Blob Storage Partition
# Load the corrected trades written above for trade date 2020-08-05.
df = spark.read.format("parquet").load("/trade/trade_dt={}".format("2020-08-05"))

In [0]:
# 4.2 Create Trade Staging Table
# 4.2.1 Restrict the trades to trade date 2020-08-05, then keep only the columns the
# moving-average query needs.
# NOTE(review): this reassigns `df`, so re-running this cell on its own fails because
# trade_dt has already been projected away; re-run the 4.1 read cell first.
df = (
    df.filter(col("trade_dt") == "2020-08-05")
      .select("stock_symbol", "latest_trade", "event_seq_nb", "trade_pr")
)

In [0]:
# 4.2.2 Create A Spark Temporary View
# Register the staged trades under a name the Spark SQL moving-average query can use.
df.createOrReplaceTempView(name="tmp_trade_moving_avg")

In [0]:
"""
4.2.3 Calculate The 30-min Moving Average Using The Spark Temp View 
Partition by symbol and order by time. The window should contain all records within 30-min of the corresponding row.
mov_avg_df = spark.sql('select symbol, exchange, event_tm, event_seq_nb, trade_pr,# [logic to derive last 30 min moving average price] as mov_avg_pr\from tmp_trade_moving_avg')
"""
mov_avg_df = spark.sql('select stock_symbol, latest_trade, event_seq_nb, trade_pr,() as mov_avg_pr from tmp_trade_moving_avg')