### IMPORT ALL THE NECESSARY SPARK LIBRARIES

In [1]:
import os
import json
import yaml
import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType,StructField,DateType,TimestampType,StringType,IntegerType,DecimalType
from pyspark.sql.functions import year, month, dayofmonth,to_timestamp,to_date,split,substring,col,when
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit,split,concat,ceil
import logging
from pyspark.sql.window import Window

### CREATING A SPARK SESSION

In [2]:
# Creating a spark session
spark = SparkSession.builder.enableHiveSupport().appName("Analytical_ETL_spark").getOrCreate()

### LOADING AND PROCESSING DATA FOR CURRENT DATE 2021-04-18

In [19]:
# Loading the current date (4/18/2020) to a dataframe (df)
df = spark.read.parquet("hdfs://localhost:8020/stock/trade/partition=2021-04-18")

In [20]:
df.createOrReplaceTempView("tmp_trade_moving_avg")

In [21]:
# Calculating the moving average for last 30 minutes
mov_avg_df = spark.sql("""select symbol, exchange, trade_dt,event_tm, event_seq_nb, trade_pr,
                        AVG(trade_pr) OVER(PARTITION BY (symbol) ORDER BY CAST(event_tm AS timestamp) 
                        RANGE BETWEEN INTERVAL 30 MINUTES PRECEDING AND CURRENT ROW) as mov_avg_pr
                        from tmp_trade_moving_avg""")

In [22]:
# DROP HIVE TABLE IF EXISTS
spark.sql("DROP TABLE IF EXISTS temp_trade_moving_avg")

DataFrame[]

In [23]:
# Save the data frame as table in hive
mov_avg_df.write.saveAsTable("temp_trade_moving_avg")

In [24]:
mov_avg_df.show(5)

+------+--------+----------+--------------------+------------+--------+----------+
|symbol|exchange|  trade_dt|            event_tm|event_seq_nb|trade_pr|mov_avg_pr|
+------+--------+----------+--------------------+------------+--------+----------+
|  SYMA|  NASDAQ|2021-04-18|2021-04-18 10:37:...|          10|      15|      15.0|
|  SYMA|  NASDAQ|2021-04-18|2021-04-18 10:49:...|          10|      15|      15.0|
|  SYMA|  NASDAQ|2021-04-18|2021-04-18 11:56:...|          20|      25|      25.0|
|  SYMA|  NASDAQ|2021-04-18|2021-04-18 12:00:...|          20|      25|      25.0|
|  SYMA|  NASDAQ|2021-04-18|2021-04-18 13:09:...|          30|      35|      35.0|
+------+--------+----------+--------------------+------------+--------+----------+
only showing top 5 rows



### LOADING AND PROCESSING DATA FOR PREVIOUS DATE 2021-04-17

In [25]:
# Loading the previous date (4/17/2020) to a dataframe (df)
df = spark.read.parquet("hdfs://localhost:8020/stock/trade/partition=2021-04-17")

In [26]:
df.createOrReplaceTempView("temp_last_trade")

In [27]:
# Calculating the moving average for last 30 minutes
last_pr_df = spark.sql("""select symbol, trade_dt,exchange, event_tm, event_seq_nb, trade_pr,
                        AVG(trade_pr) OVER(PARTITION BY (symbol) ORDER BY CAST(event_tm AS timestamp) 
                        RANGE BETWEEN INTERVAL 30 MINUTES PRECEDING AND CURRENT ROW) as last_pr
                        from temp_last_trade""")

In [30]:
# DROP HIVE TABLE IF EXISTS
spark.sql("DROP TABLE IF EXISTS temp_last_trade")

DataFrame[]

In [31]:
# Save the data frame as table in hive
last_pr_df.write.saveAsTable("temp_last_trade")

In [32]:
last_pr_df.show(5)

+------+----------+--------+--------------------+------------+--------+-------+
|symbol|  trade_dt|exchange|            event_tm|event_seq_nb|trade_pr|last_pr|
+------+----------+--------+--------------------+------------+--------+-------+
|  SYMA|2021-04-17|  NASDAQ|2021-04-17 10:37:...|          10|      15|   15.0|
|  SYMA|2021-04-17|  NASDAQ|2021-04-17 10:49:...|          10|      15|   15.0|
|  SYMA|2021-04-17|  NASDAQ|2021-04-17 11:56:...|          20|      25|   25.0|
|  SYMA|2021-04-17|  NASDAQ|2021-04-17 12:00:...|          20|      25|   25.0|
|  SYMA|2021-04-17|  NASDAQ|2021-04-17 13:09:...|          30|      35|   35.0|
+------+----------+--------+--------------------+------------+--------+-------+
only showing top 5 rows



