In [1]:
%fs ls /databricks-datasets/flights/

path,name,size
dbfs:/databricks-datasets/flights/README.md,README.md,412
dbfs:/databricks-datasets/flights/airport-codes-na.txt,airport-codes-na.txt,11411
dbfs:/databricks-datasets/flights/departuredelays.csv,departuredelays.csv,33396236


In [2]:
# Input locations inside the Databricks sample-datasets mount
tripdelaysFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsnaFilePath = "/databricks-datasets/flights/airport-codes-na.txt"

# Airport reference data: tab-separated with a header row; column types are inferred
airportsna = spark.read.csv(
    airportsnaFilePath,
    sep='\t',
    header='true',
    inferSchema='true',
)
airportsna.createOrReplaceTempView("airports_na")

# Departure delays: read with a header row (no schema inference), register as a view, cache
departureDelays = spark.read.csv(tripdelaysFilePath, header='true')
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()

# Every IATA code that appears as an origin or a destination in the delays data
tripIATA = spark.sql(
    "select distinct iata from ("
    "select distinct origin as iata from departureDelays"
    " union all "
    "select distinct destination as iata from departureDelays"
    ") a"
)
tripIATA.createOrReplaceTempView("tripIATA")

# Keep only the airports that have at least one trip in the departureDelays dataset
airports = spark.sql(
    "select f.IATA, f.City, f.State, f.Country"
    " from airports_na f"
    " join tripIATA t on t.IATA = f.IATA"
)
airports.createOrReplaceTempView("airports")
airports.cache()

In [3]:
# Row count of the raw departure-delays DataFrame
departureDelays.count()

In [4]:
# Build `departureDelays_geo` DataFrame
#   One row per flight: trip id, local departure timestamp, delay, distance, and the
#   origin/destination airport details (code, city, state).
#   The `date` field is sliced two characters at a time (MM, DD, HH, MM) and combined
#   with the fixed year 2014 into 'YYYY-MM-DD HH:MM:00' before being cast to a timestamp.
#   Flights whose origin or destination is missing from `airports` are dropped by the
#   inner joins.
departureDelays_geo = spark.sql("""
    select cast(f.date as int) as tripid,
           cast(concat('2014-', substr(f.date, 1, 2), '-', substr(f.date, 3, 2), ' ',
                       substr(f.date, 5, 2), ':', substr(f.date, 7, 2), ':00')
                as timestamp) as `localdate`,
           cast(f.delay as int) as delay,
           cast(f.distance as int) as distance,
           f.origin as src,
           f.destination as dst,
           o.city as city_src,
           d.city as city_dst,
           o.state as state_src,
           d.state as state_dst
      from departureDelays f
      join airports o on o.iata = f.origin
      join airports d on d.iata = f.destination
""")

# Create Temporary View and cache
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
departureDelays_geo.cache()

# Count
departureDelays_geo.count()

In [5]:
# Plain-text preview of the first 10 rows of `departureDelays_geo`
departureDelays_geo.show(10)

In [6]:
# Render `departureDelays_geo` with the notebook's display() helper
display(departureDelays_geo)

tripid,localdate,delay,distance,src,dst,city_src,city_dst,state_src,state_dst
1011111,2014-01-01T11:11:00.000+0000,-5,221,MSP,INL,Minneapolis,International Falls,MN,MN
1021111,2014-01-02T11:11:00.000+0000,7,221,MSP,INL,Minneapolis,International Falls,MN,MN
1031111,2014-01-03T11:11:00.000+0000,0,221,MSP,INL,Minneapolis,International Falls,MN,MN
1041925,2014-01-04T19:25:00.000+0000,0,221,MSP,INL,Minneapolis,International Falls,MN,MN
1061115,2014-01-06T11:15:00.000+0000,33,221,MSP,INL,Minneapolis,International Falls,MN,MN
1071115,2014-01-07T11:15:00.000+0000,23,221,MSP,INL,Minneapolis,International Falls,MN,MN
1081115,2014-01-08T11:15:00.000+0000,-9,221,MSP,INL,Minneapolis,International Falls,MN,MN
1091115,2014-01-09T11:15:00.000+0000,11,221,MSP,INL,Minneapolis,International Falls,MN,MN
1101115,2014-01-10T11:15:00.000+0000,-3,221,MSP,INL,Minneapolis,International Falls,MN,MN
1112015,2014-01-11T20:15:00.000+0000,-7,221,MSP,INL,Minneapolis,International Falls,MN,MN


