In [73]:
from pyspark.sql import SparkSession
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('ml-price-movement-predictor')
    .getOrCreate()
)

In [74]:
import pandas as pd
# Load the recorded stream, discard the two indicator columns that contain a
# lot of NaNs, then drop every remaining row that has a missing value.
loaded_frame = pd.read_csv('mock_stream_out.csv')
pd_df = loaded_frame.drop(columns=['OBV', 'Acc/Dist_ROC']).dropna()

# Difference between each row's Close and the Close of the row before it
# (NaN for the first row, which has no predecessor).
# NOTE(review): the difference is taken against the *previous* row, so a label
# derived from it describes a move that is already reflected in this row's
# Close -- confirm this is the intended target for a predictor.
previous_close = pd_df['Close'].shift(1)
pd_df['Change_in_close'] = pd_df['Close'] - previous_close

def label_change_direction(row):
    """Map a row's 'Change_in_close' value to a direction label.

    Returns 1 for a positive change, -1 for a negative change and 0 for no
    change. When none of the comparisons holds (for example the value is
    NaN), the function returns None.
    """
    delta = row['Change_in_close']
    if delta < 0:
        return -1
    if delta > 0:
        return 1
    return 0 if delta == 0 else None

# Label every row with the direction of its close-to-close change, then
# remove the intermediate difference column.
direction_labels = pd_df.apply(label_change_direction, axis=1)
pd_df['Change_direction'] = direction_labels

pd_df = pd_df.drop(columns=['Change_in_close'])

In [75]:
# Subset of pd_df restricted to its 64-bit integer and float columns.
numeric_dtypes = ['int64', 'float64']
numeric_data = pd_df.select_dtypes(include=numeric_dtypes)

In [76]:
# Remove the index/timestamp/date/symbol columns, then record the names of
# the columns that remain.
dropped_columns = ['Unnamed: 0', 'Unix Timestamp', 'Date', 'Symbol']
pd_df = pd_df.drop(columns=dropped_columns)
cols = list(pd_df.columns)

In [77]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
# Build the pipeline stage list: optional categorical encoders, the label
# indexer, and finally the feature assembler.
categoricalColumns = []  # empty here, so the encoder loop below adds nothing
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol,
                                  outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"],
    )
    stages.extend([stringIndexer, encoder])

# Index the Change_direction values into the 'label' column.
label_stringIdx = StringIndexer(inputCol='Change_direction', outputCol='label')
stages.append(label_stringIdx)

# Every column of pd_df except the target is treated as a numeric feature.
numericCols = list(pd_df.columns)
numericCols.remove('Change_direction')

# Encoded categorical vectors (none here) come first, then the numeric columns.
encodedCols = [name + "classVec" for name in categoricalColumns]
assemblerInputs = encodedCols + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [78]:
# Rows that still contain a missing value are removed before the pandas
# frame is handed to Spark.
complete_rows = pd_df.dropna()
df = spark.createDataFrame(complete_rows)

In [79]:
from pyspark.ml import Pipeline
# Fit the preprocessing stages and apply them, keeping the generated
# label/features columns in front of the original columns.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
selectedCols = ['label', 'features'] + cols
df = pipelineModel.transform(df).select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- SMA: double (nullable = true)
 |-- EMA: double (nullable = true)
 |-- Momentum: double (nullable = true)
 |-- ROC: double (nullable = true)
 |-- RSI: double (nullable = true)
 |-- BBANDUP: double (nullable = true)
 |-- BBANDLO: double (nullable = true)
 |-- MACD: double (nullable = true)
 |-- MACDsign: double (nullable = true)
 |-- MACDdiff: double (nullable = true)
 |-- Change_direction: double (nullable = true)



