####This notebook walks through the approach 1 : Model with all features
- to run this notebook please have the dataset "category_data" uploaded
- While uploading the above dataset make sure to set the schema as the following 
- Schema : _Date:timestamp, d:double, wm_yr_wk:double, weekday:string, month:double, year:double, Hobbies:double, Foods:double, HouseHold:double, event_name_1:string, event_type_1:string, event_name_2:string, event_type_2:string_

In [2]:
import random
random.seed(50)

import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
from datetime import datetime
from pyspark.sql.functions import udf, split, when, isnan, count, col, monotonically_increasing_id
from pyspark.sql.types import DateType, IntegerType, DoubleType, FloatType, StringType
from pyspark.sql import *
from pyspark.sql.window import Window
from pyspark.sql import functions as f
from sklearn.metrics import mean_squared_error
from pyspark.ml.feature import *
from pyspark.ml.feature import CountVectorizer, VectorIndexer, VectorAssembler,StringIndexer, OneHotEncoder, VectorSlicer
from pyspark.ml import Pipeline, feature
from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.base import Estimator
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.param.shared import HasOutputCol

#####User defined functions

In [4]:
#counting null values 
def check_null(dataset):
  null_count = dataset.select([count(when(col(c).isNull(), c)).alias(c) for c in dataset.columns])
  return (null_count.show())

# Convert data in column to an array so as to use with Count Vectorizer
def convert_col_to_array(df,new_col,orig_col, split_char):
  return(df.withColumn(new_col,split(col(orig_col),split_char)))

# Count Vectorizer
def initialize_Count_Vectorizer(input_Col, output_Col):
  return(CountVectorizer(inputCol = input_Col, outputCol = output_Col))

#SMape error metric
def smape(A, F):
    return 100/len(A) * np.sum(2 * np.abs(F - A) / (np.abs(A) + np.abs(F)))

#####Importing Data

In [6]:
#importing the data : 
df = spark.table("category_data")
display(df)

Date,d,wm_yr_wk,weekday,month,year,Hobbies,Foods,HouseHold,event_name_1,event_type_1,event_name_2,event_type_2
2011-01-29T00:00:00.000+0000,1.0,11101.0,Saturday,1.0,2011.0,3764.0,23178.0,5689.0,,,,
2011-01-30T00:00:00.000+0000,2.0,11101.0,Sunday,1.0,2011.0,3357.0,22758.0,5634.0,,,,
2011-01-31T00:00:00.000+0000,3.0,11101.0,Monday,1.0,2011.0,2682.0,17174.0,3927.0,,,,
2011-02-01T00:00:00.000+0000,4.0,11101.0,Tuesday,2.0,2011.0,2669.0,18878.0,3865.0,,,,
2011-02-02T00:00:00.000+0000,5.0,11101.0,Wednesday,2.0,2011.0,1814.0,14603.0,2729.0,,,,
2011-02-03T00:00:00.000+0000,6.0,11101.0,Thursday,2.0,2011.0,3220.0,22093.0,3898.0,,,,
2011-02-04T00:00:00.000+0000,7.0,11101.0,Friday,2.0,2011.0,2944.0,20490.0,4576.0,,,,
2011-02-05T00:00:00.000+0000,8.0,11102.0,Saturday,2.0,2011.0,3986.0,27751.0,6195.0,,,,
2011-02-06T00:00:00.000+0000,9.0,11102.0,Sunday,2.0,2011.0,2899.0,24862.0,4975.0,SuperBowl,Sporting,,
2011-02-07T00:00:00.000+0000,10.0,11102.0,Monday,2.0,2011.0,2615.0,18901.0,4056.0,,,,


##### Checking & replacing nulls in the given dataset
- The null values in columns event_name_1 and 2 and event_type_1 and 2 represents that there were no events hence they are replaced with string " No event"

In [8]:
print("Null values in the dataframe")
check_null(df)
print("                                                      ")
print("After replaing Null values in the dataframe")
#replacing null with string value 'No_event' in respective columns
df = df.na.fill({'event_name_1': 'No event','event_name_2': 'No event','event_type_1': 'No event','event_type_2': 'No event' })
check_null(df)

#####Feature Generation
- "no_of_events" - This column gives the total number of events for each day by summing the columns event_name_1 and 2
- More features are generated below

