In [1]:
import warnings
warnings.simplefilter("ignore")

In [3]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .master("local[6]") # we can use [X] to limit the number of cores spark uses
    .appName("Taxis")
    .getOrCreate()
)
spark

We are going to replicate the following pipeline in Spark:

```
pipeline = make_pipeline(
    make_union(
        make_pipeline(
            ColumnSelector(cols=["vendor_id", "store_and_fwd_flag", "payment_type", "rate_code"]),
            OneHotEncoder()
        ),
        make_pipeline(
            ColumnSelector(cols=["pickup_longitude", "pickup_latitude", "passenger_count"]),
            StandardScaler()
        )  
    ),
    SGDRegressor()
)
```

In [4]:
target = "tip_amount"
independent_variables = ["vendor_id", "store_and_fwd_flag", "payment_type",
               "rate_code", "pickup_longitude", "pickup_latitude", 
               "passenger_count"]

In [7]:
taxi = (
    spark
    .read
    .parquet("./data/nyc_taxi_data_2014_small.parquet")
    .select(independent_variables + [target])
    .withColumnRenamed(target, "label")
)

In [8]:
taxi.head()

Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.99476999999999, pickup_latitude=40.736828, passenger_count=1, label=1.4)

Spark has its own implementation of [Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html) more or less similar to sklearns', it has 2 different kinds of objects:

- **Transformers**, preprocess the dataset prior to the estimation (encoders, standardizers, etc)
- **Estimators**, ML models that perform predictions (Linear Regression, Random Forest, etc.)

Spark transformers work by taking **a single column** as an input and producing **a single column** as an output.

Same way, Spark estimators work by taking **a single column** as an input (usually named `features`) and producing **a single column** as the predictions(usually named `labels`)

**Categorical Variables**

In [9]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler, VectorSlicer
from pyspark.ml.pipeline import Pipeline

In [10]:
categorical_variables = ["vendor_id", "store_and_fwd_flag", "payment_type", "rate_code"]

In [11]:
indexers = []
encoders = []
for column in categorical_variables:
    indexers.append(StringIndexer(inputCol=column, outputCol=f"{column}_idx)", handleInvalid="keep"))
    encoders.append(OneHotEncoder(inputCol=f"{column}_idx)", outputCol=f"{column}_enc"))
    
categorical_ensembler = VectorAssembler(inputCols=[enc.getOutputCol() for enc in encoders], 
                                         outputCol="categorical_encoding")

In [12]:
categorical_pipeline = Pipeline(stages=indexers +encoders + [categorical_ensembler])

In [13]:
categorical_pipeline.getStages()

[StringIndexer_326236731351,
 StringIndexer_0124264c121b,
 StringIndexer_236fe3694a45,
 StringIndexer_900a6fdbd12c,
 OneHotEncoder_d2e8ab5e667f,
 OneHotEncoder_45d783d6bfc8,
 OneHotEncoder_8fc8ae062f2d,
 OneHotEncoder_5424b982fdf3,
 VectorAssembler_a99daa68b21c]

In [14]:
taxi_cat = categorical_pipeline.fit(taxi).transform(taxi)

In [15]:
taxi_cat.head(2)

[Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.99476999999999, pickup_latitude=40.736828, passenger_count=1, label=1.4, vendor_id_idx)=0.0, store_and_fwd_flag_idx)=0.0, payment_type_idx)=1.0, rate_code_idx)=0.0, vendor_id_enc=SparseVector(2, {0: 1.0}), store_and_fwd_flag_enc=SparseVector(2, {0: 1.0}), payment_type_enc=SparseVector(5, {1: 1.0}), rate_code_enc=SparseVector(8, {0: 1.0}), categorical_encoding=SparseVector(17, {0: 1.0, 2: 1.0, 5: 1.0, 9: 1.0})),
 Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.982392, pickup_latitude=40.773382, passenger_count=1, label=1.9, vendor_id_idx)=0.0, store_and_fwd_flag_idx)=0.0, payment_type_idx)=1.0, rate_code_idx)=0.0, vendor_id_enc=SparseVector(2, {0: 1.0}), store_and_fwd_flag_enc=SparseVector(2, {0: 1.0}), payment_type_enc=SparseVector(5, {1: 1.0}), rate_code_enc=SparseVector(8, {0: 1.0}), categorical_encoding=SparseVector(17, {0: 1.0, 2: 1.

**Numerical variables**

In [16]:
numerical_variables = ["pickup_longitude", "pickup_latitude", "passenger_count"]

In [17]:
numerical_ensembler = VectorAssembler(inputCols=numerical_variables, outputCol="numerical_data")    

standardizer = StandardScaler(inputCol="numerical_data", outputCol="numerical_data_std")    

numerical_pipeline = Pipeline(stages=[numerical_ensembler, standardizer])

In [18]:
taxi_num = numerical_pipeline.fit(taxi).transform(taxi)

In [19]:
taxi_num.head(2)

[Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.99476999999999, pickup_latitude=40.736828, passenger_count=1, label=1.4, numerical_data=DenseVector([-73.9948, 40.7368, 1.0]), numerical_data_std=DenseVector([-7.5283, 7.5237, 1.6172])),
 Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.982392, pickup_latitude=40.773382, passenger_count=1, label=1.9, numerical_data=DenseVector([-73.9824, 40.7734, 1.0]), numerical_data_std=DenseVector([-7.5271, 7.5305, 1.6172]))]

In [20]:
processing_ensembler = VectorAssembler(inputCols=["categorical_encoding", "numerical_data"], 
                                         outputCol="features")  

processing_pipeline = Pipeline(stages=[categorical_pipeline, numerical_pipeline, processing_ensembler])

In [21]:
taxi_processed = processing_pipeline.fit(taxi).transform(taxi)

In [22]:
taxi_processed.head(2)

[Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.99476999999999, pickup_latitude=40.736828, passenger_count=1, label=1.4, vendor_id_idx)=0.0, store_and_fwd_flag_idx)=0.0, payment_type_idx)=1.0, rate_code_idx)=0.0, vendor_id_enc=SparseVector(2, {0: 1.0}), store_and_fwd_flag_enc=SparseVector(2, {0: 1.0}), payment_type_enc=SparseVector(5, {1: 1.0}), rate_code_enc=SparseVector(8, {0: 1.0}), categorical_encoding=SparseVector(17, {0: 1.0, 2: 1.0, 5: 1.0, 9: 1.0}), numerical_data=DenseVector([-73.9948, 40.7368, 1.0]), numerical_data_std=DenseVector([-7.5283, 7.5237, 1.6172]), features=SparseVector(20, {0: 1.0, 2: 1.0, 5: 1.0, 9: 1.0, 17: -73.9948, 18: 40.7368, 19: 1.0})),
 Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.982392, pickup_latitude=40.773382, passenger_count=1, label=1.9, vendor_id_idx)=0.0, store_and_fwd_flag_idx)=0.0, payment_type_idx)=1.0, rate_code_idx)=0.0, vendor_id_enc=Spar

In [23]:
taxi_processed.head(2)[0].features.values

array([  1.      ,   1.      ,   1.      ,   1.      , -73.99477 ,
        40.736828,   1.      ])

We have a pipeline capable of taking the input dataframe, and producing an output `features` column that will be the input for the estimator.

To define a spark estimator, besides the usual model specific hyperparameters, we need to define some additiona arguments:

- `featuresCol`, the name of the features column (`features`, by default)
- `labelCol`, the name of the target variable column(`labels` by default)
- `predictionCol`, the name for the prediction column name (`prediction` by default)

In [None]:
from pyspark.ml.regression import LinearRegression

In [24]:
lr = LinearRegression(maxIter=100,
                      regParam=0.3,
                      elasticNetParam=0.8,
                      featuresCol="features",
                      labelCol="label",
                      predictionCol="prediction")
