In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [2]:
# Create (or reuse) a Spark session named 'app' with master set to 'local'.
spark = (
    SparkSession.builder
    .master('local')
    .appName('app')
    .getOrCreate()
)

In [3]:
# SparkContext of the session; used below for the RDD API (sc.textFile).
sc = spark.sparkContext

In [4]:
# Location of the input text file that is read line by line.
# NOTE(review): this is a hard-coded absolute path on one machine; consider
# moving it to a configurable location.
input_path = "file:///home/alan/spark/guided-capstone/part-00000-092ec1db-39ab-4079-9580-f7c7b516a283-c000.txt"
raw = sc.textFile(input_path)

In [7]:
# Turn every raw text line into a list of common event fields via parse_json.
# parse_json is defined in a cell that appears further down in this notebook
# (In [5]); the lambda looks the name up only when it is called, so that cell
# must have been executed before this RDD is evaluated.
parsed = raw.map(lambda line: parse_json(line)) 

In [13]:
# Bring the first 10 parsed records to the driver and display them as a
# quick visual check of the parser output.
x = parsed.take(10)
x

# Alternative check of the first raw line (collects the whole RDD first):
# raw.collect()[0]

[['2020-08-06',
  'Q',
  'SYMA',
  'NASDAQ',
  '2020-08-06 09:38:08.093',
  1,
  '2020-08-06 09:30:00.000',
  '',
  78.13370587077013,
  100,
  79.8251633824899,
  100,
  'Q'],
 ['2020-08-06',
  'Q',
  'SYMA',
  'NASDAQ',
  '2020-08-06 09:46:05.163',
  2,
  '2020-08-06 09:30:00.000',
  '',
  76.52304470696788,
  100,
  76.57240785476783,
  100,
  'Q'],
 ['2020-08-06',
  'Q',
  'SYMA',
  'NASDAQ',
  '2020-08-06 09:52:14.798',
  3,
  '2020-08-06 09:30:00.000',
  '',
  78.74535582037817,
  100,
  79.09279949812425,
  100,
  'Q'],
 ['2020-08-06',
  'Q',
  'SYMA',
  'NASDAQ',
  '2020-08-06 09:58:51.806',
  4,
  '2020-08-06 09:30:00.000',
  '',
  75.61362753973539,
  100,
  76.94977735069746,
  100,
  'Q'],
 ['2020-08-06',
  'Q',
  'SYMA',
  'NASDAQ',
  '2020-08-06 10:07:40.796',
  5,
  '2020-08-06 09:30:00.000',
  '',
  77.45084535147971,
  100,
  78.72533317224415,
  100,
  'Q'],
 ['2020-08-06',
  'Q',
  'SYMA',
  'NASDAQ',
  '2020-08-06 10:15:34.939',
  6,
  '2020-08-06 09:30:00.000',
  '

In [36]:
# Typed schema for the common event record (13 fields). Its column types
# mirror the casts applied to the string DataFrame further down in this
# notebook; in the visible cells only its length is inspected.
#
# Price columns use DecimalType(10, 2): a bare DecimalType() defaults to
# precision 10 / scale 0, which would drop the fractional part of prices and
# disagree with the DecimalType(10,2) casts used on the DataFrame.
struct1 = StructType([StructField("trade_dt", T.DateType(), False),
                      StructField("rec_type", T.StringType(), False),
                      StructField("symbol",   T.StringType(), False),
                      StructField("exchange", T.StringType(), False),
                      StructField("event_tm", T.TimestampType(), False),
                      StructField("event_seq_nb", T.IntegerType(), False),
                      StructField("arrival_tm",   T.TimestampType(), False),
                      # trade_pr is the only nullable field in this schema.
                      StructField("trade_pr", T.DecimalType(10, 2), True),
                      StructField("bid_pr",   T.DecimalType(10, 2), False),
                      StructField("bid_size", T.IntegerType(), False),
                      StructField("ask_pr",   T.DecimalType(10, 2), False),
                      StructField("ask_size", T.IntegerType(), False),
                      StructField("partition", T.StringType(), False)]
                    )

In [8]:
# Loading schema: the same 13 columns, every one a non-nullable string.
# This is the schema handed to createDataFrame; real types are applied
# afterwards with explicit casts.
struct2 = StructType([
    StructField(column_name, T.StringType(), False)
    for column_name in (
        "trade_dt",
        "rec_type",
        "symbol",
        "exchange",
        "event_tm",
        "event_seq_nb",
        "arrival_tm",
        "trade_pr",
        "bid_pr",
        "bid_size",
        "ask_pr",
        "ask_size",
        "partition",
    )
])

In [37]:
# Sanity check: number of fields declared in struct1.
len(struct1)

13

In [28]:
# Build the DataFrame from the parsed RDD using the all-string schema
# (struct2); columns are cast to their real types in a later cell.
data = spark.createDataFrame(parsed, schema=struct2)

In [29]:
# Preview the first 4 rows; every column is still a string at this point.
data.show(4)

+----------+--------+------+--------+--------------------+------------+--------------------+--------+-----------------+--------+-----------------+--------+---------+
|  trade_dt|rec_type|symbol|exchange|            event_tm|event_seq_nb|          arrival_tm|trade_pr|           bid_pr|bid_size|           ask_pr|ask_size|partition|
+----------+--------+------+--------+--------------------+------------+--------------------+--------+-----------------+--------+-----------------+--------+---------+
|2020-08-06|       Q|  SYMA|  NASDAQ|2020-08-06 09:38:...|           1|2020-08-06 09:30:...|        |78.13370587077013|     100| 79.8251633824899|     100|        Q|
|2020-08-06|       Q|  SYMA|  NASDAQ|2020-08-06 09:46:...|           2|2020-08-06 09:30:...|        |76.52304470696788|     100|76.57240785476783|     100|        Q|
|2020-08-06|       Q|  SYMA|  NASDAQ|2020-08-06 09:52:...|           3|2020-08-06 09:30:...|        |78.74535582037817|     100|79.09279949812425|     100|        Q|
|202

In [30]:
# Target type for each column that was loaded as a string and needs a real
# type. Columns not listed here (rec_type, symbol, exchange, partition)
# stay strings.
column_casts = {
    'trade_dt': T.DateType(),
    'event_tm': T.TimestampType(),
    'arrival_tm': T.TimestampType(),
    'event_seq_nb': T.IntegerType(),
    'bid_size': T.IntegerType(),
    'ask_size': T.IntegerType(),
    'trade_pr': T.DecimalType(10,2),
    'bid_pr': T.DecimalType(10,2),
    'ask_pr': T.DecimalType(10,2),
}

# Replace each listed column in place with its cast version.
for column_name, target_type in column_casts.items():
    data = data.withColumn(column_name, F.col(column_name).cast(target_type))



In [31]:
# Preview the first 4 rows again, now with the casts applied.
data.show(4)

+----------+--------+------+--------+--------------------+------------+-------------------+--------+------+--------+------+--------+---------+
|  trade_dt|rec_type|symbol|exchange|            event_tm|event_seq_nb|         arrival_tm|trade_pr|bid_pr|bid_size|ask_pr|ask_size|partition|
+----------+--------+------+--------+--------------------+------------+-------------------+--------+------+--------+------+--------+---------+
|2020-08-06|       Q|  SYMA|  NASDAQ|2020-08-06 09:38:...|           1|2020-08-06 09:30:00|    null| 78.13|     100| 79.83|     100|        Q|
|2020-08-06|       Q|  SYMA|  NASDAQ|2020-08-06 09:46:...|           2|2020-08-06 09:30:00|    null| 76.52|     100| 76.57|     100|        Q|
|2020-08-06|       Q|  SYMA|  NASDAQ|2020-08-06 09:52:...|           3|2020-08-06 09:30:00|    null| 78.75|     100| 79.09|     100|        Q|
|2020-08-06|       Q|  SYMA|  NASDAQ|2020-08-06 09:58:...|           4|2020-08-06 09:30:00|    null| 75.61|     100| 76.95|     100|        Q|

In [32]:
# Print the DataFrame schema to confirm the column types after casting.
data.printSchema()

root
 |-- trade_dt: date (nullable = true)
 |-- rec_type: string (nullable = false)
 |-- symbol: string (nullable = false)
 |-- exchange: string (nullable = false)
 |-- event_tm: timestamp (nullable = true)
 |-- event_seq_nb: integer (nullable = true)
 |-- arrival_tm: timestamp (nullable = true)
 |-- trade_pr: decimal(10,2) (nullable = true)
 |-- bid_pr: decimal(10,2) (nullable = true)
 |-- bid_size: integer (nullable = true)
 |-- ask_pr: decimal(10,2) (nullable = true)
 |-- ask_size: integer (nullable = true)
 |-- partition: string (nullable = false)



In [5]:
import json

def parse_json(line : str):
    """Parse one JSON-encoded event line into the common event field list.

    A record is valid when its 'event_type' is 'Q' or 'T' and its set of keys
    matches the expected quote or trade columns exactly. Valid records have
    their source keys renamed ('event_type' -> 'rec_type',
    'file_tm' -> 'arrival_tm', 'price' -> 'trade_pr') and are tagged with
    partition 'Q' or 'T'.

    Everything else is flagged as a bad record with partition 'B' and the raw
    text stored under 'line'. That covers text that is not valid JSON, JSON
    that is not an object, a missing 'event_type', and an unexpected key set.
    Note that get_common_event only keeps its fixed list of common columns,
    so the 'line' entry is not part of the returned list.

    Args:
        line: One line of input text, expected to hold a JSON object.

    Returns:
        Whatever get_common_event returns for the translated (or bad) record.
    """
    translations = {'event_type' : 'rec_type', 'file_tm' : 'arrival_tm', 'price' : 'trade_pr'}

    quote_columns = {'trade_dt', 'file_tm', 'event_type', 'symbol', 'event_tm', 'event_seq_nb',
                    'exchange', 'bid_pr', 'bid_size', 'ask_pr', 'ask_size'}

    trade_columns = {'trade_dt', 'file_tm', 'event_type', 'symbol', 'event_tm', 'event_seq_nb',
                    'exchange', 'price', 'size', 'execution_id'}

    try:
        record = json.loads(line)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass; an undecodable line
        # is treated as a bad record instead of failing the whole job.
        record = None

    is_valid_record = False
    record_type = None
    if isinstance(record, dict):
        # .get() so that a record without 'event_type' is flagged as bad
        # rather than raising KeyError.
        record_type = record.get('event_type')
        record_keys = set(record.keys())
        is_valid_record = (record_type == 'Q' and record_keys == quote_columns) or \
                          (record_type == 'T' and record_keys == trade_columns)

    if is_valid_record:
        translated_record = {translations.get(k, k): v for k, v in record.items()}
        translated_record['partition'] = record_type
    else:
        # Build the bad record from scratch: the previous code assigned into
        # a variable that did not exist yet on this branch.
        translated_record = {'partition': 'B', 'line': line}

    return get_common_event(translated_record)
    
    


In [6]:
def get_common_event(record: dict):
    """Project a record dict onto the fixed, ordered list of common columns.

    Args:
        record: Mapping of column name to value; extra keys are ignored.

    Returns:
        A list with one entry per common column, in schema order. A column
        missing from ``record`` is represented by an empty string.
    """
    ordered_columns = [
        'trade_dt', 'rec_type', 'symbol', 'exchange',
        'event_tm', 'event_seq_nb', 'arrival_tm',
        'trade_pr', 'bid_pr', 'bid_size', 'ask_pr', 'ask_size',
        'partition',
    ]

    values = []
    for column in ordered_columns:
        values.append(record.get(column, ""))
    return values
    
    

In [33]:
# Write the DataFrame as parquet under 'output_dir', partitioned by the
# 'partition' column, using the 'overwrite' save mode.
(
    data.write
    .partitionBy('partition')
    .mode('overwrite')
    .parquet('output_dir')
)