In [1]:
import os
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Set environment variables for Java and Spark
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@21'
os.environ['SPARK_HOME'] = '/opt/spark'
os.environ['PYSPARK_PYTHON'] = 'python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'

# Initialize Spark session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("MyApp") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/17 11:37:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Flights data
flights.csv contains flight information, such as day of the week, and departure delays

In [2]:
csv = spark.read.csv('flights.csv', inferSchema=True, header=True)
csv.show(10)

                                                                                

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

# Prepare the data for classification
Select a subset of columns to use as features and create a Boolean label field named late. 1 for flights that arrived late and 0 for flights that arrived early or on-time.

In [3]:
data = csv.select("DayofMonth", "DayOfWeek",
                  "Carrier", "OriginAirportID",
                  "DestAirportID", "DepDelay",
                  ((col("ArrDelay") > 15).cast("Int").alias("Late")))
data.show(10)

+----------+---------+-------+---------------+-------------+--------+----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|Late|
+----------+---------+-------+---------------+-------------+--------+----+
|        19|        5|     DL|          11433|        13303|      -3|   0|
|        19|        5|     DL|          14869|        12478|       0|   0|
|        19|        5|     DL|          14057|        14869|      -4|   0|
|        19|        5|     DL|          15016|        11433|      28|   1|
|        19|        5|     DL|          11193|        12892|      -6|   0|
|        19|        5|     DL|          10397|        15016|      -1|   0|
|        19|        5|     DL|          15016|        10397|       0|   0|
|        19|        5|     DL|          10397|        14869|      15|   1|
|        19|        5|     DL|          10397|        10423|      33|   1|
|        19|        5|     DL|          11278|        10397|     323|   1|
+----------+---------+---

# Split the data for training
Use 70% of the data for training, 30% for testing.
In the testing data, the Late column will be considered trueLabel for comparing predicted labels with known actual values.

In [4]:
splits = data.randomSplit([0.7,0.3])
train = splits[0]
test = splits[1].withColumnRenamed("Late", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Number of training samples for each feature: ", train_rows,
      "Number of testing samples for each feature: ", test_rows)



Number of training samples for each feature:  1891945 Number of testing samples for each feature:  810273


                                                                                

# Define the Pipeline
A pipeline consists of a series of transformer and estimator stages that typically prepare a DataFrame for modeling and then train a predictive model. 

Here we will have seven stages:
1. A **StringIndex** estimator: converts string values to indices for categorical features
2. A **VectorAssembler** that combines categorical features into a single vector.
3. A **VectorIndexer** that creates indices for a vector of categorical features.
4. A **VectorAssembler** that creates a vector of continuous numeric features.
5. A **MinMaxScaler** that normalizes continuous numeric features.
6. A **VectorAssembler** that creates a vector of categorical and continuous features
7. A **DecisionTreeClassifier** that trains a classification model.

In [5]:
strIdx = StringIndexer(inputCol="Carrier",
                       outputCol="CarrierIdx")
catVect = VectorAssembler(inputCols=["CarrierIdx", "DayofMonth", "DayOfWeek",
                                     "OriginAirportID", "DestAirportID"],
                                     outputCol="catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(),
                       outputCol="idxCatFeatures")
numVect = VectorAssembler(inputCols=["DepDelay"],
                          outputCol="numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(),
                      outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"],
                           outputCol="features")
lr = LogisticRegression(labelCol="Late",
                         featuresCol="features",
                         maxIter=10,
                         regParam=0.03)
# dt = DecisionTreeClassifier(labelCol="Late",
#                             featuresCol="features")
pipeline = Pipeline(stages=[strIdx, catVect, 
                           catIdx, numVect, minMax, 
                           featVect, lr]) 

# Run the pipeline to train the model
Run as an Estimator on the training data to train the model.

In [6]:
pipelineModel = pipeline.fit(train)

24/06/17 11:37:25 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/06/17 11:37:39 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/06/17 11:37:39 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

# Generate label predictions
Transform the test data with all of the stages and the trained model in the pipeline to generate label predictions.

In [7]:
predictions = pipelineModel.transform(test)
predicted = predictions.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

[Stage 38:>                                                         (0 + 1) / 1]

