In [1]:
from graphframes import *

In [14]:
# Set File Paths
tripdelaysFilePath = "/Users/liouscott/Documents/scott/graph/departuredelays.csv"
airportsnaFilePath = "/Users/liouscott/Documents/scott/graph/airport-codes-na.txt"

In [15]:
# Obtain airports dataset
airportsna = spark.read.csv(airportsnaFilePath, header='true', inferSchema='true', sep='\t')
airportsna.createOrReplaceTempView("airports_na")

In [16]:
airportsna.show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
|   Alliance|   NE|    USA| AIA|
|     Alpena|   MI|    USA| APN|
|    Altoona|   PA|    USA| AOO|
|   Amarillo|   TX|    USA| AMA|
|Anahim Lake|   BC| Canada| YAA|
|  Anchorage|   AK|    USA| ANC|
|   Appleton|   WI|    USA| ATW|
|     Arviat|  NWT| Canada| YEK|
|  Asheville|   NC|    USA| AVL|
|      Aspen|   CO|    USA| ASE|
+-----------+-----+-------+----+
only showing top 20 rows



In [17]:
# Obtain departure Delays data
departureDelays = spark.read.csv(tripdelaysFilePath, header='true')
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [18]:
departureDelays.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [19]:
# Available IATA codes from the departuredelays sample dataset
tripIATA = spark.sql("select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a")
tripIATA.createOrReplaceTempView("tripIATA")

In [20]:
tripIATA.show()

+----+
|iata|
+----+
| PSE|
| INL|
| MSY|
| PPG|
| GEG|
| BUR|
| SNA|
| GRB|
| GTF|
| IDA|
| GRR|
| JLN|
| EUG|
| PSG|
| GSO|
| MYR|
| PVD|
| OAK|
| BTM|
| COD|
+----+
only showing top 20 rows



In [21]:
# Only include airports with atleast one trip from the departureDelays dataset
airports = spark.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")
airports.cache()

DataFrame[IATA: string, City: string, State: string, Country: string]

In [22]:
airports.show()

+----+-------------------+-----+-------+
|IATA|               City|State|Country|
+----+-------------------+-----+-------+
| INL|International Falls|   MN|    USA|
| MSY|        New Orleans|   LA|    USA|
| GEG|            Spokane|   WA|    USA|
| BUR|            Burbank|   CA|    USA|
| SNA|      Orange County|   CA|    USA|
| GRB|          Green Bay|   WI|    USA|
| GTF|        Great Falls|   MT|    USA|
| IDA|        Idaho Falls|   ID|    USA|
| GRR|       Grand Rapids|   MI|    USA|
| JLN|             Joplin|   MO|    USA|
| EUG|             Eugene|   OR|    USA|
| GSO|         Greensboro|   NC|    USA|
| MYR|       Myrtle Beach|   SC|    USA|
| PVD|         Providence|   RI|    USA|
| OAK|            Oakland|   CA|    USA|
| BTM|              Butte|   MT|    USA|
| COD|               Cody|   WY|    USA|
| FAR|              Fargo|   ND|    USA|
| FSM|         Fort Smith|   AR|    USA|
| MQT|          Marquette|   MI|    USA|
+----+-------------------+-----+-------+
only showing top

In [23]:
departureDelays.count()

1391578

In [24]:
# Build `departureDelays_geo` DataFrame
#  Obtain key attributes such as Date of flight, delays, distance, and airport information (Origin, Destination)  
departureDelays_geo = spark.sql("select cast(f.date as int) as tripid, cast(concat(concat(concat(concat(concat(concat('2014-', concat(concat(substr(cast(f.date as string), 1, 2), '-')), substr(cast(f.date as string), 3, 2)), ' '), substr(cast(f.date as string), 5, 2)), ':'), substr(cast(f.date as string), 7, 2)), ':00') as timestamp) as `localdate`, cast(f.delay as int), cast(f.distance as int), f.origin as src, f.destination as dst, o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst from departuredelays f join airports o on o.iata = f.origin join airports d on d.iata = f.destination") 