In [10]:
def count_events(c1, c2):
  if c1 != "No event" and c2 != "No event":
    return 2
  elif c1 == "No event" and c2 != "No event":
    return 1
  elif c1 != "No event" and c2 == "No event":
    return 1
  else:
    return 0

combineUDF = udf(count_events)
expr = [c for c in df] + [combineUDF(col("event_name_1"), col("event_name_2")).alias("no_of_events").cast(DoubleType())]
df = df.select(*expr)

In [11]:
display(df)

Date,d,wm_yr_wk,weekday,month,year,Hobbies,Foods,HouseHold,event_name_1,event_type_1,event_name_2,event_type_2,no_of_events
2011-01-29T00:00:00.000+0000,1.0,11101.0,Saturday,1.0,2011.0,3764.0,23178.0,5689.0,No event,No event,No event,No event,0.0
2011-01-30T00:00:00.000+0000,2.0,11101.0,Sunday,1.0,2011.0,3357.0,22758.0,5634.0,No event,No event,No event,No event,0.0
2011-01-31T00:00:00.000+0000,3.0,11101.0,Monday,1.0,2011.0,2682.0,17174.0,3927.0,No event,No event,No event,No event,0.0
2011-02-01T00:00:00.000+0000,4.0,11101.0,Tuesday,2.0,2011.0,2669.0,18878.0,3865.0,No event,No event,No event,No event,0.0
2011-02-02T00:00:00.000+0000,5.0,11101.0,Wednesday,2.0,2011.0,1814.0,14603.0,2729.0,No event,No event,No event,No event,0.0
2011-02-03T00:00:00.000+0000,6.0,11101.0,Thursday,2.0,2011.0,3220.0,22093.0,3898.0,No event,No event,No event,No event,0.0
2011-02-04T00:00:00.000+0000,7.0,11101.0,Friday,2.0,2011.0,2944.0,20490.0,4576.0,No event,No event,No event,No event,0.0
2011-02-05T00:00:00.000+0000,8.0,11102.0,Saturday,2.0,2011.0,3986.0,27751.0,6195.0,No event,No event,No event,No event,0.0
2011-02-06T00:00:00.000+0000,9.0,11102.0,Sunday,2.0,2011.0,2899.0,24862.0,4975.0,SuperBowl,Sporting,No event,No event,1.0
2011-02-07T00:00:00.000+0000,10.0,11102.0,Monday,2.0,2011.0,2615.0,18901.0,4056.0,No event,No event,No event,No event,0.0


In [12]:
# dropping columns we won't be considering for model development
cat_df = df.drop("Date","event_name_1","event_name_2")

#####One hot encoding of categorical columns
- The columns weekday, month, year, event_type_1 and event_type_2 are one hot encoded

In [14]:
#One Hot Encoding of categorical columns

OHE_list= ['weekday','month','year','event_type_1','event_type_2']

# Generate OHE for elements in the list above
for element in OHE_list:
  print(element)
  
#Convert String to Array so that OHE can run (requires an array as an input)
  cat_df = convert_col_to_array(cat_df, element + '_array', element,' ')
  
#Initialize Count vectorizer
  elementVectorizer = initialize_Count_Vectorizer(element + '_array',element + '_OHE')
  
#Fit a vectorizer model
  elementVectorizer_model = elementVectorizer.fit(cat_df)
  
#Transform Data
  cat_df = elementVectorizer_model.transform(cat_df)
  
#Specify Extraneous Cols to drop 
  columns_to_drop = [element, element + '_array']
  
#Drop Extraneous Cols listed above
  cat_df = cat_df.drop(*columns_to_drop)

In [15]:
#dataset after OHE
display(cat_df)

