In [0]:
from pyspark.sql.types import *
import pandas as pd
from pyspark.sql.functions import *
from graphframes import *

from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import DateType

In [0]:
# departuredelays: raw CSV (read with a header row) and the parquet landing copy it is written to.
tripdelaysFilePath, tripdelaysParquet = (
    "/FileStore/tables/departuredelays.csv",
    "/FileStore/tables/landing/tripdelays",
)
# airport_codes_na: raw tab-separated text file and the parquet landing copy it is written to.
airportsnaFilePath, airportsnaParquet = (
    "/FileStore/tables/airport_codes_na.txt",
    "/FileStore/tables/landing/airportsna",
)

In [0]:
# Peek at the raw tab-separated airports file with no header option, so its header line appears as the first data row.
display(spark.read.csv(airportsnaFilePath, sep="\t"))

_c0,_c1,_c2,_c3
City,State,Country,IATA
Abbotsford,BC,Canada,YXX
Aberdeen,SD,USA,ABR
Abilene,TX,USA,ABI
Akron,OH,USA,CAK
Alamosa,CO,USA,ALS
Albany,GA,USA,ABY
Albany,NY,USA,ALB
Albuquerque,NM,USA,ABQ
Alexandria,LA,USA,AEX


In [0]:

# Airports: read the tab-separated file (header row, inferred column types), persist it as parquet,
# then reload the parquet copy and expose it to SQL as the `airports_na` temp view.
(
    spark.read
    .csv(airportsnaFilePath, header=True, inferSchema=True, sep="\t")
    .write.parquet(airportsnaParquet, mode="overwrite")
)
airportsna = spark.read.parquet(airportsnaParquet)
airportsna.createOrReplaceTempView("airports_na")

# Departure delays: read the CSV (header row) and persist it as parquet, then expose the parquet copy
# to SQL as the `departureDelays` temp view.
# Schema inference is deliberately NOT enabled, so every column stays a string. This matters for `date`:
# the departureDelays_geo query below slices it by character position, and inferring it as an integer
# would drop leading zeros and break that slicing.
(
    spark.read
    .csv(tripdelaysFilePath, header=True)
    .write.parquet(tripdelaysParquet, mode="overwrite")
)
departureDelays = spark.read.parquet(tripdelaysParquet)
departureDelays.createOrReplaceTempView("departureDelays")

# Distinct IATA (International Air Transport Association) codes that appear as either an origin
# or a destination in the departureDelays sample dataset.
# UNION (unlike UNION ALL) already removes duplicates, so no extra DISTINCT is needed.
tripIATA = spark.sql("select origin as iata from departureDelays union select destination as iata from departureDelays")
tripIATA.createOrReplaceTempView("tripIATA")

# Only include airports with at least one trip in the departureDelays dataset
airports = spark.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")

In [0]:
# Build `departureDelays_geo` DataFrame
# One row per trip with its date, delay, distance, and origin/destination airport information.
# `date` is the raw string from the CSV, sliced by position as MMDDHHmm; the year comes from the
# literal '2014-'. `tripid` is that same string cast to int, so leading zeros are dropped
# (e.g. a value like '01011245' becomes 1011245).
# concat is variadic (and returns NULL if any piece is NULL), so a single call builds 'yyyy-MM-dd HH:mm:00'.
departureDelays_geo = spark.sql("""
    select cast(f.date as int) as tripid,
           cast(concat('2014-',
                       substr(cast(f.date as string), 1, 2), '-',
                       substr(cast(f.date as string), 3, 2), ' ',
                       substr(cast(f.date as string), 5, 2), ':',
                       substr(cast(f.date as string), 7, 2), ':00') as timestamp) as `localdate`,
           cast(f.delay as int) as delay,
           cast(f.distance as int) as distance,
           f.origin as src,
           f.destination as dst,
           o.city as city_src,
           d.city as city_dst,
           o.state as state_src,
           d.state as state_dst
      from departuredelays f
      join airports o on o.iata = f.origin
      join airports d on d.iata = f.destination
""")

