In [125]:
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window

import sklearn.metrics as metrics
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import RandomizedSearchCV

def init_spark():
    """Return the active SparkSession, creating one if none exists yet.

    NOTE(review): "spark.some.config.option" looks like a placeholder setting
    rather than a real Spark option -- confirm whether it can be removed.
    """
    builder = SparkSession.builder.config("spark.some.config.option", "some-value")
    return builder.getOrCreate()

# Load the raw price history. No schema is supplied, so the numeric columns
# are cast explicitly in the next cell; rows Spark cannot parse are dropped.
spark = init_spark()
filename = 'data/stock_histories.csv/part-00000-d2cb8732-8515-48d6-bda6-4f0b77fe86bb-c000.csv'
df = (
    spark.read
    .option("header", True)
    .option("mode", "DROPMALFORMED")
    .csv(filename)
)

In [126]:
# Cast the numeric columns to their proper types.
# Each column is cast from ITSELF: the previous version built `adjusted_close`
# from `df.close`, which silently replaced the adjusted price (the model's
# target) with the raw close.
numeric_types = {
    "volume": IntegerType(),
    "open": FloatType(),
    "close": FloatType(),
    "adjusted_close": FloatType(),
    "high": FloatType(),
    "low": FloatType(),
}
for column_name, spark_type in numeric_types.items():
    # withColumn on an existing name replaces it in place, so column order is kept.
    df = df.withColumn(column_name, df[column_name].cast(spark_type))

In [127]:
# Keep a single ticker. NOTE(review): the variable is called `nvda` but the
# filter selects IBM -- confirm which stock is intended.
nvda = df.filter(df["stock"] == "IBM")

In [128]:
# Preview the filtered rows for a quick sanity check of the casts above.
nvda.show()

+-----+----------+--------+------+------+------+------+--------------+
|stock|      date|  volume|  open| close|  high|   low|adjusted_close|
+-----+----------+--------+------+------+------+------+--------------+
|  IBM|2018-11-02|10587400|117.51|115.67|117.75|114.54|        115.67|
|  IBM|2018-11-01|13626700| 115.5|116.83| 117.0|115.11|        116.83|
|  IBM|2018-10-31|21645000|116.49|115.43| 116.5|114.09|        115.43|
|  IBM|2018-10-30|21458600|120.48| 115.4| 121.5|115.15|         115.4|
|  IBM|2018-10-29|20450400|119.34|119.64|123.95| 118.3|        119.64|
|  IBM|2018-10-26| 8767000|125.21|124.79|125.78|123.71|        124.79|
|  IBM|2018-10-25|10304300|127.25|126.45|127.79|125.14|        126.45|
|  IBM|2018-10-24| 5710300|131.17|127.21|131.69| 127.0|        127.21|
|  IBM|2018-10-23| 6463600|129.02|131.21| 131.9|128.41|        131.21|
|  IBM|2018-10-22| 5719500|129.58|130.02|130.44| 128.4|        130.02|
|  IBM|2018-10-19| 7791600|130.65| 129.1|131.89|127.96|         129.1|
|  IBM

In [129]:
# Earliest date in the history. `date` is still a string at this point, but
# YYYY-MM-DD strings sort in chronological order.
nvda.select("date").orderBy(col("date").asc()).first()

Row(date='1970-01-02')

In [130]:
# Most recent date in the history (descending sort on the YYYY-MM-DD string).
nvda.select("date").orderBy(col("date").desc()).first()

Row(date='2018-11-02')

In [131]:
# Feature engineering: calendar fields derived from the trade date
# (quarter, ISO week of year, year, day of week) plus a combined quarter label.
nvda = (
    nvda
    .withColumn("quarter", quarter(col("date")))
    .withColumn("week_of_year", weekofyear(col("date")))
    .withColumn("year", year(col("date")))
    .withColumn("day_of_week", dayofweek(col("date")))
    # Label such as "2018-4"; it sorts chronologically as a string because the
    # year has four digits and the quarter one.
    .withColumn("year_quarter", concat(col("year"), lit("-"), col("quarter")))
)

In [132]:
# Eyeball the derived calendar columns next to the date they came from.
nvda.select("date", "quarter", "week_of_year", "year", "day_of_week", "year_quarter").show()

