In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, DateType, DoubleType
from pyspark.sql import functions

In [2]:
spark = SparkSession \
    .builder \
    .appName("airlinedelay") \
    .getOrCreate()

context = spark.sparkContext

22/12/19 13:32:26 WARN Utils: Your hostname, lucian resolves to a loopback address: 127.0.1.1; using 192.168.100.7 instead (on interface eno1)
22/12/19 13:32:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/19 13:32:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
schema = StructType([
    StructField("FL_DATE", DateType(), True),
    StructField("OP_CARRIER", StringType(), True),
    StructField("OP_CARRIER_FL_NUM", IntegerType(), True),
    StructField("ORIGIN", StringType(), True),
    StructField("DEST", StringType(), True),
    StructField("CRS_DEP_TIME", DoubleType(), True),
    StructField("DEP_TIME", DoubleType(), True),
    StructField("DEP_DELAY", DoubleType(), True),
    StructField("TAXI_OUT", DoubleType(), True),
    StructField("WHEELS_OFF", DoubleType(), True),
    StructField("WHEELS_ON", DoubleType(), True),
    StructField("TAXI_IN", DoubleType(), True),
    StructField("CRS_ARR_TIME", DoubleType(), True),
    StructField("ARR_TIME", DoubleType(), True),
    StructField("ARR_DELAY", DoubleType(), True),
    StructField("CANCELLED", DoubleType(), True),
    StructField("CANCELLATION_CODE", StringType(), True),
    StructField("DIVERTED", DoubleType(), True),
    StructField("CRS_ELAPSED_TIME", DoubleType(), True),
    StructField("ACTUAL_ELAPSED_TIME", DoubleType(), True),
    StructField("AIR_TIME", DoubleType(), True),
    StructField("DISTANCE", DoubleType(), True),
    StructField("CARRIER_DELAY", DoubleType(), True),
    StructField("WEATHER_DELAY", DoubleType(), True),
    StructField("NAS_DELAY", DoubleType(), True),
    StructField("SECURITY_DELAY", DoubleType(), True),
    StructField("LATE_AIRCRAFT_DELAY", DoubleType(), True)
])

df = spark.read.format("csv") \
    .option("sep", ",") \
    .option("header", True) \
    .schema(schema) \
    .load("*.csv")

df.createOrReplaceTempView("airlinedelay")

22/12/19 13:32:29 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [None]:
df.show()

GENERAL EVOLUTION OF DELAYS/IMMEDIACIES FOR DEPARTURES AND ARRIVALS

In [63]:
spark.sql("SELECT EXTRACT(year from FL_DATE) AS Year, AVG(IF(DEP_DELAY > 0, DEP_DELAY, NULL)) AS AverageDepartureDelayPerYear, AVG(IF(DEP_DELAY < 0, DEP_DELAY, NULL)) AS AverageDepartureImmediacyPerYear, AVG(IF(ARR_DELAY > 0, ARR_DELAY, NULL)) AS AverageArrivalDelayPerYear, AVG(IF(ARR_DELAY < 0, ARR_DELAY, NULL)) AS AverageArrivalImmediacyPerYear \
    FROM airlinedelay \
    GROUP BY Year") \
    .show()



+----+----------------------------+--------------------------------+--------------------------+------------------------------+
|Year|AverageDepartureDelayPerYear|AverageDepartureImmediacyPerYear|AverageArrivalDelayPerYear|AverageArrivalImmediacyPerYear|
+----+----------------------------+--------------------------------+--------------------------+------------------------------+
|2012|          29.308491558603826|              -4.775003687415096|         30.01057264950832|           -12.005357641941504|
|2013|          31.401555361002433|              -4.904385394290586|        31.827785050297418|           -11.702265936434728|
|2014|           32.49641736820618|              -4.833083676692695|        32.533315365395346|           -11.405155103231625|
+----+----------------------------+--------------------------------+--------------------------+------------------------------+



                                                                                

AVERAGE DEPARTURE/ARRIVAL DELAYS AND IMMEDIACIES FOR EACH CARRIER

In [64]:
spark.sql("SELECT OP_CARRIER AS Carrier, AVG(IF(DEP_DELAY > 0, DEP_DELAY, NULL)) AS AverageDepartureDelayPerYear, AVG(IF(DEP_DELAY < 0, DEP_DELAY, NULL)) AS AverageDepartureImmediacyPerYear, AVG(IF(ARR_DELAY > 0, ARR_DELAY, NULL)) AS AverageArrivalDelayPerYear, AVG(IF(ARR_DELAY < 0, ARR_DELAY, NULL)) AS AverageArrivalImmediacyPerYear \
    FROM airlinedelay \
    GROUP BY OP_CARRIER \
    ORDER BY OP_CARRIER") \
    .show(1000)



+-------+----------------------------+--------------------------------+--------------------------+------------------------------+
|Carrier|AverageDepartureDelayPerYear|AverageDepartureImmediacyPerYear|AverageArrivalDelayPerYear|AverageArrivalImmediacyPerYear|
+-------+----------------------------+--------------------------------+--------------------------+------------------------------+
|     9E|            41.1994042994043|              -5.564889633425305|         35.87002260292782|           -13.436900873769936|
|     AA|           32.54444204760743|              -4.700877399716953|        33.255888586969014|           -12.431163678020225|
|     AS|          25.501613592356573|              -6.637907372005587|         21.63877411823345|            -12.85842735530121|
|     B6|          38.235298232523434|              -5.639004307460789|         37.05889731902625|           -13.144293274691284|
|     DL|          29.328027779297397|              -4.020968669531213|         29.3424858

                                                                                

