In [None]:
#Initialisation
from pyspark.sql.functions import *
import functools
# CSV read options consumed by processDF (passed to spark.read.csv).
delimiter = ","
firstRowIsHeader = "true"
inferSchema = "true"
# NOTE(review): file_type is not referenced by any cell visible in this notebook.
file_type = "csv"

In [None]:
#Definition for converting .csv files to dataframes
def processDF (fileLocation):
    """Read the CSV at ``fileLocation`` into a Spark DataFrame.

    The read options are taken from the notebook-level settings
    ``inferSchema``, ``firstRowIsHeader`` and ``delimiter``.

    Args:
        fileLocation: Path of the CSV file, handed straight to ``spark.read.csv``.

    Returns:
        The DataFrame produced by ``spark.read.csv``.
    """
    csv_reader = spark.read
    frame = csv_reader.csv(
        fileLocation,
        sep=delimiter,
        header=firstRowIsHeader,
        inferSchema=inferSchema,
    )
    return frame

#Dataframes
# Monthly flight files, loaded in calendar order
jan2019DF, feb2019DF, mar2019DF = map(
    processDF,
    ("/FileStore/2019_01.csv", "/FileStore/2019_02.csv", "/FileStore/2019_03.csv"),
)

# Lookup tables; later cells join them on their `code` column
month_name = processDF("/FileStore/L_MONTHS.csv")
day_name = processDF("/FileStore/L_WEEKDAYS.csv")

In [None]:
%sql
-- IF NOT EXISTS keeps this cell re-runnable: a plain CREATE DATABASE fails once model_layer exists
create database if not exists model_layer

In [None]:
# Persist the weekday lookup as a Delta table in the model_layer database.
# mode('overwrite') replaces the table left by a previous run; with the default save mode
# saveAsTable raises once model_layer.day_name already exists, so the cell could only run once.
day_name.write.format('delta').mode('overwrite').saveAsTable('model_layer.day_name')

In [None]:
#Dataframe containing all of the data
def union(dfs):
    """Stack the DataFrames in ``dfs`` into one DataFrame.

    Every DataFrame after the first is projected onto the running result's
    column list (``.columns``) and then appended with ``.union``, so later
    frames are aligned to the column order of the first one.

    Args:
        dfs: Iterable of DataFrames; each later frame must contain the
            columns of the first.

    Returns:
        The combined DataFrame (the first frame itself when ``dfs`` has one item).
    """
    def _append(combined, following):
        aligned = following.select(combined.columns)
        return combined.union(aligned)

    return functools.reduce(_append, dfs)

# Columns used by the delay analyses in the next cells
required_columns = ['year', 'month', 'day_of_month', 'day_of_week', 'dep_time', 'dep_delay', 'arr_delay']
all_months = union([jan2019DF, feb2019DF, mar2019DF])
unionDF = all_months.select(*required_columns)

#Only display required columns
unionDF.display()

year,month,day_of_month,day_of_week,dep_time,dep_delay,arr_delay
2019,1,3,4,1205.0,25.0,25.0
2019,1,4,5,1250.0,70.0,82.0
2019,1,5,6,956.0,6.0,-8.0
2019,1,6,7,945.0,-5.0,-24.0
2019,1,7,1,947.0,-3.0,-9.0
2019,1,8,2,950.0,0.0,-13.0
2019,1,9,3,945.0,-5.0,-8.0
2019,1,10,4,943.0,-7.0,-21.0
2019,1,11,5,945.0,-5.0,20.0
2019,1,12,6,949.0,-1.0,-8.0


In [None]:
#Best time of day

#Dataframe showing required columns
best_timeDF= unionDF.select ('dep_time','dep_delay','arr_delay')

#Dataframe where departure time is not null and is ascending
#(null rows are dropped before sorting so the sort only handles rows that are kept)
dep_time_ordered= best_timeDF.filter('dep_time IS NOT NULL').sort('dep_time')

#Mean dep_delay per distinct dep_time, using only rows with dep_delay >= 0.
#The comparison uses the number 0 rather than the string '0', so it no longer relies on an implicit cast.
#NOTE(review): ">= 0" also keeps on-time flights (delay 0), whereas the cascade-delay cell
#flags a flight as delayed only when DEP_DELAY > 0 - confirm which definition is intended.
every_del_time= dep_time_ordered.where(dep_time_ordered.dep_delay >= 0).groupBy('dep_time').avg('dep_delay').sort('dep_time')

