In [21]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.functions import col, signum

spark = SparkSession.builder.master("local[1]").getOrCreate()


In [22]:
#Need to carry over from day to day
#Dollar bars
df=spark.read.parquet("SPYcandlestick.parquet")
df=df.withColumn("Ticker",f.lit("SPY"))
df.show(5)
print(df.count())

+------+--------+-------+------+--------+----------+------+------+
|  Open|    High|    Low| Close|  Volume|      Time|Status|Ticker|
+------+--------+-------+------+--------+----------+------+------+
|326.65|  326.67| 326.63|326.63|791327.0|1578622260|    ok|   SPY|
|295.99|  295.99| 295.99|295.99|   780.0|1571102520|    ok|   SPY|
|306.67|306.6735|306.355|306.45|352821.0|1591139280|    ok|   SPY|
|310.05|  310.09| 310.01|310.02| 56166.0|1574295780|    ok|   SPY|
|308.88|  308.93| 308.87|308.91| 71974.0|1575415920|    ok|   SPY|
+------+--------+-------+------+--------+----------+------+------+
only showing top 5 rows

200255


In [23]:
import sys
df=df.withColumn('DolExch',col('Close')*col('Volume'))
df.select("DolExch").distinct().show(5)
df=df.withColumn("CumDolExch",f.sum("DolExch").over(Window.partitionBy('Ticker').orderBy("Time").rowsBetween(-sys.maxsize,0)))
df.show()
df.select("CumDolExch").distinct().show(5)
df=df.withColumn('DolBars', col('CumDolExch')%(1E7))#Take data pt every 10M dollars exchanged per stock
df=df.withColumn('Mark',f.when(df.DolBars<=df.DolExch,1).otherwise(0))
df.select("Mark").distinct().show()

+-------------+
|      DolExch|
+-------------+
|    526530.09|
|1.232265061E7|
| 6.16455684E7|
|4.233734953E7|
|3.878613142E7|
+-------------+
only showing top 5 rows

+------+------+------+------+-------+----------+------+------+------------------+------------------+
|  Open|  High|   Low| Close| Volume|      Time|Status|Ticker|           DolExch|        CumDolExch|
+------+------+------+------+-------+----------+------+------+------------------+------------------+
|297.53|297.53|297.53|297.53|  100.0|1562167500|    ok|   SPY|29752.999999999996|29752.999999999996|
| 297.5|297.56| 297.5|297.56| 1500.0|1562167620|    ok|   SPY|          446340.0|          476093.0|
|297.51|297.51|297.51|297.51|  100.0|1562167680|    ok|   SPY|           29751.0|          505844.0|
| 297.5| 297.5|297.47| 297.5| 1438.0|1562167740|    ok|   SPY|          427805.0|          933649.0|
| 297.5|297.51|297.47|297.49| 1634.0|1562167800|    ok|   SPY|486098.66000000003|1419747.6600000001|
|297.46|297.47|297.46|2

In [24]:
df=df.withColumn('CumMark',f.sum("Mark").over(Window.partitionBy('Ticker').orderBy("Time").rowsBetween(-sys.maxsize,0)))
df.show(20)
df.select("CumMark").distinct().show(10)

+------+------+------+------+-------+----------+------+------+------------------+------------------+------------------+----+-------+
|  Open|  High|   Low| Close| Volume|      Time|Status|Ticker|           DolExch|        CumDolExch|           DolBars|Mark|CumMark|
+------+------+------+------+-------+----------+------+------+------------------+------------------+------------------+----+-------+
|297.53|297.53|297.53|297.53|  100.0|1562167500|    ok|   SPY|29752.999999999996|29752.999999999996|29752.999999999996|   1|      1|
| 297.5|297.56| 297.5|297.56| 1500.0|1562167620|    ok|   SPY|          446340.0|          476093.0|          476093.0|   0|      1|
|297.51|297.51|297.51|297.51|  100.0|1562167680|    ok|   SPY|           29751.0|          505844.0|          505844.0|   0|      1|
| 297.5| 297.5|297.47| 297.5| 1438.0|1562167740|    ok|   SPY|          427805.0|          933649.0|          933649.0|   0|      1|
| 297.5|297.51|297.47|297.49| 1634.0|1562167800|    ok|   SPY|486098.

In [25]:
df=df.withColumn("Volume", f.sum("Volume").over(Window.partitionBy("CumMark").orderBy("Time").rowsBetween(-sys.maxsize,0)))

df=df.withColumn("Low",f.min("Low").over(Window.partitionBy("CumMark").orderBy("Time").rowsBetween(-sys.maxsize,0)))
df.show()
print(df.head())
df=df.withColumn("Open",f.first("Open").over(Window.partitionBy("CumMark").orderBy("Time").rowsBetween(-sys.maxsize,0)))
print(df.head())
df=df.withColumn("Close",f.last("Close").over(Window.partitionBy("CumMark").orderBy("Time").rowsBetween(-sys.maxsize,0)))
df=df.filter(df.Mark==1)

