# Introduction

We are interested in applying Decision Trees and Random Forests to regression problems in Spark. Here we are using a dataset with flight data and predicting the length of the delay.

We combine our data processing into a pipeline, before training a Decission Tree model and then a Random Forest. Since we are just interested in exploring these models here we do not split out data into train/test sets or evaluate the models, until we look at k-fold cross validation.

Finally we look at some simple optimization, where we run the random forest training in parallel.

# Exploring and tidying the dataset

We examine our data and drop most of its many columns. This is to simplify the experiments with the models here, there is almost certainly important information we are losing in the dropped columns. For the same reason we also drop most of the many rows. 

In [0]:
path = '/databricks-datasets/airlines/part-00000'
df = spark.read.option("header", "true").csv(path)
df.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|1987|   10|    

In [0]:
# We select our columns and drop any NA values as these cause problems for the regression.
from pyspark.sql.functions import col
reduced = df[['Year', 'Month', 'UniqueCarrier', 'ArrDelay']]
reduced = reduced.dropna()

In [0]:
reduced.show(5)

+----+-----+-------------+--------+
|Year|Month|UniqueCarrier|ArrDelay|
+----+-----+-------------+--------+
|1987|   10|           PS|      23|
|1987|   10|           PS|      14|
|1987|   10|           PS|      29|
|1987|   10|           PS|      -2|
|1987|   10|           PS|      33|
+----+-----+-------------+--------+
only showing top 5 rows



In [0]:
# Looking at the schema we can see that the delay is a string, and we need int.
reduced.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- ArrDelay: string (nullable = true)



In [0]:
reduced = reduced.select(['Year', 'Month', 'UniqueCarrier', col("ArrDelay").cast('int')])

In [0]:
# We take a very small sample of our data to speed things along.
reduced = reduced.sample(False, 0.01, seed=0)

# Decision Tree Regressor

Before running our decision tree model we need to replace any categorical data with indices, using the String Indexer. We also need to combine all feature columns in to one, and we do this with a Vector Assembler. Finally we train the model.

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor

decTree = DecisionTreeRegressor(labelCol="ArrDelay")

In [0]:
# We need to use the String Indexer for our categorical data. This will replace the categories with indicies depending on frequency (most frequent gets 0, then 1...)

from pyspark.ml.feature import StringIndexer

catCols = [field for (field, dataType) in reduced.dtypes if dataType == "string"]
numericCols = []

# Name for our columns after they will be indexed
indexOutputCols = [x + "Index" for x in catCols]

stringIndexer = StringIndexer(inputCols=catCols, outputCols=indexOutputCols, handleInvalid="skip")

In [0]:
# We can see what the string indexer is doing by fitting it and then transforming the data
stringIndexerModel = stringIndexer.fit(dataset=reduced)
stringIndexedData = stringIndexerModel.transform(reduced)
stringIndexedData.show(5)

+----+-----+-------------+--------+---------+----------+------------------+
|Year|Month|UniqueCarrier|ArrDelay|YearIndex|MonthIndex|UniqueCarrierIndex|
+----+-----+-------------+--------+---------+----------+------------------+
|1987|   10|           PS|       2|      0.0|       0.0|              10.0|
|1987|   10|           PS|      -2|      0.0|       0.0|              10.0|
|1987|   10|           PS|      -4|      0.0|       0.0|              10.0|
|1987|   10|           PS|       7|      0.0|       0.0|              10.0|
|1987|   10|           PS|      10|      0.0|       0.0|              10.0|
+----+-----+-------------+--------+---------+----------+------------------+
only showing top 5 rows



In [0]:
# Our features need to be combined into one column. We do this with a Vector Assembler

from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=indexOutputCols, outputCol="features")

In [0]:
# We can see the effect of this by applying it after the string indexer (as will be done in our pipeline)
vecAssembledData = vecAssembler.transform(stringIndexedData)
vecAssembledData.show(5)

+----+-----+-------------+--------+---------+----------+------------------+--------------+
|Year|Month|UniqueCarrier|ArrDelay|YearIndex|MonthIndex|UniqueCarrierIndex|      features|
+----+-----+-------------+--------+---------+----------+------------------+--------------+
|1987|   10|           PS|       2|      0.0|       0.0|              10.0|[0.0,0.0,10.0]|
|1987|   10|           PS|      -2|      0.0|       0.0|              10.0|[0.0,0.0,10.0]|
|1987|   10|           PS|      -4|      0.0|       0.0|              10.0|[0.0,0.0,10.0]|
|1987|   10|           PS|       7|      0.0|       0.0|              10.0|[0.0,0.0,10.0]|
|1987|   10|           PS|      10|      0.0|       0.0|              10.0|[0.0,0.0,10.0]|
+----+-----+-------------+--------+---------+----------+------------------+--------------+
only showing top 5 rows



