In [0]:
#dbutils.fs.mkdirs("/FileStore/tables/landing/")

In [0]:
#%fs ls dbfs:/FileStore/tables

In [0]:
#dbutils.fs.mkdirs("/FileStore/tables/on_time_flight_performance")

In [0]:
from pyspark.sql.types import *
import pandas as pd
from pyspark.sql.functions import *
from graphframes import *

from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import DateType


In [0]:
#input files
tripdelaysFilePath = "/FileStore/tables/raw/departuredelays.csv"
airportsnaFilePath = "/FileStore/tables/raw/airport_codes_na.txt"
tripdelaysParquet  = "/FileStore/tables/landing/tripdelays"
airportsnaParquet  = "/FileStore/tables/landing/airportsna"

In [0]:
#display(spark.read.csv(tripdelaysFilePath))
display(spark.read.option("delimiter", "\t").csv(airportsnaFilePath))

_c0,_c1,_c2,_c3
City,State,Country,IATA
Abbotsford,BC,Canada,YXX
Aberdeen,SD,USA,ABR
Abilene,TX,USA,ABI
Akron,OH,USA,CAK
Alamosa,CO,USA,ALS
Albany,GA,USA,ABY
Albany,NY,USA,ALB
Albuquerque,NM,USA,ABQ
Alexandria,LA,USA,AEX


In [0]:
# Read Airports data and create a temp table
spark.read.\
  option("header", "true").\
  option("inferschema", "true").\
  option("delimiter", "\t").\
  csv(airportsnaFilePath).\
  write.parquet(airportsnaParquet,mode='overwrite')
airportsna = spark.read.parquet(airportsnaParquet)
airportsna.createOrReplaceTempView("airports_na")

# Read departure Delays data and create a temp table
spark.read.\
  option("header", "true").\
  csv(tripdelaysFilePath).\
  write.parquet(tripdelaysParquet,mode='overwrite')
departureDelays = spark.read.parquet(tripdelaysParquet)
departureDelays.createOrReplaceTempView("departureDelays")

# Available IATA (International Air Transport Association) codes from the departuredelays sample dataset
tripIATA = spark.sql("select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a")
tripIATA.createOrReplaceTempView("tripIATA")

# Only include airports with atleast one trip from the departureDelays dataset
airports = spark.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")

In [0]:
# Build `departureDelays_geo` DataFrame
# Obtain key attributes such as Date of flight, delays, distance, and airport information (Origin, Destination)  
departureDelays_geo = spark.sql("select cast(f.date as int) as tripid, cast(concat(concat(concat(concat(concat(concat('2014-', concat(concat(substr(cast(f.date as string), 1, 2), '-')), substr(cast(f.date as string), 3, 2)), ' '), substr(cast(f.date as string), 5, 2)), ':'), substr(cast(f.date as string), 7, 2)), ':00') as timestamp) as `localdate`, cast(f.delay as int), cast(f.distance as int), f.origin as src, f.destination as dst, o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst from departuredelays f join airports o on o.iata = f.origin join airports d on d.iata = f.destination") 

# RegisterTempTable
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")

In [0]:
# Create Vertices (airports) and Edges (flights)
tripVertices = airports.withColumnRenamed("IATA", "id").distinct()
tripEdges = departureDelays_geo.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")

In [0]:
# Build `tripGraph` GraphFrame
# This GraphFrame builds up on the vertices and edges based on our trips (flights)
tripGraph = GraphFrame(tripVertices, tripEdges)
print(tripGraph)

# Build `tripGraphPrime` GraphFrame
# This graphframe contains a smaller subset of data to make it easier to display motifs and subgraphs (below)
tripEdgesPrime = departureDelays_geo.select("tripid", "delay", "src", "dst")
tripGraphPrime = GraphFrame(tripVertices, tripEdgesPrime)

GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])


In [0]:
print(f"Airports: {tripGraph.vertices.count()}")
print(f"Trips: {tripGraph.edges.count()}")

Airports: 279
Trips: 1361141


In [0]:
longestDelay = tripGraph.edges.groupBy().max("delay")
display(longestDelay)

max(delay)
1642


In [0]:
print(f'On-time / Early Flights: {tripGraph.edges.filter("delay <= 0").count()}')
print(f'Delayed Flights: {tripGraph.edges.filter("delay > 0").count()}')

On-time / Early Flights: 780469
Delayed Flights: 580672


In [0]:
# Delay can be <= 0 i.e. the flight left on time or early
sfoDelayedTrips = tripGraph.edges.\
  filter("src = 'SFO' and delay > 0").\
  groupBy("src", "dst").\
  avg("delay").\
  sort(desc("avg(delay)"))

In [0]:
display(sfoDelayedTrips)

src,dst,avg(delay)
SFO,OKC,59.073170731707314
SFO,JAC,57.13333333333333
SFO,COS,53.97619047619048
SFO,OTH,48.09090909090909
SFO,SAT,47.625
SFO,MOD,46.80952380952381
SFO,SUN,46.723404255319146
SFO,CIC,46.72164948453608
SFO,ABQ,44.8125
SFO,ASE,44.285714285714285


