# Logistic Regression and Random Forest to Predict Arrival Delay of 15+ Minutes

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline  
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql.functions import when
import pandas as pd
import os

In [2]:
spark = SparkSession \
    .builder \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "12g") \
    .config("spark.memory.fraction", "0.8") \
    .getOrCreate()

/opt/conda/lib/python3.7/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/12/01 02:44:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Training years
flights_2018 = spark.read.parquet('/standard/ds7200-apt4c/isaac_yuyang_final_project/flights_2018.parquet')
flights_2019 = spark.read.parquet('/standard/ds7200-apt4c/isaac_yuyang_final_project/flights_2019.parquet')
flights_2020 = spark.read.parquet('/standard/ds7200-apt4c/isaac_yuyang_final_project/flights_2020.parquet')
flights_2021 = spark.read.parquet('/standard/ds7200-apt4c/isaac_yuyang_final_project/flights_2021.parquet')
# Test year
flights_2022 = spark.read.parquet('/standard/ds7200-apt4c/isaac_yuyang_final_project/flights_2022.parquet')

In [4]:
combined = flights_2018.union(flights_2019).union(flights_2020).union(flights_2021)
train = combined.select(['Airline', 'DayOfWeek', 'DayofMonth', 'Month', 'DepDelay', 'TaxiOut', 'Distance', 'Origin', 'Dest', 'ArrDel15']).dropna()

In [5]:
train.count()

                                                                                

24394688

In [16]:
train_sample = train.sample(fraction=0.6, seed=59)
train_sample.count()

                                                                                

14637535

In [7]:
# Clean test set
test = flights_2022.select(['Airline', 'DayOfWeek', 'DayofMonth', 'Month', 'DepDelay', 'TaxiOut', 'Distance', 'Origin', 'Dest', 'ArrDel15']).dropna()
test.count()

                                                                                

3944916

## Logistic Regression

In [15]:
# Create pipeline stages
indexer_airline = StringIndexer(inputCol='Airline', outputCol='airline_index')
indexer_origin = StringIndexer(inputCol='Origin', outputCol='origin_index')
indexer_dest = StringIndexer(inputCol='Dest', outputCol='dest_index')
indexer_dow = StringIndexer(inputCol='DayOfWeek', outputCol='dow_index')
indexer_dom = StringIndexer(inputCol='DayofMonth', outputCol='dom_index')

ohe = OneHotEncoder(inputCols=['airline_index', 'origin_index', 'dest_index', 'dow_index', 'dom_index'], outputCols=['airline_ohe', 'origin_ohe', 'dest_ohe', 'dow_ohe', 'dom_ohe'])

#va = VectorAssembler(inputCols=['airline_ohe', 'dow_ohe', 'dom_ohe', 'DepDelay', 'TaxiOut', 'Distance'], outputCol='features')
va = VectorAssembler(inputCols=['airline_ohe', 'origin_ohe', 'dest_ohe', 'dow_ohe', 'dom_ohe', 'Month', 'TaxiOut', 'Distance'], outputCol='features')

sc = StandardScaler(inputCol='features', outputCol='scaledFeatures')
lr = LogisticRegression(labelCol='ArrDel15', featuresCol='scaledFeatures', maxIter=50)

pipeline = Pipeline(stages=[indexer_airline, indexer_origin, indexer_dest, indexer_dow, indexer_dom, ohe, va, sc, lr])

In [17]:
model = pipeline.fit(train_sample)

                                                                                

In [18]:
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                          labelCol='ArrDel15',
                                          metricName='areaUnderROC')
auroc = evaluator.evaluate(predictions)

print(f'Area under ROC: {auroc}')



Area under ROC: 0.5454443091829165


                                                                                

## Random Forest

In [19]:
# Create pipeline stages
indexer_airline = StringIndexer(inputCol='Airline', outputCol='airline_index')
indexer_origin = StringIndexer(inputCol='Origin', outputCol='origin_index')
indexer_dest = StringIndexer(inputCol='Dest', outputCol='dest_index')
indexer_dow = StringIndexer(inputCol='DayOfWeek', outputCol='dow_index')
indexer_dom = StringIndexer(inputCol='DayofMonth', outputCol='dom_index')

ohe = OneHotEncoder(inputCols=['airline_index', 'origin_index', 'dest_index', 'dow_index', 'dom_index'], outputCols=['airline_ohe', 'origin_ohe', 'dest_ohe', 'dow_ohe', 'dom_ohe'])

#va = VectorAssembler(inputCols=['airline_ohe', 'dow_ohe', 'dom_ohe', 'DepDelay', 'TaxiOut', 'Distance'], outputCol='features')
va = VectorAssembler(inputCols=['airline_ohe', 'origin_ohe', 'dest_ohe', 'dow_ohe', 'dom_ohe', 'Month', 'TaxiOut', 'Distance'], outputCol='features')

