#### Import Libraries

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
%matplotlib inline

In [2]:
from pyspark.sql import types
from pyspark.sql.functions import col

In [3]:
import requests, pandas as pd, numpy as np
from pandas import DataFrame
from io import StringIO
import time, json
from datetime import date
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.tsa.arima_model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.metrics import mean_squared_error
import matplotlib.pylab as plt
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

#### Create Spark Session

In [4]:
# Reason why we have the getOrCreate code
# http://stackoverflow.com/questions/28999332/how-to-access-sparkcontext-in-pyspark-script
sc = SparkContext.getOrCreate()

#### Load Data

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Stock_price_prediction').getOrCreate()
df = spark.read.csv('AAPL_stock_Quandl.csv', header = True, inferSchema = True)
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Adj_Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [6]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+--------------------+
|summary|             Open|              High|               Low|         Adj_Close|              Volume|
+-------+-----------------+------------------+------------------+------------------+--------------------+
|  count|             9594|              9594|              9594|              9594|                9594|
|   mean|24.55178285275794|24.785660443571015|24.301431815118807| 24.54790937998181|1.2427037658640817E7|
| stddev| 45.1043238715968| 45.48898305164385|44.702582603777564|45.102037176071086| 1.687100863123329E7|
|    min|      0.159815622|       0.159815622|       0.158090993|       0.158090993|                4471|
|    max|      228.9938134|       231.6629934|       228.0015532|       230.2738291|           189560600|
+-------+-----------------+------------------+------------------+------------------+--------------------+



#### Feature Engineering

In [10]:
df = df.withColumn('O-L', df['Open'] - df['Low'])
df = df.withColumn('O-C', df.Open - df.Adj_Close)
df = df.withColumn('H-L', df.High - df.Low)

In [11]:
from pyspark.sql.functions import lag, lit, lead
from pyspark.sql.window import Window

In [12]:
df = df.withColumn('dummy_column', lit('Stocks!'))

In [13]:
w = Window.orderBy("dummy_column")
value_lead = lead('Adj_Close', count=10).over(w)
df = df.withColumn('Next_Adj_Close', value_lead)

#### Create Seasonality Features

In [19]:
from pyspark.sql.functions import to_date, date_format
from pyspark.sql.functions import year, month
from pyspark.sql.functions import dayofmonth, weekofyear, dayofweek

In [20]:
df = df.withColumn('DATEFORMAT', to_date('Date'))

In [21]:
df = df.withColumn('YEAR', year('DATEFORMAT'))
df = df.withColumn('MONTH', month('DATEFORMAT'))
df = df.withColumn('DAYOFMONTH', dayofmonth('DATEFORMAT'))
df = df.withColumn('DAYOFMONTH', dayofmonth('DATEFORMAT'))
df = df.withColumn('DAYOFWEEK', dayofweek('DATEFORMAT'))
df = df.withColumn('WEEKOFYEAR', weekofyear('DATEFORMAT'))
df = df.withColumn("week", date_format('DATEFORMAT', "W"))

In [22]:
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

In [23]:
w = Window().orderBy(df['DATE'])

In [24]:
from pyspark.sql.functions import coalesce, lit, col, lead, lag
from operator import add
from functools import reduce

def weighted_average(c, window, offsets, weights):
    assert len(weights) == len(offsets)

    def value(i):
        if i < 0: return lag(c, -i).over(window)
        if i > 0: return lead(c, i).over(window)
        return c

    values = [coalesce(value(i) * w, lit(0)) for i, w in zip(offsets, weights)]

    return reduce(add, values, lit(0))

