In [0]:
import sys

# DBFS directory with the project's helper modules; it is put on sys.path so the
# `import jobTracker` further down can resolve (presumably jobTracker lives here).
module_path = '/dbfs/spark/stocksETL/script/'
if module_path not in sys.path:
    # Insert exactly the value that was checked, so guard and insert never drift apart.
    sys.path.insert(0, module_path)

In [0]:
import os
import json
import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType,StructField,DateType,TimestampType,StringType,IntegerType,DecimalType
from pyspark.sql.functions import year, month, dayofmonth,to_timestamp,to_date,split,substring,col,when
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit,split,concat,ceil
from pyspark.sql.window import Window

In [0]:
import jobTracker as track

In [0]:
# Job parameters, supplied as Databricks text widgets (all declared with empty strings).
# NOTE(review): DB_PWD and blob_key are credentials passed as plain-text widgets; consider
# reading them with dbutils.secrets.get(...) so they are not exposed in run parameters.

def _text_widget(name):
    """Declare the text widget `name` (empty default and label) and return its current value."""
    dbutils.widgets.text(name, "", "")
    return dbutils.widgets.get(name)

# Database connection details (passed to the job tracker's DB connection)
dbName = _text_widget("DB_NAME")
dbHost = _text_widget("DB_HOST")
dbUser = _text_widget("DB_USER")
dbPwd = _text_widget("DB_PWD")
dbPort = _text_widget("DB_PORT")

# Azure blob storage account/container/key used to mount /mnt/stocksETL
storage_acct = _text_widget("storage_acct")
container_name = _text_widget("container_name")
blob_key = _text_widget("blob_key")

# Dates to process; used verbatim as the trade_dt=<date> partition directory values
curr_date = _text_widget("curr_date")
prev_date = _text_widget("prev_date")

In [0]:
# Mount the blob container at /mnt/<mountName> unless it is already mounted.
mountName = "stocksETL"
mount_point = "/mnt/" + mountName