In [7]:
# Note: ensure you have already installed the GraphFrames spark-package
from pyspark.sql.functions import *
from graphframes import *

# Vertices: the distinct airports, with the IATA code column renamed to `id`
tripVertices = (
    airports
    .withColumnRenamed("IATA", "id")
    .distinct()
)

# Edges: one row per flight, carrying its delay and destination details
tripEdges = departureDelays_geo.select(
    "tripid", "delay", "src", "dst", "city_dst", "state_dst",
)

# Mark both DataFrames for caching
tripEdges.cache()
tripVertices.cache()

In [8]:
# Vertices
#   The vertices of our graph are the airports; `id` holds the airport's IATA code
display(tripVertices)

id,City,State,Country
FAT,Fresno,CA,USA
CMH,Columbus,OH,USA
PHX,Phoenix,AZ,USA
PAH,Paducah,KY,USA
COS,Colorado Springs,CO,USA
MYR,Myrtle Beach,SC,USA
RNO,Reno,NV,USA
SRQ,Sarasota,FL,USA
VLD,Valdosta,GA,USA
PSC,Pasco,WA,USA


In [9]:
# Edges
#   The edges of our graph are the flights between airports:
#   `src` is the origin airport code and `dst` the destination airport code
display(tripEdges)

tripid,delay,src,dst,city_dst,state_dst
1011111,-5,MSP,INL,International Falls,MN
1021111,7,MSP,INL,International Falls,MN
1031111,0,MSP,INL,International Falls,MN
1041925,0,MSP,INL,International Falls,MN
1061115,33,MSP,INL,International Falls,MN
1071115,23,MSP,INL,International Falls,MN
1081115,-9,MSP,INL,International Falls,MN
1091115,11,MSP,INL,International Falls,MN
1101115,-3,MSP,INL,International Falls,MN
1112015,-7,MSP,INL,International Falls,MN


In [10]:
# Build `tripGraph` GraphFrame
#   Vertices are the airports; edges are the individual trips (flights), including the
#   destination city and state
tripGraph = GraphFrame(tripVertices, tripEdges)

# Build `tripGraphPrime` GraphFrame
#   Same vertices, but the edges carry only tripid, delay, src and dst. This narrower edge
#   table (fewer columns, same rows) makes motifs and subgraphs easier to display (below)
tripEdgesPrime = departureDelays_geo.select("tripid", "delay", "src", "dst")
tripGraphPrime = GraphFrame(tripVertices, tripEdgesPrime)

In [11]:
# Size of the graph: airports are the vertices, trips are the edges
print("Airports:", tripGraph.vertices.count())
print("Trips:", tripGraph.edges.count())

In [12]:
# Largest delay over all edges (groupBy() with no columns aggregates the whole DataFrame)
tripGraph.edges.groupBy().max("delay").show()

In [13]:
# Finding the longest delay across all flights
#   groupBy() with no columns yields a single row holding max(delay)
longestDelay = tripGraph.edges.groupBy().max("delay")
display(longestDelay)

max(delay)
1642


In [14]:
# Determining number of on-time / early flights vs. delayed flights
print(
    "On-time / Early Flights:",
    tripGraph.edges.filter("delay <= 0").count(),
)
print(
    "Delayed Flights:",
    tripGraph.edges.filter("delay > 0").count(),
)

In [15]:
# Top 5 destinations out of Seattle by average delay, counting delayed flights only
(
    tripGraph.edges
    .filter("src = 'SEA' and delay > 0")
    .groupBy("src", "dst")
    .avg("delay")
    .sort(desc("avg(delay)"))
    .show(5)
)

In [16]:
# Average delay per destination for delayed flights out of Seattle, worst first
display(
    tripGraph.edges
    .filter("src = 'SEA' and delay > 0")
    .groupBy("src", "dst")
    .avg("delay")
    .sort(desc("avg(delay)"))
)

src,dst,avg(delay)
SEA,PHL,55.66666666666666
SEA,COS,43.53846153846154
SEA,FAT,43.03846153846154
SEA,LGB,39.39705882352941
SEA,IAD,37.73333333333333
SEA,MIA,37.325581395348834
SEA,SFO,36.50210378681627
SEA,SBA,36.48275862068966
SEA,JFK,35.03125
SEA,ORD,33.60335195530726