#Shows average delays over 24 hours
#Hour is dep_time / 100 truncated to an int (dep_time looks like an HHMM number, e.g. 1205.0).
#`round`, `avg` and `expr` are the pyspark.sql.functions versions brought in by the star import.
#NOTE(review): this averages the per-dep_time means, so every distinct minute carries equal
#weight regardless of how many flights left at that minute - confirm that is intended.
delayed_time=every_del_time.withColumn('Hour', expr('cast(dep_time/ 100 as int)')). groupBy('Hour').agg(round(avg('avg(dep_delay)'),2).alias('Average Delay')).sort('Hour')

delayed_time.display()

Hour,Average Delay
0,106.51
1,124.58
2,156.62
3,109.12
4,96.47
5,14.3
6,18.64
7,24.8
8,26.92
9,30.29


In [None]:
#Best day of the week

#Dataframe showing required columns
best_dayDF= unionDF.select('day_of_week','dep_delay','arr_delay')

#Mean dep_delay per day_of_week code, using only rows with dep_delay >= 0.
#The comparison uses the number 0 rather than the string '0', so it no longer relies on an implicit cast.
avg_delay_by_day= best_dayDF.where(best_dayDF.dep_delay >= 0)\
            .groupBy('day_of_week')\
            .avg('dep_delay')

#Attach the weekday description, order by weekday code, keep only description and mean delay.
#The join condition references the aggregated frame that is actually being joined.
seven_days= avg_delay_by_day\
            .join(day_name, avg_delay_by_day['day_of_week'] == day_name['code'])\
            .sort('code')\
            .select('description','avg(dep_delay)')

seven_days.display()

description,avg(dep_delay)
Monday,37.20579612680866
Tuesday,33.40634647788571
Wednesday,36.58598295223921
Thursday,35.60963952130765
Friday,34.287149551623955
Saturday,33.29475408586994
Sunday,34.49151008402497


In [None]:
#Best time of the year(months)

#Dataframe showing required columns
best_monthDF= unionDF.select('month','dep_delay','arr_delay')

#Mean dep_delay per month, using only rows with dep_delay >= 0, labelled with the month description.
#The filter now uses best_monthDF's own dep_delay column; it previously referenced
#best_dayDF.dep_delay, a frame defined in a different cell.
best_month= best_monthDF.where(best_monthDF.dep_delay >= 0)\
            .groupBy('month')\
            .avg('dep_delay')\
            .join(month_name, best_monthDF['month'] == month_name['code'])\
            .sort(asc('month'))\
            .select('Description','avg(dep_delay)')

best_month.display()

Description,avg(dep_delay)
January,35.38190699620594
February,38.0646369792809
March,31.91831079675473


In [None]:
# Aircraft lookup loaded from CSV; a later cell joins it to the flight data on its `tailnum` column
plane_dataDF= processDF("/FileStore/plane_data.csv")

In [None]:
#Dataframe containing combined data for months

def union(dfs):
    """Combine the DataFrames in ``dfs`` row-wise into a single DataFrame.

    Each later DataFrame is first reduced to the running result's
    ``.columns`` via ``.select`` and then appended with ``.union``.

    NOTE(review): this repeats the ``union`` helper already defined in an
    earlier cell of this notebook; the later definition shadows the earlier one.

    Args:
        dfs: Iterable of DataFrames sharing the columns of the first one.

    Returns:
        The combined DataFrame.
    """
    def _stack(so_far, incoming):
        same_columns = incoming.select(so_far.columns)
        return so_far.union(same_columns)

    return functools.reduce(_stack, dfs)

# Rebuild the union with every column (the earlier unionDF kept only a subset of columns)
unionDF = union([jan2019DF, feb2019DF, mar2019DF])

#Only display required columns
# Mean departure delay per tail number
req_columns = (
    unionDF
    .select('tail_num', 'dep_delay')
    .groupBy('tail_num')
    .avg('dep_delay')
)
req_columns.display()

tail_num,avg(dep_delay)
N8322X,10.257971014492751
N8554X,6.36461126005362
N954WN,7.96
N8513F,9.072164948453608
N229PS,15.169767441860465
N623NK,3.277333333333333
N466SW,31.357798165137616
N866AS,11.927360774818402
N178SY,0.2799097065462754
N510UW,8.538461538461538


