In this file, we taken in a .parquet from dataRetrieval, and prepare it for MLFLow. The MLFlow notebook requires that the data be edited to include some special feautures, such as integers in stead of ticker symbols, and a label column. Although MLFlow can be edited to accomodate whatever type of data you want, in this use case we choose to give the log% change of the last five minutes, along with the volume of the last five minutes. The output is a .csv file

In [1]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.functions import col, signum

spark = SparkSession.builder.master("local[1]").getOrCreate()


In [3]:
#Need to carry over from day to day
#Dollar bars
df=spark.read.parquet("A_Candlestick_part.parquet")#This only contains some of the A_Candlestick.parquet data since the original is large.
#df=df.withColumn("Ticker",f.lit("SPY")). This is necessary if we are using SPY_Candlestick.parquet
df.show(5)
print(df.count())

+------+------+------+------+------+------+----------+-----------------+
|Ticker|  Open|  High|   Low| Close|Volume|      Time|__index_level_0__|
+------+------+------+------+------+------+----------+-----------------+
|    AG| 10.38| 10.38| 10.38| 10.38| 140.0|1566513600|         18051935|
|  AVYA|14.715| 14.75|14.705|14.705|1553.0|1591037040|          2235656|
|   ALC|51.215|51.215| 51.18| 51.21|1656.0|1586990160|         15178518|
|   AXL| 10.02| 10.02| 10.02| 10.02| 219.0|1575679980|          1538479|
|  AQUA|  18.8| 18.81|  18.8| 18.81|1191.0|1575936780|          8339089|
+------+------+------+------+------+------+----------+-----------------+
only showing top 5 rows

24393385


After reading in the data, we wish to take data points via dollar bars as opposed to every minute. In this case, a data point for every 1M dollars exchanged per stock. See "Advances in Financial Machine Learning" by Marcos Prado for more info on why this decision was made.

In [4]:
import sys
df=df.withColumn('DolExch',col('Close')*col('Volume'))
df.select("DolExch").distinct().show(5)
df=df.withColumn("CumDolExch",f.sum("DolExch").over(Window.partitionBy('Ticker').orderBy("Time").rowsBetween(-sys.maxsize,0)))
df.show()
df.select("CumDolExch").distinct().show(5)
df=df.withColumn('DolBars', col('CumDolExch')%(1E6))#Take data pt every 1M dollars exchanged per stock
df=df.withColumn('Mark',f.when(df.DolBars<=df.DolExch,1).otherwise(0))
df.select("Mark").distinct().show()

+------------------+
|           DolExch|
+------------------+
|520016.95999999996|
|         672760.36|
|             934.0|
|         211842.54|
|           32957.4|
+------------------+
only showing top 5 rows

+------+-------+-------+------+--------+-------+----------+-----------------+------------------+--------------------+
|Ticker|   Open|   High|   Low|   Close| Volume|      Time|__index_level_0__|           DolExch|          CumDolExch|
+------+-------+-------+------+--------+-------+----------+-----------------+------------------+--------------------+
|  ALXN| 124.79| 124.79| 124.5|   124.5|  300.0|1562693340|         13422685|           37350.0|             37350.0|
|  ALXN|  124.5|  124.5|123.34|  123.68|31059.0|1562693400|         13422686|        3841377.12|          3878727.12|
|  ALXN| 123.49|124.024|123.49|  123.92| 4011.0|1562693460|         13422687|         497043.12|          4375770.24|
|  ALXN| 123.91| 123.91|123.55| 123.695| 5511.0|1562693520|         13422688| 

In [5]:
df=df.withColumn('CumMark',f.sum("Mark").over(Window.partitionBy('Ticker').orderBy("Time").rowsBetween(-sys.maxsize,0)))
df.show(20)
df.select("CumMark").distinct().show(10)

