In [5]:
# Bootstrap PySpark for this kernel, then obtain the SparkSession used by every later cell.
import findspark
findspark.init()

import pyspark # only run after findspark.init() -- the import order here is deliberate
from pyspark.sql import SparkSession
# Reuse the session that is already active, or create one with default settings.
spark = SparkSession.builder.getOrCreate()

In [6]:
# Input locations (absolute paths on the machine this notebook was run on)
tripdelaysFilePath = "/mnt/asaha/Spark/learningPySpark-master/Data/departuredelays.csv"
airportsnaFilePath = "/mnt/asaha/Spark/learningPySpark-master/Data/airport-codes-na.txt"

# Airports: tab-separated file with a header row; column types are inferred
airportsna = (
    spark.read
    .options(header='true', inferSchema='true', sep='\t')
    .csv(airportsnaFilePath)
)
airportsna.createOrReplaceTempView("airports_na")

# Departure delays: CSV with a header row, read without schema inference
# (columns are cast explicitly where they are used later)
departureDelays = spark.read.options(header='true').csv(tripdelaysFilePath)
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()

# Every IATA code that appears as an origin or a destination in the delays data
tripIATA = spark.sql(
    "select distinct iata from ("
    "select distinct origin as iata from departureDelays"
    " union all "
    "select distinct destination as iata from departureDelays"
    ") a"
)
tripIATA.createOrReplaceTempView("tripIATA")

# Keep only the airports that have at least one trip in the delays data
airports = spark.sql(
    "select f.IATA, f.City, f.State, f.Country "
    "from airports_na f "
    "join tripIATA t on t.IATA = f.IATA"
)
airports.createOrReplaceTempView("airports")

# Last expression of the cell: marks `airports` for caching and echoes its schema
airports.cache()

DataFrame[IATA: string, City: string, State: string, Country: string]

In [9]:
# Row count of the raw departure-delays DataFrame, before it is joined to the airports view.
departureDelays.count()

1391578

In [7]:
# Build `departureDelays_geo` DataFrame
#  Obtain key attributes such as Date of flight, delays, distance, and airport information (Origin, Destination)
#
#  * `tripid`    - the raw `date` field cast to int.
#                  NOTE(review): several flights may share the same date/time value,
#                  so `tripid` is presumably not a unique key -- confirm before relying on it.
#  * `localdate` - timestamp assembled from the `date` field, read as MMDDHHmm
#                  (2 chars each), with the year fixed to 2014 and seconds to 00.
#  * `delay`, `distance` - cast from the CSV text to int.
#  * airport city/state are attached for both ends of the flight; the inner joins
#    drop flights whose origin or destination is missing from the `airports` view.
#
#  The timestamp text is built with one variadic `concat` call instead of a tower of
#  nested two-argument calls, and view/column names are spelled exactly as registered
#  so the query does not depend on case-insensitive name resolution.
departureDelays_geo = spark.sql("""
    select cast(f.date as int) as tripid,
           cast(concat('2014-',
                       substr(cast(f.date as string), 1, 2), '-',
                       substr(cast(f.date as string), 3, 2), ' ',
                       substr(cast(f.date as string), 5, 2), ':',
                       substr(cast(f.date as string), 7, 2), ':00') as timestamp) as `localdate`,
           cast(f.delay as int) as delay,
           cast(f.distance as int) as distance,
           f.origin as src,
           f.destination as dst,
           o.City as city_src,
           d.City as city_dst,
           o.State as state_src,
           d.State as state_dst
      from departureDelays f
      join airports o on o.IATA = f.origin
      join airports d on d.IATA = f.destination
""")

# Create Temporary View and cache
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
departureDelays_geo.cache()

# Count (last expression of the cell, so the row count is displayed)
departureDelays_geo.count()

1361141

In [11]:
# Review the top 10 rows of the `departureDelays_geo` DataFrame
# (`show` prints them as a plain-text table)
departureDelays_geo.show(10)

+-------+-------------------+-----+--------+---+---+-----------+-------------------+---------+---------+
| tripid|          localdate|delay|distance|src|dst|   city_src|           city_dst|state_src|state_dst|
+-------+-------------------+-----+--------+---+---+-----------+-------------------+---------+---------+
|1011111|2014-01-01 11:11:00|   -5|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1021111|2014-01-02 11:11:00|    7|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1031111|2014-01-03 11:11:00|    0|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1041925|2014-01-04 19:25:00|    0|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1061115|2014-01-06 11:15:00|   33|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1071115|2014-01-07 11:15:00|   23|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1081115|2014-01-08 11:15:00|   -9|     221|MSP|INL|Min

In [12]:
# Using `display` to view the data
# NOTE(review): in the output recorded for this cell, `display` printed only the
# DataFrame's schema repr, not any rows. Presumably it renders a table only on
# platforms that ship their own `display` (e.g. Databricks) -- confirm, or use
# `.show()` when running in plain Jupyter.
display(departureDelays_geo)

DataFrame[tripid: int, localdate: timestamp, delay: int, distance: int, src: string, dst: string, city_src: string, city_dst: string, state_src: string, state_dst: string]

## Building the Graph
Now that we've imported our data, we're going to need to build our graph. To do so we're going to do two things: we are going to build the structure of the vertices (or nodes) and we're going to build the structure of the edges. What's awesome about GraphFrames is that this process is incredibly simple. 
* Rename IATA airport code to **id** in the Vertices Table
* Rename the start and end airports to **src** and **dst** in the Edges Table (flights)

In [8]:
# Vertices (airports): the IATA code becomes the `id` column, duplicates removed
tripVertices = airports.withColumnRenamed("IATA", "id").distinct()

