In [1]:
# Load data: the airport catalogue and the flight records.
# The first row of each CSV supplies the column names; column types are inferred.
airports = spark.read.csv("airports.csv", header=True, inferSchema=True)
trips_usa_nov2017 = (
    spark.read.csv("216420986_T_ONTIME.csv", header=True, inferSchema=True)
    # Keep roughly 0.1% of the rows, sampled without replacement with a fixed seed.
    .sample(withReplacement=False, fraction=0.001, seed=42)
)

In [2]:
# Only the airports located in the United States are needed.
airports = airports.where(airports.country == "United States")

# Inspect the resulting schema and a few sample rows.
airports.printSchema()
airports.show(5)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- IATA: string (nullable = true)
 |-- ICAO: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- altitude: integer (nullable = true)
 |-- timezone: string (nullable = true)
 |-- DST: string (nullable = true)
 |-- tz: string (nullable = true)
 |-- type: string (nullable = true)
 |-- source: string (nullable = true)

+----+--------------------+-------------+-------------+----+----+------------------+-------------------+--------+--------+---+-----------------+-------+-----------+
|  id|                name|         city|      country|IATA|ICAO|          latitude|          longitude|altitude|timezone|DST|               tz|   type|     source|
+----+--------------------+-------------+-------------+----+----+------------------+-------------------+--------+--------+---+---------------

In [3]:
# Print the schema as loaded; at this point it still contains the `_c11` column.
trips_usa_nov2017.printSchema()
# Discard `_c11` before further use.
# NOTE(review): presumably an artifact of a trailing delimiter in the CSV — confirm.
trips_usa_nov2017 = trips_usa_nov2017.drop("_c11")
# Preview a few rows of the cleaned-up frame.
trips_usa_nov2017.show(5)