In [25]:
w = Window.partitionBy("dummy_column").orderBy("DATEFORMAT")
# offsets, delays =  [-1,0], [0.5, 0.5]
a = [1/20] *20
offsets, delays =   [int(e) for e in list(np.arange(20)*-1)] , a
df = df.withColumn("Avg_Close_20", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [26]:
a = [1/10] *10
offsets, delays =   [int(e) for e in list(np.arange(10)*-1)] , a
df = df.withColumn("Avg_Close_10", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [27]:
a = [1/5] *5
offsets, delays =   [int(e) for e in list(np.arange(5)*-1)] , a
df = df.withColumn("Avg_Close_5", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [28]:
a = [1/80] *80
offsets, delays =   [int(e) for e in list(np.arange(80)*-1)] , a
df = df.withColumn("Avg_Close_80", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [29]:
offsets, delays =   [0, 1, 8, 15], [0.25,0.25,0.25,0.25]
df = df.withColumn("Avg_Close_0_1_8_15", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [30]:
offsets, delays =   [0, 1, 3, 5], [0.25,0.25,0.25,0.25]
df = df.withColumn("Avg_Close_0_1_3_5", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [31]:
offsets, delays =   [0, 1, 3, 5], [0.25,0.25,0.25,0.25]
df = df.withColumn("Avg_Close_0_1_3_5", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [32]:
offsets, delays =   [0, 1, 5, 20, 80], [0.2,0.2,0.2,0.2,0.2]
df = df.withColumn("Avg_Close_0_1_5_20_80", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [33]:
offsets, delays =   [0, 1, 2], [0.333,0.333,0.333]
df = df.withColumn("Avg_Close_3", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [34]:
offsets, delays =   [0, 1], [0.5,0.5]
df = df.withColumn("Avg_Close_2", weighted_average(col("DAYOFMONTH"), w, offsets, delays))

In [35]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Adj_Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- O-L: double (nullable = true)
 |-- dummy_column: string (nullable = false)
 |-- Next_Adj_Close: double (nullable = true)
 |-- DATEFORMAT: date (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAYOFMONTH: integer (nullable = true)
 |-- DAYOFWEEK: integer (nullable = true)
 |-- WEEKOFYEAR: integer (nullable = true)
 |-- week: string (nullable = true)
 |-- Avg_Close_20: double (nullable = false)
 |-- Avg_Close_10: double (nullable = false)
 |-- Avg_Close_5: double (nullable = false)
 |-- Avg_Close_80: double (nullable = false)
 |-- Avg_Close_0_1_8_15: double (nullable = false)
 |-- Avg_Close_0_1_3_5: double (nullable = false)
 |-- Avg_Close_0_1_5_20_80: double (nullable = false)
 |-- Avg_Close_3: double (null

In [42]:
df1[cols] = df1[cols].apply(pd.to_numeric, errors='coerce')

In [43]:
inputCols = list(df1.columns.drop(['Date','Next_Adj_Close']))

#### Create vector Assembler

In [47]:
from pyspark.ml.feature import VectorAssembler

In [48]:
df = spark.read.csv('final_data_2weeks.csv', header = True, inferSchema = True)
# df.printSchema()

In [49]:
df_assembler = VectorAssembler(inputCols=inputCols, outputCol="features")
df = df_assembler.transform(df)

#### Split Data for Train & Testing

In [53]:
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window

df = df.withColumn("rank", percent_rank().over(Window.partitionBy().orderBy("date")))

In [54]:
test_df = df.where("rank > .8").drop("rank")
train_df = df.where("rank <= .8").drop("rank")

In [55]:
training_df=train_df.select(['features','Next_Adj_Close'])
test_df=test_df.select(['features','Next_Adj_Close'])

In [56]:
training_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Next_Adj_Close: double (nullable = true)



In [57]:
training_df.show(1)

+--------------------+--------------+
|            features|Next_Adj_Close|
+--------------------+--------------+
|[0.371801272,0.37...|   0.409599391|
+--------------------+--------------+
only showing top 1 row



#### Linear Regression Model

In [58]:
from pyspark.ml.regression import LinearRegression
lin_Reg=LinearRegression(labelCol='Next_Adj_Close')

In [59]:
lr_model = lin_Reg.fit(training_df)

In [60]:
lr_model.intercept

-12.152734028777544

In [61]:
print(lr_model.coefficients)

[0.26196493790931297,0.25649065029117035,0.26836888636629863,0.26242684905452435,2.0081525907301185e-09,0.30118745915321854,0.005678093969356122,-0.0033289035238435994,-0.010611173931461954,0.009928366024230046,0.0017398715152661785,0.08895654288664033,-0.022478078530639913,-0.0034150175142458107,0.0009118580230399939,0.061967872576272245,0.006000093492723331,0.006197580379800791,0.0017835013928192288,0.000922637516614471,-0.007989496329536097,-0.4872375944015094,-0.2142385342059671,0.005558242357116584,-0.13795933522645296,-0.09226942179169335,0.04608427787325021,0.06331251808076013,0.00837133986862244,0.0962730938815078,-0.015237715629850705,-0.36950274396457533,-0.7752821277517656,-0.39996915837749114,0.15682925949588186]


In [62]:
training_predictions=lr_model.evaluate(training_df)

#### Model Evaluation

In [63]:
training_predictions.meanSquaredError

0.38500370265179373

In [64]:
training_predictions.r2

0.995049883290615

In [65]:
test_results=lr_model.evaluate(test_df)

In [66]:
#view the residual errors based on predictions 
test_results.residuals.show(10)

+--------------------+
|           residuals|
+--------------------+
| -2.0107649347974714|
| -2.2989881562630856|
|  -2.933175019558533|
|  -3.082065158504612|
| -3.4125569273422656|
| -3.5286413502111174|
| -1.3318217029092523|
| -1.5577026476689326|
| -0.4189549525743175|
|-0.34703610709799904|
+--------------------+
only showing top 10 rows



In [67]:
#Test Set results

results=lr_model.evaluate(test_df).predictions

results.select(['Next_Adj_Close','prediction']).show(10,False)

+--------------+-----------------+
|Next_Adj_Close|prediction       |
+--------------+-----------------+
|41.02800281   |43.03876774479747|
|41.76163274   |44.06062089626309|
|41.04684604   |43.98002105955853|
|40.84710776   |43.92917291850461|
|40.23156209   |43.64411901734226|
|39.61099157   |43.13963292021112|
|40.86469478   |42.19651648290925|
|40.52677277   |42.08447541766893|
|41.60963065   |42.02858560257432|
|40.99659742   |41.343633527098  |
+--------------+-----------------+
only showing top 10 rows



In [68]:
type(test_results)

pyspark.ml.regression.LinearRegressionSummary

In [69]:
test_results.r2

0.982910168051495

In [70]:
test_results.rootMeanSquaredError

5.785639449169558

In [71]:
test_results.meanSquaredError

33.47362383578703

In [72]:
results.show()

+--------------------+--------------+------------------+
|            features|Next_Adj_Close|        prediction|
+--------------------+--------------+------------------+
|[42.05809963,42.4...|   41.02800281| 43.03876774479747|
|[42.84951549,43.6...|   41.76163274| 44.06062089626309|
|[43.82559504,44.2...|   41.04684604| 43.98002105955853|
|[43.52787193,43.7...|   40.84710776| 43.92917291850461|
|[43.11080834,43.3...|   40.23156209| 43.64411901734226|
|[43.31934014,43.5...|   39.61099157| 43.13963292021112|
|[42.4814443,42.48...|   40.86469478| 42.19651648290925|
|[41.67872251,42.0...|   40.52677277| 42.08447541766893|
|[41.86338621,41.9...|   41.60963065| 42.02858560257432|
|[41.52420799,41.6...|   40.99659742|   41.343633527098|
|[41.10337575,41.2...|   41.71138412|41.290774744566576|
|[41.45511613,41.8...|   42.11588555|  42.0806183659393|
|[41.42371074,41.4...|   41.96262725| 41.47662680404716|
|[41.06568928,41.2...|   42.16739039| 41.27144750777135|
|[41.32823835,41.3...|   43.120

In [76]:
best_features_lin = np.array(sorted(range(len(list((lr_model.coefficients)))), key=lambda i: list((lr_model.coefficients))[i], reverse=True)[:15])

In [77]:
inputCols = pd.Series(inputCols)

#### Important Features

In [78]:
print(list(inputCols[best_features_lin]))

['O-L', 'Low', 'Adj_Close', 'Open', 'High', 'StdClose_80', 'MaxClose_20', 'week', 'MaxClose_5', 'Avg_Close_80', 'MinClose_80', 'DAYOFWEEK', 'MaxClose_10', 'Avg_Close_0_1_3_5', 'Avg_Close_0_1_8_15']


#### Random Forest Regressor

In [79]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(labelCol='Next_Adj_Close')


In [80]:
rf_model = rf.fit(training_df)

In [81]:
results = rf_model.transform(test_df)

In [82]:
results_rf = rf_model.transform(test_df)

results_rf.select(['Next_Adj_Close','prediction']).show(10,False)

+--------------+------------------+
|Next_Adj_Close|prediction        |
+--------------+------------------+
|41.02800281   |41.94620892863519 |
|41.76163274   |41.06625277648734 |
|41.04684604   |37.958130722299764|
|40.84710776   |38.05938084661707 |
|40.23156209   |38.18338037732402 |
|39.61099157   |38.4054204218328  |
|40.86469478   |38.04074070697382 |
|40.52677277   |37.954718894714674|
|41.60963065   |37.9064421237059  |
|40.99659742   |38.39565183656198 |
+--------------+------------------+
only showing top 10 rows



In [84]:
evaluator = RegressionEvaluator(
    labelCol="Next_Adj_Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(results)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 77.9411


In [85]:
test_results = rf_model.transform(test_df)

#### Gradient Boosting model

In [87]:
from pyspark.ml.regression import GBTRegressor
gbr = GBTRegressor(labelCol='Next_Adj_Close')
gbr_model = gbr.fit(training_df)
results_gbr = gbr_model.transform(test_df)

In [88]:
results_gbr = gbr_model.transform(test_df)
# test_results=gbr_model.transform(test_df)

results_gbr.select(['Next_Adj_Close','prediction']).show(10,False)

+--------------+------------------+
|Next_Adj_Close|prediction        |
+--------------+------------------+
|41.02800281   |42.54162418140933 |
|41.76163274   |43.09764074468664 |
|41.04684604   |42.66045155565485 |
|40.84710776   |42.917523149026486|
|40.23156209   |42.917523149026486|
|39.61099157   |42.66045155565485 |
|40.86469478   |41.79420747427775 |
|40.52677277   |42.39503119893849 |
|41.60963065   |42.41858608131081 |
|40.99659742   |42.756618724460196|
+--------------+------------------+
only showing top 10 rows



In [89]:
results.show()

+--------------------+--------------+------------------+
|            features|Next_Adj_Close|        prediction|
+--------------------+--------------+------------------+
|[42.05809963,42.4...|   41.02800281| 41.94620892863519|
|[42.84951549,43.6...|   41.76163274| 41.06625277648734|
|[43.82559504,44.2...|   41.04684604|37.958130722299764|
|[43.52787193,43.7...|   40.84710776| 38.05938084661707|
|[43.11080834,43.3...|   40.23156209| 38.18338037732402|
|[43.31934014,43.5...|   39.61099157|  38.4054204218328|
|[42.4814443,42.48...|   40.86469478| 38.04074070697382|
|[41.67872251,42.0...|   40.52677277|37.954718894714674|
|[41.86338621,41.9...|   41.60963065|  37.9064421237059|
|[41.52420799,41.6...|   40.99659742| 38.39565183656198|
|[41.10337575,41.2...|   41.71138412| 38.22188856306198|
|[41.45511613,41.8...|   42.11588555| 38.22188856306198|
|[41.42371074,41.4...|   41.96262725| 38.22188856306198|
|[41.06568928,41.2...|   42.16739039| 38.03097212170301|
|[41.32823835,41.3...|   43.120

In [90]:
evaluator = RegressionEvaluator(
    labelCol="Next_Adj_Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(results_gbr)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 76.1831


In [91]:
results_gbr.select(['prediction'])

DataFrame[prediction: double]

#### Decision Tree Regressor

In [93]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'Next_Adj_Close')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="Next_Adj_Close", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 75.9021


In [94]:
DT_best_features = list(dt_model.featureImportances)

best_features = np.array(sorted(range(len(list((DT_best_features)))), key=lambda i: list((lr_model.coefficients))[i], reverse=True)[:15])

In [95]:
best_features

array([ 5,  2,  3,  0,  1, 34, 29, 11, 27, 15, 26,  9, 28, 17, 16])

In [96]:
print(list(inputCols[best_features]))

['O-L', 'Low', 'Adj_Close', 'Open', 'High', 'StdClose_80', 'MaxClose_20', 'week', 'MaxClose_5', 'Avg_Close_80', 'MinClose_80', 'DAYOFWEEK', 'MaxClose_10', 'Avg_Close_0_1_3_5', 'Avg_Close_0_1_8_15']
