In [3]:
from pyspark.sql import SparkSession

# Reuse the already-active Spark session when there is one; otherwise build a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [6]:
from configreader import ConfigReader

# Load the Azure storage settings from config.cfg.
# NOTE(review): presumably the second argument selects the "azure-storage" section — confirm
# against the configreader module.
reader = ConfigReader("config.cfg", "azure-storage")
config = reader.get_config()

# Get Azure storage info from config (the access key is never hard-coded in the notebook)
storage_acct_name = config["account_name"]
storage_acct_access_key = config["access_key"]
storage_container = config["container_name"]
mount_root = config["mount_root"]


# Set Spark Azure storage account and key
storage_acct_key_str = f"fs.azure.account.key.{storage_acct_name}.blob.core.windows.net"
spark.conf.set(storage_acct_key_str, storage_acct_access_key)

# Set base Spark filepath for container.
# The URI must begin directly with the "wasbs" scheme: an invisible zero-width space had crept in
# before it, which would make the path an invalid URI for any reader/writer given this value.
container_base_path = f"wasbs://{storage_container}@{storage_acct_name}.blob.core.windows.net"
mount_base_path = f"{mount_root}/{storage_container}"

# Location of the ingested parquet data read by the following cells.
input_path = f"{mount_base_path}/ingested-data"

In [7]:
# Read the ingested parquet data, then keep only the rows whose partition value is not 'B'.
ingested_df = spark.read.parquet(input_path)
common_df = ingested_df.where(ingested_df["partition"] != 'B')

# Show the schema, then the remaining row count as the cell's output.
common_df.printSchema()
common_df.count()

root
 |-- trade_dt: date (nullable = true)
 |-- rec_type: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- exchange: string (nullable = true)
 |-- event_tm: timestamp (nullable = true)
 |-- event_seq_nb: integer (nullable = true)
 |-- arrival_tm: timestamp (nullable = true)
 |-- trade_pr: decimal(17,14) (nullable = true)
 |-- bid_pr: decimal(17,14) (nullable = true)
 |-- bid_size: integer (nullable = true)
 |-- ask_pr: decimal(17,14) (nullable = true)
 |-- ask_size: integer (nullable = true)
 |-- partition: string (nullable = true)



1200

In [3]:
# Project the columns of interest for trades and preview the first rows.
# NOTE(review): no rec_type filter is applied here, so non-trade rows are included too.
trade_columns = [
    "trade_dt",
    "symbol",
    "exchange",
    "event_tm",
    "event_seq_nb",
    "arrival_tm",
    "trade_pr",
]
trade = common_df.select(*trade_columns)
trade.show(5)

+----------+------+--------+--------------------+------------+-------------------+--------+
|  trade_dt|symbol|exchange|            event_tm|event_seq_nb|         arrival_tm|trade_pr|
+----------+------+--------+--------------------+------------+-------------------+--------+
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 09:38:...|           1|2020-08-06 09:30:00|    null|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 09:46:...|           2|2020-08-06 09:30:00|    null|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 09:52:...|           3|2020-08-06 09:30:00|    null|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 09:58:...|           4|2020-08-06 09:30:00|    null|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 10:07:...|           5|2020-08-06 09:30:00|    null|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 10:15:...|           6|2020-08-06 09:30:00|    null|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 10:22:...|           7|2020-08-06 09:30:00|    null|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 10:29:...|           8|2020-08-06 09:30:0

In [32]:
# Expose the filtered frame to Spark SQL under the view name "data".
common_df.createOrReplaceTempView("data")

In [33]:
# Number the rows inside each (trade_dt, rec_type, symbol, exchange, event_tm, event_seq_nb)
# group, ordered by arrival_tm descending, so rn = 1 marks the latest-arriving row of a group.
ranking_query = """
    SELECT 
        *
        , ROW_NUMBER() over (
            PARTITION BY trade_dt, rec_type, symbol, exchange, event_tm, event_seq_nb 
            ORDER BY arrival_tm desc) as rn
    FROM data
"""
row_ranked_data = spark.sql(ranking_query)

# Register the ranked rows for the next SQL step and preview them.
row_ranked_data.createOrReplaceTempView("row_ranked_data")
row_ranked_data.show(5)

+----------+--------+------+--------+--------------------+------------+-------------------+--------+------------------+--------+------------------+--------+---------+---+
|  trade_dt|rec_type|symbol|exchange|            event_tm|event_seq_nb|         arrival_tm|trade_pr|            bid_pr|bid_size|            ask_pr|ask_size|partition| rn|
+----------+--------+------+--------+--------------------+------------+-------------------+--------+------------------+--------+------------------+--------+---------+---+
|2020-08-05|       Q|  SYMA|  NASDAQ|2020-08-05 11:34:...|          17|2020-08-05 09:30:00|    null| 78.06235033524703|     100| 78.36705025855920|     100|        Q|  1|
|2020-08-05|       Q|  SYMA|    NYSE|2020-08-05 14:16:...|          39|2020-08-05 09:30:00|    null| 74.63142350491319|     100| 75.82283182627648|     100|        Q|  1|
|2020-08-05|       Q|  SYMB|  NASDAQ|2020-08-05 11:22:...|          16|2020-08-05 09:30:00|    null| 33.62173563866898|     100| 35.1669689818263