root
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- ORIGIN_STATE_NM: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DEST_STATE_ABR: string (nullable = true)
 |-- DEST_STATE_NM: string (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- _c11: string (nullable = true)

+------+----------------+----------------+---------------+----+--------------+--------------+-------------+---------+---------+---------+
|ORIGIN|ORIGIN_CITY_NAME|ORIGIN_STATE_ABR|ORIGIN_STATE_NM|DEST|DEST_CITY_NAME|DEST_STATE_ABR|DEST_STATE_NM|DEP_DELAY|ARR_DELAY|CANCELLED|
+------+----------------+----------------+---------------+----+--------------+--------------+-------------+---------+---------+---------+
|   ABQ| Albuquerque, NM|              NM|     New Mexico| PHX|   P

In [3]:
# Explicit import instead of a wildcard, so it is clear where GraphFrame comes from.
from graphframes import GraphFrame
from pyspark.sql.functions import monotonically_increasing_id

# Vertices: one row per airport, keyed by its IATA code. The original numeric
# `id` column is dropped and IATA is renamed to `id`, the vertex key column
# GraphFrame looks for.
# NOTE(review): distinct() only removes fully duplicated rows; it does not
# guarantee unique or non-null ids — verify against airports.csv.
tripVertices = airports.drop("id").withColumnRenamed("IATA", "id").distinct()

# Edges: one row per sampled flight. ORIGIN/DEST are renamed to the `src`/`dst`
# columns GraphFrame looks for, and `tripid` adds an id for each trip
# (unique, but not necessarily consecutive).
tripEdges = (
    trips_usa_nov2017
    .select("DEP_DELAY", "ORIGIN", "DEST", "DEST_CITY_NAME", "DEST_STATE_NM")
    .withColumnRenamed("ORIGIN", "src")
    .withColumnRenamed("DEST", "dst")
    .withColumn("tripid", monotonically_increasing_id())
)

# Preview both inputs and report how many edges (trips) there are.
tripVertices.show(5)
tripEdges.show(5)
print(tripEdges.count())

# Build the graph.
tripGraph = GraphFrame(tripVertices, tripEdges)

+--------------------+-----------+-------------+---+----+------------------+------------------+--------+--------+---+-------------------+-------+-----------+
|                name|       city|      country| id|ICAO|          latitude|         longitude|altitude|timezone|DST|                 tz|   type|     source|
+--------------------+-----------+-------------+---+----+------------------+------------------+--------+--------+---+-------------------+-------+-----------+
|  Dillingham Airport| Dillingham|United States|DLG|PADL|       59.04470062|      -158.5050049|      81|      -9|  A|  America/Anchorage|airport|OurAirports|
|    Bowerman Airport|    Hoquiam|United States|HQM|KHQM|46.971199035599994|     -123.93699646|      18|      -8|  A|America/Los_Angeles|airport|OurAirports|
|Chippewa Valley R...| Eau Claire|United States|EAU|KEAU| 44.86579895019531|-91.48429870605469|     913|      -6|  A|    America/Chicago|airport|OurAirports|
|Santa Maria Pub/C...|Santa Maria|United States|SMX|

In [4]:
# Simple queries: number of vertices (airports) and edges (trips) in the graph.
print("Airports: {:d}".format(tripGraph.vertices.count()))
print("Trips: {:d}".format(tripGraph.edges.count()))

Airports: 1435
Trips: 459


In [5]:
# Which trips have a significant delay?
# Average DEP_DELAY per (src, dst) pair, largest average first.
(
    tripGraph.edges
    .groupBy("src", "dst")
    .agg({"DEP_DELAY": "avg"})
    .orderBy("avg(DEP_DELAY)", ascending=False)
    .show()
)

+---+---+--------------+
|src|dst|avg(DEP_DELAY)|
+---+---+--------------+
|IMT|DTW|        1144.0|
|SJC|DFW|         400.0|
|LAX|IAH|         346.0|
|FCA|SLC|         302.0|
|TUL|PHX|         271.0|
|LAX|IAD|         263.0|
|ESC|DTW|         247.0|
|SMF|SFO|         219.0|
|ORD|SFO|         133.5|
|SJC|LAS|         115.0|
|RSW|EWR|         102.0|
|SLC|LGB|          99.0|
|MIA|DFW|          90.0|
|SEA|ANC|          81.0|
|LGA|CLT|          76.0|
|MDW|PHX|          71.0|
|BWI|PVD|          67.0|
|SFO|LAX|          65.0|
|MCO|MIA|          58.5|
|PDX|SFO|          54.0|
+---+---+--------------+
only showing top 20 rows



In [6]:
# Airports in New York: count the vertices whose city is exactly "New York".
tripGraph.vertices.where("city = 'New York'").count()

4

In [7]:
# Airports ranked by degree, highest first.
tripGraph.degrees.orderBy("degree", ascending=False).show()

+---+------+
| id|degree|
+---+------+
|ATL|    66|
|ORD|    41|
|LAX|    36|
|DEN|    35|
|PHX|    29|
|MSP|    27|
|SFO|    27|
|LAS|    26|
|BWI|    26|
|DFW|    24|
|MCO|    21|
|CLT|    20|
|IAH|    19|
|SLC|    19|
|BOS|    19|
|FLL|    18|
|LGA|    18|
|EWR|    17|
|SEA|    17|
|MDW|    14|
+---+------+
only showing top 20 rows



In [8]:
# Airports ranked by in-degree, highest first.
tripGraph.inDegrees.orderBy("inDegree", ascending=False).show()
# Airports ranked by out-degree, highest first.
tripGraph.outDegrees.orderBy("outDegree", ascending=False).show()

+---+--------+
| id|inDegree|
+---+--------+
|ATL|      34|
|DEN|      25|
|LAX|      19|
|ORD|      16|
|SFO|      14|
|MSP|      14|
|BOS|      13|
|PHX|      13|
|BWI|      12|
|LAS|      11|
|MCO|      11|
|DFW|      10|
|DTW|      10|
|SEA|       9|
|PHL|       9|
|FLL|       9|
|EWR|       8|
|IAH|       8|
|MIA|       7|
|LGA|       7|
+---+--------+
only showing top 20 rows

+---+---------+
| id|outDegree|
+---+---------+
|ATL|       32|
|ORD|       25|
|LAX|       17|
|PHX|       16|
|LAS|       15|
|CLT|       15|
|BWI|       14|
|DFW|       14|
|MSP|       13|
|SFO|       13|
|SLC|       13|
|LGA|       11|
|IAH|       11|
|DEN|       10|
|MCO|       10|
|MDW|       10|
|EWR|        9|
|DAL|        9|
|FLL|        9|
|SEA|        8|
+---+---------+
only showing top 20 rows



In [9]:
# Run PageRank for a fixed number of iterations (2). The reset probability is
# spelled out at the GraphFrames default (0.15), so the result is unchanged.
results = tripGraph.pageRank(resetProbability=0.15, maxIter=2)

In [12]:
# Airports ranked by their PageRank score, highest first.
results.vertices.select("id", "pagerank").orderBy("pagerank", ascending=False).show()
# The `weight` column carried by each edge of the PageRank result.
results.edges.select("src", "dst", "weight").show()

+---+------------------+
| id|          pagerank|
+---+------------------+
|ATL|25.180545233641052|
|LAX|22.677174030246405|
|DEN|21.745270490884046|
|ORD|15.864421756758215|
|SFO|13.353888981362047|
|LAS| 12.49171007742848|
|PHX|12.356446930768762|
|EWR| 11.14766171733489|
|BWI|11.035565166241234|
|MSP|10.865477290048789|
|DFW| 10.53879138901022|
|MCO| 9.947138271774534|
|BOS| 9.583145309192634|
|PHL| 8.968718290593522|
|TPA|  8.93298720182507|
|BET| 8.482824062342004|
|IAH|7.7893737513139785|
|FLL| 7.270464088891417|
|IAD| 6.854605024280949|
|RDU|  6.81861194971853|
+---+------------------+
only showing top 20 rows

+---+---+--------------------+
|src|dst|              weight|
+---+---+--------------------+
|SNA|EWR|                 1.0|
|FCA|SLC|                 1.0|
|DFW|BOS| 0.07142857142857142|
|SFO|IAD| 0.07692307692307693|
|LAX|MCO|0.058823529411764705|
|MSP|IAH| 0.07692307692307693|
|MSP|BOS| 0.07692307692307693|
|MCO|ALB|                 0.1|
|CLT|PHX| 0.06666666666666667|
|P

In [13]:
# Motif finding
# Look for airport pairs that have direct flights between them in both directions.
motifs = tripGraph.find("(a)-[e1]->(b); (b)-[e2]->(a)")
motifs.show()

# Of those, show the ones whose a -> b flight departed more than an hour late
# (delay measured at the origin).
motifs.where("e1.DEP_DELAY > 60").show()

+--------------------+--------------------+--------------------+--------------------+
|                   a|                  e1|                   b|                  e2|
+--------------------+--------------------+--------------------+--------------------+
|[Phoenix Sky Harb...|[19.0, PHX, DEN, ...|[Denver Internati...|[2.0, DEN, PHX, P...|
|[Phoenix Sky Harb...|[-8.0, PHX, SAF, ...|[Santa Fe Municip...|[-6.0, SAF, PHX, ...|
|[Los Angeles Inte...|[346.0, LAX, IAH,...|[George Bush Inte...|[-5.0, IAH, LAX, ...|
|[Los Angeles Inte...|[19.0, LAX, SLC, ...|[Salt Lake City I...|[-4.0, SLC, LAX, ...|
|[Los Angeles Inte...|[-1.0, LAX, DEN, ...|[Denver Internati...|[-4.0, DEN, LAX, ...|
|[Los Angeles Inte...|[-1.0, LAX, MCO, ...|[Orlando Internat...|[-4.0, MCO, LAX, ...|
|[Los Angeles Inte...|[-5.0, LAX, PDX, ...|[Portland Interna...|[8.0, PDX, LAX, L...|
|[Los Angeles Inte...|[-5.0, LAX, PDX, ...|[Portland Interna...|[-2.0, PDX, LAX, ...|
|[Los Angeles Inte...|[3.0, LAX, SEA, S...|[Seattle Ta

In [10]:
# Shortest paths
# For every vertex, compute the distance to each of the landmark airports
# JFK and LAX.
# The result is kept under its own name rather than `results`: `results` holds
# the PageRank output used elsewhere in the notebook, and rebinding it here
# would break those cells if they are re-run after this one.
shortest_paths = tripGraph.shortestPaths(landmarks=["JFK", "LAX"])
shortest_paths.select("id", "distances").show()

+---+--------------------+
| id|           distances|
+---+--------------------+
|RAC|                  []|
|MLJ|                  []|
|OBU|                  []|
|PTB|                  []|
|LUP|                  []|
|HNH|                  []|
|IND|[LAX -> 2, JFK -> 2]|
|BTM|                  []|
|ORF|                  []|
|FDY|                  []|
|ATW|                  []|
|DLF|                  []|
|GRK|                  []|
|ALO|                  []|
|HTS|                  []|
|MRF|                  []|
|GFL|                  []|
|POU|                  []|
|DRG|                  []|
|MKT|                  []|
+---+--------------------+
only showing top 20 rows