# Create Temporary View and cache
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
departureDelays_geo.cache()

# Count
departureDelays_geo.count()

1361141

In [25]:
# Review the top 10 rows of the `departureDelays_geo` DataFrame
departureDelays_geo.show(10)

+-------+--------------------+-----+--------+---+---+---------+--------+---------+---------+
| tripid|           localdate|delay|distance|src|dst| city_src|city_dst|state_src|state_dst|
+-------+--------------------+-----+--------+---+---+---------+--------+---------+---------+
|1011245|2014-01-01 12:45:...|    6|     602|ABE|ATL|Allentown| Atlanta|       PA|       GA|
|1020600|2014-01-02 06:00:...|   -8|     369|ABE|DTW|Allentown| Detroit|       PA|       MI|
|1021245|2014-01-02 12:45:...|   -2|     602|ABE|ATL|Allentown| Atlanta|       PA|       GA|
|1020605|2014-01-02 06:05:...|   -4|     602|ABE|ATL|Allentown| Atlanta|       PA|       GA|
|1031245|2014-01-03 12:45:...|   -4|     602|ABE|ATL|Allentown| Atlanta|       PA|       GA|
|1030605|2014-01-03 06:05:...|    0|     602|ABE|ATL|Allentown| Atlanta|       PA|       GA|
|1041243|2014-01-04 12:43:...|   10|     602|ABE|ATL|Allentown| Atlanta|       PA|       GA|
|1040605|2014-01-04 06:05:...|   28|     602|ABE|ATL|Allentown| Atlant

In [26]:
# Note, ensure you have already installed the GraphFrames spack-package
from pyspark.sql.functions import *
from graphframes import *

# Create Vertices (airports) and Edges (flights)
tripVertices = airports.withColumnRenamed("IATA", "id").distinct()
tripEdges = departureDelays_geo.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")

# Cache Vertices and Edges
tripEdges.cache()
tripVertices.cache()

DataFrame[id: string, City: string, State: string, Country: string]

In [30]:
tripEdges.show()

+-------+-----+---+---+--------+---------+
| tripid|delay|src|dst|city_dst|state_dst|
+-------+-----+---+---+--------+---------+
|1011245|    6|ABE|ATL| Atlanta|       GA|
|1020600|   -8|ABE|DTW| Detroit|       MI|
|1021245|   -2|ABE|ATL| Atlanta|       GA|
|1020605|   -4|ABE|ATL| Atlanta|       GA|
|1031245|   -4|ABE|ATL| Atlanta|       GA|
|1030605|    0|ABE|ATL| Atlanta|       GA|
|1041243|   10|ABE|ATL| Atlanta|       GA|
|1040605|   28|ABE|ATL| Atlanta|       GA|
|1051245|   88|ABE|ATL| Atlanta|       GA|
|1050605|    9|ABE|ATL| Atlanta|       GA|
|1061215|   -6|ABE|ATL| Atlanta|       GA|
|1061725|   69|ABE|ATL| Atlanta|       GA|
|1061230|    0|ABE|DTW| Detroit|       MI|
|1060625|   -3|ABE|ATL| Atlanta|       GA|
|1070600|    0|ABE|DTW| Detroit|       MI|
|1071725|    0|ABE|ATL| Atlanta|       GA|
|1071230|    0|ABE|DTW| Detroit|       MI|
|1070625|    0|ABE|ATL| Atlanta|       GA|
|1071219|    0|ABE|ORD| Chicago|       IL|
|1080600|    0|ABE|DTW| Detroit|       MI|
+-------+--

In [31]:
tripVertices.show()