+------+-------+-------+------+--------+-------+----------+-----------------+------------------+--------------------+------------------+----+-------+
|Ticker|   Open|   High|   Low|   Close| Volume|      Time|__index_level_0__|           DolExch|          CumDolExch|           DolBars|Mark|CumMark|
+------+-------+-------+------+--------+-------+----------+-----------------+------------------+--------------------+------------------+----+-------+
|  ALXN| 124.79| 124.79| 124.5|   124.5|  300.0|1562693340|         13422685|           37350.0|             37350.0|           37350.0|   1|      1|
|  ALXN|  124.5|  124.5|123.34|  123.68|31059.0|1562693400|         13422686|        3841377.12|          3878727.12| 878727.1200000001|   1|      2|
|  ALXN| 123.49|124.024|123.49|  123.92| 4011.0|1562693460|         13422687|         497043.12|          4375770.24| 375770.2400000002|   1|      3|
|  ALXN| 123.91| 123.91|123.55| 123.695| 5511.0|1562693520|         13422688|        681683.145|    

In [6]:
#Next block re-accumulates the values with the new DolBars
df=df.withColumn("Volume", f.sum("Volume").over(Window.partitionBy("CumMark").orderBy("Time").rowsBetween(-sys.maxsize,0)))
df=df.withColumn("Low",f.min("Low").over(Window.partitionBy("CumMark").orderBy("Time").rowsBetween(-sys.maxsize,0)))
df.show()
print(df.head())
df=df.withColumn("Open",f.first("Open").over(Window.partitionBy("CumMark").orderBy("Time").rowsBetween(-sys.maxsize,0)))
print(df.head())
df=df.withColumn("Close",f.last("Close").over(Window.partitionBy("CumMark").orderBy("Time").rowsBetween(-sys.maxsize,0)))
df=df.filter(df.Mark==1)

#Add a next_val column, and delete unnecessary columns
df=df.withColumn("next_val", f.lead(col("Close"),1).over(Window.partitionBy('Ticker').orderBy(df["Time"])))
print(df.head())
df=df.select("Ticker","Time","Close","Volume","next_val")
df.head();

+------+--------+-------+-----+-------+--------+----------+-----------------+------------------+--------------------+------------------+----+-------+
|Ticker|    Open|   High|  Low|  Close|  Volume|      Time|__index_level_0__|           DolExch|          CumDolExch|           DolBars|Mark|CumMark|
+------+--------+-------+-----+-------+--------+----------+-----------------+------------------+--------------------+------------------+----+-------+
|  ACIA|   65.81|  65.87| 65.8|  65.87|  3152.0|1562691360|         21904876|207622.24000000002|       3.200109169E7| 1091.690000001341|   1|     26|
|  AAPL| 199.975| 200.29| 65.8| 200.27|237043.0|1562693760|         24137816|     4.684135057E7|4.2663912367859995E8| 639123.6785999537|   1|     26|
|  AMZN| 1956.16|1956.75| 65.8|1953.94|247030.0|1562694360|         11009143|     1.951399878E7|    7.983673328832E8| 367332.8832000494|   1|     26|
|  ABBV|   71.37|  71.37| 65.8|  71.35|261567.0|1562694840|         23549518|        1037214.95|    

In [7]:
#Convert Ticker value to int to include in ML algo, dictionary present (tick_dict) to convert back if needed
tickers=df.select("Ticker").distinct().rdd.flatMap(lambda x: x).collect()

tick_dict = {val : str(idx + 1) for idx, val in enumerate(tickers)} 
print(tick_dict)

from pyspark.sql.types import IntegerType
df=df.replace(to_replace=tick_dict, subset=['Ticker'])

df.printSchema()
df=df.withColumn("Ticker",col("Ticker").cast(IntegerType()))
df.printSchema()