d,wm_yr_wk,Hobbies,Foods,HouseHold,no_of_events,weekday_OHE,month_OHE,year_OHE,event_type_1_OHE,event_type_2_OHE
1.0,11101.0,3764.0,23178.0,5689.0,0.0,"List(0, 7, List(1), List(1.0))","List(0, 12, List(4), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(0, 1), List(1.0, 1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"
2.0,11101.0,3357.0,22758.0,5634.0,0.0,"List(0, 7, List(0), List(1.0))","List(0, 12, List(4), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(0, 1), List(1.0, 1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"
3.0,11101.0,2682.0,17174.0,3927.0,0.0,"List(0, 7, List(2), List(1.0))","List(0, 12, List(4), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(0, 1), List(1.0, 1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"
4.0,11101.0,2669.0,18878.0,3865.0,0.0,"List(0, 7, List(6), List(1.0))","List(0, 12, List(3), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(0, 1), List(1.0, 1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"
5.0,11101.0,1814.0,14603.0,2729.0,0.0,"List(0, 7, List(3), List(1.0))","List(0, 12, List(3), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(0, 1), List(1.0, 1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"
6.0,11101.0,3220.0,22093.0,3898.0,0.0,"List(0, 7, List(5), List(1.0))","List(0, 12, List(3), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(0, 1), List(1.0, 1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"
7.0,11101.0,2944.0,20490.0,4576.0,0.0,"List(0, 7, List(4), List(1.0))","List(0, 12, List(3), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(0, 1), List(1.0, 1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"
8.0,11102.0,3986.0,27751.0,6195.0,0.0,"List(0, 7, List(1), List(1.0))","List(0, 12, List(3), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(0, 1), List(1.0, 1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"
9.0,11102.0,2899.0,24862.0,4975.0,1.0,"List(0, 7, List(0), List(1.0))","List(0, 12, List(3), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(5), List(1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"
10.0,11102.0,2615.0,18901.0,4056.0,0.0,"List(0, 7, List(2), List(1.0))","List(0, 12, List(3), List(1.0))","List(0, 6, List(4), List(1.0))","List(0, 6, List(0, 1), List(1.0, 1.0))","List(0, 4, List(0, 1), List(1.0, 1.0))"


#####Defining Time Horizons
The datasets are prepared and subsetted for the over three time horizon for each of the three catergories
1. Day
2. Week
3. Month

In [17]:
#function for creating lags in the dataset

def time_horizon(input_df, lag):
  window = Window.orderBy("d")
  output_df_name = input_df.withColumn("lag_Hobbies",f.lag(col("Hobbies"),lag).over(window)).withColumn("lag_Foods",f.lag(col("Foods"),lag).over(window)).withColumn("lag_HH",f.lag(col("HouseHold"),lag).over(window)).drop("Hobbies","Foods","HouseHold")
  return(output_df_name)

day_df = time_horizon(cat_df,-1)
week_df = time_horizon(cat_df,-7)
month_df = time_horizon(cat_df,-28)

In [18]:
#Formulating target specific datasets
#Subsetting dataframes for day, week, month over all categories
def sub_data(dataset,category):
  list1 = ['lag_Foods', 'lag_HH']
  list2 = ['lag_Hobbies', 'lag_HH']
  list3 = ['lag_Hobbies', 'lag_Foods']
  Hobbies_x = dataset.drop(*list1)
  Foods_x = dataset.drop(*list2)
  HH_x = dataset.drop(*list3)
  if category == "Hobbies":
     return (Hobbies_x)
  elif category == "Foods":
     return (Foods_x)
  elif category == "HouseHold":
     return (HH_x)
  else:
    return(print("Enter either Hobbies, Foods or HouseHold"))

Hobbies_day = sub_data(day_df,"Hobbies")
Foods_day = sub_data(day_df,"Foods")
HH_day = sub_data(day_df,"HouseHold")

Hobbies_week = sub_data(week_df,"Hobbies")
Foods_week = sub_data(week_df,"Foods")
HH_week = sub_data(week_df,"HouseHold")

Hobbies_month = sub_data(month_df,"Hobbies")
Foods_month = sub_data(month_df,"Foods")
HH_month = sub_data(month_df,"HouseHold")

#####Feature Generation
The following 10 features are generated representing the sales quantity in past for different time range,
>>1. __Pre_d1__ : the sales quantity one day before the target
2.  __Pre_d2__ : the sales quantity two days before the target
3.  __Pre_d3__ : the sales quantity three days before the target
4.  __Pre_d4__ : the sales quantity four days before the target
5.  __Pre_d5__ : the sales quantity five days before the target
6.  __Pre_d6__ : the sales quantity six days before the target
7.  __Pre_d7__ : the sales quantity seven days before the target
8.  __Pre_d10__ : the sales quantity ten days before the target
9.  __Pre_d14__ : the sales quantity fourteen days before the target
10.  __Pre_d28__ : the sales quantity twenty eight days before the target

All of these generated features along with rest of features already available will be passed through a feature selector and the model will be implemented with the auto  selected features