# Ask DBFS for the active mount points instead of matching the string repr of
# dbutils.fs.ls('/mnt/') entries: that repr changes when FileInfo gains fields
# (so an existing mount was not recognised and mount() was called again), and
# ls() itself fails when /mnt/ does not exist yet.
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    print("mount already created")
else:
    dbutils.fs.mount(
        source="wasbs://" + container_name + "@" + storage_acct + ".blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={"fs.azure.account.key." + storage_acct + ".blob.core.windows.net": blob_key})

In [0]:
def _latest_trade_versions(trade):
    """Return the rows of `trade` carrying the latest arrival_tm for their trade key.

    The key is (trade_dt, symbol, exchange, event_tm, event_seq_nb). trade_pr is
    deliberately NOT part of the key: presumably a correction re-sends the same event
    with a later arrival_tm and a different price, and grouping by price would keep
    both the stale and the corrected row. Rows tied on the latest arrival_tm are all
    kept; rows with a NULL key column or NULL arrival_tm fail the equality match and
    are dropped.
    """
    trade_cnt = trade.groupBy('trade_dt', 'symbol', 'exchange', 'event_tm', 'event_seq_nb') \
        .agg(func.max('arrival_tm').alias("max_arrival_dtm"))

    trade.createOrReplaceTempView("trade_tbl")
    trade_cnt.createOrReplaceTempView("trade_cnt_tbl")

    return spark.sql("""
        select *
        from trade_tbl t1
        where EXISTS (
                select 1 from trade_cnt_tbl t2
                WHERE t1.trade_dt = t2.trade_dt
                    and t1.symbol = t2.symbol
                    and t1.exchange = t2.exchange
                    and t1.event_tm = t2.event_tm
                    and t1.event_seq_nb = t2.event_seq_nb
                    and t1.arrival_tm = t2.max_arrival_dtm
                )
        """)


def _write_trades_by_date(trades, output_path):
    """Overwrite <output_path>trade/trade_dt=<date> with the rows of each date in `trades`."""
    trades = trades.cache()  # scanned once per distinct date below
    try:
        for row in trades.select("trade_dt").distinct().collect():
            trade_date = row.trade_dt
            if trade_date is None:
                # A NULL trade_dt cannot be routed to a trade_dt=<date> directory.
                continue
            # Column comparison rather than a string-concatenated SQL filter: works
            # whether trade_dt is a string or a date column (str + date raised TypeError).
            trades.where(trades.trade_dt == trade_date) \
                .write.mode("overwrite") \
                .parquet(output_path + "trade/trade_dt={}".format(trade_date))
    finally:
        trades.unpersist()


def _trailing_avg_table(trade_path, view_name, avg_alias, table_name, window_minutes=30):
    """Compute a per-symbol trailing moving average of trade_pr and save it as a Hive table.

    Reads the parquet at `trade_path`, registers it as temp view `view_name`, adds column
    `avg_alias` = AVG(trade_pr) over the preceding `window_minutes` (by event_tm, per
    symbol), drops and re-creates the table `table_name` with the result, and returns it.
    """
    spark.read.parquet(trade_path).createOrReplaceTempView(view_name)

    result = spark.sql("""
        select symbol, exchange, trade_dt, event_tm, event_seq_nb, trade_pr,
        AVG(trade_pr) OVER (PARTITION BY symbol ORDER BY CAST(event_tm AS timestamp)
            RANGE BETWEEN INTERVAL {minutes} MINUTES PRECEDING AND CURRENT ROW) as {alias}
        from {view}
        """.format(minutes=int(window_minutes), alias=avg_alias, view=view_name))

    spark.sql("DROP TABLE IF EXISTS {}".format(table_name))
    result.write.saveAsTable(table_name)
    return result


def _quote_trade_analytics(quotes_path, trade_date):
    """Return the quotes of `trade_date` enriched with trade context.

    Each quote row gets:
      * last_trade_pr / last_mov_avg_pr: trade_pr and mov_avg_pr of the most recent trade
        of the same symbol at or before the quote's event_tm (from temp_trade_moving_avg);
      * bid_pr_mv / ask_pr_mv: bid_pr / ask_pr minus close_pr, the last_pr of the symbol's
        latest trade in temp_last_trade (the previous day).
    """
    quotes = spark.read.parquet(quotes_path)
    # Only the processed day: the trades matched below come from trade_dt=<trade_date>
    # and the result is written under date=<trade_date>.
    quotes = quotes.where(quotes.trade_dt == trade_date)
    quotes.createOrReplaceTempView("quotes")

    spark.sql("""
            SELECT trade_dt,rec_type,symbol,event_tm,event_seq_nb,exchange,bid_pr,bid_size,ask_pr,
            ask_size,null as trade_pr,null as mov_avg_pr FROM quotes
            UNION
            SELECT trade_dt,'T' as rec_type,symbol,event_tm,event_seq_nb,exchange,null as bid_pr,null as bid_size,null as ask_pr,
            null as ask_size,trade_pr,mov_avg_pr FROM temp_trade_moving_avg
            """).createOrReplaceTempView("quote_union")

    # Carry the latest non-null trade values FORWARD in time (ascending order). With
    # DESC ordering and the default UNBOUNDED PRECEDING..CURRENT ROW frame, last_value
    # returned the next trade at or after each quote, i.e. it looked into the future.
    spark.sql("""
            select *,
            last_value(trade_pr, true) OVER (PARTITION BY symbol ORDER BY CAST(event_tm AS timestamp)
                RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as last_trade_pr,
            last_value(mov_avg_pr, true) OVER (PARTITION BY symbol ORDER BY CAST(event_tm AS timestamp)
                RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as last_mov_avg_pr
            from quote_union
            """).createOrReplaceTempView("quote_union_update")

    spark.sql("""
            select trade_dt, symbol, event_tm, event_seq_nb, exchange,
            bid_pr, bid_size, ask_pr, ask_size, last_trade_pr, last_mov_avg_pr
            from quote_union_update
            where rec_type = 'Q'
            """).createOrReplaceTempView("quote_update")

    # Exactly one close price per symbol: last_pr of its latest previous-day trade.
    # `select distinct symbol, last_pr` returned one row per distinct moving-average
    # value and multiplied every quote row in the LEFT JOIN.
    # NOTE(review): last_pr is the 30-minute moving average, not the raw trade_pr of the
    # last trade; confirm which one the previous-day close is meant to be.
    return spark.sql("""
            select A.trade_dt, A.symbol, A.event_tm, A.event_seq_nb, A.exchange,
            A.bid_pr, A.bid_size, A.ask_pr, A.ask_size, A.last_trade_pr, A.last_mov_avg_pr,
            A.bid_pr - B.close_pr as bid_pr_mv, A.ask_pr - B.close_pr as ask_pr_mv
            from quote_update A LEFT OUTER JOIN (
                select symbol, last_pr as close_pr
                from (
                    select symbol, last_pr,
                           row_number() OVER (PARTITION BY symbol
                               ORDER BY CAST(event_tm AS timestamp) DESC, event_seq_nb DESC) as rn
                    from temp_last_trade
                ) ranked
                where rn = 1
            ) B
            on A.symbol = B.symbol
            """)


def main():
    """Run the trade-correction and quote-analytics ETL for the `curr_date` widget.

    Steps, each reported via jobTrack.update_job_status:
      1. read trade records (partition=T), keep the latest version of every trade and
         overwrite them per date under trade/trade_dt=<date>;
      2. compute a 30-minute moving average of trade_pr for curr_date and prev_date and
         save them as Hive tables temp_trade_moving_avg / temp_last_trade;
      3. enrich the curr_date quotes (partition=Q) with the latest trade values and the
         previous-day close, and write quote-trade-analytical/date=<curr_date>.

    Uses the module-level widget values (DB_* settings, curr_date, prev_date) and `spark`.

    Raises:
        ValueError: if curr_date or prev_date is empty.
        Exception: any failure is printed, recorded as job status "failed: <step>" (best
            effort) and re-raised, so the run is marked failed instead of silently
            succeeding. The DB connection is always closed.
    """
    output_path = "/mnt/stocksETL/spark/outputfiles/"

    jobTrack = None
    jobId = None
    dbConn = None
    job_sts = "starting"
    try:
        if not curr_date or not prev_date:
            raise ValueError("curr_date and prev_date must be set (got {!r}, {!r})"
                             .format(curr_date, prev_date))

        jobTrack = track.Tracker("stockETL2")
        jobId = jobTrack.assign_job_id()
        dbConn = jobTrack.get_db_connection(dbName, dbHost, dbUser, dbPwd, dbPort)

        # Read the corrected trade records; bid_pr carries the trade price and is renamed.
        trade = spark.read.parquet(output_path + 'partition=T') \
            .select("trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb",
                    "arrival_tm", "bid_pr") \
            .withColumnRenamed("bid_pr", "trade_pr")

        job_sts = "reading corrected output files for trade"
        jobTrack.update_job_status(jobId, job_sts, dbConn)

        latest_trades = _latest_trade_versions(trade)
        latest_trades.createOrReplaceTempView("join_trade_df")

        job_sts = "write corrected output files to blob"
        jobTrack.update_job_status(jobId, job_sts, dbConn)
        _write_trades_by_date(latest_trades, output_path)

        job_sts = "calculating moving average"
        jobTrack.update_job_status(jobId, job_sts, dbConn)
        _trailing_avg_table(output_path + "trade/trade_dt=" + curr_date,
                            "tmp_trade_moving_avg", "mov_avg_pr", "temp_trade_moving_avg")
        # Same average for the previous day; its last value per symbol is the close price.
        _trailing_avg_table(output_path + "trade/trade_dt=" + prev_date,
                            "tmp_last_trade", "last_pr", "temp_last_trade")

        job_sts = "Loading quotes data"
        jobTrack.update_job_status(jobId, job_sts, dbConn)
        quote_final = _quote_trade_analytics(output_path + "partition=Q", curr_date)

        job_sts = "writing quotes data to blob"
        jobTrack.update_job_status(jobId, job_sts, dbConn)
        quote_final.write.mode("overwrite").parquet(
            output_path + "quote-trade-analytical/date={}".format(curr_date))

        dbConn.commit()

    except Exception as e:
        print(str(e))
        if dbConn is not None and jobId is not None:
            # Best effort: a failure to record the status must not mask the original error.
            try:
                jobTrack.update_job_status(jobId, "failed: " + job_sts, dbConn)
                dbConn.commit()
            except Exception as status_err:
                print("could not record job failure: " + str(status_err))
        raise
    finally:
        if dbConn is not None:
            dbConn.close()


In [0]:
# Entry point: in a notebook kernel __name__ is normally "__main__", so running this
# cell executes the whole ETL once.
if "__main__" == __name__:
    main()

In [0]:
# main() is already run by the `if __name__ == "__main__"` cell above (true in a notebook
# kernel); calling it again here executed the entire ETL, with all its overwrites, twice.