-
Notifications
You must be signed in to change notification settings - Fork 0
/
SparkML.py
305 lines (236 loc) · 12.1 KB
/
SparkML.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import sys
import time
# Environment report: run timestamp, platform, interpreter version, cluster size
run_timestamp = time.strftime('%Y/%m/%d %H:%M')
print(run_timestamp)
print('OS:', sys.platform)
print('Python:', sys.version)
# `sc` is not defined in this file; it is expected to be the SparkContext that
# the notebook/shell session provides
executor_status = sc._jsc.sc().getExecutorMemoryStatus()
print('Number of nodes on the cluster:', executor_status.size())
#################################################################################################################
##### Data Preparation
# Vectorizing a training set before feeding into a ML model
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
# Name of the column holding the value to predict
labelColumn = 'price'
# Names of the columns that get packed into the feature vector
featureColumns = ['previous_hour_price']
# Combining the feature columns into a single vector column called 'features'
# (`df` is not defined in this file; it is expected to already exist in the session)
assembler = VectorAssembler(inputCols=featureColumns, outputCol='features')
output = assembler.transform(df)
# Keeping only the feature vector and the label, with the label renamed to 'label'
renamed_label = col(labelColumn).alias('label')
trainingDataset = output.select('features', renamed_label)
# One Hot Encoding
def one_hot_encode(column, dataframe):
    '''
    Returns a dataframe with an additional one hot encoded column specified on the input

    Input:
        - column: Name of the categorical column to encode
        - dataframe: The Spark dataframe containing that column
    Output:
        - The input dataframe plus a new column named <column>_one_hot; the
          intermediate index column is dropped before returning
    '''
    from pyspark.ml.feature import OneHotEncoder, StringIndexer

    # Deriving the temporary index column from the input name so it cannot
    # collide with a 'categoryIndex' column already present in the dataframe
    indexColumn = column + '_category_index'

    # Indexing the column before one hot encoding
    stringIndexer = StringIndexer(inputCol=column, outputCol=indexColumn)
    indexed = stringIndexer.fit(dataframe).transform(dataframe)

    # One hot encoding the column
    encoder = OneHotEncoder(inputCol=indexColumn, outputCol=column + '_one_hot')
    # In Spark 3.x OneHotEncoder is an estimator that has to be fitted first,
    # whereas in Spark 2.x it is a plain transformer without a fit method
    if hasattr(encoder, 'fit'):
        encoder = encoder.fit(indexed)
    return encoder.transform(indexed).drop(indexColumn)
# Adding a lag variable
def lag_variable(column, dataframe, partition_column, count=12):
    '''
    Returns a dataframe with an additional lag column specified from the input

    Input:
        - column: Name of the column to lag
        - dataframe: The Spark dataframe containing the column
        - partition_column: Name of the column the rows are ordered by before
          lagging. Despite its name it is only used for ordering; the window
          itself is not partitioned
        - count: Number of rows to look back (default 12)
    Output:
        - The input dataframe plus a column named previous_hour_<column>
    '''
    import pyspark.sql.functions as sqlF
    from pyspark.sql.window import Window

    # A single window spanning all rows, sorted by the ordering column
    orderedWindow = Window.partitionBy().orderBy(partition_column)

    # Passing the offset positionally: the keyword was `count` in Spark 2.x and
    # was renamed to `offset` in Spark 3.x, so a keyword works on only one of them
    laggedColumn = sqlF.lag(dataframe[column], count).over(orderedWindow)
    return dataframe.withColumn('previous_hour_' + column, laggedColumn)
#################################################################################################################
##### Cross Validation
# Train/test split
# NOTE(review): `df` is not defined in this file; the estimators below read the
# 'features' and 'label' columns, so it presumably has to be an already
# vectorized dataframe - confirm before running
seed = 46
(trainingDF, testDF) = df.randomSplit([8.0, 2.0], seed)  # 80/20 split