In [0]:
# After displaying tripDelays, use Plot Options to set `state_dst` as a Key.
tripDelays = tripGraph.edges.filter("delay > 0")
display(tripDelays)

tripid,delay,src,dst,city_dst,state_dst
3081956,7,SLC,BTM,Butte,MT
3032208,10,SLC,BTM,Butte,MT
3031113,5,SLC,BTM,Butte,MT
1201110,16,SLC,BTM,Butte,MT
1121110,25,SLC,BTM,Butte,MT
1101110,3,SLC,BTM,Butte,MT
1092151,6,SLC,BTM,Butte,MT
1091110,48,SLC,BTM,Butte,MT
1061110,1,SLC,BTM,Butte,MT
1042150,6,SLC,BTM,Butte,MT


In [0]:
#(display(tripEdges))

In [0]:
# States with the longest cumulative delays (with individual delays > 100 minutes) (origin: Philadelphia)
#tripEdges.createOrReplaceTempView("tripgraph")
display(tripGraph.edges.filter("src ='PHL' and delay > 100"))

tripid,delay,src,dst,city_dst,state_dst
3310600,111,PHL,ATL,Atlanta,GA
3301810,216,PHL,ATL,Atlanta,GA
3291732,182,PHL,ATL,Atlanta,GA
3061728,126,PHL,ATL,Atlanta,GA
3040600,484,PHL,ATL,Atlanta,GA
3291530,104,PHL,ATL,Atlanta,GA
3291805,126,PHL,ATL,Atlanta,GA
3281915,118,PHL,ATL,Atlanta,GA
3191915,106,PHL,ATL,Atlanta,GA
3180615,393,PHL,ATL,Atlanta,GA


# Vertex Degrees
- <code>inDegrees</code>: Incoming connections to the airport
- <code>outDegrees</code>: Outgoing connections from the airport
- <code>degrees</code>: Total connections to and from the airport

Reviewing the various properties of the property graph to understand the incoming and outgoing connections between airports.

In [0]:
display(tripGraph.degrees.sort(desc("degree")).limit(20))

id,degree
ATL,179774
DFW,133966
ORD,125405
LAX,106853
DEN,103699
IAH,85685
PHX,79672
SFO,77635
LAS,66101
CLT,56103


#City / Flight Relationships through Motif Finding
To more easily understand the complex relationship of city airports and their flights with each other, we can use motifs to find patterns of airports (i.e. vertices) connected by flights (i.e. edges). The result is a DataFrame in which the column names are given by the motif keys.

In [0]:
'''
Using tripGraphPrime to more easily display 
- The associated edge (ab, bc) relationships 
- With the different the city / airports (a, b, c) where SFO is the connecting city (b)
- Ensuring that flight ab (i.e. the flight to SFO) occured before flight bc (i.e. flight leaving SFO)
- Note, TripID was generated based on time in the format of MMDDHHMM converted to int
- Therefore bc.tripid < ab.tripid + 10000 means the second flight (bc) occured within approx a day of the first flight (ab)
Note: In reality, we would need to be more careful to link trips ab and bc.
'''
motifs = tripGraphPrime.\
  find("(a)-[ab]->(b); (b)-[bc]->(c)").\
  filter("(b.id = 'SFO') and (ab.delay > 500 or bc.delay > 500) and bc.tripid > ab.tripid and bc.tripid < ab.tripid + 10000")

display(motifs)

a,ab,b,bc,c
"List(MSY, New Orleans, LA, USA)","List(1201725, 2, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(1211508, 593, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(MSY, New Orleans, LA, USA)","List(1011751, -4, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(1021507, 536, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(MSY, New Orleans, LA, USA)","List(2121725, 15, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(2131420, 504, SFO, SAN)","List(SAN, San Diego, CA, USA)"
"List(MSY, New Orleans, LA, USA)","List(2091725, 87, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(2092110, 740, SFO, MIA)","List(MIA, Miami, FL, USA)"
"List(MSY, New Orleans, LA, USA)","List(2091725, 87, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(2092230, 636, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(SNA, Orange County, CA, USA)","List(1210750, 7, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(1211508, 593, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(SNA, Orange County, CA, USA)","List(1211105, 19, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(1211508, 593, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(SNA, Orange County, CA, USA)","List(1211335, 3, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(1211508, 593, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(SNA, Orange County, CA, USA)","List(1201825, 80, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(1211508, 593, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(SNA, Orange County, CA, USA)","List(1201650, 21, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(1211508, 593, SFO, JFK)","List(JFK, New York, NY, USA)"


#Determining Airport Ranking using PageRank

There are a large number of flights and connections through these various airports included in this Departure Delay Dataset. Using the pageRank algorithm, Spark iteratively traverses the graph and determines a rough estimate of how important the airport is.

In [0]:
ranks = tripGraph.pageRank(resetProbability=0.15, maxIter=5)

