In [1]:
# SparkSession (pyspark.sql.SparkSession) is the entry point for working with
# the Dataset and DataFrame API. Obtain one through the builder, naming the
# application "PySpark Airlines Data Analysis".

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PySpark Airlines Data Analysis")
    .getOrCreate()
)

In [3]:
# Load the airlines CSV (stored on HDFS) into a DataFrame.
# The first line of the file is used as the header, and column types are
# inferred from the data rather than declared up front.

airlines_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/common_folder/airlines/data_2004-08.csv")
)

In [5]:
# Sanity-check the load by looking at the first record. Called without an
# argument, head() gives back a single Row, which is what the output below shows.
airlines_df.head()

Row(_c0=1, Year=2006, Month=1, DayofMonth=11, DayOfWeek=3, DepTime='743', CRSDepTime=745, ArrTime='1024', CRSArrTime=1018, UniqueCarrier='US', FlightNum=343, TailNum='N657AW', ActualElapsedTime='281', CRSElapsedTime=273, AirTime='223', ArrDelay='6', DepDelay='-2', Origin='ATL', Dest='PHX', Distance=1587, TaxiIn='45', TaxiOut='13', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay='0', WeatherDelay='0', NASDelay='0', SecurityDelay='0', LateAircraftDelay='0')

In [8]:
# Keep only the flights that were late on both ends: rows where ArrDelay and
# DepDelay are both strictly greater than zero. Rows with a zero or negative
# value in either column are excluded from filterDF.

from pyspark.sql.functions import col, asc

filterDF = airlines_df.where(
    (col("ArrDelay") > 0)
    & (col("DepDelay") > 0)
)

# display() prints the DataFrame's column/type summary; show() prints the
# first rows as a text table.
display(filterDF)
filterDF.show()

DataFrame[_c0: int, Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: string, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: int, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: int, TaxiIn: string, TaxiOut: string, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|_c0|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  7|2006|    1|        11|        3|    825|       820|   1041|      1021|       

In [23]:
# Group the filtered flights by destination and compute the mean arrival delay
# per destination, then sort in descending order so the airport with the
# highest mean arrival delay comes first.
# The input is filterDF, so only flights with positive arrival AND departure
# delay contribute to the averages.

from pyspark.sql.functions import expr, desc

agg_df = (
    filterDF
    .groupby("dest")
    .agg(expr("mean(Arrdelay)").alias("mean_arrival_delay"))
    .sort(desc("mean_arrival_delay"))
)
agg_df.show()

+----+------------------+
|dest|mean_arrival_delay|
+----+------------------+
| BIL|             99.25|
| MDT|              69.5|
| ORD|  67.7074788902292|
| GUC|              66.2|
| GSO|            66.125|
| DAY|             63.75|
| CVG|62.774193548387096|
| GRR|58.794871794871796|
| DSM|58.017857142857146|
| ATL|56.663716814159294|
| OGG|              54.4|
| IAH| 50.20524017467249|
| EWR|49.515358361774744|
| ICT|48.310344827586206|
| SFO|47.056569343065696|
| LGA|  46.1924882629108|
| COS| 44.38461538461539|
| MSP| 43.18939393939394|
| PHL| 43.14143426294821|
| GEG|  43.1139896373057|
+----+------------------+
only showing top 20 rows



In [25]:
# Group the filtered flights by destination and compute the mean departure
# delay per destination, then sort in descending order so the airport with the
# highest mean departure delay comes first.
# The input is filterDF, so only flights with positive arrival AND departure
# delay contribute to the averages.
# NOTE(review): departure delay is grouped by "dest" here, not by "Origin" --
# confirm that ranking destinations (rather than departure airports) is intended.
# NOTE: this reassigns agg_df, replacing any value it held before this cell ran.

from pyspark.sql.functions import expr, desc

agg_df = (
    filterDF
    .groupby("dest")
    .agg(expr("mean(depdelay)").alias("mean_departure_delay"))
    .sort(desc("mean_departure_delay"))
)
agg_df.show()

+----+--------------------+
|dest|mean_departure_delay|
+----+--------------------+
| BIL|              100.25|
| MDT|   76.66666666666667|
| GSO|                71.7|
| GUC|                69.6|
| OGG|                59.3|
| DAY|              59.275|
| GRR|   56.46153846153846|
| ATL|   56.35398230088496|
| ORD|  56.274427020506636|
| DSM|   55.69642857142857|
| CVG|   55.45161290322581|
| PWM|                49.0|
| IAH|   48.98689956331878|
| EWR|   48.13993174061434|
| BTV|   46.76470588235294|
| ICT|   45.37931034482759|
| LGA|   44.64553990610329|
| GEG|   42.73056994818653|
| TUS|   42.24338624338624|
| SFO|  42.051094890510946|
+----+--------------------+
only showing top 20 rows



## Conclusion:
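On average, BIL has the highest mean arrival delay (99.25) as well as the highest mean departure delay (100.25) of all destination airports in the rankings above. Note that these averages only cover flights that were delayed on both departure and arrival. Looks like they need to take some action about this!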