NUMBER OF DELAYS/IMMEDIACIES ON DELAYS/ARRIVALS PER CARRIER FOR EACH YEAR

In [65]:
spark.sql(" \
    SELECT OP_CARRIER AS Carrier, \
    SUM( \
          CASE WHEN DEP_DELAY > 0 THEN 1 \
          ELSE 0 END \
    ) AS DepartureDelayCount, \
    SUM( \
          CASE WHEN ARR_DELAY > 0 THEN 1 \
          ELSE 0 END \
    ) AS ArrivalDelayCount, \
    SUM( \
          CASE WHEN DEP_DELAY < 0 THEN 1 \
          ELSE 0 END \
    ) AS DepartureImmediaciesCount, \
    SUM( \
          CASE WHEN ARR_DELAY < 0 THEN 1 \
          ELSE 0 END \
    ) AS ArrivalImmediaciesCount \
    FROM airlinedelay \
    GROUP BY Carrier") \
    .show(1000)



+-------+-------------------+-----------------+-------------------------+-----------------------+
|Carrier|DepartureDelayCount|ArrivalDelayCount|DepartureImmediaciesCount|ArrivalImmediaciesCount|
+-------+-------------------+-----------------+-------------------------+-----------------------+
|     UA|             754037|           590542|                   667765|                 886818|
|     AA|             639732|           652491|                   852861|                 879049|
|     EV|             794388|           843827|                  1228926|                1212830|
|     B6|             257316|           280607|                   419969|                 413810|
|     DL|             694618|           721697|                  1424077|                1492563|
|     OO|             566909|           708499|                  1163431|                1050931|
|     F9|             107035|           123663|                   120377|                 108418|
|     YV|           

                                                                                

EVOLUTION OF DELAYS/IMMEDIACIES FOR DEPARTURES AND ARRIVALS PER CARRIER FOR EACH YEAR

In [66]:
carrierDelayEvolution = spark.sql("SELECT OP_CARRIER AS Carrier, EXTRACT(year from FL_DATE) AS Year, AVG(IF(DEP_DELAY > 0, DEP_DELAY, NULL)) AS AverageDepartureDelayPerYear, AVG(IF(DEP_DELAY < 0, DEP_DELAY, NULL)) AS AverageDepartureImmediacyPerYear, AVG(IF(ARR_DELAY > 0, ARR_DELAY, NULL)) AS AverageArrivalDelayPerYear, AVG(IF(ARR_DELAY < 0, ARR_DELAY, NULL)) AS AverageArrivalImmediacyPerYear \
    FROM airlinedelay \
    GROUP BY OP_CARRIER, Year  \
    ORDER BY OP_CARRIER, Year")

carrierDelayEvolution.show(1000)



+-------+----+----------------------------+--------------------------------+--------------------------+------------------------------+
|Carrier|Year|AverageDepartureDelayPerYear|AverageDepartureImmediacyPerYear|AverageArrivalDelayPerYear|AverageArrivalImmediacyPerYear|
+-------+----+----------------------------+--------------------------------+--------------------------+------------------------------+
|     9E|2013|            41.1994042994043|              -5.564889633425305|         35.87002260292782|           -13.436900873769936|
|     AA|2012|            31.9744497812039|              -4.552307015030712|        31.822441532976004|            -12.59330599656538|
|     AA|2013|           30.80871770776232|              -4.624880325958691|         33.37034777594022|           -12.778652352657465|
|     AA|2014|          35.007743983428355|              -4.912890145205526|        34.518669487146646|           -11.904497881883877|
|     AS|2012|          27.879336977186313|            

                                                                                

In [67]:
carrierDelayEvolution.createOrReplaceTempView("carrierDelayEvolution")

EVOLUTION OF DELAYS/IMMEDIACIES FOR DEPARTURES AND ARRIVALS PER CARRIER FOR EACH YEAR: absolute change YoY of average departure/arrival delays/immediacies 

*Note: negative => immediacy decreases, plane leaves on time; positive => immediacy increases, plane does not leave on time.*

In [69]:
spark.sql(" \
    SELECT Carrier, Year, \
    IF(Carrier = LAG(Carrier) OVER (ORDER BY Carrier, Year), AverageDepartureDelayPerYear - LAG(AverageDepartureDelayPerYear) OVER (ORDER BY Carrier, Year), NULL) AS AverageDepartureDelayYearOverYearDifference, \
    IF(Carrier = LAG(Carrier) OVER (ORDER BY Carrier, Year), - AverageDepartureImmediacyPerYear + LAG(AverageDepartureImmediacyPerYear) OVER (ORDER BY Carrier, Year), NULL) AS AverageDepartureImmediacyYearOverYearDifference, \
    IF(Carrier = LAG(Carrier) OVER (ORDER BY Carrier, Year), AverageArrivalDelayPerYear - LAG(AverageArrivalDelayPerYear) OVER (ORDER BY Carrier, Year), NULL) AS AverageArrivalDelayYearOverYearDifference, \
    IF(Carrier = LAG(Carrier) OVER (ORDER BY Carrier, Year), - AverageArrivalImmediacyPerYear + LAG(AverageArrivalImmediacyPerYear) OVER (ORDER BY Carrier, Year), NULL) AS AverageArrivalImmediacyYearOverYearDifference \
    FROM carrierDelayEvolution") \
    .na.drop('any') \
    .show(1000)

22/12/19 16:48:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/19 16:48:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/19 16:48:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.




22/12/19 16:48:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/19 16:48:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/19 16:48:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/19 16:48:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/19 16:48:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/19 16:48:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-------+-

                                                                                