In [0]:
# GUIDED CAPSTONE STEP 2: THIS NOTEBOOK READS THE INPUT STOCK FILES (CSV OR JSON) FROM AZURE BLOB STORAGE AND WRITES THEM AS PARQUET FILES IN A DATABRICKS CLUSTER, PARTITIONED INTO TRADE DATA, QUOTE DATA, AND BAD DATA

In [0]:
# packages used
import os
from datetime import datetime
import json
from pyspark.sql.types import StructType, StructField, DateType, StringType, TimestampType, IntegerType, FloatType

In [0]:
# Azure credentials
# Storage account and blob container that hold the input stock files.
storageAccountName = 'saderekguidedcapstone'
blobContainerName = 'container1'
# Read the access key from the environment so the real key never has to be
# pasted into the notebook source. When the variable is not set, the original
# placeholder value is used, so behaviour is unchanged in that case.
storageAccountAccessKey = os.environ.get(
    'AZURE_STORAGE_ACCOUNT_ACCESS_KEY',
    '<STORAGE-ACCOUNT-ACCESS-KEY'
)

In [0]:
# make files in Azure accessible to PySpark
# Mount the blob container only when it is not mounted yet.
# Trailing slashes are stripped on both sides of the comparison so the check
# does not depend on how dbutils.fs.mounts() formats the mount point.
# NOTE(review): mounts() is believed to report mount points without a trailing
# slash, which would make the original literal comparison never match - confirm.
mount_point = '/mnt/FileStore/MountFolder/'
if not any(mount.mountPoint.rstrip('/') == mount_point.rstrip('/') for mount in dbutils.fs.mounts()):
  try:
    dbutils.fs.mount(
      source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
      mount_point = mount_point,
      extra_configs = {'fs.azure.account.key.' + storageAccountName + '.blob.core.windows.net': storageAccountAccessKey}
    )
  except Exception as e:
    # Still best-effort (the notebook keeps running), but report the real cause
    # instead of assuming every failure means "already mounted".
    print("mount failed: {}. If the path is already mounted, try to unmount first".format(e))

# List the mounted folder to confirm the input data is reachable.
display(dbutils.fs.ls("dbfs:/mnt/FileStore/MountFolder"))

path,name,size
dbfs:/mnt/FileStore/MountFolder/data/,data/,0


In [0]:
# this formats tuple from csv
def parse_csv(line):
    """Parse one comma-separated record into a 15-field event tuple.

    Column layout read from the line (0-based):
    0 trade_dt, 1 arrival_tm, 2 rec_type, 3 symbol, 4 event_tm,
    5 event_seq_nb, 6 exchange, then for trades ('T') 7 trade_pr, 8 trade_size
    and for quotes ('Q') 7 bid_pr, 8 bid_size, 9 ask_pr, 10 ask_size.

    Args:
        line: one text line of the CSV input.

    Returns:
        A tuple ordered as (trade_dt, rec_type, symbol, exchange, event_tm,
        event_seq_nb, arrival_tm, trade_pr, trade_size, bid_pr, bid_size,
        ask_pr, ask_size, partition, bad_data). ``partition`` is 'T' or 'Q'
        for parsed records. Any line that cannot be parsed, or whose record
        type is neither 'T' nor 'Q', yields a placeholder tuple with
        ``partition`` 'B' and the raw line in ``bad_data``.
    """
    record_type_pos = 2
    record = line.split(',')
    try:
        rec_type = record[record_type_pos]
        if rec_type == 'T':
            return (
                datetime.strptime(record[0], '%Y-%m-%d'), #trade_dt
                rec_type, #rec_type
                record[3], #symbol
                record[6], #exchange
                datetime.strptime(record[4], '%Y-%m-%d %H:%M:%S.%f'), #event_tm
                int(record[5]), #event_seq_nb
                datetime.strptime(record[1], '%Y-%m-%d %H:%M:%S.%f'), #arrival_tm
                float(record[7]), #trade_pr
                int(record[8]), #trade_size
                None, #bid_pr
                None, #bid_size
                None, #ask_pr
                None, #ask_size
                'T', #partition
                '' #list line if bad data
            )
        if rec_type == 'Q':
            return (
                datetime.strptime(record[0], '%Y-%m-%d'), #trade_dt
                rec_type, #rec_type
                record[3], #symbol
                record[6], #exchange
                datetime.strptime(record[4], '%Y-%m-%d %H:%M:%S.%f'), #event_tm
                int(record[5]), #event_seq_nb
                datetime.strptime(record[1], '%Y-%m-%d %H:%M:%S.%f'), #arrival_tm
                None, #trade_pr
                None, #trade_size
                float(record[7]), #bid_pr
                int(record[8]), #bid_size
                float(record[9]), #ask_pr
                int(record[10]), #ask_size
                'Q', #partition
                '' #list line if bad data
            )
    except Exception:
        # Deliberate: any parsing failure (missing column, bad number, bad
        # timestamp) is not fatal; the line is routed to the bad partition below.
        pass

    # Reached on a parsing failure AND on an unrecognised record type, so the
    # function never returns None for a line.
    return (
        None, # trade_dt
        '', # rec_type
        '', # symbol
        '', # exchange
        None, # event_tm
        None, # event_seq_nb
        None, #arrival_tm
        None, #trade_pr
        None, #trade_size
        None, # bid_pr
        None, # bid_size
        None, # ask_pr
        None, # ask_size
        'B', # partition
        line # list line if bad data
    )

