In [None]:
%%configure -f
{ "conf":{
     "spark.pyspark.python": "python3"
    ,"spark.pyspark.virtualenv.enabled": "true"
    ,"spark.kubernetes.executor.node.selector.node-lifecycle":"spot"
    ,"spark.pyspark.virtualenv.type":"native"
    ,"spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    ,"spark.sql.files.ignoreCorruptFiles":"true"
    ,"spark.dynamicAllocation.executorIdleTimeout":"18000"
    ,"spark.driver.memory":"32g"
    ,"spark.driver.cores":"32"
    ,"spark.driver.maxResultSize":"24g"
    ,"spark.executor.memory":"32g"
    ,"spark.network.timeout":"300"
    ,"spark.executor.cores":"8"
    ,"spark.dynamicAllocation.maxExecutors":"500"
    ,"livy.server.session.timeout":"24h"
    ,"spark.sql.shuffle.partitions":"15000"
    ,"spark.driver.extraJavaOptions":"-DPYARROW_IGNORE_TIMEZONE:1"
    ,"spark.executor.extraJavaOptions":"-DPYARROW_IGNORE_TIMEZONE:1"
    }
}  

In [1]:
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql.window import Window
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

s3_path='s3://maystreetdata/feeds_norm/mstnorm_parquet_0_5_0/'
s3_working_dir='s3://maystreetdata/analysis/'
# col('ReceiptTimestamp') is in nano seconds in UTC timezone so we convert it to unix timestamp in exchange timezone
# keeping first 10 digits and then convert it pySpark timestamp, 
f_hour_from_ns_ts= f.from_utc_timestamp(f.from_unixtime(f.col('ReceiptTimestamp').cast(t.StringType())[0:10]),'America/New_York').cast(t.StringType())[12:2]
select_cols = ['f','Feed','SequenceNumber','Product','ReceiptTimestamp','dt']
nbbo_df= spark.read.parquet(f'{s3_path}mt=nbbo_quote/').select(select_cols+['BidPrice','BidQuantity','AskPrice','AskQuantity']).withColumn('dtHour', f_hour_from_ns_ts)
trade_df= spark.read.parquet(f'{s3_path}mt=trade/').select(select_cols+['Price','Quantity','Side','OrderReferenceNumber','LeavesQuantity']).withColumn('dtHour', f_hour_from_ns_ts)
order_df= spark.read.parquet(f'{s3_path}mt=add_order/').select(select_cols+['Price','Quantity','Side','OrderReferenceNumber']).withColumn('dtHour', f_hour_from_ns_ts)
one_mill_const = 1000000

## ORDERS SECTION ##

In [None]:
#calculate trade volume and number of orders per exchange
order_df.groupBy(['feed','f']).agg((f.sum(f.col('Quantity') * f.col('Price'))/f.lit(one_mill_const)).cast('float').alias("DollarVolume(Million)"),
                           f.count('SequenceNumber').alias("NumOfOrders")).orderBy(['feed','f']).pandas_api()\
.style.format(thousands=',',precision=0).background_gradient()

In [None]:
# below we calculate the following metrics grouped per hour
# trade volume and number of orders per exchangetrade volume and number of orders per hour across all exchanges
order_df_analysis = order_df\
.groupBy(['dtHour']).agg(
     f.countDistinct('OrderReferenceNumber').alias("NumOfOrders"),
    f.round(f.sum(f.col('Quantity') * f.col('Price'))/f.lit(one_mill_const),0).cast('float').alias("DollarVolume(Million)"),
    (f.avg(f.when(f.col('Side')=='Bid', (f.col('Quantity') * f.col('Price'))).otherwise(None))/f.lit(1)).cast('float').alias("AvgDollarVolume_Bid"),
    (f.avg(f.when(f.col('Side')=='Ask', (f.col('Quantity') * f.col('Price'))).otherwise(None))/f.lit(1)).cast('float').alias("AvgDollarVolume_Ask"),
                        ).orderBy(['dtHour'])#.cache()