In [None]:
#Dataframe containing combined data including tailnumber
#Per-tail-number mean delay joined to the plane data on tail_num == tailnum.
#An inner join is used directly: the previous left outer join followed by
#"tailnum IS NOT NULL" discarded every unmatched row, which is what an inner join does.
TNunionDF= req_columns.join(plane_dataDF, req_columns['tail_num'] == plane_dataDF['tailnum'], 'inner')

TNunionDF.display()

tail_num,avg(dep_delay),tailnum,type,manufacturer,issue_date,model,status,aircraft_type,engine_type,year
N466SW,31.357798165137616,N466SW,Corporation,BOMBARDIER INC,10/17/2003,CL-600-2B19,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2003.0
N866AS,11.927360774818402,N866AS,Corporation,BOMBARDIER INC,06/15/2001,CL-600-2B19,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2001.0
N102UW,14.300380228136882,N102UW,Corporation,AIRBUS INDUSTRIE,05/26/1999,A320-214,Valid,Fixed Wing Multi-Engine,Turbo-Fan,1998.0
N73283,18.106382978723403,N73283,Corporation,BOEING,02/26/2004,737-824,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2004.0
N496WN,5.993927125506073,N496WN,Corporation,BOEING,02/04/2005,737-7H4,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2004.0
N251WN,7.759183673469388,N251WN,Corporation,BOEING,09/13/2006,737-7H4,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2006.0
N445WN,9.02510460251046,N445WN,Corporation,BOEING,01/23/2004,737-7H4,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2003.0
N914WN,13.364035087719298,N914WN,Corporation,BOEING,04/17/2008,737-7H4,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2008.0
N654AW,6.973684210526316,N654AW,Corporation,AIRBUS INDUSTRIE,06/17/2004,A320-232,Valid,Fixed Wing Multi-Engine,Turbo-Jet,1999.0
N448WN,10.617710583153348,N448WN,Corporation,BOEING,02/03/2004,737-7H4,Valid,Fixed Wing Multi-Engine,Turbo-Fan,2003.0


In [None]:
#Mean of the per-aircraft average departure delays, grouped by the aircraft `year` column
#NOTE(review): `year` comes from the plane data, not from `issue_date`, and no correlation
#coefficient is computed here - confirm this matches the intended analysis.

delay_by_plane_year= TNunionDF.filter('year IS NOT NULL')\
            .select('avg(dep_delay)', 'year')\
            .groupBy('year')\
            .avg('avg(dep_delay)')
correlation= delay_by_plane_year.sort(asc('year'))

correlation.display()

year,avg(avg(dep_delay))
1986.0,36.63157894736842
1987.0,4.666227472063214
1988.0,4.84319504906159
1989.0,13.10130008506452
1990.0,9.04193327320219
1991.0,6.850884102640643
1992.0,9.705381981443791
1993.0,14.551395331838552
1994.0,16.839204032321692
1995.0,11.835422726294668


In [None]:
# Calendar parts plus origin and destination for every flight row
route_columns = ['year', 'month', 'day_of_month', 'day_of_week', 'ORIGIN', 'DEST']
Our_DF = unionDF.select(*route_columns)
Our_DF.display()

year,month,day_of_month,day_of_week,ORIGIN,DEST
2019,1,3,4,TYS,ATL
2019,1,4,5,TYS,ATL
2019,1,5,6,ATL,SGF
2019,1,6,7,ATL,SGF
2019,1,7,1,ATL,SGF
2019,1,8,2,ATL,SGF
2019,1,9,3,ATL,SGF
2019,1,10,4,ATL,SGF
2019,1,11,5,ATL,SGF
2019,1,12,6,ATL,SGF


In [None]:
# Join year, month and day_of_month with '-' and cast the resulting text to a date column
date_text = concat_ws('-', Our_DF['year'], Our_DF['month'], Our_DF['day_of_month'])
Flight_date = Our_DF.withColumn('Date', date_text.cast('Date'))

In [None]:
# Render Flight_date (Our_DF plus the derived Date column) as a Databricks table
Flight_date.display()

year,month,day_of_month,day_of_week,ORIGIN,DEST,Date
2019,1,3,4,TYS,ATL,2019-01-03
2019,1,4,5,TYS,ATL,2019-01-04
2019,1,5,6,ATL,SGF,2019-01-05
2019,1,6,7,ATL,SGF,2019-01-06
2019,1,7,1,ATL,SGF,2019-01-07
2019,1,8,2,ATL,SGF,2019-01-08
2019,1,9,3,ATL,SGF,2019-01-09
2019,1,10,4,ATL,SGF,2019-01-10
2019,1,11,5,ATL,SGF,2019-01-11
2019,1,12,6,ATL,SGF,2019-01-12