In [0]:
# this formats tuple from json
def parse_json(line):
    """Parse one JSON-encoded record into a 15-field event tuple.

    Keys read from the decoded object: 'event_type', 'trade_dt', 'symbol',
    'exchange', 'event_tm', 'event_seq_nb', 'file_tm', plus 'price' and 'size'
    for trades ('T') or 'bid_pr', 'bid_size', 'ask_pr', 'ask_size' for
    quotes ('Q').

    Args:
        line: one text line holding a single JSON document.

    Returns:
        A tuple ordered as (trade_dt, rec_type, symbol, exchange, event_tm,
        event_seq_nb, arrival_tm, trade_pr, trade_size, bid_pr, bid_size,
        ask_pr, ask_size, partition, bad_data). ``partition`` is 'T' or 'Q'
        for parsed records. A line that is not valid JSON, lacks a required
        key, holds an unparsable value, or has an event type other than
        'T'/'Q' yields a placeholder tuple with ``partition`` 'B' and the raw
        line in ``bad_data``.
    """
    try:
        # Decoding and the event_type lookup are inside the try so that a
        # malformed line is routed to the bad partition instead of raising.
        record = json.loads(line)
        record_type = record['event_type']
        if record_type == 'T':
            return (
                datetime.strptime(record['trade_dt'], '%Y-%m-%d'), #trade_dt
                record_type, #rec_type
                record['symbol'], #symbol
                record['exchange'], #exchange
                datetime.strptime(record['event_tm'], '%Y-%m-%d %H:%M:%S.%f'), #event_tm
                int(record['event_seq_nb']), #event_seq_nb
                datetime.strptime(record['file_tm'], '%Y-%m-%d %H:%M:%S.%f'), #arrival_tm
                float(record['price']), #trade_pr
                int(record['size']), #trade_size
                None, #bid_pr
                None, #bid_size
                None, #ask_pr
                None, #ask_size
                'T', #partition
                '' #list line if bad data
            )
        if record_type == 'Q':
            return (
                datetime.strptime(record['trade_dt'], '%Y-%m-%d'), #trade_dt
                record_type, #rec_type
                record['symbol'], #symbol
                record['exchange'], #exchange
                datetime.strptime(record['event_tm'], '%Y-%m-%d %H:%M:%S.%f'), #event_tm
                int(record['event_seq_nb']), #event_seq_nb
                datetime.strptime(record['file_tm'], '%Y-%m-%d %H:%M:%S.%f'), #arrival_tm
                None, #trade_pr
                None, #trade_size
                float(record['bid_pr']), #bid_pr
                int(record['bid_size']), #bid_size
                float(record['ask_pr']), #ask_pr
                int(record['ask_size']), #ask_size
                'Q', #partition
                '' #list line if bad data
            )
    except Exception:
        # Deliberate: a decoding/parsing failure is not fatal; the line is
        # routed to the bad partition below.
        pass

    # Reached on any failure AND on an unrecognised event type, so the
    # function never returns None for a line.
    return (
        None, # trade_dt
        '', # rec_type
        '', # symbol
        '', # exchange
        None, # event_tm
        None, # event_seq_nb
        None, #arrival_tm
        None, #trade_pr
        None, #trade_size
        None, # bid_pr
        None, # bid_size
        None, # ask_pr
        None, # ask_size
        'B', # partition
        line # list line if bad data
    )