In [20]:
def generate_feat(df1,df2,column):
    window = Window.orderBy("d")
    x = df1.withColumn("Pre_d1",f.lag(col(column),1).over(window)).withColumn("Pre_d2",f.lag(col(column),2).over(window)).withColumn("Pre_d3",f.lag(col(column),3).over(window)).withColumn("Pre_d4",f.lag(col(column),4).over(window)).withColumn("Pre_d5",f.lag(col(column),5).over(window)).withColumn("Pre_d6",f.lag(col(column),6).over(window)).withColumn("Pre_d7",f.lag(col(column),7).over(window)).withColumn("Pre_d10",f.lag(col(column),10).over(window)).withColumn("Pre_d14",f.lag(col(column),14).over(window)).withColumn("Pre_d28",f.lag(col(column),28).over(window))
    x_sub = x.select("d","Pre_d1","Pre_d2","Pre_d3","Pre_d4","Pre_d5","Pre_d6","Pre_d7","Pre_d10","Pre_d14","Pre_d28")
    df2 = df2.join(x_sub, x_sub.d == df2.d).drop("d")
    return(df2)
 
Hob_day = generate_feat(cat_df,Hobbies_day,"Hobbies").dropna()
F_day   = generate_feat(cat_df,Foods_day,"Foods").dropna()
HH_day  = generate_feat(cat_df,HH_day,"HouseHold").dropna()

Hob_week = generate_feat(cat_df,Hobbies_week,"Hobbies").dropna()
F_week   = generate_feat(cat_df,Foods_week,"Foods").dropna()
HH_week  = generate_feat(cat_df,HH_week,"HouseHold").dropna()

Hob_month = generate_feat(cat_df,Hobbies_month,"Hobbies").dropna()
F_month = generate_feat(cat_df,Foods_month,"Foods").dropna()
HH_month = generate_feat(cat_df,HH_month,"HouseHold").dropna()

##### Model implemetation
###### Predicting over each category for the time horizons 1 day, 7 days and 28 days using the following algorithm
1. Gradient Boosting
2. Random Forest

NOTE : 
- The following set of code for each algorithm involves a increasing rolling window train test split with a split of 3
- Further Hyper parameter tuning is performed using Grid search and the parameters which give the least error value is chosen

###### HOBBIES : 1 DAY

In [23]:
# Specify Input Columns that will become features in our final models
featuresCols = Hob_day.columns
featuresCols.remove('lag_Hobbies')

#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = featuresCols, outputCol = "features")
output = feature_assembler.transform(Hob_day) # Transform dataframe into features
final_Hobbies = output.select("features", "lag_Hobbies") # Generate final output two cols: one with features and the other with the target i.e Hobbies