In [17]:
# All flights with a positive delay.
# After displaying tripDelays, use Plot Options to set `state_dst` as a Key.
tripDelays = tripGraph.edges.filter("delay > 0")
display(tripDelays)

tripid,delay,src,dst,city_dst,state_dst
1021111,7,MSP,INL,International Falls,MN
1061115,33,MSP,INL,International Falls,MN
1071115,23,MSP,INL,International Falls,MN
1091115,11,MSP,INL,International Falls,MN
1171115,4,MSP,INL,International Falls,MN
2091925,1,MSP,INL,International Falls,MN
2152015,16,MSP,INL,International Falls,MN
2161925,169,MSP,INL,International Falls,MN
2171115,27,MSP,INL,International Falls,MN
2181115,96,MSP,INL,International Falls,MN


In [18]:
# Flights out of Seattle (SEA) with an individual delay of more than 100 minutes.
# NOTE(review): the query itself does not aggregate; the "longest cumulative delays by state"
# view presumably comes from the display() plot options (key on `state_dst`) - confirm.
display(tripGraph.edges.filter("src = 'SEA' and delay > 100"))

tripid,delay,src,dst,city_dst,state_dst
3201938,108,SEA,BUR,Burbank,CA
3201655,107,SEA,SNA,Orange County,CA
1011950,123,SEA,OAK,Oakland,CA
1021950,194,SEA,OAK,Oakland,CA
1021615,317,SEA,OAK,Oakland,CA
1021755,385,SEA,OAK,Oakland,CA
1031950,283,SEA,OAK,Oakland,CA
1031615,364,SEA,OAK,Oakland,CA
1031325,130,SEA,OAK,Oakland,CA
1061755,107,SEA,OAK,Oakland,CA


In [19]:
# Degrees
#  Incoming plus outgoing connections per airport; the 20 busiest airports in this sample dataset
display(
    tripGraph.degrees
    .sort(desc("degree"))
    .limit(20)
)

id,degree
ATL,179774
DFW,133966
ORD,125405
LAX,106853
DEN,103699
IAH,85685
PHX,79672
SFO,77635
LAS,66101
CLT,56103


In [20]:
# inDegrees
#  Incoming connections per airport; the 20 airports with the most arrivals in this sample dataset
display(
    tripGraph.inDegrees
    .sort(desc("inDegree"))
    .limit(20)
)

id,inDegree
ATL,89633
DFW,65767
ORD,61654
LAX,53184
DEN,50738
IAH,42512
PHX,39619
SFO,38641
LAS,32994
CLT,28044


In [21]:
# outDegrees
#  Outgoing connections per airport; the 20 airports with the most departures in this sample dataset
display(
    tripGraph.outDegrees
    .sort(desc("outDegree"))
    .limit(20)
)

id,outDegree
ATL,90141
DFW,68199
ORD,63751
LAX,53669
DEN,52961
IAH,43173
PHX,40053
SFO,38994
LAS,33107
CLT,28059


In [22]:
%scala
package d3a
// We use a package object so that we can define top level classes like Edge that need to be used in other cells

import org.apache.spark.sql._
import com.databricks.backend.daemon.driver.EnhancedRDDFunctions.displayHTML

// One aggregated edge between two named nodes; `count` is the weight of that edge.
case class Edge(src: String, dest: String, count: Long)

// A graph node, identified only by its name.
case class Node(name: String)
// A link between two nodes; `source` and `target` are positions in `Graph.nodes`.
case class Link(source: Int, target: Int, value: Long)
// Node list plus the links between them; this is the shape serialized to JSON for d3.
case class Graph(nodes: Seq[Node], links: Seq[Link])