pipeline = Pipeline(stages=[processing_pipeline, lr])

In [27]:
trained_pipeline = pipeline.fit(taxi)

In [28]:
trained_pipeline

PipelineModel_e1a15daa7e84

In [29]:
trained_pipeline.transform(taxi).head()

Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CRD', rate_code=1, pickup_longitude=-73.99476999999999, pickup_latitude=40.736828, passenger_count=1, label=1.4, vendor_id_idx)=0.0, store_and_fwd_flag_idx)=0.0, payment_type_idx)=1.0, rate_code_idx)=0.0, vendor_id_enc=SparseVector(2, {0: 1.0}), store_and_fwd_flag_enc=SparseVector(2, {0: 1.0}), payment_type_enc=SparseVector(5, {1: 1.0}), rate_code_enc=SparseVector(8, {0: 1.0}), categorical_encoding=SparseVector(17, {0: 1.0, 2: 1.0, 5: 1.0, 9: 1.0}), numerical_data=DenseVector([-73.9948, 40.7368, 1.0]), numerical_data_std=DenseVector([-7.5283, 7.5237, 1.6172]), features=SparseVector(20, {0: 1.0, 2: 1.0, 5: 1.0, 9: 1.0, 17: -73.9948, 18: 40.7368, 19: 1.0}), prediction=2.083814845782015)

Now, we can write the predictions back as another parquet file. We do this since we dont know if we could collect all of the dataset in memory

In [None]:
(trained_pipeline
 .transform(taxi)
 .select(
    categorical_variables + numerical_variables + ["prediction"]
 )
 .write
 .parquet("taxi_2014_prediction.parquet")
)

In [32]:
taxi_predictions = spark.read.parquet("taxi_2014_prediction.parquet")

In [33]:
taxi_predictions.head(2)

[Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CSH', rate_code=1, pickup_longitude=-73.978514, pickup_latitude=40.754396, passenger_count=1, prediction=0.1951040649097826),
 Row(vendor_id='CMT', store_and_fwd_flag='N', payment_type='CSH', rate_code=1, pickup_longitude=-73.789975, pickup_latitude=40.646617, passenger_count=1, prediction=0.1951040649097826)]

In [34]:
taxi_predictions.select("prediction").describe().toPandas()

Unnamed: 0,summary,prediction
0,count,1000000.0
1,mean,1.080832169995282
2,stddev,0.9841169109290068
3,min,0.1951040649097826
4,max,4.176174270525056


# Cross Validation and Hyperparameter Search with Spark

Spark doesnt have a random search object, however it does have a grid search object,`ParamGridBuilder`

In [35]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

We create the grid builder and add elements to the grid search

In [36]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [1e-3, 1.]) \
    .addGrid(lr.elasticNetParam, [1e-3, 1.]) \
    .build()

We need to evaluate the grid search in order to decide which hyperparameters are the best combination

In [37]:
evaluator = RegressionEvaluator(metricName='rmse')

In [38]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

In [1]:
cvModel = crossval.fit(taxi)

cvModel.avgMetrics

list(zip(cvModel.avgMetrics, paramGrid))

NameError: name 'crossval' is not defined

We can see the pipeline stages:

In [42]:
cvModel.bestModel.stages

[PipelineModel_bf036130d077, LinearRegression_90134039cc6c]

And finally we can get the best hyperparameters for the estimator (`extractParamMap()` extracts a transformer/estimator parameters as a dictionary):

In [44]:
cvModel.bestModel.stages[1].extractParamMap()

{Param(parent='LinearRegression_90134039cc6c', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LinearRegression_90134039cc6c', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.001,
 Param(parent='LinearRegression_90134039cc6c', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0.'): 1.35,
 Param(parent='LinearRegression_90134039cc6c', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LinearRegression_90134039cc6c', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LinearRegression_90134039cc6c', name='labelCol', doc='label column name'): 'label',
 Param(parent='LinearRegression_90134039cc6c', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber. (Default squaredError)'): 'squaredError',
 Param(