+---+----------------+-----+-------+
| id|            City|State|Country|
+---+----------------+-----+-------+
|FAT|          Fresno|   CA|    USA|
|CMH|        Columbus|   OH|    USA|
|PHX|         Phoenix|   AZ|    USA|
|PAH|         Paducah|   KY|    USA|
|COS|Colorado Springs|   CO|    USA|
|MYR|    Myrtle Beach|   SC|    USA|
|RNO|            Reno|   NV|    USA|
|SRQ|        Sarasota|   FL|    USA|
|VLD|        Valdosta|   GA|    USA|
|PSC|           Pasco|   WA|    USA|
|BPT|        Beaumont|   TX|    USA|
|CAE|        Columbia|   SC|    USA|
|LAX|     Los Angeles|   CA|    USA|
|DAY|          Dayton|   OH|    USA|
|AVP|    Wilkes-Barre|   PA|    USA|
|MFR|         Medford|   OR|    USA|
|JFK|        New York|   NY|    USA|
|LAS|       Las Vegas|   NV|    USA|
|BNA|       Nashville|   TN|    USA|
|CLT|       Charlotte|   NC|    USA|
+---+----------------+-----+-------+
only showing top 20 rows



In [32]:
# Build `tripGraph` GraphFrame
#  This GraphFrame builds up on the vertices and edges based on our trips (flights)
tripGraph = GraphFrame(tripVertices, tripEdges)
print (tripGraph)

# Build `tripGraphPrime` GraphFrame
#   This graphframe contains a smaller subset of data to make it easier to display motifs and subgraphs (below)
tripEdgesPrime = departureDelays_geo.select("tripid", "delay", "src", "dst")
tripGraphPrime = GraphFrame(tripVertices, tripEdgesPrime)

GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])


In [33]:
print ("Airports: %d" % tripGraph.vertices.count())
print ("Trips: %d" % tripGraph.edges.count())

Airports: 279
Trips: 1361141


In [34]:
tripGraph.edges.groupBy().max("delay").show()

+----------+
|max(delay)|
+----------+
|      1642|
+----------+



In [35]:
longestDelay = tripGraph.edges.groupBy().max("delay")
longestDelay.show()

+----------+
|max(delay)|
+----------+
|      1642|
+----------+



In [36]:
# Determining number of on-time / early flights vs. delayed flights
print ("On-time / Early Flights: %d" % tripGraph.edges.filter("delay <= 0").count())
print ("Delayed Flights: %d" % tripGraph.edges.filter("delay > 0").count())

On-time / Early Flights: 780469
Delayed Flights: 580672


In [37]:
tripGraph.edges\
  .filter("src = 'SEA' and delay > 0")\
  .groupBy("src", "dst")\
  .avg("delay")\
  .sort(desc("avg(delay)"))\
  .show(5)

+---+---+------------------+
|src|dst|        avg(delay)|
+---+---+------------------+
|SEA|PHL|55.666666666666664|
|SEA|COS| 43.53846153846154|
|SEA|FAT| 43.03846153846154|
|SEA|LGB| 39.39705882352941|
|SEA|IAD|37.733333333333334|
+---+---+------------------+
only showing top 5 rows



In [38]:
tripGraph.edges.filter("src = 'SEA' and delay > 0").groupBy("src", "dst").avg("delay").sort(desc("avg(delay)")).show()

+---+---+------------------+
|src|dst|        avg(delay)|
+---+---+------------------+
|SEA|PHL|55.666666666666664|
|SEA|COS| 43.53846153846154|
|SEA|FAT| 43.03846153846154|
|SEA|LGB| 39.39705882352941|
|SEA|IAD|37.733333333333334|
|SEA|MIA|37.325581395348834|
|SEA|SFO| 36.50210378681627|
|SEA|SBA| 36.48275862068966|
|SEA|JFK|          35.03125|
|SEA|ORD| 33.60335195530726|
|SEA|PDX| 32.74285714285714|
|SEA|BOS| 30.46031746031746|
|SEA|LAS|28.933333333333334|
|SEA|DEN|28.881294964028775|
|SEA|IAH|27.844444444444445|
|SEA|JAC|27.666666666666668|
|SEA|OGG|27.473684210526315|
|SEA|JNU|27.196969696969695|
|SEA|HNL|26.702290076335878|
|SEA|OAK|26.539473684210527|
+---+---+------------------+
only showing top 20 rows