order_df_analysis.pandas_api().style.format(thousands=',',precision=0).background_gradient()

# what you need to do:
# calculate the overall trading volume for work hours vs after-hours 
# calculating distinct values over high-cardinality column may be expensive, you can try calcualting approximate distinct count and see what performance improvements it brings.   

In [None]:
#plot line graph based on spark query results using seaborn
#import seaborn as sns
#sns.set_theme()
#ax = sns.relplot(
#    data=order_df_analysis.select('dtHour','DollarVolume').withColumn('DollarVolume(millions)',f.col('DollarVolume')/1000000).orderBy('dtHour').toPandas(), 
#    height=3, aspect=3.5, kind='line',
#    x="dtHour", y="DollarVolume(millions)"
#) 
order_df_analysis.pandas_api().set_index('dtHour')[['DollarVolume(Million)']].plot.bar()

In [None]:
#plot line graph based on spark query results using pyspark pandas
#using pyspark pandas is preferred over toPandas() method that copies all data to the driver node. 
order_df_analysis.select('dtHour','AvgDollarVolume_Bid','AvgDollarVolume_Ask')\
.orderBy('dtHour').pandas_api().set_index("dtHour").plot()


In [6]:
# get some individual tickers to analyse
sample_tickers=['AMZN','MSFT','GS','GE']
# calculate daily volumes for those tickers
plot_df = order_df.filter(order_df.Product.isin(sample_tickers)).select(
    f.col('dtHour'),'dt', 'Product', 'Quantity', 'Price','f','Feed')\
    .groupBy('dt','dtHour','Product','f','Feed')\
    .agg(f.sum(f.col('Quantity')*f.col('Price')/f.lit(one_mill_const)).alias('DollarVolume(Millions)')
).cache()

#calculate percentage of stock's volume on given exchange vs total volume on all exchanges(Feed) per hour of the day
plot_df=plot_df.withColumn('Percent',  
    (100*f.col('DollarVolume(Millions)'))/f.sum('DollarVolume(Millions)').over(Window.partitionBy(['dt','dtHour','Product']))
)

#calculate percentage of stock's volume on given exchange per hour across all days vs total stock volume on all exchanges(Feed) per hour across all days
plot_df=plot_df.withColumn('PercentHour',  
    (100*f.sum('DollarVolume(Millions)').over(Window.partitionBy(['dtHour','Product','Feed'])))/f.sum('DollarVolume(Millions)').over(Window.partitionBy(['dtHour','Product']))
)



In [None]:
plot_pd = plot_df.toPandas()
#convertion to pandas looses schema, so need to reset the types
plot_pd['DollarVolume(Millions)']=plot_pd['DollarVolume(Millions)'].astype('float');
plot_pd['PercentHour']=plot_pd['PercentHour'].astype('float');
plot_pd['Product']=plot_pd['Product'].astype('string');
plot_pd['dt']=plot_pd['dt'].astype('string');
plot_pd['dtHour']=plot_pd['dtHour'].astype('string');

plot_pd_wide =pd.pivot_table(plot_pd, values='PercentHour', index=['dtHour'],
                       columns=['Product','Feed']).fillna(0)
plot_pd_wide.style.format(thousands=',',precision=0).background_gradient(axis=0)
#BL: graph it as stacked bars to easily visualize exchange market share would be good visual

In [None]:
#generate visual using built-in pandas integration with matplotlib
# we use toPandas() here 
for one_t in sample_tickers:
    plot_pd.query(f"Product=='{one_t}'").pivot_table(index=["dtHour"], columns=["Feed"],values="DollarVolume(Millions)")\
    .plot(title=f"{one_t}",kind="bar", stacked='True', ylabel="DollarVolume(Millions)", figsize=(6, 3))

#BL: broke this by adding one more field - tried to fix it but gave up since i rarely use seaborn

## TRADES SECTION ##

