## Spark MLlib Classification example

This notebook demonstrates the use of Spark MLlib features for binary classification, using flight data to predict whether a flight arrives late.


In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master('spark://spark-master:7077')\
                            .appName('flightArrivalTimeClassification').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/24 05:13:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
flights = spark.read.csv('hdfs://spark-master:9000/user/trainer/data/flights.csv', inferSchema=True, header=True)

                                                                                

In [4]:
flights.printSchema()

root
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)



In [5]:
flights.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

Label each flight according to whether it arrived more than 15 minutes late (`label` = 1) or not (`label` = 0).

In [6]:
labelled = flights.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay",
                          ((col("ArrDelay") > 15).cast("Int").alias("label")))

labelled.show(10)

+----------+---------+-------+---------------+-------------+--------+-----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|label|
+----------+---------+-------+---------------+-------------+--------+-----+
|        19|        5|     DL|          11433|        13303|      -3|    0|
|        19|        5|     DL|          14869|        12478|       0|    0|
|        19|        5|     DL|          14057|        14869|      -4|    0|
|        19|        5|     DL|          15016|        11433|      28|    1|
|        19|        5|     DL|          11193|        12892|      -6|    0|
|        19|        5|     DL|          10397|        15016|      -1|    0|
|        19|        5|     DL|          15016|        10397|       0|    0|
|        19|        5|     DL|          10397|        14869|      15|    1|
|        19|        5|     DL|          10397|        10423|      33|    1|
|        19|        5|     DL|          11278|        10397|     323|    1|
+----------+---------+-------+---------------+-------------+--------+-----+

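Next, split the data into training and test sets. Here we use a random 70/30 split, made with Spark's `randomSplit`. In the test set, the `label` column is renamed to `trueLabel` so that it can be compared with the model's predictions later.

Note that `randomSplit` is a method of the DataFrame itself.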

In [7]:
splits = labelled.randomSplit([0.7, 0.3])

train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")

train_rows = train.count()
test_rows = test.count()

print("Training Rows:", train_rows, " Testing Rows:", test_rows)



Training Rows: 1891168  Testing Rows: 811050
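`randomSplit` assigns each row to a split at random, in proportion to the weights, which is why the counts above are roughly rather than exactly 70/30 (1,891,168 of 2,702,218 rows is about 69.99%). The plain-Python sketch below illustrates that idea only. `random_split` is a hypothetical helper written for this note, not Spark's implementation, and the rows are stand-in integers rather than flight records.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split, chosen at random in proportion to `weights`."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds on [0, 1), e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        # The first bound that exceeds the draw decides the split;
        # fall back to the last split to absorb floating-point rounding.
        index = next((i for i, b in enumerate(bounds) if draw < b), len(weights) - 1)
        splits[index].append(row)
    return splits

rows = list(range(10_000))  # stand-in data, not the flights dataset
train_part, test_part = random_split(rows, [0.7, 0.3], seed=42)
print(len(train_part), len(test_part))
```

Because the assignment is random, rerunning the notebook cell gives slightly different row counts each time. Passing a `seed` argument to Spark's `randomSplit` should make the split repeatable.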


                                                                                

#### Define the Pipeline

A pipeline consists of a series of transformer and estimator stages that typically prepare a DataFrame for modeling and then train a predictive model. In this case, you will create a pipeline with seven stages:

- A StringIndexer estimator that converts string values to indexes for categorical features
- A VectorAssembler that combines categorical features into a single vector
- A VectorIndexer that creates indexes for a vector of categorical features
- A VectorAssembler that creates a vector of continuous numeric features
- A MinMaxScaler that normalizes continuous numeric features
- A VectorAssembler that creates a vector of categorical and continuous features
- A LogisticRegression estimator that trains a classification model (a DecisionTreeClassifier is left commented out in the code as an alternative).
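To make the first stage concrete, here is a plain-Python sketch of what a StringIndexer does with its default ordering: the most frequent string gets index 0.0, the next most frequent 1.0, and so on. The carrier list is made up for illustration, `fit_string_index` is a hypothetical helper, and the alphabetical tie-break is an assumption about how equal counts are ordered.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to a float index, most frequent first."""
    counts = Counter(values)
    # Sort by descending frequency; break ties alphabetically (assumed behaviour)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

carriers = ["DL", "DL", "DL", "WN", "WN", "9E"]  # hypothetical sample, not the real data
mapping = fit_string_index(carriers)
print(mapping)
```

In the notebook's own output, carrier `9E` is mapped to `10.0`, which under this ordering would mean ten carriers occur more often than it does in the training data.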

In [8]:
from pyspark.ml import Pipeline
#from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [9]:
strIdx = StringIndexer(inputCol = "Carrier", outputCol = "CarrierIdx")

catVect = VectorAssembler(inputCols = ["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"], outputCol="catFeatures")

catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")

numVect = VectorAssembler(inputCols = ["DepDelay"], outputCol="numFeatures")

minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")

featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
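The MinMaxScaler stage rescales `DepDelay` linearly into the range [0, 1]. A minimal sketch of that arithmetic in plain Python follows. The minimum of -63 and maximum of 1863 are not read from the data: they are back-computed from the `normFeatures` values printed later in this notebook, so treat them as an assumption. `min_max_scale` is a hypothetical helper.

```python
def min_max_scale(x, col_min, col_max):
    """Rescale x linearly so that col_min maps to 0.0 and col_max maps to 1.0."""
    if col_max == col_min:
        return 0.5  # a constant column has no spread; use the midpoint of [0, 1]
    return (x - col_min) / (col_max - col_min)

# Assumed training-set extremes of DepDelay, inferred from the notebook output
DEP_DELAY_MIN, DEP_DELAY_MAX = -63, 1863

for dep_delay in (-5, 0, 37):
    print(dep_delay, min_max_scale(dep_delay, DEP_DELAY_MIN, DEP_DELAY_MAX))
```

With these assumed extremes, a departure 5 minutes early scales to about 0.0301, matching the first `normFeatures` value shown in the prediction output. Most delays are squeezed into a narrow band near zero because a few very long delays stretch the range.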

Create the model object and build the pipeline.

In [10]:
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)

# an alternative model
#dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

Calling `fit` is the step that actually does the work: it runs every stage over the training data, so this is the line that requires processing resources.

In [11]:
pipelineModel = pipeline.fit(train)


                                                                                

Now we ask the trained model for its predictions on the test data.

In [12]:
prediction = pipelineModel.transform(test)

prediction.show()


+----------+---------+-------+---------------+-------------+--------+---------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+----------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|trueLabel|CarrierIdx|         catFeatures|      idxCatFeatures|numFeatures|        normFeatures|            features|       rawPrediction|         probability|prediction|
+----------+---------+-------+---------------+-------------+--------+---------+----------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+----------+
|         1|        1|     9E|          10423|        11433|      -5|        0|      10.0|[10.0,1.0,1.0,104...|[10.0,1.0,0.0,104...|     [-5.0]|[0.03011422637590...|[10.0,1.0,0.0,104...|[1.62736339397519...|[0.83580812997316...|       0.0|
|         1|        1|     9E|          
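The `rawPrediction` and `probability` columns in this output are related. For a binary logistic regression, the probability is the logistic (sigmoid) function of the raw score, and the prediction is class 1 only when its probability exceeds the threshold. A minimal sketch in plain Python, using the first-row values displayed above (whose digits are truncated in the output) and assuming the default threshold of 0.5:

```python
import math

def sigmoid(z):
    """Logistic function: maps any real score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

# First element of rawPrediction for the first row (truncated in the output)
raw_class0 = 1.62736339397519

p_class0 = sigmoid(raw_class0)   # about 0.8358, the first element of probability
p_class1 = 1.0 - p_class0        # probability that the flight is delayed

THRESHOLD = 0.5                  # assumed default decision threshold
prediction = 1.0 if p_class1 > THRESHOLD else 0.0
print(round(p_class0, 4), prediction)
```

The model gives this flight roughly a 16% chance of being delayed, so it predicts `0.0`.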

                                                                                

In [13]:
# display a little clearer
# select features, prediction and trueLabel
predicted = prediction.select("features", "prediction", "trueLabel")

predicted.show(100, truncate=False)


+---------------------------------------------------+----------+---------+
|features                                           |prediction|trueLabel|
+---------------------------------------------------+----------+---------+
|[10.0,1.0,0.0,10423.0,11433.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,13487.0,0.027518172377985463]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,13487.0,0.029075804776739357]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,13487.0,0.030633437175493248]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11193.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11193.0,0.030633437175493248]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11193.0,0.05192107995846314] |0.0       |1        |
|[10.0,1.0,0.0,10529.0,13487.0,0.030633437175493248]|0.0       |1        |
|[10.0,1.0,0.0,10529.0,14492.0,0.0430944963655244]  |0.0       |1        |
|[10.0,1.0,0.0,10693.0,12478.0,0.03686396677050883] |0.0       |0        |
|[10.0,1.0,0.0,10693.0,12

                                                                                

To evaluate the model, we count the following outcomes on the test set and derive precision, recall and F1 from them:

- true positives
- true negatives
- false positives
- false negatives

In [14]:
tp = float(predicted.filter("prediction == 1.0 AND trueLabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND trueLabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND trueLabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND trueLabel == 1").count())

pr = tp / (tp + fp)
re = tp / (tp + fn)

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])

metrics.show()

                                                                                

+---------+-------------------+
|   metric|              value|
+---------+-------------------+
|       TP|            19188.0|
|       FP|               77.0|
|       TN|           649193.0|
|       FN|           142592.0|
|Precision| 0.9960031144562679|
|   Recall|0.11860551366052664|
|       F1|0.21196939987295976|
+---------+-------------------+
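The arithmetic behind this table can be checked without Spark. The sketch below recomputes the metrics from the four counts shown above and adds overall accuracy for comparison. `classification_metrics` is a hypothetical helper, and the zero-denominator guard is an addition that the notebook cell does not have.

```python
def classification_metrics(tp, fp, tn, fn):
    """Derive precision, recall, F1 and accuracy from confusion-matrix counts."""
    def ratio(numerator, denominator):
        # Guard against empty denominators, e.g. a model that never predicts 1
        return numerator / denominator if denominator else 0.0

    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)
    f1 = ratio(2 * precision * recall, precision + recall)
    accuracy = ratio(tp + tn, tp + fp + tn + fn)
    return {"precision": precision, "recall": recall, "f1": f1, "accuracy": accuracy}

# Counts taken from the metrics table above
m = classification_metrics(tp=19188, fp=77, tn=649193, fn=142592)
for name, value in m.items():
    print(f"{name}: {value:.4f}")
```

Accuracy comes out at about 0.82 even though recall is only about 0.12. The model is almost always right when it predicts a delay, but it misses most of the delayed flights, which is why F1 is a more informative figure here than accuracy.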



#### Further improvements

We could also review the area under the ROC curve.

Another way to assess the performance of a classification model is to measure the area under the ROC (Receiver Operating Characteristic) curve for the model. The spark.ml library includes a `BinaryClassificationEvaluator` class that we can use to compute this.

The ROC curve plots the true positive rate against the false positive rate as the classification threshold varies.
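To show what the evaluator is measuring, the sketch below builds ROC points by sweeping the threshold over a handful of scores and integrates them with the trapezoid rule. It is a plain-Python illustration, not Spark's implementation: the scores and labels are made up, and `roc_points` and `area_under` are hypothetical helpers.

```python
def roc_points(scores, labels):
    """Return (false positive rate, true positive rate) points, threshold swept high to low."""
    positives = sum(labels)
    negatives = len(labels) - positives
    ranked = sorted(zip(scores, labels), key=lambda pair: -pair[0])
    points = [(0.0, 0.0)]
    tp = fp = 0
    i = 0
    while i < len(ranked):
        threshold = ranked[i][0]
        # Consume every row tied at this score before emitting a point
        while i < len(ranked) and ranked[i][0] == threshold:
            if ranked[i][1] == 1:
                tp += 1
            else:
                fp += 1
            i += 1
        points.append((fp / negatives, tp / positives))
    return points

def area_under(points):
    """Trapezoid-rule area under a curve given as ordered (x, y) points."""
    return sum((x1 - x0) * (y0 + y1) / 2
               for (x0, y0), (x1, y1) in zip(points, points[1:]))

scores = [0.9, 0.8, 0.7, 0.6, 0.4, 0.2]   # hypothetical model scores
labels = [1, 1, 0, 1, 0, 0]               # hypothetical true labels
auc = area_under(roc_points(scores, labels))
print(auc)
```

In this toy data, 8 of the 9 positive/negative pairs are ranked correctly, so the area is 8/9. An area of 0.5 corresponds to random ranking and 1.0 to a perfect one.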



In [15]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)


AUR =  0.922612814761315


                                                                                