In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType, StructField, LongType

import statsmodels.tsa.stattools as ts

Helper functions

In [None]:
def data_for_modeling(data, forecast_months, feature_cols=None, target_col='sunspots'):
  """Attach a forecast label and assemble the feature columns into a vector.

  The label of each row is the value of `target_col` taken `forecast_months`
  rows later, in the DataFrame's current row order. Rows whose label would
  fall past the end of the data get a null label. Those rows, and any other
  row containing a null (e.g. the leading rows of lag features), are dropped.

  Args:
    data: Spark DataFrame; assumed to already be in chronological order
      (TODO confirm upstream ordering).
    forecast_months: how many rows ahead the label is read from; must be >= 0.
    feature_cols: names of the columns to assemble into the "features" vector.
      When None, falls back to the notebook-level ``features`` list, which is
      what the original version always used.
    target_col: column the label is read from.

  Returns:
    `data` with null-containing rows removed, plus a "label" column and a
    "features" vector column.

  Raises:
    ValueError: if forecast_months is negative.
  """
  if forecast_months < 0:
    raise ValueError('forecast_months must be >= 0, got {}'.format(forecast_months))
  if feature_cols is None:
    # Backward compatibility: earlier callers relied on the global `features`.
    feature_cols = features

  # Pin the incoming row order explicitly. A window without ORDER BY gives
  # Spark no guarantee about which rows fall inside the frame.
  row_order = Window.orderBy('_row_id')

  # `lead` yields null when the target row does not exist, so dropna() below
  # removes the tail rows. The old `last` over a truncated frame instead
  # silently labelled them with a nearer value.
  labelled = (data
              .withColumn('_row_id', monotonically_increasing_id())
              .withColumn('label', lead(col(target_col), forecast_months).over(row_order))
              .drop('_row_id')
              .dropna())

  assembler = VectorAssembler().setInputCols(list(feature_cols)).setOutputCol("features")
  return assembler.transform(labelled)

In [None]:
def create_ts_vars(data, n_lags, features, target_col='sunspots'):
  """Add lag, moving-average and trend-sign feature columns for `target_col`.

  The names of the new columns are appended to `features` IN PLACE; callers
  rely on this side effect. They are appended in this order:
  <col>_lag_1 .. <col>_lag_n, <col>_<n>_moving_avg,
  <col>_lag_1_sign .. <col>_lag_n_sign.

  Args:
    data: Spark DataFrame; assumed to already be in chronological order
      (TODO confirm upstream ordering).
    n_lags: number of lag / sign features to create; must be >= 0.
    features: list of feature names, extended in place.
    target_col: column the features are derived from.

  Returns:
    `data` with the new feature columns added.

  Raises:
    ValueError: if n_lags is negative.
  """
  if n_lags < 0:
    raise ValueError('n_lags must be >= 0, got {}'.format(n_lags))

  target = col(target_col)
  # Every window shares one explicit ordering taken from the current row
  # order of `data`. Ordering by a constant column, or not ordering at all,
  # gives Spark no order to honour.
  row_order = Window.orderBy('_row_id')
  data = data.withColumn('_row_id', monotonically_increasing_id())

  ## LAG VARS ######################################################################
  lag_names = []
  for k in range(1, n_lags + 1):
    lag_name = '{}_lag_{}'.format(target_col, k)
    data = data.withColumn(lag_name, lag(target, k).over(row_order))
    lag_names.append(lag_name)
  features.extend(lag_names)

  ## MOVING AVG VAR ##############################################################
  # Mean of the current row and the n_lags rows before it (n_lags + 1 values).
  # Early rows average over whatever rows are available.
  mavg_name = '{}_{}_moving_avg'.format(target_col, n_lags)
  data = data.withColumn(mavg_name, avg(target).over(row_order.rowsBetween(-n_lags, 0)))
  features.append(mavg_name)

  ## TREND VAR ###################################################################
  # +1.0 if the current value > the lagged value, -1.0 if lower, 0.0 if equal,
  # null where the lag is null. The lag columns above are reused instead of
  # running another window pass per lag.
  for k, lag_name in enumerate(lag_names, start=1):
    sign_name = '{}_lag_{}_sign'.format(target_col, k)
    data = data.withColumn(sign_name, signum(target - col(lag_name)))
    features.append(sign_name)

  return data.drop('_row_id')