rf = RandomForestClassifier(labelCol='ArrDel15', featuresCol='features', numTrees=50)

pipeline = Pipeline(stages=[indexer_airline, indexer_origin, indexer_dest, indexer_dow, indexer_dom, ohe, va, rf])

In [20]:
model = pipeline.fit(train_sample)



25/12/01 02:55:44 WARN MemoryStore: Not enough space to cache rdd_1301_11 in memory! (computed 2014.7 MiB so far)
25/12/01 02:55:44 WARN BlockManager: Persisting block rdd_1301_11 to disk instead.
25/12/01 02:55:44 WARN MemoryStore: Not enough space to cache rdd_1301_19 in memory! (computed 2014.7 MiB so far)
25/12/01 02:55:44 WARN BlockManager: Persisting block rdd_1301_19 to disk instead.
25/12/01 02:55:44 WARN MemoryStore: Not enough space to cache rdd_1301_27 in memory! (computed 2014.7 MiB so far)
25/12/01 02:55:44 WARN BlockManager: Persisting block rdd_1301_27 to disk instead.
25/12/01 02:56:08 WARN MemoryStore: Not enough space to cache rdd_1301_3 in memory! (computed 6.6 GiB so far)
25/12/01 02:56:08 WARN BlockManager: Persisting block rdd_1301_3 to disk instead.




25/12/01 02:56:53 WARN MemoryStore: Not enough space to cache rdd_1301_19 in memory! (computed 6.6 GiB so far)
25/12/01 02:56:55 WARN MemoryStore: Not enough space to cache rdd_1301_3 in memory! (computed 2014.7 MiB so far)
25/12/01 02:57:05 WARN MemoryStore: Not enough space to cache rdd_1301_27 in memory! (computed 2014.7 MiB so far)




25/12/01 02:57:38 WARN MemoryStore: Not enough space to cache rdd_1301_11 in memory! (computed 6.6 GiB so far)




25/12/01 02:58:26 WARN MemoryStore: Not enough space to cache rdd_1301_11 in memory! (computed 2014.7 MiB so far)
25/12/01 02:58:26 WARN MemoryStore: Not enough space to cache rdd_1301_19 in memory! (computed 2014.7 MiB so far)
25/12/01 02:58:26 WARN MemoryStore: Not enough space to cache rdd_1301_27 in memory! (computed 2014.7 MiB so far)
25/12/01 02:58:28 WARN MemoryStore: Not enough space to cache rdd_1301_3 in memory! (computed 3.0 GiB so far)




25/12/01 02:59:30 WARN MemoryStore: Not enough space to cache rdd_1301_11 in memory! (computed 2014.7 MiB so far)
25/12/01 02:59:30 WARN MemoryStore: Not enough space to cache rdd_1301_19 in memory! (computed 2014.7 MiB so far)
25/12/01 02:59:31 WARN MemoryStore: Not enough space to cache rdd_1301_27 in memory! (computed 2014.7 MiB so far)
25/12/01 02:59:32 WARN MemoryStore: Not enough space to cache rdd_1301_3 in memory! (computed 3.0 GiB so far)




25/12/01 03:00:42 WARN MemoryStore: Not enough space to cache rdd_1301_11 in memory! (computed 2014.7 MiB so far)
25/12/01 03:00:42 WARN MemoryStore: Not enough space to cache rdd_1301_19 in memory! (computed 2014.7 MiB so far)
25/12/01 03:00:42 WARN MemoryStore: Not enough space to cache rdd_1301_27 in memory! (computed 2014.7 MiB so far)
25/12/01 03:00:43 WARN MemoryStore: Not enough space to cache rdd_1301_3 in memory! (computed 3.0 GiB so far)




25/12/01 03:02:04 WARN MemoryStore: Not enough space to cache rdd_1301_11 in memory! (computed 2014.7 MiB so far)
25/12/01 03:02:04 WARN MemoryStore: Not enough space to cache rdd_1301_19 in memory! (computed 2014.7 MiB so far)
25/12/01 03:02:04 WARN MemoryStore: Not enough space to cache rdd_1301_27 in memory! (computed 2014.7 MiB so far)
25/12/01 03:02:06 WARN MemoryStore: Not enough space to cache rdd_1301_3 in memory! (computed 3.0 GiB so far)


                                                                                

In [21]:
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                          labelCol='ArrDel15',
                                          metricName='areaUnderROC')
auroc = evaluator.evaluate(predictions)

print(f'Area under ROC: {auroc}')



Area under ROC: 0.5


                                                                                

In [23]:
corr = combined.stat.corr('DepDelay', 'ArrDelay')
corr

                                                                                

0.9557306025332492