# Evaluating regression
# https://spark.apache.org/docs/2.2.0/ml-tuning.html
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fitting the model
lr = LinearRegression()
model = lr.fit(trainingDF)
regEval = RegressionEvaluator().setLabelCol('label')  # Instantiating & setting the label column
predictDF = model.transform(testDF)  # Generating predictions
testRMSE = regEval.evaluate(predictDF)  # Gathering the RMSE
print('The model had a RMSE on the test set of {0}'.format(testRMSE))

# K-folds
# (imports repeated so this snippet can be run on its own)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

regEval = RegressionEvaluator().setLabelCol('label')
# CrossValidator needs the unfitted estimator (`lr`), not the fitted `model`,
# and it requires a parameter grid; an empty builder yields a single param map
# that keeps the estimator's current settings
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=ParamGridBuilder().build(),
                          evaluator=regEval,
                          numFolds=3)
# Running the 3-fold cross validation, which ends by re-fitting on the entire training set
cvModel = crossval.fit(trainingDF)
# Sliding window for time series model evaluation
def sliding_window_evaluation(dataframe, feature_columns, num_windows=5, test_size=0.2):
    '''
    Takes an input dataframe, splits it into partitions, and performs a sliding window where
    each partition is split between a train/test set and a linear regression is trained
    and evaluated

    Meant for analyzing the performance of a time series regression forecasting model as a random
    split is not appropriate in a time series setting

    Input:
        - dataframe: Spark dataframe with a 'date_time_resampled' column (used for
          ordering), a 'price' column (used as the label) and the feature columns
        - feature_columns: Names of the columns to assemble into the feature vector
        - num_windows: Number of consecutive windows to evaluate
        - test_size: Fraction of each window held out (the latest rows) for testing
    Output:
        - Nothing is returned; per-window metrics and the average test RMSE are
          printed and a predictions-vs-actuals plot is shown for each window

    NOTE: plotting uses `plt`, which this file never imports; it is expected to be
    matplotlib.pyplot made available by the notebook session
    '''
    import pyspark.sql.functions as sqlF
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.window import Window

    # Gathering statistics for window partitions and train/test splits
    total_rows = dataframe.count()  # Counted once and reused below
    window_size = round(total_rows / num_windows)
    num_training_rows = round((total_rows * (1 - test_size)) / num_windows)
    if window_size < 1:
        raise ValueError('Not enough rows ({0}) to build {1} windows'.format(total_rows, num_windows))

    # Zero-based position of every row in time order
    row_index = sqlF.row_number().over(Window.orderBy('date_time_resampled')) - 1

    # Creating a column for partition numbers (1-based) and one for the position
    # of the row inside its window, which drives the chronological train/test split
    dataframe = (dataframe
                 .withColumn('window_num', sqlF.floor(row_index / window_size) + 1)
                 .withColumn('window_position', row_index % window_size))

    # Specifying the name of the column containing the label
    labelColumn = 'price'

    # Assembling the vectors and outputting the training set
    assembler = VectorAssembler(
        inputCols=feature_columns,
        outputCol='features')
    vectorizedDF = (assembler.transform(dataframe)
                    .select('features', sqlF.col(labelColumn).alias('label'),
                            'window_num', 'window_position'))

    # Scoring each window's held-out rows instead of reusing a value computed elsewhere
    evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')

    # Gathering the test RMSE from all windows
    total_RMSE = []

    # Looping over windows, splitting into train/test sets, and training and evaluating a model on each set
    for window in range(1, num_windows + 1):
        # Subsetting the dataframe into the window
        dataWindow = vectorizedDF.filter(sqlF.col('window_num') == window).drop('window_num')

        # Splitting into train/testing sets: the earliest rows train, the rest test.
        # Filtering on the position keeps duplicate rows and avoids collecting the
        # training rows to the driver; the test rows are sorted so the plot is in time order
        in_training = sqlF.col('window_position') < num_training_rows
        trainWindow = dataWindow.filter(in_training).drop('window_position')
        testWindow = (dataWindow.filter(~in_training)
                      .orderBy('window_position')
                      .drop('window_position'))

        # Fitting the model
        # Using L1 regularization for automatic feature selection
        lr = LinearRegression(elasticNetParam=1.0, regParam=0.03)
        model = lr.fit(trainWindow)

        # Gathering evaluation and summary metrics
        modelSummary = model.summary
        predictDF = model.transform(testWindow)  # Generating predictions
        window_RMSE = evaluator.evaluate(predictDF)
        total_RMSE.append(window_RMSE)

        # Creating a plot of the predictions and actuals to see if there is a significant lag
        # Collecting both series in one pass so they are guaranteed to line up
        scored_rows = predictDF.select('label', 'prediction').collect()
        fig, ax = plt.subplots()
        ax.plot([row['label'] for row in scored_rows], label='Label')
        ax.plot([row['prediction'] for row in scored_rows], label='Prediction')
        plt.legend()
        plt.title('Test Set: Predictions and Actuals')

        # Reporting results
        print('Window', window)
        print('Training Size:', trainWindow.count())
        print('Testing Size:', testWindow.count())
        print("r2: %f" % modelSummary.r2)
        print("Training RMSE: %f" % modelSummary.rootMeanSquaredError)
        print("Testing RMSE: %f" % window_RMSE)
        plt.show()  # Plot of actuals vs predictions
        print()

    average_RMSE = sum(total_RMSE) / len(total_RMSE)
    print('Average RMSE for {0} windows: {1}'.format(num_windows, average_RMSE))
