## Flight Data Analysis using Spark GraphFrames

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('GraphFlightsAnalysis').getOrCreate()


#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])

#print spark configuration settings
spark.sparkContext.getConf().getAll()

[('spark.app.id', 'local-1573842681666'),
 ('spark.driver.memory', '4g'),
 ('spark.executor.memory', '4g'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.port', '50706'),
 ('spark.executor.cores', '4'),
 ('spark.cores.max', '4'),
 ('spark.driver.host', 'DESKTOP-RV4I2EU'),
 ('spark.app.name', 'Spark Updated Conf'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

In [3]:
dataDir = "."
airports = spark.read.json(dataDir+"//airports1.json", multiLine=True)
airports.show(5)

+----------+-----------+-------------+----+---+--------+----+------------+-----------+--------------------+--------+-----------------+
|Airport ID|       City|      Country|DBTZ|DST|IATA/FAA|ICAO|    Latitude|  Longitude|                Name|Timezone|     destinations|
+----------+-----------+-------------+----+---+--------+----+------------+-----------+--------------------+--------+-----------------+
|      7252|  Postville|       Canada|   A| -4|     YSO|CCD4|   54.910278| -59.785278|   Postville Airport|     223|[5492, 188, 5502]|
|      6972|      Osubi|      Nigeria|   U|  1|     QRW|DNSU|        5.31|       5.45|       Warri Airport|      50|       [260, 273]|
|      4026|   Rockland|United States|   A| -5|     RKD|KRKD|  44.0601111|-69.0992303|Knox County Regio...|      56|           [3448]|
|      4027|Jacksn Hole|United States|   A| -7|     JAC|KJAC|43.607333333| -110.73775|Jackson Hole Airport|    6451|     [3536, 3751]|
|      4024|     Flores|    Guatemala|   U| -6|     FRS

In [4]:
airports = airports.select("City", "Country", "Name", "IATA/FAA")
airports.show(5)

+-----------+-------------+--------------------+--------+
|       City|      Country|                Name|IATA/FAA|
+-----------+-------------+--------------------+--------+
|  Postville|       Canada|   Postville Airport|     YSO|
|      Osubi|      Nigeria|       Warri Airport|     QRW|
|   Rockland|United States|Knox County Regio...|     RKD|
|Jacksn Hole|United States|Jackson Hole Airport|     JAC|
|     Flores|    Guatemala|Mundo Maya Intern...|     FRS|
+-----------+-------------+--------------------+--------+
only showing top 5 rows



In [5]:
flights = spark.read.csv(dataDir+"//flights.csv", header=True, inferSchema=True)
flights.show(5)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   2003|      1955|   2211|      2225|           WN|      335

In [6]:
flights = flights.select("FlightNum", "Origin", "Dest","Distance", "Year" ,"Month", "DepTime", "ArrTime", "DepDelay", "ActualElapsedTime", "AirTime")
flights.show(5)

+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+
|FlightNum|Origin|Dest|Distance|Year|Month|DepTime|ArrTime|DepDelay|ActualElapsedTime|AirTime|
+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+
|      335|   IAD| TPA|     810|2008|    1|   2003|   2211|       8|              128|    116|
|     3231|   IAD| TPA|     810|2008|    1|    754|   1002|      19|              128|    113|
|      448|   IND| BWI|     515|2008|    1|    628|    804|       8|               96|     76|
|     1746|   IND| BWI|     515|2008|    1|    926|   1054|      -4|               88|     78|
|     3920|   IND| BWI|     515|2008|    1|   1829|   1959|      34|               90|     77|
+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+
only showing top 5 rows



In [7]:
flights.printSchema()

root
 |-- FlightNum: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)



In [8]:
from pyspark.sql.types import IntegerType
flights = flights.withColumn("DepDelay", flights.DepDelay.cast(IntegerType()))

Lets create a graph with airports as vertices and flights as edges

**Vertex DataFrame:**

We set a special column named “id” which specifies unique IDs for each vertex in the graph.

**Edge DataFrame:** 

We set id columns: “src” (source vertex ID of edge) and “dst” (destination vertex ID of edge).

In [9]:
airports = airports.withColumn("id",airports["IATA/FAA"])

flights = flights.withColumn("src", flights["Origin"])
flights = flights.withColumn("dst", flights["Dest"])

In [10]:
from graphframes import *

In [11]:
#build Graph - GraphFrame(vertices, edges)
graph = GraphFrame(airports, flights)

In [12]:
graph.vertices.show(5)

+-----------+-------------+--------------------+--------+---+
|       City|      Country|                Name|IATA/FAA| id|
+-----------+-------------+--------------------+--------+---+
|  Postville|       Canada|   Postville Airport|     YSO|YSO|
|      Osubi|      Nigeria|       Warri Airport|     QRW|QRW|
|   Rockland|United States|Knox County Regio...|     RKD|RKD|
|Jacksn Hole|United States|Jackson Hole Airport|     JAC|JAC|
|     Flores|    Guatemala|Mundo Maya Intern...|     FRS|FRS|
+-----------+-------------+--------------------+--------+---+
only showing top 5 rows



In [13]:
graph.edges.show(5)

+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+---+---+
|FlightNum|Origin|Dest|Distance|Year|Month|DepTime|ArrTime|DepDelay|ActualElapsedTime|AirTime|src|dst|
+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+---+---+
|      335|   IAD| TPA|     810|2008|    1|   2003|   2211|       8|              128|    116|IAD|TPA|
|     3231|   IAD| TPA|     810|2008|    1|    754|   1002|      19|              128|    113|IAD|TPA|
|      448|   IND| BWI|     515|2008|    1|    628|    804|       8|               96|     76|IND|BWI|
|     1746|   IND| BWI|     515|2008|    1|    926|   1054|      -4|               88|     78|IND|BWI|
|     3920|   IND| BWI|     515|2008|    1|   1829|   1959|      34|               90|     77|IND|BWI|
+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+---+---+
only showing top 5 rows



In [14]:
#How many airports?
graph.vertices.count()

3266

In [15]:
#How many flights?
graph.edges.count()

100000

In [16]:
#Most connected airports
graph.degrees.orderBy("degree", ascending=False).show(10)

+---+------+
| id|degree|
+---+------+
|LAS| 13551|
|MDW| 12587|
|PHX| 11081|
|BWI|  9269|
|OAK|  7865|
|HOU|  7723|
|DAL|  7119|
|LAX|  6751|
|SAN|  6671|
|MCO|  6568|
+---+------+
only showing top 10 rows



In [17]:
#filter all airports in Chicago
graph.vertices.filter("City='Chicago'").show()

+-------+-------------+-------------------+--------+---+
|   City|      Country|               Name|IATA/FAA| id|
+-------+-------------+-------------------+--------+---+
|Chicago|United States| Chicago Ohare Intl|     ORD|ORD|
|Chicago|United States|Chicago Midway Intl|     MDW|MDW|
+-------+-------------+-------------------+--------+---+



In [18]:
#filter all flights from Chicago O'Hare
graph.edges.filter("Origin='ORD'").show()

+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+---+---+
|FlightNum|Origin|Dest|Distance|Year|Month|DepTime|ArrTime|DepDelay|ActualElapsedTime|AirTime|src|dst|
+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+---+---+
|     1226|   ORD| EWR|     719|2008|    1|   1711|   2031|      71|              140|    106|ORD|EWR|
+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+---+---+



In [19]:
#flights traveling distance beyond 2000 miles, drop duplicate origins and destinations
graph.edges.filter("Distance > 2000").dropDuplicates(['Origin','Dest']).orderBy('Distance',ascending=False).show()

+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+---+---+
|FlightNum|Origin|Dest|Distance|Year|Month|DepTime|ArrTime|DepDelay|ActualElapsedTime|AirTime|src|dst|
+---------+------+----+--------+----+-----+-------+-------+--------+-----------------+-------+---+---+
|     3423|   PVD| LAS|    2363|2008|    1|    747|   1019|      17|              332|    318|PVD|LAS|
|      416|   LAS| PVD|    2363|2008|    1|   1630|     23|      30|              293|    266|LAS|PVD|
|     2719|   MHT| LAS|    2356|2008|    1|    754|   1028|      69|              334|    320|MHT|LAS|
|      767|   LAS| MHT|    2356|2008|    1|   1703|     33|      18|              270|    257|LAS|MHT|
|     3655|   BDL| LAS|    2298|2008|    1|    706|    947|       6|              341|    325|BDL|LAS|
|     1018|   LAS| BDL|    2298|2008|    1|   1739|    114|      59|              275|    253|LAS|BDL|
|      257|   SAN| BWI|    2295|2008|    1|    714|   1451|      -1|     

In [20]:
#longest distance routes
graph.edges.groupBy("src", "dst").max("Distance").orderBy("max(Distance)", ascending =False).show()

+---+---+-------------+
|src|dst|max(Distance)|
+---+---+-------------+
|PVD|LAS|         2363|
|LAS|PVD|         2363|
|MHT|LAS|         2356|
|LAS|MHT|         2356|
|BDL|LAS|         2298|
|LAS|BDL|         2298|
|SAN|BWI|         2295|
|BWI|SAN|         2295|
|LAS|ISP|         2283|
|ISP|LAS|         2283|
|PHX|MHT|         2279|
|MHT|PHX|         2279|
|PHX|PVD|         2277|
|PVD|PHX|         2277|
|LAS|ALB|         2237|
|ALB|LAS|         2237|
|PHL|LAS|         2176|
|LAS|PHL|         2176|
|LAS|ORF|         2155|
|ORF|LAS|         2155|
+---+---+-------------+
only showing top 20 rows



In [21]:
# Most frequent flight routes 
flightRouteCount = graph.edges.groupBy("src", "dst").count().orderBy("count", ascending = False)

flightRouteCount.show(10)

+---+---+-----+
|src|dst|count|
+---+---+-----+
|HOU|DAL|  762|
|DAL|HOU|  752|
|OAK|LAX|  562|
|LAX|OAK|  554|
|LAS|PHX|  516|
|PHX|LAS|  512|
|SAN|OAK|  496|
|OAK|SAN|  491|
|OAK|BUR|  466|
|BUR|OAK|  449|
+---+---+-----+
only showing top 10 rows



In [22]:
#What is the average delay for delayed flights departing from Chicago Midway?
graph.edges.filter("src = 'MDW' and DepDelay > 1").groupBy("src", "dst").avg("DepDelay").orderBy("avg(DepDelay)", ascending=False).show(5)

+---+---+------------------+
|src|dst|     avg(DepDelay)|
+---+---+------------------+
|MDW|SFO| 78.71698113207547|
|MDW|AUS|55.391304347826086|
|MDW|BHM|47.851851851851855|
|MDW|BDL| 45.88095238095238|
|MDW|SAT|              44.5|
+---+---+------------------+
only showing top 5 rows



### Triangle Count 

Compute the number of triangles passing through each vertex.

In [23]:
#Computes the number of triangles passing through each vertex.
graph.triangleCount().orderBy("count", ascending=False).show()

+-----+--------------+-------------+--------------------+--------+---+
|count|          City|      Country|                Name|IATA/FAA| id|
+-----+--------------+-------------+--------------------+--------+---+
|  358|     Las Vegas|United States|      Mc Carran Intl|     LAS|LAS|
|  300|       Chicago|United States| Chicago Midway Intl|     MDW|MDW|
|  293|       Phoenix|United States|Phoenix Sky Harbo...|     PHX|PHX|
|  230|     Baltimore|United States|Baltimore Washing...|     BWI|BWI|
|  198|   Albuquerque|United States|Albuquerque Inter...|     ABQ|ABQ|
|  188|   Kansas City|United States|    Kansas City Intl|     MCI|MCI|
|  173|       Orlando|United States|        Orlando Intl|     MCO|MCO|
|  172|       Houston|United States|     William P Hobby|     HOU|HOU|
|  172|     Nashville|United States|      Nashville Intl|     BNA|BNA|
|  167|         Tampa|United States|          Tampa Intl|     TPA|TPA|
|  156|        Austin|United States|Austin Bergstrom ...|     AUS|AUS|
|  152

### PageRank

Measure the importance of each vertex in a graph.

In [24]:
#Run PageRank until convergence to tolerance "tol".
results = graph.pageRank(resetProbability=0.15, tol= 0.01)
results.vertices.orderBy('pagerank',ascending=False).show()

+------------+-------------+--------------------+--------+---+------------------+
|        City|      Country|                Name|IATA/FAA| id|          pagerank|
+------------+-------------+--------------------+--------+---+------------------+
|     Chicago|United States| Chicago Midway Intl|     MDW|MDW|23.553984004332065|
|   Las Vegas|United States|      Mc Carran Intl|     LAS|LAS|22.668728471777207|
|   Baltimore|United States|Baltimore Washing...|     BWI|BWI| 18.79579410407541|
|     Phoenix|United States|Phoenix Sky Harbo...|     PHX|PHX|18.441418076081263|
|     Houston|United States|     William P Hobby|     HOU|HOU| 14.73748376108117|
|      Dallas|United States|     Dallas Love Fld|     DAL|DAL|14.072635152069376|
|     Orlando|United States|        Orlando Intl|     MCO|MCO|13.078088528975796|
|     Oakland|United States|Metropolitan Oakl...|     OAK|OAK|12.782949985589404|
|   San Diego|United States|      San Diego Intl|     SAN|SAN|12.172420830031903|
| Los Angeles|Un