In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import struct
# Number of flight rows per (Date, DEST, ORIGIN) combination
route_day_keys = ["Date", "DEST", "ORIGIN"]
flights_per_route_day = Flight_date.groupBy(*route_day_keys).agg(
    F.count(F.lit(1)).alias("count")
)
Flight_date_group = flights_per_route_day.select(*route_day_keys, "count")
Flight_date_group.display()


Date,DEST,ORIGIN,count
2019-01-22,BTV,DTW,3
2019-01-13,ATL,SBN,4
2019-01-11,EVV,DTW,3
2019-01-21,GRR,LGA,3
2019-01-04,DFW,DTW,11
2019-01-04,ATL,CAK,3
2019-01-02,MSP,MEM,3
2019-01-04,LAS,ELP,3
2019-01-04,ABQ,HOU,3
2019-01-04,TUS,MDW,1


In [None]:
# Pack ORIGIN and DEST (in that order) into one struct column named Trip
trip_column = struct(Flight_date_group["ORIGIN"], Flight_date_group["DEST"])
Flight_Trip_group = Flight_date_group.withColumn("Trip", trip_column)

In [None]:
# Render the per-route daily counts together with the Trip struct column
Flight_Trip_group.display()

Date,DEST,ORIGIN,count,Trip
2019-01-22,BTV,DTW,3,"List(DTW, BTV)"
2019-01-13,ATL,SBN,4,"List(SBN, ATL)"
2019-01-11,EVV,DTW,3,"List(DTW, EVV)"
2019-01-21,GRR,LGA,3,"List(LGA, GRR)"
2019-01-04,DFW,DTW,11,"List(DTW, DFW)"
2019-01-04,ATL,CAK,3,"List(CAK, ATL)"
2019-01-02,MSP,MEM,3,"List(MEM, MSP)"
2019-01-04,LAS,ELP,3,"List(ELP, LAS)"
2019-01-04,ABQ,HOU,3,"List(HOU, ABQ)"
2019-01-04,TUS,MDW,1,"List(MDW, TUS)"


In [None]:
#Dataframe containing combined data for months

def union(dfs):
    """Append the DataFrames in ``dfs`` to each other, first to last.

    Before each ``.union`` the incoming DataFrame is projected onto the
    accumulated result's ``.columns`` with ``.select``.

    NOTE(review): same helper as the ``union`` defined in earlier cells of
    this notebook; each redefinition replaces the previous one.

    Args:
        dfs: Iterable of DataFrames sharing the columns of the first one.

    Returns:
        The accumulated DataFrame.
    """
    def _merge_pair(accumulated, incoming):
        projected = incoming.select(accumulated.columns)
        return accumulated.union(projected)

    return functools.reduce(_merge_pair, dfs)

# Union of the three monthly frames with every column kept
monthly_frames = [jan2019DF, feb2019DF, mar2019DF]
unionDF = union(monthly_frames)

In [None]:
# Flight-level columns used for the cascade-delay flags
cascade_columns = ['ORIGIN', 'DEST', 'DEP_DELAY', 'TAIL_NUM', 'YEAR', 'MONTH', 'DAY_OF_MONTH']
Cascade_delay = unionDF.select(*cascade_columns)

# Delayed is 1 when DEP_DELAY is strictly greater than 0, otherwise 0
is_delayed = F.when(Cascade_delay['DEP_DELAY'] > 0, 1).otherwise(0)
Cascade_delayed = Cascade_delay.withColumn('Delayed', is_delayed)
Cascade_delayed.display()

ORIGIN,DEST,DEP_DELAY,TAIL_NUM,YEAR,MONTH,DAY_OF_MONTH,Delayed
TYS,ATL,25.0,N195PQ,2019,1,3,1
TYS,ATL,70.0,N919XJ,2019,1,4,1
ATL,SGF,6.0,N316PQ,2019,1,5,1
ATL,SGF,-5.0,N325PQ,2019,1,6,0
ATL,SGF,-3.0,N904XJ,2019,1,7,0
ATL,SGF,0.0,N917XJ,2019,1,8,0
ATL,SGF,-5.0,N904XJ,2019,1,9,0
ATL,SGF,-7.0,N919XJ,2019,1,10,0
ATL,SGF,-5.0,N135EV,2019,1,11,0
ATL,SGF,-1.0,N302PQ,2019,1,12,0