# Lagged inputs used as features for the sliding window evaluation
feature_columns = [
    'previous_hour_price',
    'previous_hour_high_low_range',
    'previous_hour_volume',
]
# `test` is not defined in this file; it is expected to be a dataframe that
# already exists in the session
sliding_window_evaluation(
    dataframe=test,
    feature_columns=feature_columns,
    num_windows=3,
    test_size=0.2,
)
#################################################################################################################
##### Hyperparameter Tuning
# Grid Search - Spark ML
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Instantiating the estimator whose parameters are searched; it has to exist
# before the grid can reference its params
randomForest = RandomForestClassifier()

# Defining our parameter grid
# Spark requires an integer maxDepth of at most 30, so scikit-learn's None
# ("unlimited") is not valid here; 30 is the deepest tree Spark supports -
# lower it if training becomes too slow or memory hungry
paramGrid = (ParamGridBuilder()
             .addGrid(randomForest.numTrees, [10, 30, 100, 300])
             .addGrid(randomForest.maxDepth, [3, 30])
             .build()
             )

# Cross validation with the parameter grid
crossval = CrossValidator(estimator=randomForest,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)

# Reporting the number of nodes on the cluster
# (`sc` is expected to be the SparkContext provided by the session)
print('Number of nodes on the cluster:', sc._jsc.sc().getExecutorMemoryStatus().size())

# Performing the grid search
cvModel = crossval.fit(trainingDataset)

# Grabbing the best parameters
# The Java-side parent of the best model is the estimator it was fitted from,
# which is where the chosen hyperparameters are read back
bestModelParams = cvModel.bestModel._java_obj.parent()

# Reporting the best obtained parameters
print('Hyperparameters for the best model:')
print('Number of Trees:', bestModelParams.getNumTrees())
print('Max Depth:', bestModelParams.getMaxDepth())
# Grid Search - Spark Scikit-Learn
import pyspark
import spark_sklearn
from sklearn.ensemble import RandomForestClassifier

# Ensuring our Spark context exists in the sc variable
# This is likely unnecessary in a Databricks cluster
sc = pyspark.SparkContext.getOrCreate()

randomForest = RandomForestClassifier()