object graphs {
val sqlContext = SQLContext.getOrCreate(org.apache.spark.SparkContext.getOrCreate())
import sqlContext.implicits._

/**
 * Collects the edges to the driver, builds the node/link graph and renders it via `showGraph`.
 *
 * Node names are the distinct `src`/`dest` values with underscores replaced by spaces.
 * Each link's value is `count / 20 + 1`.
 *
 * @param clicks edges to draw; the whole dataset is collected to the driver
 * @param height height passed on to `showGraph`
 * @param width  width passed on to `showGraph`
 */
def force(clicks: Dataset[Edge], height: Int = 100, width: Int = 960): Unit = {
  // Node names use spaces where the raw edge values use underscores.
  def label(raw: String): String = raw.replaceAll("_", " ")

  val data = clicks.collect()
  val nodes = (data.map(_.src) ++ data.map(_.dest)).map(label).toSet.toSeq.map(Node)

  // Resolve each node's position once, instead of scanning `nodes` twice per edge.
  val indexByName = nodes.map(_.name).zipWithIndex.toMap
  val links = data.map { t =>
    Link(indexByName(label(t.src)), indexByName(label(t.dest)), t.count / 20 + 1)
  }
  showGraph(height, width, Seq(Graph(nodes, links)).toDF().toJSON.first())
}

/**
 * Displays the graph's airports and the links between them on a US map using d3.
 *
 * The generated page loads d3, the state outlines and the airport coordinates from
 * mbostock.github.io. Graph nodes are matched to airports by IATA code; nodes that have
 * no entry in that airport list are skipped, as are arcs whose destination has no
 * known location.
 *
 * input: {"nodes": [{"name": "..."}], "links": [{"source": 1, "target": 2, "value": 0}]}
 *
 * @param height height of the SVG canvas
 * @param width  width of the SVG canvas
 * @param graph  graph JSON in the form shown above
 */
def showGraph(height: Int, width: Int, graph: String): Unit = {

displayHTML(s"""<!DOCTYPE html>
<html>
  <head>
    <link type="text/css" rel="stylesheet" href="https://mbostock.github.io/d3/talk/20111116/style.css"/>
    <style type="text/css">
      #states path {
        fill: #ccc;
        stroke: #fff;
      }

      path.arc {
        pointer-events: none;
        fill: none;
        stroke: #000;
        display: none;
      }

      path.cell {
        fill: none;
        pointer-events: all;
      }

      circle {
        fill: steelblue;
        fill-opacity: .8;
        stroke: #fff;
      }

      #cells.voronoi path.cell {
        stroke: brown;
      }

      #cells g:hover path.arc {
        display: inherit;
      }
    </style>
  </head>
  <body>
    <script src="https://mbostock.github.io/d3/talk/20111116/d3/d3.js"></script>
    <script src="https://mbostock.github.io/d3/talk/20111116/d3/d3.csv.js"></script>
    <script src="https://mbostock.github.io/d3/talk/20111116/d3/d3.geo.js"></script>
    <script src="https://mbostock.github.io/d3/talk/20111116/d3/d3.geom.js"></script>
    <script>
      var graph = $graph;
      var w = $width;
      var h = $height;

      var linksByOrigin = {};
      var countByAirport = {};
      var locationByAirport = {};
      var positions = [];

      var projection = d3.geo.azimuthal()
          .mode("equidistant")
          .origin([-98, 38])
          .scale(1400)
          .translate([640, 360]);

      var path = d3.geo.path()
          .projection(projection);

      var svg = d3.select("body")
          .insert("svg:svg", "h2")
          .attr("width", w)
          .attr("height", h);

      var states = svg.append("svg:g")
          .attr("id", "states");

      var circles = svg.append("svg:g")
          .attr("id", "circles");

      var cells = svg.append("svg:g")
          .attr("id", "cells");

      var arc = d3.geo.greatArc()
          .source(function(d) { return locationByAirport[d.source]; })
          .target(function(d) { return locationByAirport[d.target]; });

      d3.select("input[type=checkbox]").on("change", function() {
        cells.classed("voronoi", this.checked);
      });

      // Draw US map.
      d3.json("https://mbostock.github.io/d3/talk/20111116/us-states.json", function(collection) {
        states.selectAll("path")
          .data(collection.features)
          .enter().append("svg:path")
          .attr("d", path);
      });

      // Parse links
      graph.links.forEach(function(link) {
        var origin = graph.nodes[link.source].name;
        var destination = graph.nodes[link.target].name;

        var links = linksByOrigin[origin] || (linksByOrigin[origin] = []);
        links.push({ source: origin, target: destination });

        countByAirport[origin] = (countByAirport[origin] || 0) + 1;
        countByAirport[destination] = (countByAirport[destination] || 0) + 1;
      });

      d3.csv("https://mbostock.github.io/d3/talk/20111116/airports.csv", function(data) {

        // Build list of airports. Graph nodes without an entry in the airport CSV are
        // skipped so that airports, positions and the Voronoi polygons stay index-aligned.
        var airports = [];
        graph.nodes.forEach(function(node) {
          var airport = data.find(function(candidate) {
            return candidate.iata === node.name;
          });
          if (!airport) {
            return;
          }

          var location = [+airport.longitude, +airport.latitude];
          locationByAirport[airport.iata] = location;
          positions.push(projection(location));
          airports.push(airport);
        });

        // Compute the Voronoi diagram of airports' projected positions.
        var polygons = d3.geom.voronoi(positions);

        var g = cells.selectAll("g")
            .data(airports)
          .enter().append("svg:g");

        g.append("svg:path")
            .attr("class", "cell")
            .attr("d", function(d, i) { return "M" + polygons[i].join("L") + "Z"; })
            .on("mouseover", function(d, i) { d3.select("h2 span").text(d.name); });

        g.selectAll("path.arc")
            .data(function(d) {
              // Only draw arcs whose destination also has a known location.
              return (linksByOrigin[d.iata] || []).filter(function(link) {
                return locationByAirport[link.target] !== undefined;
              });
            })
          .enter().append("svg:path")
            .attr("class", "arc")
            .attr("d", function(d) { return path(arc(d)); });

        circles.selectAll("circle")
            .data(airports)
            .enter().append("svg:circle")
            .attr("cx", function(d, i) { return positions[i][0]; })
            .attr("cy", function(d, i) { return positions[i][1]; })
            .attr("r", function(d, i) { return Math.sqrt(countByAirport[d.iata]); })
            .sort(function(a, b) { return countByAirport[b.iata] - countByAirport[a.iata]; });
      });
    </script>
  </body>
</html>""")
}

