In [51]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, countDistinct, lead, lag
from pyspark.sql.window import Window

In [52]:
# Load the sparkmagic IPython extension into this kernel.
# NOTE(review): none of the cells visible here use a sparkmagic magic — confirm this is still needed.
%load_ext sparkmagic.magics

The sparkmagic.magics extension is already loaded. To reload it, use:
  %reload_ext sparkmagic.magics


In [53]:
# Initialise findspark.
# NOTE(review): the pyspark imports live in an earlier cell, so this call runs after
# pyspark has already been imported rather than before — confirm it is still required.
findspark.init()

In [54]:
# Create the SparkSession for this notebook, or reuse one that already exists.
# NOTE(review): "YourAppName" looks like a template placeholder — consider a descriptive name.
spark = (
    SparkSession.builder
    .appName("YourAppName")
    .getOrCreate()
)

# Root directory for the parquet inputs read below and the output/ folder written later.
work_dir = 'test_data'

In [55]:
# Locations of the three parquet inputs under work_dir.
# NOTE(review): "exectuions" looks like a misspelling of "executions"; the path is left
# untouched because it has to match the file on disk — confirm.
executions_path = f'{work_dir}/exectuions.parquet'
ref_path = f'{work_dir}/refdata.parquet'
market_path = f'{work_dir}/marketdata.parquet'

# One DataFrame per input file.
executions_df = spark.read.parquet(executions_path)
ref_df = spark.read.parquet(ref_path)
market_df = spark.read.parquet(market_path)

In [56]:
%%time
# Count the rows of the executions frame and print the total.
record_count = executions_df.count()
print(f"Executions count: {record_count}")

Executions count: 4203
CPU times: user 2.18 ms, sys: 2.28 ms, total: 4.46 ms
Wall time: 201 ms


In [57]:
%%time
unique_executions = executions_df.agg(countDistinct("Venue", "TradeTime").alias("count_unique_venue_executions"))
unique_executions.show()

+-----------------------------+
|count_unique_venue_executions|
+-----------------------------+
|                         3847|
+-----------------------------+

CPU times: user 7.14 ms, sys: 4.42 ms, total: 11.6 ms
Wall time: 472 ms


In [58]:
%%time
count_continuous_trades = executions_df.filter(col("Phase") == 'CONTINUOUS_TRADING').count()
print(f"Total count of continuous trades: {count_continuous_trades}")

Total count of continuous trades: 4103
CPU times: user 3.36 ms, sys: 2.5 ms, total: 5.86 ms
Wall time: 183 ms


In [59]:
%%time
transformed_executions_df = executions_df.withColumn("side" , 
                         when((col('Quantity') < 0 ) , 2)
                         .otherwise(1)
                        )

enriched_df = transformed_executions_df.join(ref_df.withColumnRenamed('Currency','reference_Currency')
                                             , on='ISIN', how='left')
enriched_df.write.mode("overwrite").parquet(f'{work_dir}/output/enriched_df/')

CPU times: user 6.09 ms, sys: 3.2 ms, total: 9.3 ms
Wall time: 410 ms


In [60]:
# Per-listing window ordered by event time; shared by the lag/lead columns computed below.
window_spec = Window.partitionBy("listing_id").orderBy("event_timestamp")

In [61]:
%%time
best_bid_ask_df = market_df.withColumn(
    "best_bid_min_1s",
    lag("best_bid_price").over(window_spec),
).withColumn(
    "best_bid_1s",
    lead("best_bid_price").over(window_spec),
).withColumn(
    "best_ask_min_1s",
    lag("best_ask_price").over(window_spec),
).withColumn(
    "best_ask_1s",
    lead("best_ask_price").over(window_spec),
)
best_bid_ask_df.write.mode("overwrite").parquet(f'{work_dir}/output/best_bid_ask/')



CPU times: user 19.6 ms, sys: 7.98 ms, total: 27.6 ms
Wall time: 6.91 s


                                                                                

In [62]:
%%time
# Assuming (mind price = the average of the current quoted bid and ask price)
mid_price_df = market_df.withColumn(
    "mid_price",
    (col("best_bid_price") + col("best_ask_price")) / 2
).withColumn(
    "mid_price_min_1s",
    lag("mid_price").over(window_spec),
).withColumn(
    "mid_price_1s",
    lead("mid_price").over(window_spec),
)
mid_price_df.write.mode("overwrite").parquet(f'{work_dir}/output/mid_price/')



CPU times: user 18.6 ms, sys: 7.75 ms, total: 26.4 ms
Wall time: 5.72 s


                                                                                

In [None]:
%%time
# market_df.printSchema
# enriched_df.printSchema
slippage_df = market_df.join(enriched_df.withColumnRenamed('primary_mic','enriched_primary_mic'), 
                             market_df["listing_id"] == enriched_df["id"], how="left").withColumn('slippage', 
                                  when(col('side') == 2  , 
                                       (col("Price") - col("best_bid_price") ) / ( col("best_ask_price") - col("best_bid_price")) )
                                  .otherwise(
                                       (col("best_ask_price")  - col("Price") )/ ( col("best_ask_price") - col("best_bid_price"))
                         )
           )
slippage_df.write.mode("overwrite").parquet(f'{work_dir}/output/slippage/')

