# Regression Model

## Import Data

In [None]:
# File location and type
file_location = "/FileStore/tables/GOOGLE.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Reader options for the CSV source; non-CSV formats simply ignore them.
csv_reader_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}

df = (
    spark.read.format(file_type)
    .options(**csv_reader_options)
    .load(file_location)
)

display(df)

Date,High,Low,Open,Close,Volume,Adj Close,company_name
2017-01-03T00:00:00.000+0000,789.6300048828125,775.7999877929688,778.8099975585938,786.1400146484375,1657300,786.1400146484375,GOOGLE
2017-01-04T00:00:00.000+0000,791.3400268554688,783.1599731445312,788.3599853515625,786.9000244140625,1073000,786.9000244140625,GOOGLE
2017-01-05T00:00:00.000+0000,794.47998046875,785.02001953125,786.0800170898438,794.02001953125,1335200,794.02001953125,GOOGLE
2017-01-06T00:00:00.000+0000,807.9000244140625,792.2039794921875,795.260009765625,806.1500244140625,1640200,806.1500244140625,GOOGLE
2017-01-09T00:00:00.000+0000,809.9660034179688,802.8300170898438,806.4000244140625,806.6500244140625,1274600,806.6500244140625,GOOGLE
2017-01-10T00:00:00.000+0000,809.1300048828125,803.510009765625,807.8599853515625,804.7899780273438,1176800,804.7899780273438,GOOGLE
2017-01-11T00:00:00.000+0000,808.1500244140625,801.3699951171875,805.0,807.9099731445312,1065900,807.9099731445312,GOOGLE
2017-01-12T00:00:00.000+0000,807.3900146484375,799.1699829101562,807.1400146484375,806.3599853515625,1353100,806.3599853515625,GOOGLE
2017-01-13T00:00:00.000+0000,811.2239990234375,806.6900024414062,807.47998046875,807.8800048828125,1099200,807.8800048828125,GOOGLE
2017-01-17T00:00:00.000+0000,807.1400146484375,800.3699951171875,807.0800170898438,804.6099853515625,1362100,804.6099853515625,GOOGLE


In [None]:
# Descriptive statistics (count, mean, stddev, min, quartiles, max) of the adjusted close.
adj_close_stats = df.select(df["Adj Close"]).summary()
display(adj_close_stats)

summary,Adj Close
count,987.0
mean,1164.8144006854736
stddev,215.44599608390607
min,786.1400146484375
25%,1025.760009765625
50%,1134.7900390625
75%,1269.22998046875
max,1807.1199951171875


We use the **exponential moving average** (EMA) of the closing price as the true (target) values for the regression.

In [None]:
def exponential_moving_average(df, column_name, period, smooth=2):
    """Compute the exponential moving average (EMA) of one column of a Spark DataFrame.

    The first EMA value is the simple average of the first ``period`` prices;
    each later price then updates the running value with weight
    ``alpha = smooth / (1 + period)``.

    Args:
        df: Spark DataFrame holding the price column.
        column_name: column name or Column object to read prices from.
        period: number of prices averaged to seed the EMA; must be >= 1.
        smooth: smoothing factor used to derive ``alpha`` (default 2).

    Returns:
        list of floats with ``len(prices) - period + 1`` entries, where entry ``i``
        is the EMA at row ``i + period - 1``. Empty if fewer than ``period``
        prices are available.

    Raises:
        ValueError: if ``period`` is less than 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")

    # Collects the whole column to the driver.
    # NOTE(review): assumes the DataFrame's row order is chronological — confirm.
    prices = df.select(column_name).rdd.flatMap(lambda row: row).collect()
    if len(prices) < period:
        # Not enough prices to seed the average; returning a partial mean would be misleading.
        return []

    alpha = smooth / (1 + period)  # loop-invariant weight of the newest price
    ema = [sum(prices[:period]) / period]
    for price in prices[period:]:
        ema.append(price * alpha + ema[-1] * (1 - alpha))
    return ema

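We create a new DataFrame containing only the EMA of the closing price. A 10-day EMA needs 10 prices before its first value exists, so the first 9 rows are padded with 0.0 placeholders that are removed later.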

In [None]:
from pyspark.sql.types import DoubleType

# Keep only the Date and Close columns needed downstream.
df = df.drop('High','Open','Low','Adj Close','Volume','company_name')

ema_period = 10
new_df = spark.createDataFrame(exponential_moving_average(df, "Close", ema_period), DoubleType())
new_df = new_df.withColumnRenamed("value", 'EMA')

# exponential_moving_average returns len(prices) - period + 1 values, the first of
# which belongs to price row period - 1. Prepend period - 1 placeholder 0.0 rows so
# that EMA row i lines up with price row i; the placeholders are removed further down.
padding = spark.createDataFrame([0.0] * (ema_period - 1), DoubleType())
padding = padding.withColumnRenamed("value", 'EMA')
new_df = padding.union(new_df)
display(new_df)

EMA
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
801.1410034179687


We add a row-index column (`row_id`) so that each price row can be joined with its EMA.

In [None]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Attach a consecutive 0-based row index used as the join key with the EMA frame.
# monotonically_increasing_id() is only guaranteed unique and increasing, NOT
# consecutive: ids jump between partitions, so they would not match the
# zipWithIndex() ids of the EMA frame once the data spans several partitions.
# zipWithIndex() numbers rows 0..n-1 in partition order, like the EMA index.
DF1 = (
    df.rdd.zipWithIndex().toDF()
    .select(col("_1.*"), col("_2").alias("row_id"))
)
display(DF1)

Date,Close,row_id
2017-01-03T00:00:00.000+0000,786.1400146484375,0
2017-01-04T00:00:00.000+0000,786.9000244140625,1
2017-01-05T00:00:00.000+0000,794.02001953125,2
2017-01-06T00:00:00.000+0000,806.1500244140625,3
2017-01-09T00:00:00.000+0000,806.6500244140625,4
2017-01-10T00:00:00.000+0000,804.7899780273438,5
2017-01-11T00:00:00.000+0000,807.9099731445312,6
2017-01-12T00:00:00.000+0000,806.3599853515625,7
2017-01-13T00:00:00.000+0000,807.8800048828125,8
2017-01-17T00:00:00.000+0000,804.6099853515625,9


Finally, we join the prices and the EMA values on `row_id`, so that our regression dataset pairs each closing price with its calculated EMA.

In [None]:
from pyspark.sql.functions import col

# Give every EMA row a consecutive 0-based index (zipWithIndex numbers rows in
# partition order), so EMA row i gets row_id i.
DF2 = new_df.rdd.zipWithIndex().toDF()
DF2 = DF2.select(col("_1.*"), col("_2").alias('row_id'))
display(DF2)

# A join does not guarantee output order; sort by the row index so result_df
# keeps the original row order before the key is dropped.
result_df = DF1.join(DF2, "row_id").orderBy("row_id").drop("row_id")
display(result_df)


EMA,row_id
0.0,0
0.0,1
0.0,2
0.0,3
0.0,4
0.0,5
0.0,6
0.0,7
0.0,8
801.1410034179687,9


Date,Close,EMA
2017-01-03T00:00:00.000+0000,786.1400146484375,0.0
2017-01-04T00:00:00.000+0000,786.9000244140625,0.0
2017-01-05T00:00:00.000+0000,794.02001953125,0.0
2017-01-06T00:00:00.000+0000,806.1500244140625,0.0
2017-01-09T00:00:00.000+0000,806.6500244140625,0.0
2017-01-10T00:00:00.000+0000,804.7899780273438,0.0
2017-01-11T00:00:00.000+0000,807.9099731445312,0.0
2017-01-12T00:00:00.000+0000,806.3599853515625,0.0
2017-01-13T00:00:00.000+0000,807.8800048828125,0.0
2017-01-17T00:00:00.000+0000,804.6099853515625,801.1410034179687


In [None]:
from pyspark.sql.functions import col

# Drop the leading rows that only carry the 0.0 EMA placeholder (no EMA exists until
# a full period of prices has been seen). Real EMAs of these prices are never 0.0.
# Filtering on the placeholder works regardless of row order and of the EMA period,
# unlike taking the last `count - 9` rows, and keeps the data distributed instead
# of collecting it to the driver.
final_df = result_df.filter(col("EMA") != 0.0).orderBy("Date")
display(final_df)

Date,Close,EMA
2017-01-17T00:00:00.000+0000,804.6099853515625,801.1410034179687
2017-01-18T00:00:00.000+0000,806.0700073242188,802.0371859463778
2017-01-19T00:00:00.000+0000,802.1749877929688,802.062240827576
2017-01-20T00:00:00.000+0000,805.02001953125,802.6000187736986
2017-01-23T00:00:00.000+0000,819.3099975585938,805.6381967345885
2017-01-24T00:00:00.000+0000,823.8699951171875,808.9530691677883
2017-01-25T00:00:00.000+0000,835.6699829101562,813.8106898482188
2017-01-26T00:00:00.000+0000,832.1500244140625,817.1451143147358
2017-01-27T00:00:00.000+0000,823.3099975585938,818.2660021772554
2017-01-30T00:00:00.000+0000,802.3200073242188,815.3667303857941


We split this DataFrame into a training set (80%) and a test set (20%), using a fixed seed for reproducibility.

In [None]:
# Random 80/20 train/test split; the fixed seed makes it reproducible.
# NOTE(review): splitting a time series at random puts test dates between training
# dates, and each EMA depends on preceding prices, so test scores are likely
# optimistic. Consider a chronological split.
split_weights = [0.8, 0.2]
trainDF, testDF = final_df.randomSplit(split_weights, seed=42)
trainDF.count()

Out[8]: 782

We use PySpark's MLlib to build a linear regression pipeline, fit it on the training set, and apply it to the test set.

In [None]:
from pyspark.sql.functions import col, log, exp
from pyspark.ml import Pipeline
from pyspark.ml.feature import RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fit the model on log(EMA); predictions are mapped back to price scale with exp().
logTrainDF = trainDF.withColumn("log_EMA", log(col("EMA")))
logTestDF = testDF.withColumn("log_EMA", log(col("EMA")))

# Name the predictor explicitly. With "log_EMA ~ . - EMA" the feature set depends
# implicitly on whichever other columns (e.g. Date) the DataFrame happens to carry.
# NOTE(review): log(EMA) regressed on raw Close is a log-linear fit, while EMA tracks
# Close roughly linearly. Consider "log_EMA ~ log(Close)" or modelling EMA directly.
rFormula = RFormula(formula="log_EMA ~ Close", featuresCol="features", labelCol="log_EMA", handleInvalid="skip")

lr = LinearRegression(labelCol="log_EMA", predictionCol="log_pred")

pipeline = Pipeline(stages=[rFormula, lr])
pipelineModel = pipeline.fit(logTrainDF)
predDF = pipelineModel.transform(logTestDF)
expDF = predDF.withColumn("prediction", exp(col("log_pred")))
display(expDF)

Date,Close,EMA,log_EMA,features,log_pred,prediction
2017-01-19T00:00:00.000+0000,802.1749877929688,802.062240827576,6.687186211872174,"Map(vectorType -> dense, length -> 1, values -> List(802.1749877929688))",6.746635919438638,851.1904673379695
2017-01-25T00:00:00.000+0000,835.6699829101562,813.8106898482188,6.701727771200435,"Map(vectorType -> dense, length -> 1, values -> List(835.6699829101562))",6.773805696480784,874.6341603583219
2017-01-27T00:00:00.000+0000,823.3099975585938,818.2660021772554,6.707187469776992,"Map(vectorType -> dense, length -> 1, values -> List(823.3099975585938))",6.763779779322975,865.9089628640434
2017-02-03T00:00:00.000+0000,801.489990234375,806.0948136663695,6.692201370409993,"Map(vectorType -> dense, length -> 1, values -> List(801.489990234375))",6.746080277293407,850.7176414138683
2017-02-13T00:00:00.000+0000,819.239990234375,810.0153913176689,6.697053249112708,"Map(vectorType -> dense, length -> 1, values -> List(819.239990234375))",6.760478355000563,863.0549437094351
2017-02-17T00:00:00.000+0000,828.0700073242188,817.5324230881939,6.706290563254883,"Map(vectorType -> dense, length -> 1, values -> List(828.0700073242188))",6.767640905547893,869.2588095977783
2017-02-28T00:00:00.000+0000,823.2100219726562,825.057776925651,6.715453416519663,"Map(vectorType -> dense, length -> 1, values -> List(823.2100219726562))",6.7636986831959245,865.8387438480609
2017-03-08T00:00:00.000+0000,835.3699951171875,829.8167317752228,6.721204871319518,"Map(vectorType -> dense, length -> 1, values -> List(835.3699951171875))",6.7735623585903975,874.4213546197805
2017-03-22T00:00:00.000+0000,829.5900268554688,839.3768416523133,6.732659761350822,"Map(vectorType -> dense, length -> 1, values -> List(829.5900268554688))",6.7688738835381645,870.331247588218
2017-03-23T00:00:00.000+0000,817.5800170898438,835.4137826409551,6.727927150180458,"Map(vectorType -> dense, length -> 1, values -> List(817.5800170898438))",6.759131852334267,861.8936199647931


Finally, we evaluate our results.

In [None]:
# Compare back-transformed predictions with the true EMA on the test set.
regressionEvaluator = RegressionEvaluator(labelCol="EMA", predictionCol="prediction")
rmse, r2 = (
    regressionEvaluator.setMetricName(metric).evaluate(expDF)
    for metric in ("rmse", "r2")
)
print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

RMSE is 39.44250331548785
R2 is 0.9623560385420282
