# Tuning Machine Learning models in Spark

<a href = "http://yogen.io"><img src="http://yogen.io/assets/logo.svg" alt="yogen" style="width: 200px; float: right;"/></a>

### If you are running this notebook in Google Colab

Copy the following to a code cell and run it. It will install and set up Spark for you.

```python
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.uvigo.es/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar -xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark pyspark==2.4.6

import os
import findspark
from pyspark.sql import SparkSession

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"
findspark.init()
spark = SparkSession.builder.master("local[*]").config("spark.ui.port", "4050").getOrCreate()  ## For ngrok to tunnel to
```

In [1]:
!pip install pyspark==3.1.1

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").config("spark.ui.port", "4050").getOrCreate()  ## For ngrok to tunnel to

Collecting pyspark==3.1.1
  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=2eabcf1aadb9758a940f7e86e11a0712052d5c08f8fd26c463424994b5287e2a
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1

## ML Pipelines in Spark

ML model training and tuning often means running the same steps over and over. Often, we rerun them with small variations in order to evaluate combinations of parameters.

To make this use case easier, Spark provides the [Pipeline](https://spark.apache.org/docs/2.3.0/ml-pipeline.html) abstraction.

A Pipeline represents a series of steps in the processing of a dataset. Each step is a Transformer or an Estimator. The whole Pipeline is itself an Estimator, so we can .fit the whole pipeline in one step. When we do that, each step's .fit and .transform methods are called in turn.

![pipelineestimator](https://spark.apache.org/docs/2.3.0/img/ml-Pipeline.png)

![PipelineModel](https://spark.apache.org/docs/2.3.0/img/ml-PipelineModel.png)
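
The fit contract described above can be sketched in plain Python, without Spark. All class names below (`AddOne`, `MeanCenterer`, `ToyPipeline` and so on) are hypothetical stand-ins invented for this illustration; they only mimic the order in which a Pipeline calls .fit and .transform on its stages, not Spark's actual implementation.

```python
# Toy stand-ins for the Estimator / Transformer / Pipeline contract.
# Every class here is hypothetical; only the calling order matters.

class AddOne:
    """A Transformer: it has .transform and needs no fitting."""
    def transform(self, data):
        return [x + 1 for x in data]


class MeanCenterer:
    """An Estimator: .fit learns the mean and returns a Transformer."""
    def fit(self, data):
        return MeanCentererModel(sum(data) / len(data))


class MeanCentererModel:
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; plain Transformers are used as they are.
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            # The output of each stage is the input of the next one.
            data = model.transform(data)
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    """The fitted pipeline: a chain of Transformers only."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


pipeline_model = ToyPipeline([AddOne(), MeanCenterer()]).fit([1.0, 2.0, 3.0])
centered = pipeline_model.transform([1.0, 2.0, 3.0])
```

Note how fitting the pipeline returns a different object that only knows how to transform, which mirrors the Pipeline / PipelineModel pair in the diagrams.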

## Example: predicting flight delays

We'll be using the same [Transtats](https://www.transtats.bts.gov/) on-time performance (OTP) data from way back when. Remember it?

It's a table that contains all domestic departures by US air carriers that represent at least one percent of domestic scheduled passenger revenues, with data on each individual departure including [Tail Number](https://en.wikipedia.org/wiki/Tail_number), departure delay, origin, destination and carrier.


### Load the data

Opening .zip files in Spark is a bit of a pain. For now, let's just decompress the file we want to read. When we are ready to expand the processing to the cluster, we will need to do [this](https://stackoverflow.com/questions/28569788/how-to-open-stream-zip-files-through-spark).

```python
csvname = 'On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_12.csv'
columns_of_interest = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'Reporting_Airline', 
                       'Tail_Number', 'Flight_Number_Reporting_Airline', 'Origin', 
                       'OriginCityName', 'OriginStateName', 'Dest', 'DestCityName', 'DestStateName',
                       'DepTime', 'DepDelay', 'AirTime', 'Distance']
```

In [2]:
df = spark.read.csv('On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2019_3.csv.gz', inferSchema=True, header=True)

columns_of_interest = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'Reporting_Airline', 
                       'Tail_Number', 'Flight_Number_Reporting_Airline', 'Origin', 
                       'OriginCityName', 'OriginStateName', 'Dest', 'DestCityName', 'DestStateName',
                       'DepTime', 'DepDelay', 'AirTime', 'Distance']

flights = df[columns_of_interest]                       
flights

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, Reporting_Airline: string, Tail_Number: string, Flight_Number_Reporting_Airline: int, Origin: string, OriginCityName: string, OriginStateName: string, Dest: string, DestCityName: string, DestStateName: string, DepTime: int, DepDelay: double, AirTime: double, Distance: double]

In [8]:
from pyspark.sql import types, functions

flights.select(*[functions.count(col) for col in columns_of_interest]).show()

+-----------+------------+-----------------+----------------+------------------------+------------------+--------------------------------------+-------------+---------------------+----------------------+-----------+-------------------+--------------------+--------------+---------------+--------------+---------------+
|count(Year)|count(Month)|count(DayofMonth)|count(DayOfWeek)|count(Reporting_Airline)|count(Tail_Number)|count(Flight_Number_Reporting_Airline)|count(Origin)|count(OriginCityName)|count(OriginStateName)|count(Dest)|count(DestCityName)|count(DestStateName)|count(DepTime)|count(DepDelay)|count(AirTime)|count(Distance)|
+-----------+------------+-----------------+----------------+------------------------+------------------+--------------------------------------+-------------+---------------------+----------------------+-----------+-------------------+--------------------+--------------+---------------+--------------+---------------+
|     632074|      632074|           632074

In [14]:
functions.sum(functions.isnull('DepDelay').astype(types.IntegerType()))

Column<'sum(CAST((DepDelay IS NULL) AS INT))'>

In [15]:
[functions.sum(functions.isnull(col).astype(types.IntegerType())) for col in columns_of_interest]

[Column<'sum(CAST((Year IS NULL) AS INT))'>,
 Column<'sum(CAST((Month IS NULL) AS INT))'>,
 Column<'sum(CAST((DayofMonth IS NULL) AS INT))'>,
 Column<'sum(CAST((DayOfWeek IS NULL) AS INT))'>,
 Column<'sum(CAST((Reporting_Airline IS NULL) AS INT))'>,
 Column<'sum(CAST((Tail_Number IS NULL) AS INT))'>,
 Column<'sum(CAST((Flight_Number_Reporting_Airline IS NULL) AS INT))'>,
 Column<'sum(CAST((Origin IS NULL) AS INT))'>,
 Column<'sum(CAST((OriginCityName IS NULL) AS INT))'>,
 Column<'sum(CAST((OriginStateName IS NULL) AS INT))'>,
 Column<'sum(CAST((Dest IS NULL) AS INT))'>,
 Column<'sum(CAST((DestCityName IS NULL) AS INT))'>,
 Column<'sum(CAST((DestStateName IS NULL) AS INT))'>,
 Column<'sum(CAST((DepTime IS NULL) AS INT))'>,
 Column<'sum(CAST((DepDelay IS NULL) AS INT))'>,
 Column<'sum(CAST((AirTime IS NULL) AS INT))'>,
 Column<'sum(CAST((Distance IS NULL) AS INT))'>]

In [17]:
flights.select([functions.sum(functions.isnull(col).astype(types.IntegerType())).alias('nulls_' + col) 
                  for col in columns_of_interest]) \
       .show()

+----------+-----------+----------------+---------------+-----------------------+-----------------+-------------------------------------+------------+--------------------+---------------------+----------+------------------+-------------------+-------------+--------------+-------------+--------------+
|nulls_Year|nulls_Month|nulls_DayofMonth|nulls_DayOfWeek|nulls_Reporting_Airline|nulls_Tail_Number|nulls_Flight_Number_Reporting_Airline|nulls_Origin|nulls_OriginCityName|nulls_OriginStateName|nulls_Dest|nulls_DestCityName|nulls_DestStateName|nulls_DepTime|nulls_DepDelay|nulls_AirTime|nulls_Distance|
+----------+-----------+----------------+---------------+-----------------------+-----------------+-------------------------------------+------------+--------------------+---------------------+----------+------------------+-------------------+-------------+--------------+-------------+--------------+
|         0|          0|               0|              0|                      0|             

### Drop NAs

There are only a few departures for which any of the columns of interest contains null values. The most expedient way to handle them is to just drop them, since they won't make much of a difference.

In [18]:
clean = flights.na.drop()

NA-related functions are grouped in a .na attribute of DataFrames.

## Feature extraction and generation of target variable

The departure hour is an important factor in delays, so we need to calculate it from the departure time. The input file encodes times as hhmm numbers (726 means 07:26), so Spark has inferred the column as an integer:

In [19]:
clean.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Reporting_Airline: string (nullable = true)
 |-- Tail_Number: string (nullable = true)
 |-- Flight_Number_Reporting_Airline: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCityName: string (nullable = true)
 |-- OriginStateName: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCityName: string (nullable = true)
 |-- DestStateName: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Distance: double (nullable = true)



In [21]:
clean.select('DepTime').show(10)

+-------+
|DepTime|
+-------+
|    726|
|    730|
|    915|
|    913|
|    916|
|    920|
|    916|
|    912|
|    913|
|    915|
+-------+
only showing top 10 rows



#### Exercise

Calculate the 'DepHour' column that represents the hour as an int.

In [34]:
clean.withColumn('DepHour', (clean['DepTime'] / 100).cast(types.IntegerType()))\
     .withColumn('Delayed', (clean['DepDelay'] > 15).cast(types.IntegerType()))  

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, Reporting_Airline: string, Tail_Number: string, Flight_Number_Reporting_Airline: int, Origin: string, OriginCityName: string, OriginStateName: string, Dest: string, DestCityName: string, DestStateName: string, DepTime: int, DepDelay: double, AirTime: double, Distance: double, DepHour: int, Delayed: int]

In [39]:
final = clean.select('*',
                     (clean['DepTime'] / 100).cast(types.IntegerType()).alias('DepHour'),
                     (clean['DepDelay'] > 15).cast(types.IntegerType()).alias('Delayed'))  

final

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, Reporting_Airline: string, Tail_Number: string, Flight_Number_Reporting_Airline: int, Origin: string, OriginCityName: string, OriginStateName: string, Dest: string, DestCityName: string, DestStateName: string, DepTime: int, DepDelay: double, AirTime: double, Distance: double, DepHour: int, Delayed: int]

We will also generate a binary target variable. The aviation industry considers a flight delayed when it departs more than 15 minutes after its scheduled departure time, so we will use that. We will create it as an integer, since that is what the learning algorithms expect.
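
The arithmetic behind both derived columns can be checked in plain Python. This is a minimal sketch that assumes the hhmm encoding seen in the DepTime output above; the function names and the sample values are made up for illustration.

```python
def dep_hour(dep_time):
    """Hour of departure from an hhmm-encoded time, e.g. 726 -> 7."""
    # Integer division drops the minutes, just like casting DepTime / 100 to int.
    return dep_time // 100


def is_delayed(dep_delay_minutes, threshold=15):
    """1 if the flight left more than `threshold` minutes late, else 0."""
    return int(dep_delay_minutes > threshold)


# (DepTime, DepDelay) pairs; hypothetical values for illustration only.
examples = [(726, 3.0), (915, 15.0), (1510, 42.0)]
rows = [(t, dep_hour(t), is_delayed(d)) for t, d in examples]
```

A delay of exactly 15 minutes is not flagged, because the comparison is strict.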

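In order to make the training times manageable, let's pick only a small fraction of the data to train. randomSplit normalizes its weights, so [1., 19.] puts roughly 5% of the rows in sample and the remainder in rest.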

In [42]:
sample, rest = final.randomSplit([1., 19.], seed=7)
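
The weights passed to randomSplit do not need to sum to 1; they are normalized first. The plain-Python sketch below shows the resulting fractions for the weights used above. The row count is a hypothetical round number, not the size of the real dataset, and since the split is random the actual sizes will only be approximately these.

```python
weights = [1.0, 19.0]

# Normalize the weights so that they sum to 1.
total = sum(weights)
fractions = [w / total for w in weights]

# Expected (not exact) number of rows per split for a hypothetical dataset.
n_rows = 600_000
expected_rows = [round(n_rows * f) for f in fractions]
```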

## Handle different fields in different ways

We have features of at least three kinds:

* Numeric continuous fields, which we can use as input to many algorithms as they are. In particular, decision trees can take continuous variables with any value as input, since they only look for the cutoff point that most increases the homogeneity of the resulting groups. In contrast, if we were using a logistic regression with regularization, for example, we would need to first scale the variables to have comparable magnitudes.

* There are fields which we will treat as categorical variables, but which are already integers. These need to be one-hot encoded.

* Finally, there are several categorical variables that are encoded as strings. These need to be one-hot encoded, but OneHotEncoder requires numeric input. Therefore, we will need to apply a StringIndexer to each of them before one-hot encoding.

```python
# Reminder:

categorical_fields = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'Reporting_Airline', 
                      'Origin', 'OriginCityName', 'OriginStateName', 
                      'Dest', 'DestCityName', 'DestStateName']

string_fields = [field.name for field in sample.schema.fields if field.dataType == types.StringType()]

continuous_fields = ['Distance', 'DepHour']

target_field = 'Delayed'
```

In [47]:
sample.schema.fields

[StructField(Year,IntegerType,true),
 StructField(Month,IntegerType,true),
 StructField(DayofMonth,IntegerType,true),
 StructField(DayOfWeek,IntegerType,true),
 StructField(Reporting_Airline,StringType,true),
 StructField(Tail_Number,StringType,true),
 StructField(Flight_Number_Reporting_Airline,IntegerType,true),
 StructField(Origin,StringType,true),
 StructField(OriginCityName,StringType,true),
 StructField(OriginStateName,StringType,true),
 StructField(Dest,StringType,true),
 StructField(DestCityName,StringType,true),
 StructField(DestStateName,StringType,true),
 StructField(DepTime,IntegerType,true),
 StructField(DepDelay,DoubleType,true),
 StructField(AirTime,DoubleType,true),
 StructField(Distance,DoubleType,true),
 StructField(DepHour,IntegerType,true),
 StructField(Delayed,IntegerType,true)]

In [49]:
sample.schema.fields[0].name

'Year'

In [50]:
sample.schema.fields[0].dataType

IntegerType

In [44]:
categorical_fields = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'Reporting_Airline', 
                      'Origin', 'OriginCityName', 'OriginStateName', 
                      'Dest', 'DestCityName', 'DestStateName']
 
string_fields = [field.name for field in sample.schema.fields if field.dataType == types.StringType()]
 
continuous_fields = ['Distance', 'DepHour']
 
target_field = 'Delayed'

We have generated the list of names of columns that have dataType string with a list comprehension, rather than hard-coding it, but it is just like the other ones.

## Handling categorical fields

Let's do the processing of just one field first, as an example. Then we will process the rest.

### StringIndexer 

A [StringIndexer](https://spark.apache.org/docs/2.2.0/ml-features.html#stringindexer) is an estimator that takes a string field and produces a transformer that encodes said field as numeric labels fit for feeding to a one-hot encoder.

We need to specify an input column, an output column, and a way to handle invalids. In this case, invalids are values that the indexer has not seen during fitting but that the transformer finds during processing. The options are 'error' (the default), which is pretty self-explanatory, 'skip', which drops the offending rows, and 'keep', which is what we want: it assigns all unseen labels to a single extra category index.
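
To make the 'keep' behaviour concrete, here is a plain-Python sketch of what an indexer does. It is not Spark's implementation: `fit_indexer` and `transform_keep` are hypothetical helpers, and the alphabetical tie-break is an assumption of this sketch chosen to keep the result deterministic. Labels are numbered by descending frequency, and any label unseen during fitting gets the index right after the last known one.

```python
from collections import Counter


def fit_indexer(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_keep(mapping, values):
    """Mimic handleInvalid='keep': all unseen labels share one extra index."""
    unseen_index = float(len(mapping))
    return [mapping.get(v, unseen_index) for v in values]


mapping = fit_indexer(['ATL', 'ORD', 'ATL', 'DFW', 'ATL', 'ORD'])
indexed = transform_keep(mapping, ['ORD', 'ATL', 'MAD'])
```

Here 'MAD' was never seen during fitting, so it lands on index 3.0, one past the three known labels.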

In [64]:
from pyspark.ml.feature import StringIndexer, StandardScaler, OneHotEncoder

origin_indexer = StringIndexer(inputCol='Origin', outputCol='OriginIndex', handleInvalid='keep')
origin_indexer.fit

<bound method Estimator.fit of StringIndexer_be0af2e439f1>

In [105]:
test_df = spark.createDataFrame([('a', 1), ('b', 2), ('c', 3), ('d', 4)], schema=['x','y'])
train, test = test_df.randomSplit([2.,2.], seed=8)
train.show()

+---+---+
|  x|  y|
+---+---+
|  a|  1|
|  d|  4|
+---+---+



In [106]:
test_indexer = StringIndexer(inputCol='x', handleInvalid='keep').fit(train)

In [109]:
test_indexer.transform(train).show()

+---+---+----------------------------------+
|  x|  y|StringIndexer_0a211fec2251__output|
+---+---+----------------------------------+
|  a|  1|                               0.0|
|  d|  4|                               1.0|
+---+---+----------------------------------+



In [107]:
test_indexer.transform(test).show()

+---+---+----------------------------------+
|  x|  y|StringIndexer_0a211fec2251__output|
+---+---+----------------------------------+
|  b|  2|                               2.0|
|  c|  3|                               2.0|
+---+---+----------------------------------+



In [65]:
from pyspark.ml.base import Estimator

isinstance(origin_indexer, Estimator)

True

In [66]:
origin_indexer.transform

AttributeError: ignored

In [67]:
indexer_model = origin_indexer.fit(sample)
indexer_model

StringIndexerModel: uid=StringIndexer_be0af2e439f1, handleInvalid=keep

In [68]:
indexer_model.transform

<bound method Transformer.transform of StringIndexerModel: uid=StringIndexer_be0af2e439f1, handleInvalid=keep>

In [69]:
indexer_model.transform(rest).show(5)

+----+-----+----------+---------+-----------------+-----------+-------------------------------+------+--------------------+---------------+----+---------------+-------------+-------+--------+-------+--------+-------+-------+-----------+
|Year|Month|DayofMonth|DayOfWeek|Reporting_Airline|Tail_Number|Flight_Number_Reporting_Airline|Origin|      OriginCityName|OriginStateName|Dest|   DestCityName|DestStateName|DepTime|DepDelay|AirTime|Distance|DepHour|Delayed|OriginIndex|
+----+-----+----------+---------+-----------------+-----------+-------------------------------+------+--------------------+---------------+----+---------------+-------------+-------+--------+-------+--------+-------+-------+-----------+
|2019|    3|         1|        5|               9E|     N131EV|                           3328|   ATL|         Atlanta, GA|        Georgia| DSM| Des Moines, IA|         Iowa|   1510|     3.0|  108.0|   743.0|     15|      0|        0.0|
|2019|    3|         1|        5|               9E| 

In [137]:
index_cols = [field + 'Index' for field in string_fields]

indexer = StringIndexer(inputCols=string_fields, outputCols=index_cols, handleInvalid='keep')

### OneHotEncoder

A [OneHotEncoder](https://spark.apache.org/docs/latest/ml-features#onehotencoderestimator) takes a column of category indices and, by default, generates a vector column of length n-1 for n categories: the last category is dropped and represented by the all-zeros vector.

We need to specify the input and output columns.
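
The n-1 length can look surprising at first, so here is a plain-Python sketch of the idea. The `one_hot` helper is hypothetical and works on dense lists rather than Spark vectors; it only illustrates why dropping the last category loses no information.

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector for a category index."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    # With drop_last, the last category has no slot and stays all zeros.
    if index < size:
        vector[index] = 1.0
    return vector


first = one_hot(0, 3)               # a regular category
last = one_hot(2, 3)                # the dropped category
full = one_hot(2, 3, drop_last=False)
```

The last category is still identifiable as the only one whose vector is all zeros.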

In [138]:
index_cols

['Reporting_AirlineIndex',
 'Tail_NumberIndex',
 'OriginIndex',
 'OriginCityNameIndex',
 'OriginStateNameIndex',
 'DestIndex',
 'DestCityNameIndex',
 'DestStateNameIndex']

In [139]:
cat_not_string = [field for field in categorical_fields if field not in string_fields]
cat_not_string

['Year', 'Month', 'DayofMonth', 'DayOfWeek']

In [140]:
input_to_encoder = cat_not_string +  index_cols
input_to_encoder

['Year',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'Reporting_AirlineIndex',
 'Tail_NumberIndex',
 'OriginIndex',
 'OriginCityNameIndex',
 'OriginStateNameIndex',
 'DestIndex',
 'DestCityNameIndex',
 'DestStateNameIndex']

In [141]:
out_from_encoder = [field + 'OneHot' for field in cat_not_string + string_fields]
out_from_encoder

['YearOneHot',
 'MonthOneHot',
 'DayofMonthOneHot',
 'DayOfWeekOneHot',
 'Reporting_AirlineOneHot',
 'Tail_NumberOneHot',
 'OriginOneHot',
 'OriginCityNameOneHot',
 'OriginStateNameOneHot',
 'DestOneHot',
 'DestCityNameOneHot',
 'DestStateNameOneHot']

In [142]:
encoder = OneHotEncoder(inputCols=input_to_encoder, outputCols=out_from_encoder, handleInvalid='keep')
encoder

OneHotEncoder_1395bbd1f29c

### SparseVectors

The vectors produced by OneHotEncoder will each have only one non-zero value, but can potentially be very long. An efficient way to represent them is therefore a SparseVector, and that is what OneHotEncoder generates. 

A SparseVector is a data structure that only stores the length of the vector, a list of positions, and a list of values. All other values are assumed to be 0s.

This way, a vector like the following, with length 15 and non-zero values only on positions 3 and 9:

```python
[0.0, 0.0, 0.0, 6.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0]
```

can be compactly expressed as

```python
(15, [3, 9], [6.0, 4.0])
```
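
The conversion between the two representations is easy to write out by hand. The helpers below are a plain-Python sketch using tuples and lists, not Spark's SparseVector class; their names are made up for this example.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) triple into a full list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


def to_sparse(dense):
    """Keep only the length, the non-zero positions and their values."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


dense = to_dense(15, [3, 9], [6.0, 4.0])
sparse = to_sparse(dense)
```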

In [116]:
indexed = indexer.fit(sample).transform(sample)
encoded = encoder.fit(indexed).transform(indexed)
result = encoded.select('Tail_NumberOneHot')
result.show(10)

+-------------------+
|  Tail_NumberOneHot|
+-------------------+
| (5169,[182],[1.0])|
| (5169,[182],[1.0])|
| (5169,[100],[1.0])|
| (5169,[746],[1.0])|
| (5169,[761],[1.0])|
|(5169,[1111],[1.0])|
| (5169,[797],[1.0])|
|(5169,[1586],[1.0])|
|(5169,[1587],[1.0])|
| (5169,[114],[1.0])|
+-------------------+
only showing top 10 rows



In [117]:
continuous_fields

['Distance', 'DepHour']

In [118]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=out_from_encoder + continuous_fields, outputCol='features')

## Let's build our first Pipeline!

Our pipeline consists of a StringIndexer that handles all the string columns, followed by one OneHotEncoder, followed by a VectorAssembler, with a RandomForestClassifier at the end.

A Spark Pipeline is a single Estimator. We build it by specifying the stages it comprises, and then we are ready to .fit it in one go. This saves us a lot of trouble, since we don't need to fit and transform each stage individually.

In [145]:
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import RandomForestClassifier

classifier = RandomForestClassifier(featuresCol='features',
                                    labelCol='Delayed')


pipeline = Pipeline(stages=[indexer,
                            encoder,
                            assembler,
                            classifier])

### StringIndexer stages

We only need to StringIndex some of the fields. We are going to build the input and output column names programmatically.


### OneHotEncoder

One OneHotEncoder can handle all categorical columns. (In Spark 2.3 and 2.4 this class was called OneHotEncoderEstimator; it was renamed in Spark 3.0.) We are also going to build it programmatically.

### VectorAssembler

Once we have generated our features, we can assemble them into a single features column, together with the continuous_fields.
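
Conceptually, assembling just means concatenating all the pieces into one flat vector per row. The plain-Python sketch below shows that with dense lists; `assemble` is a hypothetical helper and the input values are invented for illustration.

```python
def assemble(*parts):
    """Concatenate vectors and scalars into one flat feature list."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(float(x) for x in part)
        else:
            features.append(float(part))
    return features


# Two one-hot vectors followed by Distance and DepHour (made-up values).
features = assemble([0.0, 1.0, 0.0], [1.0, 0.0], 743.0, 15)
```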

### RandomForestClassifier

Aaaaand we are ready to do some Machine Learning! We'll use a RandomForestClassifier to try to predict delayed versus non-delayed flights, a binary classification task.

### Pipeline!

Now that we have all the stages, we are finally ready to put them together into a single Estimator, our Pipeline.

Having gone to the trouble of building our Pipeline, fitting it and using it to predict the probability of delay on unseen data is as easy as using a single Estimator:

In [125]:
output_of_pipeline = pipeline.fit(sample)
output_of_pipeline

PipelineModel_9dbcbe5f6f17

## Evaluating and tuning our Pipeline

Probably the most interesting use of Spark Pipelines is quickly (in terms of coding time) evaluating many combinations of hyperparameters to feed our model and choosing the best ones. For that, we can use a TrainValidationSplit or a CrossValidator. The CrossValidator generally gives a more reliable estimate, but it takes several times as long, because it fits the model once per fold. The two share the same API: the cell below instantiates a CrossValidator, stored in a variable named tv_split, and replacing the class name with TrainValidationSplit is all it takes to switch.

In [144]:
from pyspark.ml.tuning import TrainValidationSplit, CrossValidator

### Params and Evaluators

In order to evaluate different sets of parameters, we need a) the set of parameters to iterate through and b) a metric to compare the results. 

The first element is represented by ParamMaps, which we build with a ParamGridBuilder, and the second by an Evaluator that needs to be specific to the relevant task.
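
A parameter grid is just the Cartesian product of the candidate values for each parameter. The plain-Python sketch below builds one with itertools to show how quickly the number of fits grows. The candidate values are hypothetical, and the fold count of 3 is CrossValidator's default.

```python
from itertools import product

# Hypothetical candidate values for two random forest parameters.
grid = {'maxDepth': [3, 5, 10], 'numTrees': [5, 20]}

# One dict per combination: the equivalent of a list of ParamMaps.
names = list(grid)
param_maps = [dict(zip(names, combo))
              for combo in product(*(grid[name] for name in names))]

# With k-fold cross validation, every combination is fitted once per fold.
num_folds = 3
fits_during_search = len(param_maps) * num_folds
```

Three depths times two forest sizes gives six combinations, and therefore eighteen fits with three folds, which is why a grid with a single value per parameter is a sensible choice for a quick demonstration.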

In [129]:
print(classifier.explainParams())

bootstrap: Whether bootstrap samples are used when building trees. (default: True)
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the featur

In [149]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

param_grid = ParamGridBuilder().addGrid(classifier.maxDepth, [5])\
                               .addGrid(classifier.numTrees, [5])\
                               .build()

evaluator = BinaryClassificationEvaluator(labelCol='Delayed')


tv_split = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator)

tv_split

CrossValidator_95198614d388

We now have all the elements in place to perform our fit:

In [150]:
model = tv_split.fit(sample)
model

CrossValidatorModel_262974e2dd77

In [151]:
model.getEstimator()

Pipeline_70726c1bde3e

In [153]:
model.getEstimatorParamMaps()

[{Param(parent='RandomForestClassifier_dc199e0e58f6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
  Param(parent='RandomForestClassifier_dc199e0e58f6', name='numTrees', doc='Number of trees to train (>= 1).'): 5}]

And now we can predict on the rest of the flights and compare the predictions with reality.

### Let's have a look

We are now ready to compare our predictions with reality. Do these features have any predictive power at all?

Not bad, considering we have not performed any feature engineering at all!
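
In the notebook, the comparison would typically go through `model.transform(rest)` followed by `evaluator.evaluate(...)` on the result. By default, BinaryClassificationEvaluator reports the area under the ROC curve. The plain-Python sketch below shows what that number means on a tiny made-up example: it is the fraction of (delayed, not delayed) pairs in which the delayed flight receives the higher score. The function name and the data are hypothetical, and this pairwise version is only practical for small inputs.

```python
def area_under_roc(labels, scores):
    """AUC as the share of positive/negative pairs ranked correctly."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))


# Made-up labels (1 = delayed) and predicted scores.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = area_under_roc(labels, scores)
```

A value of 0.5 corresponds to random guessing and 1.0 to a perfect ranking, which gives a baseline against which to judge the real model.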

### Further Reading

https://spark.apache.org/docs/latest/ml-tuning.html

https://stackoverflow.com/questions/28569788/how-to-open-stream-zip-files-through-spark