### LOADING QUOTES DATA FOR 2021-04-18

In [33]:
quotes = spark.read.parquet("hdfs://localhost:8020/stock/partition=Q")

In [34]:
quotes.createOrReplaceTempView("quotes")

### UNION QUOTES AND TRADES INTO A SINGLE UNION VIEW

In [35]:
quote_union = spark.sql("""
SELECT trade_dt,rec_type,symbol,event_tm,event_seq_nb,exchange,bid_pr,bid_size,ask_pr,
ask_size,null as trade_pr,null as mov_avg_pr FROM quotes
UNION
SELECT trade_dt,"T" as rec_type,symbol,event_tm,event_seq_nb,exchange,null as bid_pr,null as bid_size,null as ask_pr,
null as ask_size,trade_pr,mov_avg_pr FROM temp_trade_moving_avg
""")

In [36]:
quote_union.createOrReplaceTempView("quote_union")

### POPULATE THE LAST NOT NULL TRADE PRICE AND MOVING AVERAGE PRICE FROM TRADE RECORDS

In [66]:
quote_union_update = spark.sql("""
select *,
last_value(trade_pr,true) OVER(PARTITION BY (symbol) 
ORDER BY CAST(event_tm AS timestamp) DESC) as last_trade_pr,
last_value(mov_avg_pr,true) OVER(PARTITION BY (symbol) 
ORDER BY CAST(event_tm AS timestamp) DESC) as last_mov_avg_pr
from quote_union
""")

In [67]:
quote_union_update.createOrReplaceTempView("quote_union_update")

### FILTER FOR QUOTE RECORDS

In [71]:
quote_update = spark.sql("""
select trade_dt, symbol, event_tm, event_seq_nb, exchange,
bid_pr, bid_size, ask_pr, ask_size, last_trade_pr, last_mov_avg_pr
from quote_union_update
where rec_type = 'Q'
""")
quote_update.createOrReplaceTempView("quote_update")

### Join With Table temp_last_trade To Get The Prior Day Close Price

In [75]:
quote_final = spark.sql("""
select trade_dt, A.symbol, event_tm, event_seq_nb, exchange,
bid_pr, bid_size, ask_pr, ask_size, last_trade_pr, last_mov_avg_pr,
bid_pr - close_pr as bid_pr_mv, ask_pr - close_pr as ask_pr_mv
from quote_update A LEFT OUTER JOIN (select distinct symbol,last_pr as close_pr from temp_last_trade) B
on A.symbol = B.symbol
""")

In [76]:
quote_final.show(5)

+----------+------+--------------------+------------+--------+------+--------+------+--------+-------------+---------------+---------+---------+
|  trade_dt|symbol|            event_tm|event_seq_nb|exchange|bid_pr|bid_size|ask_pr|ask_size|last_trade_pr|last_mov_avg_pr|bid_pr_mv|ask_pr_mv|
+----------+------+--------------------+------------+--------+------+--------+------+--------+-------------+---------------+---------+---------+
|2021-04-18|  SYMA|2021-04-18 21:51:...|          99|  NASDAQ|    77|     100|    77|     100|          105|          105.0|     12.0|     12.0|
|2021-04-18|  SYMA|2021-04-18 21:51:...|          99|  NASDAQ|    77|     100|    77|     100|          105|          105.0|     62.0|     62.0|
|2021-04-18|  SYMA|2021-04-18 21:51:...|          99|  NASDAQ|    77|     100|    77|     100|          105|          105.0|     -8.0|     -8.0|
|2021-04-18|  SYMA|2021-04-18 21:51:...|          99|  NASDAQ|    77|     100|    77|     100|          105|          105.0|     4

In [79]:
trade_date = "2021-04-18"
quote_final.write.mode("overwrite").parquet("hdfs://localhost:8020/stock/quote-trade-analytical/date={}".format(trade_date))