In [1]:
# Import dependencies

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
#from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
# Create spark session 

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [7]:
# Create spark dataframe

flights = spark.read.csv('flights.csv', inferSchema=True, header=True)
flights.show(5)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 5 rows



### Data Prep

In [8]:
# Only select necessary columns and ArrDelay where/datatype/label
flights_df = flights.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay", ((col("ArrDelay") > 15).cast("Int").alias("label")))
flights_df.show(10)

+----------+---------+-------+---------------+-------------+--------+-----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|label|
+----------+---------+-------+---------------+-------------+--------+-----+
|        19|        5|     DL|          11433|        13303|      -3|    0|
|        19|        5|     DL|          14869|        12478|       0|    0|
|        19|        5|     DL|          14057|        14869|      -4|    0|
|        19|        5|     DL|          15016|        11433|      28|    1|
|        19|        5|     DL|          11193|        12892|      -6|    0|
|        19|        5|     DL|          10397|        15016|      -1|    0|
|        19|        5|     DL|          15016|        10397|       0|    0|
|        19|        5|     DL|          10397|        14869|      15|    1|
|        19|        5|     DL|          10397|        10423|      33|    1|
|        19|        5|     DL|          11278|        10397|     323|    1|
+----------+

In [10]:
# Train/test split
splits = flights_df.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 1891532  Testing Rows: 810686


### Data Pipeline

In [15]:
strIdx = StringIndexer(inputCol = "Carrier", outputCol = "CarrierIdx")
catVect = VectorAssembler(inputCols = ["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["DepDelay"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
# dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

In [16]:
# Train the model

piplineModel = pipeline.fit(train)

In [17]:
# Create label predictions

prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

+---------------------------------------------------+----------+---------+
|features                                           |prediction|trueLabel|
+---------------------------------------------------+----------+---------+
|[10.0,1.0,0.0,10423.0,14869.0,0.04888195527821113] |0.0       |1        |
|[10.0,1.0,0.0,10529.0,11193.0,0.028601144045761834]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11193.0,0.028601144045761834]|0.0       |0        |
|[10.0,1.0,0.0,10529.0,11433.0,0.028601144045761834]|0.0       |0        |
|[10.0,1.0,0.0,10693.0,12478.0,0.03536141445657827] |0.0       |0        |
|[10.0,1.0,0.0,10693.0,13487.0,0.029121164846593866]|0.0       |0        |
|[10.0,1.0,0.0,10721.0,12478.0,0.0296411856474259]  |0.0       |1        |
|[10.0,1.0,0.0,10721.0,13931.0,0.027041081643265734]|0.0       |0        |
|[10.0,1.0,0.0,10792.0,11433.0,0.026521060842433702]|0.0       |0        |
|[10.0,1.0,0.0,10792.0,11433.0,0.029121164846593866]|0.0       |0        |
|[10.0,1.0,0.0,10792.0,12

### Evaluate Model

In [19]:
# Area Under ROC

evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.922771927066443


In [20]:
# View raw prediction and probability

prediction.select("rawPrediction", "probability", "prediction", "trueLabel").show(100, truncate=False)

+----------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                           |probability                             |prediction|trueLabel|
+----------------------------------------+----------------------------------------+----------+---------+
|[1.0820569183429851,-1.0820569183429851]|[0.7468830386427964,0.25311696135720363]|0.0       |1        |
|[1.6266095738348372,-1.6266095738348372]|[0.8357046548606655,0.16429534513933453]|0.0       |0        |
|[1.6266095738348372,-1.6266095738348372]|[0.8357046548606655,0.16429534513933453]|0.0       |0        |
|[1.6267444334094021,-1.6267444334094021]|[0.8357231705635642,0.16427682943643585]|0.0       |0        |
|[1.4468844777321732,-1.4468844777321732]|[0.8095184889366397,0.1904815110633603] |0.0       |0        |
|[1.6153538821359223,-1.6153538821359223]|[0.8341533762838017,0.1658466237161983] |0.0       |0        |
|[1.6010421141694404,-1.6010421141694404]|[0.8321639845

In [21]:
# Run cross validation

paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1]).addGrid(lr.maxIter, [10, 5]).addGrid(lr.threshold, 
                                                                                            [0.4, 0.3]).build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, 
                    numFolds=2)

model = cv.fit(train)

In [22]:
# New prediction

newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "trueLabel")
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[10.0,1.0,0.0,104...|       0.0|        1|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,105...|       0.0|        0|
|[10.0,1.0,0.0,106...|       0.0|        0|
|[10.0,1.0,0.0,106...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        1|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        0|
|[10.0,1.0,0.0,107...|       0.0|        1|
|[10.0,1.0,0.0,108...|       0.0|        1|
|[10.0,1.0,0.0,108...|       0.0|        0|
|[10.0,1.0,0.0,108...|       0.0|        0|
|[10.0,1.0,0.0,108...|       0.0|        0|
|[10.0,1.0,0.0,110...|       0.0|        0|
|[10.0,1.0,0.0,110...|       0.0|        0|
|[10.0,1.0,0.0,110...|       0.0|        0|
|[10.0,1.0,0.0,110...|       0.0|        0|
|[10.0,1.0,0.0,110...|       0.0

In [23]:
# New Area Under ROC

# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator.evaluate(prediction)
print( "AUR2 = ", aur2)

AUR2 =  0.9227722057029768
