In [0]:
import pandas

from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession. \
                builder. \
                appName("Aggregation"). \
                master("local[2]"). \
                getOrCreate()


In [0]:
#current date string
c_date_str ='2020-08-06'

In [0]:
# Read trade
df = spark.read.parquet('/mnt/data/trade')


In [0]:
df.show(10)


+--------+------+--------+-------------------+------------+-------------------+--------+----------+------+--------+------+--------+----+----------+
|rec_type|symbol|exchange|           event_tm|event_seq_nb|         arrival_tm|trade_pr|trade_size|bid_pr|bid_size|ask_pr|ask_size|line|  trade_dt|
+--------+------+--------+-------------------+------------+-------------------+--------+----------+------+--------+------+--------+----+----------+
|       T|  SYMA|    NYSE|2020-08-06 17:49:37|          10|2020-08-05 16:50:00|    84.0|        27|   0.0|       0|   0.0|       0|    |2020-08-06|
|       T|  SYMA|  NASDAQ|2020-08-06 17:42:21|          10|2020-08-06 16:30:00|78.93246|       368|   0.0|       0|   0.0|       0|    |2020-08-06|
|       T|  SYMA|  NASDAQ|2020-08-06 19:00:29|          20|2020-08-06 16:30:00| 77.0967|        51|   0.0|       0|   0.0|       0|    |2020-08-06|
|       T|  SYMA|  NASDAQ|2020-08-06 20:09:29|          30|2020-08-06 16:30:00|78.31462|       213|   0.0|      

In [0]:
df.createOrReplaceTempView("trades")

### Create Trade Staging table -- tmp_trade_moving_avg

In [0]:

df1 = spark.sql("select trade_dt, symbol,exchange, event_tm, event_seq_nb, trade_pr from trades where trade_dt='{}'".format(c_date_str))


df1.createOrReplaceTempView("temp_trade_moving_avg")


mov_avg_df=spark.sql("""
        select trade_dt, symbol, exchange, event_tm, event_seq_nb, trade_pr, 
        avg(trade_pr) over(partition by symbol order by event_tm  RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING and current ROW ) as mov_avg_pr  
        from temp_trade_moving_avg order by symbol""")

mov_avg_df.createOrReplaceTempView("tmp_trade_moving_avg")
mov_avg_df.coalesce(1).write.format('parquet').mode("overwrite").saveAsTable("tmp_trade_moving_avg")

### Create Staging table for the prior day's last trade

#### Get previous date last trade and calculate

In [0]:
import datetime as dt

prev_date=dt.datetime.strptime(c_date_str, '%Y-%m-%d') -dt.timedelta (days=1) 
prev_date_str =dt.datetime.strftime(prev_date, '%Y-%m-%d')

df = spark.sql("select symbol,exchange, event_tm, event_seq_nb, trade_pr from trades where trade_dt='{}'".format(prev_date_str))
               
df.createOrReplaceTempView('tmp_last_trade')
spark.sql("""select * from tmp_last_trade""").show(6)

+------+--------+-------------------+------------+---------+
|symbol|exchange|           event_tm|event_seq_nb| trade_pr|
+------+--------+-------------------+------------+---------+
|  SYMA|  NASDAQ|2020-08-05 17:38:50|          10|  77.7757|
|  SYMA|  NASDAQ|2020-08-05 18:58:33|          20|75.715225|
|  SYMA|  NASDAQ|2020-08-05 20:09:24|          30| 75.87926|
|  SYMA|  NASDAQ|2020-08-05 21:22:41|          40|78.324715|
|  SYMA|  NASDAQ|2020-08-05 22:33:58|          50| 75.72602|
|  SYMA|  NASDAQ|2020-08-05 23:46:43|          60|77.479485|
+------+--------+-------------------+------------+---------+
only showing top 6 rows



In [0]:
#Calculate last trade price
last_pr_df=spark.sql("""
        select distinct symbol, exchange, last_pr from(
            select symbol, exchange, event_tm, event_seq_nb, trade_pr,  
                   last(trade_pr) over(partition by exchange, symbol order by event_tm
                     range between UNBOUNDED PRECEDING and  UNBOUNDED FOLLOWING ) as last_pr  
            from tmp_last_trade order by exchange, symbol)
        """)  

In [0]:
last_pr_df.show()

+------+--------+---------+
|symbol|exchange|  last_pr|
+------+--------+---------+
|  SYMA|  NASDAQ| 77.24676|
|  SYMB|  NASDAQ|35.537262|
|  SYMC|  NASDAQ|158.02032|
|  SYMA|    NYSE| 77.78611|
|  SYMB|    NYSE|33.956287|
|  SYMC|    NYSE|160.61949|
+------+--------+---------+



In [0]:
#Save the view into hive table
last_pr_df.coalesce(1).write.format('parquet').mode("overwrite").saveAsTable('temp_last_trade')

#### 4.4 Populate last trade

In [0]:
#Read processed quote data of current date
df_q = spark.read.parquet('/mnt/data/quote/trade_dt='+c_date_str)
df_q.createOrReplaceTempView ('quote')


