# Spark GraphX Analysis of US Flight Data

GraphX is a component of Spark for graph analytics that takes advantage of Spark's distributed computing framework. GraphX can be used to model and analyze relationships and processes in a wide variety of applications. GraphX introduces a Graph abstraction to Spark: a directed multigraph with properties attached to each vertex and edge. GraphX supplies a collection of basic operators that take user-defined functions and produce new graphs with transformed properties and structure. GraphX also includes graph algorithms to simplify graph analytics tasks.

In this notebook, we demonstrate how GraphX can be used to explore and analyze airline data. Airline data is a natural fit for graph analytics: airports are vertices and flights are edges. Data in CSV format is readily available on the US Bureau of Transportation Statistics (BTS) website (http://www.rita.dot.gov/bts/home). This demo employs US flight data for March 2016. Data required to run this notebook is hosted at https://ibm.box.com/shared/static/rfhew36plv01cixjweq1mrdrpib4fbcx.csv. The data is downloaded within the notebook code itself, so there is no need to manually download the dataset.
  
  

![GraphX](https://spark.apache.org/docs/latest/img/graphx_logo.png)

## Import required libraries

In [1]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.sql.Row;
import sys.process._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

## Download data as CSV file
#### Data from US Bureau of Transportation Statistics (BTS) for March 2016
#### Data required to run this notebook is hosted at https://ibm.box.com/shared/static/rfhew36plv01cixjweq1mrdrpib4fbcx.csv. Here we download the dataset to local storage.

In [2]:
"rm -f airport_Mar_2016.csv".!
"wget -q -O airport_Mar_2016.csv https://ibm.box.com/shared/static/rfhew36plv01cixjweq1mrdrpib4fbcx.csv".!

0

## Read the CSV data file and clean the data
#### The data is in CSV format and contains a header row.<br>Notice that the DataFrame contains a blank column because each row of the CSV file ends with a trailing comma.<br>Drop the blank column and any rows that contain nulls.

In [3]:
val airport_df_raw = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load("airport_Mar_2016.csv")
val airport_df = airport_df_raw.drop("").na.drop
airport_df_raw.show(10)

+-----------------+------+--------------------+---------------+----+--------------+-------------------+--------+---+
|ORIGIN_AIRPORT_ID|ORIGIN|    ORIGIN_CITY_NAME|DEST_AIRPORT_ID|DEST|DEST_CITY_NAME|ACTUAL_ELAPSED_TIME|DISTANCE|   |
+-----------------+------+--------------------+---------------+----+--------------+-------------------+--------+---+
|            10135|   ABE|Allentown/Bethleh...|          10397| ATL|   Atlanta, GA|              120.0|   692.0|   |
|            10135|   ABE|Allentown/Bethleh...|          10397| ATL|   Atlanta, GA|              128.0|   692.0|   |
|            10135|   ABE|Allentown/Bethleh...|          10397| ATL|   Atlanta, GA|              131.0|   692.0|   |
|            10135|   ABE|Allentown/Bethleh...|          10397| ATL|   Atlanta, GA|              127.0|   692.0|   |
|            10135|   ABE|Allentown/Bethleh...|          10397| ATL|   Atlanta, GA|              164.0|   692.0|   |
|            10135|   ABE|Allentown/Bethleh...|          10397| 
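
To see why a trailing comma produces an extra column, here is a minimal plain-Scala sketch (no Spark involved). The object name `TrailingCommaSketch` and the shortened sample row are assumptions for illustration only. A real CSV parser also handles quoted fields such as "Atlanta, GA", which this naive split does not.

```scala
object TrailingCommaSketch {
  // Split one line on commas. The limit of -1 tells String.split to keep
  // trailing empty strings, which is how a trailing comma becomes a blank field.
  def fields(line: String): Array[String] = line.split(",", -1)
}
```

For a row such as `"ABE,ATL,692.0,"`, `TrailingCommaSketch.fields` returns four fields, the last of which is the empty string. That empty field is the blank column the notebook drops.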

## Convert the DataFrame to an RDD
#### To accomplish this we will use a custom case class rather than the generic Row object.
GraphX is based on RDDs, which is why we must decompose the DataFrame into an RDD.


In [4]:
case class flight(origin_id: Int, 
                  origin: String, 
                  origin_city: String, 
                  dest_id: Int, 
                  dest: String, 
                  dest_city: String, 
                  elapsed_time: Double, 
                  distance: Double)
val flights = airport_df.rdd.map {case Row(origin_id: Int,
                                           origin: String,
                                           origin_city: String,
                                           dest_id: Int,
                                           dest: String,
                                           dest_city: String,
                                           elapsed_time: Double,
                                           distance: Double) => flight(origin_id = origin_id,
                                                                       origin = origin,
                                                                       origin_city = origin_city,
                                                                       dest_id = dest_id,
                                                                       dest = dest,
                                                                       dest_city = dest_city,
                                                                       elapsed_time = elapsed_time,
                                                                       distance = distance)}
flights.take(5).foreach(println)

flight(10135,ABE,Allentown/Bethlehem/Easton, PA,10397,ATL,Atlanta, GA,120.0,692.0)
flight(10135,ABE,Allentown/Bethlehem/Easton, PA,10397,ATL,Atlanta, GA,128.0,692.0)
flight(10135,ABE,Allentown/Bethlehem/Easton, PA,10397,ATL,Atlanta, GA,131.0,692.0)
flight(10135,ABE,Allentown/Bethlehem/Easton, PA,10397,ATL,Atlanta, GA,127.0,692.0)
flight(10135,ABE,Allentown/Bethlehem/Easton, PA,10397,ATL,Atlanta, GA,164.0,692.0)


## Extract airport ID and airport codes, which will be used to create the graph vertices
#### Each vertex is keyed by a unique 64-bit long identifier (VertexId), so the airport identifier must be converted to Long. The airport code is used as the vertex property. Each VertexId can occur only once in the VertexRDD, which is why we call distinct().

In [5]:
val airportVertices = flights.map(x => (x.origin_id.toLong, x.origin)).distinct()
airportVertices.take(5).foreach(println)
println("The number of airports used in the analysis = " + airportVertices.count())

(11013,CIU)
(14057,PDX)
(12177,HOB)
(14262,PSP)
(10529,BDL)
The number of airports used in the analysis = 296
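
The cell above builds vertices from origin airports only. As a hedged illustration, the following plain-Scala sketch (no Spark) collects airports from both ends of each flight, which matters only if some airport appears solely as a destination. When `Graph(vertices, edges)` is given an edge whose endpoint is missing from the vertex set, it fills in that vertex with the default attribute (`null` unless one is supplied). The names `VertexSketch` and `Leg` are hypothetical stand-ins, not part of the notebook.

```scala
object VertexSketch {
  // Simplified stand-in for the notebook's flight record (hypothetical).
  final case class Leg(originId: Int, origin: String, destId: Int, dest: String)

  // Collect (id, code) pairs from both ends of every leg,
  // keep one entry per airport, and sort by id for a stable order.
  def vertices(legs: Seq[Leg]): Seq[(Long, String)] =
    legs
      .flatMap(l => Seq((l.originId.toLong, l.origin), (l.destId.toLong, l.dest)))
      .distinct
      .sortBy(_._1)
}
```

In the notebook itself, the equivalent idea would be to union the origin pairs with the destination pairs before calling `distinct()`.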


## Extract origin airport id, destination airport id and distance between airports for use as graph edges
#### Edges have a srcId and a dstId corresponding to the source and destination vertex identifiers. In addition, the Edge class has an attr member which stores the edge property. srcId and dstId must be Long, so they are converted during the mapping.

In [8]:
val tripEdges = flights.map(x => Edge(x.origin_id.toLong, x.dest_id.toLong, x.distance))
tripEdges.takeSample(false, 5).foreach(println)
println("The number of trips used in the analysis = " + tripEdges.count())

Edge(13830,14771,2338.0)
Edge(14635,10397,515.0)
Edge(14100,13930,678.0)
Edge(13204,11278,759.0)
Edge(11618,12339,645.0)


## Create the graph
#### The Graph class represents a graph with arbitrary objects associated with vertices and edges. The graph provides basic operations to access and manipulate the data associated with vertices and edges as well as the underlying structure.

In [9]:
val graph = Graph(airportVertices, tripEdges)

## Show the vertices of the graph
#### We can deconstruct a graph into the respective vertex and edge views by using the graph.vertices and graph.edges members respectively.

In [12]:
graph.vertices.take(5).foreach(println)
println("Number of airports = " + graph.vertices.count())
println("Number of airports = " + graph.numVertices)

(10208,AGS)
(12264,IAD)
(15024,STT)
(12278,ICT)
(11292,DEN)
Number of airports = 296
Number of airports = 296


## Show the edges of the graph, which represent flights
#### using the edges member of the Graph class

In [13]:
graph.edges.take(5).foreach(println)
println("Number of trips = " + graph.edges.count())
println("Number of trips = " + graph.numEdges)

Edge(10135,10397,692.0)
Edge(10135,10397,692.0)
Edge(10135,10397,692.0)
Edge(10135,10397,692.0)
Edge(10135,10397,692.0)
Number of trips = 473286
Number of trips = 473286


## Determine the number of flights greater than 4000 miles
#### using the Edge class attr member, which stores the edge property

In [14]:
val distanceInput = 4000
val number_trips_over_option2 = graph.edges.filter (e => e.attr > distanceInput).count
println("Number of trips over " + distanceInput + " miles = " + number_trips_over_option2)

Number of trips over 4000 miles = 287


## In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view
<img src="https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/GraphX/triplet.png" height="70%" width="70%"></img>
#### The triplet view logically joins the vertex and edge properties, yielding an RDD of EdgeTriplet objects. Each EdgeTriplet carries the edge attribute (attr) together with the source and destination vertex properties (srcAttr and dstAttr).

In [15]:
graph.triplets.takeSample(false, 5).foreach(println)

((10721,BOS),(15304,TPA),1185.0)
((15024,STT),(12478,JFK),1623.0)
((13930,ORD),(14057,PDX),1739.0)
((12892,LAX),(13204,MCO),2218.0)
((14771,SFO),(14057,PDX),550.0)
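
As a rough mental model of the triplet view, the following plain-Scala sketch (no Spark) looks up the source and destination properties for each edge and emits them alongside the edge attribute. It is only an illustration of the logical join, not how GraphX implements it; `TripletSketch` and `E` are hypothetical names.

```scala
object TripletSketch {
  // Minimal stand-in for a GraphX edge (hypothetical).
  final case class E(srcId: Long, dstId: Long, attr: Double)

  // For every edge, attach the property of its source and destination vertex.
  // Assumes every endpoint id is present in the vertex map.
  def triplets(vertices: Map[Long, String],
               edges: Seq[E]): Seq[((Long, String), (Long, String), Double)] =
    edges.map { e =>
      ((e.srcId, vertices(e.srcId)), (e.dstId, vertices(e.dstId)), e.attr)
    }
}
```

Each result has the same shape as the printed triplets above: a source (id, code) pair, a destination (id, code) pair, and the distance.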


## Show graph triplets for flights over 4000 miles
#### Not surprisingly, all the resulting trips represent flights to or from Hawaii.

In [16]:
val distanceInput = 4000
graph.triplets.filter(x => x.attr > distanceInput).distinct().sortBy(x => x.attr, false).take(10).foreach(println)

((12173,HNL),(12478,JFK),4983.0)
((12478,JFK),(12173,HNL),4983.0)
((12173,HNL),(11618,EWR),4962.0)
((11618,EWR),(12173,HNL),4962.0)
((12173,HNL),(12264,IAD),4817.0)
((12264,IAD),(12173,HNL),4817.0)
((10397,ATL),(12173,HNL),4502.0)
((12173,HNL),(10397,ATL),4502.0)
((12173,HNL),(13930,ORD),4243.0)
((13930,ORD),(12173,HNL),4243.0)


## Show graph triplets originating from Boston
#### Sort the result in descending order of distance.

In [17]:
graph.triplets.filter(x => x.srcAttr == "BOS").distinct().sortBy(x => x.attr, false).take(5).foreach(println)

((10721,BOS),(14771,SFO),2704.0)
((10721,BOS),(14831,SJC),2689.0)
((10721,BOS),(12892,LAX),2611.0)
((10721,BOS),(12954,LGB),2602.0)
((10721,BOS),(14679,SAN),2588.0)


#### Determine the number of flights from Boston to San Francisco in March 2016 (the dataset we are using contains only March 2016 data)

In [18]:
println(" The number of flights from Boston To San Francisco in March 2016 = " + 
        graph.triplets.filter(x => x.srcAttr == "BOS").filter(x => x.dstAttr == "SFO").count())

 The number of flights from Boston To San Francisco in March 2016 = 374


## A common aggregation task is computing the degree of each vertex
#### The degree of a vertex is the number of edges adjacent to it.
#### In the context of directed graphs it is often useful to know the in-degree, out-degree, and the total degree of each vertex.
## Define a reduce operation and compute the highest degree vertex

In [19]:
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}

val maxInDegree  = graph.inDegrees.reduce(max)
val maxOutDegree = graph.outDegrees.reduce(max)
val maxDegrees = graph.degrees.reduce(max)

println("Max in-degree = " + maxInDegree)
println("Max out-degree = " + maxOutDegree)
println("Max total degrees = " + maxDegrees)

Max in-degree = (10397,32682)
Max out-degree = (10397,32755)
Max total degrees = (10397,65437)
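
To make the degree definitions concrete, here is a small plain-Scala sketch (no Spark) that counts in-degrees and out-degrees from a list of (source, destination) pairs and picks the largest. Because the graph is a multigraph, parallel edges are counted once each, which is why the degrees above are flight counts rather than route counts. `DegreeSketch` is a hypothetical name used for illustration.

```scala
object DegreeSketch {
  // Out-degree: number of edges leaving each vertex (parallel edges all count).
  def outDegrees(edges: Seq[(Long, Long)]): Map[Long, Int] =
    edges.groupBy(_._1).map { case (id, es) => id -> es.size }

  // In-degree: number of edges arriving at each vertex.
  def inDegrees(edges: Seq[(Long, Long)]): Map[Long, Int] =
    edges.groupBy(_._2).map { case (id, es) => id -> es.size }

  // Vertex with the largest degree; the map must be non-empty.
  def maxDegree(degrees: Map[Long, Int]): (Long, Int) =
    degrees.maxBy(_._2)
}
```

Note that a vertex with no outgoing (or incoming) edges simply does not appear in the corresponding map, which mirrors how `graph.outDegrees` and `graph.inDegrees` omit zero-degree vertices.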


## Find which airport corresponds to the airport id with the maximum total degree

In [20]:
val maxDegrees_id = maxDegrees._1
val airport = graph.triplets.filter(x => x.srcId == maxDegrees_id).map(x => x.srcAttr).first()
println("The airport with the most inbound and outbound flights is " + airport +".")

The airport with the most inbound and outbound flights is ATL.


## PageRank measures the importance of each vertex in a graph
<img src="https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/GraphX/pagerank.png" height="70%" width="70%"></img>
#### PageRank assumes an edge from u to v represents an endorsement of v's importance by u. For example, if a Twitter user is followed by many others, the user will be ranked highly.
#### PageRank works by counting the number and quality of edges pointing to a vertex to estimate how important the vertex is. The underlying assumption is that more important vertices are likely to receive more edges from other vertices.
## Calculate PageRanks in descending order
#### To get the airport codes, we have to join the vertex id to the airport code in the airportVertices RDD

In [21]:
val ranks = graph.pageRank(0.1).vertices
ranks.sortBy(x => x._2, false).take(5).foreach(println)

val ranksByAirport = ranks.join(airportVertices).distinct().sortBy(x => x._2._1, false)
println("The most important airports by PageRank are")
ranksByAirport.map(x => x._2._2).take(5).foreach(println)

(10397,9.579238325208161)
(13930,5.7030256297628945)
(11298,5.086766672794899)
(11292,5.004115199618762)
(13487,3.7695530123968424)
The most important airports by PageRank are
ATL
ORD
DFW
DEN
MSP
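
For intuition about what the ranks mean, here is a minimal plain-Scala sketch of the textbook PageRank iteration with a fixed number of rounds. It is not GraphX's implementation: `graph.pageRank(0.1)` above runs until the ranks change by less than the given tolerance, whereas this sketch simply iterates a fixed number of times. The object name `PageRankSketch`, the `iterations` parameter, and the starting rank of 1.0 are assumptions made for illustration; 0.15 is the commonly used reset probability.

```scala
object PageRankSketch {
  // Every vertex starts at rank 1.0. In each round a vertex splits its rank
  // evenly across its outgoing edges, and each vertex's new rank is
  // resetProb + (1 - resetProb) * (sum of the shares it received).
  def pageRank(edges: Seq[(Long, Long)],
               iterations: Int,
               resetProb: Double = 0.15): Map[Long, Double] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val outDeg = edges.groupBy(_._1).map { case (id, es) => id -> es.size }
    var ranks: Map[Long, Double] = vertices.map(v => v -> 1.0).toMap

    for (_ <- 1 to iterations) {
      // Share of rank sent along each edge, summed per destination vertex.
      val received: Map[Long, Double] = edges
        .map { case (s, d) => (d, ranks(s) / outDeg(s)) }
        .groupBy(_._1)
        .map { case (id, shares) => id -> shares.map(_._2).sum }

      ranks = vertices
        .map(v => v -> (resetProb + (1.0 - resetProb) * received.getOrElse(v, 0.0)))
        .toMap
    }
    ranks
  }
}
```

On a tiny graph where two vertices both point at a third, the shared target ends up with the highest rank, and a vertex that nothing points to settles at the reset value. This is the same effect that puts hub airports at the top of the list above.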


## Subgraphs
The subgraph operator takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate and the edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate. The subgraph operator can be used in a number of situations to restrict the graph to the vertices and edges of interest or to eliminate broken links.
## Create a subgraph consisting of only flights that are longer than 4000 miles
#### Here we are using an edge predicate to create the subgraph. Because no vertex predicate is supplied, all vertices are retained.

In [22]:
val distanceInput = 4000
val subgraph = graph.subgraph(epred = x => x.attr > distanceInput)
println("Created subgraph only with flights that are greater than " + distanceInput + " miles apart.")

subgraph.edges.distinct().sortBy(x => x.attr, true).take(5).foreach(println)
println("The number of vertices in the subgraph = "+ subgraph.numVertices)
println("The number of edges in the subgraph = "+ subgraph.numEdges)

Created subgraph only with flights that are greater than 4000 miles apart.
Edge(13830,13930,4184.0)
Edge(13930,13830,4184.0)
Edge(12173,13930,4243.0)
Edge(13930,12173,4243.0)
Edge(12173,10397,4502.0)
The number of vertices in the subgraph = 296
The number of edges in the subgraph = 287


## This time let's create a subgraph using a vertex predicate

In [23]:
val airportList = List("BOS", "SFO", "ATL", "LAX", "ORD")
val subgraph2 = graph.subgraph(vpred = (vid, attr) => airportList contains attr)
subgraph2.vertices.take(10).foreach(println)

(13930,ORD)
(12892,LAX)
(10397,ATL)
(10721,BOS)
(14771,SFO)
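
The semantics of the two predicates can be sketched in plain Scala (no Spark). The key point is that an edge survives only if it passes the edge predicate and both of its endpoints pass the vertex predicate, while an edge predicate alone never removes vertices. `SubgraphSketch` and `E` are hypothetical names; this is an illustration of the behaviour described above, not GraphX's implementation.

```scala
object SubgraphSketch {
  // Minimal stand-in for a GraphX edge (hypothetical).
  final case class E(srcId: Long, dstId: Long, attr: Double)

  // Both predicates default to "keep everything", like the GraphX operator.
  def subgraph(vertices: Map[Long, String],
               edges: Seq[E],
               epred: E => Boolean = _ => true,
               vpred: (Long, String) => Boolean = (_, _) => true
              ): (Map[Long, String], Seq[E]) = {
    // Vertices are filtered by the vertex predicate only.
    val keptVertices = vertices.filter { case (id, attr) => vpred(id, attr) }
    // Edges must pass the edge predicate and connect two kept vertices.
    val keptEdges = edges.filter { e =>
      epred(e) && keptVertices.contains(e.srcId) && keptVertices.contains(e.dstId)
    }
    (keptVertices, keptEdges)
  }
}
```

This explains the counts seen earlier: filtering on distance leaves the vertex count unchanged, whereas filtering on airport code shrinks the vertex set and drops every flight that touches an excluded airport.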


## Triangle Counting
#### A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the TriangleCount object that determines the number of triangles passing through each vertex, providing a measure of clustering. Note that TriangleCount requires the edges to be in canonical orientation (srcId < dstId) and the graph to be partitioned using the partitionBy member of the graph class. partitionBy repartitions the edges in the graph according to partitionStrategy. 
## Create a subgraph for Triangle Counting
#### To meet the edge orientation requirement, we will create a new subgraph that only contains edges/flights where origin_id < dest_id. Although not exact, we can roughly view this as working with round trip segments starting at the lower order airport id for purposes of this demonstration. Note that the origin id is less than the destination id for the sample edges shown.
#### Also, note that the number of trips/edges is about half the number of edges in the original graph, while the number of vertices remains the same because we only employed an edge predicate.

In [24]:
val subgraphTC = graph.subgraph(epred = x => x.srcId < x.dstId)
println("The number of vertices in the subgraph = "+ subgraphTC.numVertices)
println("The number of edges in the subgraph = "+ subgraphTC.numEdges)
subgraphTC.edges.takeSample(false, 5).foreach(println)

The number of vertices in the subgraph = 296
The number of edges in the subgraph = 236619
Edge(12892,14869,590.0)
Edge(11298,12889,1055.0)
Edge(11697,13930,1182.0)
Edge(11292,12519,573.0)
Edge(14057,14771,550.0)


## Create a Triangle Count graph
#### The edges have the same attribute (distance) as the original graph and subgraph.

In [25]:
val TriangleCountGraph = subgraphTC.partitionBy(PartitionStrategy.RandomVertexCut).triangleCount()
val TriangleCountEdges = TriangleCountGraph.edges
TriangleCountEdges.sortBy(x => x.attr, false).take(5).foreach(println)

Edge(12173,12478,4983.0)
Edge(11618,12173,4962.0)
Edge(12173,12264,4817.0)
Edge(10397,12173,4502.0)
Edge(12173,13930,4243.0)


## Display the Vertices in Triangle Count descending order
#### The Triangle Count is given for each vertex.

In [26]:
val TriangleCountVertices = TriangleCountGraph.vertices
TriangleCountVertices.join(airportVertices).sortBy(x => x._2._1, false).take(5).foreach(println)

(10397,(1440,ATL))
(11292,(1331,DEN))
(13930,(1319,ORD))
(11298,(1125,DFW))
(12266,(1094,IAH))


## Determine the airports by airport code in Triangle Count descending order
#### To get the airport codes, we have to join the vertex id to the airport code in the airportVertices RDD

In [27]:
println("The airports with the highest Triangle Count are")
TriangleCountVertices.join(airportVertices).sortBy(x => x._2._1, false).take(5).foreach(println)

The airports with the highest Triangle Count are
(10397,(1440,ATL))
(11292,(1331,DEN))
(13930,(1319,ORD))
(11298,(1125,DFW))
(12266,(1094,IAH))
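
To clarify what a per-vertex triangle count measures, here is a small plain-Scala sketch (no Spark). It treats edges as undirected, collapses parallel edges and both directions into one connection, and for each vertex counts the pairs of neighbours that are themselves connected. This is a simple illustration of the definition, not the algorithm GraphX uses; `TriangleSketch` is a hypothetical name.

```scala
object TriangleSketch {
  def triangleCounts(edges: Seq[(Long, Long)]): Map[Long, Int] = {
    // Canonical orientation (smaller id first), without self-loops or duplicates.
    val canonical = edges
      .collect { case (a, b) if a != b => (math.min(a, b), math.max(a, b)) }
      .distinct

    // Undirected neighbour sets for every vertex that has at least one edge.
    val neighbours: Map[Long, Set[Long]] = canonical
      .flatMap { case (a, b) => Seq((a, b), (b, a)) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

    // A triangle through v is a pair of v's neighbours that are adjacent.
    neighbours.map { case (v, ns) =>
      val closedPairs = ns.toSeq.combinations(2).count { pair =>
        neighbours(pair(0)).contains(pair(1))
      }
      v -> closedPairs
    }
  }
}
```

In the flight graph, a high count for an airport therefore means that many pairs of its connected airports also have direct flights between each other.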


## Conclusion

This notebook demonstrated how to use Apache Spark GraphX to analyze airline data that is readily downloadable from the US Bureau of Transportation Statistics website. We explored the Graph abstraction introduced by GraphX and used several of the basic operators to analyze the structure and relationships in the data. We saw how GraphX exposes RDD views of the vertices and edges stored within the graph and how GraphX maintains the vertices and edges in optimized data structures, VertexRDD and EdgeRDD, which provide additional functionality beyond standard Spark RDDs.

We also employed graph operators to transform the graphs into new graphs with transformed properties and structure. Finally, we worked with a few of the graph algorithms included with GraphX, like PageRank and Triangle Counting, to demonstrate how they simplify analytics tasks. 