In [1]:
blob_account_name = 'springcapital'
blob_container_name = 'springcapitalfiles'
account_key = ''
# trade_blob_relative_path = 'trade/trade_dt=2020-08-06/*.parquet'
# quote_blob_relative_path = 'quote/trade_dt=2020-08-05/*.parquet'
filepath = 'wasbs://{}@{}.blob.core.windows.net/'.format(blob_container_name,blob_account_name)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1645742404082_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('app').getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set('fs.azure.account.key.{}.blob.core.windows.net'.format(blob_account_name), account_key)

In [3]:
trade = spark.read.option("basePath", filepath + 'trade/').parquet(filepath + 'trade/*/*.parquet')

In [4]:
trade.show()

+------+--------+--------------------+------------+--------+-------------------+----------+
|symbol|exchange|            event_tm|event_seq_nb|trade_pr|       arrival_time|  trade_dt|
+------+--------+--------------------+------------+--------+-------------------+----------+
|  SYMB|  NASDAQ|2020-08-06 15:20:...|          50|   33.08|2020-08-06 09:30:00|2020-08-06|
|  SYMA|  NASDAQ|2020-08-06 18:01:...|          70|   76.98|2020-08-06 09:30:00|2020-08-06|
|  SYMA|  NASDAQ|2020-08-06 10:42:...|          10|   78.93|2020-08-06 09:30:00|2020-08-06|
|  SYMA|  NASDAQ|2020-08-06 19:09:...|          80|   78.62|2020-08-06 09:30:00|2020-08-06|
|  SYMA|  NASDAQ|2020-08-06 21:33:...|         100|   77.44|2020-08-06 09:30:00|2020-08-06|
|  SYMB|  NASDAQ|2020-08-06 21:10:...|         100|   35.75|2020-08-06 09:30:00|2020-08-06|
|  SYMC|  NASDAQ|2020-08-06 15:30:...|          50|  160.83|2020-08-06 09:30:00|2020-08-06|
|  SYMB|  NASDAQ|2020-08-06 13:04:...|          30|   36.70|2020-08-06 09:30:00|

In [5]:
trade.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- exchange: string (nullable = true)
 |-- event_tm: timestamp (nullable = true)
 |-- event_seq_nb: integer (nullable = true)
 |-- trade_pr: decimal(5,2) (nullable = true)
 |-- arrival_time: timestamp (nullable = true)
 |-- trade_dt: date (nullable = true)

In [6]:
import pyspark.sql.functions as f
trade.select(f.min('trade_dt')).show(), trade.select(f.max('trade_dt')).show()

+-------------+
|min(trade_dt)|
+-------------+
|   2020-08-05|
+-------------+

+-------------+
|max(trade_dt)|
+-------------+
|   2020-08-06|
+-------------+

(None, None)

In [7]:
#create temp trade table
trade.createOrReplaceTempView("temp_trades")

In [8]:
df = spark.sql("""select trade_dt, symbol, exchange, event_tm, event_seq_nb, trade_pr 
from temp_trades 
where trade_dt = '2020-08-06'""")

df.createOrReplaceTempView("trade")

#Calculate The 30-min Moving Average Using The Spark Temp View
mov_avg_df = spark.sql('''select trade_dt,symbol,exchange,event_tm,event_seq_nb,trade_pr, avg(trade_pr) over(partition by symbol order by event_tm range between interval 30 minutes preceding and current row ) as mov_avg_price 
from trade''')

In [9]:
mov_avg_df.show()

+----------+------+--------+--------------------+------------+--------+-------------+
|  trade_dt|symbol|exchange|            event_tm|event_seq_nb|trade_pr|mov_avg_price|
+----------+------+--------+--------------------+------------+--------+-------------+
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 10:42:...|          10|   78.93|    78.930000|
|2020-08-06|  SYMA|    NYSE|2020-08-06 10:49:...|          10|   74.49|    76.710000|
|2020-08-06|  SYMA|    NYSE|2020-08-06 12:00:...|          20|   76.16|    76.160000|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 12:00:...|          20|   77.10|    76.630000|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 13:09:...|          30|   78.31|    78.310000|
|2020-08-06|  SYMA|    NYSE|2020-08-06 13:11:...|          30|   76.90|    77.605000|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 14:27:...|          40|   75.84|    75.840000|
|2020-08-06|  SYMA|    NYSE|2020-08-06 14:27:...|          40|   77.12|    76.480000|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 15:39:...|     

In [10]:
spark.sql('DROP TABLE IF EXISTS temp_trade_moving_avg') 

DataFrame[]

In [11]:
#Save The Temporary View Into Hive Table For Staging
mov_avg_df.write.saveAsTable("temp_trade_moving_avg")

In [12]:
spark.catalog.listTables()

