In [0]:
import pyspark
import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [0]:
azure_key = "somevalue"
azure_storage = "somevalue"
azure_container = "somevalue"
trade_dt_05 = "2020-08-05"
trade_dt_06 = "2020-08-06"

In [0]:
# Set Azure blob configuration
spark.conf.set(f"fs.azure.account.key.{azure_storage}.blob.core.windows.net", azure_key)

# Read parquet and create dataframe
quote_05 = spark.read.format("parquet").load(f"wasbs://{azure_container}@{azure_storage}.blob.core.windows.net/quote/trade_dt={trade_dt_05}")
quote_06 = spark.read.format("parquet").load(f"wasbs://{azure_container}@{azure_storage}.blob.core.windows.net/quote/trade_dt={trade_dt_06}")

In [0]:
quote_05.union(quote_06).createOrReplaceTempView("quotes")

In [0]:
quote = spark.sql("select trade_dt, rec_type, stock_symbol, stock_exchange, latest_quote, event_seq_nb, bid_pr, bid_size, ask_pr, ask_size from quotes")

In [0]:
quote.createOrReplaceTempView("tmp_quote")

In [0]:
mov_avg_df = spark.sql("""select trade_dt, rec_type, stock_symbol, stock_exchange, latest_quote, event_seq_nb, 
    bid_pr, 
    avg(bid_pr) over(partition by stock_symbol order by latest_quote range between interval '30' minutes preceding and current row) as mov_avg_bid_pr,
    bid_size,
    ask_pr,
    avg(ask_pr) over(partition by stock_symbol order by latest_quote range between interval '30' minutes preceding and current row) as mov_avg_ask_pr,
    ask_size
    from tmp_quote
""")

In [0]:
mov_avg_df.createOrReplaceTempView("temp_quote_moving_avg")

In [0]:
current_date = datetime.datetime.strptime("2020-08-06", "%Y-%m-%d")
previous_date = current_date - datetime.timedelta(1)

In [0]:
previous_date_quote = spark.sql(f"select stock_symbol, stock_exchange, latest_quote, event_seq_nb, bid_pr, ask_pr from quotes where trade_dt = '{previous_date}'")

In [0]:
previous_date_quote.createOrReplaceTempView("previous_date_quote")

In [0]:
temp_last_quote = spark.sql("""select stock_symbol as symbol, last_bid_pr, last_ask_pr from
(select stock_symbol,
 last_value(bid_pr) over (partition by stock_symbol order by latest_quote rows between unbounded preceding and unbounded following) as last_bid_pr,
 last_value(ask_pr) over (partition by stock_symbol order by latest_quote rows between unbounded preceding and unbounded following) as last_ask_pr
 from temp_quote_moving_avg)
 """).distinct()

In [0]:
# Read parquet and create dataframe
trade_05 = spark.read.format("parquet").load(f"wasbs://{azure_container}@{azure_storage}.blob.core.windows.net/trade/trade_dt={trade_dt_05}")
trade_06 = spark.read.format("parquet").load(f"wasbs://{azure_container}@{azure_storage}.blob.core.windows.net/trade/trade_dt={trade_dt_06}")

In [0]:
trade_05.union(trade_06).createOrReplaceTempView("trades")

In [0]:
# Define common schema
common_schema = StructType([StructField("trade_dt", DateType(), True),
                            StructField("rec_type", StringType(), True),
                            StructField("stock_symbol", StringType(), True),
                            StructField("event_tm", TimestampType(), True),
                            StructField("event_seq_nb", IntegerType(), True),
                            StructField("stock_exchange", StringType(), True),
                            StructField("bid_pr", FloatType(), True),
                            StructField("bid_size", IntegerType(), True),
                            StructField("ask_pr", FloatType(), True),
                            StructField("ask_size", IntegerType(), True),
                            StructField("trade_pr", FloatType(), True), 
                            StructField("mov_avg_bid_pr", StringType(), True),
                            StructField("mov_avg_ask_pr", StringType(), True)])

In [0]:
quote_trade_union = spark.sql("""
select trade_dt,
       rec_type,
       stock_symbol, 
       latest_quote as event_tm,
       event_seq_nb,
       stock_exchange,
       bid_pr,
       bid_size,
       ask_pr,
       ask_size,
       null as trade_pr,
       mov_avg_bid_pr,
       mov_avg_ask_pr
from temp_quote_moving_avg
union
select trade_dt,
       rec_type,
       stock_symbol,
       latest_trade as event_tm,
       event_seq_nb,
       stock_exchange,
       null as bid_pr,
       null as bid_size,
       null as ask_pr,
       null as ask_size,
       trade_pr,
       null as mov_avg_bid_pr,
       null as mov_avg_ask_pr
from trades""")

quote_trade_union = spark.createDataFrame(quote_trade_union.rdd, common_schema)

In [0]:
quote_trade_union.createOrReplaceTempView("quote_trade_union")

In [0]:
quote_trade_union_update = spark.sql("""
select *,
last_value(trade_pr) over(partition by stock_symbol) as last_trade_pr
from quote_trade_union
""")

In [0]:
quote_trade_union_update.createOrReplaceTempView("quote_trade_union_update")

In [0]:
quote_trade_update = spark.sql("""

select *
from quote_trade_union_update
where rec_type = 'Q'

""")

In [0]:
temp_table = quote_trade_update.join(broadcast(temp_last_quote), quote_trade_update.stock_symbol == temp_last_quote.symbol, how="left")

In [0]:
temp_table.createOrReplaceTempView("temp_table")

In [0]:
quote_final = spark.sql("""
select *
from temp_table
""")