In [40]:
# After displaying tripDelays, use Plot Options to set `state_dst` as a Key.
tripDelays = tripGraph.edges.filter("delay > 0")
tripDelays.show()

+-------+-----+---+---+--------+---------+
| tripid|delay|src|dst|city_dst|state_dst|
+-------+-----+---+---+--------+---------+
|1011245|    6|ABE|ATL| Atlanta|       GA|
|1041243|   10|ABE|ATL| Atlanta|       GA|
|1040605|   28|ABE|ATL| Atlanta|       GA|
|1051245|   88|ABE|ATL| Atlanta|       GA|
|1050605|    9|ABE|ATL| Atlanta|       GA|
|1061725|   69|ABE|ATL| Atlanta|       GA|
|1081230|   33|ABE|DTW| Detroit|       MI|
|1080625|    1|ABE|ATL| Atlanta|       GA|
|1080607|    5|ABE|ORD| Chicago|       IL|
|1081219|   54|ABE|ORD| Chicago|       IL|
|1091215|   43|ABE|ATL| Atlanta|       GA|
|1090600|  151|ABE|DTW| Detroit|       MI|
|1090625|    8|ABE|ATL| Atlanta|       GA|
|1091219|   83|ABE|ORD| Chicago|       IL|
|1101725|    7|ABE|ATL| Atlanta|       GA|
|1100625|   52|ABE|ATL| Atlanta|       GA|
|1111215|  127|ABE|ATL| Atlanta|       GA|
|1131215|   14|ABE|ATL| Atlanta|       GA|
|1130625|   29|ABE|ATL| Atlanta|       GA|
|1161219|   68|ABE|ORD| Chicago|       IL|
+-------+--

In [41]:
tripGraph.edges.filter("src = 'SEA' and delay > 100")

DataFrame[tripid: int, delay: int, src: string, dst: string, city_dst: string, state_dst: string]

## Vertex Degrees
* `inDegrees`: Incoming connections to the airport
* `outDegrees`: Outgoing connections from the airport 
* `degrees`: Total connections to and from the airport

Reviewing the various properties of the property graph to understand the incoming and outgoing connections between airports.

In [43]:
tripGraph.degrees.sort(desc("degree")).limit(20)
tripGraph.degrees.sort(desc("degree")).limit(20).show()

+---+------+
| id|degree|
+---+------+
|ATL|179774|
|DFW|133966|
|ORD|125405|
|LAX|106853|
|DEN|103699|
|IAH| 85685|
|PHX| 79672|
|SFO| 77635|
|LAS| 66101|
|CLT| 56103|
|EWR| 54407|
|MCO| 54300|
|LGA| 50927|
|SLC| 50780|
|BOS| 49936|
|DTW| 46705|
|MSP| 46235|
|SEA| 45816|
|JFK| 43661|
|BWI| 42526|
+---+------+



In [44]:
tripGraph.inDegrees.sort(desc("inDegree")).limit(20).show()

+---+--------+
| id|inDegree|
+---+--------+
|ATL|   89633|
|DFW|   65767|
|ORD|   61654|
|LAX|   53184|
|DEN|   50738|
|IAH|   42512|
|PHX|   39619|
|SFO|   38641|
|LAS|   32994|
|CLT|   28044|
|EWR|   27201|
|MCO|   27071|
|LGA|   25469|
|SLC|   25169|
|BOS|   24973|
|DTW|   23297|
|SEA|   22906|
|MSP|   22372|
|JFK|   21832|
|BWI|   21262|
+---+--------+



In [45]:
tripGraph.outDegrees.sort(desc("outDegree")).limit(20).show()

