#Import useful libraries

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime

#Read dataset from csv

**NB**: For some reason yahoo finance responded with 403 when trying to download the dataset directly with wget, so the file "BTC_USD.csv" (included in the archive) needs to be added to DBFS at "dbfs:/FileStore/BTC_USD/BTC_USD.csv"

In [None]:
#read from dbfs
df = spark.read.load("dbfs:/FileStore/BTC_USD/BTC_USD.csv",
                      format="csv", 
                      sep=",", 
                      inferSchema="true", 
                      header="true");

#Train-test split

Validation not needed because CrossValidator will use part of train set as validation set (KFold)

Proportion of split is ~ 70/30

In [None]:
df.createOrReplaceTempView('df_table')
train_set = spark.sql("SELECT Date, Close from df_table where Date < '20190917'")
test_set = spark.sql("SELECT Date, Close from df_table where Date >= '20190917'")

#Feature importance analysis

Pearson and Spearman correlation matrices used to study the correlation between each couple of features.

Being all features quite highly correlated, I choose to keep just one of them (close price) to be able to use a window of more days in the models

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

def show_matrix(matrix):
    """
    function to print a matrix on screen
    """
    print(matrix.collect()[0][matrix.columns[0]].toArray())
    print()

vector_col = "features"
assembler = VectorAssembler(inputCols=["Open", "Close", "High", "Low", "Volume"], outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)

matrix_pearson = Correlation.corr(df_vector, vector_col)    #pearson is default
matrix_spearman = Correlation.corr(df_vector, vector_col, "spearman")
show_matrix(matrix_pearson)
show_matrix(matrix_spearman)

#Feature scaling

Tanh estimator used to scale all the feature values

In [None]:
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col, udf
from pyspark.sql.types import FloatType, StructType, ArrayType


train_set_stats = train_set.select(
    _mean(col('Close')).alias('mean'),
    _stddev(col('Close')).alias('std')
).collect()
mean = train_set_stats[0]['mean']    #mean of close prices
std = train_set_stats[0]['std']    #standard deviation of close prices


@udf(returnType=FloatType())
def tanh_estimator(x):
    """
    user defined function, applies tanh estimator's formula to a feature value x
    """
    return (float)(0.5 * (np.tanh(0.01*(x-mean)/std) + 1))

def scale_transform(df):
    """
    tranforms a dataframe applying tanh estimator udf
    """
    return df.select("Date", tanh_estimator("Close").alias("Close"))

scaled_train_set = scale_transform(train_set)
scaled_test_set = scale_transform(test_set)

#Sliding window

Window of 30 days is slided on the close prices in order to create train and test set, composed by examples such as: 

x={day(i), ... , day(i+29)}, y={day(i+30)}

In [None]:
from pyspark.sql.window import Window

def slide_window(df, window_size):
    """
    Returns two new dataframes:
    X - obtained sliding a window of given size (=#window_size) on the original dataframe, aggregating #window_size close prices on the same row 
    y - for each row of X, y contains a row with the (single) price of the day after last day contained in X 
    """
    
    w = Window.orderBy("Date")
    indexed_df = df.withColumn("Index", row_number().over(w)).select("Index", "Close")    #adding index to be able to loop following order and create windows

    schema = StructType([StructField("Close", ArrayType(FloatType()), False)])   #schema for X (array of floats)

    X = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema )
    y = spark.createDataFrame(spark.sparkContext.emptyRDD(), FloatType())

    length = indexed_df.count()
    for i in range(window_size+1, length+1):
        new_df = indexed_df.where(col("Index").between(i-window_size, i-1)).select("Close")    #select the window
        new_row = new_df.agg(collect_list("Close").alias("Close"))    #create new X's row with all prices from window
        X = X.union(new_row)
        new_row = indexed_df.where(col("Index") == i).select("Close")    #create new Y's row with price of the day after last day contained in X 
        y = y.union(new_row)
    
    return X, y

In [None]:
window = 30    #window size

X_train, y_train = slide_window(scaled_train_set, window)    #slide window on train set
X_test, y_test = slide_window(scaled_test_set, window)    #slide window on test set

#Merging X and y

X and y (for both train and test) need to be merged as the Pyspark regression models require them in a single dataframe