[Table(name=u'hivesampletable', database=u'default', description=None, tableType=u'MANAGED', isTemporary=False), Table(name=u'temp_trade_moving_avg', database=u'default', description=None, tableType=u'MANAGED', isTemporary=False), Table(name=u'temp_trades', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True), Table(name=u'trade', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True)]

## prior day

In [13]:
import datetime
date = datetime.datetime.strptime('2020-08-06', '%Y-%m-%d')
prev_date_str = date - datetime.timedelta(days=1)
prev_date_str = str(prev_date_str.date())

In [14]:
prev_date_str

'2020-08-05'

In [15]:
prior_df = spark.sql("""select trade_dt, symbol, exchange, event_tm, event_seq_nb, trade_pr 
from temp_trades 
where trade_dt = '2020-08-05'""")


In [16]:
prior_df.createOrReplaceTempView('prev_trade')

In [17]:
#Calculate Last Trade Price Using The Spark Temp View
prev_temp_last_trade = spark.sql("""select * 
                                    from 
                                        (select symbol, exchange, last_trade_pr, rank() over(partition by symbol, exchange order by event_tm desc) rank 
                                                        from (select symbol,
                                                                    exchange,
                                                                    event_tm,
                                                                    avg(trade_pr) over(partition by symbol, exchange order by event_tm desc 
                                                                    range between interval '30' minutes preceding and current row) as last_trade_pr 
                                                                from prev_trade) a ) b
                                                 where rank=1""")

In [18]:
prev_temp_last_trade.show()

+------+--------+-------------+----+
|symbol|exchange|last_trade_pr|rank|
+------+--------+-------------+----+
|  SYMA|  NASDAQ|    77.250000|   1|
|  SYMA|    NYSE|    77.790000|   1|
|  SYMB|  NASDAQ|    35.540000|   1|
|  SYMB|    NYSE|    33.960000|   1|
|  SYMC|  NASDAQ|   158.020000|   1|
|  SYMC|    NYSE|   160.620000|   1|
+------+--------+-------------+----+

In [19]:
prev_temp_last_trade.write.saveAsTable("temp_last_trade")

## quote data

In [20]:
#quote temp table
quote = spark.read.option("basePath", filepath + 'quote/').parquet(filepath + 'quote/*/*.parquet')

In [21]:
quote.show()

+------+--------+--------------------+------------+------+--------+------+--------+-------------------+----------+
|symbol|exchange|            event_tm|event_seq_nb|bid_pr|bid_size|ask_pr|ask_size|       arrival_time|  trade_dt|
+------+--------+--------------------+------------+------+--------+------+--------+-------------------+----------+
|  SYMC|    NYSE|2020-08-05 19:54:...|          82|157.37|     100|158.24|     100|2020-08-05 09:30:00|2020-08-05|
|  SYMC|    NYSE|2020-08-05 13:03:...|          29|158.43|     100|159.17|     100|2020-08-05 09:30:00|2020-08-05|
|  SYMB|    NYSE|2020-08-05 15:26:...|          49| 34.30|     100| 34.70|     100|2020-08-05 09:30:00|2020-08-05|
|  SYMB|    NYSE|2020-08-05 15:01:...|          45| 33.67|     100| 33.79|     100|2020-08-05 09:30:00|2020-08-05|
|  SYMC|    NYSE|2020-08-05 09:35:...|           1|158.95|     100|160.20|     100|2020-08-05 09:30:00|2020-08-05|
|  SYMA|    NYSE|2020-08-05 15:23:...|          49| 78.58|     100| 79.14|     1

In [22]:
quote.createOrReplaceTempView("temp_quote")

In [23]:
quotes = spark.sql("""select trade_dt,symbol,exchange,event_tm,event_seq_nb,bid_pr,bid_size,ask_pr,ask_size 
from temp_quote 
where trade_dt='2020-08-06'""")

In [24]:
quotes.createOrReplaceTempView("quotes")

In [25]:
# Populate The Latest Trade and Latest Moving Average Trade Price To The Quote
# Records

# You need to join “quotes” and “temp_trade_moving_avg” to populate trade_pr and mov_avg_pr
# into quotes. However, you cannot use equality join in this case; trade events don’t happen at the
# same quote time. You want the latest in time sequence. This is a typical time sequence
# analytical use case. A good method for this problem is to merge both tables in a common time
# sequence.

# Column Value
# trade_dt Value from corresponding records
# rec_type “Q” for quotes, “T” for trades
# symbol Value from corresponding records
# event_tm Value from corresponding records
# event_seq_nb From quotes, null for trades
# exchange Value from corresponding records
# bid_pr From quotes, null for trades
# bid_size From quotes, null for trades
# ask_pr From quotes, null for trades
# ask_size From quotes, null for trades
# trade_pr From trades, null for quotes
# mov_avg_pr From trades, null for quotes