In [259]:
#cover for order partial refills, replacing price with vwap price and timestamp with the latest filled part of the order
#we also make sure we only look for the fully filled orders, last LeavesQuantity=0 
trade_df_grouped = trade_df.groupBy('OrderReferenceNumber','Product','Side','Feed','f',"dtHour",'dt').agg(
    f.sum(f.col('Quantity')).alias('Quantity'),
    f.sum(f.col('Quantity') * f.col('Price')).alias('TradeSizeUsd'),
    f.min('ReceiptTimestamp').alias('ReceiptTimestamp'),
    f.last(f.col('LeavesQuantity'),True).alias('Unfilled')
).withColumn('vWAP',f.col('TradeSizeUsd')/f.col('Quantity')).filter(f.col('Unfilled')<1).cache()

In [None]:
#verify that calculation is correct
#
#BL: order hits the order book few minutes before the fill, so the timestamp and MID price has to be calculated using thisd time. if you use join need to join on [OrderReferenceNumber,Product, Side, f, Feed] 
#BL: as orderRefNums are unique only per day within scope of single feed/f/product. technically not product but within single session but since we dont know how their shard sessions we need to join on all columns
order_df.where("OrderReferenceNumber=='1000045252' and Product=='SSB'").show()
trade_df.filter(f.col('OrderReferenceNumber')=='1000045252').show(10)
trade_df_grouped.filter(f.col('OrderReferenceNumber')=='1000045252').show(10)
nbbo_df.groupBy("Feed",'f').count().show()
# BL: https://www.sec.gov/comments/265-29/26529-11.pdf for context on UQDF vs CQS

In [None]:
#create the union of trades and nbbo data to simulate as-of join 
# normalize list of columns for trades_df and nbbo_df, initialize missing columns as nulls
#we also add DataType column initialised as 1 for trades and 0 for nbbo to separate datasets
trade_df_grouped=trade_df_grouped\
    .withColumn('BidPrice',f.lit(None).cast(t.NullType()))\
    .withColumn('BidQuantity',f.lit(None).cast(t.NullType()))\
    .withColumn('AskPrice',f.lit(None).cast(t.NullType()))\
    .withColumn('AskQuantity',f.lit(None).cast(t.NullType()))\
    .withColumn('DataType',f.lit(1).cast(t.IntegerType()))
nbbo_df = nbbo_df.withColumn('vWAP',f.lit(None).cast(t.NullType()))\
    .withColumn('Quantity',f.lit(None).cast(t.NullType()))\
    .withColumn('Side',f.lit(None).cast(t.NullType()))\
    .withColumn('DataType',f.lit(0).cast(t.IntegerType()))

#union both dataframes, and make sure the names are order of columns are identical
#BL: i added Feed and f as helps to create a story later
select_cols = ["Feed","f",'Product','dtHour','vWAP','Quantity','Side','BidPrice','BidQuantity','AskPrice','AskQuantity','ReceiptTimestamp','DataType','dtHour','dt']
union_df = trade_df_grouped.select(select_cols)\
.union(nbbo_df.select(select_cols))
union_df

In [12]:
from pyspark.sql.window import Window
#we are looking for the closest timestamp to the current row. 
lookup_window=Window.partitionBy('Product').orderBy("Feed","f",'ReceiptTimestamp','DataType').rowsBetween(Window.unboundedPreceding, Window.currentRow)


In [59]:
# caclulate the price difference based on Nbbo midpoint
where_list="','".join(sample_tickers)
if False:
    union_df_price = union_df \
        .where(f"Product in ('{where_list}')")\
        .select('Feed','f','Product','vWAP','Quantity','Side','ReceiptTimestamp','dtHour','dt',
        f.last('AskPrice', True).over(lookup_window).alias('AskPriceNbbo'),
        f.last('BidPrice', True).over(lookup_window).alias('BidPriceNbbo'),
        f.last(f.when(f.col('DataType') < 1, f.col('ReceiptTimestamp') ).otherwise(None), True).over(lookup_window).alias('TimestampNbbo')        
        )\
        .withColumn('MidPrice', (f.col('AskPriceNbbo') + f.col('BidPriceNbbo'))/2)\
        .withColumn('ArrivalPriceImprovementBP', ((f.col('vWAP') -f.col('MidPrice'))/f.col('MidPrice') )*1000).filter(f.col('AskPrice').isNull()).cache()