In [None]:
def merge_X_y(X, y):
    """
    merges two dataframes column-wise
    """
    schema = StructType(X.schema.fields + y.schema.fields)
    X_y = X.rdd.zip(y.rdd).map(lambda x: x[0]+x[1])
    return spark.createDataFrame(X_y, schema)

X_y_train = merge_X_y(X_train, y_train)
X_y_test = merge_X_y(X_test, y_test)

#Vectorization of windows

Windows represented as lists of days need to be converted to vectors (rows) of features as Pyspark regression models require dataframes in this form

In [None]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark import StorageLevel

list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())    #converts list of prices to features vector 

def assemble_window(X_y):
    """
    applies list_to_vector_udf to given dataframe
    """
    return X_y.select(list_to_vector_udf(X_y["Close"]).alias("features"), X_y["value"].alias("label"))

X_y_train_vec = assemble_window(X_y_train)
X_y_test_vec = assemble_window(X_y_test)

#Hyperparameter tuning/model selection/evaluation

In this section linear regression and gradient-boosted trees regression models are cross-validated partitioning the train set in 3 folds in order to tune their hyperparameters and find the best model.

Then the best models found are tested on unseen data (test set) and the actual and predicted prices are plotted.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import RegressionEvaluator

def cross_validate(model, param_grid, df):
    """
    Performs grid search on given model with given parameter grid, using given dataframe as train/validation data.
    Returns the validated model (ready to be used for predictions using best parameters found)
    """
    evaluator = RegressionEvaluator(metricName="rmse")
    cv = CrossValidator(estimator=model, estimatorParamMaps=param_grid, evaluator=evaluator)
    validated_model = cv.fit(df)
    return validated_model

#Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression

evaluator = RegressionEvaluator(metricName="rmse")

lr = LinearRegression(standardization=False)    #avoid standardization as tanh estimator has been applied yet
param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.33, 0.66]).addGrid(lr.elasticNetParam, [0.33, 0.5, 0.66]).build()    #parameters to be tuned
validated_lr_model = cross_validate(lr, param_grid, X_y_train_vec)

#best parameters found
elasticNet = validated_lr_model.bestModel.getElasticNetParam()
reg = validated_lr_model.bestModel.getRegParam()

print("ElasicNetParam of best model -> ", elasticNet)
print("RegParam of best model -> = ", reg)

predictions = validated_lr_model.transform(X_y_test_vec)    #test on unseen data
RMSE = evaluator.evaluate(predictions)    #evaluate predictions using ROOT MEAN SQUARED ERROR
print("RMSE of best model on unseen data -> ", RMSE)

In [None]:
def plot_predictions(predictions):
    """
    plots two lines representing predicted and actual prices
    """
    pandas_df = predictions.select('label', 'prediction').toPandas()
    plt.figure(figsize=(20, 7))
    plt.plot(range(len(pandas_df['label'].values)), pandas_df['label'].values, label = 'Actual Price', color = 'blue')
    plt.plot(range(len(pandas_df['prediction'].values)), pandas_df['prediction'].values, label = 'Predicted Price', color = 'red')
    plt.xticks(np.arange(100, pandas_df.shape[0], 200))
    plt.xlabel('Time')
    plt.ylabel('Price (scaled)')
    plt.legend()
    plt.show()
    
plot_predictions(predictions)    #plot linear regression's predictions

#Gradient-boosted trees

In [None]:
from pyspark.ml.regression import GBTRegressor

gbt_r = GBTRegressor()
param_grid = ParamGridBuilder().addGrid(gbt_r.maxDepth, [4, 8, 12]).addGrid(gbt_r.featureSubsetStrategy, ['0.33', '0.66']).build()    #parameters to be tuned
validated_gbt_model = cross_validate(gbt_r, param_grid, X_y_train_vec)

#best parameters found
max_depth = validated_gbt_model.bestModel.getMaxDepth()
subsample = validated_gbt_model.bestModel.getFeatureSubsetStrategy()

print("maxDepth of best model -> ", max_depth)
print("featureSubsetStrategy of best model -> = ", subsample)

predictions = validated_gbt_model.transform(X_y_test_vec)    #test on unseen data
RMSE = evaluator.evaluate(predictions)    #evaluate predictions using ROOT MEAN SQUARED ERROR
print("RMSE of best model on unseen data ->  ", RMSE)

In [None]:
plot_predictions(predictions)    #plot gradient-boosted trees' predicitons