
Azure Cosmos DB Spark Connector User Guide


This project provides a client library that allows Azure Cosmos DB to act as an input source or output sink for Spark jobs. Fast connectivity between Apache Spark and Azure Cosmos DB accelerates your ability to solve fast-moving data science problems, where data can be quickly persisted and retrieved using Azure Cosmos DB. With the Spark to Cosmos DB connector, you can more easily address scenarios including (but not limited to) blazing-fast IoT workloads, updatable columns when performing analytics, push-down predicate filtering, and advanced analytics and data science against fast-changing data in a geo-replicated, managed document store with guaranteed SLAs for consistency, availability, low latency, and throughput.

Note: official instructions for using the connector are included in the Cosmos DB documentation, in the Accelerate real-time big-data analytics with the Spark to Cosmos DB connector article.

Current Version of Spark Connector: 1.0.0

Officially supported versions:

Component                     Version
Apache Spark                  2.0.1, 2.1, 2.2
Scala                         2.10, 2.11
Azure Cosmos DB Java SDK      1.14.0, 1.15.0

This user guide helps you run some simple samples with the Python (via pyDocumentDB) and Scala interfaces.

There are two approaches for connectivity between Apache Spark and Azure Cosmos DB:

pyDocumentDB

The current pyDocumentDB SDK allows us to connect Spark to Cosmos DB with the data flow shown in the following diagram.

Spark to Cosmos DB Data Flow via pyDocumentDB

The data flow is as follows:

  1. A connection is made from the Spark master node to the Cosmos DB gateway node via pyDocumentDB. Note: the user only specifies the Spark and Cosmos DB connections; the fact that they connect to the respective master and gateway nodes is transparent to the user.
  2. The query is made against Cosmos DB (via the gateway node), which runs the query against the collection's partitions on the data nodes. The responses for those queries are sent back to the gateway node, and that result set is returned to the Spark master node.
  3. Any subsequent queries (e.g. against a Spark DataFrame) are sent to the Spark worker nodes for processing.

The important call-out is that communication between Spark and Cosmos DB is limited to the Spark master node and the Cosmos DB gateway nodes; queries can only go as fast as the transport layer between these two nodes.

Installing pyDocumentDB

You can install pyDocumentDB on your driver node using pip, for example:

pip install pyDocumentDB

Connecting Spark to Cosmos DB via pyDocumentDB

In return for this simpler communication transport, executing a query from Spark against Cosmos DB using pyDocumentDB is relatively straightforward.

Below is a code snippet showing how to use pyDocumentDB within a Spark context.

# Import Necessary Libraries
import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents
import datetime

# Configuring the connection policy (allowing for endpoint discovery)
connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery = True
connectionPolicy.PreferredLocations = ["Central US", "East US 2", "Southeast Asia", "Western Europe","Canada Central"]


# Set keys to connect to Cosmos DB 
masterKey = '<YourMasterKey>' 
host = 'https://doctorwho.documents.azure.com:443/'
client = document_client.DocumentClient(host, {'masterKey': masterKey}, connectionPolicy)

As noted in the code snippet:

  • The Cosmos DB Python SDK contains all the necessary connection parameters, including the preferred locations (i.e. choosing which read replica to use, and in what priority order).
  • Just import the necessary libraries and configure your masterKey and host to create the Cosmos DB client (pydocumentdb.document_client).

Executing Spark Queries via pyDocumentDB

Below is an example that uses the above Cosmos DB instance via the specified read-only keys. The code snippet connects to the airports.codes collection (in the DoctorWho account specified earlier) and runs a query to extract the airport cities in Washington state.

# Configure Database and Collections
databaseId = 'airports'
collectionId = 'codes'

# Configurations the Cosmos DB client will use to connect to the database and collection
dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId


# Set query parameter
querystr = "SELECT c.City FROM c WHERE c.State='WA'"

# Query documents
query = client.QueryDocuments(collLink, querystr, options=None, partition_key=None)

# Query for partitioned collections
# query = client.QueryDocuments(collLink, querystr, options={ 'enableCrossPartitionQuery': True }, partition_key=None)

# Push into list `elements`
elements = list(query)

Once the query has been executed, the result returned in query is a query_iterable.QueryIterable, which is converted into a Python list. A Python list can easily be converted into a Spark DataFrame using the code below.

# Create `df` Spark DataFrame from `elements` Python list
df = spark.createDataFrame(elements)
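
From here you can work with df like any other Spark DataFrame. As a minimal sketch (the view name airportCodes is an arbitrary choice for illustration), you can register it as a temporary view and query it with Spark SQL:

# Register the DataFrame as a temporary view so it can be queried with Spark SQL
df.createOrReplaceTempView("airportCodes")

# Count airports per city in Washington state and display the result
spark.sql("SELECT City, COUNT(*) AS airportCount FROM airportCodes GROUP BY City ORDER BY airportCount DESC").show()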

Scenarios

Connecting Spark to Cosmos DB using pyDocumentDB is typically for scenarios where:

  • You want to use Python
  • You are returning a relatively small result set from Cosmos DB to Spark. Note: the underlying dataset within Cosmos DB can be quite large; the point is that you apply filters - i.e. run predicate filters - against your Cosmos DB source so that only a small result set crosses the wire, as sketched below.
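
For example, here is a minimal sketch of pushing a predicate down to Cosmos DB so that only a small result set is returned to Spark. It reuses the client and collLink from above; the parameterized query-spec form is one way pyDocumentDB accepts filters and is shown here for illustration.

# Parameterized query spec: the filter runs server-side in Cosmos DB,
# so only the matching documents are returned to the Spark driver
filteredQuery = {
    'query': "SELECT c.City FROM c WHERE c.State = @state",
    'parameters': [{'name': '@state', 'value': 'WA'}]
}

filteredDocs = list(client.QueryDocuments(collLink, filteredQuery,
                                          options={'enableCrossPartitionQuery': True}))
dfFiltered = spark.createDataFrame(filteredDocs)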


Spark to Cosmos DB Connector

The Spark to Cosmos DB connector, which uses the Azure DocumentDB Java SDK, follows the flow shown below:

The data flow is as follows:

  1. A connection is made from the Spark master node to the Cosmos DB gateway node to obtain the partition map. Note: the user only specifies the Spark and Cosmos DB connections; the fact that they connect to the respective master and gateway nodes is transparent to the user.
  2. This information is provided back to the Spark master node. At this point, the query can be parsed to determine which partitions (and their locations) within Cosmos DB need to be accessed.
  3. This information is transmitted to the Spark worker nodes ...
  4. Thus allowing the Spark worker nodes to connect directly to the Cosmos DB partitions to extract the data that is needed and bring it back to the Spark partitions within the Spark worker nodes.

The important call-out is that communication between Spark and Cosmos DB is significantly faster because data moves directly between the Spark worker nodes and the Cosmos DB data nodes (partitions).

Building the Azure Cosmos DB Spark Connector

Currently, this connector project uses Maven, so to build it without dependencies you can run:

mvn clean package

You can also download the latest version of the JAR from the releases folder. The current version of the Spark connector that you can download directly is azure-cosmosdb-spark_1.0.0_2.1.0_2.11.

Including the Azure DocumentDB Spark JAR

Prior to executing any code, you will first need to include the Azure DocumentDB Spark JAR. If you are using the spark-shell, then you can include the JAR using the --jars option.

spark-shell --master yarn --jars /$location/azure-cosmosdb-spark_2.1.0_2.11-1.0.0-uber.jar

or, if you want to use the JAR built without bundled dependencies, supply each dependency explicitly:

spark-shell --master yarn --jars /$location/azure-cosmosdb-spark_2.1.0_2.11-1.0.0.jar,/$location/azure-documentdb-1.14.0.jar,/$location/azure-documentdb-rx-0.9.0-rc2.jar,/$location/json-20140107.jar,/$location/rxjava-1.3.0.jar,/$location/rxnetty-0.4.20.jar 

If you are using a notebook service such as Azure HDInsight Jupyter notebook service, you can use the spark magic commands:

%%configure
{ "name":"Spark-to-Cosmos_DB_Connector", 
  "jars": ["wasb:///example/jars/1.0.0/azure-cosmosdb-spark_2.1.0_2.11-1.0.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-1.14.0.jar", "wasb:///example/jars/1.0.0/azure-documentdb-rx-0.9.0-rc2.jar", "wasb:///example/jars/1.0.0/json-20140107.jar", "wasb:///example/jars/1.0.0/rxjava-1.3.0.jar", "wasb:///example/jars/1.0.0/rxnetty-0.4.20.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

The jars parameter allows you to include the JARs needed by azure-cosmosdb-spark (the connector itself, the Azure DocumentDB Java SDK, and their dependencies), while spark.jars.excludes excludes scala-reflect so it does not interfere with the Livy calls made (Jupyter notebook > Livy > Spark).

Connecting Spark to Cosmos DB via azure-cosmosdb-spark

While the communication transport is a little more complicated, executing a query from Spark to Cosmos DB using azure-cosmosdb-spark is significantly faster.

Below is a code snippet showing how to use azure-cosmosdb-spark within a Spark context.

Python

# Base Configuration
flightsConfig = {
"Endpoint" : "https://doctorwho.documents.azure.com:443/",
"Masterkey" : "<YourMasterKey>",
"Database" : "DepartureDelays",
"preferredRegions" : "Central US;East US2",
"Collection" : "flights_pcoll", 
"SamplingRatio" : "1.0",
"schema_samplesize" : "1000",
"query_pagesize" : "2147483647",
"query_custom" : "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
}

# Connect via Spark connector to create Spark DataFrame
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**flightsConfig).load()
flights.count()
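
The connector can also act as an output sink, as noted in the introduction. Below is a minimal sketch of writing a DataFrame back to a collection; the target collection name and the "Upsert" option are illustrative assumptions, and the exact write options may differ between connector versions.

# Hypothetical write configuration -- the target collection and the "Upsert" option
# are illustrative assumptions; adjust them to your account and connector version
writeConfig = {
"Endpoint" : "https://doctorwho.documents.azure.com:443/",
"Masterkey" : "<YourMasterKey>",
"Database" : "DepartureDelays",
"Collection" : "flights_from_sea",
"Upsert" : "true"
}

# Write the DataFrame back to Cosmos DB using the same connector format
flights.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()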

Scala

// Import Necessary Libraries
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Configure connection to your collection
val readConfig2 = Config(Map("Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "<YourMasterKey>",
"Database" -> "DepartureDelays",
"preferredRegions" -> "Central US;East US2;",
"Collection" -> "flights_pcoll", 
"query_custom" -> "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
"SamplingRatio" -> "1.0"))
 
// Create collection connection 
val coll = spark.sqlContext.read.cosmosDB(readConfig2)
coll.createOrReplaceTempView("c")

As noted in the code snippet:

  • azure-cosmosdb-spark contains all the necessary connection parameters, including the preferred locations (i.e. choosing which read replica to use, and in what priority order).
  • Just import the necessary libraries and configure your masterKey and host to create the Cosmos DB client.

Executing Spark Queries via azure-cosmosdb-spark

Below is an example that uses the above Cosmos DB instance via the specified read-only keys. The code snippet queries the temporary view c created from the DepartureDelays.flights_pcoll collection (in the DoctorWho account specified earlier) to extract flight delay information for flights departing Seattle, further filtered to those arriving in San Francisco.

// Queries
var query = "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.destination = 'SFO'"
val df = spark.sql(query)

// Run DF query (count)
df.count()

// Run DF query (show)
df.show()

Scenarios

Connecting Spark to Cosmos DB using azure-cosmosdb-spark is typically for scenarios where:

  • You want to use Python and/or Scala
  • You have a large amount of data to transfer between Apache Spark and Cosmos DB

To give you an idea of the query performance difference, please refer to Query Test Runs in this wiki.