In [2]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when
from pyspark.sql.functions import col

Load the data and create a DataFrame. The `fraction` variable controls the proportion of the data that is used.

In [3]:
spark = SparkSession.builder.appName("RandomSample").getOrCreate()

#The file is in Google Cloud Platform's bucket
file_path = "gs://data_rz/full_data_flightdelay.csv"
#header=True takes column names from the first row; inferSchema=True lets Spark guess column types
df = spark.read.csv(file_path, header=True, inferSchema=True)

#fraction controls the share of rows used. In this project, we used 25%, 50% and 100%
fraction = 1.0
#A fixed seed makes the sampled subset identical across reruns, so results
#obtained with fraction < 1.0 can be reproduced and compared.
random_sample = df.sample(fraction=fraction, seed=42)

Transform the categorical columns into numeric columns. DEP_TIME_BLK is the departure time block, such as 0000-0059. CARRIER_NAME is the airline operating the flight. DEPARTING_AIRPORT and PREVIOUS_AIRPORT are the departure and landing airports.  
Then we drop the original columns.  
The next step is to change the type of some columns, because only specific types can be used in model training.

In [7]:
#Encode each categorical string column as a numeric "<name>_index" column
columns_to_index = ["DEP_TIME_BLK", "CARRIER_NAME", "DEPARTING_AIRPORT", "PREVIOUS_AIRPORT"]

#One StringIndexer per column; handleInvalid="keep" is passed so invalid
#values are kept rather than raising during transform.
indexers = [
    StringIndexer(
        inputCol=column_name,
        outputCol=f"{column_name}_index",
        handleInvalid="keep",
    )
    for column_name in columns_to_index
]

#Fit all indexers on the sample, then apply them to the same sample
pipeline = Pipeline(stages=indexers)
indexer_model = pipeline.fit(random_sample)
indexed_df = indexer_model.transform(random_sample)

In [8]:
#Remove the raw string columns (their "_index" versions replace them) and fix column types
#NOTE(review): 'label' is not created by any cell shown here - confirm it is still needed.
columns_to_drop = ['DEP_TIME_BLK', 'CARRIER_NAME', 'DEPARTING_AIRPORT', 'PREVIOUS_AIRPORT', 'label']
indexed_df = indexed_df.drop(*columns_to_drop)

#Target type for each column that needs a cast: the label becomes a double,
#the two airport indices become integers.
column_types = {
    "DEP_DEL15": DoubleType(),
    "PREVIOUS_AIRPORT_index": IntegerType(),
    "DEPARTING_AIRPORT_index": IntegerType(),
}
for column_name, spark_type in column_types.items():
    indexed_df = indexed_df.withColumn(column_name, indexed_df[column_name].cast(spark_type))

Define the size category of each airplane from its number of seats.

In [9]:
#Replace NUMBER_OF_SEATS with a three-level size category
seats = indexed_df['NUMBER_OF_SEATS']

#0: fewer than 150 seats, 1: 150 to 300 seats (inclusive), 2: more than 300 seats.
#A row matching none of the three conditions gets a null category.
seats_category = (
    when(seats < 150, 0)
    .when((seats >= 150) & (seats <= 300), 1)
    .when(seats > 300, 2)
    .otherwise(None)
)

#Add the category, then drop the raw seat count it was derived from
indexed_df = indexed_df.withColumn('SEATS_CATEGORY', seats_category).drop('NUMBER_OF_SEATS')

Train the model. Use grid search to tune the hyperparameters, with 2-fold cross-validation.

In [10]:
#Train using grid search
#Every column except the label DEP_DEL15 is used as a feature.
feature_columns = [name for name in indexed_df.columns if name != 'DEP_DEL15']

#Pack the feature columns into the single vector column the classifier reads
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

lr_classifier = LogisticRegression(labelCol="DEP_DEL15", featuresCol="features")

pipeline = Pipeline(stages=[vector_assembler, lr_classifier])

#3 x 3 x 3 = 27 candidate settings, each fitted once per fold
param_grid = (ParamGridBuilder()
              .addGrid(lr_classifier.regParam, [0.01, 0.1, 1.0])
              .addGrid(lr_classifier.elasticNetParam, [0.0, 0.5, 1.0])
              .addGrid(lr_classifier.maxIter, [10, 50, 100])
              .build())

#Cache the prepared data so the repeated fits below run faster
cached_df = indexed_df.cache()

#seed fixes the fold assignment so the selected hyperparameters are reproducible
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=BinaryClassificationEvaluator(labelCol="DEP_DEL15", metricName="areaUnderROC"),
                          numFolds=2,
                          seed=42)

cv_model = crossval.fit(cached_df)

best_model = cv_model.bestModel

#The last pipeline stage is the fitted logistic regression. Read its
#hyperparameters through the public Params API instead of the private
#_java_obj handle.
best_lr_model = best_model.stages[-1]
print("Best RegParam:", best_lr_model.getOrDefault("regParam"))
print("Best ElasticNetParam:", best_lr_model.getOrDefault("elasticNetParam"))
print("Best MaxIter:", best_lr_model.getOrDefault("maxIter"))

Best RegParam: 0.01
Best ElasticNetParam: 0.0
Best MaxIter: 100


Evaluate the model using several metrics. Note that the metrics are computed on the same data that was used for training.

In [11]:
#Make prediction
#NOTE: indexed_df is the same data cv_model was fitted on, so the metrics
#below are in-sample and not an estimate of performance on unseen flights.
#The transform is run once; the original cell repeated this line.
predictions = cv_model.transform(indexed_df)

#Area under the ROC curve
evaluator_roc = BinaryClassificationEvaluator(labelCol="DEP_DEL15", metricName="areaUnderROC")
roc = evaluator_roc.evaluate(predictions)

evaluator_multi = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", metricName="accuracy")
accuracy = evaluator_multi.evaluate(predictions)

#Precision and recall here are the class-weighted averages over both classes.
#Weighted recall is mathematically equal to accuracy, so the two values match.
evaluator_precision = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", metricName="weightedPrecision")
precision = evaluator_precision.evaluate(predictions)

evaluator_recall = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", metricName="weightedRecall")
recall = evaluator_recall.evaluate(predictions)

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="DEP_DEL15", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)

#"ROC" is the area under the ROC curve
print("ROC:", roc)
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-Score:", f1_score)

ROC: 0.6338357960219884
Accuracy: 0.8089738776555481
Precision: 0.7541789329521513
Recall: 0.8089738776555482
F1-Score: 0.7278706651805659