In [None]:
def split_time_series(data, train_ratio=0.7):
    """Split `data` into train/test sets without shuffling, preserving row order.

    The first ``int(n_rows * train_ratio)`` rows, in the DataFrame's current
    order, go to the training set and the rest to the test set. If `data` is
    chronologically ordered, the test set therefore follows the training set
    in time.

    Both outputs carry an extra LongType column 'Row Number' with each row's
    0-based position in `data`.

    Args:
        data: Spark DataFrame; assumed chronologically ordered.
        train_ratio: fraction of rows assigned to the training set, in [0, 1].

    Returns:
        (df_train, df_test) tuple of Spark DataFrames.

    Raises:
        ValueError: if train_ratio is outside [0, 1].
    """
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError('train_ratio must be in [0, 1], got {}'.format(train_ratio))

    # zipWithIndex assigns consecutive 0-based indices in RDD order.
    numbered_schema = StructType(data.schema.fields +
                                 [StructField('Row Number', LongType(), False)])
    numbered_rdd = data.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1]])

    # Use the active session rather than a notebook-global `spark` variable.
    spark_session = SparkSession.builder.getOrCreate()
    new_df = spark_session.createDataFrame(numbered_rdd, numbered_schema)

    total_rows = new_df.count()
    split_index = int(total_rows * train_ratio)

    # Rows [0, split_index) -> train, [split_index, total_rows) -> test.
    # Strict `<` so train gets exactly split_index rows.
    df_train = new_df.where(new_df['Row Number'] < split_index)
    df_test = new_df.where(new_df['Row Number'] >= split_index)

    return df_train, df_test

In [None]:
def lr_fxn(train, test, max_iter=100, reg_param=0.4, elastic_net_param=0.1):
  """Fit an elastic-net LinearRegression on `train` and report train/test RMSE.

  Both DataFrames must contain a "features" vector column and a "label"
  column, as produced by data_for_modeling.

  Args:
    train: Spark DataFrame used to fit the model.
    test: Spark DataFrame scored with the fitted model.
    max_iter: passed to LinearRegression as maxIter.
    reg_param: passed to LinearRegression as regParam.
    elastic_net_param: passed to LinearRegression as elasticNetParam.
    (The defaults equal the values that were previously hard-coded.)

  Returns:
    (train, test, pred_train, pred_test, RMSE_train, RMSE_test). The inputs
    are returned unchanged; pred_train / pred_test add a "prediction" column;
    the RMSEs come from RegressionEvaluator with metricName="rmse".
  """
  lr = LinearRegression(featuresCol="features", labelCol="label",
                        maxIter=max_iter, regParam=reg_param,
                        elasticNetParam=elastic_net_param)

  model = lr.fit(train)
  pred_train = model.transform(train)
  pred_test = model.transform(test)

  evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label",
                                  metricName="rmse")
  RMSE_train = evaluator.evaluate(pred_train)
  RMSE_test = evaluator.evaluate(pred_test)

  return train, test, pred_train, pred_test, RMSE_train, RMSE_test

Set up data for the baseline model (no lag features)

In [None]:
# Baseline: no lag features (n_lags = 12 * k would add 12 * k lags).
n_lags = 0
forecast_months = 10
# create_ts_vars appends the engineered column names to this list in place;
# data_for_modeling then reads the notebook-level `features` list.
features = ['sunspots']

data_lag = create_ts_vars(data.select('*'), n_lags, features)
data_lag.show(10)

data_transformed = data_for_modeling(data_lag, forecast_months)

train, test = split_time_series(data_transformed, train_ratio=0.7)

Run the baseline linear regression

In [None]:
(lr_train, lr_test,
 lr_pred_train, lr_pred_test,
 lr_RMSE_train, lr_RMSE_test) = lr_fxn(train, test)

# Spot-check a few in-sample predictions against their labels.
lr_pred_train.select('date', 'label', 'prediction').show(5)

In [None]:
# RMSE of the baseline linear model on the held-out test split (rows after the split point)
lr_RMSE_test

In [None]:
# RMSE of the baseline linear model on the training split it was fitted on
lr_RMSE_train