In [None]:
from pyspark.sql import SparkSession

In [None]:
# Optional maintenance step (not part of the normal run): uncomment and run
# this to unmount /mnt/data, e.g. before re-mounting the container with
# different credentials.
#dbutils.fs.unmount("/mnt/data")

In [None]:
# Mount the Azure Blob Storage container at `mount_dir` so the rest of the
# notebook can read and write it through /mnt/data paths.
# NOTE(review): do not commit a real account key here; prefer a Databricks
# secret scope, e.g. key = dbutils.secrets.get(scope="<scope>", key="<name>").
container_name = "####"
storage_name = "####"
mount_dir = "/mnt/data"
key = "####"

# dbutils.fs.mount raises if the mount point is already in use, which makes
# the notebook fail on every re-run; only mount when it is not mounted yet.
already_mounted = any(m.mountPoint == mount_dir for m in dbutils.fs.mounts())

if not already_mounted:
    dbutils.fs.mount(
        source="wasbs://%s@%s.blob.core.windows.net" % (container_name, storage_name),
        mount_point=mount_dir,
        extra_configs={"fs.azure.account.key.%s.blob.core.windows.net" % storage_name: key},
    )


In [None]:
# Get the notebook's Spark session (getOrCreate reuses an existing one).
spark = (
    SparkSession.builder
    .master('local')
    .appName('app')
    .getOrCreate()
)

# Load the 2020-08-06 trade partition from the mounted storage.
df = spark.read.parquet(
    "/mnt/data/cloud-storage-path/trade/trade_dt={}".format("2020-08-06")
)

In [None]:
# Preview the first 20 rows of the raw trade data.
df.show(n=20)

In [None]:
# Expose the raw trade data to Spark SQL as `trades`.
df.createOrReplaceTempView("trades")

# Project the 2020-08-06 trades onto the columns used downstream; this
# notebook treats bid_pr as the trade price and renames it to trade_pr.
temp_df = spark.sql(
    """
            SELECT trade_dt, 
                   record_type, 
                   symbol, 
                   exchange, 
                   event_tm, 
                   event_seq_nb,
                   bid_pr AS trade_pr
              FROM trades 
              WHERE trade_dt = '2020-08-06'

            """
)

# Register the projection for the moving-average query.
temp_df.createOrReplaceTempView("tmp_trade_moving_avg")

In [None]:
# Preview the first 20 projected trade rows.
temp_df.show(n=20)

In [None]:
# Calculate the 30-minute moving average of the trade price.
# For each trade, the window covers all trades of the same symbol AND
# exchange whose event time falls in the 30 minutes up to and including the
# current trade. Partitioning by exchange as well as symbol keeps prices
# from different exchanges separate, consistent with the (symbol, exchange)
# partitioning used when trade values are later carried onto quotes.
# RANGE with an INTERVAL bound requires a timestamp ORDER BY key, hence the CAST.
mov_avg_df = spark.sql("""
    SELECT 
           trade_dt, 
           record_type, 
           symbol, 
           exchange, 
           event_tm, 
           event_seq_nb,
           trade_pr,
           AVG(trade_pr) OVER (PARTITION BY symbol, exchange
                ORDER BY CAST(event_tm AS timestamp) 
                RANGE BETWEEN INTERVAL 30 MINUTES PRECEDING AND CURRENT ROW) as mov_avg_pr    
    FROM tmp_trade_moving_avg
""")

In [None]:
# Persist the moving-average result as a Hive table for staging.
# The default save mode ("errorifexists") fails when the table already
# exists, so re-running the notebook would break here; overwrite instead.
mov_avg_df.write.mode("overwrite").saveAsTable("temp_trade_moving_avg")

# Show the first few rows.
mov_avg_df.show(5)

# Process Previous Trades Data

In [None]:
# Trading date whose last trade price is used as the prior-day close.
prev_date_str = "2020-08-05"

# The `trades` view only contains the 2020-08-06 partition loaded earlier,
# so filtering it on the previous date would always return no rows. Load
# the previous day's trade partition explicitly instead.
prev_trades_df = spark.read.parquet(
    "/mnt/data/cloud-storage-path/trade/trade_dt={}".format(prev_date_str))