+----------+-------+------------+----+-----------+------------+
|      date|quarter|week_of_year|year|day_of_week|year_quarter|
+----------+-------+------------+----+-----------+------------+
|2018-11-02|      4|          44|2018|          6|      2018-4|
|2018-11-01|      4|          44|2018|          5|      2018-4|
|2018-10-31|      4|          44|2018|          4|      2018-4|
|2018-10-30|      4|          44|2018|          3|      2018-4|
|2018-10-29|      4|          44|2018|          2|      2018-4|
|2018-10-26|      4|          43|2018|          6|      2018-4|
|2018-10-25|      4|          43|2018|          5|      2018-4|
|2018-10-24|      4|          43|2018|          4|      2018-4|
|2018-10-23|      4|          43|2018|          3|      2018-4|
|2018-10-22|      4|          43|2018|          2|      2018-4|
|2018-10-19|      4|          42|2018|          6|      2018-4|
|2018-10-18|      4|          42|2018|          5|      2018-4|
|2018-10-17|      4|          42|2018|  

In [133]:
# Find the most recent *full* quarter.
# A quarter counts as full when it has at least 60 rows (one row per trading
# day); partially observed quarters are excluded.
quarter_day_count = (
    nvda.groupBy("year_quarter")
        .count()
        .where(col("count") >= 60)
)
last_quarter = (
    quarter_day_count
    .orderBy(col("year_quarter").desc())
    .select("year_quarter")
    .first()
)

In [134]:
# `last_quarter` is a Row; its first field is the year_quarter label.
last_quarter[0]

'2018-3'

In [135]:
# Flag the last full quarter as the hold-out set (1); every other quarter is training data (0).
in_last_quarter = col("year_quarter") == last_quarter[0]
quarter_day_count = quarter_day_count.withColumn(
    "is_test",
    when(in_last_quarter, 1).otherwise(0),
)

In [136]:
# Inspect the per-quarter row counts and the train/test flag.
quarter_day_count.show()

+------------+-----+-------+
|year_quarter|count|is_test|
+------------+-----+-------+
|      1973-1|   62|      0|
|      1984-3|   63|      0|
|      1987-2|   63|      0|
|      1998-1|   61|      0|
|      1995-3|   63|      0|
|      2003-4|   64|      0|
|      2006-2|   63|      0|
|      2014-1|   61|      0|
|      2013-4|   64|      0|
|      1993-3|   64|      0|
|      1970-4|   64|      0|
|      1991-2|   64|      0|
|      1975-2|   64|      0|
|      1991-4|   64|      0|
|      1980-1|   63|      0|
|      1974-2|   63|      0|
|      1985-2|   63|      0|
|      2000-1|   63|      0|
|      1995-1|   63|      0|
|      1976-3|   64|      0|
+------------+-----+-------+
only showing top 20 rows



In [137]:
# Attach `count` and `is_test` to every daily row. The inner join also drops
# rows belonging to quarters that were filtered out as incomplete.
nvda = nvda.join(quarter_day_count, on="year_quarter", how="inner")

In [138]:
# Number of daily rows left after restricting to full quarters.
nvda.count()

12239

In [139]:
# Per-quarter mean opening price, exposed as lag features: the mean of the
# previous 1..4 quarters. The current quarter's own mean is not kept.
windowSpec = Window.orderBy("year_quarter")  # single global ordering over quarters
open_average = (
    nvda.groupBy("year_quarter")
        .agg(avg("open").alias("open_avg"))
        .select(
            "year_quarter",
            *[lag("open_avg", k).over(windowSpec).alias(f"open_avg_l{k}") for k in range(1, 5)],
        )
)

In [140]:
# Per-quarter mean volume, exposed as the means of the previous 1..4 quarters.
# Relies on `windowSpec` defined in the open-price cell above.
volume_average = (
    nvda.groupBy("year_quarter")
        .agg(avg("volume").alias("volume_avg"))
        .select(
            "year_quarter",
            *[lag("volume_avg", k).over(windowSpec).alias(f"volume_avg_l{k}") for k in range(1, 5)],
        )
)

