## Evaluating a Classification Model

In this exercise, you will create a pipeline for a classification model, and then apply commonly used metrics to evaluate the resulting classifier.

### Prepare the Data

First, import the libraries you will need and prepare the training and test data:

In [1]:
from pyspark.sql.functions import col  # col is the only SQL function used in this exercise

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Load the source data
wkdir ='file:///mnt/c/Users/Adura/Google Drive/Projects/Jupyter/SparkMs/data/'
csv = spark.read.csv(wkdir + 'flights.csv', inferSchema=True, header=True)

# Select features and label
data = csv.select("DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay", ((col("ArrDelay") > 15).cast("Int").alias("label")))

# Split the data
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")



### Define the Pipeline and Train the Model
Now define a pipeline that assembles the feature columns into a single feature vector and then trains a logistic regression classifier on it.

In [2]:
assembler = VectorAssembler(inputCols = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"], outputCol="features")
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(train)

### Test the Model
Now you're ready to apply the model to the test data.

In [3]:
prediction = model.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

+-------------------------------+----------+---------+
|features                       |prediction|trueLabel|
+-------------------------------+----------+---------+
|[1.0,1.0,10140.0,10397.0,-4.0] |0.0       |0        |
|[1.0,1.0,10140.0,10821.0,8.0]  |0.0       |0        |
|[1.0,1.0,10140.0,11259.0,-1.0] |0.0       |0        |
|[1.0,1.0,10140.0,11259.0,0.0]  |0.0       |0        |
|[1.0,1.0,10140.0,11292.0,0.0]  |0.0       |0        |
|[1.0,1.0,10140.0,11292.0,2.0]  |0.0       |0        |
|[1.0,1.0,10140.0,11298.0,-10.0]|0.0       |0        |
|[1.0,1.0,10140.0,11298.0,-2.0] |0.0       |0        |
|[1.0,1.0,10140.0,11298.0,-1.0] |0.0       |0        |
|[1.0,1.0,10140.0,11298.0,87.0] |0.0       |1        |
|[1.0,1.0,10140.0,12191.0,-3.0] |0.0       |0        |
|[1.0,1.0,10140.0,12191.0,16.0] |0.0       |0        |
|[1.0,1.0,10140.0,12266.0,-8.0] |0.0       |0        |
|[1.0,1.0,10140.0,12266.0,-5.0] |0.0       |0        |
|[1.0,1.0,10140.0,12266.0,27.0] |0.0       |1        |
...

### Compute Confusion Matrix Metrics
Classifiers are typically evaluated by creating a *confusion matrix*, which indicates the number of:
- True Positives
- True Negatives
- False Positives
- False Negatives

From these core measures, other evaluation metrics such as *precision* and *recall* can be calculated.

In [4]:
# Count each cell of the confusion matrix (column names match the select above)
tp = float(predicted.filter("prediction == 1.0 AND trueLabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND trueLabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND trueLabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND trueLabel == 1").count())
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn))],["metric", "value"])
metrics.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|            19273.0|
|       FP|               78.0|
|       TN|           649127.0|
|       FN|           141312.0|
|Precision| 0.9959692005581107|
|   Recall|0.12001743624871564|
+---------+-------------------+
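
The precision and recall values in the table follow directly from the four counts. As a minimal sketch in plain Python (the helper names `precision` and `recall` are illustrative and not part of Spark), the following reproduces them from the counts shown above. Returning 0.0 when the denominator is zero is an assumed convention; the cell above would raise a division error in that case.

```python
def precision(tp, fp):
    """Fraction of predicted positives that are truly positive."""
    return tp / (tp + fp) if (tp + fp) > 0 else 0.0

def recall(tp, fn):
    """Fraction of actual positives that the model found."""
    return tp / (tp + fn) if (tp + fn) > 0 else 0.0

# Counts copied from the metrics table above
tp, fp, tn, fn = 19273.0, 78.0, 649127.0, 141312.0

p = precision(tp, fp)
r = recall(tp, fn)
print("Precision:", p)
print("Recall:", r)
```

Almost every flight the model flags as late really is late (high precision), but it finds only about 12% of the late flights (low recall).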



### View the Raw Prediction and Probability
The prediction is based on a raw prediction score, which locates each row on a logistic function. This raw score is converted to a probability vector that indicates the confidence for each possible label value (in this case, 0 and 1), and the label with the highest confidence is selected as the prediction.
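
To make the relationship between the raw score and the probability concrete, here is a minimal sketch in plain Python. It assumes the raw prediction is a pair of the form `[score_for_0, score_for_1]` whose two values are negatives of each other, and that the probability for label 1 is the logistic (sigmoid) function of its score. The helper `to_probability` is illustrative; the sample pair is the first row of the output in this section.

```python
import math

def to_probability(raw_prediction):
    """Convert a [score_for_0, score_for_1] pair into [P(0), P(1)]."""
    score_for_1 = raw_prediction[1]
    p1 = 1.0 / (1.0 + math.exp(-score_for_1))  # logistic (sigmoid) function
    return [1.0 - p1, p1]

# Sample raw prediction pair
raw = [1.6017642570554997, -1.6017642570554997]
probability = to_probability(raw)
print(probability)

# The predicted label is the index of the larger probability
predicted_label = float(probability.index(max(probability)))
print(predicted_label)
```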

In [5]:
prediction.select("rawPrediction", "probability", "prediction", "trueLabel").show(100, truncate=False)

+------------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                             |probability                             |prediction|trueLabel|
+------------------------------------------+----------------------------------------+----------+---------+
|[1.6017642570554997,-1.6017642570554997]  |[0.8322648199729293,0.1677351800270707] |0.0       |0        |
|[1.4350634393056974,-1.4350634393056974]  |[0.807689032481294,0.19231096751870602] |0.0       |0        |
|[1.5620337023890425,-1.5620337023890425]  |[0.8266449823758082,0.17335501762419184]|0.0       |0        |
|[1.5480510800652163,-1.5480510800652163]  |[0.8246320689820911,0.17536793101790904]|0.0       |0        |
|[1.548135965571093,-1.548135965571093]    |[0.8246443442780453,0.1753556557219547] |0.0       |0        |
|[1.5201707209234403,-1.5201707209234403]  |[0.8205636187406454,0.17943638125935457]|0.0       |0        |
...

Note that the results include rows where the probability for 0 (the first value in the **probability** vector) is only slightly higher than the probability for 1 (the second value in the **probability** vector). The default *discrimination threshold* (the boundary that decides whether a probability is predicted as a 1 or a 0) is set to 0.5; so the prediction with the highest probability is always used, no matter how close to the threshold.
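
The thresholding rule can be sketched in plain Python. The helper `predict_label` and the sample probabilities are hypothetical, and the sketch assumes the rule "predict 1 when the probability for label 1 exceeds the threshold":

```python
def predict_label(p1, threshold=0.5):
    """Return 1.0 when the probability for label 1 exceeds the threshold, else 0.0."""
    return 1.0 if p1 > threshold else 0.0

# Hypothetical probabilities for label 1
for p1 in [0.17, 0.49, 0.51, 0.95]:
    print(p1, "->", predict_label(p1))
```

In this sketch, 0.49 and 0.51 receive opposite labels even though they differ by only 0.02, while 0.51 and 0.95 receive the same label.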

### Review the Area Under ROC
Another way to assess the performance of a classification model is to measure the area under a ROC curve for the model. The ROC curve shows the True Positive rate plotted against the False Positive rate for varying thresholds. The spark.ml library includes a **BinaryClassificationEvaluator** class that you can use to compute the area under this curve.

In [6]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print("AUR = ", aur)

AUR =  0.9228408488404625
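
To show what this number measures, the following plain Python sketch builds ROC points for a small hypothetical set of scored rows and integrates them with the trapezoidal rule. The helpers `roc_points` and `area_under_curve` and the sample data are illustrative, the sketch assumes all scores are distinct, and it is not a description of how Spark's evaluator computes the value internally.

```python
def roc_points(scores, labels):
    """Return (FPR, TPR) points obtained by lowering the threshold one row at a time.

    Assumes all scores are distinct and both classes are present.
    """
    positives = sum(labels)
    negatives = len(labels) - positives
    ranked = sorted(zip(scores, labels), reverse=True)  # highest score first
    tp = fp = 0
    points = [(0.0, 0.0)]
    for _, label in ranked:
        if label == 1:
            tp += 1
        else:
            fp += 1
        points.append((fp / negatives, tp / positives))
    return points

def area_under_curve(points):
    """Trapezoidal area under a list of (x, y) points ordered by x."""
    area = 0.0
    for (x0, y0), (x1, y1) in zip(points, points[1:]):
        area += (x1 - x0) * (y0 + y1) / 2.0
    return area

# Hypothetical probabilities for label 1 and their true labels
scores = [0.9, 0.8, 0.7, 0.6, 0.4, 0.3, 0.2, 0.1]
labels = [1, 1, 0, 1, 0, 1, 0, 0]

points = roc_points(scores, labels)
aur = area_under_curve(points)
print("AUR = ", aur)
```

A value of 1.0 would mean every positive row is scored above every negative row, and 0.5 corresponds to random ordering.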


### Change the Discrimination Threshold
The area under ROC (AUR) score suggests a reasonably good model, but the confusion matrix metrics show that it predicts a high number of False Negatives (that is, it predicts 0 when the true label is 1), which leads to a low Recall. You can affect the way a model performs by changing its parameters. For example, as noted previously, the default discrimination threshold is 0.5. If there are a lot of False Positives, you may want to consider raising the threshold; conversely, you can address a large number of False Negatives by lowering it.
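
The trade-off can be illustrated with a small plain Python sketch before retraining the model. The helper `confusion_counts` and the sample probabilities and labels are hypothetical; they are chosen only to show how lowering the threshold moves rows between the cells of the confusion matrix.

```python
def confusion_counts(probabilities, labels, threshold):
    """Count TP, FP, TN, FN when predicting 1 for probabilities above the threshold."""
    tp = fp = tn = fn = 0
    for p1, label in zip(probabilities, labels):
        predicted = 1 if p1 > threshold else 0
        if predicted == 1 and label == 1:
            tp += 1
        elif predicted == 1 and label == 0:
            fp += 1
        elif predicted == 0 and label == 0:
            tn += 1
        else:
            fn += 1
    return tp, fp, tn, fn

# Hypothetical probabilities for label 1 and the corresponding true labels
probabilities = [0.95, 0.60, 0.45, 0.40, 0.38, 0.30, 0.20, 0.10]
labels = [1, 1, 1, 0, 1, 0, 0, 0]

for threshold in [0.5, 0.35]:
    tp, fp, tn, fn = confusion_counts(probabilities, labels, threshold)
    print("threshold", threshold, "TP", tp, "FP", fp, "TN", tn, "FN", fn)
```

In this toy data, lowering the threshold from 0.5 to 0.35 turns two False Negatives into True Positives at the cost of one new False Positive.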

In [7]:
lr2 = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3, threshold=0.35)
pipeline2 = Pipeline(stages=[assembler, lr2])
model2 = pipeline2.fit(train)
newPrediction = model2.transform(test)
newPrediction.select("rawPrediction", "probability", "prediction", "trueLabel").show(100, truncate=False)

+------------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                             |probability                             |prediction|trueLabel|
+------------------------------------------+----------------------------------------+----------+---------+
|[1.6017642570554924,-1.6017642570554924]  |[0.8322648199729282,0.16773518002707172]|0.0       |0        |
|[1.4350634393056905,-1.4350634393056905]  |[0.807689032481293,0.1923109675187071]  |0.0       |0        |
|[1.562033702389036,-1.562033702389036]    |[0.8266449823758073,0.17335501762419275]|0.0       |0        |
|[1.5480510800652096,-1.5480510800652096]  |[0.82463206898209,0.17536793101790998]  |0.0       |0        |
|[1.5481359655710865,-1.5481359655710865]  |[0.8246443442780444,0.1753556557219556] |0.0       |0        |
|[1.5201707209234339,-1.5201707209234339]  |[0.8205636187406445,0.17943638125935552]|0.0       |0        |
...

Note that the **rawPrediction** and **probability** values are essentially unchanged, but rows whose probability for 1 lies between 0.35 and 0.5, which were previously predicted as 0, are now predicted as 1.

In [8]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND trueLabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND trueLabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND trueLabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND trueLabel == 1").count())
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", tp2 / (tp2 + fp2)),
 ("Recall", tp2 / (tp2 + fn2))],["metric", "value"])
metrics2.show()

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|            41999.0|
|       FP|              123.0|
|       TN|           649082.0|
|       FN|           118586.0|
|Precision| 0.9970799107354826|
|   Recall|0.26153750350281785|
+---------+-------------------+



Note that there are now more True Positives and fewer False Negatives, and Recall has improved from roughly 0.12 to 0.26. By changing the discrimination threshold, the model now gets more predictions correct - though it's worth noting that the number of False Positives has also increased, from 78 to 123.
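
The claim that more predictions are now correct can be checked by comparing accuracy, the share of all rows whose prediction matches the true label. This is a minimal sketch in plain Python using the counts from the two metrics tables above; the helper `accuracy` is illustrative.

```python
def accuracy(tp, fp, tn, fn):
    """Fraction of all predictions that match the true label."""
    return (tp + tn) / (tp + fp + tn + fn)

# Counts copied from the two metrics tables above
before = {"tp": 19273.0, "fp": 78.0, "tn": 649127.0, "fn": 141312.0}   # threshold 0.5
after = {"tp": 41999.0, "fp": 123.0, "tn": 649082.0, "fn": 118586.0}   # threshold 0.35

acc_before = accuracy(**before)
acc_after = accuracy(**after)
print("Accuracy at 0.5: ", round(acc_before, 4))
print("Accuracy at 0.35:", round(acc_after, 4))
```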