prev_trades_df.createOrReplaceTempView("prev_trades")

# Create dataframe for prev_date_str (bid_pr is treated as the trade price,
# as for the current day).
prev_df = spark.sql("""
    SELECT trade_dt, exchange, symbol, event_tm, event_seq_nb, bid_pr AS trade_pr
      FROM prev_trades
     WHERE trade_dt = '{}'
""".format(prev_date_str))

# Create temp view
prev_df.createOrReplaceTempView("tmp_last_trade")

# Last trade price per (symbol, exchange) on the previous day: keep only the
# latest trade by event time, using event_seq_nb to break ties. This yields a
# single row per (symbol, exchange), which is what the prior-day close join
# expects. (The previous version read today's tmp_trade_moving_avg and
# returned a moving average for every trade row.)
last_prev_df = spark.sql("""
    SELECT symbol, exchange, trade_pr AS last_pr
      FROM (
            SELECT symbol,
                   exchange,
                   trade_pr,
                   ROW_NUMBER() OVER (PARTITION BY symbol, exchange
                                      ORDER BY CAST(event_tm AS timestamp) DESC,
                                               event_seq_nb DESC) AS rn
              FROM tmp_last_trade
           ) a
     WHERE rn = 1
""")

# Save into a Hive table for staging; overwrite so the cell can be re-run.
last_prev_df.write.mode("overwrite").saveAsTable("temp_last_trade")

# Show head
last_prev_df.show(5)

# Populate The Latest Trade and Latest Moving Average Trade Price To The Quote Records

In [None]:
# Read the 2020-08-06 quote partition from the mounted storage and expose
# it to Spark SQL as `quotes`.
quote_df = spark.read.parquet(
    "/mnt/data/cloud-storage-path/quote/quote_dt={}".format("2020-08-06")
)
quote_df.createOrReplaceTempView("quotes")

In [None]:
# Preview a handful of quote rows.
quote_df.show(n=4)

# Problem
We want to join `quotes` and `tmp_trade_moving_avg` to populate `trade_pr` and `mov_avg_pr` on each quote. However, we cannot use an equality join here, because trade events do not occur at exactly the same times as quote events.

For each quote we want the latest trade values that precede it in time. This is a typical time-series analytical use case. A good approach is to merge both tables into a common time sequence, then carry the latest trade values forward onto each quote.