In [0]:
# The model will be trained on the new "features" column, using it to predict "ArrDelay"
vecAssembledData['features', 'ArrDelay'].show(5)

+--------------+--------+
|      features|ArrDelay|
+--------------+--------+
|[0.0,0.0,10.0]|       2|
|[0.0,0.0,10.0]|      -2|
|[0.0,0.0,10.0]|      -4|
|[0.0,0.0,10.0]|       7|
|[0.0,0.0,10.0]|      10|
+--------------+--------+
only showing top 5 rows



In [0]:
# The model has problems with missing values, so we ensure they are dropped.
vecAssembledData = vecAssembledData.dropna()

In [0]:
# Finally we fit the model to the two columns from the processed data.
decTree = decTree.fit(vecAssembledData['features', 'ArrDelay'])

In [0]:
# We can see the branches of the fitted tree here
print(decTree.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_8b270c3e97ca, depth=5, numNodes=37, numFeatures=3
  If (feature 2 in {0.0,3.0,5.0,7.0,8.0})
   If (feature 2 in {3.0,7.0})
    If (feature 2 in {7.0})
     Predict: 1.9249394673123488
    Else (feature 2 not in {7.0})
     Predict: 2.3701067615658364
   Else (feature 2 not in {3.0,7.0})
    If (feature 2 in {8.0})
     If (feature 1 in {0.0})
      Predict: 2.8190476190476192
     Else (feature 1 not in {0.0})
      Predict: 4.927083333333333
    Else (feature 2 not in {8.0})
     If (feature 2 in {0.0})
      If (feature 1 in {1.0})
       Predict: 4.147058823529412
      Else (feature 1 not in {1.0})
       Predict: 5.11472275334608
     Else (feature 2 not in {0.0})
      If (feature 1 in {0.0})
       Predict: 4.388268156424581
      Else (feature 1 not in {0.0})
       Predict: 8.556451612903226
  Else (feature 2 not in {0.0,3.0,5.0,7.0,8.0})
   If (feature 2 in {1.0,2.0,4.0,6.0,9.0,11.0,13.0})
    If (feature 1 in {0.0})
    

# Pipeline

The above process is a little messy, so we compactify it to a Pipeline. This looks like String Indexer -> Vector Assembler -> Decision Tree

In [0]:
# We can turn these steps into a pipeline

from pyspark.ml import Pipeline
stages = [stringIndexer, vecAssembler, decTree]
pipeline = Pipeline(stages=stages)

In [0]:
# Fit the pipeline to the reduced data
reduced = reduced.dropna()
pipelineModel = pipeline.fit(reduced)

In [0]:
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_8b270c3e97ca, depth=5, numNodes=37, numFeatures=3
  If (feature 2 in {0.0,3.0,5.0,7.0,8.0})
   If (feature 2 in {3.0,7.0})
    If (feature 2 in {7.0})
     Predict: 1.9249394673123488
    Else (feature 2 not in {7.0})
     Predict: 2.3701067615658364
   Else (feature 2 not in {3.0,7.0})
    If (feature 2 in {8.0})
     If (feature 1 in {0.0})
      Predict: 2.8190476190476192
     Else (feature 1 not in {0.0})
      Predict: 4.927083333333333
    Else (feature 2 not in {8.0})
     If (feature 2 in {0.0})
      If (feature 1 in {1.0})
       Predict: 4.147058823529412
      Else (feature 1 not in {1.0})
       Predict: 5.11472275334608
     Else (feature 2 not in {0.0})
      If (feature 1 in {0.0})
       Predict: 4.388268156424581
      Else (feature 1 not in {0.0})
       Predict: 8.556451612903226
  Else (feature 2 not in {0.0,3.0,5.0,7.0,8.0})
   If (feature 2 in {1.0,2.0,4.0,6.0,9.0,11.0,13.0})
    If (feature 1 in {0.0})
    

# Random Forest Regression

Random forest regression involves running multiple Decision Trees and then averaging the result, thus providing a more robust prediction. We can run this using the same pipeline as above as the input data is the same.

In [0]:
# Let's try the same but with a Random Forest
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="ArrDelay", maxBins=40, seed=42)

