In [1]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
    BooleanType,
    TimestampType,
    LongType,
    DoubleType,
)

os.environ["JAVA_HOME"] = "C:\Program Files\Java\jre-1.8"
os.environ["SPARK_HOME"] = "c:\spark"


spark = (SparkSession.builder.master("local[1]").appName("airline_analysis").getOrCreate())
spark

Load lookup tables

In [2]:
lines = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv("record layout.csv")
)

df_layout = lines.select("Column", "Description")
df_layout.sort(col("Column").asc()).show(df_layout.count(), False)

+--------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Column                                            |Description                                                                                                                                                                                                                                           |
+--------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ActualElapsedTime                                 |Elapsed Time of Flight, in Minutes              

In [3]:
#lookup table with airline info to join later on to main data set
lines = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv("L_UNIQUE_CARRIERS.csv")
)

df_airlines = lines.select("Code", "Description")
df_airlines.sort(col("Code").asc()).show(df_airlines.count(), False)

+-------+----------------------------------------------------------------------------------+
|Code   |Description                                                                       |
+-------+----------------------------------------------------------------------------------+
|02Q    |Titan Airways                                                                     |
|04Q    |Tradewind Aviation                                                                |
|05Q    |Comlux Aviation, AG                                                               |
|06Q    |Master Top Linhas Aereas Ltd.                                                     |
|07Q    |Flair Airlines Ltd.                                                               |
|09Q    |Swift Air, LLC d/b/a Eastern Air Lines d/b/a Eastern                              |
|0BQ    |DCA                                                                               |
|0CQ    |ACM AIR CHARTER GmbH                                         

Load actual data from parquet file

In [4]:
# download the raw data (full) https://www.kaggle.com/datasets/robikscube/flight-delay-dataset-20182022?select=raw and save all csvs in one parquet file (not shown here how to)
dfs_total = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .parquet("my_file.parquet")
)

In [5]:
dfs_total.dtypes

