## Machine Learning Pipelines

At the core of the `pyspark.ml` module are the `Transformer` and `Estimator` classes. Almost every other class in the module behaves similarly to these two basic classes.

The `Transformer` classes have a `.transform()` method that takes a DataFrame and returns a new DataFrame; usually the original one with a new column appended. For example, you might use the class `Bucketizer` to create discrete bins from a continuous feature or the class `PCA` to reduce the dimensionality of your dataset using principal component analysis.

`Estimator` classes all implement a `.fit()` method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a `StringIndexerModel` for including categorical data saved as strings in your models, or a `RandomForestClassificationModel` or `RandomForestRegressionModel`, which use the random forest algorithm for classification and regression respectively.
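
To make the `.fit()` / `.transform()` contract concrete, here is a minimal sketch in plain Python rather than Spark. The "DataFrame" is just a list of dicts, and the class names `MeanCenterer` and `MeanCentererModel` are hypothetical, invented for illustration only.

```python
# Toy illustration of the Estimator/Transformer contract (plain Python, not Spark).
# A "DataFrame" here is just a list of dicts; all class names are hypothetical.

class MeanCentererModel:
    """Transformer: holds a learned value and appends a new column."""

    def __init__(self, input_col, output_col, mean):
        self.input_col = input_col
        self.output_col = output_col
        self.mean = mean

    def transform(self, rows):
        # Return new rows; the input rows are left untouched.
        return [{**row, self.output_col: row[self.input_col] - self.mean}
                for row in rows]


class MeanCenterer:
    """Estimator: learns from the data in fit() and returns a model."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        values = [row[self.input_col] for row in rows]
        return MeanCentererModel(self.input_col, self.output_col,
                                 sum(values) / len(values))


rows = [{"air_time": 100.0}, {"air_time": 130.0}, {"air_time": 160.0}]
model = MeanCenterer("air_time", "air_time_centered").fit(rows)
centered = model.transform(rows)
print(centered)
```

The point of the split is that whatever is learned in `.fit()` is stored on the returned model, so the same model can later transform data it has never seen.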

You'll build a model that predicts whether a flight will be delayed, based on the flights data.

### Index 

1. Put a pandas DataFrame into a Spark cluster
2. Manipulate and join data
3. Making a Boolean
4. Strings and factors
5. Assemble a vector
6. Create the pipeline
7. Transform the data
8. Test vs Train
9. Create the modeler
10. Cross validation
11. Create the evaluator
12. Make a grid
13. Make the validator
14. Fit the model(s)
15. Evaluate the model

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

from pyspark.sql.types import *
from pyspark.ml import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler

import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune
from pyspark.ml.classification import LogisticRegression

sc = SparkContext(master = "local", appName = "ML Pipelines") 
spark = SparkSession(sc)
sqlContext = SQLContext(sc)  # SQLContext wraps a SparkContext, not a SparkSession (unused below)

In [2]:
import pandas as pd

url = 'https://assets.datacamp.com/production/repositories/1237/datasets/fa47bb54e83abd422831cbd4f441bd30fd18bd15/flights_small.csv'
flights = pd.read_csv(url)
flights.head()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
0,2014,12,8,658.0,-7.0,935.0,-5.0,VX,N846VA,1780,SEA,LAX,132.0,954,6.0,58.0
1,2014,1,22,1040.0,5.0,1505.0,5.0,AS,N559AS,851,SEA,HNL,360.0,2677,10.0,40.0
2,2014,3,9,1443.0,-2.0,1652.0,2.0,VX,N847VA,755,SEA,SFO,111.0,679,14.0,43.0
3,2014,4,9,1705.0,45.0,1839.0,34.0,WN,N360SW,344,PDX,SJC,83.0,569,17.0,5.0
4,2014,3,9,754.0,-1.0,1015.0,1.0,AS,N612AS,522,SEA,BUR,127.0,937,7.0,54.0


In [3]:
url = 'https://assets.datacamp.com/production/repositories/1237/datasets/231480a2696c55fde829ce76d936596123f12c0c/planes.csv'
planes = pd.read_csv(url)
planes.head()

Unnamed: 0,tailnum,year,type,manufacturer,model,engines,seats,speed,engine
0,N102UW,1998.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
1,N103US,1999.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
2,N104UW,1999.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
3,N105UW,1999.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan
4,N107US,1999.0,Fixed wing multi engine,AIRBUS INDUSTRIE,A320-214,2,182,,Turbo-fan


## 1. Put a pandas DataFrame into a Spark cluster

The `.createDataFrame()` method takes a pandas DataFrame and returns a Spark DataFrame.

In [4]:
# Create a Spark DataFrame from a pandas data frame 
flightSchema = StructType([
    StructField('year', IntegerType(), False),
    StructField('month', IntegerType(), False),
    StructField('day', IntegerType(), False),
    StructField('dep_time', FloatType(), False),
    StructField('dep_delay', FloatType(), False),
    StructField('arr_time', FloatType(), False),
    StructField('arr_delay', FloatType(), False),
    StructField('carrier', StringType(), False),
    StructField('tailnum', StringType(), False),
    StructField('flight', IntegerType(), False),
    StructField('origin', StringType(), False),
    StructField('dest', StringType(), False), 
    StructField('air_time', FloatType(), False), 
    StructField('distance', IntegerType(), False), 
    StructField('hour', FloatType(), False),
    StructField('minute', FloatType(), False)
])

# Create flightsDF from the pandas DataFrame flights
flightsDF = spark.createDataFrame(flights, flightSchema)
flightsDF.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|   658.0|     -7.0|   935.0|     -5.0|     VX| N846VA|  1780|   SEA| LAX|   132.0|     954| 6.0|  58.0|
|2014|    1| 22|  1040.0|      5.0|  1505.0|      5.0|     AS| N559AS|   851|   SEA| HNL|   360.0|    2677|10.0|  40.0|
|2014|    3|  9|  1443.0|     -2.0|  1652.0|      2.0|     VX| N847VA|   755|   SEA| SFO|   111.0|     679|14.0|  43.0|
|2014|    4|  9|  1705.0|     45.0|  1839.0|     34.0|     WN| N360SW|   344|   PDX| SJC|    83.0|     569|17.0|   5.0|
|2014|    3|  9|   754.0|     -1.0|  1015.0|      1.0|     AS| N612AS|   522|   SEA| BUR|   127.0|     937| 7.0|  54.0|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+

In [5]:
planeSchema = StructType([
    StructField('tailnum', StringType(), False),
    StructField('year', FloatType(), False),
    StructField('type', StringType(), False),
    StructField('manufacturer', StringType(), False),
    StructField('model', StringType(), False),
    StructField('engines', IntegerType(), False),
    StructField('seats', IntegerType(), False),
    StructField('speed', FloatType(), False),
    StructField('engine', StringType(), False)
    ])

# Create planesDF from the pandas DataFrame planes
planesDF = spark.createDataFrame(planes, planeSchema)
planesDF.show(5)

+-------+------+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|  year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+------+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998.0|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|  NaN|Turbo-fan|
| N103US|1999.0|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|  NaN|Turbo-fan|
| N104UW|1999.0|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|  NaN|Turbo-fan|
| N105UW|1999.0|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|  NaN|Turbo-fan|
| N107US|1999.0|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|  NaN|Turbo-fan|
+-------+------+--------------------+----------------+--------+-------+-----+-----+---------+
only showing top 5 rows



## 2. Manipulate and join data
Our model will also include information about the plane that flew the route, so the first step is to join the two tables: flights and planes.

In [6]:
# Rename year column
planesDF = planesDF.withColumnRenamed('year', 'plane_year')

# Join the DataFrames
model_data = flightsDF.join(planesDF, on = 'tailnum', how = "leftouter")
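
As a rough picture of what a left outer join does, the sketch below re-creates it on lists of dicts in plain Python. It assumes each `tailnum` appears at most once in the planes table; the second tail number and the plane year are made-up values for illustration.

```python
def left_outer_join(left_rows, right_rows, key):
    """Keep every left row; attach matching right columns, or None if no match."""
    right_by_key = {row[key]: row for row in right_rows}
    right_cols = sorted({col for row in right_rows for col in row if col != key})
    joined = []
    for row in left_rows:
        match = right_by_key.get(row[key])
        extra = {col: (match[col] if match else None) for col in right_cols}
        joined.append({**row, **extra})
    return joined


flights = [
    {"tailnum": "N846VA", "year": 2014},
    {"tailnum": "N000XX", "year": 2014},  # hypothetical tail number with no plane record
]
planes = [{"tailnum": "N846VA", "plane_year": 2011}]  # made-up year for illustration

joined = left_outer_join(flights, planes, "tailnum")
print(joined)
```

Flights without a matching plane keep their row but get a missing `plane_year`, which is why the later filter on `plane_year is not NULL` is needed.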

Before you get started modeling, it's important to know that Spark's machine learning routines only handle numeric data. That means all of the columns you feed to a model must be either integers or decimals (called `'doubles'` in Spark).

The `.cast()` method converts the type of a column. 

In [7]:
# Cast the columns to integers
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast('integer'))
model_data = model_data.withColumn("air_time", model_data.air_time.cast('integer'))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast('integer'))

You converted the columns `arr_delay`, `air_time`, and `plane_year` to integers. The `plane_year` column holds the year each plane was manufactured. However, your model will use the plane's age, which is the difference between the year of the flight and the year the plane was made.

In [8]:
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

## 3. Making a Boolean

Consider that you're modeling a yes or no question: is the flight late? However, your data contains the arrival delay in minutes for each flight. Thus, you'll need to create a boolean column which indicates whether the flight was late or not.

In [9]:
# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast('integer'))

# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and \
                               air_time is not NULL and plane_year is not NULL")
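
The way missing delays flow through the cell above can be illustrated in plain Python. This is only a sketch of the SQL-style semantics (a comparison with a missing value stays missing); the function name `make_label` is hypothetical.

```python
def make_label(arr_delay):
    """Return 1 for a late arrival, 0 otherwise, None when the delay is missing."""
    if arr_delay is None:
        return None  # mirrors SQL semantics: NULL > 0 is NULL, not False
    return int(arr_delay > 0)


delays = [-5, 5, 0, None, 34]
labels = [make_label(d) for d in delays]

# Dropping rows with a missing delay also drops the rows with a missing label.
kept = [(d, label) for d, label in zip(delays, labels) if d is not None]
print(labels, kept)
```

Note that a delay of exactly zero counts as on time, because the comparison is strictly greater than zero.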

In [10]:
model_data.groupBy('carrier').count().show(5)

+-------+-----+
|carrier|count|
+-------+-----+
|     UA| 1002|
|     AA|   45|
|     B6|  210|
|     DL| 1080|
|     OO| 1176|
+-------+-----+
only showing top 5 rows



In [11]:
model_data.groupBy('dest').count().show(5)

+----+-----+
|dest|count|
+----+-----+
| MSY|    9|
| GEG|  103|
| SNA|  196|
| BUR|  135|
| EUG|   39|
+----+-----+
only showing top 5 rows



## 4. Strings and factors

You can create what are called 'one-hot vectors' to represent categorical features.

The first step to encoding categorical features is to create a `StringIndexer`. Members of this class are Estimators that take a DataFrame with a column of strings and map each unique string to a number. Then, the Estimator returns a Transformer that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.

The second step is to encode this numeric column as a one-hot vector using a `OneHotEncoder`. This works exactly the same way as the `StringIndexer` by creating an Estimator and then a Transformer. The end result is a column that encodes your categorical feature as a vector that's suitable for machine learning routines!

This may seem complicated, but don't worry! All you have to remember is that you need to create a `StringIndexer` and a `OneHotEncoder`, and the Pipeline will take care of the rest.

In [12]:
# Create a StringIndexer
carr_indexer = StringIndexer(inputCol='carrier', outputCol='carrier_index')

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol='carrier_index', outputCol='carrier_fact')

In [13]:
# Create a StringIndexer
dest_indexer = StringIndexer(inputCol='dest', outputCol='dest_index')

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol='dest_index', outputCol='dest_fact')
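
If you want to see what these two steps compute, here is a small plain-Python sketch. It assumes the default behaviour of both classes: indices assigned by descending frequency, and the last category dropped from the one-hot vector so that it is encoded as all zeros. The helper names are hypothetical, and Spark stores the result as a sparse vector rather than a list.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent first."""
    counts = Counter(values)
    # Sort by descending count, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


carriers = ["AS", "VX", "AS", "WN", "AS", "VX"]
mapping = fit_string_index(carriers)
encoded = [one_hot(mapping[c], len(mapping)) for c in carriers]
print(mapping)
print(encoded)
```

Dropping one category avoids a redundant column: if every other slot is zero, the row must belong to the remaining category.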

## 5. Assemble a vector

The last step in the Pipeline is to combine all of the columns containing our features into a single column. This has to be done before modeling because every Spark modeling routine expects the data to be in this form. You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

Because of this, the `pyspark.ml.feature` submodule contains a class called `VectorAssembler`. This `Transformer` takes all of the columns you specify and combines them into a new vector column.
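
Conceptually, assembling a vector just means laying the chosen columns end to end. The sketch below shows that idea in plain Python with a hypothetical `assemble` helper and made-up row values; it is not how Spark implements it.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: copy every slot
        else:
            features.append(float(value))             # scalar column: one slot
    return features


# Made-up row with short one-hot vectors for illustration.
row = {"month": 12, "air_time": 132, "carrier_fact": [0.0, 1.0],
       "dest_fact": [1.0, 0.0, 0.0], "plane_age": 3}
features = assemble(row, ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"])
print(features)
```

The order of the input columns fixes the position of every feature in the result, so it must stay the same between training and prediction.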

In [14]:
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols = ["month", "air_time", "carrier_fact", "dest_fact",
                                           "plane_age"], outputCol = 'features')

## 6. Create the pipeline

`Pipeline` is a class in the `pyspark.ml` module that combines all the `Estimators` and `Transformers` that you've already created. This lets you reuse the same modeling process over and over again by wrapping it up in one simple object.

In [15]:
# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder
                                , carr_indexer, carr_encoder
                                , vec_assembler])
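
The idea behind fitting a pipeline can be sketched in a few lines of plain Python: walk through the stages in order, fit any stage that has a `.fit()` method, and pass the transformed data on to the next stage. All class names below are hypothetical toys, and the "data" is a plain list of numbers.

```python
class ToyPipelineModel:
    """Holds the fitted stages and applies them in order."""

    def __init__(self, transformers):
        self.transformers = transformers

    def transform(self, data):
        for transformer in self.transformers:
            data = transformer.transform(data)
        return data


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators expose fit(); plain Transformers are used as they are.
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            data = model.transform(data)  # later stages see the transformed data
            fitted.append(model)
        return ToyPipelineModel(fitted)


class Doubler:
    """Transformer: no fitting needed."""

    def transform(self, data):
        return [2 * x for x in data]


class MinShifterModel:
    def __init__(self, minimum):
        self.minimum = minimum

    def transform(self, data):
        return [x - self.minimum for x in data]


class MinShifter:
    """Estimator: learns the minimum of whatever data reaches it."""

    def fit(self, data):
        return MinShifterModel(min(data))


pipe_model = ToyPipeline([Doubler(), MinShifter()]).fit([3, 5, 9])
result = pipe_model.transform([3, 5, 9])
print(result)
```

Because each stage is fitted on the output of the stages before it, the order of the stages matters: an encoder has to come after the indexer that produces its input column.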

## 7. Transform the data
Now you're finally ready to pass your data through the Pipeline you created!

In [16]:
# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

In [17]:
piped_data.select('dest_index'
                  , 'dest_fact'
                  , 'carrier_index'
                  , 'carrier_fact'
                  , 'features').show(5)

+----------+---------------+-------------+--------------+--------------------+
|dest_index|      dest_fact|carrier_index|  carrier_fact|            features|
+----------+---------------+-------------+--------------+--------------------+
|      33.0|(68,[33],[1.0])|          5.0|(10,[5],[1.0])|(81,[0,1,7,45,80]...|
|      33.0|(68,[33],[1.0])|          5.0|(10,[5],[1.0])|(81,[0,1,7,45,80]...|
|      21.0|(68,[21],[1.0])|          4.0|(10,[4],[1.0])|(81,[0,1,6,33,80]...|
|       0.0| (68,[0],[1.0])|          4.0|(10,[4],[1.0])|(81,[0,1,6,12,80]...|
|      30.0|(68,[30],[1.0])|          4.0|(10,[4],[1.0])|(81,[0,1,6,42,80]...|
+----------+---------------+-------------+--------------+--------------------+
only showing top 5 rows
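
The vectors in this output are printed in sparse form: `(size, [indices], [values])`. As a quick check of how to read them, the sketch below expands a sparse vector and works out where each input lands inside `features`, using the widths visible in the output above (10 slots for `carrier_fact`, 68 for `dest_fact`). The helper name is hypothetical.

```python
def sparse_to_dense(size, indices, values):
    """Expand (size, [indices], [values]) into a plain list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# Width of each input inside the features vector, in VectorAssembler order.
widths = [("month", 1), ("air_time", 1), ("carrier_fact", 10),
          ("dest_fact", 68), ("plane_age", 1)]

offsets = {}
position = 0
for name, width in widths:
    offsets[name] = position
    position += width

carrier_slot = offsets["carrier_fact"] + 5  # carrier_index 5.0 in the first row
dest_slot = offsets["dest_fact"] + 33       # dest_index 33.0 in the first row
print(position, carrier_slot, dest_slot, offsets["plane_age"])
```

The total of 81 slots, and the positions 7, 45, and 80, match the indices `[0,1,7,45,80]` printed for the first row.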



## 8. Test vs Train

After you've cleaned your data and gotten it ready for modeling, one of the most important steps is to split the data into a test set and a train set. After that, don't touch your test data until you think you have a good model! As you're building models and forming hypotheses, you can test them on your training data to get an idea of their performance.

Once you've got your favorite model, you can see how well it predicts the new data in your test set. This never-before-seen data will give you a much more realistic idea of your model's performance in the real world when you're trying to predict or classify new data.

In Spark it's important to make sure you split the data **after** all the transformations. This is because operations like `StringIndexer` don't always produce the same index even when given the same list of strings.

In [18]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])
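
A random split by weights can be pictured as drawing one random number per row and dropping the row into the bucket that number falls in. The plain-Python sketch below follows that idea; the function name `random_split` and the seed value are assumptions for illustration, not Spark's implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)

    # Cumulative upper bounds of the normalized weights, e.g. [0.6, 1.0].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last bucket catches everything left, guarding against rounding.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(1000)), [0.6, 0.4], seed=42)
print(len(train), len(test))
```

Two things follow from this scheme: the split sizes are only approximately 60/40, and the result changes from run to run unless a seed is fixed. The `randomSplit` method also accepts an optional `seed` argument if you need a reproducible split.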

## 9. Create the modeler
The Estimator you'll be using is a `LogisticRegression` from the `pyspark.ml.classification` submodule.

In [19]:
# Create a LogisticRegression Estimator
lr = LogisticRegression()

## 10. Cross validation

In the next few steps you'll be tuning your logistic regression model using a procedure called **k-fold** cross validation. This is a method of estimating the model's performance on unseen data (like your test DataFrame).

It works by splitting the training data into a few different partitions. The exact number is up to you, but here you'll be using PySpark's default value of three. Once the data is split up, one of the partitions is set aside, and the model is fit to the others. Then the error is measured against the held out partition. This is repeated for each of the partitions, so that every block of data is held out and used as a test set exactly once. Then the error on each of the partitions is averaged. This is called the **cross validation error** of the model, and is a good estimate of the actual error on the held out data.

You'll be using cross validation to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters, `elasticNetParam` and `regParam`, and using the cross validation error to compare all the different models so you can choose the best one!
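
The fold bookkeeping described above can be sketched in plain Python. This version assigns rows to folds by striding through the row indices, which is a simplification chosen for readability (Spark assigns rows to folds at random); the function name is hypothetical.

```python
def k_fold_indices(n_rows, k=3):
    """Yield (train_idx, held_out_idx) pairs; each row is held out exactly once."""
    folds = [list(range(i, n_rows, k)) for i in range(k)]
    for held_out in range(k):
        train = [i for f, fold in enumerate(folds) if f != held_out for i in fold]
        yield train, folds[held_out]


splits = list(k_fold_indices(7, k=3))
for train_idx, held_out_idx in splits:
    print("train:", train_idx, "held out:", held_out_idx)
```

With three folds, each model is trained on roughly two thirds of the training data and scored on the remaining third.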

## 11. Create the evaluator
The first thing you need when doing cross validation for model selection is a way to compare different models. Luckily, the `pyspark.ml.evaluation` submodule has classes for evaluating different kinds of models. Your model is a binary classification model, so you'll be using the `BinaryClassificationEvaluator` from that submodule.

In [20]:
# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName = "areaUnderROC")
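
One way to build intuition for the `areaUnderROC` metric: it equals the fraction of (late, on-time) pairs in which the late flight received the higher score. The sketch below computes it directly from that definition in plain Python. It is an equivalent formulation for small inputs, not the way Spark computes the metric, and the labels and scores are made up.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    correct = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                correct += 1.0
            elif p == n:
                correct += 0.5
    return correct / (len(positives) * len(negatives))


auc = area_under_roc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, so higher is better.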

## 12. Make a grid

Next, you need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule `pyspark.ml.tuning` includes a class called `ParamGridBuilder` that does just that (maybe you're starting to notice a pattern here; PySpark has a submodule for just about everything!).

You'll need to use the `.addGrid()` and `.build()` methods to create a grid that you can use for cross validation. The `.addGrid()` method takes a model parameter (an attribute of the model Estimator, `lr`, that you created before) and a list of values that you want to try.

The `.build()` method takes no arguments; it just returns the grid that you'll use later.

In [21]:
import numpy as np
# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()
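
The grid built above is simply every combination of the listed values. As a sketch, the same combinations can be enumerated in plain Python with `itertools.product`; the dictionary keys are just labels chosen for this illustration.

```python
from itertools import product

reg_params = [round(i * 0.01, 2) for i in range(10)]  # 0.0, 0.01, ..., 0.09
elastic_net_params = [0, 1]                           # 0 = L2 penalty, 1 = L1 penalty

param_grid = [{"regParam": r, "elasticNetParam": e}
              for r, e in product(reg_params, elastic_net_params)]

print(len(param_grid))
print(param_grid[:3])
```

Ten values of `regParam` times two values of `elasticNetParam` gives 20 candidate settings, and the count multiplies with every hyperparameter you add.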

## 13. Make the validator
The submodule `pyspark.ml.tuning` also has a class called `CrossValidator` for performing cross validation. This Estimator takes the modeler you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.

The submodule `pyspark.ml.tuning` has already been imported as `tune`. You'll create the `CrossValidator` by passing it the logistic regression Estimator `lr`, the parameter grid, and the evaluator you created in the previous steps.

In [22]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator = lr,
               estimatorParamMaps = grid,
               evaluator = evaluator)
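
Putting the pieces together, the selection loop a cross validator runs looks roughly like the plain-Python sketch below. It uses a deliberately tiny stand-in model (a mean shrunk toward zero by a hypothetical `reg` setting) and mean squared error, so the best setting is the one with the lowest average error. With `areaUnderROC`, as in this notebook, the best setting is the one with the highest average score instead.

```python
def shrunken_mean_fit(ys, reg):
    """Toy 'model': the mean of ys shrunk toward zero by reg."""
    return sum(ys) / (len(ys) + reg)


def mse(prediction, ys):
    return sum((y - prediction) ** 2 for y in ys) / len(ys)


def cross_validate(ys, reg_values, k=3):
    folds = [ys[i::k] for i in range(k)]
    avg_errors = {}
    for reg in reg_values:
        errors = []
        for held_out in range(k):
            train = [y for f, fold in enumerate(folds) if f != held_out for y in fold]
            model = shrunken_mean_fit(train, reg)
            errors.append(mse(model, folds[held_out]))
        avg_errors[reg] = sum(errors) / k
    best_reg = min(avg_errors, key=avg_errors.get)
    # Refit on all of the data with the winning setting.
    return best_reg, shrunken_mean_fit(ys, best_reg), avg_errors


best_reg, best_model, avg_errors = cross_validate(
    [4.0, 5.0, 6.0, 5.0, 4.0, 6.0], [0.0, 1.0, 10.0])
print(best_reg, best_model)
```

The final refit matters: the models fitted inside the loop each saw only part of the data, so the model you keep is trained once more on everything.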

## 14. Fit the model(s)

You're finally ready to fit the models and select the best one!

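Unfortunately, cross validation is computationally intensive: with the 20 parameter combinations in the grid above and the default of three folds, the validator fits 60 models before refitting the best one on the full training set.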

In [23]:
# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

Remember, the training data is called `training` and you're using `lr` to fit a logistic regression model. Cross validation selected the parameter values `regParam=0` and `elasticNetParam=0` as being the best. These are the default values, so you can fit `lr` directly without changing anything; the next cell does that and overwrites the `best_lr` extracted above.

In [24]:
# Call lr.fit()
best_lr = lr.fit(training)

# Print best_lr
print(best_lr)

LogisticRegressionModel: uid = LogisticRegression_4390a738d5c9, numClasses = 2, numFeatures = 81


## 15. Evaluate the model

In [25]:
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

0.7012770923204009


In [26]:
sc.stop()  # stop the SparkContext, which also ends the Spark session