# Flights Analysis (January 2008)

The flights analysis proceeds as follows:

1. PySpark **environment setup**
2. Data source and **Spark data abstraction** (DataFrame) **setup**
3. Data set **metadata analysis**:
  1. Display **schema and size** of the DataFrame
  2. Get one or multiple **random samples** from the data set to better understand what the data is all about
  3. Identify **data entities**, **metrics** and **dimensions**
  4. **Columns/fields categorization**
4. Column groups **basic profiling** to better understand our data set:
  1. **Timing related** columns basic profiling
  2. **Flight related** columns basic profiling
  3. **Issue related** columns basic profiling
5. **Answer some business questions** to improve service
  1. **Ratio of delayed** (non-cancelled) flights by severity
  2. **Severely delayed flights statistics** by type of delay (carrier, weather, NAS, security and late aircraft)
  3. **Top 20 origin airports** (and figures) involved in severe delays

Let's go for it:

## 1. PySpark environment setup

In [1]:
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

## 2. Data source and Spark data abstraction (DataFrame) setup

In [3]:
flightsDF = spark.read \
                 .option("inferSchema", "true") \
                 .option("header", "true") \
                 .csv("Data/immo_data.csv")

## 3. Data set metadata analysis
### A. Display schema and size of the DataFrame

In [5]:
flightsDF.printSchema()
print(flightsDF.count())

root
 |-- regio1: string (nullable = true)
 |-- serviceCharge: string (nullable = true)
 |-- heatingType: string (nullable = true)
 |-- telekomTvOffer: string (nullable = true)
 |-- telekomHybridUploadSpeed: string (nullable = true)
 |-- newlyConst: string (nullable = true)
 |-- balcony: string (nullable = true)
 |-- picturecount: string (nullable = true)
 |-- pricetrend: string (nullable = true)
 |-- telekomUploadSpeed: string (nullable = true)
 |-- totalRent: string (nullable = true)
 |-- yearConstructed: string (nullable = true)
 |-- scoutId: string (nullable = true)
 |-- noParkSpaces: string (nullable = true)
 |-- firingTypes: string (nullable = true)
 |-- hasKitchen: string (nullable = true)
 |-- geo_bln: string (nullable = true)
 |-- cellar: string (nullable = true)
 |-- yearConstructedRange: string (nullable = true)
 |-- baseRent: string (nullable = true)
 |-- houseNumber: string (nullable = true)
 |-- livingSpace: string (nullable = true)
 |-- geo_krs: string (nullable = true)


### B. Get one or multiple random samples from the data set

In [15]:
flightsDF.cache() # optimization to make the processing faster
flightsDF.sample(False, 0.1).take(2)

[Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime='1039', CRSDepTime=1040, ArrTime='1132', CRSArrTime=1150, UniqueCarrier='WN', FlightNum=535, TailNum='N428WN', ActualElapsedTime='233', CRSElapsedTime=250, AirTime='219', ArrDelay='-18', DepDelay='-1', Origin='IND', Dest='LAS', Distance=1591, TaxiIn='7', TaxiOut='7', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime='706', CRSDepTime=700, ArrTime='916', CRSArrTime=915, UniqueCarrier='WN', FlightNum=100, TailNum='N690SW', ActualElapsedTime='130', CRSElapsedTime=135, AirTime='106', ArrDelay='1', DepDelay='6', Origin='IND', Dest='MCO', Distance=828, TaxiIn='5', TaxiOut='19', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA')]
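
The sample shows that columns such as *DepTime* or *ArrDelay* come back as strings (`'1039'`, `'-18'`) while *CRSDepTime* is an integer. A likely reason is the `'NA'` token that appears in those columns: schema inference falls back to string as soon as one value is not numeric. The plain-Python sketch below illustrates that fallback; `infer_column_type` is a hypothetical helper written for this illustration, not Spark's actual inference algorithm, and the value lists are made up.

```python
def infer_column_type(values):
    """Return "int" if every value parses as an integer, otherwise "string".

    Simplified stand-in for schema inference (illustration only).
    """
    for value in values:
        try:
            int(value)
        except ValueError:
            return "string"
    return "int"


crs_dep_time = ["1040", "700"]      # hypothetical values, always numeric
dep_time = ["1039", "706", "NA"]    # hypothetical values, one "NA" token

print(infer_column_type(crs_dep_time))  # int
print(infer_column_type(dep_time))      # string
```

If numeric types are wanted from the start, one option to try is the CSV reader's `nullValue` option (for example `.option("nullValue", "NA")`), so that `NA` is read as null instead of text; whether that suits the rest of this notebook, which filters on `"NA"` later, is a separate decision.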

### C. Data entities, metrics and dimensions

I've identified the following elements:

* **Entities:** Flight (the main entity, whose measurements are the facts), Airport (dimension), Air Carrier (dimension)
* **Metrics:** Departure time, scheduled departure time, arrival time, scheduled arrival time, ...
* **Dimensions:** Origin, destination, tail number, flight number, ...

### D. Column categorization

One possible column categorization:

* **Timing related columns:** *Year*, *Month*, *DayofMonth*, *DayOfWeek*, *DepTime*, *CRSDepTime*, *ArrTime* and *CRSArrTime*
* **Flight related columns:** *UniqueCarrier*, *FlightNum*, *TailNum*, *ActualElapsedTime*, *CRSElapsedTime*, *AirTime*, *Origin*, *Dest*, *Distance*, *TaxiIn* and *TaxiOut*
* **Issue related columns:** *ArrDelay*, *DepDelay*, *Cancelled*, *CancellationCode*, *Diverted*, *CarrierDelay*, *WeatherDelay*, *NASDelay*, *SecurityDelay* and *LateAircraftDelay*
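
The categorization above can also be kept as data, which makes it easy to check that every column lands in exactly one group. This is a minimal sketch; the name `COLUMN_GROUPS` is an assumption introduced here, and the column names are copied from the list above.

```python
# Column groups as listed above; the dictionary name is hypothetical.
COLUMN_GROUPS = {
    "timing": ["Year", "Month", "DayofMonth", "DayOfWeek",
               "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime"],
    "flight": ["UniqueCarrier", "FlightNum", "TailNum", "ActualElapsedTime",
               "CRSElapsedTime", "AirTime", "Origin", "Dest", "Distance",
               "TaxiIn", "TaxiOut"],
    "issue": ["ArrDelay", "DepDelay", "Cancelled", "CancellationCode",
              "Diverted", "CarrierDelay", "WeatherDelay", "NASDelay",
              "SecurityDelay", "LateAircraftDelay"],
}

all_columns = [c for cols in COLUMN_GROUPS.values() for c in cols]

# No column may appear in two groups.
assert len(all_columns) == len(set(all_columns))

print({group: len(cols) for group, cols in COLUMN_GROUPS.items()})
print(len(all_columns))  # 29, the number of fields in each sampled Row
```

With such a dictionary, the profiling cells could select a whole group at once, for instance `flightsDF.select(*COLUMN_GROUPS["timing"])`.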

## 4. Column groups basic profiling to better understand our data set
### A. Timing related columns basic profiling

In [16]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit


print ("Summary of columns Year, Month, DayofMonth and DayOfWeek:")
flightsDF.select("Year","Month","DayofMonth","DayOfWeek").summary().show()

print("Checking for nulls on columns Year, Month, DayofMonth and DayOfWeek:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Year","Month","DayofMonth","DayOfWeek"]]).show()

print("Checking amount of distinct values in columns Year, Month, DayofMonth and DayOfWeek:")
flightsDF.select([countDistinct(c).alias(c) for c in ["Year","Month","DayofMonth","DayOfWeek"]]).show()




print ("Most and least frequent occurrences for DayofMonth and DayOfWeek columns:")
dayofMonthOccurrencesDF = flightsDF.groupBy("DayofMonth").agg(count(lit(1)).alias("Total"))
dayOfWeekDF = flightsDF.groupBy("DayOfWeek").agg(count(lit(1)).alias("Total"))

leastFreqDayOfMonth    = dayofMonthOccurrencesDF.orderBy(col("Total").asc()).first()
mostFreqDayOfMonth     = dayofMonthOccurrencesDF.orderBy(col("Total").desc()).first()
leastFreqDayOfWeek     = dayOfWeekDF.orderBy(col("Total").asc()).first()
mostFreqDayOfWeek      = dayOfWeekDF.orderBy(col("Total").desc()).first()

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqDayOfMonth", "mostFreqDayOfMonth", "leastFreqDayOfWeek", "mostFreqDayOfWeek", \
       "%d (%d occurrences)" % (leastFreqDayOfMonth["DayofMonth"], leastFreqDayOfMonth["Total"]), \
       "%d (%d occurrences)" % (mostFreqDayOfMonth["DayofMonth"], mostFreqDayOfMonth["Total"]), \
       "%d (%d occurrences)" % (leastFreqDayOfWeek["DayOfWeek"], leastFreqDayOfWeek["Total"]), \
       "%d (%d occurrences)" % (mostFreqDayOfWeek["DayOfWeek"], mostFreqDayOfWeek["Total"]))))




print ("Summary of columns DepTime, CRSDepTime, ArrTime and CRSArrTime:")
flightsDF.select("DepTime","CRSDepTime","ArrTime","CRSArrTime").summary().show()

print("Checking for nulls on columns DepTime, CRSDepTime, ArrTime and CRSArrTime:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["DepTime","CRSDepTime","ArrTime","CRSArrTime"]]).show()

print("Checking amount of distinct values in columns DepTime, CRSDepTime, ArrTime and CRSArrTime:")
flightsDF.select([countDistinct(c).alias(c) for c in ["DepTime","CRSDepTime","ArrTime","CRSArrTime"]]).show()

Summary of columns Year, Month, DayofMonth and DayOfWeek:
+-------+------+------+-----------------+------------------+
|summary|  Year| Month|       DayofMonth|         DayOfWeek|
+-------+------+------+-----------------+------------------+
|  count|100000|100000|           100000|            100000|
|   mean|2008.0|   1.0|         17.08786|           3.89004|
| stddev|   0.0|   0.0|8.356363976550366|1.9529994727263522|
|    min|  2008|     1|                1|                 1|
|    25%|  2008|     1|               10|                 2|
|    50%|  2008|     1|               17|                 4|
|    75%|  2008|     1|               24|                 5|
|    max|  2008|     1|               31|                 7|
+-------+------+------+-----------------+------------------+

Checking for nulls on columns Year, Month, DayofMonth and DayOfWeek:
+----+-----+----------+---------+
|Year|Month|DayofMonth|DayOfWeek|
+----+-----+----------+---------+
|   0|    0|         0|        0|
+----+-----+----------+---------+


| leastFreqDayOfMonth | mostFreqDayOfMonth | leastFreqDayOfWeek | mostFreqDayOfWeek |
|----|----|----|----|
| 2 (188 occurrences) | 11 (3639 occurrences) | 6 (11285 occurrences) | 4 (17079 occurrences) |


Summary of columns DepTime, CRSDepTime, ArrTime and CRSArrTime:
+-------+-----------------+------------------+------------------+------------------+
|summary|          DepTime|        CRSDepTime|           ArrTime|        CRSArrTime|
+-------+-----------------+------------------+------------------+------------------+
|  count|           100000|            100000|            100000|            100000|
|   mean|1355.200722248073|         1341.9894|1492.7392247056678|        1502.60154|
| stddev|464.0896318178077|451.35698421417186| 496.3767939169903|478.10021559061414|
|    min|                1|               600|                 1|                 5|
|    25%|            942.0|               940|            1114.0|              1120|
|    50%|           1344.0|              1335|            1518.0|              1520|
|    75%|           1743.0|              1730|            1913.0|              1910|
|    max|               NA|              2235|                NA|              2355|
+-------+-----------------+------------------+------------------+------------------+
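
The `max` of *DepTime* and *ArrTime* is `NA`, and their `min` is `1`, because these columns are strings: minimum and maximum follow character order, not numeric order. The plain-Python sketch below reproduces the effect with a few made-up values; `to_int_or_none` is a hypothetical helper.

```python
dep_time_as_text = ["1039", "706", "1", "2359", "NA"]  # made-up sample values

# String comparison goes character by character, so "NA" sorts after any digit.
print(min(dep_time_as_text))  # 1
print(max(dep_time_as_text))  # NA


def to_int_or_none(value):
    """Convert a text value to int, mapping non-numeric tokens to None."""
    try:
        return int(value)
    except ValueError:
        return None


numeric = [v for v in map(to_int_or_none, dep_time_as_text) if v is not None]
print(min(numeric), max(numeric))  # 1 2359
```

The same ordering explains why a string column can report a minimum such as `100` where an integer column would report a smaller number: `"100"` sorts before `"17"` as text.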

### B. Flight related columns basic profiling

In [17]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit

print ("Summary of columns UniqueCarrier, FlightNum, TailNum, Origin, Dest and Distance:")
flightsDF.select("UniqueCarrier", "FlightNum", "TailNum", "Origin", "Dest", "Distance").summary().show()

print("Checking for nulls on columns UniqueCarrier, FlightNum, TailNum, Origin, Dest and Distance:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["UniqueCarrier", "FlightNum", "TailNum", "Origin", "Dest", "Distance"]]).show()

print("Checking amount of distinct values in columns UniqueCarrier, FlightNum, TailNum, Origin, Dest and Distance:")
flightsDF.select([countDistinct(c).alias(c) for c in ["UniqueCarrier", "FlightNum", "TailNum", "Origin", "Dest", "Distance"]]).show()

print ("Most and least frequent occurrences for FlightNum, TailNum, Origin and Dest columns:")
FlightNumDF = flightsDF.groupBy("FlightNum").agg(count(lit(1)).alias("Total"))
TailNumDF   = flightsDF.groupBy("TailNum").agg(count(lit(1)).alias("Total"))
OriginDF    = flightsDF.groupBy("Origin").agg(count(lit(1)).alias("Total"))
DestDF      = flightsDF.groupBy("Dest").agg(count(lit(1)).alias("Total"))

leastFreqFlightNum    = FlightNumDF.orderBy(col("Total").asc()).first()
mostFreqFlightNum     = FlightNumDF.orderBy(col("Total").desc()).first()
leastFreqTailNum      = TailNumDF.orderBy(col("Total").asc()).first()
mostFreqTailNum       = TailNumDF.orderBy(col("Total").desc()).first()
leastFreqOrigin       = OriginDF.orderBy(col("Total").asc()).first()
mostFreqOrigin        = OriginDF.orderBy(col("Total").desc()).first()
leastFreqDest         = DestDF.orderBy(col("Total").asc()).first()
mostFreqDest          = DestDF.orderBy(col("Total").desc()).first()

display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqFlightNum", "mostFreqFlightNum", "leastFreqTailNum", "mostFreqTailNum", \
       "%d (%d occurrences)" % (leastFreqFlightNum["FlightNum"], leastFreqFlightNum["Total"]), \
       "%d (%d occurrences)" % (mostFreqFlightNum["FlightNum"], mostFreqFlightNum["Total"]), \
       "%s (%d occurrences)" % (leastFreqTailNum["TailNum"], leastFreqTailNum["Total"]), \
       "%s (%d occurrences)" % (mostFreqTailNum["TailNum"], mostFreqTailNum["Total"]))))
display(Markdown("""
| %s | %s | %s | %s |
|----|----|----|----|
| %s | %s | %s | %s |
""" % ("leastFreqOrigin", "mostFreqOrigin", "leastFreqDest", "mostFreqDest", \
       "%s (%d occurrences)" % (leastFreqOrigin["Origin"], leastFreqOrigin["Total"]), \
       "%s (%d occurrences)" % (mostFreqOrigin["Origin"], mostFreqOrigin["Total"]), \
       "%s (%d occurrences)" % (leastFreqDest["Dest"], leastFreqDest["Total"]), \
       "%s (%d occurrences)" % (mostFreqDest["Dest"], mostFreqDest["Total"]))))

print ("Summary of columns ActualElapsedTime, CRSElapsedTime, AirTime, TaxiIn and TaxiOut:")
flightsDF.select("ActualElapsedTime", "CRSElapsedTime", "AirTime", "TaxiIn", "TaxiOut").summary().show()

print("Checking for nulls on columns ActualElapsedTime, CRSElapsedTime, AirTime, TaxiIn and TaxiOut:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["ActualElapsedTime", "CRSElapsedTime", "AirTime", "TaxiIn", "TaxiOut"]]).show()

print("Checking amount of distinct values in columns ActualElapsedTime, CRSElapsedTime, AirTime, TaxiIn and TaxiOut:")
flightsDF.select([countDistinct(c).alias(c) for c in ["ActualElapsedTime", "CRSElapsedTime", "AirTime", "TaxiIn", "TaxiOut"]]).show()

Summary of columns UniqueCarrier, FlightNum, TailNum, Origin, Dest and Distance:
+-------+-------------+------------------+-------+------+------+-----------------+
|summary|UniqueCarrier|         FlightNum|TailNum|Origin|  Dest|         Distance|
+-------+-------------+------------------+-------+------+------+-----------------+
|  count|       100000|            100000|  98858|100000|100000|           100000|
|   mean|         null|        1510.64854|   null|  null|  null|        630.58632|
| stddev|         null|1186.1285980391729|   null|  null|  null|437.3570752611293|
|    min|           WN|                 1| N11109|   ABQ|   ABQ|               66|
|    25%|         null|               502|   null|  null|  null|              324|
|    50%|         null|              1320|   null|  null|  null|              453|
|    75%|         null|              2362|   null|  null|  null|              843|
|    max|           XE|              7676|   N906|   TUS|   XNA|             2363|
+-------+-------------+------------------+-------+------+------+-----------------+


| leastFreqFlightNum | mostFreqFlightNum | leastFreqTailNum | mostFreqTailNum |
|----|----|----|----|
| 2711 (1 occurrences) | 224 (256 occurrences) | N13118 (1 occurrences) | None (1142 occurrences) |



| leastFreqOrigin | mostFreqOrigin | leastFreqDest | mostFreqDest |
|----|----|----|----|
| CRW (1 occurrences) | LAS (6817 occurrences) | MYR (1 occurrences) | LAS (6734 occurrences) |


Summary of columns ActualElapsedTime, CRSElapsedTime, AirTime, TaxiIn and TaxiOut:
+-------+------------------+-----------------+-----------------+-----------------+------------------+
|summary| ActualElapsedTime|   CRSElapsedTime|          AirTime|           TaxiIn|           TaxiOut|
+-------+------------------+-----------------+-----------------+-----------------+------------------+
|  count|            100000|           100000|           100000|           100000|            100000|
|   mean|107.55850169203023|        111.98954| 91.8637966321506|4.768961883726114|10.927765077181412|
| stddev|    55.35097080091|57.20913510480459|54.20822434609592|2.983635448088949| 5.989579591427709|
|    min|               100|               17|              100|                1|                 1|
|    25%|              68.0|               70|             53.0|              3.0|               8.0|
|    50%|              88.0|               90|             71.0|              4.0|               9.0|
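
Note that `mostFreqTailNum` is `None` with 1142 occurrences: grouping treats missing values as a group of their own, and 1142 is exactly the gap between the 100000 rows and the 98858 non-null *TailNum* values in the summary. A plain-Python sketch with `collections.Counter` and made-up tail numbers shows the same effect and one way to exclude the missing values.

```python
from collections import Counter

# Made-up tail numbers; None stands for a missing value.
tail_nums = ["N428WN", None, "N690SW", None, "N428WN", None]

counts = Counter(tail_nums)
print(counts.most_common(1))  # [(None, 3)]

# Dropping missing values first gives the most frequent real tail number.
known = Counter(t for t in tail_nums if t is not None)
print(known.most_common(1))   # [('N428WN', 2)]

# Figures from the summary above: total rows minus non-null TailNum values.
print(100000 - 98858)         # 1142
```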

### C. Issue related columns basic profiling

In [18]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first

print ("Summary of columns ArrDelay, DepDelay, Cancelled, CancellationCode and Diverted:")
flightsDF.select("ArrDelay", "DepDelay", "Cancelled", "CancellationCode", "Diverted").summary().show()

print("Checking for nulls on columns ArrDelay, DepDelay, Cancelled, CancellationCode and Diverted:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["ArrDelay", "DepDelay", "Cancelled", "CancellationCode", "Diverted"]]).show()

print("Checking amount of distinct values in columns ArrDelay, DepDelay, Cancelled, CancellationCode and Diverted:")
flightsDF.select([countDistinct(c).alias(c) for c in ["ArrDelay", "DepDelay", "Cancelled", "CancellationCode", "Diverted"]]).show()

print ("Summary of columns CarrierDelay, WeatherDelay, NASDelay, SecurityDelay and LateAircraftDelay:")
flightsDF.select("CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay").summary().show()

print("Checking for nulls on columns CarrierDelay, WeatherDelay, NASDelay, SecurityDelay and LateAircraftDelay:")
flightsDF.select([count(when(col(c).isNull(), c)).alias(c) for c in ["CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay"]]).show()

print("Checking amount of distinct values in columns CarrierDelay, WeatherDelay, NASDelay, SecurityDelay and LateAircraftDelay:")
flightsDF.select([countDistinct(c).alias(c) for c in ["CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay"]]).show()


Summary of columns ArrDelay, DepDelay, Cancelled, CancellationCode and Diverted:
+-------+-----------------+------------------+-------------------+----------------+------------------+
|summary|         ArrDelay|          DepDelay|          Cancelled|CancellationCode|          Diverted|
+-------+-----------------+------------------+-------------------+----------------+------------------+
|  count|           100000|            100000|             100000|            1142|            100000|
|   mean|5.729954001094247|10.379048736571649|            0.01142|            null|            0.0016|
| stddev|30.96695927246462|28.384428068170866|0.10625298347324127|            null|0.0399681870311844|
|    min|               -1|                -1|                  0|               A|                 0|
|    25%|             -9.0|              -2.0|                  0|            null|                 0|
|    50%|             -2.0|               1.0|                  0|            null|            
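
*Cancelled* and *Diverted* are 0/1 flags, so their mean is the fraction of flagged flights and can be turned back into a count. A small sketch using the means reported in the summary above (the `flags` list is made up):

```python
# The mean of a 0/1 flag is the share of rows where the flag is set.
flags = [0, 0, 1, 0]                 # made-up example
print(sum(flags) / len(flags))       # 0.25

total_flights = 100000               # row count from the summary
cancelled = round(0.01142 * total_flights)
diverted = round(0.0016 * total_flights)
print(cancelled, diverted)           # 1142 160
```

The 1142 cancelled flights agree with the 1142 non-null *CancellationCode* values in the same summary.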

## 5. Answer some business questions to improve service

### A. Ratio of delayed (non-cancelled) flights by severity

In [19]:
from pyspark.sql.functions import count, round

# Delay severity is going to be categorized as follows (totally made up):
#
#   "nodelay"      - delay=(-infinity,0] mins
#   "acceptable"   - delay=(0,15] mins
#   "annoying"     - delay=(15,30] mins
#   "impactul"     - delay=(30,60] mins
#   "unacceptable" - delay=(60,+infinity) mins

# 1. Let's enrich the DF with delay severity based on our categorization
totalFlights = flightsDF.count()  # all flights, cancelled and diverted ones included
delayCategorizationDF = flightsDF\
   .where(col("ArrDelay")!="NA")\
   .withColumn("DelaySeverity", when(col("ArrDelay")<=0,"1.nodelay")\
                               .when((col("ArrDelay")>0) & (col("ArrDelay")<=15),"2.acceptable")\
                               .when((col("ArrDelay")>15) & (col("ArrDelay")<=30),"3.annoying")\
                               .when((col("ArrDelay")>30) & (col("ArrDelay")<=60),"4.impactul")\
                               .otherwise("5.unacceptable"))
delayCategorizationDF.cache() # optimization to make the processing faster
# 2. Ready to answer this business question
delayCategorizationDF.where(col("Cancelled")==0)\
                     .select("DelaySeverity", "ArrDelay")\
                     .groupBy("DelaySeverity")\
                     .agg(count("DelaySeverity").alias("NumFlights"), \
                          (count("DelaySeverity")/totalFlights*100).alias("Ratio"))\
                     .orderBy("DelaySeverity")\
                     .select("DelaySeverity","NumFlights",round("Ratio",2).alias("RoundedRatio")).show()

+--------------+----------+------------+
| DelaySeverity|NumFlights|RoundedRatio|
+--------------+----------+------------+
|     1.nodelay|     57143|       57.14|
|  2.acceptable|     22761|       22.76|
|    3.annoying|      8101|         8.1|
|    4.impactul|      6050|        6.05|
|5.unacceptable|      4643|        4.64|
+--------------+----------+------------+
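
The ratios above do not add up to 100%, because the denominator is the total number of flights while only flights with a known *ArrDelay* are categorized. The sketch below redoes the arithmetic from the table; the 1302 uncategorized flights are consistent with the 1142 cancelled plus 160 diverted flights implied by the section 4.C summary, assuming those are the rows whose *ArrDelay* is `NA`.

```python
total_flights = 100000  # flightsDF.count() as reported in the profiling output

# NumFlights per category, copied from the table above.
num_flights = {
    "1.nodelay": 57143,
    "2.acceptable": 22761,
    "3.annoying": 8101,
    "4.impactul": 6050,       # label spelled as in the notebook
    "5.unacceptable": 4643,
}

categorized = sum(num_flights.values())
print(categorized, total_flights - categorized)  # 98698 1302

# Ratio over all flights (what the notebook computes).
ratio_all = {k: round(v / total_flights * 100, 2) for k, v in num_flights.items()}
# Ratio over categorized flights only (an alternative denominator).
ratio_categorized = {k: round(v / categorized * 100, 2) for k, v in num_flights.items()}

print(ratio_all)
print(ratio_categorized)
```

Which denominator is preferable depends on the question being asked; the second one makes the five categories sum to roughly 100%.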



### B. Severely delayed flights statistics by type of delay (carrier, weather, NAS, security and late aircraft)

In [20]:
from pyspark.sql.functions import max, min, avg, stddev
from pyspark.sql.types import IntegerType

# To get statistics of severe delayed flights, we have to prepare the previous DataFrame (delayCategorizationDF):
#   1. Remove cancelled flights
#   2. Keep only severe delayed flights: annoying, impactful and unacceptable
#   3. To get proper statistics, convert String columns with delay info into Integer columns
#      (none of the converted fields contain "NA" as value in severeDelaysDF - you can easily check this out) 
#
severeDelaysDF = \
  delayCategorizationDF.where((col("Cancelled")==0))\
                       .where((col("DelaySeverity")!="1.nodelay") & (col("DelaySeverity")!="2.acceptable"))\
                       .withColumn("IntArrDelay", col("ArrDelay").cast(IntegerType()))\
                       .withColumn("IntCarrierDelay", col("CarrierDelay").cast(IntegerType()))\
                       .withColumn("IntWeatherDelay", col("WeatherDelay").cast(IntegerType()))\
                       .withColumn("IntNASDelay", col("NASDelay").cast(IntegerType()))\
                       .withColumn("IntSecurityDelay", col("SecurityDelay").cast(IntegerType()))\
                       .withColumn("IntLateAircraftDelay", col("LateAircraftDelay").cast(IntegerType()))\
                       .select("DelaySeverity", "IntArrDelay","IntCarrierDelay","IntWeatherDelay",\
                               "IntNASDelay", "IntSecurityDelay", "IntLateAircraftDelay")
severeDelaysDF.cache() # optimization to make the processing faster

display(Markdown("**'Arrival' severe delays basic stats** (in mins):"))
severeDelaysDF.groupBy("DelaySeverity")\
              .agg(avg("IntArrDelay").alias("AverageDelay"),\
                   min("IntArrDelay").alias("LowestDelay"),\
                   max("IntArrDelay").alias("HighestDelay"),\
                   stddev("IntArrDelay").alias("StdDevDelay"))\
              .orderBy("DelaySeverity").show()

display(Markdown("**'Carrier' severe delays basic stats** (in mins):"))
severeDelaysDF.groupBy("DelaySeverity")\
              .agg(avg("IntCarrierDelay").alias("AverageDelay"),\
                   min("IntCarrierDelay").alias("LowestDelay"),\
                   max("IntCarrierDelay").alias("HighestDelay"),\
                   stddev("IntCarrierDelay").alias("StdDevDelay"))\
              .orderBy("DelaySeverity").show()

display(Markdown("**'Weather' severe delays basic stats** (in mins):"))
severeDelaysDF.groupBy("DelaySeverity")\
              .agg(avg("IntWeatherDelay").alias("AverageDelay"),\
                   min("IntWeatherDelay").alias("LowestDelay"),\
                   max("IntWeatherDelay").alias("HighestDelay"),\
                   stddev("IntWeatherDelay").alias("StdDevDelay"))\
              .orderBy("DelaySeverity").show()

display(Markdown("**'NAS' severe delays basic stats** (in mins):"))
severeDelaysDF.groupBy("DelaySeverity")\
              .agg(avg("IntNASDelay").alias("AverageDelay"),\
                   min("IntNASDelay").alias("LowestDelay"),\
                   max("IntNASDelay").alias("HighestDelay"),\
                   stddev("IntNASDelay").alias("StdDevDelay"))\
              .orderBy("DelaySeverity").show()

display(Markdown("**'Security' severe delays basic stats** (in mins):"))
severeDelaysDF.groupBy("DelaySeverity")\
              .agg(avg("IntSecurityDelay").alias("AverageDelay"),\
                   min("IntSecurityDelay").alias("LowestDelay"),\
                   max("IntSecurityDelay").alias("HighestDelay"),\
                   stddev("IntSecurityDelay").alias("StdDevDelay"))\
              .orderBy("DelaySeverity").show()

display(Markdown("**'LateAircraft' severe delays basic stats** (in mins):"))
severeDelaysDF.groupBy("DelaySeverity")\
              .agg(avg("IntLateAircraftDelay").alias("AverageDelay"),\
                   min("IntLateAircraftDelay").alias("LowestDelay"),\
                   max("IntLateAircraftDelay").alias("HighestDelay"),\
                   stddev("IntLateAircraftDelay").alias("StdDevDelay"))\
              .orderBy("DelaySeverity").show()

**'Arrival' severe delays basic stats** (in mins):

+--------------+------------------+-----------+------------+------------------+
| DelaySeverity|      AverageDelay|LowestDelay|HighestDelay|       StdDevDelay|
+--------------+------------------+-----------+------------+------------------+
|    3.annoying| 21.97555857301568|         16|          30| 4.204130253081042|
|    4.impactul| 42.49900826446281|         31|          60| 8.490116530348118|
|5.unacceptable|112.50118457893603|         61|         500|55.535970510739816|
+--------------+------------------+-----------+------------+------------------+
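
The `LowestDelay` and `HighestDelay` values above (16 to 30, 31 to 60, 61 and up) line up with the bucket boundaries defined in section 5.A. A plain-Python restatement of that bucketing rule can be handy for checking the edges; `delay_severity` is a hypothetical helper that mirrors the `when(...).otherwise(...)` chain and is not part of the notebook.

```python
def delay_severity(arr_delay_mins):
    """Map an arrival delay in minutes to the notebook's severity label."""
    if arr_delay_mins <= 0:
        return "1.nodelay"
    if arr_delay_mins <= 15:
        return "2.acceptable"
    if arr_delay_mins <= 30:
        return "3.annoying"
    if arr_delay_mins <= 60:
        return "4.impactul"   # label spelled as in the notebook
    return "5.unacceptable"


# Boundary values taken from the table above and from the category definitions.
for minutes in (-18, 0, 1, 15, 16, 30, 31, 60, 61, 500):
    print(minutes, delay_severity(minutes))
```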



**'Carrier' severe delays basic stats** (in mins):

+--------------+-----------------+-----------+------------+-----------------+
| DelaySeverity|     AverageDelay|LowestDelay|HighestDelay|      StdDevDelay|
+--------------+-----------------+-----------+------------+-----------------+
|    3.annoying|5.307616343661277|          0|          30|7.465239778273078|
|    4.impactul| 9.72809917355372|          0|          60|14.06377159723449|
|5.unacceptable|17.89511091966401|          0|         431|39.35785159709625|
+--------------+-----------------+-----------+------------+-----------------+



**'Weather' severe delays basic stats** (in mins):

+--------------+-------------------+-----------+------------+------------------+
| DelaySeverity|       AverageDelay|LowestDelay|HighestDelay|       StdDevDelay|
+--------------+-------------------+-----------+------------+------------------+
|    3.annoying|0.25367238612516974|          0|          28| 1.996159994036475|
|    4.impactul| 0.8046280991735537|          0|          60| 4.891432829821254|
|5.unacceptable|  7.095843204824467|          0|         267|27.110169659379295|
+--------------+-------------------+-----------+------------+------------------+



**'NAS' severe delays basic stats** (in mins):

+--------------+------------------+-----------+------------+------------------+
| DelaySeverity|      AverageDelay|LowestDelay|HighestDelay|       StdDevDelay|
+--------------+------------------+-----------+------------+------------------+
|    3.annoying|5.6590544377237375|          0|          30| 7.991772840270974|
|    4.impactul|  6.91702479338843|          0|          60|11.897814956328913|
|5.unacceptable|15.459616627180703|          0|         366|33.584072958913225|
+--------------+------------------+-----------+------------+------------------+



**'Security' severe delays basic stats** (in mins):

+--------------+-------------------+-----------+------------+------------------+
| DelaySeverity|       AverageDelay|LowestDelay|HighestDelay|       StdDevDelay|
+--------------+-------------------+-----------+------------+------------------+
|    3.annoying|0.11122083693371189|          0|          28|1.2349234007295635|
|    4.impactul|0.11123966942148761|          0|          48|1.6527896485929991|
|5.unacceptable|0.10618134826620719|          0|          88|  2.47445599778738|
+--------------+-------------------+-----------+------------+------------------+



**'LateAircraft' severe delays basic stats** (in mins):

+--------------+------------------+-----------+------------+------------------+
| DelaySeverity|      AverageDelay|LowestDelay|HighestDelay|       StdDevDelay|
+--------------+------------------+-----------+------------+------------------+
|    3.annoying|10.643994568571781|          0|          30| 9.288445499357707|
|    4.impactul| 24.93801652892562|          0|          60|17.331641832099727|
|5.unacceptable| 71.94443247900064|          0|         434| 59.86123488142751|
+--------------+------------------+-----------+------------+------------------+
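
A quick consistency check on the tables above: within each severity category, the five cause-specific averages add up to the average arrival delay. This suggests that, for these rows, *ArrDelay* is fully broken down into carrier, weather, NAS, security and late-aircraft minutes. The sketch below redoes the sums with the figures copied from the output; the variable names are introduced here for illustration.

```python
import math

# AverageDelay of ArrDelay per category, copied from the 'Arrival' table.
avg_arrival = {
    "3.annoying": 21.97555857301568,
    "4.impactul": 42.49900826446281,
    "5.unacceptable": 112.50118457893603,
}

# AverageDelay per cause, in the order carrier, weather, NAS, security, late aircraft.
avg_by_cause = {
    "3.annoying": [5.307616343661277, 0.25367238612516974, 5.6590544377237375,
                   0.11122083693371189, 10.643994568571781],
    "4.impactul": [9.72809917355372, 0.8046280991735537, 6.91702479338843,
                   0.11123966942148761, 24.93801652892562],
    "5.unacceptable": [17.89511091966401, 7.095843204824467, 15.459616627180703,
                       0.10618134826620719, 71.94443247900064],
}

for severity, causes in avg_by_cause.items():
    total = sum(causes)
    matches = math.isclose(total, avg_arrival[severity], abs_tol=1e-6)
    late_aircraft_share = causes[-1] / total * 100
    print(severity, round(total, 4), matches, round(late_aircraft_share, 1))
```

The last printed column is the share of the average delay attributed to late aircraft, which is the largest single cause in all three categories.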



### C. Top 20 origin airports (and figures) involved in severe delays

In [21]:
# Our answer to this business question will be:
#   1. List of top 20 origin airports with highest severe delayed (aka unacceptable) flights ratio 
#      (based on total number of flights)
#   2. List of top 20 origin airports with severe delayed flights ratio by severity category (unacceptable,
#      impactful and annoying)

# In order to be able to deliver these insights, we need some preparation:
#   1. Define a DataFrame with total flights per Origin airport (totalFlightsOriginDF)
#   2. Define a DataFrame with aggregated data by Origin and DelaySeverity to figure out
#      number of flights delayed per severity category (severeDelaysOriginDF)
#   3. Combine both DataFrames to come up with one single DataFrame containing total flights
#      per origin airport and number of flights delayed by severity to compute ratios (combinedDF)

totalFlightsOriginDF = \
   flightsDF.groupBy("Origin")\
            .agg(count(lit(1)).alias("TotalFlights"))
severeDelaysOriginDF = \
  delayCategorizationDF.where((col("Cancelled")==0))\
                       .where((col("DelaySeverity")!="1.nodelay") & (col("DelaySeverity")!="2.acceptable"))\
                       .select("Origin", "DelaySeverity")\
                       .groupBy("Origin", "DelaySeverity")\
                       .agg(count(lit(1)).alias("NumSevereDelayedFlights"))

combinedDF = \
  severeDelaysOriginDF\
     .join(totalFlightsOriginDF, "Origin")\
     .withColumn("SevereDelayedRatio", round(col("NumSevereDelayedFlights")/col("TotalFlights")*100,2))\
     .orderBy(col("SevereDelayedRatio").desc())
combinedDF.cache() # optimization to make the processing faster

display(Markdown("**Top 20 origin airports** with highest severe delayed (**unacceptable**) flights ratio (in \%):"))
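# combinedDF has one row per (Origin, DelaySeverity) pair, so this lists the 20 pairs with the
# highest ratio across all three severe categories, not only "5.unacceptable"
combinedDF.limit(20).show()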
display(Markdown("**Top 20 origin airports with severe delayed flights ratio** by severity category (in \%):"))
combinedDF\
   .groupBy("Origin")\
   .pivot("DelaySeverity")\
   .min("SevereDelayedRatio")\
   .orderBy(col("`5.unacceptable`").desc(), col("`4.impactul`").desc(), col("`3.annoying`").desc())\
   .limit(20).show()



**Top 20 origin airports** with highest severe delayed (**unacceptable**) flights ratio (in \%):

+------+--------------+-----------------------+------------+------------------+
|Origin| DelaySeverity|NumSevereDelayedFlights|TotalFlights|SevereDelayedRatio|
+------+--------------+-----------------------+------------+------------------+
|   LCH|    4.impactul|                      1|           1|             100.0|
|   ORD|    4.impactul|                      1|           1|             100.0|
|   SYR|    4.impactul|                      1|           1|             100.0|
|   IAH|    4.impactul|                      2|           9|             22.22|
|   SFO|5.unacceptable|                    154|         743|             20.73|
|   COS|    3.annoying|                     23|         181|             12.71|
|   LAS|    3.annoying|                    743|        6817|              10.9|
|   GEG|    3.annoying|                     73|         673|             10.85|
|   MAF|    3.annoying|                     34|         329|             10.33|
|   BOI|    3.annoying|                 

**Top 20 origin airports with severe delayed flights ratio** by severity category (in \%):

+------+----------+----------+--------------+
|Origin|3.annoying|4.impactul|5.unacceptable|
+------+----------+----------+--------------+
|   SFO|      7.67|      6.46|         20.73|
|   SLC|      9.84|      7.89|          9.52|
|   BFL|      3.28|      4.92|           8.2|
|   LAX|      9.85|       9.2|          7.48|
|   LAS|      10.9|      8.32|          7.16|
|   EWR|      7.14|      7.14|          7.14|
|   SAN|      8.67|      8.46|          7.12|
|   MDW|      8.88|      7.42|          6.78|
|   GEG|     10.85|       5.5|          6.09|
|   OAK|      9.32|      7.72|           6.0|
|   RNO|      9.59|      7.31|           5.9|
|   BOI|     10.21|      6.86|          5.58|
|   COS|     12.71|      6.08|          5.52|
|   FAT|      2.07|      4.14|          5.52|
|   SJC|      8.06|      7.78|          5.33|
|   LGB|      7.41|      2.96|          5.19|
|   SMF|      8.62|      6.89|          4.97|
|   PDX|      6.86|      6.69|          4.85|
|   TUS|      7.49|      6.49|    
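
The ratios in the first table can be reproduced by hand, and doing so shows why airports with very few flights top the ranking: a single delayed flight out of one gives 100%. The sketch below recomputes a few rows from the output; `MIN_FLIGHTS` is a hypothetical threshold introduced only to illustrate how such small samples could be flagged, it is not used in the notebook.

```python
# (Origin, NumSevereDelayedFlights, TotalFlights), copied from the first table.
rows = [
    ("LCH", 1, 1),
    ("IAH", 2, 9),
    ("SFO", 154, 743),
    ("COS", 23, 181),
    ("LAS", 743, 6817),
]

MIN_FLIGHTS = 100  # hypothetical cut-off for a meaningful ratio

ratios = {}
for origin, delayed, total in rows:
    ratio = round(delayed / total * 100, 2)
    ratios[origin] = ratio
    note = "" if total >= MIN_FLIGHTS else "  (small sample)"
    print(f"{origin}: {ratio}%{note}")
```

In the notebook itself, a comparable effect could be obtained by filtering `totalFlightsOriginDF` on `TotalFlights` before the join; the right threshold, if any, is a judgment call.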