# Register as a temp view so it can also be queried with SQL
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")

In [0]:
# Vertices are airports: rename the IATA code to the `id` column GraphFrames uses as the vertex key,
# and drop exact duplicate rows.
renamedAirports = airports.withColumnRenamed("IATA", "id")
tripVertices = renamedAirports.distinct()

# Edges are individual trips (flights), keyed by the `src` / `dst` airport codes.
edgeColumns = ["tripid", "delay", "src", "dst", "city_dst", "state_dst"]
tripEdges = departureDelays_geo.select(*edgeColumns)

In [0]:
# Build `tripGraph` GraphFrame
# Vertices are airports; edges are individual trips (flights) with delay and destination city/state.
tripGraph = GraphFrame(tripVertices, tripEdges)
print(tripGraph)

# Build `tripGraphPrime` GraphFrame
# Same vertices and the same edge rows as `tripGraph`, but each edge carries only tripid/delay/src/dst.
# The narrower edge columns keep motif and subgraph output (below) easier to read.
tripEdgesPrime = departureDelays_geo.select("tripid", "delay", "src", "dst")
tripGraphPrime = GraphFrame(tripVertices, tripEdgesPrime)

GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])


In [0]:
# Graph size: airports are vertices, trips are edges.
airportCount = tripGraph.vertices.count()
tripCount = tripGraph.edges.count()
print(f"Airports: {airportCount}")
print(f"Trips: {tripCount}")

Airports: 279
Trips: 1361141


In [0]:
# Longest single departure delay over all trips: a one-row result with column `max(delay)`.
longestDelay = tripGraph.edges.agg({"delay": "max"})
display(longestDelay)

max(delay)
1642


In [0]:
# Split trips into on-time/early (delay <= 0) and delayed (delay > 0).
# Under SQL semantics a NULL delay would fall into neither bucket.
onTimeOrEarlyCount = tripGraph.edges.filter("delay <= 0").count()
delayedCount = tripGraph.edges.filter("delay > 0").count()
print(f'On-time / Early Flights: {onTimeOrEarlyCount}')
print(f'Delayed Flights: {delayedCount}')

On-time / Early Flights: 780469
Delayed Flights: 580672


In [0]:
# Average delay per route out of SFO, worst route first.
# Only delayed departures are averaged; a delay <= 0 means the flight left on time or early.
sfoDelayedTrips = (
    tripGraph.edges
    .filter("src = 'SFO' and delay > 0")
    .groupBy("src", "dst")
    .agg({"delay": "avg"})
    .orderBy(desc("avg(delay)"))
)

In [0]:
# Show the per-route average delays out of SFO, sorted highest first.
display(sfoDelayedTrips)

src,dst,avg(delay)
SFO,OKC,59.073170731707314
SFO,JAC,57.13333333333333
SFO,COS,53.97619047619048
SFO,OTH,48.09090909090909
SFO,SAT,47.625
SFO,MOD,46.80952380952381
SFO,SUN,46.723404255319146
SFO,CIC,46.72164948453608
SFO,ABQ,44.8125
SFO,ASE,44.285714285714285


In [0]:
# Every trip that departed late (delay > 0).
# After the table renders, open Plot Options and set `state_dst` as a Key to chart the result.
tripDelays = tripGraph.edges.where("delay > 0")
display(tripDelays)

tripid,delay,src,dst,city_dst,state_dst
1311815,43,MSY,FLL,Fort Lauderdale,FL
1311545,29,MSY,STL,St. Louis,MO
1311410,5,MSY,BNA,Nashville,TN
1311545,15,MSY,DAL,Dallas,TX
1311110,14,MSY,EYW,Key West,FL
1310600,47,MSY,DAL,Dallas,TX
1311835,38,MSY,TPA,Tampa,FL
1311155,13,MSY,LAS,Las Vegas,NV
1311910,56,MSY,HOU,Houston,TX
1311310,6,MSY,HOU,Houston,TX


