# Spark to DocumentDB Connector

Connecting Apache Spark to Azure DocumentDB accelerates your ability to solve fast-moving data science problems, using Azure DocumentDB to quickly persist and retrieve your data. With the Spark to DocumentDB connector, you can more easily solve scenarios including (but not limited to) blazing-fast IoT scenarios, updatable columns when performing analytics, push-down predicate filtering, and advanced analytics and data science against fast-changing data in a geo-replicated, managed document store with guaranteed SLAs for consistency, availability, low latency, and throughput.

The Spark to DocumentDB connector uses the [Azure DocumentDB Java SDK](https://github.com/Azure/azure-documentdb-java) and follows the flow shown below:

<img style="align: left;" src="https://raw.githubusercontent.com/dennyglee/notebooks/master/images/Azure-DocumentDB-Spark_Connector_600x266.png">



The data flow is as follows:

1. A connection is made from the Spark master node to the DocumentDB gateway node to obtain the partition map. Note that the user only specifies the Spark and DocumentDB connections; connecting to the respective master and gateway nodes is transparent to the user.
2. This information is returned to the Spark master node. At this point, the query can be parsed to determine which partitions (and their locations) within DocumentDB need to be accessed.
3. This information is transmitted to the Spark worker nodes.
4. The Spark worker nodes then connect directly to the DocumentDB partitions to extract the data that is needed and bring it back to the Spark partitions on the worker nodes.
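To make the four steps concrete, here is a conceptual sketch in plain Scala. It is not the connector's implementation: `PartitionRange`, the MurmurHash3 stand-in for DocumentDB's partition-key hashing, the assumption that `origin` is the collection's partition key, and the round-robin assignment of ranges to workers are all hypothetical.

```scala
import scala.util.hashing.MurmurHash3

// Conceptual model of the driver/worker flow (hypothetical; not the connector's code).
object PartitionRouting {
  // One entry of the partition map obtained from the gateway (step 1):
  // a contiguous range [minInclusive, maxExclusive) of the hashed key space.
  final case class PartitionRange(id: String, minInclusive: Long, maxExclusive: Long) {
    def contains(h: Long): Boolean = h >= minInclusive && h < maxExclusive
  }

  // Stand-in hash; DocumentDB uses its own partition-key hashing.
  def hashKey(key: String): Long = MurmurHash3.stringHash(key).toLong

  // Split the 32-bit hash space into n contiguous, non-overlapping ranges.
  def partitionMap(n: Int): Seq[PartitionRange] = {
    val lo = Int.MinValue.toLong
    val span = 1L << 32
    (0 until n).map(i => PartitionRange(s"p$i", lo + span * i / n, lo + span * (i + 1) / n))
  }

  // Step 2: an equality filter on the partition key targets exactly one range;
  // any other query fans out to every range.
  def partitionsToRead(map: Seq[PartitionRange], keyEquals: Option[String]): Seq[PartitionRange] =
    keyEquals match {
      case Some(k) => map.filter(_.contains(hashKey(k)))
      case None    => map
    }

  // Steps 3-4: hand the ranges to workers (round-robin here) so that each
  // worker reads its ranges directly.
  def assignToWorkers(parts: Seq[PartitionRange], workers: Seq[String]): Map[String, Seq[String]] =
    parts.zipWithIndex
      .groupBy { case (_, i) => workers(i % workers.size) }
      .map { case (w, ps) => w -> ps.map(_._1.id) }
}
```

Under these assumptions, a filter such as `c.origin = 'SEA'` would be served by a single range, whereas a query without a partition-key predicate has to read every range.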


```
%%configure
{ "jars": ["wasb:///example/jars/azure-documentdb-1.10.0.jar","wasb:///example/jars/azure-documentdb-spark-0.0.1.jar"],
  "conf": {
    "spark.jars.packages": "graphframes:graphframes:0.3.0-spark2.0-s_2.11",
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
  }
}
```



```scala
// Import Spark to DocumentDB Connector
import com.microsoft.azure.documentdb.spark.schema._
import com.microsoft.azure.documentdb.spark._
import com.microsoft.azure.documentdb.spark.config.Config

// Connect to DocumentDB Database
val readConfig2 = Config(Map(
  "Endpoint" -> "https://doctorwho.documents.azure.com:443/",
  "Masterkey" -> "<your-master-key>",  // replace with your account's master key
  "Database" -> "DepartureDelays",
  "preferredRegions" -> "Central US;East US 2;",
  "Collection" -> "flights_pcoll",
  "SamplingRatio" -> "1.0"))
```

```
readConfig2: com.microsoft.azure.documentdb.spark.config.Config = com.microsoft.azure.documentdb.spark.config.ConfigBuilder$$anon$1@4848afe
```
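Rather than pasting the master key into a notebook cell, the same option map could be assembled from environment variables. The sketch below is plain Scala; the variable names `DOCUMENTDB_ENDPOINT` and `DOCUMENTDB_MASTERKEY` and the `DocumentDbSettings` helper are hypothetical, and the resulting map would be passed to `Config(...)` as in the cell above.

```scala
// Hypothetical helper: build the connector's option map from environment
// variables instead of hard-coding the master key in the notebook.
object DocumentDbSettings {
  // Split a value such as "Central US;East US 2;" into region names,
  // ignoring the trailing separator and surrounding whitespace.
  def parseRegions(value: String): Seq[String] =
    value.split(";").map(_.trim).filter(_.nonEmpty).toSeq

  // DOCUMENTDB_ENDPOINT and DOCUMENTDB_MASTERKEY are assumed variable names.
  def options(env: Map[String, String]): Either[String, Map[String, String]] = {
    val required = Seq("DOCUMENTDB_ENDPOINT", "DOCUMENTDB_MASTERKEY")
    val missing = required.filterNot(env.contains)
    if (missing.nonEmpty) Left(s"missing environment variables: ${missing.mkString(", ")}")
    else Right(Map(
      "Endpoint" -> env("DOCUMENTDB_ENDPOINT"),
      "Masterkey" -> env("DOCUMENTDB_MASTERKEY"),
      "Database" -> "DepartureDelays",
      "preferredRegions" -> "Central US;East US 2;",
      "Collection" -> "flights_pcoll",
      "SamplingRatio" -> "1.0"))
  }
}
```

Returning `Either` makes a missing variable an explicit error at configuration time instead of a failed connection later.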

```scala
// Create a DataFrame over the collection and register it as the temporary view `c`
val coll = spark.sqlContext.read.DocumentDB(readConfig2)
coll.createOrReplaceTempView("c")
```

## Query 1: Flights departing from Seattle (Top 100)

```scala
// Define the query (first 100 flights departing SEA) and register it as a temporary view
val top100 = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA' LIMIT 100")
top100.createOrReplaceTempView("top100")
```

```sql
%%sql
select * from top100 limit 10
```

## Query 2: Flights departing from Seattle

```scala
// Define the query (all flights departing SEA) and register it as a temporary view
val originSEA = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'")
originSEA.createOrReplaceTempView("originSEA")
```

### Determine the number of flights departing from Seattle (in this dataset)

```sql
%%sql
select count(1) from originSEA
```

### Total delay grouped by destination
Beyond simple counts, Spark SQL over DocumentDB makes it easy to run `GROUP BY` aggregations.

```sql
%%sql
select destination, sum(delay) as TotalDelays
from originSEA
group by destination
order by sum(delay) desc limit 10
```
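For readers less familiar with Spark SQL, the aggregation above is equivalent to the following plain-Scala collection code. `Flight` is a hypothetical in-memory stand-in for a row of `originSEA`; ties are broken by destination name here, whereas the SQL query leaves their order unspecified.

```scala
// Plain-Scala equivalent of the GROUP BY query, using a hypothetical
// in-memory Flight record instead of a DataFrame.
object TotalDelays {
  final case class Flight(date: String, delay: Int, distance: Int, origin: String, destination: String)

  // SUM(delay) per destination, sorted by total descending, first `n` rows.
  def topTotalDelays(flights: Seq[Flight], n: Int): Seq[(String, Int)] =
    flights
      .groupBy(_.destination)
      .map { case (dst, fs) => dst -> fs.map(_.delay).sum }
      .toSeq
      .sortBy { case (dst, total) => (-total, dst) }
      .take(n)
}
```

Keeping only `delay < 0` and sorting ascending instead yields the early-departure ranking computed further below.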

### Get the first 5 distinct destination airports (alphabetical) for flights departing Seattle

```sql
%%sql
select distinct destination from originSEA order by destination limit 5
```

### Top 5 destinations from Seattle with the most early departures (sum of negative delays)

```sql
%%sql
select destination, sum(delay)
from originSEA
where delay < 0
group by destination
order by sum(delay) limit 5
```

### Calculate median delay by destination for early departures from Seattle (`delay < 0`)

```sql
%%sql
select destination, percentile_approx(delay, 0.5) as median_delay
from originSEA
where delay < 0
group by destination
order by percentile_approx(delay, 0.5)
```
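`percentile_approx(delay, 0.5)` returns an approximate median taken from the data. As an illustration only, the sketch below computes an exact nearest-rank median per destination in plain Scala; on large inputs Spark's approximate result may differ slightly. `MedianDelay` and its input shape, `(destination, delay)` pairs, are hypothetical.

```scala
// Nearest-rank percentile over in-memory values: an exact stand-in for
// intuition only; Spark's percentile_approx uses an approximate algorithm.
object MedianDelay {
  // Smallest value v such that at least a fraction p of the values are <= v.
  def nearestRank(values: Seq[Int], p: Double): Int = {
    require(values.nonEmpty && p > 0 && p <= 1, "need values and 0 < p <= 1")
    val sorted = values.sorted
    val rank = math.ceil(p * sorted.size).toInt // 1-based rank
    sorted(rank - 1)
  }

  // Median per destination, ordered by median ascending (as in the query).
  def medianByDestination(rows: Seq[(String, Int)]): Seq[(String, Int)] =
    rows
      .groupBy(_._1)
      .map { case (dst, rs) => dst -> nearestRank(rs.map(_._2), 0.5) }
      .toSeq
      .sortBy { case (dst, median) => (median, dst) }
}
```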

## Query 3: Access data from key airports (e.g. SEA, SFO, SJC, JFK, ATL) instead of all data (~1.39M rows)

```scala
// Define the query, restricted to flights departing from key airports
val departureDelays = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin IN ('SEA', 'SFO', 'SJC', 'JFK', 'LAX', 'LAS', 'BOS', 'IAD', 'ORD', 'DFW', 'MSP', 'DTW', 'DEN', 'ATL')")
departureDelays.createOrReplaceTempView("departureDelays")
```

```scala
// Checking the number of rows (and cache)
departureDelays.cache()
departureDelays.count()
```

```
res20: Long = 547350
```

## Using GraphFrames for Apache Spark to run motif queries
Use GraphFrames to build a graph from the flights data stored in DocumentDB.

### Prepare the data

```scala
// Set file paths
val airportsnaFilePath = "wasb://data@doctorwhostore.blob.core.windows.net/airport-codes-na.txt"

// Obtain airport information (tab-delimited file with a header row)
val airportsna = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").option("delimiter", "\t").load(airportsnaFilePath)
airportsna.createOrReplaceTempView("airports_na")

// Available IATA codes from the departureDelays sample dataset
val tripIATA = spark.sql("select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a")
tripIATA.createOrReplaceTempView("tripIATA")

// Only include airports with at least one trip in the departureDelays dataset
val airports = spark.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")
airports.cache()

// Build the `departureDelays_geo` DataFrame.
// `date` is an MMddHHmm string; prefixing the year 2014 yields "2014-MM-dd HH:mm:00".
val departureDelays_geo = spark.sql("""
  select cast(f.date as int) as tripid,
         cast(concat('2014-', substr(cast(f.date as string), 1, 2), '-', substr(cast(f.date as string), 3, 2),
                     ' ', substr(cast(f.date as string), 5, 2), ':', substr(cast(f.date as string), 7, 2), ':00') as timestamp) as localdate,
         cast(f.delay as int) as delay, cast(f.distance as int) as distance,
         f.origin as src, f.destination as dst,
         o.city as city_src, d.city as city_dst,
         o.state as state_src, d.state as state_dst
  from departuredelays f
  join airports o on o.iata = f.origin
  join airports d on d.iata = f.destination
""")

// Create temporary view and cache
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
departureDelays_geo.cache()
```

```
res37: departureDelays_geo.type = [tripid: int, localdate: timestamp ... 8 more fields]
```
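The timestamp expression above is easier to check outside SQL. This plain-Scala sketch, using `java.time`, performs the same conversion, assuming (as the `substr` positions imply) that `date` is an `MMddHHmm` string and that every flight is in 2014. It also shows why `tripid` values such as `1010600` have seven digits: casting `"01010600"` to `int` drops the leading zero. `TripTimestamps` is a hypothetical helper.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Plain-Scala version of the SQL timestamp expression (sketch): `date` is
// assumed to be an MMddHHmm string, and the year 2014 is hard-coded as in the SQL.
object TripTimestamps {
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmm")

  def toLocalDate(mmddhhmm: String): LocalDateTime =
    LocalDateTime.parse("2014" + mmddhhmm, fmt)

  // cast(date as int) drops the leading zero ("01010600" -> 1010600);
  // padding back to 8 digits recovers the original string.
  def fromTripId(tripid: Int): String = f"$tripid%08d"
}
```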

```scala
// Check number of flights
departureDelays_geo.count()
```

```
res39: Long = 541524
```
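This count is lower than the 547,350 rows counted for `departureDelays`, which is most likely due to the inner joins used to build `departureDelays_geo`: a flight is kept only if both its origin and its destination match a row in `airports`. The plain-Scala sketch below mirrors that join with hypothetical `Flight`, `Airport`, and `GeoFlight` records and assumes IATA codes are unique in `airports`.

```scala
// Sketch of the inner-join semantics used to build departureDelays_geo,
// with hypothetical in-memory records in place of DataFrames.
object GeoJoin {
  final case class Flight(tripid: Int, delay: Int, src: String, dst: String)
  final case class Airport(iata: String, city: String, state: String)
  final case class GeoFlight(tripid: Int, delay: Int, src: String, dst: String, citySrc: String, cityDst: String)

  def join(flights: Seq[Flight], airports: Seq[Airport]): Seq[GeoFlight] = {
    val byCode = airports.map(a => a.iata -> a).toMap // assumes unique IATA codes
    for {
      f <- flights
      o <- byCode.get(f.src) // no origin match: flight dropped (inner join)
      d <- byCode.get(f.dst) // no destination match: flight dropped
    } yield GeoFlight(f.tripid, f.delay, f.src, f.dst, o.city, d.city)
  }
}
```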

### Build the GraphFrame

```scala
// Import graphframes package
import org.graphframes._

// Create Vertices (airports) and Edges (flights)
val tripVertices = airports.withColumnRenamed("IATA", "id").distinct()
val tripEdges = departureDelays_geo.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")

// Cache Vertices and Edges
tripEdges.cache()
tripVertices.cache()

// Build tripGraph GraphFrame
//   This GraphFrame is built from the vertices (airports) and edges (flights) defined above
val tripGraph = GraphFrame(tripVertices, tripEdges)
```

```
tripGraph: org.graphframes.GraphFrame = GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])
```

### Determine the number of airports and trips

```scala
// Number of airports
tripGraph.vertices.count()
```

```
res51: Long = 262
```

```scala
// Number of flights
tripGraph.edges.count()
```

```
res53: Long = 541524
```

### Which destinations from SEA have the highest average delay (among delayed flights)?

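```scala
import org.apache.spark.sql.functions.desc

// Average delay per route for delayed flights (delay > 0) departing SEA
val flightDelays = tripGraph.edges.filter("src = 'SEA' and delay > 0").groupBy("src", "dst").avg("delay").sort(desc("avg(delay)"))
flightDelays.createOrReplaceTempView("flightDelays")
```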

```sql
%%sql
select * from flightDelays order by `avg(delay)` desc limit 10
```
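The DataFrame pipeline in `flightDelays` can be read as the following plain-Scala sketch over a hypothetical `Edge` record: keep delayed flights (`delay > 0`) leaving the given airport, group by route, and average the delay.

```scala
// Mirrors filter("src = 'SEA' and delay > 0").groupBy("src", "dst").avg("delay")
// on hypothetical in-memory edges, sorted by average delay descending.
object DelayedRoutes {
  final case class Edge(tripid: Int, delay: Int, src: String, dst: String)

  def avgPositiveDelay(edges: Seq[Edge], src: String): Seq[((String, String), Double)] =
    edges
      .filter(e => e.src == src && e.delay > 0)
      .groupBy(e => (e.src, e.dst))
      .map { case (route, es) => route -> es.map(_.delay).sum.toDouble / es.size }
      .toSeq
      .sortBy { case (route, avg) => (-avg, route._2) }
}
```

Because on-time and early flights are filtered out first, the averages describe how late delayed flights are, not how often a route is delayed.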

### Which is the most important airport (in terms of connections)
Note: this is based on the *filtered* dataset (flights departing the 14 key airports).

```scala
val airportConnections = tripGraph.degrees.sort(desc("degree"))
airportConnections.createOrReplaceTempView("airportConnections")
```

```sql
%%sql
select id, degree from airportConnections order by degree desc limit 10
```
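In this graph every flight is a separate edge, so an airport's degree is the number of flights departing from or arriving at it, with a repeated route counted once per flight. A plain-Scala sketch of that count over hypothetical `(src, dst)` pairs:

```scala
// Degree = number of edges touching a vertex (in-degree + out-degree),
// counting parallel edges (repeated flights on a route) individually.
object AirportDegrees {
  def degrees(edges: Seq[(String, String)]): Seq[(String, Int)] =
    edges
      .flatMap { case (src, dst) => Seq(src, dst) }
      .groupBy(identity)
      .map { case (id, occurrences) => id -> occurrences.size }
      .toSeq
      .sortBy { case (id, degree) => (-degree, id) }
}
```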

### Direct flights between Seattle and San Jose?

```scala
val filteredPaths = tripGraph.bfs.fromExpr("id = 'SEA'").toExpr("id = 'SJC'").maxPathLength(1).run()
filteredPaths.show()
```

```
+--------------------+--------------------+--------------------+
|                from|                  e0|                  to|
+--------------------+--------------------+--------------------+
|[SEA,Seattle,WA,USA]|[1010600,-2,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1012030,-4,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1011215,-6,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1011855,-3,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1010710,-1,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1020600,2,SEA,SJ...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1022030,-3,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1021600,-2,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1021215,-9,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1021855,-1,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1020710,-9,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1030600,-5,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[10
```

### Direct flights from San Jose to Buffalo?

```scala
val filteredPaths = tripGraph.bfs.fromExpr("id = 'SJC'").toExpr("id = 'BUF'").maxPathLength(1).run()
filteredPaths.show()
```

```
+---+----+-----+-------+
| id|City|State|Country|
+---+----+-----+-------+
+---+----+-----+-------+
```

No direct flights, but how about one layover?

```scala
val filteredPaths = tripGraph.bfs.fromExpr("id = 'SJC'").toExpr("id = 'BUF'").maxPathLength(2).run()
filteredPaths.show()
```

```
+--------------------+--------------------+-------------------+--------------------+--------------------+
|                from|                  e0|                 v1|                  e1|                  to|
+--------------------+--------------------+-------------------+--------------------+--------------------+
|[SJC,San Jose,CA,...|[1012124,16,SJC,B...|[BOS,Boston,MA,USA]|[1010635,-6,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SJC,San Jose,CA,...|[1012124,16,SJC,B...|[BOS,Boston,MA,USA]|[1011059,13,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SJC,San Jose,CA,...|[1012124,16,SJC,B...|[BOS,Boston,MA,USA]|[1011427,19,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SJC,San Jose,CA,...|[1012124,16,SJC,B...|[BOS,Boston,MA,USA]|[1020635,-4,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SJC,San Jose,CA,...|[1012124,16,SJC,B...|[BOS,Boston,MA,USA]|[1021059,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SJC,San Jose,CA,...|[1012124,16,SJC,B...|[BOS,Boston,MA,USA]|[1021427,194,BOS,...|[BUF,Buffalo,NY,USA]|
|[SJC,San Jose,CA,...|[1012124,16,SJC,B...|[BO
```
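`maxPathLength` caps the number of edges (flights) a breadth-first search may use, which is why SJC to BUF returns nothing with a limit of 1 but finds one-layover paths (for example via BOS) with a limit of 2. The plain-Scala sketch below returns only the minimum number of hops over hypothetical `(src, dst)` pairs; GraphFrames' `bfs` additionally returns every shortest path as rows.

```scala
// Breadth-first search over airport codes (sketch): returns the minimum
// number of flights from `from` to `to`, or None if no path exists within
// `maxPathLength` edges.
object ShortestHops {
  def hops(edges: Seq[(String, String)], from: String, to: String, maxPathLength: Int): Option[Int] = {
    val adjacency: Map[String, Set[String]] =
      edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2).toSet }

    @annotation.tailrec
    def loop(frontier: Set[String], visited: Set[String], depth: Int): Option[Int] = {
      // Expand one level: every airport reachable with one more flight.
      val next = frontier.flatMap(v => adjacency.getOrElse(v, Set.empty[String])) -- visited
      if (next.contains(to)) Some(depth + 1)
      else if (next.isEmpty || depth + 1 >= maxPathLength) None
      else loop(next, visited ++ next, depth + 1)
    }

    if (maxPathLength < 1) None else loop(Set(from), Set(from), 0)
  }
}
```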

### What is the most common transfer point (San Jose to Buffalo, one layover)?

```scala
val commonTransferPoint = filteredPaths.groupBy("v1.id", "v1.City").count().orderBy(desc("count"))
commonTransferPoint.createOrReplaceTempView("commonTransferPoint")
```

```sql
%%sql
select * from commonTransferPoint limit 10
```
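`commonTransferPoint` counts rows of `filteredPaths`, and each row is one combination of a first flight and a second flight (the output above repeats the same first leg `1012124` with different second legs). The count for a transfer airport is therefore the number of first-leg flights times the number of second-leg flights. A plain-Scala sketch under that reading, using hypothetical `(src, dst)` pairs and excluding direct flights:

```scala
// Counts one-layover paths from `from` to `to` per intermediate airport:
// (#flights from -> v) * (#flights v -> to), sorted by count descending.
object TransferPoints {
  def countByTransfer(edges: Seq[(String, String)], from: String, to: String): Seq[(String, Int)] = {
    val outbound = edges.collect { case (`from`, v) => v }.groupBy(identity).map { case (v, xs) => v -> xs.size }
    val inbound  = edges.collect { case (v, `to`) => v }.groupBy(identity).map { case (v, xs) => v -> xs.size }
    outbound.toSeq
      .collect { case (v, n) if v != to && inbound.contains(v) => v -> n * inbound(v) }
      .sortBy { case (v, count) => (-count, v) }
  }
}
```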