# Part 3 Building the Model

This notebook goes through the process of selecting and building the best model to use with this problem. There are several ML algorithm options available in the SparkML library. As this is a binary classification problem, we will use Logistic Regression, Random Forest and Gradient Boosted Tree classifiers.

The aim here is to train all three kinds of models and then test to see which provides the best results. The next step is to optimise i.e. the chosen models hyper parameters to then have the best option for solving this problem. 

## Start the Spark Session
You might need to make changes to the `spark.executor.memory` and `spark.executor.instances` settings. SparkML processes are more memory intensive and therefore require a different configuration to run efficiently.

In [None]:
from pyspark.sql import SparkSession
import os

s3_bucket = os.getenv("STORAGE")

spark = SparkSession\
    .builder\
    .appName("Airline ML")\
    .config("spark.executor.memory","8g")\
    .config("spark.executor.cores","4")\
    .config("spark.driver.memory","6g")\
    .config("spark.yarn.access.hadoopFileSystems",s3_bucket)\
    .getOrCreate()

## Import the Data
The data that is imported was the smaller data set created in Part 2. This is now stored in S3 in parquet format and will therefore load much quicker. To get the larger ML jobs to complete in a reasonable amount of time for a workshop audience, there total data set size is limited to approx 100,000 sampled rows.

In [None]:
flight_df_original = spark.sql("select * from smaller_flight_table")

flight_df = flight_df_original.na.drop().sample(1/600,seed=111)
flight_df.persist()

In [None]:
#Adding a link to the Spark UI for demo purposes
from IPython.core.display import HTML
import os
HTML('<a href="http://spark-{}.{}">Spark UI</a>'.format(os.getenv("CDSW_ENGINE_ID"),os.getenv("CDSW_DOMAIN")))

In [None]:
flight_df.printSchema()

## Basic Feature Engineering
Before applying any training to an ML algorithm, you need to check if the format of columns in your dataset will work with the chosen algorithm. The  change that we need to make is extracting the hour of the day from the time. This is based on an assumption of seasonality during the day. The ML aglorithms needs floating point numbers for optimal internal matrix maths, therefore these two new columns are cast using `cast('double')`.

> HANDY TIP
> 
> Each new release of PySpark brings with it more useful functions that makes it easier to do things that you had to use a UDF to do.
> Even with Apache Arrow used for memory storage, using a Python UDF in a pyspark tranform is going to be slower than running pure Spark functions. After spending some time reading through the pyspark function list, I found that I could do what my udf did using a the `when` and `otherwise` functions.

In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf,substring,weekofyear,concat,col,when,length,lit,hour

#No longer need to use a UDF as if else in pyspark is when otherwise
#convert_time_to_hour = udf(lambda x: x if len(x) == 4 else "0{}".format(x),StringType())

flight_df = flight_df\
    .withColumn(
        'CRS_DEP_HOUR',
        when(
            length(col("CRS_DEP_TIME")) == 4,col("CRS_DEP_TIME")
        )\
        .otherwise(concat(lit("0"),col("CRS_DEP_TIME")))
    )

flight_df = flight_df.withColumn('CRS_DEP_HOUR',col('CRS_DEP_TIME').cast('double'))

In [None]:
flight_df.printSchema()

## Additional Feature Indexing and Encoding

The next feature engineering requirement is to convert the categorical variables into a numeric form. The `OP_CARRIER`,`ORIGIN` and `DEST` columns are the ones that will be used. All three are `string` type categorical variables. There are 2 methods of dealing with these, one is using a [String Indexer](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer) and the other is using a [One Hot Encoder](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoder). Logsitic Regression requires one hot encoding, [tree based algorithims do not](https://roamanalytics.com/2016/10/28/are-categorical-variables-getting-lost-in-your-random-forests/). The code below creates a [Spark Pipeline](https://spark.apache.org/docs/latest/ml-pipeline.html) that assembles the process to get the columns needed, do the indexing and encoding and create an packaged vector that the various models will be trained on.

> HANDY TIP
>
> `OneHotEncoderEstimator` is a function introduced in Spark 2.3 and works on severnal indexed columns in one go, rather than having to do one `OneHotEncoder` per indexed feature.

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, OneHotEncoderEstimator

numeric_cols = ["CRS_ELAPSED_TIME","DISTANCE","WEEK","CRS_DEP_HOUR"]

op_carrier_indexer = StringIndexer(inputCol ='OP_CARRIER', outputCol = 'OP_CARRIER_INDEXED',handleInvalid="keep")

origin_indexer = StringIndexer(inputCol ='ORIGIN', outputCol = 'ORIGIN_INDEXED',handleInvalid="keep")

dest_indexer = StringIndexer(inputCol ='DEST', outputCol = 'DEST_INDEXED',handleInvalid="keep")

indexer_encoder = OneHotEncoderEstimator(
    inputCols = ['OP_CARRIER_INDEXED','ORIGIN_INDEXED','DEST_INDEXED'],
    outputCols= ['OP_CARRIER_ENCODED','ORIGIN_ENCODED','DEST_ENCODED']
)

input_cols=[
    'OP_CARRIER_ENCODED',
    'ORIGIN_ENCODED',
    'DEST_ENCODED'] + numeric_cols

assembler = VectorAssembler(
    inputCols = input_cols,
    outputCol = 'features')

from pyspark.ml import Pipeline

pipeline = Pipeline(
    stages=[
        op_carrier_indexer,
        origin_indexer,
        dest_indexer,
        indexer_encoder,
        assembler]
)


## Create the Training and Test Datasets
Once the pipeline has been built the next step is to `fit` the pipeline to the data, and then `transform` it into the final vector form. There after the data needs to split into a test and training set. This is done using `randomSplit`

In [None]:
pipelineModel = pipeline.fit(flight_df)
model_df = pipelineModel.transform(flight_df)
selectedCols = ['CANCELLED', 'features']# + cols
model_df = model_df.select(selectedCols)
model_df.printSchema()
(train, test) = model_df.randomSplit([0.7, 0.3])

## Model 1 - Logistic Regression
Now that the vector is assembled and ready to run, the first model we will try is good old faithful logsitic regression.

_Note: this is a bit too accruate to be considered valid_

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'CANCELLED', maxIter=10)