In [0]:
display(ranks.vertices.orderBy(desc("pagerank")).limit(20))

id,City,State,Country,pagerank
ATL,Atlanta,GA,USA,18.91010461672981
DFW,Dallas,TX,USA,13.699227467378964
ORD,Chicago,IL,USA,13.163049993795983
DEN,Denver,CO,USA,9.723388283811564
LAX,Los Angeles,CA,USA,8.703656827807166
IAH,Houston,TX,USA,7.991324463091128
SFO,San Francisco,CA,USA,6.903242998287933
PHX,Phoenix,AZ,USA,6.505886984498643
SLC,Salt Lake City,UT,USA,5.799587684561128
LAS,Las Vegas,NV,USA,5.25359244560915


#Most popular flights (single city hops)
Using the tripGraph, we can quickly determine what are the most popular single city hop flights

In [0]:
topTrips = tripGraph.edges.groupBy("src", "dst").agg(count("delay").alias("trips"))

In [0]:
display(topTrips.orderBy(desc("trips")).limit(20))

src,dst,trips
SFO,LAX,3232
LAX,SFO,3198
LAS,LAX,3016
LAX,LAS,2964
JFK,LAX,2720
LAX,JFK,2719
ATL,LGA,2501
LGA,ATL,2500
LAX,PHX,2394
PHX,LAX,2387


#Top Transfer Cities
Many airports are used as transfer points instead of the final Destination. An easy way to calculate this is by calculating the ratio of inDegree (the number of flights to the airport) / outDegree (the number of flights leaving the airport). Values close to 1 may indicate many transfers, whereas values < 1 indicate many outgoing flights and > 1 indicate many incoming flights.

In [0]:
# Calculate the inDeg (flights into the airport) and outDeg (flights leaving the airport)
inDeg = tripGraph.inDegrees
outDeg = tripGraph.outDegrees

# Calculate the degreeRatio (inDeg/outDeg), perform inner join on "id" column
degreeRatio = inDeg.join(outDeg, inDeg.id == outDeg.id).drop(outDeg.id).selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio").cache()

# Join back to the `airports` DataFrame (instead of registering temp table as above)
nonTransferAirports = degreeRatio.join(airports, degreeRatio.id == airports.IATA).selectExpr("id", "city", "degreeRatio").filter("degreeRatio < 0.9 or degreeRatio > 1.1")

# List out the city airports which have abnormal degree ratios
display(nonTransferAirports)

id,city,degreeRatio
GFK,Grand Forks,1.3333333333333333
FAI,Fairbanks,1.123268698060942
OME,Nome,0.5084745762711864
BRW,Barrow,0.2865168539325842


In [0]:
# Join back to the `airports` DataFrame (instead of registering temp table as above)
transferAirports = degreeRatio.join(airports, degreeRatio.id == airports.IATA).selectExpr("id", "city", "degreeRatio").filter("degreeRatio between 0.9 and 1.1")
  
# List out the top 10 transfer city airports
display(transferAirports.orderBy("degreeRatio").limit(10))

id,city,degreeRatio
MSP,Minneapolis,0.9375183338222352
DEN,Denver,0.958025717037065
DFW,Dallas,0.964339653074092
ORD,Chicago,0.9671063983310064
SLC,Salt Lake City,0.9827417906368358
IAH,Houston,0.9846895050147084
PHX,Phoenix,0.9891643572266746
OGG,"Kahului, Maui",0.9898718478710212
HNL,"Honolulu, Oahu",0.990535889872173
SFO,San Francisco,0.9909473252295224


#Breadth First Search
Breadth-first search (BFS) is designed to traverse the graph to quickly find the desired vertices (i.e. airports) and edges (i.e flights). Let's try to find the shortest number of connections between cities based on the dataset.

In [0]:
# Example 1: Direct Seattle to San Francisco
# This method returns a DataFrame of valid shortest paths from vertices matching "fromExpr" to vertices matching "toExpr"
filteredPaths = tripGraph.bfs("id = 'SEA'","id ='SFO'",maxPathLength=1)
display(filteredPaths)

from,e0,to
"List(SEA, Seattle, WA, USA)","List(3311855, 99, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
"List(SEA, Seattle, WA, USA)","List(3311440, 101, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
"List(SEA, Seattle, WA, USA)","List(3311045, 189, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
"List(SEA, Seattle, WA, USA)","List(3310615, -7, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
"List(SEA, Seattle, WA, USA)","List(3301855, -7, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
"List(SEA, Seattle, WA, USA)","List(3301440, -3, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
"List(SEA, Seattle, WA, USA)","List(3301045, -5, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
"List(SEA, Seattle, WA, USA)","List(3291440, 82, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
"List(SEA, Seattle, WA, USA)","List(3290615, -7, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
"List(SEA, Seattle, WA, USA)","List(3281855, -2, SEA, SFO, San Francisco, CA)","List(SFO, San Francisco, CA, USA)"
