# Train the model

Margie's Travel wants to build a model that predicts whether a departing flight will be delayed by 15 minutes or more. In the historical data they have provided, the indicator for such a delay is the DepDel15 column (1 means delayed, 0 means not delayed). To create a model that predicts such a binary outcome, we can choose from the two-class (binary) classification algorithms provided by Spark MLlib. For our purposes, we choose Decision Tree. This type of classification model must first be trained on sample data that includes the features important to making a prediction, along with the actual historical outcome for each example.

The typical pattern is to split the historical data so a portion is shown to the model for training purposes, and another portion is reserved to test just how well the trained model performs against examples it has not seen before.

In [0]:
from pyspark.ml import Pipeline, PipelineModel
# Note: OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.0 renamed it to OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.functions import array, col, lit

## Load the cleaned flight and weather data

Load the data from the global table.

In [0]:
dfDelays = spark.sql("select OriginAirportCode, cast(Month as int) Month, cast(DayofMonth as int) DayofMonth, CRSDepHour, cast(DayOfWeek as int) DayOfWeek, Carrier, DestAirportCode, DepDel15, WindSpeed, SeaLevelPressure, HourlyPrecip from flight_delays_with_weather")
cols = dfDelays.columns

In [0]:
display(dfDelays)

In [0]:
%sql
select OriginAirportCode, DestAirportCode, count(DepDel15)
from flight_delays_with_weather where DepDel15 = 1
group by OriginAirportCode, DestAirportCode
ORDER BY count(DepDel15) desc

OriginAirportCode,DestAirportCode,count(DepDel15)
LAX,SFO,3385
SFO,LAX,2690
ATL,LGA,2193
ORD,LGA,2003
ORD,SFO,1925
LAS,SFO,1706
LGA,ATL,1701
SAN,SFO,1686
ORD,EWR,1683
LAX,LAS,1632


## Sampling the data

In [0]:
dfDelays.groupBy("DepDel15").count().show()

Judging by the delay counts, there are almost four times as many non-delayed records as there are delayed.

We want to ensure our model is sensitive to the delayed samples. To do this, we can use the stratified sampling provided by the `sampleBy()` function. First we define the fraction of each class to keep. In our case, we want to keep all delayed records (value of 1) and downsample the non-delayed records (value of 0) to 30%.

In [0]:
fractions = {0: .30, 1: 1.0}
trainingSample = dfDelays.sampleBy("DepDel15", fractions, 36)
trainingSample.groupBy("DepDel15").count().show()
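
To make the mechanics of per-class sampling concrete, here is a minimal plain-Python sketch. It is not Spark's implementation: the `sample_by` helper and the toy row counts are hypothetical, chosen only for illustration. It keeps each row with the probability assigned to its class, which is why the sampled counts are approximate rather than exact.

```python
import random

def sample_by(rows, key, fractions, seed):
    """Keep each row with the probability assigned to its stratum."""
    rng = random.Random(seed)
    # Assumption of this sketch: strata missing from `fractions` are dropped (fraction 0.0)
    return [row for row in rows if rng.random() < fractions.get(row[key], 0.0)]

# Hypothetical toy data: 8,000 on-time rows and 2,000 delayed rows
rows = [{"DepDel15": 0}] * 8000 + [{"DepDel15": 1}] * 2000
sample = sample_by(rows, "DepDel15", {0: 0.30, 1: 1.0}, seed=36)

kept_on_time = sum(1 for r in sample if r["DepDel15"] == 0)
kept_delayed = sum(1 for r in sample if r["DepDel15"] == 1)
print(kept_on_time, kept_delayed)
```

Every delayed row survives because its fraction is 1.0, while the on-time count lands near, but not exactly at, 30% of the original.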

## Select an algorithm and transform features

In [0]:
categoricalColumns = ["OriginAirportCode", "Carrier", "DestAirportCode"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoderEstimator to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoderEstimator(dropLast=False, inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="DepDel15", outputCol="label")
stages += [label_stringIdx]

Now we need to use the `VectorAssembler` to combine all the feature columns into a single vector column. This includes our numeric columns as well as the one-hot encoded binary vector columns.

In [0]:
# Transform all features into a vector using VectorAssembler
numericCols = ["Month", "DayofMonth", "CRSDepHour", "DayOfWeek", "WindSpeed", "SeaLevelPressure", "HourlyPrecip"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
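
The following plain-Python sketch illustrates what the indexing, one-hot encoding, and assembly stages do to a single row. It is a simplified illustration rather than Spark's code: the helper names and the toy carrier list are hypothetical, and the alphabetical tie-break is an assumption of this sketch. Like `StringIndexer`'s default ordering, it gives the most frequent category index 0, and like `dropLast=False`, it gives every category its own slot.

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index, most frequent category first."""
    counts = Counter(values)
    # Ties are broken alphabetically here (an assumption of this sketch)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def one_hot(index, size):
    """Return a dense one-hot list with a 1.0 in position `index`."""
    vec = [0.0] * size
    vec[index] = 1.0
    return vec

# Hypothetical toy column of carrier codes
carriers = ["OO", "UA", "OO", "MQ", "OO", "UA"]
carrier_index = fit_string_index(carriers)

# Assemble: one-hot slots first, then the numeric columns
row = {"Carrier": "UA", "Month": 5, "CRSDepHour": 9}
features = one_hot(carrier_index[row["Carrier"]], len(carrier_index)) + [
    float(row["Month"]),
    float(row["CRSDepHour"]),
]
print(carrier_index)
print(features)
```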

## Create and train the Decision Tree model

In [0]:
# Randomly split data into training and test sets; set a seed for reproducibility
(trainingData, testData) = trainingSample.randomSplit([0.7, 0.3], seed=100)
# Keep second references to the untransformed splits: trainingData is overwritten with the
# pipeline's transformed output below, and the cross-validation run needs the original columns
trainingData2 = trainingData
testData2 = testData
print(trainingData.count())
print(testData.count())
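
As a rough illustration of why the two counts printed above are close to, but not exactly, a 70/30 division, here is a plain-Python sketch of a weighted random split. The `random_split` helper is hypothetical and is not how Spark implements `randomSplit()`; it only shows the idea of assigning each row to a split at random according to normalized weights.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split at random, in proportion to `weights`."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total  # normalize so the weights need not sum to 1
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point shortfall in `bounds`
            if r < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(10000)), [0.7, 0.3], seed=100)
print(len(train), len(test))
```

Fixing the seed makes the assignment repeatable, which is the same reason the notebook passes `seed=100`.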

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)
stages += [dt]

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(trainingData)
trainingData = pipelineModel.transform(trainingData)
# Keep relevant columns
selectedcols = ["label", "features"] + cols
trainingData = trainingData.select(selectedcols)
display(trainingData)

label,features,OriginAirportCode,Month,DayofMonth,CRSDepHour,DayOfWeek,Carrier,DestAirportCode,DepDel15,WindSpeed,SeaLevelPressure,HourlyPrecip
1.0,"List(0, 157, List(45, 70, 84, 150, 151, 152, 153, 154, 155), List(1.0, 1.0, 1.0, 5.0, 1.0, 9.0, 3.0, 5.0, 29.799999237060547))",ABQ,5,1,9,3,OO,DEN,1,5.0,29.8,0.0
0.0,"List(0, 157, List(45, 70, 91, 150, 151, 152, 153, 154, 155), List(1.0, 1.0, 1.0, 5.0, 1.0, 12.0, 3.0, 13.0, 29.75))",ABQ,5,1,12,3,OO,IAH,0,13.0,29.75,0.0
0.0,"List(0, 157, List(45, 70, 81, 150, 151, 152, 153, 154, 155), List(1.0, 1.0, 1.0, 5.0, 1.0, 12.0, 3.0, 13.0, 29.75))",ABQ,5,1,12,3,OO,ORD,0,13.0,29.75,0.0
0.0,"List(0, 157, List(45, 70, 82, 150, 151, 152, 153, 154, 155), List(1.0, 1.0, 1.0, 5.0, 1.0, 13.0, 3.0, 15.0, 29.739999771118164))",ABQ,5,1,13,3,OO,LAX,0,15.0,29.74,0.0
0.0,"List(0, 157, List(45, 70, 104, 150, 151, 152, 153, 154, 155), List(1.0, 1.0, 1.0, 5.0, 1.0, 17.0, 3.0, 25.0, 29.75))",ABQ,5,1,17,3,OO,SLC,0,25.0,29.75,0.0
0.0,"List(0, 157, List(45, 70, 85, 150, 151, 152, 153, 154, 155), List(1.0, 1.0, 1.0, 5.0, 2.0, 5.0, 4.0, 29.0, 30.290000915527344))",ABQ,5,2,5,4,OO,SFO,0,29.0,30.29,0.0
0.0,"List(0, 157, List(45, 70, 104, 150, 151, 152, 153, 154, 155), List(1.0, 1.0, 1.0, 5.0, 2.0, 17.0, 4.0, 5.0, 30.309999465942383))",ABQ,5,2,17,4,OO,SLC,0,5.0,30.31,0.0
0.0,"List(0, 157, List(45, 70, 91, 150, 151, 152, 153, 154, 155), List(1.0, 1.0, 1.0, 5.0, 3.0, 6.0, 5.0, 5.0, 30.34000015258789))",ABQ,5,3,6,5,OO,IAH,0,5.0,30.34,0.0
1.0,"List(0, 157, List(45, 72, 81, 150, 151, 152, 153, 155), List(1.0, 1.0, 1.0, 5.0, 3.0, 8.0, 5.0, 30.309999465942383))",ABQ,5,3,8,5,MQ,ORD,1,0.0,30.31,0.0
0.0,"List(0, 157, List(45, 66, 106, 150, 151, 152, 153, 155), List(1.0, 1.0, 1.0, 5.0, 3.0, 8.0, 5.0, 30.309999465942383))",ABQ,5,3,8,5,UA,IAD,0,0.0,30.31,0.0
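
The `features` column in the output above appears to follow the four-field layout that `display()` uses for ML vectors: a type flag (0 for sparse), the vector size, the list of non-zero indices, and the matching values. Under that reading, the sketch below expands one row into a dense list. The `to_dense` helper is hypothetical; the indices and values are copied from the ABQ to ORD row with carrier MQ.

```python
def to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Copied from the ABQ -> ORD (carrier MQ) row in the output above
indices = [45, 72, 81, 150, 151, 152, 153, 155]
values = [1.0, 1.0, 1.0, 5.0, 3.0, 8.0, 5.0, 30.309999465942383]
dense = to_dense(157, indices, values)

# The last seven slots hold the numeric columns in the order given to the assembler:
# Month, DayofMonth, CRSDepHour, DayOfWeek, WindSpeed, SeaLevelPressure, HourlyPrecip
print(dense[150:157])
```

Slots 154 (WindSpeed) and 156 (HourlyPrecip) are absent from the sparse form because both values are 0.0 for this row.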


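Let's make predictions on our test dataset using the `transform()` method. The pipeline applies the same feature transformations to the test rows, and the Decision Tree stage reads only the `features` column. We then print the schema of the predictions so you can see the three new prediction-related columns: `rawPrediction`, `probability`, and `prediction`.

In [0]:
# Make predictions on test data using the Transformer.transform() method.
predictions = pipelineModel.transform(testData)
# Print the schema to see the rawPrediction, probability, and prediction columns
predictions.printSchema()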

Let's evaluate the Decision Tree model with `BinaryClassificationEvaluator`.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)
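
With no arguments, `BinaryClassificationEvaluator` reports the area under the ROC curve (`areaUnderROC`). To give a feel for what that number means, here is a plain-Python sketch using an equivalent pairwise definition: the probability that a randomly chosen delayed example receives a higher score than a randomly chosen non-delayed one. This is not how Spark computes the metric, and the `area_under_roc` helper and the toy labels and scores are hypothetical.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical toy data: two non-delayed and two delayed examples
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = area_under_roc(labels, scores)
print(auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, so values closer to 1.0 are better.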

Now we will try tuning the model with the `ParamGridBuilder` and the `CrossValidator`.

Because we specify 4 values for maxDepth and 3 values for maxBins, this grid has 4 x 3 = 12 parameter settings for `CrossValidator` to choose from. We will create a 3-fold CrossValidator.

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())
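
The size of the search can be checked with a few lines of plain Python. This sketch only enumerates the combinations from the grid defined above and does not call Spark; the variable names are illustrative.

```python
from itertools import product

# Values taken from the addGrid() calls above
max_depth_values = [1, 2, 6, 10]
max_bins_values = [20, 40, 80]
num_folds = 3

# Every pairing of a maxDepth value with a maxBins value is one parameter setting
grid = [
    {"maxDepth": depth, "maxBins": bins}
    for depth, bins in product(max_depth_values, max_bins_values)
]
fits_during_cv = len(grid) * num_folds
print(len(grid), fits_during_cv)
```

Each setting is trained once per fold, and `CrossValidator` then refits the best setting on the full training set, which is why this step can take several minutes.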

In [0]:
# Create 3-fold CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

# Run cross validations (this can take several minutes to execute)
cvModel = cv.fit(trainingData2)

In [0]:
predictions = cvModel.transform(testData2)

In [0]:
evaluator.evaluate(predictions)

In [0]:
selected = predictions.select("label", "prediction", "probability", "OriginAirportCode", "DestAirportCode")
display(selected)

label,prediction,probability,OriginAirportCode,DestAirportCode
0.0,1.0,"List(1, 2, List(), List(0.4388121744094769, 0.561187825590523))",ABQ,DEN
0.0,0.0,"List(1, 2, List(), List(0.6988151665954008, 0.30118483340459923))",ABQ,IAD
0.0,1.0,"List(1, 2, List(), List(0.4388121744094769, 0.561187825590523))",ABQ,PHX
1.0,1.0,"List(1, 2, List(), List(0.4388121744094769, 0.561187825590523))",ABQ,IAH
0.0,1.0,"List(1, 2, List(), List(0.4388121744094769, 0.561187825590523))",ABQ,PHX
1.0,1.0,"List(1, 2, List(), List(0.4388121744094769, 0.561187825590523))",ABQ,LAX
0.0,0.0,"List(1, 2, List(), List(0.6988151665954008, 0.30118483340459923))",ABQ,ORD
0.0,1.0,"List(1, 2, List(), List(0.4388121744094769, 0.561187825590523))",ABQ,IAH
0.0,0.0,"List(1, 2, List(), List(0.6988151665954008, 0.30118483340459923))",ABQ,DEN
0.0,0.0,"List(1, 2, List(), List(0.6988151665954008, 0.30118483340459923))",ABQ,LAX
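
In the output above, each `probability` value holds two class probabilities (not delayed, delayed), and with default settings the `prediction` is the class with the larger one. A minimal sketch of that rule, using a hypothetical helper and two probability vectors copied from the output:

```python
def predict_from_probability(probability):
    """Return the index of the largest class probability as a float label."""
    best_index = max(range(len(probability)), key=lambda i: probability[i])
    return float(best_index)

# Probability vectors copied from the output above
rows = [
    [0.4388121744094769, 0.561187825590523],
    [0.6988151665954008, 0.30118483340459923],
]
predicted_labels = [predict_from_probability(p) for p in rows]
print(predicted_labels)
```

Many rows share identical probabilities because every example that falls into the same leaf of the tree receives that leaf's class distribution.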


In [0]:
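bestModel = cvModel.bestModel
# Score the full dataset. Note: dfDelays includes the rows used for training,
# so this value is not a held-out estimate of model quality.
finalPredictions = bestModel.transform(dfDelays)
evaluator.evaluate(finalPredictions)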

## Save the model

In [0]:
# Save the best model to DBFS at /flightDelayModel (visible as /dbfs/flightDelayModel through the local file API)
bestModel.write().overwrite().save("/flightDelayModel")