![Google Drive Image](https://drive.google.com/uc?export=view&id=1FNEf80Tw-lBb5JDkr1gDMmIPrgEn1plo)

In [None]:
# Normalize quotes and trades to one shared column layout and stack them
# with UNION ALL: quote rows get NULL trade_pr / mov_avg_pr, while trade rows
# get NULL bid/ask columns and the literal record_type "T".
quote_union = spark.sql(
    """
        SELECT  trade_dt,
                record_type,
                symbol,
                event_tm,
                event_seq_nb,
                exchange,
                bid_pr,
                bid_size,
                ask_pr,
                ask_size,
                null as trade_pr,
                null as mov_avg_pr 
            FROM quotes
            UNION ALL
        SELECT  trade_dt, 
                "T" as record_type,
                symbol,
                event_tm,
                event_seq_nb,
                exchange,
                null as bid_pr,
                null as bid_size,
                null as ask_pr,
                null as ask_size,
                trade_pr,
                mov_avg_pr 
             FROM temp_trade_moving_avg
"""
)

# Preview the combined time sequence.
quote_union.show(n=10)

In [None]:
# Make the unioned quote/trade rows queryable as `quote_union`.
quote_union.createOrReplaceTempView(name="quote_union")

# Populate The Latest trade_pr and mov_avg_pr

In [None]:
# Within each (symbol, exchange), ordered by event_tm, carry the most recent
# non-null trade_pr and mov_avg_pr forward onto every row. last_value(x, true)
# ignores NULLs, so quote rows pick up the values of the latest trade row up
# to and including the current row.
# NOTE(review): rows with equal event_tm have no defined relative order here.
quote_union_update = spark.sql(
    """
    select *,
    last_value(trade_pr, true) OVER (PARTITION BY symbol, exchange ORDER BY event_tm ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as last_trade_pr,
    last_value(mov_avg_pr, true) OVER (PARTITION BY symbol, exchange ORDER BY event_tm ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as last_mov_avg_pr
    from quote_union
"""
)

# Register the enriched rows for the inspection and filtering queries.
quote_union_update.createOrReplaceTempView("quote_union_update")

In [None]:
# Preview the first 20 rows after the carry-forward step.
quote_union_update.show(n=20)

In [None]:
# Sanity check: look only at the trade ("T") rows after the carry-forward.
spark.sql(
    """
        SELECT *
        FROM quote_union_update
        WHERE record_type = "T"
        
"""
).show(n=20)

In [None]:
# Keep only the quote rows ('Q') together with the last_trade_pr and
# last_mov_avg_pr values computed in quote_union_update; the helper
# record_type column is dropped from the output.
quote_update = spark.sql(
    """
    SELECT  trade_dt, 
            symbol, 
            event_tm, 
            event_seq_nb, 
            exchange,
            bid_pr, 
            bid_size, 
            ask_pr, 
            ask_size, 
            last_trade_pr, 
            last_mov_avg_pr
    FROM quote_union_update
    WHERE record_type = 'Q'
"""
)

# Register the filtered quotes for the prior-day close join.
quote_update.createOrReplaceTempView(name="quote_update")


# Join With Table temp_last_trade To Get The Prior Day Close Price
The prior-day close price table has a single record per symbol and exchange. For this join you can use an equality join, since the only join fields are “symbol” and “exchange”. Note also that this table has a very limited number of records (no more than the number of symbol and exchange combinations). This makes it an excellent candidate for a broadcast join, which gives optimal join performance here. A broadcast join is generally recommended when one of the join tables is small enough that the whole dataset fits in memory.

In [None]:
# Attach the prior-day close price (temp_last_trade.last_pr) to every quote
# and derive how far the current bid/ask prices have moved from it.
# - temp_last_trade is intended to hold a single row per (symbol, exchange),
#   so it is broadcast to avoid shuffling the quote side. Spark only
#   recognizes hints written as `/*+ ... */` (no space between `/*` and `+`);
#   `/* + ... */` is parsed as an ordinary comment and silently ignored.
# - A LEFT JOIN keeps quotes whose (symbol, exchange) had no prior-day
#   trade; for those rows close_pr, bid_pr_mv and ask_pr_mv are NULL.
quote_final = spark.sql("""
    SELECT trade_dt, 
           symbol, 
           event_tm, 
           event_seq_nb, 
           exchange,
           bid_pr, 
           bid_size, 
           ask_pr, 
           ask_size, 
           last_trade_pr, 
           last_mov_avg_pr,
           bid_pr - close_pr as bid_pr_mv,
           ask_pr - close_pr as ask_pr_mv
    FROM ( 
            SELECT /*+ BROADCAST(t) */
                q.trade_dt, 
                q.symbol, 
                q.event_tm, 
                q.event_seq_nb, 
                q.exchange,
                q.bid_pr, 
                q.bid_size, 
                q.ask_pr, 
                q.ask_size, 
                q.last_trade_pr,
                q.last_mov_avg_pr,
                t.last_pr AS close_pr
            FROM quote_update q
             LEFT JOIN temp_last_trade t 
               ON (q.symbol = t.symbol AND q.exchange = t.exchange))
""")

# Preview the final quote records before writing them out.
quote_final.show()

In [None]:
# Write the final dataframe into Azure Blob Storage under the partition for
# the processed trade date.
trade_date = "2020-08-06"

# The default save mode ("errorifexists") fails if this partition was already
# written, so re-running the notebook would break; overwrite this partition.
quote_final.write.mode("overwrite").parquet(
    "/mnt/data/cloud-storage-path/quote-trade-analytical/date={}".format(trade_date))


# END