In [80]:
# Show the first five transformed rows, transposed so each column reads as a row.
preview_rows = df.take(5)
pd.DataFrame(preview_rows, columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
label,2,2,0,2,0
features,"[243.6, 243.6, 243.6, 243.6, 0.0, 243.60000000...","[243.6, 243.6, 243.6, 243.6, 0.0, 243.60000000...","[243.6, 243.75, 243.6, 243.63, 1.0, 243.610000...","[243.63, 243.63, 243.63, 243.63, 0.0, 243.6200...","[243.63, 244.0, 243.63, 244.0, 3.973254282, 24..."
Open,243.6,243.6,243.6,243.63,243.63
High,243.6,243.6,243.75,243.63,244
Low,243.6,243.6,243.6,243.63,243.63
Close,243.6,243.6,243.63,243.63,244
Volume,0,0,1,0,3.97325
SMA,243.6,243.6,243.61,243.62,243.753
EMA,244.27,244.237,244.208,244.18,244.171
Momentum,-0.35,0,0.03,0.03,0.4


In [81]:
# 70/30 train/test split with a fixed seed, followed by the size of each part.
split_weights = [0.7, 0.3]
train, test = df.randomSplit(split_weights, seed=14122019)

train_rows = train.count()
print("Training Dataset Count: " + str(train_rows))
test_rows = test.count()
print("Test Dataset Count: " + str(test_rows))

Training Dataset Count: 25711
Test Dataset Count: 10879


In [82]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression capped at 10 iterations: fit on the training split,
# score the test split and preview a single scored row.
lr = LogisticRegression(
    maxIter=10,
    featuresCol='features',
    labelCol='label',
)
lrModel = lr.fit(train)
predictions = lrModel.transform(test)
predictions.show(1)

+-----+--------------------+------+-----+------+-----+-----------+------------------+------------------+------------------+-------------------+---------------+------------------+------------------+-------------------+------------------+--------------------+----------------+--------------------+--------------------+----------+
|label|            features|  Open| High|   Low|Close|     Volume|               SMA|               EMA|          Momentum|                ROC|            RSI|           BBANDUP|           BBANDLO|               MACD|          MACDsign|            MACDdiff|Change_direction|       rawPrediction|         probability|prediction|
+-----+--------------------+------+-----+------+-----+-----------+------------------+------------------+------------------+-------------------+---------------+------------------+------------------+-------------------+------------------+--------------------+----------------+--------------------+--------------------+----------+
|  0.0|[243.63,2

In [83]:
#import matplotlib.pyplot as plt
#import numpy as np
#lrModel.coefficientMatrix

In [84]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Report accuracy for the `predictions` frame produced by the model cell that
# ran immediately before this one.
# `predictionAndLabels` is derived here instead of being assumed to exist:
# no earlier cell is guaranteed to have assigned it, so on a fresh kernel the
# original code raised NameError, and in a long-lived kernel it could score a
# stale frame left behind by a different model.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
predictionAndLabels = predictions.select("prediction", "label")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Test set accuracy = 0.6985017005239452


In [89]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree limited to depth 3: fit on train, score test, keep the
# (prediction, label) pair for evaluation and preview one scored row.
dt = DecisionTreeClassifier(
    maxDepth=3,
    featuresCol='features',
    labelCol='label',
)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
predictionAndLabels = predictions.select("prediction", "label")
predictions.show(1)

+-----+--------------------+------+-----+------+-----+-----------+------------------+------------------+------------------+-------------------+---------------+------------------+------------------+-------------------+------------------+--------------------+----------------+--------------------+--------------------+----------+
|label|            features|  Open| High|   Low|Close|     Volume|               SMA|               EMA|          Momentum|                ROC|            RSI|           BBANDUP|           BBANDLO|               MACD|          MACDsign|            MACDdiff|Change_direction|       rawPrediction|         probability|prediction|
+-----+--------------------+------+-----+------+-----+-----------+------------------+------------------+------------------+-------------------+---------------+------------------+------------------+-------------------+------------------+--------------------+----------------+--------------------+--------------------+----------+
|  0.0|[243.63,2

In [90]:
# Accuracy of the current (prediction, label) pairs on the test split.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
test_accuracy = evaluator.evaluate(predictionAndLabels)
print("Test set accuracy = " + str(test_accuracy))

Test set accuracy = 0.5790054232925821


In [91]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest with the library's default settings: fit on train, score
# test, keep the (prediction, label) pair and preview one scored row.
rf = RandomForestClassifier(labelCol='label', featuresCol='features')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictionAndLabels = predictions.select("prediction", "label")
predictions.show(1)

+-----+--------------------+------+-----+------+-----+-----------+------------------+------------------+------------------+-------------------+---------------+------------------+------------------+-------------------+------------------+--------------------+----------------+--------------------+--------------------+----------+
|label|            features|  Open| High|   Low|Close|     Volume|               SMA|               EMA|          Momentum|                ROC|            RSI|           BBANDUP|           BBANDLO|               MACD|          MACDsign|            MACDdiff|Change_direction|       rawPrediction|         probability|prediction|
+-----+--------------------+------+-----+------+-----+-----------+------------------+------------------+------------------+-------------------+---------------+------------------+------------------+-------------------+------------------+--------------------+----------------+--------------------+--------------------+----------+
|  0.0|[243.63,2

In [92]:
# Accuracy of the current (prediction, label) pairs on the test split.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
test_accuracy = evaluator.evaluate(predictionAndLabels)
print("Test set accuracy = " + str(test_accuracy))

Test set accuracy = 0.5957349021049729


In [72]:
# Won't work on our dataset: GBTClassifier currently supports only binary classification.
#from pyspark.ml.classification import GBTClassifier
#gbt = GBTClassifier(maxIter=10)
#gbtModel = gbt.fit(train)
#predictions = gbtModel.transform(test)
#predictions.show(1)

In [93]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Network topology (the previous comment described a different, smaller network):
#   - input layer: one node per entry in assemblerInputs; this matches the
#     feature-vector length because every assembled input here is a scalar column
#   - four hidden layers of 5 nodes each
#   - output layer: 3 nodes, one per class
layers = [len(assemblerInputs), 5, 5, 5, 5, 3]

# create the trainer and set its parameters (fixed seed)
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train)

# compute accuracy on the test set
predictions = model.transform(test)
predictionAndLabels = predictions.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))


Test set accuracy = 0.702178509054141