In [0]:
# The 20 airports touched by the most trips (edges in either direction), busiest first.
display(tripGraph.degrees.orderBy(col("degree").desc()).limit(20))

id,degree
ATL,179774
DFW,133966
ORD,125405
LAX,106853
DEN,103699
IAH,85685
PHX,79672
SFO,77635
LAS,66101
CLT,56103


In [0]:
# Two-leg connections a -> SFO -> c where at least one leg was delayed by more than 500 (presumably minutes).
# The second leg must depart after the first and within roughly one day of it.
# tripid is the MMDDHHmm date string cast to int, so "+ 10000" advances the DD field by one. Because of
# that encoding, the window does not carry across a month boundary (e.g. Jan 31 -> Feb 1 is never matched).
motifPattern = "(a)-[ab]->(b); (b)-[bc]->(c)"
motifCondition = "(b.id = 'SFO') and (ab.delay > 500 or bc.delay > 500) and bc.tripid > ab.tripid and bc.tripid < ab.tripid + 10000"
motifs = tripGraphPrime.find(motifPattern).filter(motifCondition)

display(motifs)

a,ab,b,bc,c
"List(MSY, New Orleans, LA, USA)","List(1201725, 2, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(1211508, 593, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(MSY, New Orleans, LA, USA)","List(1011751, -4, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(1021507, 536, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(MSY, New Orleans, LA, USA)","List(2121725, 15, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(2131420, 504, SFO, SAN)","List(SAN, San Diego, CA, USA)"
"List(MSY, New Orleans, LA, USA)","List(2091725, 87, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(2092110, 740, SFO, MIA)","List(MIA, Miami, FL, USA)"
"List(MSY, New Orleans, LA, USA)","List(2091725, 87, MSY, SFO)","List(SFO, San Francisco, CA, USA)","List(2092230, 636, SFO, JFK)","List(JFK, New York, NY, USA)"
"List(SNA, Orange County, CA, USA)","List(2190755, -7, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(2190925, 1638, SFO, ORD)","List(ORD, Chicago, IL, USA)"
"List(SNA, Orange County, CA, USA)","List(2182030, 12, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(2190925, 1638, SFO, ORD)","List(ORD, Chicago, IL, USA)"
"List(SNA, Orange County, CA, USA)","List(2181935, 9, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(2190925, 1638, SFO, ORD)","List(ORD, Chicago, IL, USA)"
"List(SNA, Orange County, CA, USA)","List(2180940, 8, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(2190925, 1638, SFO, ORD)","List(ORD, Chicago, IL, USA)"
"List(SNA, Orange County, CA, USA)","List(2181320, 12, SNA, SFO)","List(SFO, San Francisco, CA, USA)","List(2190925, 1638, SFO, ORD)","List(ORD, Chicago, IL, USA)"


In [0]:
# PageRank over the trip graph: a fixed 5 iterations with reset probability 0.15.
ranks = tripGraph.pageRank(maxIter=5, resetProbability=0.15)

In [0]:
# The 20 most central airports by PageRank score, highest first.
display(ranks.vertices.sort(col("pagerank").desc()).limit(20))

id,City,State,Country,pagerank
ATL,Atlanta,GA,USA,18.91010461672981
DFW,Dallas,TX,USA,13.699227467378964
ORD,Chicago,IL,USA,13.163049993795983
DEN,Denver,CO,USA,9.723388283811564
LAX,Los Angeles,CA,USA,8.703656827807166
IAH,Houston,TX,USA,7.991324463091128
SFO,San Francisco,CA,USA,6.903242998287933
PHX,Phoenix,AZ,USA,6.505886984498643
SLC,Salt Lake City,UT,USA,5.799587684561128
LAS,Las Vegas,NV,USA,5.25359244560915