+---------------------------------------------------+----------+---------+
|features                                           |prediction|trueLabel|
+---------------------------------------------------+----------+---------+
|[10.0,1.0,0.0,10397.0,12264.0,0.04205607476635514] |0.0       |0        |
|[10.0,1.0,0.0,10397.0,13851.0,0.03167185877466251] |0.0       |0        |
|[10.0,1.0,0.0,10423.0,13244.0,0.029595015576323987]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,13487.0,0.029075804776739357]|0.0       |0        |
|[10.0,1.0,0.0,10423.0,14869.0,0.05036344755970924] |0.0       |1        |
|[10.0,1.0,0.0,10693.0,10397.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10693.0,11193.0,0.03115264797507788] |0.0       |0        |
|[10.0,1.0,0.0,10693.0,12478.0,0.03686396677050883] |0.0       |0        |
|[10.0,1.0,0.0,10693.0,13487.0,0.030633437175493248]|0.0       |0        |
|[10.0,1.0,0.0,10721.0,12478.0,0.03686396677050883] |0.0       |1        |
|[10.0,1.0,0.0,11042.0,11

                                                                                

# Observation: 
A number of trueLabels 1s are predicted as 0s. Let's evaluate the model.

## Compute Confusion Matrix
Confusion matrix is a table that shows the number of true positive, true negative, false positive, and false negative predictions.

From these core metrics, we can compute the precision, recall, and F1 score.

In [8]:
tp = float(predicted.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND truelabel == 1").count())

pr = tp / (tp + fp)
re = tp / (tp + fn)

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()


                                                                                

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           74017.0|
|       FP|             516.0|
|       TN|          648507.0|
|       FN|           87233.0|
|Precision|0.9930768921149021|
|   Recall|0.4590201550387597|
|       F1|0.6278400054287205|
+---------+------------------+



# Observation:
Precision is high, but Recall is low, therefore F1 is suboptimal.

# Area Under ROC
Assess performance by measuring the area under Receiver Operating Characteristic curve. ROC is a graphical plot of the true positive rate vs. the false positive rate.

The spark.ml library includes a BinaryClassificationEvaluator class that can be used to compute this.

In [9]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel",
                                          rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")
aur = evaluator.evaluate(predictions)
print("Area under ROC = " + str(aur))

                                                                                

Area under ROC = 0.9236329519980819


The AUR appears to be OK, let's look deeper.

## View the raw prediction and probablity
The prediction is based on a raw prediction score that describes a labeled point in a logistic function. This raw prediction is then converted to a predicted label of 0 or 1 based on a probability vector that indicates the confidence for each possible label value (0 or 1).
The value with the highest confidence is selected as the predicted label.

In [13]:
predictions.select("rawPrediction", "probability", "prediction", "trueLabel").show(100,
    truncate=False)

[Stage 65:>                                                         (0 + 1) / 1]

+----------------------------------------+------------------------------------------+----------+---------+
|rawPrediction                           |probability                               |prediction|trueLabel|
+----------------------------------------+------------------------------------------+----------+---------+
|[1.0574002743947553,-1.0574002743947553]|[0.7421934209918366,0.2578065790081634]   |0.0       |0        |
|[2.066586038796025,-2.066586038796025]  |[0.8876128477735481,0.1123871522264519]   |0.0       |0        |
|[2.260835530326638,-2.260835530326638]  |[0.9055810964270445,0.09441890357295546]  |0.0       |0        |
|[2.312698440673941,-2.312698440673941]  |[0.9099232721364571,0.09007672786354293]  |0.0       |0        |
|[0.2836249094900154,-0.2836249094900154]|[0.5704346938393792,0.42956530616062083]  |0.0       |1        |
|[2.190178240893169,-2.190178240893169]  |[0.8993640398758541,0.10063596012414588]  |0.0       |0        |
|[2.097448103468474,-2.09744810346847

                                                                                

Note that the results include rows where the probability for 0 (the first value in the probability vector) is only slightly highr than the probability for 1 (the second value in the probability vector). The default discrimination threshold is 0.5, so the prediction with the hig
hest probability is always used, no matter how close to the threshold. We can see from the results above that for those trueLabel 1s that were predicted as 0s, the probability for 1 is just slightly lower than the threshold of 0.5.

# Parameter Tuning
To find the best performing parameters, we can use the CrossValidator class to evaluate each combination of parameters defined in a ParameterGrid against multiple folds of the data split into training and validation datasets. This can take a long time to run because every parameter combination is tried multiple times.

## Change the Discrimination Threshold