#Rename Hobbies Column to Hobbies_quantity to use with ML models in PySpark
final_Hobbies = final_Hobbies.select(col("features"),col("lag_Hobbies").alias("Hobbies_quantity"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures',maxCategories = 12).fit(final_Hobbies)

final_Hobbies = final_Hobbies.select("*").withColumn("id", monotonically_increasing_id())

In [24]:
#GBT for Hobbies 1 day horizon
import numpy as np
import pandas as pd
Hobbies = final_Hobbies.drop('id')
X = np.array(Hobbies.select('features','Hobbies_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
maxDepth = (2,5)
maxIter = (10,20)
GBT_feature_list = []
for train_index, test_index in splits.split(X):                
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Hobbies.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Hobbies.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in maxDepth:
    for j in maxIter:
      GBT = GBTRegressor(labelCol="Hobbies_quantity",maxDepth = i, maxIter = j)
      pipeline = Pipeline(stages = [feature_Indexer, GBT])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=GBT.getLabelCol(), predictionCol=GBT.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Hobbies_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print RMSE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      GBT_feature_list.append(f)
GBT_feature_list 

In [25]:
#RF for Hobbies 1 day horizon
Hobbies = final_Hobbies.drop('id')
X = np.array(Hobbies.select('features','Hobbies_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
numTrees = [5,15,30,50]
maxDepth = [5,10,15]
RF_feature_list = []
for train_index, test_index in splits.split(X):
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Hobbies.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Hobbies.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in numTrees:
    for j in maxDepth:
      RF = RandomForestRegressor(labelCol="Hobbies_quantity",numTrees = i, maxDepth = j)
      pipeline = Pipeline(stages = [feature_Indexer, RF])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=RF.getLabelCol(), predictionCol=RF.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Hobbies_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print RMSE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      RF_feature_list.append(f)
RF_feature_list

###### HOBBIES : 7 DAY

In [27]:
# Specify Input Columns that will become features in our final models
featuresCols = Hob_week.columns
featuresCols.remove('lag_Hobbies')

#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = featuresCols, outputCol = "features")
output = feature_assembler.transform(Hob_week) # Transform dataframe into features
final_Hobbies = output.select("features", "lag_Hobbies") # Generate final output two cols: one with features and the other with the target i.e Hobbies

#Rename Hobbies Column to Hobbies_quantity to use with ML models in PySpark
final_Hobbies = final_Hobbies.select(col("features"),col("lag_Hobbies").alias("Hobbies_quantity"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures',maxCategories = 12).fit(final_Hobbies)

final_Hobbies = final_Hobbies.select("*").withColumn("id", monotonically_increasing_id())

In [28]:
#GBT for Hobbies 7 days horizon
Hobbies = final_Hobbies.drop('id')
X = np.array(Hobbies.select('features','Hobbies_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
maxDepth = (2,5)
maxIter = (10,20)
GBT_feature_list = []
for train_index, test_index in splits.split(X):                
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Hobbies.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Hobbies.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in maxDepth:
    for j in maxIter:
      GBT = GBTRegressor(labelCol="Hobbies_quantity",maxDepth = i, maxIter = j)
      pipeline = Pipeline(stages = [feature_Indexer, GBT])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=GBT.getLabelCol(), predictionCol=GBT.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Hobbies_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print RMSE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      GBT_feature_list.append(f)
GBT_feature_list 

In [29]:
#RF for Hobbies 7 days horizon
Hobbies = final_Hobbies.drop('id')
X = np.array(Hobbies.select('features','Hobbies_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
numTrees = [5,15,30,50]
maxDepth = [5,10,15]
RF_feature_list = []
for train_index, test_index in splits.split(X):
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Hobbies.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Hobbies.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in numTrees:
    for j in maxDepth:
      RF = RandomForestRegressor(labelCol="Hobbies_quantity",numTrees = i, maxDepth = j)
      pipeline = Pipeline(stages = [feature_Indexer, RF])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=RF.getLabelCol(), predictionCol=RF.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Hobbies_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print RMSE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      RF_feature_list.append(f)
RF_feature_list

###### HOBBIES : 28 DAY

In [31]:
# Specify Input Columns that will become features in our final models
featuresCols = Hob_month.columns
featuresCols.remove('lag_Hobbies')

#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = featuresCols, outputCol = "features")
output = feature_assembler.transform(Hob_month) # Transform dataframe into features
final_Hobbies = output.select("features", "lag_Hobbies") # Generate final output two cols: one with features and the other with the target i.e Hobbies

#Rename Hobbies Column to Hobbies_quantity to use with ML models in PySpark
final_Hobbies = final_Hobbies.select(col("features"),col("lag_Hobbies").alias("Hobbies_quantity"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures',maxCategories = 12).fit(final_Hobbies)

final_Hobbies = final_Hobbies.select("*").withColumn("id", monotonically_increasing_id())

In [32]:
#GBT for Hobbies 28 days horizon
Hobbies = final_Hobbies.drop('id')
X = np.array(Hobbies.select('features','Hobbies_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
maxDepth = (2,5)
maxIter = (10,20)
GBT_feature_list = []
for train_index, test_index in splits.split(X):                
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Hobbies.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Hobbies.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in maxDepth:
    for j in maxIter:
      GBT = GBTRegressor(labelCol="Hobbies_quantity",maxDepth = i, maxIter = j)
      pipeline = Pipeline(stages = [feature_Indexer, GBT])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=GBT.getLabelCol(), predictionCol=GBT.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Hobbies_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print RMSE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      GBT_feature_list.append(f)
GBT_feature_list 

In [33]:
#RF for Hobbies 28 days horizon
Hobbies = final_Hobbies.drop('id')
X = np.array(Hobbies.select('features','Hobbies_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
numTrees = [5,15,30,50]
maxDepth = [5,10,15]
RF_feature_list = []
for train_index, test_index in splits.split(X):
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Hobbies.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Hobbies.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in numTrees:
    for j in maxDepth:
      RF = RandomForestRegressor(labelCol="Hobbies_quantity",numTrees = i, maxDepth = j)
      pipeline = Pipeline(stages = [feature_Indexer, RF])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=RF.getLabelCol(), predictionCol=RF.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Hobbies_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print RMSE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      RF_feature_list.append(f)
RF_feature_list

###### FOODS : 1 DAY

In [35]:
# Specify Input Columns that will become features in our final models
featuresCols = F_day.columns
featuresCols.remove('lag_Foods')

#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = featuresCols, outputCol = "features")
output = feature_assembler.transform(F_day) # Transform dataframe into features
final_Foods = output.select("features", "lag_Foods") # Generate final output two cols: one with features and the other with the target i.e Foods

#Rename Hobbies Column to Hobbies_quantity to use with ML models in PySpark
final_Foods = final_Foods.select(col("features"),col("lag_Foods").alias("Foods_quantity"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures',maxCategories = 12).fit(final_Foods)

final_Foods = final_Foods.select("*").withColumn("id", monotonically_increasing_id())

In [36]:
#GBT for Foods 1 day horizon
Foods = final_Foods.drop('id')
X = np.array(Foods.select('features','Foods_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
maxDepth = (2,5)
maxIter = (10,20)
GBT_feature_list = []
for train_index, test_index in splits.split(X):                
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Foods.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Foods.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in maxDepth:
    for j in maxIter:
      GBT = GBTRegressor(labelCol="Foods_quantity",maxDepth = i, maxIter = j)
      pipeline = Pipeline(stages = [feature_Indexer, GBT])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=GBT.getLabelCol(), predictionCol=GBT.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Foods_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      GBT_feature_list.append(f)
GBT_feature_list 

In [37]:
#RF for Foods 1 day horizon
Foods = final_Foods.drop('id')
X = np.array(Foods.select('features','Foods_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
numTrees = [5,15,30,50]
maxDepth = [5,10,15]
RF_feature_list = []
for train_index, test_index in splits.split(X):
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Foods.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Foods.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in numTrees:
    for j in maxDepth:
      RF = RandomForestRegressor(labelCol="Foods_quantity",numTrees = i, maxDepth = j)
      pipeline = Pipeline(stages = [feature_Indexer, RF])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=RF.getLabelCol(), predictionCol=RF.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Foods_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      RF_feature_list.append(f)
RF_feature_list

###### FOODS : 7 DAY

In [39]:
# Specify Input Columns that will become features in our final models
featuresCols = F_week.columns
featuresCols.remove('lag_Foods')

#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = featuresCols, outputCol = "features")
output = feature_assembler.transform(F_week) # Transform dataframe into features
final_Foods = output.select("features", "lag_Foods") # Generate final output two cols: one with features and the other with the target i.e Foods

#Rename Hobbies Column to Hobbies_quantity to use with ML models in PySpark
final_Foods = final_Foods.select(col("features"),col("lag_Foods").alias("Foods_quantity"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures',maxCategories = 12).fit(final_Foods)

final_Foods = final_Foods.select("*").withColumn("id", monotonically_increasing_id())

In [40]:
#GBT for Foods 7 days horizon
Foods = final_Foods.drop('id')
X = np.array(Foods.select('features','Foods_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
maxDepth = (2,5)
maxIter = (10,20)
GBT_feature_list = []
for train_index, test_index in splits.split(X):                
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Foods.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Foods.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in maxDepth:
    for j in maxIter:
      GBT = GBTRegressor(labelCol="Foods_quantity",maxDepth = i, maxIter = j)
      pipeline = Pipeline(stages = [feature_Indexer, GBT])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=GBT.getLabelCol(), predictionCol=GBT.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Foods_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      GBT_feature_list.append(f)
GBT_feature_list 

In [41]:
#RF for Foods 7 days horizon
Foods = final_Foods.drop('id')
X = np.array(Foods.select('features','Foods_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
numTrees = [5,15,30,50]
maxDepth = [5,10,15]
RF_feature_list = []
for train_index, test_index in splits.split(X):
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Foods.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Foods.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in numTrees:
    for j in maxDepth:
      RF = RandomForestRegressor(labelCol="Foods_quantity",numTrees = i, maxDepth = j)
      pipeline = Pipeline(stages = [feature_Indexer, RF])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=RF.getLabelCol(), predictionCol=RF.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Foods_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      RF_feature_list.append(f)
RF_feature_list

###### FOODS : 28 DAY

In [43]:
# Specify Input Columns that will become features in our final models
featuresCols = F_month.columns
featuresCols.remove('lag_Foods')

#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = featuresCols, outputCol = "features")
output = feature_assembler.transform(F_month) # Transform dataframe into features
final_Foods = output.select("features", "lag_Foods") # Generate final output two cols: one with features and the other with the target i.e Foods

#Rename Hobbies Column to Hobbies_quantity to use with ML models in PySpark
final_Foods = final_Foods.select(col("features"),col("lag_Foods").alias("Foods_quantity"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures',maxCategories = 12).fit(final_Foods)

final_Foods = final_Foods.select("*").withColumn("id", monotonically_increasing_id())

In [44]:
#GBT for Foods 28 days horizon
Foods = final_Foods.drop('id')
X = np.array(Foods.select('features','Foods_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
maxDepth = (2,5)
maxIter = (10,20)
GBT_feature_list = []
for train_index, test_index in splits.split(X):                
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Foods.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Foods.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in maxDepth:
    for j in maxIter:
      GBT = GBTRegressor(labelCol="Foods_quantity",maxDepth = i, maxIter = j)
      pipeline = Pipeline(stages = [feature_Indexer, GBT])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=GBT.getLabelCol(), predictionCol=GBT.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Foods_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      GBT_feature_list.append(f)
GBT_feature_list 

In [45]:
#RF for Foods 28 days horizon
Foods = final_Foods.drop('id')
X = np.array(Foods.select('features','Foods_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
numTrees = [5,15,30,50]
maxDepth = [5,10,15]
RF_feature_list = []
for train_index, test_index in splits.split(X):
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_Foods.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_Foods.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in numTrees:
    for j in maxDepth:
      RF = RandomForestRegressor(labelCol="Foods_quantity",numTrees = i, maxDepth = j)
      pipeline = Pipeline(stages = [feature_Indexer, RF])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=RF.getLabelCol(), predictionCol=RF.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('Foods_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      RF_feature_list.append(f)
RF_feature_list

###### HOUSEHOLD : 1 DAY

In [47]:
# Specify Input Columns that will become features in our final models
featuresCols = HH_day.columns
featuresCols.remove('lag_HH')

#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = featuresCols, outputCol = "features")
output = feature_assembler.transform(HH_day) # Transform dataframe into features
final_HH = output.select("features", "lag_HH") # Generate final output two cols: one with features and the other with the target i.e HH

#Rename Hobbies Column to Hobbies_quantity to use with ML models in PySpark
final_HH = final_HH.select(col("features"),col("lag_HH").alias("HH_quantity"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures',maxCategories = 12).fit(final_HH)

final_HH = final_HH.select("*").withColumn("id", monotonically_increasing_id())

In [48]:
#GBT for HouseHold 1 day horizon
HH = final_HH.drop('id')
X = np.array(HH.select('features','HH_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
maxDepth = (2,5)
maxIter = (10,20)
GBT_feature_list = []
for train_index, test_index in splits.split(X):                
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_HH.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_HH.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in maxDepth:
    for j in maxIter:
      GBT = GBTRegressor(labelCol="HH_quantity",maxDepth = i, maxIter = j)
      pipeline = Pipeline(stages = [feature_Indexer, GBT])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=GBT.getLabelCol(), predictionCol=GBT.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('HH_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      GBT_feature_list.append(f)
GBT_feature_list 

In [49]:
#RF for HouseHold 1 day horizon
HH = final_HH.drop('id')
X = np.array(HH.select('features','HH_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
numTrees = [5,15,30,50]
maxDepth = [5,10,15]
RF_feature_list = []
for train_index, test_index in splits.split(X):
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_HH.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_HH.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in numTrees:
    for j in maxDepth:
      RF = RandomForestRegressor(labelCol="HH_quantity",numTrees = i, maxDepth = j)
      pipeline = Pipeline(stages = [feature_Indexer, RF])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=RF.getLabelCol(), predictionCol=RF.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('HH_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      RF_feature_list.append(f)
RF_feature_list

###### HOUSEHOLD : 7 DAY

In [51]:
# Specify Input Columns that will become features in our final models
featuresCols = HH_week.columns
featuresCols.remove('lag_HH')

#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = featuresCols, outputCol = "features")
output = feature_assembler.transform(HH_week) # Transform dataframe into features
final_HH = output.select("features", "lag_HH") # Generate final output two cols: one with features and the other with the target i.e HH

#Rename Hobbies Column to Hobbies_quantity to use with ML models in PySpark
final_HH = final_HH.select(col("features"),col("lag_HH").alias("HH_quantity"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures',maxCategories = 12).fit(final_HH)

final_HH = final_HH.select("*").withColumn("id", monotonically_increasing_id())

In [52]:
#GBT for HouseHold 7 days horizon
HH = final_HH.drop('id')
X = np.array(HH.select('features','HH_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
maxDepth = (2,5)
maxIter = (10,20)
GBT_feature_list = []
for train_index, test_index in splits.split(X):                
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_HH.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_HH.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in maxDepth:
    for j in maxIter:
      GBT = GBTRegressor(labelCol="HH_quantity",maxDepth = i, maxIter = j)
      pipeline = Pipeline(stages = [feature_Indexer, GBT])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=GBT.getLabelCol(), predictionCol=GBT.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('HH_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      GBT_feature_list.append(f)
GBT_feature_list 

In [53]:
#RF for HouseHold 7 days horizon
HH = final_HH.drop('id')
X = np.array(HH.select('features','HH_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
numTrees = [5,15,30,50]
maxDepth = [5,10,15]
RF_feature_list = []
for train_index, test_index in splits.split(X):
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_HH.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_HH.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in numTrees:
    for j in maxDepth:
      RF = RandomForestRegressor(labelCol="HH_quantity",numTrees = i, maxDepth = j)
      pipeline = Pipeline(stages = [feature_Indexer, RF])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=RF.getLabelCol(), predictionCol=RF.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('HH_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      RF_feature_list.append(f)
RF_feature_list

###### HOUSEHOLD : 28 DAY

In [55]:
# Specify Input Columns that will become features in our final models
featuresCols = HH_month.columns
featuresCols.remove('lag_HH')

#Build Features using the feature assembler
feature_assembler = VectorAssembler(inputCols = featuresCols, outputCol = "features")
output = feature_assembler.transform(HH_month) # Transform dataframe into features
final_HH = output.select("features", "lag_HH") # Generate final output two cols: one with features and the other with the target i.e HH

#Rename Hobbies Column to Hobbies_quantity to use with ML models in PySpark
final_HH = final_HH.select(col("features"),col("lag_HH").alias("HH_quantity"))

#Build Feature Indexer:
feature_Indexer = VectorIndexer(inputCol = 'features', outputCol = 'indexedFeatures',maxCategories = 12).fit(final_HH)

final_HH = final_HH.select("*").withColumn("id", monotonically_increasing_id())

In [56]:
#GBT for HouseHold 28 days horizon
HH = final_HH.drop('id')
X = np.array(HH.select('features','HH_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
maxDepth = (2,5)
maxIter = (10,20)
GBT_feature_list = []
for train_index, test_index in splits.split(X):                
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_HH.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_HH.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in maxDepth:
    for j in maxIter:
      GBT = GBTRegressor(labelCol="HH_quantity",maxDepth = i, maxIter = j)
      pipeline = Pipeline(stages = [feature_Indexer, GBT])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=GBT.getLabelCol(), predictionCol=GBT.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('HH_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      GBT_feature_list.append(f)
GBT_feature_list 

In [57]:
#RF for HouseHold 28 days horizon
HH = final_HH.drop('id')
X = np.array(HH.select('features','HH_quantity').collect())

from sklearn.model_selection import TimeSeriesSplit
from sklearn import datasets, ensemble
splits = TimeSeriesSplit(n_splits=3)
numTrees = [5,15,30,50]
maxDepth = [5,10,15]
RF_feature_list = []
for train_index, test_index in splits.split(X):
  train = X[train_index]
  test = X[test_index]
  train_df_pyspark = final_HH.where(col("id").between(0, len(train))).drop('id')
  test_df_pyspark = final_HH.where(col("id").between(len(train)+1, len(train)+len(test))).drop('id')
  for i in numTrees:
    for j in maxDepth:
      RF = RandomForestRegressor(labelCol="HH_quantity",numTrees = i, maxDepth = j)
      pipeline = Pipeline(stages = [feature_Indexer, RF])
      model = pipeline.fit(train_df_pyspark)
      predictions = model.transform(test_df_pyspark)
      e = RegressionEvaluator(metricName="r2", labelCol=RF.getLabelCol(), predictionCol=RF.getPredictionCol())
      r2= e.evaluate(predictions)
      A = np.array(predictions.select('HH_quantity').collect())
      F = np.array(predictions.select('prediction').collect())
      #Print R2 and SMAPE for the current regressor along with the regression model  
      s = smape(A,F)
      f = [i,j,r2,s]
      RF_feature_list.append(f)
RF_feature_list