else:
    union_df_price = union_df \
        .select('Feed','f','Product','vWAP','Quantity','Side','ReceiptTimestamp','dtHour','dt',
        f.last('AskPrice', True).over(lookup_window).alias('AskPriceNbbo'),
        f.last('BidPrice', True).over(lookup_window).alias('BidPriceNbbo'),
        f.last(f.when(f.col('DataType') < 1, f.col('ReceiptTimestamp') ).otherwise(None), True).over(lookup_window).alias('TimestampNbbo')        
        )\
        .withColumn('MidPrice', (f.col('AskPriceNbbo') + f.col('BidPriceNbbo'))/2)\
        .withColumn('ArrivalPriceImprovementBP', ((f.col('vWAP') -f.col('MidPrice'))/f.col('MidPrice') )*1000).filter(f.col('AskPrice').isNull()).cache()
    
union_df_price
if False:
    union_partition_columns=['dt','Product']
    union_df_price.repartition(*union_partition_columns).write.partitionBy(*union_partition_columns)\
            .mode('overwrite').parquet(f"{s3_working_dir}/price_trade_df.parquet")
union_df_price=spark.read.parquet(f"{s3_working_dir}/price_trade_df.parquet")

In [None]:
#BL: please check if gap fill forward works properly and then we just remove filter on top to do this visual accross the board
right_tale_limit = 500
if True:
    arrival_price_stats_pd=union_df_price.groupby('dtHour','Feed','f','Product').agg(f.mean('ArrivalPriceImprovementBP').alias('AvgArrivalPriceImprovementBP'))\
    .where("AvgArrivalPriceImprovementBP is not null").toPandas()
    
arrival_price_stats_less_oiutliers_pd=arrival_price_stats_pd.query(f"AvgArrivalPriceImprovementBP<{right_tale_limit}")
#arrival_price_stats_pd.groupby('Product').agg(f.mean('AvgArrivalPriceImprovementBP').alias('AvgArrivalPriceImprovementBP')).pandas_api()[["AvgArrivalPriceImprovementBP"]].astype(float).hist()
def graph_hist(l_df,l_title,bins=100,figsize=(6,2)):
    l_df.hist(bins=bins,figsize=figsize);plt.title(l_title);
    plt.suptitle('').set_visible(False)
graph_hist((arrival_price_stats_pd[['AvgArrivalPriceImprovementBP']].astype(float)),"Average Arrival Price Improvement")
graph_hist(arrival_price_stats_less_oiutliers_pd[['AvgArrivalPriceImprovementBP']].astype(float),"Average Arrival Price Improvement(no outliers)")
graph_hist(arrival_price_stats_pd.groupby('Product').agg(AvgArrivalPriceImprovementBP=("AvgArrivalPriceImprovementBP","mean")),"Average Arrival Price Improvement by single name")
graph_hist(arrival_price_stats_less_oiutliers_pd.groupby('Product').agg(AvgArrivalPriceImprovementBP=("AvgArrivalPriceImprovementBP","mean")),"Average Arrival Price Improvement by single name (no outliers)")

In [None]:
plt.figure(figsize=(15,4));sns.histplot(arrival_price_stats_less_oiutliers_pd, x='AvgArrivalPriceImprovementBP', hue='dtHour',multiple='dodge', shrink=.75, bins=20)\
.set_title('Average Arrival Price Improvement - by hour of the day');

In [None]:
plt.figure(figsize=(15,4));sns.histplot(arrival_price_stats_less_oiutliers_pd, x='AvgArrivalPriceImprovementBP', hue='Feed',multiple='dodge', shrink=.75, bins=20)\
.set_title('Average Arrival Price Improvement - by Feed');

In [None]:
plt.figure(figsize=(15,4));sns.histplot(arrival_price_stats_less_oiutliers_pd, x='AvgArrivalPriceImprovementBP', hue='f',multiple='dodge', shrink=.75, bins=20)\
.set_title('Average Arrival Price Improvement - by "f"');