# Edges (flights): keep the trip attributes plus the `src` / `dst` endpoints
tripEdgeColumns = ["tripid", "delay", "src", "dst", "city_dst", "state_dst"]
tripEdges = departureDelays_geo.select(*tripEdgeColumns)

# Mark both DataFrames for caching; the final expression echoes the vertices schema
tripEdges.cache()
tripVertices.cache()

DataFrame[id: string, City: string, State: string, Country: string]

In [14]:
# Vertices
# The vertices of our graph are the airports
# NOTE(review): the recorded output for this cell is only the schema repr, not rows;
# use `tripVertices.show()` if a table is wanted in plain Jupyter.
display(tripVertices)

DataFrame[id: string, City: string, State: string, Country: string]

In [15]:
# Edges
# The edges of our graph are the flights between airports
# NOTE(review): the recorded output for this cell is only the schema repr, not rows;
# use `tripEdges.show()` if a table is wanted in plain Jupyter.
display(tripEdges)

DataFrame[tripid: int, delay: int, src: string, dst: string, city_dst: string, state_dst: string]

In [22]:
# Note: ensure you have already installed the GraphFrames Spark package

# Register the GraphFrames jar with the SparkContext.
# NOTE(review): presumably this is what makes the Python `graphframes` package
# (bundled inside the jar) importable on the next line -- confirm for this setup.
spark.sparkContext.addPyFile('/mnt/asaha/Spark/spark-2.3.2-bin-hadoop2.7/jars/graphframes_graphframes-0.6.0-spark2.3-s_2.11.jar')
# Import only the name this notebook uses rather than a wildcard, so it is clear
# where `GraphFrame` comes from and nothing else leaks into the namespace.
from graphframes import GraphFrame

# Build `tripGraph` GraphFrame
# This GraphFrame builds up on the vertices and edges based on our trips (flights)
tripGraph = GraphFrame(tripVertices, tripEdges)
print(tripGraph)

# Build `tripGraphPrime` GraphFrame
# This graphframe contains a smaller subset of data to make it easier to display motifs and subgraphs (below)
# (same vertices; the edges drop the `city_dst` / `state_dst` columns)
tripEdgesPrime = departureDelays_geo.select("tripid", "delay", "src", "dst")
tripGraphPrime = GraphFrame(tripVertices, tripEdgesPrime)
print(tripGraphPrime)

GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])
GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 2 more fields])


## Simple Queries
Let's start with a set of simple graph queries to understand flight performance and departure delays.

#### Determine the number of airports and trips

In [15]:
# Size of the graph: airports are the vertices, trips (flights) are the edges
airportCount = tripGraph.vertices.count()
tripCount = tripGraph.edges.count()
print("Airports: %d" % airportCount)
print("Trips: %d" % tripCount)

Airports: 279
Trips: 1361141


#### Determining the longest delay in this dataset

In [16]:
# Longest delay over all flights, printed as a one-row table
(
    tripGraph.edges
    .groupBy()        # no grouping keys: aggregate over every edge
    .max("delay")
    .show()
)

+----------+
|max(delay)|
+----------+
|      1642|
+----------+



In [18]:
# Finding the longest Delay
# Ungrouped maximum of the `delay` column over all edges, kept as a DataFrame
# with the single column `max(delay)`.
longestDelay = tripGraph.edges.groupBy().max("delay")
# NOTE(review): the recorded output for this cell is only the schema repr, not the value;
# use `longestDelay.show()` to see the number in plain Jupyter.
display(longestDelay)

DataFrame[max(delay): int]

#### Determining the number of delayed vs. on-time / early flights

In [19]:
# Split the flights by the sign of `delay`: <= 0 is on time or early, > 0 is delayed
flightEdges = tripGraph.edges
onTimeOrEarlyCount = flightEdges.filter("delay <= 0").count()
delayedCount = flightEdges.filter("delay > 0").count()
print("On-time / Early Flights: %d" % onTimeOrEarlyCount)
print("Delayed Flights: %d" % delayedCount)

On-time / Early Flights: 780469
Delayed Flights: 580672


#### Which flights departing SEA are most likely to have significant delays?
Note: a delay can be <= 0, meaning the flight left on time or early, so the query below averages only over flights with a delay > 0.

In [20]:
# `desc` is not imported by any other cell shown in this notebook, so import it here;
# otherwise this cell raises NameError on a fresh kernel (Restart & Run All).
from pyspark.sql.functions import desc

# Average delay per destination for delayed flights (delay > 0) leaving SEA,
# worst destinations first; print the top 5.
(
    tripGraph.edges
    .filter("src = 'SEA' and delay > 0")
    .groupBy("src", "dst")
    .avg("delay")
    .sort(desc("avg(delay)"))
    .show(5)
)

+---+---+------------------+
|src|dst|        avg(delay)|
+---+---+------------------+
|SEA|PHL|55.666666666666664|
|SEA|COS| 43.53846153846154|
|SEA|FAT| 43.03846153846154|
|SEA|LGB| 39.39705882352941|
|SEA|IAD|37.733333333333334|
+---+---+------------------+
only showing top 5 rows



In [21]:
# `desc` is not imported by any other cell shown in this notebook, so import it here;
# otherwise this cell raises NameError on a fresh kernel (Restart & Run All).
from pyspark.sql.functions import desc

# Same query as the `.show(5)` cell, but unbounded and passed to `display`:
# average delay per destination for delayed flights leaving SEA, worst first.
# NOTE(review): the recorded output for this cell is only the schema repr, not rows.
display(
    tripGraph.edges
    .filter("src = 'SEA' and delay > 0")
    .groupBy("src", "dst")
    .avg("delay")
    .sort(desc("avg(delay)"))
)

DataFrame[src: string, dst: string, avg(delay): double]