+---+---------+
| id|outDegree|
+---+---------+
|ATL|    90141|
|DFW|    68199|
|ORD|    63751|
|LAX|    53669|
|DEN|    52961|
|IAH|    43173|
|PHX|    40053|
|SFO|    38994|
|LAS|    33107|
|CLT|    28059|
|MCO|    27229|
|EWR|    27206|
|SLC|    25611|
|LGA|    25458|
|BOS|    24963|
|MSP|    23863|
|DTW|    23408|
|SEA|    22910|
|JFK|    21829|
|BWI|    21264|
+---+---------+



## City / Flight Relationships through Motif Finding
To more easily understand the complex relationship of city airports and their flights with each other, we can use motifs to find patterns of airports (i.e. vertices) connected by flights (i.e. edges). The result is a DataFrame in which the column names are given by the motif keys.

In [47]:
# Calculate the inDeg (flights into the airport) and outDeg (flights leaving the airport)
inDeg = tripGraph.inDegrees
outDeg = tripGraph.outDegrees

# Calculate the degreeRatio (inDeg/outDeg)
degreeRatio = inDeg.join(outDeg, inDeg.id == outDeg.id) \
  .drop(outDeg.id) \
  .selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio") \
  .cache()

# Join back to the `airports` DataFrame (instead of registering temp table as above)
nonTransferAirports = degreeRatio.join(airports, degreeRatio.id == airports.IATA) \
  .selectExpr("id", "city", "degreeRatio") \
  .filter("degreeRatio < .9 or degreeRatio > 1.1")

# List out the city airports which have abnormal degree ratios.
nonTransferAirports.show()

+---+-----------+-------------------+
| id|       city|        degreeRatio|
+---+-----------+-------------------+
|GFK|Grand Forks| 1.3333333333333333|
|FAI|  Fairbanks| 1.1232686980609419|
|OME|       Nome| 0.5084745762711864|
|BRW|     Barrow|0.28651685393258425|
+---+-----------+-------------------+



In [49]:
# Join back to the `airports` DataFrame (instead of registering temp table as above)
transferAirports = degreeRatio.join(airports, degreeRatio.id == airports.IATA) \
  .selectExpr("id", "city", "degreeRatio") \
  .filter("degreeRatio between 0.9 and 1.1")
  
# List out the top 10 transfer city airports
transferAirports.orderBy("degreeRatio").limit(10).show()

+---+--------------+------------------+
| id|          city|       degreeRatio|
+---+--------------+------------------+
|MSP|   Minneapolis|0.9375183338222353|
|DEN|        Denver| 0.958025717037065|
|DFW|        Dallas| 0.964339653074092|
|ORD|       Chicago|0.9671063983310065|
|SLC|Salt Lake City|0.9827417906368358|
|IAH|       Houston|0.9846895050147083|
|PHX|       Phoenix|0.9891643572266746|
|OGG| Kahului, Maui|0.9898718478710211|
|HNL|Honolulu, Oahu| 0.990535889872173|
|SFO| San Francisco|0.9909473252295224|
+---+--------------+------------------+



In [50]:
# Example 1: Direct Seattle to San Francisco 
filteredPaths = tripGraph.bfs(
  fromExpr = "id = 'SEA'",
  toExpr = "id = 'SFO'",
  maxPathLength = 1)
filteredPaths.show()