  /**
   * Displays HTML usage instructions for `graphs.force`, including the `Edge` shape it expects
   * and the import needed to call it from another cell.
   */
  def help(): Unit = {
displayHTML("""
<p>
Produces a force-directed graph given a collection of edges of the following form:<br/>
<tt><font color="#a71d5d">case class</font> <font color="#795da3">Edge</font>(<font color="#ed6a43">src</font>: <font color="#a71d5d">String</font>, <font color="#ed6a43">dest</font>: <font color="#a71d5d">String</font>, <font color="#ed6a43">count</font>: <font color="#a71d5d">Long</font>)</tt>
</p>
<p>Usage:<br/>
<tt>%scala</tt><br/>
<tt><font color="#a71d5d">import</font> <font color="#ed6a43">d3a._</font></tt><br/>
<tt><font color="#795da3">graphs.force</font>(<br/>
&nbsp;&nbsp;<font color="#ed6a43">height</font> = <font color="#795da3">500</font>,<br/>
&nbsp;&nbsp;<font color="#ed6a43">width</font> = <font color="#795da3">500</font>,<br/>
&nbsp;&nbsp;<font color="#ed6a43">clicks</font>: <font color="#795da3">Dataset</font>[<font color="#795da3">Edge</font>])</tt>
</p>""")
  }
}


In [23]:
%scala d3a.graphs.help()

In [24]:
%scala
// On-time and Early Arrivals: one edge per (src, dst) pair, weighted by the number of
// flights with delay <= 0
import d3a._

graphs.force(
  clicks = sql("""select src, dst as dest, count(1) as count from departureDelays_geo where delay <= 0 group by src, dst""").as[Edge],
  width = 1200,
  height = 800)

In [25]:
%scala
// Delayed Trips from CA, OR, and/or WA: one edge per (src, dst) pair, weighted by the
// number of flights with delay > 0 that originate in those states
import d3a._

graphs.force(
  clicks = sql("""select src, dst as dest, count(1) as count from departureDelays_geo where state_src in ('CA', 'OR', 'WA') and delay > 0 group by src, dst""").as[Edge],
  width = 1200,
  height = 800)

In [26]:
%scala
// Trips (from DepartureDelays Dataset): one edge per (src, dst) pair, weighted by the
// total number of flights
import d3a._

graphs.force(
  clicks = sql("""select src, dst as dest, count(1) as count from departureDelays_geo group by src, dst""").as[Edge],
  width = 1200,
  height = 800)