In [None]:
# Join year, month and day_of_month with '-' and cast the resulting text to a date column
date_text = concat_ws('-', Cascade_delayed['year'], Cascade_delayed['month'], Cascade_delayed['day_of_month'])
Cascade_delayed_ = Cascade_delayed.withColumn('Date', date_text.cast('Date'))
Cascade_delayed_.display()

ORIGIN,DEST,DEP_DELAY,TAIL_NUM,YEAR,MONTH,DAY_OF_MONTH,Delayed,Date
TYS,ATL,25.0,N195PQ,2019,1,3,1,2019-01-03
TYS,ATL,70.0,N919XJ,2019,1,4,1,2019-01-04
ATL,SGF,6.0,N316PQ,2019,1,5,1,2019-01-05
ATL,SGF,-5.0,N325PQ,2019,1,6,0,2019-01-06
ATL,SGF,-3.0,N904XJ,2019,1,7,0,2019-01-07
ATL,SGF,0.0,N917XJ,2019,1,8,0,2019-01-08
ATL,SGF,-5.0,N904XJ,2019,1,9,0,2019-01-09
ATL,SGF,-7.0,N919XJ,2019,1,10,0,2019-01-10
ATL,SGF,-5.0,N135EV,2019,1,11,0,2019-01-11
ATL,SGF,-1.0,N302PQ,2019,1,12,0,2019-01-12


In [None]:
# For rows flagged Delayed == 1, copy DEST and TAIL_NUM into the two cascade columns;
# every other row gets the literal 'NO'.
# NOTE(review): this cell rebinds Cascade_delay and Cascade_delayed from earlier cells. After it
# runs, Cascade_delayed no longer has YEAR/MONTH/DAY_OF_MONTH, so re-running the cell that
# builds Cascade_delayed_ without re-running its predecessor would fail.
# NOTE(review): the recorded output below has no YEAR/MONTH/DAY_OF_MONTH columns, although
# Cascade_delay as built here keeps them - the output looks like it came from a different
# version of this cell.
delayed_flag = F.col('Delayed') == 1
Cascade_delay = (
    Cascade_delayed_
    .withColumn('cascaded_Delay_destination', F.when(delayed_flag, F.col('DEST')).otherwise('NO'))
    .withColumn('cascaded_Delay_tail_number', F.when(delayed_flag, F.col('TAIL_NUM')).otherwise('NO'))
)
Cascade_delayed = Cascade_delay.select('ORIGIN', 'DEST', 'DEP_DELAY', 'TAIL_NUM', 'Date', 'Delayed')
Cascade_delay.display()

ORIGIN,DEST,DEP_DELAY,TAIL_NUM,Date,Delayed,cascaded_Delay_destination,cascaded_Delay_tail_number
TYS,ATL,25.0,N195PQ,2019-01-03,1,ATL,N195PQ
TYS,ATL,70.0,N919XJ,2019-01-04,1,ATL,N919XJ
ATL,SGF,6.0,N316PQ,2019-01-05,1,SGF,N316PQ
ATL,SGF,-5.0,N325PQ,2019-01-06,0,NO,NO
ATL,SGF,-3.0,N904XJ,2019-01-07,0,NO,NO
ATL,SGF,0.0,N917XJ,2019-01-08,0,NO,NO
ATL,SGF,-5.0,N904XJ,2019-01-09,0,NO,NO
ATL,SGF,-7.0,N919XJ,2019-01-10,0,NO,NO
ATL,SGF,-5.0,N135EV,2019-01-11,0,NO,NO
ATL,SGF,-1.0,N302PQ,2019-01-12,0,NO,NO


In [None]:
#Dataframes
# NOTE(review): carriersDF is not referenced by any other cell visible in this notebook
carriersDF= processDF("/FileStore/carriers.csv")

# Carrier lookup; the carrier-delay cell joins it on its `code` column
carriers_nameDF= processDF("/FileStore/L_UNIQUE_CARRIERS.csv")


In [None]:
#Dataframe containing combined data for months