quotes_union = spark.sql("""
                      select trade_dt,"Q" as rec_type,symbol,event_tm,
                             event_seq_nb,exchange,bid_pr,bid_size,ask_pr,
                             ask_size,null as trade_pr,null as mov_avg_price
                        from quotes
                      union
                      select trade_dt,"T" as rec_type,symbol,event_tm,
                             event_seq_nb,exchange,null as bid_pr,null as bid_size,
                             null as ask_pr,null as ask_size,trade_pr,mov_avg_price
                        from temp_trade_moving_avg
                    """)

quotes_union.createOrReplaceTempView("quotes_union")

In [26]:
quotes_union.show()

+----------+--------+------+--------------------+------------+--------+------+--------+------+--------+--------+-------------+
|  trade_dt|rec_type|symbol|            event_tm|event_seq_nb|exchange|bid_pr|bid_size|ask_pr|ask_size|trade_pr|mov_avg_price|
+----------+--------+------+--------------------+------------+--------+------+--------+------+--------+--------+-------------+
|2020-08-06|       Q|  SYMB|2020-08-06 10:14:...|           6|    NYSE| 35.00|     100| 36.79|     100|    null|         null|
|2020-08-06|       Q|  SYMA|2020-08-06 12:52:...|          27|    NYSE| 77.00|     100| 78.24|     100|    null|         null|
|2020-08-06|       Q|  SYMC|2020-08-06 13:19:...|          32|  NASDAQ|160.26|     100|161.88|     100|    null|         null|
|2020-08-06|       Q|  SYMA|2020-08-06 16:22:...|          55|    NYSE| 79.21|     100| 80.53|     100|    null|         null|
|2020-08-06|       Q|  SYMA|2020-08-06 18:50:...|          77|  NASDAQ| 75.40|     100| 77.08|     100|    null

In [27]:
# quote_union_update = spark.sql("""
# select *,
# # [logic for the last not null trade price] AS last_trade_pr,
# # [logic for the last not null mov_avg_pr price] AS last_mov_avg_pr
# from quote_union
# """)
# quote_union_update.createOrReplaceTempView("quote_union_update")


# Populate The Latest trade_pr and mov_avg_pr
quote_union_update=spark.sql("""select * 
                                    from (select trade_dt,
                                                 event_tm,
                                                 event_seq_nb,
                                                 symbol,
                                                 exchange,
                                                 trade_pr as last_trade_pr,
                                                 mov_avg_price as last_mov_avg_pr,
                                                 rank() over (partition by symbol, exchange order by event_tm desc)as rank 
                                          from  quotes_union
                                          where rec_type='T') a
                                where rank =1
                            """)

quote_union_update.createOrReplaceTempView("quote_union_update")

In [28]:
quote_union_update.show()

+----------+--------------------+------------+------+--------+-------------+---------------+----+
|  trade_dt|            event_tm|event_seq_nb|symbol|exchange|last_trade_pr|last_mov_avg_pr|rank|
+----------+--------------------+------------+------+--------+-------------+---------------+----+
|2020-08-06|2020-08-06 21:33:...|         100|  SYMA|  NASDAQ|        77.44|      77.440000|   1|
|2020-08-06|2020-08-06 22:00:...|         100|  SYMA|    NYSE|        76.31|      76.875000|   1|
|2020-08-06|2020-08-06 21:10:...|         100|  SYMB|  NASDAQ|        35.75|      35.750000|   1|
|2020-08-06|2020-08-06 21:46:...|         100|  SYMB|    NYSE|        35.92|      35.920000|   1|
|2020-08-06|2020-08-06 21:17:...|         100|  SYMC|  NASDAQ|       156.84|     156.840000|   1|
|2020-08-06|2020-08-06 21:32:...|         100|  SYMC|    NYSE|       159.38|     158.110000|   1|
+----------+--------------------+------------+------+--------+-------------+---------------+----+

In [29]:
# quote_update = spark.sql("""
# select trade_dt, symbol, event_tm, event_seq_nb, exchange,
# bid_pr, bid_size, ask_pr, ask_size, last_trade_pr, last_mov_avg_pr
# from quote_union_update
# where rec_type = 'Q'
# """)
# quote_update.createOrReplaceTempView("quote_update")

# Filter the quote records after the above trade records calculation
quotes_update = spark.sql("""select q.trade_dt, q.symbol, q.event_tm, 
                                         q.event_seq_nb, q.exchange,q.bid_pr, q.bid_size,
                                         q.ask_pr, q.ask_size, u.last_trade_pr, u.last_mov_avg_pr
                                    from quotes_union as q
                                    left join quote_union_update as u
                                       on q.exchange=u.exchange
                                       and q.symbol=u.symbol
                                  where q.rec_type = 'Q'
                              """)