In [141]:
# Per-quarter mean daily high, exposed as the means of the previous 1..4 quarters.
# Relies on `windowSpec` defined in the open-price cell above.
high_average = (
    nvda.groupBy("year_quarter")
        .agg(avg("high").alias("high_avg"))
        .select(
            "year_quarter",
            *[lag("high_avg", k).over(windowSpec).alias(f"high_avg_l{k}") for k in range(1, 5)],
        )
)

In [142]:
# Per-quarter mean daily low, exposed as the means of the previous 1..4 quarters.
# Relies on `windowSpec` defined in the open-price cell above.
low_average = (
    nvda.groupBy("year_quarter")
        .agg(avg("low").alias("low_avg"))
        .select(
            "year_quarter",
            *[lag("low_avg", k).over(windowSpec).alias(f"low_avg_l{k}") for k in range(1, 5)],
        )
)

In [143]:
# Per-quarter mean close, exposed as the means of the previous 1..4 quarters.
# Relies on `windowSpec` defined in the open-price cell above.
close_average = (
    nvda.groupBy("year_quarter")
        .agg(avg("close").alias("close_avg"))
        .select(
            "year_quarter",
            *[lag("close_avg", k).over(windowSpec).alias(f"close_avg_l{k}") for k in range(1, 5)],
        )
)

In [144]:
# Per-quarter mean adjusted close, exposed as the means of the previous 1..4 quarters.
# Relies on `windowSpec` defined in the open-price cell above.
adj_close_average = (
    nvda.groupBy("year_quarter")
        .agg(avg("adjusted_close").alias("adj_close_avg"))
        .select(
            "year_quarter",
            *[lag("adj_close_avg", k).over(windowSpec).alias(f"adj_close_avg_l{k}") for k in range(1, 5)],
        )
)

In [145]:
# Attach every block of lag features to the daily rows, keyed on the quarter label.
# The join order determines the column order of the final frame.
for lag_table in (
    open_average,
    volume_average,
    high_average,
    low_average,
    close_average,
    adj_close_average,
):
    nvda = nvda.join(lag_table, "year_quarter")

In [146]:
# Check the lag layout: the earliest quarters have null lags because no
# earlier quarter exists to look back to.
open_average.show(5)

+------------+------------------+------------------+------------------+-----------------+
|year_quarter|       open_avg_l1|       open_avg_l2|       open_avg_l3|      open_avg_l4|
+------------+------------------+------------------+------------------+-----------------+
|      1970-1|              null|              null|              null|             null|
|      1970-2| 17.23662911086786|              null|              null|             null|
|      1970-3|14.302403802138109| 17.23662911086786|              null|             null|
|      1970-4|12.909082069993019|14.302403802138109| 17.23662911086786|             null|
|      1971-1|15.098828166723251|12.909082069993019|14.302403802138109|17.23662911086786|
+------------+------------------+------------------+------------------+-----------------+
only showing top 5 rows



In [147]:
# Only log Spark messages at ERROR level and above from here on.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [148]:
# Remove every row containing a null in any column. This mainly targets the
# first four quarters, whose lag features are undefined.
nvda = nvda.dropna()

In [149]:
# Collect the Spark DataFrame into a pandas DataFrame for scikit-learn.
# NOTE(review): this rebinds `nvda` from a Spark to a pandas object, so the
# cell cannot be re-run on its own (pandas has no `toPandas`); a separate
# name would make the notebook safer to re-execute.
nvda = nvda.toPandas()
nvda.columns

                                                                                

Index(['year_quarter', 'stock', 'date', 'volume', 'open', 'close', 'high',
       'low', 'adjusted_close', 'quarter', 'week_of_year', 'year',
       'day_of_week', 'count', 'is_test', 'open_avg_l1', 'open_avg_l2',
       'open_avg_l3', 'open_avg_l4', 'volume_avg_l1', 'volume_avg_l2',
       'volume_avg_l3', 'volume_avg_l4', 'high_avg_l1', 'high_avg_l2',
       'high_avg_l3', 'high_avg_l4', 'low_avg_l1', 'low_avg_l2', 'low_avg_l3',
       'low_avg_l4', 'close_avg_l1', 'close_avg_l2', 'close_avg_l3',
       'close_avg_l4', 'adj_close_avg_l1', 'adj_close_avg_l2',
       'adj_close_avg_l3', 'adj_close_avg_l4'],
      dtype='object')

