In [255]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [256]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [257]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [258]:
%matplotlib inline

In [259]:
import pyspark
from pyspark.sql import SparkSession

In [260]:
spark = SparkSession.builder.master("local[1]").appName("StockPrediction").getOrCreate()

In [261]:
df = spark.read.csv("etfs/SPY.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [262]:
df = df.withColumnRenamed("_c0","Date").withColumnRenamed("_c1","Open").withColumnRenamed("_c2","High").withColumnRenamed("_c3","Low").withColumnRenamed("_c4","Close").withColumnRenamed("_c5","Adj Close").withColumnRenamed("_c6","Volume")
df = df.where(df.Date != "Date")
df.show(5)

+----------+--------+--------+--------+--------+------------------+-------+
|      Date|    Open|    High|     Low|   Close|         Adj Close| Volume|
+----------+--------+--------+--------+--------+------------------+-------+
|1993-01-29|43.96875|43.96875|   43.75| 43.9375|26.299287796020508|1003200|
|1993-02-01|43.96875|   44.25|43.96875|   44.25|26.486324310302734| 480500|
|1993-02-02|44.21875|  44.375|  44.125|44.34375|26.542448043823242| 201300|
|1993-02-03|44.40625|44.84375|  44.375| 44.8125|   26.822998046875| 529400|
|1993-02-04|44.96875|45.09375|44.46875|    45.0|26.935239791870117| 531500|
+----------+--------+--------+--------+--------+------------------+-------+
only showing top 5 rows



In [263]:
days = lambda i: i * 86400
#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(to_timestamp(F.col("Date")).cast('long')).rangeBetween(-days(7), 0))

w2 = Window.partitionBy().orderBy("date")

#create Features

df = df.withColumn('rolling_average', F.avg("Close").over(w))
df = df.withColumn('is_higher_than_rollavg', F.when(F.lag(df["Close"]).over(w2) < F.col("Close"), 'yep').otherwise('nah'))
df = df.withColumn('diffOpenClose', df["Open"] - df["Close"])
df = df.withColumn('diffHighLow', df["High"] - df["Low"])

#df = df.withColumn('Open', F.col("Open").cast('float'))
#df = df.withColumn('High', F.col("High").cast('float'))
#df = df.withColumn('Low', F.col("Low").cast('float'))
#df = df.withColumn('Close', F.col("Close").cast('float'))

In [264]:
df.show(10)

+----------+--------+--------+--------+--------+------------------+-------+------------------+----------------------+-------------+-----------+
|      Date|    Open|    High|     Low|   Close|         Adj Close| Volume|   rolling_average|is_higher_than_rollavg|diffOpenClose|diffHighLow|
+----------+--------+--------+--------+--------+------------------+-------+------------------+----------------------+-------------+-----------+
|1993-01-29|43.96875|43.96875|   43.75| 43.9375|26.299287796020508|1003200|           43.9375|                   nah|      0.03125|    0.21875|
|1993-02-01|43.96875|   44.25|43.96875|   44.25|26.486324310302734| 480500|          44.09375|                   yep|     -0.28125|    0.28125|
|1993-02-02|44.21875|  44.375|  44.125|44.34375|26.542448043823242| 201300|44.177083333333336|                   yep|       -0.125|       0.25|
|1993-02-03|44.40625|44.84375|  44.375| 44.8125|   26.822998046875| 529400|        44.3359375|                   yep|     -0.40625|    0

In [265]:
def linearRegression3features(df):
    df = df.withColumn('Open', F.col("Open").cast('float'))
    df = df.withColumn('High', F.col("High").cast('float'))
    df = df.withColumn('Low', F.col("Low").cast('float'))
    df = df.withColumn('Close', F.col("Close").cast('float'))
    df = df.withColumn('diffOpenClose', F.col("diffOpenClose").cast('float'))
    df = df.withColumn('diffHighLow', F.col("diffHighLow").cast('float'))
    df = df.withColumn('rolling_average', F.col("rolling_average").cast('float'))
    df = df.drop("is_higher_than_rollavg")

    assembler = VectorAssembler(inputCols=["rolling_average","diffOpenClose","diffHighLow"], outputCol="features")
    vecTrainDF = assembler.transform(df)
    vecTrainDF.select("rolling_average","diffOpenClose","diffHighLow", "features", "Close").show(10)
    pipeline = Pipeline(stages=[assembler])
    pipelineModel = pipeline.fit(df)
    res = pipelineModel.transform(df)

    (trainingData, testData) = res.randomSplit([0.8, 0.2])

    lr = LinearRegression(featuresCol="features", labelCol="Close")
    lrModel = lr.fit(trainingData)
    predictions = lrModel.transform(testData)

    evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
    predictions = predictions.withColumn("Error", F.col("Close") - F.col("prediction"))
    predictions.show(10)
    return rmse, predictions

linear3_score = linearRegression3features(df)

+---------------+-------------+-----------+--------------------+--------+
|rolling_average|diffOpenClose|diffHighLow|            features|   Close|
+---------------+-------------+-----------+--------------------+--------+
|        43.9375|      0.03125|    0.21875|[43.9375,0.03125,...| 43.9375|
|       44.09375|     -0.28125|    0.28125|[44.09375,-0.2812...|   44.25|
|      44.177082|       -0.125|       0.25|[44.1770820617675...|44.34375|
|      44.335938|     -0.40625|    0.46875|[44.3359375,-0.40...| 44.8125|
|       44.46875|     -0.03125|      0.625|[44.46875,-0.0312...|    45.0|
|      44.552082|          0.0|    0.34375|[44.5520820617675...|44.96875|
|      44.723957|          0.0|    0.21875|[44.7239570617675...|44.96875|
|      44.791668|      0.15625|       0.25|[44.7916679382324...|44.65625|
|      44.854168|      -0.0625|    0.21875|[44.8541679382324...|44.71875|
|         44.875|     -0.15625|    0.34375|[44.875,-0.15625,...| 44.9375|
+---------------+-------------+-------

In [266]:
def linearRegressionAlone(df):
    df = df.withColumn('Open', F.col("Open").cast('float'))
    df = df.withColumn('High', F.col("High").cast('float'))
    df = df.withColumn('Low', F.col("Low").cast('float'))
    df = df.withColumn('Close', F.col("Close").cast('float'))
    df = df.withColumn('diffOpenClose', F.col("diffOpenClose").cast('float'))
    df = df.withColumn('diffHighLow', F.col("diffHighLow").cast('float'))
    df = df.withColumn('rolling_average', F.col("rolling_average").cast('float'))
    df = df.drop("is_higher_than_rollavg")

    assembler = VectorAssembler(inputCols=["rolling_average"], outputCol="features")
    vecTrainDF = assembler.transform(df)
    vecTrainDF.select("rolling_average", "features", "Close").show(10)
    pipeline = Pipeline(stages=[assembler])
    pipelineModel = pipeline.fit(df)
    res = pipelineModel.transform(df)

    (trainingData, testData) = res.randomSplit([0.8, 0.2])

    lr = LinearRegression(featuresCol="features", labelCol="Close")
    lrModel = lr.fit(trainingData)
    predictions = lrModel.transform(testData)

    evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
    predictions = predictions.withColumn("Error", F.col("Close") - F.col("prediction"))
    predictions.drop("Open").drop("High").drop("Low").drop("Adj Close").drop("features").show(10)
    return rmse, predictions

linear1_score = linearRegressionAlone(df)

+---------------+-------------------+--------+
|rolling_average|           features|   Close|
+---------------+-------------------+--------+
|        43.9375|          [43.9375]| 43.9375|
|       44.09375|         [44.09375]|   44.25|
|      44.177082|[44.17708206176758]|44.34375|
|      44.335938|       [44.3359375]| 44.8125|
|       44.46875|         [44.46875]|    45.0|
|      44.552082|[44.55208206176758]|44.96875|
|      44.723957|[44.72395706176758]|44.96875|
|      44.791668|[44.79166793823242]|44.65625|
|      44.854168|[44.85416793823242]|44.71875|
|         44.875|           [44.875]| 44.9375|
+---------------+-------------------+--------+
only showing top 10 rows

Root Mean Squared Error (RMSE) on test data = 2.20525
+----------+--------+-------+---------------+-------------+-----------+-----------------+--------------------+
|      Date|   Close| Volume|rolling_average|diffOpenClose|diffHighLow|       prediction|               Error|
+----------+--------+-------+-----------

In [268]:
def DecisionTreeRegressorLessCategorical(df):
    df = df.withColumn('Open', F.col("Open").cast('float'))
    df = df.withColumn('High', F.col("High").cast('float'))
    df = df.withColumn('Low', F.col("Low").cast('float'))
    df = df.withColumn('Close', F.col("Close").cast('float'))
    df = df.withColumn('diffOpenClose', F.col("diffOpenClose").cast('float'))
    df = df.withColumn('diffHighLow', F.col("diffHighLow").cast('float'))
    df = df.withColumn('rolling_average', F.col("rolling_average").cast('float'))
    stages = []

    label_stringIdx = StringIndexer(inputCol='is_higher_than_rollavg', outputCol='label')
    stages += [label_stringIdx]

    assembler = VectorAssembler(inputCols=["diffOpenClose","rolling_average","diffHighLow"], outputCol="features")
    
    stages += [assembler]

    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(df)
    df = pipelineModel.transform(df)
    df.select('close', 'label', 'features').show()
    
    (trainingData, testData) = df.randomSplit([0.8, 0.2])
    
    dr = DecisionTreeRegressor(labelCol="label", featuresCol="features")

    model = dr.fit(trainingData)
    predictions = model.transform(testData)
    predictions.select("close","label","prediction").show(10)

    evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
    return rmse

DecisionTree2_score = DecisionTreeRegressorLessCategorical(df)

+--------+-----+--------------------+
|   close|label|            features|
+--------+-----+--------------------+
| 43.9375|  1.0|[0.03125,43.9375,...|
|   44.25|  0.0|[-0.28125,44.0937...|
|44.34375|  0.0|[-0.125,44.177082...|
| 44.8125|  0.0|[-0.40625,44.3359...|
|    45.0|  0.0|[-0.03125,44.4687...|
|44.96875|  1.0|[0.0,44.552082061...|
|44.96875|  1.0|[0.0,44.723957061...|
|44.65625|  1.0|[0.15625,44.79166...|
|44.71875|  0.0|[-0.0625,44.85416...|
| 44.9375|  0.0|[-0.15625,44.875,...|
|44.59375|  1.0|[0.28125,44.80729...|
|43.46875|  1.0|[1.0,44.474998474...|
| 43.4375|  1.0|[0.03125,44.23125...|
|43.40625|  1.0|[0.53125,43.96875...|
| 43.5625|  0.0|[-0.15625,43.6937...|
|43.71875|  0.0|[-0.03125,43.5187...|
| 43.6875|  1.0|[0.15625,43.54687...|
|   44.25|  0.0|[-0.53125,43.6770...|
|44.34375|  0.0|[-0.125,43.828125...|
|44.40625|  0.0|[0.03125,43.99479...|
+--------+-----+--------------------+
only showing top 20 rows

+--------+-----+-------------------+
|   close|label|         

In [None]:
def DecisionTreeRegressorProc(df):
    categoricalColumns = ['High', 'Low', 'Open', 'Close']
    stages = []

    for categoricalCol in categoricalColumns:
        stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
        stages += [stringIndexer, encoder]

    label_stringIdx = StringIndexer(inputCol='is_higher_than_rollavg', outputCol='label')
    stages += [label_stringIdx]

    assembler = VectorAssembler(inputCols=[c + "classVec" for c in categoricalColumns], outputCol="features")
    stages += [assembler]

    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(df)
    df = pipelineModel.transform(df)
    df.select('close', 'label', 'features').show()
    
    (trainingData, testData) = df.randomSplit([0.8, 0.2])
    
    dr = DecisionTreeRegressor(labelCol="label", featuresCol="features")

    model = dr.fit(trainingData)
    predictions = model.transform(testData)
    predictions.select("close","label","prediction").show(10)

    evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
    return rmse

DecisionTree_score = DecisionTreeRegressorProc(df)

+--------+-----+--------------------+
|   close|label|            features|
+--------+-----+--------------------+
| 43.9375|  1.0|(21724,[165,9859,...|
|   44.25|  0.0|(21724,[842,5658,...|
|44.34375|  0.0|(21724,[166,6293,...|
| 44.8125|  0.0|(21724,[16,5662,1...|
|    45.0|  0.0|(21724,[29,5467,1...|
|44.96875|  1.0|(21724,[4,5468,10...|
|44.96875|  1.0|(21724,[5,5458,10...|
|44.65625|  1.0|(21724,[7,5523,11...|
|44.71875|  0.0|(21724,[6,5480,10...|
| 44.9375|  0.0|(21724,[5,5482,10...|
|44.59375|  1.0|(21724,[851,5524,...|
|43.46875|  1.0|(21724,[845,9853,...|
| 43.4375|  1.0|(21724,[4488,6288...|
|43.40625|  1.0|(21724,[840,9852,...|
| 43.5625|  0.0|(21724,[4489,5657...|
|43.71875|  0.0|(21724,[4490,9856...|
| 43.6875|  1.0|(21724,[839,9854,...|
|   44.25|  0.0|(21724,[842,6289,...|
|44.34375|  0.0|(21724,[166,6293,...|
|44.40625|  0.0|(21724,[844,5660,...|
+--------+-----+--------------------+
only showing top 20 rows