lrModel = lr.fit(train)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictionslr = lrModel.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="CANCELLED",metricName="areaUnderROC")
evaluator.evaluate(predictionslr)

## Save the Pipeline model
This next cell writes the fitted pipeline model back to S3 so it can be used for other processes to build/trainin different models.

In [None]:
#pipelineModel.write().overwrite().save("s3a://ml-field/demo/flight-analysis/data/models/pipeline_model")

## Rebuild the Pipeline
For the tree based classifiers, the data does not need to be one hot encoded. Being able to use the string indexed values significantly reduces the dimensionality of the feature vecotor and vastly improves processing time.

In [None]:
input_cols = [
    'OP_CARRIER_INDEXED',
    'ORIGIN_INDEXED',
    'DEST_INDEXED'] + numeric_cols

assembler = VectorAssembler(
    inputCols = input_cols,
    outputCol = 'features')

pipeline = Pipeline(
    stages=[
        op_carrier_indexer,
        origin_indexer,
        dest_indexer,
        assembler]
)

pipelineModel = pipeline.fit(flight_df)

model_df = pipelineModel.transform(flight_df)
selectedCols = ['CANCELLED', 'features']
model_df = model_df.select(selectedCols)
model_df.printSchema()
(train, test) = model_df.randomSplit([0.7, 0.3])

## Model 2 - Random Forest
The `RandomForestClassifier` is run on the updated pipeline and the results are really bad. Its bad performance is likely because of the large number of categories in the categorical variables. To get it to even run, the `maxBins` setting had to be increased to 390, which far above the default of 32.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rfclassifier = RandomForestClassifier(labelCol = 'CANCELLED', featuresCol = 'features', maxBins=390)
rfmodel = rfclassifier.fit(train)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictionsrf = rfmodel.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="CANCELLED",metricName="areaUnderROC")
evaluator.evaluate(predictionsrf)

## Model 2 - Gradient Boosted Tree
The GBTClassifier ran for the longest of all three algorithms but did perform significantly better than the the random forest.

In [None]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10,featuresCol = 'features', labelCol = 'CANCELLED', maxBins=390)

gbtModel = gbt.fit(train)

In [None]:
predictionsgbt = gbtModel.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="CANCELLED",metricName="areaUnderROC")
evaluator.evaluate(predictionsgbt)

## Optimising The Model
The final step is to pick the model that peformed the best and then try optimising the various hyper parameters to see if the model's performance can be improved. For linear regression the three parameters that can have a material difference are:

+ `regParam`
+ `elasticNetParam`
+ `maxIter`

To get a list of all the parameters in a model, run `print(<your model object>.explainParams())`.

In [None]:
print(lrModel.explainParams())

# Rebuild the Pipeline Again
We need to go back to the one hot encoded pipeline used for the `lrModel`

In [None]:
input_cols=[
    'OP_CARRIER_ENCODED',
    'ORIGIN_ENCODED',
    'DEST_ENCODED'] + numeric_cols

assembler = VectorAssembler(
    inputCols = input_cols,
    outputCol = 'features')

from pyspark.ml import Pipeline

pipeline = Pipeline(
    stages=[
        op_carrier_indexer,
        origin_indexer,
        dest_indexer,
        indexer_encoder,
        assembler]
)

pipelineModel = pipeline.fit(flight_df)

model_df = pipelineModel.transform(flight_df)
selectedCols = ['CANCELLED', 'features']
model_df = model_df.select(selectedCols)
model_df.printSchema()
(train, test) = model_df.randomSplit([0.7, 0.3])

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(lrModel.maxIter, [5, 10,100])
             .addGrid(lrModel.regParam, [0.0, 0.01, 0.1])
             .addGrid(lrModel.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, parallelism=3)

cvModel = cv.fit(train)

In [None]:
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)

In [None]:
print("regParam = {}".format(cvModel.bestModel._java_obj.getRegParam()))
print("elasticNetParam = {}".format(cvModel.bestModel._java_obj.getElasticNetParam()))
print("maxIter = {}".format(cvModel.bestModel._java_obj.getMaxIter()))

In [None]:
import matplotlib.pyplot as plt 
#trainingSummary = cvModel.bestModel.summary
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
def plotter():
    plt.plot(roc['FPR'],roc['TPR'])
    plt.ylabel('False Positive Rate')
    plt.xlabel('True Positive Rate')
    plt.title('ROC Curve')
plotter()
#print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))