In [11]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

In [12]:
# Build (or reuse, via getOrCreate) the Spark session for this notebook.
# The master URL "local[*]" and both memory settings are passed as-is to Spark.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)

In [13]:
from pipeline_oriented_analytics.dataframe import CsvDataFrame, ParquetDataFrame
from pipeline_oriented_analytics import Phase

# Load the processed training features and hold out a fraction of the rows
# for the final evaluation of the selected model.
# NOTE(review): ParquetDataFrame is a project helper; it is assumed to return a
# Spark DataFrame because randomSplit is called on its result -- confirm.
features_df = ParquetDataFrame(f'../data/processed/{Phase.train.name.lower()}/features', spark)
test_data_frac = 0.1
# Pin the sampling seed: without it every run draws a different train/test
# split, so the metrics printed further down cannot be reproduced.
split_seed = 42
test_features_df, train_features_df = features_df.randomSplit(
    [test_data_frac, 1 - test_data_frac], seed=split_seed)

In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

# Name of the column the regressors are trained to predict.
label_col = 'duration_min'

# Index the two cell-id columns; handleInvalid='keep' is passed so values not
# seen while fitting get their own index instead of raising an error.
pickup_indexer = StringIndexer(inputCol='pickup_cell_6', outputCol='pickup_cell_6_idx', handleInvalid='keep')
dropoff_indexer = StringIndexer(inputCol='dropoff_cell_6', outputCol='dropoff_cell_6_idx', handleInvalid='keep')

# Columns packed (in this order) into the single "features" vector column.
feature_cols = [
    'pickup_cell_6_idx',
    'dropoff_cell_6_idx',
    'distance',
    'month',
    'day_of_month',
    'day_of_week',
    'hour',
    'requests_pickup_cell',
    'requests_dropoff_cell',
]
feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Feature-preparation sub-pipeline shared by every candidate model.
assembler_estimator = Pipeline(stages=[pickup_indexer, dropoff_indexer, feature_assembler])

In [15]:
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Explicit seed shared by both tree learners so that re-running the notebook
# trains the same candidate models. Without it the estimators use PySpark's
# default seed, which this notebook does not pin.
model_seed = 42
dt_estimator = DecisionTreeRegressor(maxDepth=5, featuresCol='features', labelCol=label_col,
                                     seed=model_seed)
rf_estimator = RandomForestRegressor(maxDepth=5, numTrees=40, featuresCol='features', labelCol=label_col,
                                     seed=model_seed)

# Placeholder pipeline with no stages: each param map below overrides its
# `stages` param, which lets one CrossValidator compare different estimators
# (decision tree vs. random forest) in a single search.
pipeline = Pipeline(stages=[])
dt_stages = [assembler_estimator, dt_estimator]
rf_stages = [assembler_estimator, rf_estimator]
# Stand-alone pipelines per model family. They are not referenced by the grids
# built below; kept because cells outside this one may use them.
dt_pipeline = Pipeline(stages=dt_stages)
rf_pipeline = Pipeline(stages=rf_stages)

# Decision-tree candidates: 4 depths.
dt_grid = (
    ParamGridBuilder()
    .baseOn({pipeline.stages: dt_stages})
    .addGrid(dt_estimator.maxDepth, [2, 5, 7, 9])
    .build()
)

# Random-forest candidates: 2 depths x 2 forest sizes.
rf_grid = (
    ParamGridBuilder()
    .baseOn({pipeline.stages: rf_stages})
    .addGrid(rf_estimator.maxDepth, [5, 7])
    .addGrid(rf_estimator.numTrees, [10, 20])
    .build()
)

# Combined search space handed to the cross-validator.
grid = dt_grid + rf_grid


In [16]:
from pyspark.ml.evaluation import RegressionEvaluator
# Metric used both for model selection and for the final hold-out score.
eval_metric = 'mae'
# Number of cross-validation folds.
folds = 3
print(f'Preparing {eval_metric} evaluator and {folds}-fold cross-validator...')
mae_evaluator = RegressionEvaluator(metricName=eval_metric, labelCol=label_col)
# seed pins how rows are assigned to folds; without it the fold split, and
# therefore the selected best model, can differ from run to run.
# parallelism=4 is passed through to CrossValidator unchanged.
cross_val = CrossValidator(estimatorParamMaps=grid, estimator=pipeline,
                           evaluator=mae_evaluator, numFolds=folds, parallelism=4,
                           seed=42)

Preparing mae evaluator and 3-fold cross-validator...


In [17]:
from timeit import default_timer as timer
from datetime import timedelta

# Fit the cross-validator on the training split and report the wall-clock time.
print(f'Training...')
start = timer()
cross_val_model = cross_val.fit(train_features_df)
end = timer()

training_duration = timedelta(seconds=end-start)
print(f'Training complete, duration: {training_duration}')

# Show the stages of the pipeline that was selected as the best model.
best_model_stages = cross_val_model.bestModel.stages
print(f'Best model: {best_model_stages}')

Training...
Training complete, duration: 0:04:06.047454
Best model: [PipelineModel_ae97e0395eea, DecisionTreeRegressionModel (uid=DecisionTreeRegressor_60580be5e8f0) of depth 7 with 243 nodes]


In [18]:
# Persist the selected model, replacing anything already stored at model_path.
model_path = '../model/trip_duration_min'
print(f'Saving model to {model_path}')
best_model_writer = cross_val_model.bestModel.write().overwrite()
best_model_writer.save(model_path)
print(f'Model saved...')


Saving model to ../model/trip_duration_min
Model saved...


In [19]:
# Score the held-out split with the fitted cross-validator model and report
# the error using the same metric that drove model selection.
predictions_df = cross_val_model.transform(test_features_df)
holdout_evaluator = RegressionEvaluator(metricName=eval_metric, labelCol=label_col)
mae_cv = holdout_evaluator.evaluate(predictions_df)
print(f'Best model MAE: {mae_cv}')

Best model MAE: 6.511276324357712


In [20]:
# Average of the label column over the scored rows, shown for comparison with the MAE above.
label_mean_df = predictions_df.groupby().agg(f.avg(label_col))
label_mean_df.show()

+------------------+
| avg(duration_min)|
+------------------+
|15.617653213342992|
+------------------+