{'ALXN': '1', 'AWAY': '2', 'AAT': '3', 'ABMD': '4', 'AEFC': '5', 'AESR': '6', 'AIV': '7', 'APM': '8', 'ARYAW': '9', 'AVY': '10', 'ARAY': '11', 'AMTX': '12', 'ARL': '13', 'AXP': '14', 'ACCO': '15', 'ARTW': '16', 'ADIL': '17', 'AVNW': '18', 'AVUV': '19', 'ALDX': '20', 'APYX': '21', 'ASH': '22', 'ARGT': '23', 'ATCX': '24', 'ACSI': '25', 'AHCO': '26', 'AMRHW': '27', 'AMRK': '28', 'AVEO': '29', 'AVXL': '30', 'AXU': '31', 'AEB': '32', 'ALE': '33', 'ALSN': '34', 'ASYS': '35', 'AVDL': '36', 'ACH': '37', 'AGIO': '38', 'ARA': '39', 'AQMS': '40', 'ASET': '41', 'ALG': '42', 'ASHX': '43', 'AVAL': '44', 'ALKS': '45', 'ACWX': '46', 'AEY': '47', 'AM': '48', 'AOM': '49', 'ARCT': '50', 'AA': '51', 'ACHC': '52', 'AMRH': '53', 'ASIX': '54', 'AAPL': '55', 'AMPH': '56', 'ANDE': '57', 'AY': '58', 'ADNT': '59', 'ALL': '60', 'AOSL': '61', 'ADVM': '62', 'AMRX': '63', 'ACGLO': '64', 'ADI': '65', 'AFGC': '66', 'ARB': '67', 'AZZ': '68', 'AGBA': '69', 'AGNC': '70', 'ALTR': '71', 'AGMH': '72', 'APDN': '73', 'AEF': '

root
 |-- Ticker: string (nullable = true)
 |-- Time: long (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- next_val: double (nullable = true)

root
 |-- Ticker: integer (nullable = true)
 |-- Time: long (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- next_val: double (nullable = true)



In [8]:
from pyspark.ml.feature import VectorAssembler
#Replace close with logChange. Give it the previous 5 values to make decision off of.
df=df.withColumn('logChange',f.log(col("next_val")/col("Close")))
df=df.select("Ticker","Time","Volume","logChange")
df=df.withColumn("label", f.lead(col("logChange"),1).over(Window.partitionBy('Ticker').orderBy(df["Time"])))
df=df.withColumn('prev1',f.lag(col('logChange'),1).over(Window.partitionBy('Ticker').orderBy(df["Time"])))
df=df.withColumn('prev2',f.lag(col('logChange'),2).over(Window.partitionBy('Ticker').orderBy(df["Time"])))
df=df.withColumn('prev3',f.lag(col('logChange'),3).over(Window.partitionBy('Ticker').orderBy(df["Time"])))
df=df.withColumn('prev4',f.lag(col('logChange'),4).over(Window.partitionBy('Ticker').orderBy(df["Time"])))
df=df.withColumn('prevVol1',f.lag(col('Volume'),1).over(Window.partitionBy('Ticker').orderBy(df["Time"])))
df=df.withColumn('prevVol2',f.lag(col('Volume'),2).over(Window.partitionBy('Ticker').orderBy(df["Time"])))
df=df.withColumn('prevVol3',f.lag(col('Volume'),3).over(Window.partitionBy('Ticker').orderBy(df["Time"])))
df=df.withColumn('prevVol4',f.lag(col('Volume'),4).over(Window.partitionBy('Ticker').orderBy(df["Time"])))

df=df.na.drop()
df.show(10)
print(df.count())

+------+----------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+-----------+-----------+
|Ticker|      Time|     Volume|           logChange|               label|               prev1|               prev2|               prev3|               prev4|   prevVol1|   prevVol2|   prevVol3|   prevVol4|
+------+----------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+-----------+-----------+
|   148|1572991200|7.6665204E7|-0.01358562930309...|-0.04514684436130...| 0.07589209394722322|-0.03962743516439...| 0.06636637081562258|-0.05175044673140...|7.4975236E7|7.1581359E7|7.2044253E7|7.4975462E7|
|   148|1573157340|7.1343068E7|-0.04514684436130...|-0.02488166637673666|-0.01358562930309...| 0.07589209394722322|-0.03962743516439...| 0.06636637081562258|7.6665204E7|7.49752

In [1]:
#Time to export! 
df.to_csv("A_Candlestick_Cleaned.csv")