## Tuning Model Parameters

In this exercise, you will tune the hyperparameters of a logistic regression classifier that predicts whether a flight arrives more than 15 minutes late.

### Prepare the Data

First, import the libraries you will need and prepare the training and test data:

In [1]:
import os
os.environ["HADOOP_USER_NAME"] = "spark"
os.environ["SPARK_MAJOR_VERSION"] = "2"
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
import findspark
findspark.init()
import pyspark

In [2]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [3]:
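# Resource settings are read when the application starts, so pass them to the
# builder; spark.conf.set() cannot change them on a session that is already running.
spark = (SparkSession.builder
         .appName('python-parameter-tuning')
         .config('spark.executor.memory', '3g')
         .config('spark.executor.cores', '3')
         .config('spark.cores.max', '3')
         .config('spark.driver.memory', '3g')
         .getOrCreate())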

In [4]:
# Load the source data
csv = spark.read.csv('/user/maria_dev/data/flights.csv', inferSchema=True, header=True)

# Select features and label
data = csv.select("DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay", ((col("ArrDelay") > 15).cast("Int").alias("label")))

# Split the data
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")

### Define the Pipeline
Now define a pipeline that creates a feature vector and trains a classification model:

In [5]:
# Define the pipeline
assembler = VectorAssembler(inputCols = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"], outputCol="features")
lr = LogisticRegression(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[assembler, lr])

### Tune Parameters
You can tune parameters to find the best model for your data. A simple way to do this is to use **TrainValidationSplit**, which evaluates each combination of parameters in a grid built with **ParamGridBuilder**. Each combination is trained on one portion of the training data (80% here, set by `trainRatio`) and scored on the held-out remainder to find the best-performing parameters.

In [6]:
# Build the grid: 3 x 2 x 2 = 12 parameter combinations
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1, 0.01]).addGrid(lr.maxIter, [10, 5]).addGrid(lr.threshold, [0.35, 0.30]).build()
# Train on 80% of the training data and validate on the remaining 20%
tvs = TrainValidationSplit(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, trainRatio=0.8)

model = tvs.fit(train)
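
To get a feel for how much work the grid search involves, the grid above can be enumerated in plain Python. This is only a sketch: `itertools.product` stands in for `ParamGridBuilder`, and the dictionary keys are illustrative names rather than Spark `Param` objects.

```python
from itertools import product

# Candidate values copied from the grid above
reg_params = [0.3, 0.1, 0.01]
max_iters = [10, 5]
thresholds = [0.35, 0.30]

# Every combination of the three lists, in the same spirit as ParamGridBuilder.build()
grid = [
    {"regParam": r, "maxIter": m, "threshold": t}
    for r, m, t in product(reg_params, max_iters, thresholds)
]

print(len(grid))  # number of candidate models: 3 * 2 * 2
print(grid[0])    # the first combination
```

Each combination is fitted once on the training portion and scored once on the validation portion. One point worth keeping in mind: the evaluator used here is a default `BinaryClassificationEvaluator`, which, as far as its default settings go, scores the `rawPrediction` column, so the two `threshold` values would be expected to tie for any given `regParam` and `maxIter`.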

### Test the Model
Now you're ready to apply the model to the test data.

In [7]:
prediction = model.transform(test)
predicted = prediction.select("features", "prediction", "probability", "trueLabel")
predicted.show(100)

+--------------------+----------+--------------------+---------+
|            features|prediction|         probability|trueLabel|
+--------------------+----------+--------------------+---------+
|[1.0,1.0,10140.0,...|       0.0|[0.89152309189313...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.86011603366836...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.90805384511236...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.90147768869811...|        0|
|[1.0,1.0,10140.0,...|       1.0|[0.63043653180965...|        1|
|[1.0,1.0,10140.0,...|       0.0|[0.92012041768991...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.90815291522121...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.90158307652015...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.89459783850171...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.86214225810458...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.94795951519727...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.94406166381961...|        0|
|[1.0,1.0,10140.0,...|   
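
The relationship between the `probability` and `prediction` columns can be illustrated with a small plain-Python sketch. It assumes that the first entry of each probability vector is the probability of label 0, that the two entries sum to 1, and that the model predicts 1 when the probability of label 1 exceeds the threshold. The helper `predict_label` is hypothetical, and the probabilities are the truncated values displayed above.

```python
def predict_label(prob_positive, threshold):
    """Return 1.0 when P(label = 1) exceeds the threshold, else 0.0."""
    return 1.0 if prob_positive > threshold else 0.0

threshold = 0.35  # one of the two threshold values in the parameter grid

# P(label = 0) as displayed (truncated) for two of the rows above
p0_predicted_on_time = 0.89152309189313
p0_predicted_late = 0.63043653180965

for p0 in (p0_predicted_on_time, p0_predicted_late):
    p1 = 1.0 - p0  # probability that the flight is late
    print(round(p1, 4), predict_label(p1, threshold))
```

The second row is classified as late even though a late arrival is the less likely outcome, because its probability of about 0.37 is above both thresholds in the grid (0.30 and 0.35).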

### Compute Confusion Matrix Metrics
Classifiers are typically evaluated by creating a *confusion matrix*, which indicates the number of:
- True Positives
- True Negatives
- False Positives
- False Negatives

From these core measures, other evaluation metrics such as *precision* and *recall* can be calculated.
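
Writing $TP$, $FP$ and $FN$ for the numbers of true positives, false positives and false negatives, the two metrics are conventionally defined as:

$$
\text{precision} = \frac{TP}{TP + FP}, \qquad \text{recall} = \frac{TP}{TP + FN}
$$

For this data, precision is the fraction of flights predicted to be late that really were late, and recall is the fraction of late flights that the model identified.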

In [8]:
# Count each cell of the confusion matrix
tp = float(predicted.filter("prediction == 1.0 AND trueLabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND trueLabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND trueLabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND trueLabel == 1").count())
# Derive precision and recall from the counts
metrics = spark.createDataFrame([
    ("TP", tp),
    ("FP", fp),
    ("TN", tn),
    ("FN", fn),
    ("Precision", tp / (tp + fp)),
    ("Recall", tp / (tp + fn))], ["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          113347.0|
|       FP|           11409.0|
|       TN|          637545.0|
|       FN|           48266.0|
|Precision|0.9085494886017507|
|   Recall|0.7013482826257789|
+---------+------------------+
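
The figures in the table can be re-derived without Spark. The sketch below repeats the arithmetic on the reported counts and adds two things that are not part of the original cell: a guard against division by zero (which would occur if the model predicted no positives at all) and the accuracy and F1 score as further summaries. The helper `safe_ratio` is a hypothetical name.

```python
def safe_ratio(numerator, denominator):
    """Divide, returning 0.0 instead of raising when the denominator is zero."""
    return numerator / denominator if denominator else 0.0

# Counts copied from the metrics table above
tp, fp, tn, fn = 113347.0, 11409.0, 637545.0, 48266.0

precision = safe_ratio(tp, tp + fp)
recall = safe_ratio(tp, tp + fn)
accuracy = safe_ratio(tp + tn, tp + fp + tn + fn)
f1 = safe_ratio(2 * precision * recall, precision + recall)

print("precision=%.4f recall=%.4f accuracy=%.4f f1=%.4f"
      % (precision, recall, accuracy, f1))
```

Returning 0.0 for an undefined ratio is a design choice made for this sketch; returning `None` or `float("nan")` would be equally defensible.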



### Review the Area Under ROC
Another way to assess the performance of a classification model is to measure the area under the receiver operating characteristic (ROC) curve for the model. The spark.ml library includes a **BinaryClassificationEvaluator** class that you can use to compute this.

In [9]:
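# Note: rawPredictionCol="prediction" scores the thresholded 0/1 labels rather than
# the model's continuous output, which is held in the "rawPrediction" column.
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print("AUR = {0}".format(aur))

AUR = 0.841883841846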
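
Because the evaluator above is pointed at the `prediction` column, it only sees hard 0/1 labels, so the ROC curve has a single interior point. Under that assumption the trapezoidal area reduces to the mean of the true positive rate and the true negative rate, which can be checked against the confusion matrix counts reported earlier. This is a plain-Python sketch of that check, not the evaluator's implementation.

```python
# Counts copied from the confusion matrix output earlier in this exercise
tp, fp, tn, fn = 113347.0, 11409.0, 637545.0, 48266.0

tpr = tp / (tp + fn)  # true positive rate (recall)
fpr = fp / (fp + tn)  # false positive rate

# ROC points for a classifier that only emits 0 or 1
points = [(0.0, 0.0), (fpr, tpr), (1.0, 1.0)]

# Trapezoidal rule over consecutive pairs of points
auc = sum((x2 - x1) * (y1 + y2) / 2.0
          for (x1, y1), (x2, y2) in zip(points, points[1:]))

print(auc)
```

The result agrees with the reported value to the digits shown, which suggests that the figure behaves more like a balanced accuracy at the chosen threshold than a threshold-independent AUC. Setting `rawPredictionCol="rawPrediction"` would score the model's continuous output instead, and the resulting value would differ from the one above.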


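### Identify the Optimal Parameters
Finally, inspect the logistic regression stage of the best pipeline to see which parameter values were selected: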

In [16]:
bestModel = model.bestModel.stages[-1]
print('Best Param (regParam): {0}'.format(bestModel._java_obj.getRegParam()))
print('Best Param (maxIter): {0}'.format(bestModel._java_obj.getMaxIter()))
print('Best Param (threshold): {0}'.format(bestModel._java_obj.getThreshold()))

Best Param (regParam): 0.01
Best Param (maxIter): 10
Best Param (threshold): 0.35