#Add target vector(2 for +change, 1 for no change 0 for -change)
df=df.withColumn("next_val", f.lead(col("Close"),1).over(Window.partitionBy('Ticker').orderBy(df["Time"])))#Need to add partitionBy()
df=df.withColumn("label", (col("next_val")-col("Close"))/col("Close"))
print(df.head())
df=df.select("Ticker","Time","Close","Volume","label")#.withColumn("Target", 100*(col("next_val")-col("Close"))/col("Close"))
#df=df.drop("next_val").withColumn("Target",1+signum(col("Target")))
df.head()

+--------+--------+--------+--------+--------+----------+------+------+--------------------+--------------------+------------------+----+-------+
|    Open|    High|     Low|   Close|  Volume|      Time|Status|Ticker|             DolExch|          CumDolExch|           DolBars|Mark|CumMark|
+--------+--------+--------+--------+--------+----------+------+------+--------------------+--------------------+------------------+----+-------+
|  297.24|  297.32|  297.23|  297.27|131122.0|1562175840|    ok|   SPY|       3.897863694E7|    9.201479921487E8|147992.14869999886|   1|     26|
|  297.34|  297.43|  297.33| 297.415| 86366.0|1562176020|    ok|   SPY|       2.568654389E7|    9.941301698587E8| 4130169.858700037|   1|     29|
|  297.79| 297.795|  297.74|297.7515|146296.0|1562360220|    ok|   SPY|4.3559853444000006E7|1.701835819697521...| 8358196.975212097|   1|    474|
|  296.38|  296.43|  296.35|  296.43| 79853.0|1562624820|    ok|   SPY|       2.367082479E7|3.144494718092561...| 4947180.92

Row(Ticker='SPY', Time=1562167500, Close=297.53, Volume=100.0, label=0.0)

In [26]:
#Convert Ticker value to int to include in ML algo, dictionary present (tick_dict) to convert back if needed
tickers=df.select("Ticker").distinct().rdd.flatMap(lambda x: x).collect()

tick_dict = {val : str(idx + 1) for idx, val in enumerate(tickers)} 
print(tick_dict)

from pyspark.sql.types import IntegerType
df=df.replace(to_replace=tick_dict, subset=['Ticker'])

df.printSchema()
df=df.withColumn("Ticker",col("Ticker").cast(IntegerType()))
df.printSchema()