+--------------------+--------------------+--------------------+
|                from|                  e0|                  to|
+--------------------+--------------------+--------------------+
|[SEA,Seattle,WA,USA]|[1010710,31,SEA,S...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1012125,-4,SEA,S...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1011840,-5,SEA,S...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1010610,-4,SEA,S...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1011230,-2,SEA,S...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1010955,-6,SEA,S...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1011100,2,SEA,SF...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1011405,0,SEA,SF...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1020710,-1,SEA,S...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1022125,-4,SEA,S...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1021840,-5,SEA,S...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[1020610,3,SEA,SF...|[SFO,San Francisc...|
|[SEA,Seattle,WA,USA]|[10

In [51]:
# Example 2: Direct San Francisco and Buffalo
filteredPaths = tripGraph.bfs(
  fromExpr = "id = 'SFO'",
  toExpr = "id = 'BUF'",
  maxPathLength = 1)
filteredPaths.show()

+---+----+-----+-------+
| id|City|State|Country|
+---+----+-----+-------+
+---+----+-----+-------+



In [52]:
# Example 2a: Flying from San Francisco to Buffalo
filteredPaths = tripGraph.bfs(
  fromExpr = "id = 'SFO'",
  toExpr = "id = 'BUF'",
  maxPathLength = 2)
filteredPaths.show()

+--------------------+--------------------+-------------------+--------------------+--------------------+
|                from|                  e0|                 v1|                  e1|                  to|
+--------------------+--------------------+-------------------+--------------------+--------------------+
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1010635,-6,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1011059,13,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1011427,19,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1020635,-4,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1021059,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1021427,194,BOS,...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BO

In [54]:
filteredPaths.groupBy("v1.id", "v1.City").count().orderBy(desc("count")).limit(10).show()

+---+---------------+-------+
| id|           City|  count|
+---+---------------+-------+
|JFK|       New York|1233728|
|ORD|        Chicago|1088283|
|ATL|        Atlanta| 285383|
|LAS|      Las Vegas| 275091|
|BOS|         Boston| 238576|
|CLT|      Charlotte| 143444|
|PHX|        Phoenix| 104580|
|FLL|Fort Lauderdale|  96317|
|EWR|         Newark|  95370|
|MCO|        Orlando|  88615|
+---+---------------+-------+



In [57]:
#Generate motifs
motifs = tripGraphPrime.find("(a)-[ab]->(b); (b)-[bc]->(c)").filter("(b.id = 'SFO') and (ab.delay > 500 or bc.delay > 500) and bc.tripid > ab.tripid and bc.tripid < ab.tripid + 10000")

In [59]:
motifs.show(1)

+--------------------+-------------------+--------------------+--------------------+--------------------+
|                   a|                 ab|                   b|                  bc|                   c|
+--------------------+-------------------+--------------------+--------------------+--------------------+
|[ABQ,Albuquerque,...|[1020600,0,ABQ,SFO]|[SFO,San Francisc...|[1021507,536,SFO,...|[JFK,New York,NY,...|
+--------------------+-------------------+--------------------+--------------------+--------------------+
only showing top 1 row



In [60]:
# Determining Airport ranking of importance using 'pageRank'
ranks = tripGraph.pageRank(resetProbability=0.15, maxIter=5)

In [61]:
ranks.vertices.orderBy(ranks.vertices.pagerank.desc()).limit(20).show()

+---+--------------+-----+-------+------------------+
| id|          City|State|Country|          pagerank|
+---+--------------+-----+-------+------------------+
|ATL|       Atlanta|   GA|    USA|10.102340247485012|
|DFW|        Dallas|   TX|    USA| 7.252067259651102|
|ORD|       Chicago|   IL|    USA| 7.165214941662068|
|DEN|        Denver|   CO|    USA| 5.041255573485869|
|LAX|   Los Angeles|   CA|    USA| 4.178333397888139|
|IAH|       Houston|   TX|    USA| 4.008169343175302|
|SFO| San Francisco|   CA|    USA| 3.518595203652925|
|SLC|Salt Lake City|   UT|    USA|3.3564822581626763|
|PHX|       Phoenix|   AZ|    USA|3.0896771274953343|
|LAS|     Las Vegas|   NV|    USA| 2.437744837094217|
|SEA|       Seattle|   WA|    USA| 2.372392233277875|
|DTW|       Detroit|   MI|    USA|2.1688712347162338|
|MSP|   Minneapolis|   MN|    USA|2.1574735230729862|
|MCO|       Orlando|   FL|    USA|  2.10982981314059|
|EWR|        Newark|   NJ|    USA|2.0700271952450677|
|CLT|     Charlotte|   NC|  