def union(dfs):
    """Fold the DataFrames in ``dfs`` into one by repeated ``.union``.

    The right-hand DataFrame of every step is narrowed to the left-hand
    side's ``.columns`` with ``.select`` before the two are unioned.

    NOTE(review): another copy of the ``union`` helper defined in earlier
    cells of this notebook; consider keeping a single definition.

    Args:
        dfs: Iterable of DataFrames sharing the columns of the first one.

    Returns:
        The folded DataFrame.
    """
    def _union_aligned(left, right):
        right_aligned = right.select(left.columns)
        return left.union(right_aligned)

    return functools.reduce(_union_aligned, dfs)

unionDF = union([jan2019DF, feb2019DF, mar2019DF])

#Only display required columns
# Mean dep_delay and mean carrier_delay per op_unique_carrier, joined to the carrier lookup on code
# NOTE(review): this rebinds req_columns, which an earlier cell used for the per-tail-number averages
carrier_averages = (
    unionDF
    .select('op_unique_carrier', 'carrier_delay', 'dep_delay')
    .groupBy('op_unique_carrier')
    .avg('dep_delay', 'carrier_delay')
)
req_columns = carrier_averages.join(
    carriers_nameDF,
    carrier_averages['op_unique_carrier'] == carriers_nameDF['code'],
)
req_columns.display()

op_unique_carrier,avg(dep_delay),avg(carrier_delay),Code,Description
UA,11.963119633577522,15.261810557640931,UA,United Air Lines Inc.
NK,8.555953427065026,15.673102889549014,NK,Spirit Air Lines
AA,9.910563773062936,20.46314032158861,AA,American Airlines Inc.
B6,18.41371933627011,28.19862546561041,B6,JetBlue Airways
OO,15.962570031550776,30.81353419416547,OO,SkyWest Airlines Inc.
MQ,9.472886265053562,14.785596983151876,MQ,Envoy Air
OH,8.803159246319687,19.34791789847116,OH,PSA Airlines Inc.
G4,8.306582656275907,21.5468364831553,G4,Allegiant Air
WN,10.046289567311034,18.772232224024528,WN,Southwest Airlines Co.
9E,10.40867178740566,18.935831760582367,9E,Endeavor Air Inc.


Databricks visualization. Run in Databricks to view.

In [None]:
#Carriers ordered from highest to lowest average carrier delay
#(the ordering is by the mean of carrier_delay, not by a count of delayed flights)
max_delays = req_columns.orderBy("avg(carrier_delay)", ascending=False)

max_delays.display()

op_unique_carrier,avg(dep_delay),avg(carrier_delay),Code,Description
OO,15.962570031550776,30.81353419416547,OO,SkyWest Airlines Inc.
B6,18.41371933627011,28.19862546561041,B6,JetBlue Airways
YV,10.874698749091465,25.98618402873722,YV,Mesa Airlines Inc.
EV,14.580662333295976,25.28380557475236,EV,ExpressJet Airlines LLC
HA,2.4739866754818696,24.35979790689282,HA,Hawaiian Airlines Inc.
DL,7.936962088341987,23.520491541653367,DL,Delta Air Lines Inc.
G4,8.306582656275907,21.5468364831553,G4,Allegiant Air
AA,9.910563773062936,20.46314032158861,AA,American Airlines Inc.
OH,8.803159246319687,19.34791789847116,OH,PSA Airlines Inc.
9E,10.40867178740566,18.935831760582367,9E,Endeavor Air Inc.


In [None]:
#Carriers ordered from lowest to highest average carrier delay
#(the ordering is by the mean of carrier_delay, not by a count of delayed flights)
min_delays = req_columns.orderBy("avg(carrier_delay)", ascending=True)
min_delays.display()

op_unique_carrier,avg(dep_delay),avg(carrier_delay),Code,Description
AS,5.816063386155129,13.021348859776808,AS,Alaska Airlines Inc.
YX,7.88664696399785,14.43276713812005,YX,Republic Airline
MQ,9.472886265053562,14.785596983151876,MQ,Envoy Air
UA,11.963119633577522,15.261810557640931,UA,United Air Lines Inc.
NK,8.555953427065026,15.673102889549014,NK,Spirit Air Lines
WN,10.046289567311034,18.772232224024528,WN,Southwest Airlines Co.
F9,12.97552736550832,18.793162393162397,F9,Frontier Airlines Inc.
9E,10.40867178740566,18.935831760582367,9E,Endeavor Air Inc.
OH,8.803159246319687,19.34791789847116,OH,PSA Airlines Inc.
AA,9.910563773062936,20.46314032158861,AA,American Airlines Inc.


Databricks visualization. Run in Databricks to view.