In [34]:
# Keep only the top-ranked row (rn = 1) of every group from row_ranked_data and drop the
# helper rn column by listing the original columns explicitly.
deduped_df = spark.sql("""
    SELECT
        trade_dt
        , rec_type
        , symbol
        , exchange
        , event_tm
        , event_seq_nb
        , arrival_tm
        , trade_pr
        , bid_pr
        , bid_size
        , ask_pr
        , ask_size
        , partition
    FROM row_ranked_data
    WHERE rn = 1
""")

# A bare count() in the middle of a cell is never displayed (only the last expression is),
# so print it explicitly instead of running a Spark job whose result is thrown away.
print(f"Rows after de-duplication: {deduped_df.count()}")
deduped_df.show(5)

# Register the de-duplicated rows for the trade/quote split.
deduped_df.createOrReplaceTempView('deduped_data')

+----------+--------+------+--------+--------------------+------------+-------------------+--------+------------------+--------+------------------+--------+---------+
|  trade_dt|rec_type|symbol|exchange|            event_tm|event_seq_nb|         arrival_tm|trade_pr|            bid_pr|bid_size|            ask_pr|ask_size|partition|
+----------+--------+------+--------+--------------------+------------+-------------------+--------+------------------+--------+------------------+--------+---------+
|2020-08-05|       Q|  SYMA|  NASDAQ|2020-08-05 11:34:...|          17|2020-08-05 09:30:00|    null| 78.06235033524703|     100| 78.36705025855920|     100|        Q|
|2020-08-05|       Q|  SYMA|    NYSE|2020-08-05 14:16:...|          39|2020-08-05 09:30:00|    null| 74.63142350491319|     100| 75.82283182627648|     100|        Q|
|2020-08-05|       Q|  SYMB|  NASDAQ|2020-08-05 11:22:...|          16|2020-08-05 09:30:00|    null| 33.62173563866898|     100| 35.16696898182639|     100|        Q

In [13]:
# Split the de-duplicated rows by record type, keeping only the columns each type selects:
# rec_type = 'T' rows carry trade_pr, rec_type = 'Q' rows carry the bid/ask price and size.
trade_query = """
   SELECT
        trade_dt
        , rec_type
        , symbol
        , exchange
        , event_tm
        , event_seq_nb
        , arrival_tm
        , trade_pr
        , partition
    FROM deduped_data
    WHERE rec_type = 'T'
"""
quote_query = """
    SELECT
        trade_dt
        , rec_type
        , symbol
        , exchange
        , event_tm
        , event_seq_nb
        , arrival_tm
        , bid_pr
        , bid_size
        , ask_pr
        , ask_size
        , partition
    FROM deduped_data
    WHERE rec_type = 'Q'
"""

trade_df = spark.sql(trade_query)
quote_df = spark.sql(quote_query)

# Preview both result frames.
trade_df.show(5)
quote_df.show(5)

+----------+--------+------+--------+--------------------+------------+-------------------+------------------+
|  trade_dt|rec_type|symbol|exchange|            event_tm|event_seq_nb|         arrival_tm|          trade_pr|
+----------+--------+------+--------+--------------------+------------+-------------------+------------------+
|2020-08-06|       T|  SYMC|  NASDAQ|2020-08-06 20:09:...|          90|2020-08-06 09:30:00|160.98419316231528|
|2020-08-05|       T|  SYMB|  NASDAQ|2020-08-05 10:40:...|          10|2020-08-05 09:30:00| 32.15344380416123|
|2020-08-05|       T|  SYMB|    NYSE|2020-08-05 13:10:...|          30|2020-08-05 09:30:00| 34.18391410021153|
|2020-08-05|       T|  SYMA|    NYSE|2020-08-05 14:24:...|          40|2020-08-05 09:30:00| 78.43095407886027|
|2020-08-05|       T|  SYMB|    NYSE|2020-08-05 12:02:...|          20|2020-08-05 09:30:00| 33.17828008517566|
+----------+--------+------+--------+--------------------+------------+-------------------+------------------+
o

In [35]:
# Persist each record type as parquet, partitioned on disk by trade_dt. Every frame is first
# coalesced to 4 Spark partitions, and any existing output at the target is overwritten.
# NOTE(review): the targets are relative paths, unlike input_path which is built from
# mount_base_path — confirm this is the intended output location.
for frame, target in ((trade_df, "EOD-load/trade/"), (quote_df, "EOD-load/quote/")):
    frame.coalesce(4).write.partitionBy("trade_dt").mode("overwrite").parquet(target)