In [0]:
df_q.toPandas().head(5)

Unnamed: 0,rec_type,symbol,exchange,event_tm,event_seq_nb,arrival_tm,trade_pr,trade_size,bid_pr,bid_size,ask_pr,ask_size,line,trade_dt
0,Q,SYMA,NASDAQ,2020-08-06 16:38:08,1,2020-08-06 16:30:00,0.0,0,78.133705,100,79.825165,100,,2020-08-06
1,Q,SYMA,NASDAQ,2020-08-06 16:46:05,2,2020-08-06 16:30:00,0.0,0,76.523048,100,76.572411,100,,2020-08-06
2,Q,SYMA,NASDAQ,2020-08-06 16:52:14,3,2020-08-06 16:30:00,0.0,0,78.745354,100,79.092796,100,,2020-08-06
3,Q,SYMA,NASDAQ,2020-08-06 16:58:51,4,2020-08-06 16:30:00,0.0,0,75.613625,100,76.949776,100,,2020-08-06
4,Q,SYMA,NASDAQ,2020-08-06 17:07:40,5,2020-08-06 16:30:00,0.0,0,77.450844,100,78.725334,100,,2020-08-06


In [0]:
#create denormalized view of union quotes and  tmp_trade_moving_avg 


df_quote_union=spark.sql('''
    select  trade_dt, 
      "T" as rec_type, 
      symbol, 
      exchange, 
      event_tm, 
      event_seq_nb, 
      trade_pr, 
      null as bid_pr, 
      null as bid_size, 
      null as ask_pr, 
      null as ask_size, 
      mov_avg_pr from tmp_trade_moving_avg 
    union 
    select 
      trade_dt, 
      rec_type, 
      symbol, 
      exchange, 
      event_tm, 
      event_seq_nb, 
      trade_pr,  
      bid_pr, 
      bid_size, 
      ask_pr,  
      ask_size, 
      null from 
      quote''')
   

df_quote_union.createOrReplaceTempView("quote_union")

df_quote_union.orderBy("symbol", "exchange", "event_tm").toPandas().head(20)

Unnamed: 0,trade_dt,rec_type,symbol,exchange,event_tm,event_seq_nb,trade_pr,bid_pr,bid_size,ask_pr,ask_size,mov_avg_pr
0,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 16:38:08,1,0.0,78.133705,100.0,79.825165,100.0,
1,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 16:46:05,2,0.0,76.523048,100.0,76.572411,100.0,
2,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 16:52:14,3,0.0,78.745354,100.0,79.092796,100.0,
3,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 16:58:51,4,0.0,75.613625,100.0,76.949776,100.0,
4,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:07:40,5,0.0,77.450844,100.0,78.725334,100.0,
5,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:15:34,6,0.0,79.298424,100.0,81.071915,100.0,
6,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:22:14,7,0.0,77.761452,100.0,79.245232,100.0,
7,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:29:36,8,0.0,75.601135,100.0,76.955353,100.0,
8,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:35:26,9,0.0,76.300049,100.0,77.618706,100.0,
9,2020-08-06,T,SYMA,NASDAQ,2020-08-06 17:42:21,10,78.932457,,,,,78.932457


In [0]:
#Populate The Latest trade_pr and mov_avg_pr

quote_union_update = spark.sql("""
    select *,
    last_value(trade_pr, true) OVER (PARTITION BY symbol, exchange ORDER BY event_tm ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as last_trade_pr,
    last_value(mov_avg_pr, true) OVER (PARTITION BY symbol, exchange ORDER BY event_tm ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as last_mov_avg_pr
    from quote_union
""")
quote_union_update.createOrReplaceTempView("quote_union_update")

In [0]:
quote_union_update.toPandas().head(20)


Unnamed: 0,trade_dt,rec_type,symbol,exchange,event_tm,event_seq_nb,trade_pr,bid_pr,bid_size,ask_pr,ask_size,mov_avg_pr,last_trade_pr,last_mov_avg_pr
0,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 16:38:08,1,0.0,78.133705,100.0,79.825165,100.0,,0.0,
1,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 16:46:05,2,0.0,76.523048,100.0,76.572411,100.0,,0.0,
2,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 16:52:14,3,0.0,78.745354,100.0,79.092796,100.0,,0.0,
3,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 16:58:51,4,0.0,75.613625,100.0,76.949776,100.0,,0.0,
4,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:07:40,5,0.0,77.450844,100.0,78.725334,100.0,,0.0,
5,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:15:34,6,0.0,79.298424,100.0,81.071915,100.0,,0.0,
6,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:22:14,7,0.0,77.761452,100.0,79.245232,100.0,,0.0,
7,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:29:36,8,0.0,75.601135,100.0,76.955353,100.0,,0.0,
8,2020-08-06,Q,SYMA,NASDAQ,2020-08-06 17:35:26,9,0.0,76.300049,100.0,77.618706,100.0,,0.0,
9,2020-08-06,T,SYMA,NASDAQ,2020-08-06 17:42:21,10,78.932457,,,,,78.932457,78.932457,78.932457