In [0]:
# The pipeline is the same here
stages = [stringIndexer, vecAssembler, rf]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(reduced)

In [0]:
# Look at tree branches
rfModel = pipelineModel.stages[-1]
print(rfModel.toDebugString)

RandomForestRegressionModel: uid=RandomForestRegressor_3e7553bb556c, numTrees=20, numFeatures=3
  Tree 0 (weight 1.0):
    If (feature 2 in {0.0,3.0,5.0,7.0,8.0,11.0,13.0})
     Predict: 4.153821757056772
    Else (feature 2 not in {0.0,3.0,5.0,7.0,8.0,11.0,13.0})
     If (feature 1 in {0.0})
      If (feature 2 in {1.0,2.0,4.0,6.0,9.0})
       Predict: 8.032848680667744
      Else (feature 2 not in {1.0,2.0,4.0,6.0,9.0})
       Predict: 17.271186440677965
     Else (feature 1 not in {0.0})
      If (feature 2 in {6.0,12.0})
       If (feature 2 in {6.0})
        Predict: 8.038461538461538
       Else (feature 2 not in {6.0})
        Predict: 8.615384615384615
      Else (feature 2 not in {6.0,12.0})
       Predict: 11.673825503355705
  Tree 1 (weight 1.0):
    If (feature 2 in {0.0,3.0,7.0,8.0})
     Predict: 3.485424588086185
    Else (feature 2 not in {0.0,3.0,7.0,8.0})
     If (feature 1 in {0.0})
      Predict: 7.853891336270191
     Else (feature 1 not in {0.0})
      Predict: 10

# Hyperparameter Search

Here we perform a grid search to find the best pair of parameters (max depth and number of trees) for our forest. We use the ParamGridBuilder to construct a grid from our possible choice of parameters (here 2 x 3 giving 6 possibilities). We use the regression evaluator with the RMSE metric to find the best pair, which is performed using k-fold cross validation (here k=3) 

In [0]:
# We want to find the best hyperparameters. For example we can search over the following grid, where we allow max depth to be 2,4 or 6 and the number of trees to be 10 or 100

from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder()
 .addGrid(rf.maxDepth, [2, 4, 6])
 .addGrid(rf.numTrees, [10, 100])
 .build())

In [0]:
# To evaluate each of the hyperparameter pairs we use a regression evaulator with RMSE
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="ArrDelay",
 predictionCol="prediction",
 metricName="rmse")

In [0]:
# Finally we use k-fold (with k=3) cross validation to evaluate each of the hyperparameter combinations

from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=pipeline,
 evaluator=evaluator,
 estimatorParamMaps=paramGrid,
 numFolds=3,
 seed=42)
cvModel = cv.fit(reduced)

In [0]:
# We can see the performance here. There is little difference, suggesting our choice of grid needs to be enlarged or changed entirely.
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

Out[175]: [({Param(parent='RandomForestRegressor_3e7553bb556c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_3e7553bb556c', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  20.560019694881976),
 ({Param(parent='RandomForestRegressor_3e7553bb556c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_3e7553bb556c', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
  20.56852718605279),
 ({Param(parent='RandomForestRegressor_3e7553bb556c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4,
   Param(parent='RandomForestRegressor_3e7553bb

# Performance

Here we look at two ways to improve the speed of the above model training. Since the Random Forest models are independet, we can train them in parallel, doing which saves some time. We also note that, above, the cross validator is performing the String Indexinger and Vector Assembler on each fold and for each pair of parameters. By moving these before the cross validator we can also save some time  

In [0]:
# Try training the forests in the CV model parallel
cvModel = cv.setParallelism(4).fit(reduced)

In [0]:
# Reorganise the pipeline to perform string indexing and vector assembling first
cv = CrossValidator(estimator=rf,
 evaluator=evaluator,
 estimatorParamMaps=paramGrid,
 numFolds=3,
 parallelism=4,
 seed=42)
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])
pipelineModel = pipeline.fit(reduced)


In [0]:
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

Out[178]: [({Param(parent='RandomForestRegressor_3e7553bb556c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_3e7553bb556c', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  20.560019694881976),
 ({Param(parent='RandomForestRegressor_3e7553bb556c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_3e7553bb556c', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
  20.56852718605279),
 ({Param(parent='RandomForestRegressor_3e7553bb556c', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4,
   Param(parent='RandomForestRegressor_3e7553bb