quotes_update.createOrReplaceTempView("quotes_update")

In [30]:
quotes_update.show()

+----------+------+--------------------+------------+--------+------+--------+------+--------+-------------+---------------+
|  trade_dt|symbol|            event_tm|event_seq_nb|exchange|bid_pr|bid_size|ask_pr|ask_size|last_trade_pr|last_mov_avg_pr|
+----------+------+--------------------+------------+--------+------+--------+------+--------+-------------+---------------+
|2020-08-06|  SYMB|2020-08-06 10:14:...|           6|    NYSE| 35.00|     100| 36.79|     100|        35.92|      35.920000|
|2020-08-06|  SYMA|2020-08-06 12:52:...|          27|    NYSE| 77.00|     100| 78.24|     100|        76.31|      76.875000|
|2020-08-06|  SYMC|2020-08-06 13:19:...|          32|  NASDAQ|160.26|     100|161.88|     100|       156.84|     156.840000|
|2020-08-06|  SYMA|2020-08-06 16:22:...|          55|    NYSE| 79.21|     100| 80.53|     100|        76.31|      76.875000|
|2020-08-06|  SYMA|2020-08-06 18:50:...|          77|  NASDAQ| 75.40|     100| 77.08|     100|        77.44|      77.440000|


In [32]:
# Join With Table temp_last_trade To Get The Prior Day Close Price

# SELECT /*+ BROADCAST(small) */ * 
# FROM large JOIN small
# ON larger.foo = small.foo

# quote_final = spark.sql("""
# select trade_dt, symbol, event_tm, event_seq_nb, exchange,
# bid_pr, bid_size, ask_pr, ask_size, last_trade_pr, last_mov_avg_pr,
# bid_pr - close_pr as bid_pr_mv, ask_pr - close_pr as ask_pr_mv
# from (
# # [Broadcast temp_last_trade table. Use quote_update to left outer join
# temp_last_trade]
# ) a
# """)

quote_final = spark.sql(""" select /*+ broadcast(temp_last_trade) */ q.symbol, q.event_tm, 
                                        q.event_seq_nb, q.exchange, q.bid_pr, q.bid_size, 
                                        q.ask_pr, q.ask_size,q.last_trade_pr, q.last_mov_avg_pr,
                                        q.bid_pr - t.last_trade_pr as bid_pr_mv, 
                                        q.ask_pr - t.last_trade_pr as ask_pr_mv
                            from quotes_update as q
                            left outer join temp_last_trade as t
                            on q.exchange= t.exchange
                            and q.symbol = t.symbol""")


In [34]:
quote_final.show()

+------+--------------------+------------+--------+------+--------+------+--------+-------------+---------------+---------+---------+
|symbol|            event_tm|event_seq_nb|exchange|bid_pr|bid_size|ask_pr|ask_size|last_trade_pr|last_mov_avg_pr|bid_pr_mv|ask_pr_mv|
+------+--------------------+------------+--------+------+--------+------+--------+-------------+---------------+---------+---------+
|  SYMB|2020-08-06 10:14:...|           6|    NYSE| 35.00|     100| 36.79|     100|        35.92|      35.920000| 1.040000| 2.830000|
|  SYMA|2020-08-06 12:52:...|          27|    NYSE| 77.00|     100| 78.24|     100|        76.31|      76.875000|-0.790000| 0.450000|
|  SYMC|2020-08-06 13:19:...|          32|  NASDAQ|160.26|     100|161.88|     100|       156.84|     156.840000| 2.240000| 3.860000|
|  SYMA|2020-08-06 16:22:...|          55|    NYSE| 79.21|     100| 80.53|     100|        76.31|      76.875000| 1.420000| 2.740000|
|  SYMA|2020-08-06 18:50:...|          77|  NASDAQ| 75.40|    

In [35]:
spark.catalog.listTables()

[Table(name=u'hivesampletable', database=u'default', description=None, tableType=u'MANAGED', isTemporary=False), Table(name=u'temp_last_trade', database=u'default', description=None, tableType=u'MANAGED', isTemporary=False), Table(name=u'temp_trade_moving_avg', database=u'default', description=None, tableType=u'MANAGED', isTemporary=False), Table(name=u'prev_trade', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True), Table(name=u'quote_union_update', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True), Table(name=u'quotes', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True), Table(name=u'quotes_union', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True), Table(name=u'quotes_update', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True), Table(name=u'temp_quote', database=None, description=None, tableType=u'TEMPORARY', isTemporary=True), Table(name=u'temp_trades', database=

In [36]:
#Write The Final Dataframe Into Azure Blob Storage At Corresponding Partition
quote_final.write.parquet(filepath + "/quote-trade-analytical/date={}".format('2020-08-06'))