Aggregations Examples

Denny Lee edited this page Jan 7, 2018 · 9 revisions

Below are some examples of how you can run distributed aggregations and analytics using Apache Spark and Azure Cosmos DB together. Azure Cosmos DB already supports aggregations on its own (link to blog goes here); the examples here show how you can take them to the next level with Apache Spark.

Note: these aggregations refer to the Spark to Cosmos DB Connector Notebook.

Connecting to Flights Sample Data

For these aggregation examples, we access flight performance data stored in our DoctorWho Cosmos DB account (the DepartureDelays database, flights_pcoll collection). To connect to it, use the following code snippet:

// Import Spark to Cosmos DB Connector
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Connect to Cosmos DB Database
val readConfig2 = Config(Map("Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "le1n99i1w5l7uvokJs3RT5ZAH8dc3ql7lx2CG0h0kK4lVWPkQnwpRLyAN0nwS1z4Cyd1lJgvGUfMWR3v8vkXKA==",
"Database" -> "DepartureDelays",
"preferredRegions" -> "Central US;East US 2;",
"Collection" -> "flights_pcoll", 
"SamplingRatio" -> "1.0"))

// Create collection connection 
val coll = spark.sqlContext.read.cosmosDB(readConfig2)
coll.createOrReplaceTempView("c")

With this, we also run a base query that transfers the filtered set of data we want from Cosmos DB to Spark, where Spark can then perform distributed aggregations. In this case, we are asking for flights departing from Seattle (SEA).

// Run the base query and register the result as a temp view
val originSEA = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'")
originSEA.createOrReplaceTempView("originSEA")
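
To check how many rows the base query transferred and roughly how long it took, something like the following could be used. This is a minimal sketch that assumes the `originSEA` DataFrame defined above; the names `startNs`, `rowCount`, and `elapsedMs` are illustrative only.

```scala
// Assumes the `originSEA` DataFrame from the snippet above.
// Spark evaluates lazily, so the data is only read when an action such as count() runs.
val startNs = System.nanoTime()
val rowCount = originSEA.count()
val elapsedMs = (System.nanoTime() - startNs) / 1e6

println(f"originSEA rows: $rowCount, elapsed: $elapsedMs%.1f ms")
```

The measured time includes the read from Cosmos DB, so it will vary with network conditions and the preferred region in use.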

The results below are from running the queries using the Jupyter notebook service. Note that all of these code snippets are generic and not specific to any service.

Running LIMIT and COUNT queries

Just like you're used to in SQL / Spark SQL, let's start off with a LIMIT query:
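
The original query is not reproduced on this page. As a minimal sketch, assuming the `spark` session and the `originSEA` temp view registered above, a LIMIT query might look like this (the name `first10` is illustrative):

```scala
// Assumes the `spark` session and the `originSEA` temp view registered above.
// Preview at most 10 rows of the flights departing from Seattle.
val first10 = spark.sql("SELECT * FROM originSEA LIMIT 10")
first10.show()
```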

The next query is a simple and fast COUNT query:
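
The original query is not reproduced on this page. One possible form, again assuming the `spark` session and the `originSEA` temp view from above (the alias `flights` and the name `totalFlights` are illustrative):

```scala
// Assumes the `spark` session and the `originSEA` temp view registered above.
// Count all flights departing from Seattle; the result is a single-row DataFrame.
val totalFlights = spark.sql("SELECT COUNT(1) AS flights FROM originSEA")
totalFlights.show()
```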

GROUP BY query

Next, we can easily run GROUP BY queries against our Cosmos DB database:

select destination, sum(delay) as TotalDelays 
from originSEA 
group by destination 
order by sum(delay) desc limit 10

DISTINCT, ORDER BY query

And here is a DISTINCT, ORDER BY query:
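
The original query is not reproduced on this page. A minimal sketch of such a query, assuming the `spark` session and the `originSEA` temp view from above, could list the distinct destinations in alphabetical order (the name `destinations` and the limit of 10 are illustrative):

```scala
// Assumes the `spark` session and the `originSEA` temp view registered above.
// List distinct destination airports reachable from Seattle, sorted alphabetically.
val destinations = spark.sql("""
  SELECT DISTINCT destination
  FROM originSEA
  ORDER BY destination
  LIMIT 10
""")
destinations.show()
```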

Continuing Flights Data Analysis

Below are some example queries to continue the analysis of our flights data:

Top 5 Delayed Destinations (Cities) departing from Seattle

select destination, sum(delay) as TotalDelays
from originSEA
where delay > 0  -- late departures only (assumes positive delay = minutes late)
group by destination 
order by sum(delay) desc limit 5

Calculate median delays by destination cities departing from Seattle

select destination, percentile_approx(delay, 0.5) as median_delay 
from originSEA
where delay > 0  -- late departures only (assumes positive delay = minutes late)
group by destination 
order by percentile_approx(delay, 0.5)