# Defining our parameter grid
parameters = {'n_estimators': [10, 30, 100, 300],
              'max_depth': [3, None],
              'max_features': [1, 3, None],
              'min_samples_leaf': [1, 3, 10],
              'bootstrap': [True, False],
              'criterion': ['gini', 'entropy']}

# Cross validation with the parameter grid
model = spark_sklearn.GridSearchCV(sc, randomForest, parameters, refit=True, cv=3)

# Reporting the cluster size
print('Number of nodes on the cluster:', sc._jsc.sc().getExecutorMemoryStatus().size())

# Performing the grid search
# (`X` and `y` are not defined in this file; they are expected to be the
# feature matrix and label vector already available in the session)
model.fit(X, y)

# Reporting the parameters of the best model
# Printed explicitly so it is also shown when this is not the last line of a notebook cell
print(model.best_estimator_)
#################################################################################################################
##### Model Diagnostics
# Summary of a typical model
from pyspark.ml.regression import LinearRegression

# Regularized linear regression, limited to 10 iterations
lr = LinearRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# Fit the model
# (`trainingSet` is not defined in this file; it is expected to already exist in the session)
lrModel = lr.fit(trainingSet)

# Report the fitted coefficients and intercept
coefficients_text = str(lrModel.coefficients)
intercept_text = str(lrModel.intercept)
print("Coefficients: %s" % coefficients_text)
print("Intercept: %s" % intercept_text)

# Report training-set metrics taken from the model's summary object
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
history_text = str(trainingSummary.objectiveHistory)
print("objectiveHistory: %s" % history_text)
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
#################################################################################################################
##### Model Training
# Prediction Intervals with Quantile Regression in MML/LightGBM
def prediction_with_intervals(train, test, confidence_level=0.95):
    '''
    Trains LightGBM models and creates predictions on the data

    Input:
        - train: The training set with the label and vectorized features
        - test: The testing set with the label and vectorized features
        - confidence_level: The confidence level for the interval as a fraction
          strictly between 0 and 1 (0.95 gives the 2.5% and 97.5% quantiles)
    Output:
        - A Spark dataframe with the columns of the testing set followed by the
          point estimate ('prediction') and the 'UpperBound'/'LowerBound' columns

    Raises:
        - ValueError if confidence_level is not strictly between 0 and 1

    TODO: Add more inputs for hyperparameter tuning
    '''
    from mmlspark import LightGBMRegressor

    if not 0 < confidence_level < 1:
        raise ValueError('confidence_level must be strictly between 0 and 1')

    # Calculating the upper/lower buffer for the quantile regressions
    # This is the probability left outside the interval on each side, so the
    # bounds are the alpha_buffer and (1 - alpha_buffer) quantiles
    alpha_buffer = (1 - confidence_level) / 2
    lower_quantile = alpha_buffer
    upper_quantile = 1 - alpha_buffer

    # Training all three models
    # Lower bound of the interval
    model_lower_bound = LightGBMRegressor(application='quantile',
                                          alpha=lower_quantile,
                                          learningRate=0.3).fit(train)

    # Upper bound of the interval
    model_upper_bound = LightGBMRegressor(application='quantile',
                                          alpha=upper_quantile,
                                          learningRate=0.3).fit(train)

    # Point prediction
    model = LightGBMRegressor(application='regression',
                              learningRate=0.3).fit(train)

    # Scoring on the testing set and assembling the results
    # The models are applied one after another to the same dataframe rather than
    # joined back together on label/features, since such a join multiplies rows
    # whenever two test rows share the same label and features. Each bound is
    # renamed before the next model writes its own 'prediction' column
    scored_data = model_upper_bound.transform(test).withColumnRenamed('prediction', 'UpperBound')
    scored_data = model_lower_bound.transform(scored_data).withColumnRenamed('prediction', 'LowerBound')
    scored_data = model.transform(scored_data)

    # Ordering the output columns: test set columns, point estimate, then the bounds
    return scored_data.select(test.columns + ['prediction', 'UpperBound', 'LowerBound'])