In [None]:
from pyspark.sql.session import SparkSession
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import expr
from pyspark.sql import functions as F
from pyspark.ml.stat import Summarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from helpers.helper_functions import translate_to_file_string
import pandas as pd
pd.set_option('display.max_colwidth', None)

In [None]:
# Location of the raw January-2020 on-time data, resolved through the project helper.
flightDataPath = "../data/Flight_Delay_Jan_2020_ontime.csv"
inputFile = translate_to_file_string(flightDataPath)

In [None]:
# Create (or reuse) the Spark session used throughout this notebook.
spark = SparkSession.builder.appName("FlightDelay").getOrCreate()

In [None]:
# Read the CSV: first line is the header, column types are inferred from the data.
csvReadOptions = {"header": "true", "inferSchema": "true", "delimiter": ","}
pysparkDF = spark.read.options(**csvReadOptions).csv(inputFile)

pysparkDF.printSchema()

### Remove faulty features
Drop the column `_c21`, the default name Spark assigns to a CSV header field that has no name.

In [None]:
# `_c21` is Spark's default name for an unnamed header field; it is treated as faulty and dropped.
faultyCols = ['_c21']
pysparkDF = pysparkDF.drop(*faultyCols)
pysparkDF.printSchema()

### Remove records containing NULL values

In [None]:
# Keep only rows without any NULL and report how many rows were discarded.
# NOTE(review): if cancelled/diverted flights have NULL times or delay flags, this removes
# all of them as well - confirm that this is intended.
pysparkDF_nonull = pysparkDF.dropna()
removedCount = pysparkDF.count() - pysparkDF_nonull.count()
f"Removed {removedCount} records containing NULL values"

### Build a StringIndexer for TAIL_NUM

In [None]:
# Map the string tail number to a numeric index (TAIL_NUM_ID) that can be used as a feature.
tailNum_Indexer = StringIndexer(inputCol="TAIL_NUM", outputCol="TAIL_NUM_ID").fit(pysparkDF_nonull)
pysparkDF_indexed = tailNum_Indexer.transform(pysparkDF_nonull)

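### Define label columns
Only the delay indicators `DEP_DEL15` and `ARR_DEL15` are used as prediction targets; the other outcome columns (`CANCELLED`, `DIVERTED`) are excluded from the features further below.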

In [None]:
# Delay indicator columns that serve as prediction targets.
labelCols = "DEP_DEL15 ARR_DEL15".split()
labelCols

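### Compute class weights for each label
For every label a column `<label>_weighted` is added: rows with label 1.0 (delayed) get the share of non-delayed rows as weight, all other rows get the share of delayed rows. This gives the rarer class the larger weight when the classifiers are trained with `weightCol`.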

In [None]:
# Class weights per label, stored in a new column `<label>_weighted`.
# With p = fraction of rows whose label is 1.0 (delayed), delayed rows get weight 1 - p and
# all other rows get weight p, so the rarer class always receives the larger weight.
# These columns are passed as `weightCol` to the classifiers in the modelling section.
totalCount = pysparkDF_indexed.count()  # loop-invariant: count once instead of once per label
if totalCount == 0:
    raise ValueError("No records left after NULL removal; cannot compute class weights")

weightedDF = pysparkDF_indexed
for label in labelCols:  # use labelCols so the labels are defined in one place only
    ratioOfDelayed = pysparkDF_indexed.filter(F.col(label) == 1.0).count() / totalCount
    delayedWeight = 1 - ratioOfDelayed
    nonDelayedWeight = ratioOfDelayed
    # Compare against the number 1.0 (not the string "1.0") to avoid relying on an implicit cast.
    weightedDF = weightedDF.withColumn(
        label + "_weighted",
        F.when(F.col(label) == 1.0, delayedWeight).otherwise(nonDelayedWeight),
    )

weightedDF.printSchema()

### Remove redundant features and labels for unconditional prediction
-> Unconditional refers to predicting each label without any information about the current status of the flight (use case: checking the day before).

In [None]:
# Feature columns for unconditional prediction: all columns except
#  - the string member of each string/id pair (redundant; the id member is kept),
#  - the labels, the other flight-outcome columns and the class-weight columns.
# Remark: since in this dataset both string and id exist already, no further preprocessing via
# string_indexer is necessary. Otherwise, strings would have first been converted to ids via string_indexer.
# NOTE(review): the *_ID columns are categorical codes but enter the linear models as plain numbers;
# one-hot encoding them is probably more appropriate - confirm.
# NOTE(review): every remaining column becomes a feature. If the data contains actual (not scheduled)
# departure/arrival times, they leak the outcome into an "unconditional" model - confirm against
# the data dictionary.
redundantCols_unconditional = [
    "TAIL_NUM",               # -> TAIL_NUM_ID
    "OP_UNIQUE_CARRIER",      # -> OP_CARRIER_AIRLINE_ID
    "OP_CARRIER",             # -> OP_CARRIER_AIRLINE_ID
    "ORIGIN",                 # -> ORIGIN_AIRPORT_ID
    "ORIGIN_AIRPORT_SEQ_ID",  # -> ORIGIN_AIRPORT_ID
    "DEST",                   # -> DEST_AIRPORT_ID
    "DEST_AIRPORT_SEQ_ID",    # -> DEST_AIRPORT_ID
    "DEP_TIME_BLK",           # preliminary elimination, check if model works better with binned values or not
]
# No label is available before the flight, so all labels and their weight columns are dropped.
outcomeCols_unconditional = (
    list(labelCols)
    + ["CANCELLED", "DIVERTED"]
    + [label + "_weighted" for label in labelCols]
)
dropCols_unconditional = redundantCols_unconditional + outcomeCols_unconditional

allCols_unconditional = weightedDF.columns
# Fail loudly if an expected column is missing instead of silently producing a different feature list.
missingCols_unconditional = [c for c in dropCols_unconditional if c not in allCols_unconditional]
if missingCols_unconditional:
    raise ValueError(f"Columns to drop not found in weightedDF: {missingCols_unconditional}")

# Keep the original column order: it defines the order of the assembled feature vector.
featureCols_unconditional = [c for c in allCols_unconditional if c not in dropCols_unconditional]

featureCols_unconditional

### Remove redundant features and labels for conditional prediction
-> Conditional refers to predicting each label using the real-time information available on the current status of the flight (use case: checking at the airport, before the flight).

One would expect prediction performance to improve when the model knows the current flight status (= `DEP_DEL15`).

Example: if the model knows that the flight has a departure delay, it may be better able to predict whether the flight will also arrive late.

In [None]:
# Feature columns for conditional prediction: all columns except
#  - the string member of each string/id pair (redundant; the id member is kept),
#  - every label except DEP_DEL15, the other flight-outcome columns and the class-weight columns.
# DEP_DEL15 is kept as a feature because the departure status is known at the airport.
# Remark: since in this dataset both string and id exist already, no further preprocessing via
# string_indexer is necessary. Otherwise, strings would have first been converted to ids via string_indexer.
# NOTE(review): the *_ID columns are categorical codes but enter the linear models as plain numbers;
# one-hot encoding them is probably more appropriate - confirm.
# NOTE(review): every remaining column becomes a feature. If the data contains the actual arrival
# time, it leaks the ARR_DEL15 outcome - confirm against the data dictionary.
redundantCols_conditional = [
    "TAIL_NUM",               # -> TAIL_NUM_ID
    "OP_UNIQUE_CARRIER",      # -> OP_CARRIER_AIRLINE_ID
    "OP_CARRIER",             # -> OP_CARRIER_AIRLINE_ID
    "ORIGIN",                 # -> ORIGIN_AIRPORT_ID
    "ORIGIN_AIRPORT_SEQ_ID",  # -> ORIGIN_AIRPORT_ID
    "DEST",                   # -> DEST_AIRPORT_ID
    "DEST_AIRPORT_SEQ_ID",    # -> DEST_AIRPORT_ID
    "DEP_TIME_BLK",           # preliminary elimination, check if model works better with binned values or not
]
outcomeCols_conditional = (
    [lbl for lbl in labelCols if lbl != "DEP_DEL15"]
    + ["CANCELLED", "DIVERTED"]
    + [lbl + "_weighted" for lbl in labelCols]
)
dropCols_conditional = redundantCols_conditional + outcomeCols_conditional

allCols_conditional = weightedDF.columns
# Fail loudly if an expected column is missing instead of silently producing a different feature list.
missingCols_conditional = [c for c in dropCols_conditional if c not in allCols_conditional]
if missingCols_conditional:
    raise ValueError(f"Columns to drop not found in weightedDF: {missingCols_conditional}")

# Keep the original column order: it defines the order of the assembled feature vector.
featureCols_conditional = [c for c in allCols_conditional if c not in dropCols_conditional]

featureCols_conditional

### Build and apply feature column assembler for both featureCols

In [None]:
# Turn each list of feature columns into one vector column named "features".
assembler_unconditional, assembler_conditional = (
    VectorAssembler(inputCols=list(cols), outputCol="features")
    for cols in (featureCols_unconditional, featureCols_conditional)
)
featureSet_unconditional = assembler_unconditional.transform(weightedDF)
featureSet_conditional = assembler_conditional.transform(weightedDF)

# One StandardScaler configuration (scale to unit std, no mean centering) is fitted separately
# to each feature set, so each set is scaled with its own column statistics.
# NOTE(review): the scaler is fitted on the full data, i.e. before the train/test split below,
# so test rows influence the scaling statistics; fitting on the training split only avoids this.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

scalerModel_unconditional, scalerModel_conditional = (
    scaler.fit(featureSet) for featureSet in (featureSet_unconditional, featureSet_conditional)
)

scaledFeatureSet_unconditional = scalerModel_unconditional.transform(featureSet_unconditional)
scaledFeatureSet_conditional = scalerModel_conditional.transform(featureSet_conditional)

### Split data into training and test set
The data is split into 80% training data and 20% test data.

In [None]:
def _split_and_report(featureSet, mode):
    """Split featureSet 80/20 with a fixed seed, print the size of each part, return the splits."""
    splits = featureSet.randomSplit([0.8, 0.2], 12345)
    for kind, part in zip(("train", "test"), splits):
        print(f"Count {kind} data {mode}: {part.count()}")
    return splits


splits_unconditional = _split_and_report(scaledFeatureSet_unconditional, "unconditional")
training_unconditional, test_unconditional = splits_unconditional

splits_conditional = _split_and_report(scaledFeatureSet_conditional, "conditional")
training_conditional, test_conditional = splits_conditional

### Modelling

- Logistic Regression
- SVM

In [None]:
from pyspark.ml.classification import LogisticRegression,LinearSVC,GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import MulticlassMetrics
# Explicit names instead of a wildcard import, so it is clear where the schema types come from.
from pyspark.sql.types import FloatType, StringType, StructField, StructType

In [None]:
# Schema of the evaluation table: one row per (model, target label, mode) run.
stringFieldNames = ['model', 'target label', 'mode', 'param_config']
floatFieldNames = ['accuracy', 'fmeasure', 'test error']
schema = StructType(
    [StructField(name, StringType(), True) for name in stringFieldNames]
    + [StructField(name, FloatType(), True) for name in floatFieldNames]
)

# Start with an empty table; the modelling loop adds the result rows.
evalDF = spark.createDataFrame([], schema)
evalDF.show()

In [None]:
models = ["LogisticRegression","SVM"]

# (mode, training split, test split, feature column names) for each prediction mode.
modeConfigs = [
    ("Unconditional", training_unconditional, test_unconditional, featureCols_unconditional),
    ("Conditional", training_conditional, test_conditional, featureCols_conditional),
]


def _build_estimator(modelName, label):
    """Return (estimator, param grid, names of tuned params) for one model type and label.

    Both classifiers read the already scaled "scaledFeatures" column (hence
    standardization=False) and the per-row class weights in "<label>_weighted".

    Raises:
        ValueError: if modelName is not a supported model type, instead of silently
            reusing the estimator left over from a previous iteration.
    """
    if modelName == "LogisticRegression":
        estimator = LogisticRegression(featuresCol="scaledFeatures",
                                       labelCol=label,
                                       standardization=False,
                                       weightCol=label + "_weighted")
        grid = (ParamGridBuilder()
                .addGrid(estimator.maxIter, [50, 100])
                .addGrid(estimator.regParam, [0.001, 0.01, 0.1])
                .addGrid(estimator.elasticNetParam, [0.0, 0.5])
                .build())
        return estimator, grid, ["maxIter", "regParam", "elasticNetParam"]
    if modelName == "SVM":
        estimator = LinearSVC(labelCol=label,
                              featuresCol="scaledFeatures",
                              standardization=False,
                              weightCol=label + "_weighted")
        grid = (ParamGridBuilder()
                .addGrid(estimator.aggregationDepth, [2, 3])
                .addGrid(estimator.maxIter, [100, 300])
                .addGrid(estimator.regParam, [0.01, 0.001, 0.0001])
                .build())
        return estimator, grid, ["aggregationDepth", "maxIter", "regParam"]
    raise ValueError(f"Unknown model type: {modelName}")


# Result rows are collected here and added to evalDF with a single union after the loop,
# instead of growing the DataFrame lineage with one union per run.
evalRows = []

for model in models:
    for label in labelCols:
        for mode, train, test, features in modeConfigs:
            # Skip invalid combinations: a label that is also an input feature
            # (DEP_DEL15 in conditional mode) would trivially predict itself.
            if label in features:
                continue

            # Print Model Spec
            print("\n\n----------MODEL SPEC----------")
            print(f"Model Type: {model}")
            print(f"Target Label: {label}")
            print(f"Prediction Mode: {mode}")

            model_instance, paramGrid, params = _build_estimator(model, label)

            # Select hyper-parameters by 5-fold cross-validation using the evaluator's default metric.
            evaluator = BinaryClassificationEvaluator(labelCol=label)
            cv = CrossValidator(estimator=model_instance, evaluator=evaluator,
                                estimatorParamMaps=paramGrid, numFolds=5, parallelism=2)
            cvModel = cv.fit(train)
            model_best = cvModel.bestModel
            param_print = '\n'.join(line for line in model_best.explainParams().split('\n')
                                    if line.split(":")[0] in params)
            print("Chosen parameters: \n" + param_print)

            # Coefficients follow the order of the assembler's input columns (= features).
            print(str(model) + " Coefficients: " + str(dict(zip(features, ["{:.4f}".format(a) for a in model_best.coefficients]))))
            print(str(model) + " Intercept: " + "{:.4f}".format(model_best.intercept))

            # Predict and evaluate; cache so the transform is not recomputed for every count/metric.
            predictions = cvModel.transform(test).cache()
            predictionAndLabels = predictions.select(predictions.prediction, label)

            countall = predictionAndLabels.count()
            if countall == 0:
                raise ValueError(f"Empty test split for {model} / {label} / {mode}")
            countcorrect = predictionAndLabels.filter(f"{label} == prediction").count()
            countincorrect = predictionAndLabels.filter(f"{label} != prediction").count()
            accuracy = countcorrect/countall
            print(f"Count correct: {countcorrect}")
            print(f"Count incorrect: {countincorrect}")
            print(f"Count all: {countall}")
            print(f"Accuracy: {accuracy}")
            print(f"Test Error {1-accuracy}")

            # MulticlassMetrics works on an RDD of (prediction, label) pairs.
            predictionAndLabelsRDD = predictions.select("prediction", label).rdd.map(lambda p: [p[0], float(p[1])])
            metrics = MulticlassMetrics(predictionAndLabelsRDD)
            confusion = metrics.confusionMatrix()
            print("Confusion matrix: \n" , confusion)
            fmeasure = metrics.weightedFMeasure()
            print(f"Weighted F-Score: {fmeasure}")

            print("--------------------")

            evalRows.append((model, label, mode, param_print, accuracy, fmeasure, 1-accuracy))
            predictions.unpersist()

evalDF = evalDF.union(spark.createDataFrame(evalRows, schema))

In [None]:
# Collect the small evaluation table to pandas and show all of it. With 2 models and
# 3 valid (label, mode) combinations there are 6 rows, so `head()` (5 rows) would hide one.
df = evalDF.toPandas()
df

In [None]:
# Uncomment to stop the Spark session once the results are no longer needed.
#spark.stop()