In [150]:
# Model inputs: four calendar fields followed by the 1..4-quarter lagged means
# of each price/volume measure, in the same order the columns were joined.
predictor_features = ['quarter', 'week_of_year', 'year', 'day_of_week'] + [
    f"{measure}_avg_l{k}"
    for measure in ('open', 'volume', 'high', 'low', 'close', 'adj_close')
    for k in range(1, 5)
]

In [151]:
# Feature matrices: training rows are flagged is_test == 0, hold-out rows is_test == 1.
train_rows = nvda.is_test == 0
test_rows = nvda.is_test == 1
x_train = nvda[train_rows][predictor_features]
x_test = nvda[test_rows][predictor_features]

In [152]:
# Target vectors (adjusted close), split on the same is_test flag as the features.
target_train_rows = nvda.is_test == 0
target_test_rows = nvda.is_test == 1
y_train = nvda[target_train_rows]['adjusted_close']
y_test = nvda[target_test_rows]['adjusted_close']

In [None]:
# Hyper-parameter tuning: randomized search over random-forest settings.
import numpy as np
import random

# One fixed seed drives both the candidate sampling and the extra seed value
# in the grid, so a Restart & Run All reproduces the same search. Previously
# the search had no random_state and the grid held an unseeded random draw.
SEED = 42

model = RandomForestRegressor()  # blank/boilerplate model; the search clones and configures it

grid_rf = {
    "n_estimators": [20, 50, 100, 500, 1000],
    "max_depth": np.arange(1, 15, 1),
    "min_samples_split": [2, 10, 9],
    "min_samples_leaf": np.arange(1, 15, 2, dtype=int),
    "bootstrap": [True, False],
    # The key is kept because the next cell reads best_parameters["random_state"].
    "random_state": [1, 2, 30, 42, random.Random(SEED).randint(0, (2**32 - 1))],
}

# NOTE(review): cv=3 uses plain K-fold splitting on what is time-ordered data,
# so folds can train on later quarters and validate on earlier ones --
# consider sklearn.model_selection.TimeSeriesSplit on date-sorted rows.
rscv = RandomizedSearchCV(
    estimator=model,
    param_distributions=grid_rf,
    cv=3,
    n_jobs=-1,
    verbose=2,
    n_iter=200,
    random_state=SEED,
)
rscv_fit = rscv.fit(x_train, y_train)
best_parameters = rscv_fit.best_params_
print(best_parameters)

Fitting 3 folds for each of 200 candidates, totalling 600 fits


In [None]:
# Rebuild the forest with the winning settings from the search, train it on
# the full training set, and predict the hold-out quarter.
tuned_names = (
    "random_state",
    "n_estimators",
    "min_samples_split",
    "min_samples_leaf",
    "max_depth",
    "bootstrap",
)
model = RandomForestRegressor(**{name: best_parameters[name] for name in tuned_names})
model.fit(x_train, y_train)
predict = model.predict(x_test)

In [None]:
# Hold-out error, printed in this order: MAE, MSE, RMSE.
mae = metrics.mean_absolute_error(y_test, predict)
mse = metrics.mean_squared_error(y_test, predict)
for score in (mae, mse, np.sqrt(mse)):
    print(score)
# print(metrics.r2_score(y_test, predict))

In [None]:
# Pair each actual adjusted close with its prediction.
# zip walks both sequences once; the previous comprehension rebuilt
# y_test.tolist() on every iteration, which is quadratic in the test-set size.
list(zip(y_test.tolist(), predict))

In [None]:
# Peek at the first rows of the final pandas frame (features plus target).
nvda.head(10)

In [None]:
# TODO: look at feature importance once all the features have been engineered

In [None]:
# last 7 days, last 14 days, last 28 days, last 54 days, last 72 days.. 
# 2014-03-31 - (21-28), (14-28), ...
# 2014-03-30 - (20-27), (13-27),
# 2014-03-29 - 
# 2014-03-28 - 
# 2014-03-27 - 
# 2014-03-26
# 2014-03-25
# 2014-03-24
# 2014-03-23
# 2014-03-22
# 2014-03-21
# .....
# 2014-03-01 - ()

# ----

# 2014-02-28
# ...
# 2014-01-01