{'SPY': '1'}
root
 |-- Ticker: string (nullable = false)
 |-- Time: long (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- label: double (nullable = true)

root
 |-- Ticker: integer (nullable = true)
 |-- Time: long (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- label: double (nullable = true)



In [27]:
from pyspark.ml.feature import VectorAssembler
#Create input features
df_cols=df.columns
df_cols=[ elem for elem in df_cols if elem not in ["Time","label"]]
#Move features to a single vector
assembler=VectorAssembler(inputCols=df_cols,outputCol="features")
df=assembler.transform(df)
df=df.na.drop()
df=df.dropDuplicates()
print(df.count())
df.show(10)

118183
+------+----------+-------+--------+--------------------+--------------------+
|Ticker|      Time|  Close|  Volume|               label|            features|
+------+----------+-------+--------+--------------------+--------------------+
|     1|1562344620| 297.97| 14200.0|-3.35604255462035...|[1.0,297.97,14200.0]|
|     1|1562365800| 298.14| 53824.0|2.180183806265436...|[1.0,298.14,53824.0]|
|     1|1562696880| 296.26| 47518.0|3.037872139338143E-4|[1.0,296.26,47518.0]|
|     1|1562721000| 297.08| 57480.0|1.346438669719283...|[1.0,297.08,57480.0]|
|     1|1562781000| 299.34|238514.0|8.351707088940372E-5|[1.0,299.34,23851...|
|     1|1562870520| 298.73| 83874.0|-3.34750443544413...|[1.0,298.73,83874.0]|
|     1|1562961660| 299.97| 28375.0|1.000100010000090...|[1.0,299.97,28375.0]|
|     1|1562970600| 300.18|193117.0|1.332533813046187...|[1.0,300.18,19311...|
|     1|1563299640| 300.65| 78073.0|1.663063362714497...|[1.0,300.65,78073.0]|
|     1|1563395400|298.895| 43447.0|-8.364141

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder



featureIndexer =VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(df)

# Split the data into training and test sets (last 20% held out for testing)
df=df.sort("Time")
trainingData=df.limit(int(df.count()*0.8))
maxTime=trainingData.agg({'Time':'max'}).collect()[0]
maxTime=maxTime["max(Time)"]
testData=df.filter(df.Time>maxTime)
testData.show(10)


# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

paramGrid = ParamGridBuilder().addGrid(rf.maxDepth, [3, 10, 20]) \
                              .addGrid(rf.numTrees, [5, 10]) \
                              .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Train model.  This also runs the indexer.
model = crossval.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)




+------+----------+--------+--------+--------------------+--------------------+
|Ticker|      Time|   Close|  Volume|               label|            features|
+------+----------+--------+--------+--------------------+--------------------+
|     1|1587156060|   284.2|105987.0|1.055594651654805...|[1.0,284.2,105987.0]|
|     1|1587156120|  284.23|124138.0|3.518277451356614E-5|[1.0,284.23,12413...|
|     1|1587156180|  284.24|123893.0|-2.46270757106646...|[1.0,284.24,12389...|
|     1|1587156240|  284.17|136113.0|-3.16711827427356...|[1.0,284.17,13611...|
|     1|1587156300|  284.08|138193.0|-7.04027034637489...|[1.0,284.08,13819...|
|     1|1587156360|  284.06|129099.0|2.464268112370386E-4|[1.0,284.06,12909...|
|     1|1587156420|  284.13|111978.0|4.927322000492252E-4|[1.0,284.13,11197...|
|     1|1587156480|  284.27|155816.0|0.001019805114855...|[1.0,284.27,15581...|
|     1|1587156540|284.5599|229185.0|-3.51068439369172...|[1.0,284.5599,229...|
|     1|1587156600|  284.46|107102.0|-1.

In [None]:
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R Squared on test data = %g" % r2)

#rfModel = model.stages[1]
#print(rfModel)  # summary only


In [None]:
#r2=0.000213893 for SPY_candlestick random split 80/20%
#r2=-0.000156503 for SPY_candlestick with first 80% being train data, last 20% is test
#r2=9.93879e-05 for "" with first 80% being train data, use 5-fold CV

In [None]:
#RMSE test> RMSE train -> likely overfitting the data

In [None]:
ranFor=RandomForestRegressor(labelCol="label", featuresCol="features")
modelRf=ranFor.fit(trainingData)
print(modelRf.featureImportances) #[Ticker, Close, Volume]


In [None]:
predictions=predictions.repartition(100)
predictions.write.parquet('SPYpred.parquet')

In [21]:
print(dir(rf))

['__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__metaclass__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__slotnames__', '__str__', '__subclasshook__', '__weakref__', '_call_java', '_copyValues', '_copy_params', '_create_from_java_class', '_create_model', '_create_params_from_java', '_defaultParamMap', '_dummy', '_empty_java_param_map', '_fit', '_fit_java', '_from_java', '_input_kwargs', '_java_obj', '_make_java_param_pair', '_new_java_array', '_new_java_obj', '_paramMap', '_params', '_randomUID', '_resetUid', '_resolveParam', '_set', '_setDefault', '_shouldOwn', '_to_java', '_transfer_param_map_from_java', '_transfer_param_map_to_java', '_transfer_params_from_java', '_transfer_params_to_java', 'bootstrap', 'cacheNodeIds', 'checkpointInterval', 'clear', 'copy', 'expl

In [None]:
print(df.agg({"Time":'max'}).collect()[0])

In [None]:
int(df.count()*0.8)

In [None]:
df.count()

In [None]:
testData.count()

In [None]:
predictions.filter(predictions.prediction<0).count()

In [25]:
predictions.show(10)

+------+----------+--------+--------+--------------------+--------------------+--------------------+--------------------+
|Ticker|      Time|   Close|  Volume|               label|            features|     indexedFeatures|          prediction|
+------+----------+--------+--------+--------------------+--------------------+--------------------+--------------------+
|     1|1587156060|   284.2|105987.0|1.055594651654805...|[1.0,284.2,105987.0]|[0.0,284.2,105987.0]|-2.83357959245934...|
|     1|1587156120|  284.23|124138.0|3.518277451356614E-5|[1.0,284.23,12413...|[0.0,284.23,12413...|-2.83357959245934...|
|     1|1587156180|  284.24|123893.0|-2.46270757106646...|[1.0,284.24,12389...|[0.0,284.24,12389...|-2.83357959245934...|
|     1|1587156240|  284.17|136113.0|-3.16711827427356...|[1.0,284.17,13611...|[0.0,284.17,13611...|-2.83357959245934...|
|     1|1587156300|  284.08|138193.0|-7.04027034637489...|[1.0,284.08,13819...|[0.0,284.08,13819...|-2.83357959245934...|
|     1|1587156360|  284

In [38]:
pdDf=df.toPandas()

In [None]:
#IDEA: Add length of bar as a feature

In [39]:
#USING MLFLOW library instead
from timeseriescv import cross_validation as cv
crossval=cv.BaseTimeSeriesCrossValidator(10)
crossval=cv.BaseTimeSeriesCrossValidator.split(crossval,pdDf)
print(crossval)

ValueError: pred_times should be a pandas Series.

In [28]:
df.show(5)

+------+----------+------+--------+--------------------+--------------------+
|Ticker|      Time| Close|  Volume|               label|            features|
+------+----------+------+--------+--------------------+--------------------+
|     1|1562344620|297.97| 14200.0|-3.35604255462035...|[1.0,297.97,14200.0]|
|     1|1562365800|298.14| 53824.0|2.180183806265436...|[1.0,298.14,53824.0]|
|     1|1562696880|296.26| 47518.0|3.037872139338143E-4|[1.0,296.26,47518.0]|
|     1|1562721000|297.08| 57480.0|1.346438669719283...|[1.0,297.08,57480.0]|
|     1|1562781000|299.34|238514.0|8.351707088940372E-5|[1.0,299.34,23851...|
+------+----------+------+--------+--------------------+--------------------+
only showing top 5 rows