[('Year', 'bigint'),
 ('Quarter', 'bigint'),
 ('Month', 'bigint'),
 ('DayofMonth', 'bigint'),
 ('DayOfWeek', 'bigint'),
 ('FlightDate', 'string'),
 ('Marketing_Airline_Network', 'string'),
 ('Operated_or_Branded_Code_Share_Partners', 'string'),
 ('DOT_ID_Marketing_Airline', 'bigint'),
 ('IATA_Code_Marketing_Airline', 'string'),
 ('Flight_Number_Marketing_Airline', 'bigint'),
 ('Originally_Scheduled_Code_Share_Airline', 'string'),
 ('DOT_ID_Originally_Scheduled_Code_Share_Airline', 'double'),
 ('IATA_Code_Originally_Scheduled_Code_Share_Airline', 'string'),
 ('Flight_Num_Originally_Scheduled_Code_Share_Airline', 'double'),
 ('Operating_Airline ', 'string'),
 ('DOT_ID_Operating_Airline', 'bigint'),
 ('IATA_Code_Operating_Airline', 'string'),
 ('Tail_Number', 'string'),
 ('Flight_Number_Operating_Airline', 'bigint'),
 ('OriginAirportID', 'bigint'),
 ('OriginAirportSeqID', 'bigint'),
 ('OriginCityMarketID', 'bigint'),
 ('Origin', 'string'),
 ('OriginCityName', 'string'),
 ('OriginState', '

In [6]:
dfs_total.count()

29193782

In [7]:
dfs_total.show()

+----+-------+-----+----------+---------+----------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+---------------------------------------+----------------------------------------------+-------------------------------------------------+--------------------------------------------------+------------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+-------------

In [5]:
# decide if field is worth keeping
dfs_total.select(
    [
        count(when(col(field).isNull(), field)).alias(field)
        for field in dfs_total.columns
    ]
).show()

+----+-------+-----+----------+---------+----------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+---------------------------------------+----------------------------------------------+-------------------------------------------------+--------------------------------------------------+------------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+-------------

In [None]:
dfs_total.dtypes

[('Year', 'bigint'),
 ('Quarter', 'bigint'),
 ('Month', 'bigint'),
 ('DayofMonth', 'bigint'),
 ('DayOfWeek', 'bigint'),
 ('FlightDate', 'string'),
 ('Marketing_Airline_Network', 'string'),
 ('Operated_or_Branded_Code_Share_Partners', 'string'),
 ('DOT_ID_Marketing_Airline', 'bigint'),
 ('IATA_Code_Marketing_Airline', 'string'),
 ('Flight_Number_Marketing_Airline', 'bigint'),
 ('Originally_Scheduled_Code_Share_Airline', 'string'),
 ('DOT_ID_Originally_Scheduled_Code_Share_Airline', 'double'),
 ('IATA_Code_Originally_Scheduled_Code_Share_Airline', 'string'),
 ('Flight_Num_Originally_Scheduled_Code_Share_Airline', 'double'),
 ('Operating_Airline ', 'string'),
 ('DOT_ID_Operating_Airline', 'bigint'),
 ('IATA_Code_Operating_Airline', 'string'),
 ('Tail_Number', 'string'),
 ('Flight_Number_Operating_Airline', 'bigint'),
 ('OriginAirportID', 'bigint'),
 ('OriginAirportSeqID', 'bigint'),
 ('OriginCityMarketID', 'bigint'),
 ('Origin', 'string'),
 ('OriginCityName', 'string'),
 ('OriginState', '

In [None]:
# check for duplicates
diff = dfs_total.count() - dfs_total.distinct().count()
diff

0

Select subset of fields to work with

In [5]:
# note the delay cols include mostly NULLs and are not usable!
cols = (
    "Year",
    "FlightDate",
    "Operating_Airline ",
    "OriginAirportID",
    "Origin",
    "OriginCityName",
    "DestAirportID",
    "Dest",
    "DestCityName",
    "DepDelayMinutes",
    "ArrDelayMinutes",
    "Cancelled",
    "Diverted",
    "AirTime",
    "Flights",
    "Distance",
    "CarrierDelay",
    "WeatherDelay",
    "NASDelay",
    "SecurityDelay",
    "LateAircraftDelay",
    "DivAirportLandings",
)

dfsc = dfs_total.select(*cols)
dfsc.printSchema()

root
 |-- Year: long (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- Operating_Airline : string (nullable = true)
 |-- OriginAirportID: long (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCityName: string (nullable = true)
 |-- DestAirportID: long (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCityName: string (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Flights: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)
 |-- DivAirportLandings: double (nullable = true)



In [6]:
# field Operating_Airline has a trailing whitespace => remove it and check other fields too
dfsc = dfsc.withColumn("Operating_Airline", trim(col("Operating_Airline ")))
dfsc = dfsc.drop(col("Operating_Airline "))
dfsc.show()

+----+----------+---------------+------+--------------+-------------+----+------------+---------------+---------------+---------+--------+-------+-------+--------+------------+------------+--------+-------------+-----------------+------------------+-----------------+
|Year|FlightDate|OriginAirportID|Origin|OriginCityName|DestAirportID|Dest|DestCityName|DepDelayMinutes|ArrDelayMinutes|Cancelled|Diverted|AirTime|Flights|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|DivAirportLandings|Operating_Airline|
+----+----------+---------------+------+--------------+-------------+----+------------+---------------+---------------+---------+--------+-------+-------+--------+------------+------------+--------+-------------+-----------------+------------------+-----------------+
|2018|2018-01-23|          10146|   ABY|    Albany, GA|        10397| ATL| Atlanta, GA|            0.0|            0.0|      0.0|     0.0|   38.0|    1.0|   145.0|        NULL|        NULL|    NUL

Rearrange order of fields again

In [7]:
# run again below to have this order of columns
cols = (
    "Year",
    "FlightDate",
    "Operating_Airline",
    "OriginAirportID",
    "Origin",
    "OriginCityName",
    "DestAirportID",
    "Dest",
    "DestCityName",
    "DepDelayMinutes",
    "ArrDelayMinutes",
    "Cancelled",
    "Diverted",
    "AirTime",
    "Flights",
    "Distance",
    "CarrierDelay",
    "WeatherDelay",
    "NASDelay",
    "SecurityDelay",
    "LateAircraftDelay",
    "DivAirportLandings",
)

dfsc = dfsc.select(*cols)
dfsc.printSchema()

root
 |-- Year: long (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- Operating_Airline: string (nullable = true)
 |-- OriginAirportID: long (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCityName: string (nullable = true)
 |-- DestAirportID: long (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCityName: string (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Flights: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)
 |-- DivAirportLandings: double (nullable = true)



In [8]:
# inner join the airline desc lookup table to know the Airline Name/Description

dfsci = dfsc.join(
    df_airlines, dfsc.Operating_Airline == df_airlines.Code, "inner"
).select(dfsc["*"], df_airlines.Description)


cols = (
    "Year",
    "FlightDate",
    "Operating_Airline",
    col("Description").alias("Operating_Airline_Name"),
    "OriginAirportID",
    "Origin",
    "OriginCityName",
    "DestAirportID",
    "Dest",
    "DestCityName",
    "DepDelayMinutes",
    "ArrDelayMinutes",
    "Cancelled",
    "Diverted",
    "AirTime",
    "Flights",
    "Distance",
    "CarrierDelay",
    "WeatherDelay",
    "NASDelay",
    "SecurityDelay",
    "LateAircraftDelay",
    "DivAirportLandings",
)

dfsci = dfsci.select(*cols)
dfsci.show()

+----+----------+-----------------+----------------------+---------------+------+--------------+-------------+----+------------+---------------+---------------+---------+--------+-------+-------+--------+------------+------------+--------+-------------+-----------------+------------------+
|Year|FlightDate|Operating_Airline|Operating_Airline_Name|OriginAirportID|Origin|OriginCityName|DestAirportID|Dest|DestCityName|DepDelayMinutes|ArrDelayMinutes|Cancelled|Diverted|AirTime|Flights|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|DivAirportLandings|
+----+----------+-----------------+----------------------+---------------+------+--------------+-------------+----+------------+---------------+---------------+---------+--------+-------+-------+--------+------------+------------+--------+-------------+-----------------+------------------+
|2018|2018-01-23|               9E|     Endeavor Air Inc.|          10146|   ABY|    Albany, GA|        10397| ATL| Atlanta, GA

In [9]:
dfscit = dfsci.selectExpr(
    "cast(Year as long) Year",
    "cast(FlightDate as date ) FlightDate",
    "cast(Operating_Airline as string) Operating_Airline",
    "cast(Operating_Airline_Name as string) Operating_Airline_Name",
    "cast(OriginAirportID as long) OriginAirportID",
    "cast(Origin as string) Origin",
    "cast(OriginCityName as string) OriginCityName",
    "cast(DestAirportID as long) DestAirportID",
    "cast(Dest as string) Dest",
    "cast(DestCityName as string) DestCityName",
    "cast(DepDelayMinutes as double) DepDelayMinutes",
    "cast(ArrDelayMinutes as double) ArrDelayMinutes",
    "cast(Cancelled as boolean) Cancelled",
    "cast(Diverted as boolean) Diverted",
    "cast(AirTime as float) AirTime",
    "cast(Flights as float) Flights",
    "cast(Distance as float) Distance",
    "cast(CarrierDelay as float) CarrierDelay",
    "cast(WeatherDelay as float) WeatherDelay",
    "cast(NASDelay as float) NASDelay",
    "cast(SecurityDelay as float) SecurityDelay",
    "cast(LateAircraftDelay as float) LateAircraftDelay",
    "cast(DivAirportLandings as float) DivAirportLandings",
)
dfscit.printSchema()
dfscit.show(truncate=False)

root
 |-- Year: long (nullable = true)
 |-- FlightDate: date (nullable = true)
 |-- Operating_Airline: string (nullable = true)
 |-- Operating_Airline_Name: string (nullable = true)
 |-- OriginAirportID: long (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCityName: string (nullable = true)
 |-- DestAirportID: long (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCityName: string (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- AirTime: float (nullable = true)
 |-- Flights: float (nullable = true)
 |-- Distance: float (nullable = true)
 |-- CarrierDelay: float (nullable = true)
 |-- WeatherDelay: float (nullable = true)
 |-- NASDelay: float (nullable = true)
 |-- SecurityDelay: float (nullable = true)
 |-- LateAircraftDelay: float (nullable = true)
 |-- DivAirportLandings: float (nullable = true)

+----+-

In [10]:
dfscit.schema["Operating_Airline"].nullable = False
dfscit.schema["OriginAirportID"].nullable = False
dfscit.schema["DestAirportID"].nullable = False

In [10]:
list(dfscit.schema)

[StructField('Year', LongType(), True),
 StructField('FlightDate', DateType(), True),
 StructField('Operating_Airline', StringType(), False),
 StructField('Operating_Airline_Name', StringType(), True),
 StructField('OriginAirportID', LongType(), False),
 StructField('Origin', StringType(), True),
 StructField('OriginCityName', StringType(), True),
 StructField('DestAirportID', LongType(), False),
 StructField('Dest', StringType(), True),
 StructField('DestCityName', StringType(), True),
 StructField('DepDelayMinutes', DoubleType(), True),
 StructField('ArrDelayMinutes', DoubleType(), True),
 StructField('Cancelled', BooleanType(), True),
 StructField('Diverted', BooleanType(), True),
 StructField('AirTime', FloatType(), True),
 StructField('Flights', FloatType(), True),
 StructField('Distance', FloatType(), True),
 StructField('CarrierDelay', FloatType(), True),
 StructField('WeatherDelay', FloatType(), True),
 StructField('NASDelay', FloatType(), True),
 StructField('SecurityDelay', F

In [11]:
dfscit.createOrReplaceTempView("comb_flights_full_v")
query0 = spark.sql("SELECT * FROM comb_flights_full_v")

In [13]:
query0 = spark.sql(
    """
    SELECT Operating_Airline, Operating_Airline_Name,
    avg(ArrDelayMinutes) AS avg_arrivaldelay, 
    min(ArrDelayMinutes) AS min_arrivaldelay, 
    max(ArrDelayMinutes) AS max_arrivaldelay, 
    avg(DepDelayMinutes) AS average_departuredelay, 
    min(DepDelayMinutes) AS min_departuredelay, 
    max(DepDelayMinutes) AS max_departuredelay 
    FROM comb_flights_full_v 
    GROUP BY Operating_Airline, Operating_Airline_Name
    """
)
query0.show()

+-----------------+----------------------+------------------+----------------+----------------+----------------------+------------------+------------------+
|Operating_Airline|Operating_Airline_Name|  avg_arrivaldelay|min_arrivaldelay|max_arrivaldelay|average_departuredelay|min_departuredelay|max_departuredelay|
+-----------------+----------------------+------------------+----------------+----------------+----------------------+------------------+------------------+
|               PT|     Piedmont Airlines|11.713520368066453|             0.0|          1797.0|     10.88155679573684|               0.0|            1808.0|
|               B6|       JetBlue Airways|20.029908892739368|             0.0|          2029.0|    20.243547102854734|               0.0|            2052.0|
|               G7|  GoJet Airlines LL...|17.641083927888186|             0.0|          2973.0|     17.03163894207755|               0.0|            2976.0|
|               EV|  ExpressJet Airlin...| 18.491006279639

In [14]:
# same as above but using pyspark.sql functions directly on df
df_measures = dfscit.groupBy("Operating_Airline", "Operating_Airline_Name").agg(
    sum("ArrDelayMinutes").alias("total_arrivaldelay"),
    avg("ArrDelayMinutes").alias("avg_arrivaldelay"),
    min("ArrDelayMinutes").alias("min_arrivaldelay"),
    max("ArrDelayMinutes").alias("max_arrivaldelay"),
    sum("DepDelayMinutes").alias("total_departuredelay"),
    avg("DepDelayMinutes").alias("average_departuredelay"),
    min("DepDelayMinutes").alias("min_departuredelay"),
    max("DepDelayMinutes").alias("max_departuredelay"),
)

df_measures.show()

+-----------------+----------------------+------------------+------------------+----------------+----------------+--------------------+----------------------+------------------+------------------+
|Operating_Airline|Operating_Airline_Name|total_arrivaldelay|  avg_arrivaldelay|min_arrivaldelay|max_arrivaldelay|total_departuredelay|average_departuredelay|min_departuredelay|max_departuredelay|
+-----------------+----------------------+------------------+------------------+----------------+----------------+--------------------+----------------------+------------------+------------------+
|               PT|     Piedmont Airlines|         4396869.0|11.713520368066453|             0.0|          1797.0|           4104360.0|     10.88155679573684|               0.0|            1808.0|
|               B6|       JetBlue Airways|       2.1501306E7|20.029908892739368|             0.0|          2029.0|         2.1814001E7|    20.243547102854734|               0.0|            2052.0|
|              

What influences delays? Inflight causes and or ground / technical issues?

In [15]:
total = dfscit.groupBy().agg(
    (sum("CarrierDelay") / 60).alias("total_hours_CarrierDelay"),
    (sum("WeatherDelay") / 60).alias("total_hours_WeatherDelay"),
    (sum("NASDelay") / 60).alias("total_hours_NASDelay"),
    (sum("SecurityDelay") / 60).alias("total_hours_SecurityDelay"),
    (sum("LateAircraftDelay") / 60).alias("total_hours_LateAircraftDelay"),
)
total.show()

+------------------------+------------------------+--------------------+-------------------------+-----------------------------+
|total_hours_CarrierDelay|total_hours_WeatherDelay|total_hours_NASDelay|total_hours_SecurityDelay|total_hours_LateAircraftDelay|
+------------------------+------------------------+--------------------+-------------------------+-----------------------------+
|               1949248.5|      322512.81666666665|           1182829.5|                  10757.7|                   2134218.35|
+------------------------+------------------------+--------------------+-------------------------+-----------------------------+



Late Aircraft delay shows the highest hours value which means in simple terms "a flight is delayed cause of a delay of the previous flight"

However, there are cases in the data set where the cause of delay columns (CarrierDelay, WeatherDelay, etc) have NULL rows though arrival or departure delay columns have non NULLs entries, so there are more delay reasons than the ones flagged in these five cols:

In [16]:
matchs = dfscit.select(
    "ArrDelayMinutes",
    "DepDelayMinutes",
    "CarrierDelay",
    "WeatherDelay",
    "NASDelay",
    "SecurityDelay",
    "LateAircraftDelay",
)

matchss = matchs.filter((col("ArrDelayMinutes") > 0) | (col("DepDelayMinutes") > 0))
matchss.show()

+---------------+---------------+------------+------------+--------+-------------+-----------------+
|ArrDelayMinutes|DepDelayMinutes|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---------------+---------------+------------+------------+--------+-------------+-----------------+
|           22.0|           NULL|         0.0|         0.0|    22.0|          0.0|              0.0|
|            0.0|            2.0|        NULL|        NULL|    NULL|         NULL|             NULL|
|           22.0|           24.0|        22.0|         0.0|     0.0|          0.0|              0.0|
|           16.0|           22.0|         0.0|         0.0|     0.0|          0.0|             16.0|
|          105.0|          105.0|         0.0|         0.0|     0.0|          0.0|            105.0|
|            8.0|            0.0|        NULL|        NULL|    NULL|         NULL|             NULL|
|            1.0|            7.0|        NULL|        NULL|    NULL|         NULL|         

In [17]:
# vice versa: below shows there are no cases where the delay reason cols are populated and there is no departe/arrival delay, i.e. the data is reliable in these five cols
dfscit_delay = dfscit.filter(
    (
        col("CarrierDelay").isNotNull()
        | col("WeatherDelay").isNotNull()
        | col("NASDelay").isNotNull()
        | col("SecurityDelay").isNotNull()
        | col("LateAircraftDelay").isNotNull()
    )
    & (col("ArrDelayMinutes").isNull() & col("DepDelayMinutes").isNull())
).select(
    "ArrDelayMinutes",
    "DepDelayMinutes",
    "CarrierDelay",
    "WeatherDelay",
    "NASDelay",
    "SecurityDelay",
    "LateAircraftDelay",
)
dfscit_delay.show()

+---------------+---------------+------------+------------+--------+-------------+-----------------+
|ArrDelayMinutes|DepDelayMinutes|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---------------+---------------+------------+------------+--------+-------------+-----------------+
+---------------+---------------+------------+------------+--------+-------------+-----------------+



Examples for delay types as per Federal Aviation Administration (Website: https://aspm.faa.gov/aspmhelp/index/Types_of_Delay.html):
- CarrierDelay -> aircraft damage, awaiting the arrival of connecting passengers or crew, cargo loading, engineering inspection, fueling, maintenance, removal of passenger, slow boarding/seating, weight and balance delays
- Late Arrival Delay --> Arrival delay at an airport due to the late arrival of the same plane at a previous airport
- NASDelay --> weather conditions, airport operations, heavy traffic volume, air traffic control               
- SecurityDelay --> evacuation of a terminal, aircraft re-boarding caused by security breach, inoperative screening equipment    
- WeatherDelay --> hazardous weather conditions at departure, enroute, or arrival  

In [15]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
from pyspark import SparkFiles

# For needed numerical cols exclude NULLs (else correlation calc. not poss.) and use for corr matrix
cols = [
    "DepDelayMinutes",
    "ArrDelayMinutes",
    "Diverted",
    "AirTime",
    "Distance",
    "CarrierDelay",
    "WeatherDelay",
    "NASDelay",
    "SecurityDelay",
    "LateAircraftDelay",
]

dfscit_notnull = dfscit.dropna(subset=cols)  # count is 5005310

input = dfscit_notnull.select(cols)

rdd_vect = input.rdd.map(lambda row: Vectors.dense(row))
corr_matrix = Statistics.corr(rdd_vect, method="pearson")
corr_df = pd.DataFrame(corr_matrix, columns=cols, index=cols)

print("Correlation matrix for flights data:\n")
print(corr_df)

Correlation matrix for flights data:

                   DepDelayMinutes  ArrDelayMinutes  Diverted   AirTime  \
DepDelayMinutes           1.000000         0.977431       NaN -0.046045   
ArrDelayMinutes           0.977431         1.000000       NaN -0.025990   
Diverted                       NaN              NaN       1.0       NaN   
AirTime                  -0.046045        -0.025990       NaN  1.000000   
Distance                 -0.022337        -0.020633       NaN  0.977521   
CarrierDelay              0.671381         0.666030       NaN -0.003895   
WeatherDelay              0.295350         0.306573       NaN -0.022711   
NASDelay                  0.144603         0.237194       NaN  0.058543   
SecurityDelay             0.022072         0.020390       NaN  0.006750   
LateAircraftDelay         0.516943         0.496167       NaN -0.062840   

                   Distance  CarrierDelay  WeatherDelay  NASDelay  \
DepDelayMinutes   -0.022337      0.671381      0.295350  0.144603  

In [61]:
expression = "+".join(cols[5:10])
df0 = input.withColumn("sum_5_delaytypes", expr(expression))

result = (
    df0.withColumn("CarrierDelay_rel", df0.CarrierDelay / df0.sum_5_delaytypes)
    .withColumn("WeatherDelay_rel", df0.WeatherDelay / df0.sum_5_delaytypes)
    .withColumn("NASDelay_rel", df0.NASDelay / df0.sum_5_delaytypes)
    .withColumn("SecurityDelay_rel", df0.SecurityDelay / df0.sum_5_delaytypes)
    .withColumn("LateAircraftDelay_rel", df0.LateAircraftDelay / df0.sum_5_delaytypes)
)


result.show(1000)

+---------------+---------------+--------+-------+--------+------------+------------+--------+-------------+-----------------+----------------+--------------------+--------------------+--------------------+-----------------+---------------------+
|DepDelayMinutes|ArrDelayMinutes|Diverted|AirTime|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|sum_5_delaytypes|    CarrierDelay_rel|    WeatherDelay_rel|        NASDelay_rel|SecurityDelay_rel|LateAircraftDelay_rel|
+---------------+---------------+--------+-------+--------+------------+------------+--------+-------------+-----------------+----------------+--------------------+--------------------+--------------------+-----------------+---------------------+
|           24.0|           22.0|   false|   32.0|   145.0|        22.0|         0.0|     0.0|          0.0|              0.0|            22.0|                 1.0|                 0.0|                 0.0|              0.0|                  0.0|
|           

In [67]:
we = result.filter(col("WeatherDelay").isNotNull())
avg_waeath_delay = we.groupby().agg(avg("WeatherDelay_rel"))
avg_waeath_delay.show()

+---------------------+
|avg(WeatherDelay_rel)|
+---------------------+
|  0.03629347544149675|
+---------------------+



- DepDelayMinutes has a strong correlation to ArrDelayMinutes! And consequently also to CarrierDelay, WeatherDelay, etc. ArrDelayMinutes is composed of. Cause: delay propagation / domino effect

- Airtime to Distance are strongly correlated which is obvious as the longer the distance the longer the airtime, but neither Airtime nor Distance show a correlation to any Delay cols (Dep/Arr/CarrD/WeatherD, etc).
  Note that the definition of WeatherDelay also includes weather conditions enroute, i.e. when the aircraft is at high altitude (not near the airport). So one could safely assume weather conditions on high altitde are mostly not a major factor for arrival delays. Instead weather delays average at about 3% for arrival delays 

Numerically the top 3 causes for arrival delays are in descending order LateAircraftDelay, CarrierDelay and NASDelay:

In [71]:
re = result.groupby().agg(sum("CarrierDelay")/sum("ArrDelayMinutes"), sum("WeatherDelay")/sum("ArrDelayMinutes"), sum("NASDelay") /
                          sum("ArrDelayMinutes"), sum("SecurityDelay")/sum("ArrDelayMinutes"), sum("LateAircraftDelay")/sum("ArrDelayMinutes"))
re.show()

+------------------------------------------+------------------------------------------+--------------------------------------+-------------------------------------------+-----------------------------------------------+
|(sum(CarrierDelay) / sum(ArrDelayMinutes))|(sum(WeatherDelay) / sum(ArrDelayMinutes))|(sum(NASDelay) / sum(ArrDelayMinutes))|(sum(SecurityDelay) / sum(ArrDelayMinutes))|(sum(LateAircraftDelay) / sum(ArrDelayMinutes))|
+------------------------------------------+------------------------------------------+--------------------------------------+-------------------------------------------+-----------------------------------------------+
|                       0.34814368647922733|                       0.05760206468183657|                   0.21127567784626022|                        0.00192143460193768|                              0.381054376448882|
+------------------------------------------+------------------------------------------+-------------------------------------

Means in most cases the delay propagation (same aircraft is delayed at previous airport) and CarrierDelay are the contributing reasons. 

While carrier delay has many different causes (as it can be seen in the definition above), amongst the prominent of them are awaiting the arrival of connecting passengers or crew, but mostly aircraft related technical/maintenance issues. Lastly NASDelay reflects heavy traffic volume and airport operations. 

Abstracting from LateAircraft delay - which is a logical consequence - the CarrierDelay is one of the most determining factors contributing to not hear any applause at landing of the aircraft at the arrival airport.

In [2]:
spark.stop()