In [0]:
#Filter For Quote Records
quote_update = spark.sql("""
    select trade_dt, symbol, event_tm, event_seq_nb, exchange,
            bid_pr, bid_size, ask_pr, ask_size, last_trade_pr, last_mov_avg_pr
    from quote_union_update
    where rec_type = 'Q'
""")
quote_update.createOrReplaceTempView("quote_update")

In [0]:
# show the results
print("quote_update count:")
print(quote_update .count())
quote_update.toPandas().head(10)


quote_update count:
540


Unnamed: 0,trade_dt,symbol,event_tm,event_seq_nb,exchange,bid_pr,bid_size,ask_pr,ask_size,last_trade_pr,last_mov_avg_pr
0,2020-08-06,SYMA,2020-08-06 16:38:08,1,NASDAQ,78.133705,100,79.825165,100,0.0,
1,2020-08-06,SYMA,2020-08-06 16:46:05,2,NASDAQ,76.523048,100,76.572411,100,0.0,
2,2020-08-06,SYMA,2020-08-06 16:52:14,3,NASDAQ,78.745354,100,79.092796,100,0.0,
3,2020-08-06,SYMA,2020-08-06 16:58:51,4,NASDAQ,75.613625,100,76.949776,100,0.0,
4,2020-08-06,SYMA,2020-08-06 17:07:40,5,NASDAQ,77.450844,100,78.725334,100,0.0,
5,2020-08-06,SYMA,2020-08-06 17:15:34,6,NASDAQ,79.298424,100,81.071915,100,0.0,
6,2020-08-06,SYMA,2020-08-06 17:22:14,7,NASDAQ,77.761452,100,79.245232,100,0.0,
7,2020-08-06,SYMA,2020-08-06 17:29:36,8,NASDAQ,75.601135,100,76.955353,100,0.0,
8,2020-08-06,SYMA,2020-08-06 17:35:26,9,NASDAQ,76.300049,100,77.618706,100,0.0,
9,2020-08-06,SYMA,2020-08-06 17:50:52,11,NASDAQ,77.962067,100,79.181129,100,0.0,78.932457


In [0]:
# Join With Table temp_last_trade To Get The Prior Day Close Price
quote_final = spark.sql("""
    select t1.trade_dt, t1.symbol, event_tm, event_seq_nb, t1.exchange,
        bid_pr, bid_size, ask_pr, ask_size, 
        last_trade_pr, 
        last_mov_avg_pr,
        bid_pr - t2.last_pr as bid_pr_mv, 
        ask_pr -t2.last_pr as ask_pr_mv,
        t2.last_pr
    from  quote_update t1
    left join  temp_last_trade t2 on 
            t1.symbol= t2.symbol and  t1.exchange= t2.exchange
    """)

In [0]:
quote_final.toPandas().head(10)

Unnamed: 0,trade_dt,symbol,event_tm,event_seq_nb,exchange,bid_pr,bid_size,ask_pr,ask_size,last_trade_pr,last_mov_avg_pr,bid_pr_mv,ask_pr_mv,last_pr
0,2020-08-06,SYMA,2020-08-06 16:38:08,1,NASDAQ,78.133705,100,79.825165,100,0.0,,0.886948,2.578407,77.246758
1,2020-08-06,SYMA,2020-08-06 16:46:05,2,NASDAQ,76.523048,100,76.572411,100,0.0,,-0.723709,-0.674347,77.246758
2,2020-08-06,SYMA,2020-08-06 16:52:14,3,NASDAQ,78.745354,100,79.092796,100,0.0,,1.498596,1.846039,77.246758
3,2020-08-06,SYMA,2020-08-06 16:58:51,4,NASDAQ,75.613625,100,76.949776,100,0.0,,-1.633133,-0.296982,77.246758
4,2020-08-06,SYMA,2020-08-06 17:07:40,5,NASDAQ,77.450844,100,78.725334,100,0.0,,0.204086,1.478577,77.246758
5,2020-08-06,SYMA,2020-08-06 17:15:34,6,NASDAQ,79.298424,100,81.071915,100,0.0,,2.051666,3.825157,77.246758
6,2020-08-06,SYMA,2020-08-06 17:22:14,7,NASDAQ,77.761452,100,79.245232,100,0.0,,0.514694,1.998474,77.246758
7,2020-08-06,SYMA,2020-08-06 17:29:36,8,NASDAQ,75.601135,100,76.955353,100,0.0,,-1.645622,-0.291405,77.246758
8,2020-08-06,SYMA,2020-08-06 17:35:26,9,NASDAQ,76.300049,100,77.618706,100,0.0,,-0.946709,0.371948,77.246758
9,2020-08-06,SYMA,2020-08-06 17:50:52,11,NASDAQ,77.962067,100,79.181129,100,0.0,78.932457,0.715309,1.934372,77.246758


In [0]:
#Write The Final Dataframe Into Azure Blob Storage# 


wfp ='/mnt/data/quote-trade-analytical'
print(wfp)

/mnt/data/quote-trade-analytical


In [0]:
quote_final.write.partitionBy("trade_dt").mode("append").parquet(wfp)