In [0]:
# this partitions input file in cluster (trades, quotes, bad data)

def partition_input_file(exchange, date):
  """Parse one exchange/date input file and write it as partitioned parquet.

  The first file listed under
  '/mnt/FileStore/MountFolder/data/input/<exchange>/<date>/' is read as text,
  every line is parsed with parse_csv ('nyse') or parse_json ('nasdaq'), and
  the result is written to 'output/<exchange>/<date>' in overwrite mode,
  partitioned by the 'partition' column.

  Args:
    exchange: 'nyse' or 'nasdaq'.
    date: name of the date sub-folder, e.g. '20200805' or '20200806'.

  Returns:
    None after a successful write, or an error-message string when
    ``exchange`` is not one of the accepted values.
  """
  # Pick the parser first: an unsupported exchange is reported before any
  # file-system access, so the message below is returned instead of an error
  # from listing a folder that does not exist.
  if exchange == 'nyse':
    parse_line = parse_csv
  elif exchange == 'nasdaq':
    parse_line = parse_json
  else:
    return 'Incorrect value was provided for parameter exchange. Currently accepted values are "nyse" and "nasdaq".'

  # read file as RDD
  # NOTE(review): only the first directory entry is read; this presumably
  # relies on each folder holding exactly one data file - confirm.
  path_excluding_file = '/mnt/FileStore/MountFolder/data/input/{}/{}/'.format(exchange, date)
  filename = os.listdir('/dbfs' + path_excluding_file)[0]
  full_path = path_excluding_file + filename
  raw_rdd = spark.sparkContext.textFile('dbfs:' + full_path)

  # dataframe schema to impose; field order matches the tuples built by the parsers
  schema = StructType([
    StructField('trade_dt', DateType()),
    StructField('rec_type', StringType()),
    StructField('symbol', StringType()),
    StructField('exchange', StringType()),
    StructField('event_tm', TimestampType()),
    StructField('event_seq_nb', IntegerType()),
    StructField('arrival_tm', TimestampType()),
    StructField('trade_pr', FloatType()),
    StructField('trade_size', IntegerType()),
    StructField('bid_pr', FloatType()),
    StructField('bid_size', IntegerType()),
    StructField('ask_pr', FloatType()),
    StructField('ask_size', IntegerType()),
    StructField('partition', StringType()),
    StructField('bad_data', StringType())
  ])

  # format RDD
  parsed_RDD = raw_rdd.map(parse_line)

  # create dataframe from RDD with schema
  df = spark.createDataFrame(parsed_RDD, schema=schema)

  # write parquet files in databricks cluster
  df.write.partitionBy('partition').mode('overwrite').parquet('output/{}/{}'.format(exchange, date))
  return

In [0]:
# call the partition function for each of the 4 inputs files
# Process every exchange/date combination, in the order nyse then nasdaq.
for exchange_name in ('nyse', 'nasdaq'):
  for trade_date in ('20200805', '20200806'):
    partition_input_file(exchange=exchange_name, date=trade_date)

In [0]:
# examples of some of the resulting parquet files
# Walk down one branch of the output tree, printing the entries at each level.
for sub_path in ('', 'nyse', 'nyse/20200805', 'nyse/20200805/partition=Q/'):
  print(os.listdir('/dbfs/output/' + sub_path))