## Cross Validation with RandomForests Example

In this example, we perform a regression on the bike-sharing dataset. We pass the regression pipeline through a CrossValidator, which tries a grid of hyperparameters and selects the best-performing model.

Check that the Spark and Spark SQL contexts have started successfully

In [1]:
print(sc)
print(sqlContext)
print(sqlCtx)

<pyspark.context.SparkContext object at 0x7f15fc946650>
<pyspark.sql.context.HiveContext object at 0x7f15fc926e90>
<pyspark.sql.context.HiveContext object at 0x7f15fc926e90>


In [2]:
bike_df = (sqlContext
           .read
           .format('com.databricks.spark.csv')
           .option("header", "true") # Use first line of all files as header
           .option("inferSchema", "true") # Automatically infer data types
           .load("bike-data/day.csv"))

In [3]:
bike_df.columns

['instant',
 'dteday',
 'season',
 'yr',
 'mnth',
 'holiday',
 'weekday',
 'workingday',
 'weathersit',
 'temp',
 'atemp',
 'hum',
 'windspeed',
 'casual',
 'registered',
 'cnt']

In [4]:
bike_df.show()

+-------+----------+------+---+----+-------+-------+----------+----------+--------+--------+--------+---------+------+----------+----+
|instant|    dteday|season| yr|mnth|holiday|weekday|workingday|weathersit|    temp|   atemp|     hum|windspeed|casual|registered| cnt|
+-------+----------+------+---+----+-------+-------+----------+----------+--------+--------+--------+---------+------+----------+----+
|      1|2011-01-01|     1|  0|   1|      0|      6|         0|         2|0.344167|0.363625|0.805833| 0.160446|   331|       654| 985|
|      2|2011-01-02|     1|  0|   1|      0|      0|         0|         2|0.363478|0.353739|0.696087| 0.248539|   131|       670| 801|
|      3|2011-01-03|     1|  0|   1|      0|      1|         1|         1|0.196364|0.189405|0.437273| 0.248309|   120|      1229|1349|
|      4|2011-01-04|     1|  0|   1|      0|      2|         1|         1|     0.2|0.212122|0.590435| 0.160296|   108|      1454|1562|
|      5|2011-01-05|     1|  0|   1|      0|      3|   

For the purposes of this example, let's select just a few simple columns to run the regression on

In [5]:
bike_df1 = bike_df.select('season','mnth','holiday','weekday','workingday','weathersit','temp','atemp','hum',
                         'windspeed','casual','registered','cnt')

In [6]:
bike_df2 = bike_df1.withColumn("cnt", bike_df1["cnt"].cast("double"))

In [7]:
bike_df2.show()

+------+----+-------+-------+----------+----------+--------+--------+--------+---------+------+----------+------+
|season|mnth|holiday|weekday|workingday|weathersit|    temp|   atemp|     hum|windspeed|casual|registered|   cnt|
+------+----+-------+-------+----------+----------+--------+--------+--------+---------+------+----------+------+
|     1|   1|      0|      6|         0|         2|0.344167|0.363625|0.805833| 0.160446|   331|       654| 985.0|
|     1|   1|      0|      0|         0|         2|0.363478|0.353739|0.696087| 0.248539|   131|       670| 801.0|
|     1|   1|      0|      1|         1|         1|0.196364|0.189405|0.437273| 0.248309|   120|      1229|1349.0|
|     1|   1|      0|      2|         1|         1|     0.2|0.212122|0.590435| 0.160296|   108|      1454|1562.0|
|     1|   1|      0|      3|         1|         1|0.226957| 0.22927|0.436957|   0.1869|    82|      1518|1600.0|
|     1|   1|      0|      4|         1|         1|0.204348|0.233209|0.518261|0.0895652|

In [8]:
bike_df2.dtypes

[('season', 'int'),
 ('mnth', 'int'),
 ('holiday', 'int'),
 ('weekday', 'int'),
 ('workingday', 'int'),
 ('weathersit', 'int'),
 ('temp', 'double'),
 ('atemp', 'double'),
 ('hum', 'double'),
 ('windspeed', 'double'),
 ('casual', 'int'),
 ('registered', 'int'),
 ('cnt', 'double')]

Create a Spark ML Pipeline to run RandomForests Regression

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressionModel
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

In [10]:
feature_columns = ['season'
                   ,'mnth'
                   ,'holiday'
                   ,'weekday'
                   ,'workingday'
                   ,'weathersit'
                   ,'temp'
                   ,'atemp'
                   ,'hum'
                   ,'windspeed'
                   ,'casual'
                   ,'registered']

Split the data into training and test splits

In [11]:
(training_data, test_data) = bike_df2.randomSplit([0.7,0.3], seed = 10)
print("Training data size is :"+str(training_data.count()))
print("Test data size is :"+str(test_data.count()))

Training data size is :512
Test data size is :219
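The split sizes are close to 70/30 but not exact, because a weighted random split decides the destination of each row independently. The following is a minimal pure-Python sketch of that idea, not Spark's implementation: the helper `random_split` is hypothetical, and Spark's sampler and seed handling differ, so it will not reproduce the 512/219 counts above.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to exactly one split by drawing one uniform number per row."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point rounding in the last bound
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if draw < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(731))  # 512 + 219 rows, matching the counts printed above
train, test = random_split(rows, [0.7, 0.3], seed=10)
print("train=%d test=%d" % (len(train), len(test)))
```

Every row lands in exactly one split, and the same seed gives the same split, but the sizes only approximate the requested weights.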


In [12]:
training_data.dtypes

[('season', 'int'),
 ('mnth', 'int'),
 ('holiday', 'int'),
 ('weekday', 'int'),
 ('workingday', 'int'),
 ('weathersit', 'int'),
 ('temp', 'double'),
 ('atemp', 'double'),
 ('hum', 'double'),
 ('windspeed', 'double'),
 ('casual', 'int'),
 ('registered', 'int'),
 ('cnt', 'double')]

We now need to take the feature columns and combine them into a single vector. We will use a Transformer called VectorAssembler, which returns a new dataframe in which multiple columns of the input dataframe are merged into one vector column.

In [13]:
vecAssembler = VectorAssembler(inputCols=feature_columns, outputCol='features_vector')
#bike_df2 = vecAssembler.transform(bike_df1)
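To make the effect of the assembler concrete, here is a rough pure-Python sketch of what it does to a single row. The helper `assemble` is hypothetical and only mimics the behaviour; the real VectorAssembler produces a Spark ML vector (dense or sparse) and works on whole dataframes.

```python
def assemble(row, input_cols, output_col):
    """Return a copy of `row` with the input columns merged, in order, into one list of floats."""
    out = dict(row)
    out[output_col] = [float(row[c]) for c in input_cols]
    return out

feature_columns = ['season', 'mnth', 'holiday', 'weekday', 'workingday',
                   'weathersit', 'temp', 'atemp', 'hum', 'windspeed',
                   'casual', 'registered']

# First row of bike_df2 as displayed earlier in the notebook
row = {'season': 1, 'mnth': 1, 'holiday': 0, 'weekday': 6, 'workingday': 0,
       'weathersit': 2, 'temp': 0.344167, 'atemp': 0.363625, 'hum': 0.805833,
       'windspeed': 0.160446, 'casual': 331, 'registered': 654, 'cnt': 985.0}

assembled = assemble(row, feature_columns, 'features_vector')
print(assembled['features_vector'])
```

The original columns are kept and the label `cnt` stays outside the vector, which is why the regressor is later given `featuresCol` and `labelCol` separately.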

Define the RandomForests regressor (it is trained later, when the CrossValidator is fit)

In [14]:
rdf = RandomForestRegressor(labelCol='cnt',featuresCol="features_vector",predictionCol='predicted_cnt',seed=15)

Chain VectorAssembler and RandomForestRegressor into a pipeline

In [15]:
pipeline = Pipeline(stages=[vecAssembler,rdf])

Set the parameter grid that cross-validation will search in order to fit the models and return the best one

In [16]:
paramGrid = (ParamGridBuilder()
             .addGrid(rdf.maxDepth,[5,10,15,20])
             .addGrid(rdf.numTrees,[1,10,50,100])
             .build())
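The grid is the Cartesian product of the two value lists, so its size grows quickly. The sketch below counts the combinations and estimates how many pipeline fits cross-validation will run. It assumes 3 folds, which to our knowledge is the CrossValidator default (this notebook does not set `numFolds`); the variable names are illustrative only.

```python
from itertools import product

max_depths = [5, 10, 15, 20]
num_trees = [1, 10, 50, 100]

# One dict per combination, analogous to the param maps built by ParamGridBuilder
param_grid = [{'maxDepth': d, 'numTrees': n}
              for d, n in product(max_depths, num_trees)]

num_folds = 3  # assumption: default number of folds, not set explicitly in this notebook

# Each combination is fit once per fold, then the winner is refit on all the training data
total_fits = len(param_grid) * num_folds + 1

print("combinations: %d" % len(param_grid))
print("pipeline fits: %d" % total_fits)
```

Adding a third hyperparameter with four values would multiply the number of combinations by four, so it is worth keeping the grid small on larger datasets.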

In [17]:
rdfEvaluator = RegressionEvaluator(predictionCol="predicted_cnt", labelCol='cnt', metricName='rmse')

In [18]:
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=rdfEvaluator)

In [19]:
# Run cross-validation, and choose the best set of parameters.
cvModel = cv.fit(training_data)
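For readers new to cross-validation, the selection logic behind `cv.fit` can be sketched in plain Python. This is a toy illustration under stated assumptions, not Spark's code: the "model" simply predicts `scale * mean(training labels)`, the candidate values of `scale` stand in for the parameter grid, and folds are assigned by row position, whereas Spark assigns them randomly.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length lists."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels))

def fit(train_labels, scale):
    """Toy 'model': a constant prediction of scale * mean of the training labels."""
    return scale * sum(train_labels) / len(train_labels)

def cross_validate(labels, candidates, num_folds):
    avg_metrics = []
    for scale in candidates:
        fold_scores = []
        for fold in range(num_folds):
            # Hold out every num_folds-th row, train on the rest
            train = [y for i, y in enumerate(labels) if i % num_folds != fold]
            held_out = [y for i, y in enumerate(labels) if i % num_folds == fold]
            prediction = fit(train, scale)
            fold_scores.append(rmse(held_out, [prediction] * len(held_out)))
        avg_metrics.append(sum(fold_scores) / num_folds)
    # For RMSE, lower is better
    best = candidates[avg_metrics.index(min(avg_metrics))]
    # Refit the winning candidate on all of the data
    return best, fit(labels, best), avg_metrics

labels = [10.0, 12.0, 11.0, 13.0, 9.0, 10.0, 12.0, 11.0, 10.0]
best_scale, best_model, avg_metrics = cross_validate(labels, [0.5, 1.0, 1.5], num_folds=3)
print("best scale: %s" % best_scale)
```

The important points carry over to the pipeline above: every candidate is scored only on rows it was not trained on, the scores are averaged across folds, and the winner is refit on the full training set.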

Make the predictions on the test_data using the cvModel.

In [20]:
test_data_with_predictions = cvModel.transform(test_data)

In [21]:
test_data_with_predictions.show()

+------+----+-------+-------+----------+----------+---------+---------+--------+---------+------+----------+------+--------------------+------------------+
|season|mnth|holiday|weekday|workingday|weathersit|     temp|    atemp|     hum|windspeed|casual|registered|   cnt|     features_vector|     predicted_cnt|
+------+----+-------+-------+----------+----------+---------+---------+--------+---------+------+----------+------+--------------------+------------------+
|     1|   1|      0|      0|         0|         1| 0.138333| 0.116175|0.434167|  0.36195|    54|       768| 822.0|[1.0,1.0,0.0,0.0,...|           1064.95|
|     1|   1|      0|      0|         0|         1| 0.216522| 0.250322|0.722174|0.0739826|   140|       956|1096.0|[1.0,1.0,0.0,0.0,...|           1366.33|
|     1|   1|      0|      1|         1|         1| 0.196364| 0.189405|0.437273| 0.248309|   120|      1229|1349.0|[1.0,1.0,0.0,1.0,...|1435.9091666666666|
|     1|   1|      0|      1|         1|         2| 0.180833|  0

We can now use the evaluator we created to find the RMSE on the test_data

In [22]:
test_data_RMSE = rdfEvaluator.evaluate(test_data_with_predictions)
print("RMSE on test data is : " + str(test_data_RMSE))

RMSE on test data is : 260.970107946
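RMSE is the square root of the mean squared difference between the label and the prediction. As a small sanity check of the formula, the sketch below applies it by hand to the three complete rows visible in the prediction output above; because it uses only three rows, its result is not expected to match the RMSE over the full test set.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length lists."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# cnt and predicted_cnt from the first three rows shown by test_data_with_predictions.show()
cnt = [822.0, 1096.0, 1349.0]
predicted_cnt = [1064.95, 1366.33, 1435.9091666666666]

three_row_rmse = rmse(cnt, predicted_cnt)
print("RMSE on the three displayed rows: %s" % three_row_rmse)
```

Because errors are squared before averaging, a few badly predicted days weigh more heavily on RMSE than many small misses.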


In [23]:
bestRDFModel = cvModel.bestModel.stages[1]

In [24]:
bestRDFModel

RandomForestRegressionModel (uid=rfr_5910503e0776) with 100 trees

We have successfully built a RandomForests regression model using cross-validation.