In [None]:
#pick few orders to sample to simulate "your own" trades
#trade_df.filter(f.col('OrderReferenceNumber')=='1000045252').show(10)
lookback_offset=1*1000*1000*1000*60
trade_stats_df=trade_df.where("OrderReferenceNumber is not null and OrderReferenceNumber != 0 and Product='AMZN'")\
.groupBy('OrderReferenceNumber',"Product","Feed",'f','dt','Side').agg(f.count('ReceiptTimestamp').alias('partial_count'),
                                                                  f.min('ReceiptTimestamp').alias('fill_start'),
                                                                  f.max('ReceiptTimestamp').alias('fill_end')
                                                                 )\
.select("*",
        f.round((f.col("fill_end")-f.col("fill_start"))/1000/1000/1000/60,1).alias('fill_duration_min'),
        (f.col('fill_start')-lookback_offset).alias('ts_start'),
        (f.col('fill_end')+lookback_offset).alias('ts_end')
       ).where("fill_duration_min>1 and partial_count>20")
print(trade_stats_df.count())
trade_stats_df.orderBy('partial_count','fill_duration_min',ascending=False).orderBy("OrderReferenceNumber").show()
sample_trades=trade_stats_df.drop('fill_start','fill_end','SequenceNumber').join(trade_df.drop('SequenceNumber'),on=['OrderReferenceNumber',"Product",'dt','Feed','f'])
sample_trades_dict=trade_stats_df.orderBy("OrderReferenceNumber").collect()


In [None]:
where_list=[]
for one_trade_det in sample_trades_dict:
    product,ts_s,ts_e=one_trade_det['Product'],one_trade_det['ts_start'],one_trade_det['ts_end']
    where_list.append(f"Product=='{product}' and (ReceiptTimestamp between {ts_s} and  {ts_e}) ")
where_text = "("+') OR ('.join(where_list)+")"
where_text

In [238]:
nbbo_sample_pd  = nbbo_df.where(where_text).toPandas()
sample_trades_pd=sample_trades.toPandas()

In [239]:
sample_trade_price_pd  = sample_trades_pd.set_index('ReceiptTimestamp')[['OrderReferenceNumber','Side','Price','dt','ts_start','ts_end']].join(nbbo_sample_pd.set_index('ReceiptTimestamp')[['BidPrice','AskPrice','dt']],how='outer',lsuffix='_tr', rsuffix='_nb').sort_index()
sample_trade_price_pd['BidPrice']=np.where(sample_trade_price_pd['dt_nb'].astype(str) < '2022-06-06' , sample_trade_price_pd['BidPrice'].astype(float)/20,sample_trade_price_pd['BidPrice'].astype(float))
sample_trade_price_pd['AskPrice']=np.where(sample_trade_price_pd['dt_nb'].astype(str) < '2022-06-06' , sample_trade_price_pd['AskPrice'].astype(float)/20,sample_trade_price_pd['AskPrice'].astype(float))
sample_trade_price_pd['Price']=np.where(sample_trade_price_pd['dt_tr'].astype(str) < '2022-06-06' , sample_trade_price_pd['Price'].astype(float)/20,sample_trade_price_pd['Price'].astype(float))


sample_trade_price_pd['BidPrice']=sample_trade_price_pd['BidPrice'].fillna(method='ffill').fillna(method='bfill')
sample_trade_price_pd['AskPrice']=sample_trade_price_pd['AskPrice'].fillna(method='ffill').fillna(method='bfill')
sample_trade_price_pd['Price']=sample_trade_price_pd['Price'].astype(float)


In [None]:
plt.rcParams["figure.figsize"] = (10,3)
for one_ord in sample_trade_price_pd[['OrderReferenceNumber','ts_start','ts_end','dt_tr']].drop_duplicates().dropna().iterrows():
    ord_ref = one_ord[1].OrderReferenceNumber
    ts_start = (one_ord[1].ts_start)
    ts_end = (one_ord[1].ts_end)
    dt_tr = (one_ord[1].dt_tr)
    one_pd = sample_trade_price_pd.query(f"ReceiptTimestamp>={ts_start}").query(f"ReceiptTimestamp<={ts_end}")
    plot_title = f"{ord_ref} executed on {dt_tr} at {one_pd['Side'].drop_duplicates().dropna().values[0][0]}"
    #one_pd[['BidPrice','Price','AskPrice']].plot(figsize=(10,3),title=plot_title)
    plt.plot(one_pd.index,one_pd['BidPrice'], marker='', color='green', linewidth=1, label="BID")
    plt.plot(one_pd.index,one_pd['AskPrice'], marker='', color='red', linewidth=1, label="ASK")
    plt.plot(one_pd.index,one_pd['Price'], marker='*', color='blue', linewidth=0, label="ASK")
    plt.legend()
    plt.title(plot_title)
    plt.show()
    

In [None]:
trade_df\
.withColumn('ReceiptTimestampEST',f.from_utc_timestamp(f.from_unixtime(f.col('ReceiptTimestamp').cast(t.StringType())[0:10]),'America/New_York'))\
.withColumn('ReceiptTimestampESTDay',f.date_format(f.col('ReceiptTimestampEST'),'yyyy-MM-dd'))\
.withColumn('ReceiptTimestampESTTime',f.date_format(f.col('ReceiptTimestampEST'),'HH:mm:ss'))\
.withColumn('ReceiptTimestampESTWorkingHours',f.when( (f.col('ReceiptTimestampESTTime')>='09:30:00') & (f.col('ReceiptTimestampESTTime')<'16:00:00'), 1).otherwise(0)).show()

In [262]:
#calculate price difference vs Close
#adding necessary time columns
trade_df_grouped = trade_df_grouped\
.withColumn('ReceiptTimestampEST',f.from_utc_timestamp(f.from_unixtime(f.col('ReceiptTimestamp').cast(t.StringType())[0:10]),'America/New_York'))\
.withColumn('ReceiptTimestampESTDay',f.date_format(f.col('ReceiptTimestampEST'),'yyyy-MM-dd'))\
.withColumn('ReceiptTimestampESTTime',f.date_format(f.col('ReceiptTimestampEST'),'HH:mm:ss'))\
.withColumn('ReceiptTimestampESTWorkingHours',f.when( (f.col('ReceiptTimestampESTTime')>='09:30:00') & (f.col('ReceiptTimestampESTTime')<'16:00:00'), 1).otherwise(0))


In [None]:
trade_df_grouped.printSchema()

In [None]:
#define window
daily_window=Window.partitionBy('Product','ReceiptTimestampESTDay','Side').orderBy('ReceiptTimestampEST').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

#run aggregation to compare price improvements for AMZN stock
#vs daily Close price 
#vs daily max price
#vs daily max price during exchange working hours 
trade_df_grouped.filter((f.col('Side')=="Bid") | (f.col('Side')=="Ask")).filter(f.col('Product')=='AMZN').select(
    'Product','Side','vWAP','ReceiptTimestampESTDay','ReceiptTimestampESTTime','ReceiptTimestampESTWorkingHours', 
    f.last(f.when(f.col('ReceiptTimestampESTWorkingHours')>0, f.col('vWAP')).otherwise(None),True).over(daily_window).alias('vWAPClose'),
    f.max('vWAP').over(daily_window).alias('vWAPDaily'),
    f.max(f.when(f.col('ReceiptTimestampESTWorkingHours')>0, f.col('vWAP')).otherwise(None)).over(daily_window).alias('vWAPDailyWorkingHours')                        
).withColumn('ArrivalPriceImprovementBPvsClose', (f.col('vWAP') - f.col('vWAPClose'))*1000)\
.withColumn('ArrivalPriceImprovementBPvsDaily', (f.col('vWAP') - f.col('vWAPDaily'))*1000)\
.withColumn('ArrivalPriceImprovementBPvsDailyWH', (f.col('vWAP') - f.col('vWAPDailyWorkingHours'))*1000).show(100)