# Stream data from Kafka to Cosmos DB

This notebook uses Spark Structured Streaming to retrieve data from Kafka on HDInsight and store it in Azure Cosmos DB. It uses the [Azure Cosmos DB Spark Connector](https://github.com/Azure/azure-cosmosdb-spark) to write to a Cosmos DB SQL API database. For more information on using the connector, see [https://github.com/Azure/azure-cosmosdb-spark](https://github.com/Azure/azure-cosmosdb-spark).

## To use this notebook

Jupyter Notebooks allow you to modify and run the code in this document. To run a section (known as a 'cell'), select it and then use CTRL + ENTER, or select the play button on the toolbar above. Each section already has some example output beneath it, so you can see what the results of running a cell look like.

NOTE: You must run each cell in order, from top to bottom. Running cells out of order can result in an error.

## Requirements

* An Azure Virtual Network
* A Spark on HDInsight 3.6 cluster, inside the virtual network
* A Kafka on HDInsight cluster, inside the virtual network
* A Cosmos DB SQL API database

## Load packages

Run the next cell to load the packages used by this notebook:

* spark-sql-kafka-0-10_2.11, version 2.2.0 - Used to read from Kafka.
* azure-cosmosdb-spark_2.2.0_2.11, version 1.0.0 - The Spark connector used to communicate with Azure Cosmos DB.
* azure-documentdb, version 1.15.1 - The DocumentDB SDK. This is used by the connector to communicate with Cosmos DB.

In [1]:
%%configure -f
{
    "name":"Spark-to-Cosmos_DB_Connector", 
    "executorMemory": "8G", 
    "executorCores": 2, 
    "numExecutors":9,
    "driverMemory" : "2G",
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,com.microsoft.azure:azure-cosmosdb-spark_2.2.0_2.11:1.0.0,com.microsoft.azure:azure-documentdb:1.15.1", 
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1533323783391_0006,spark,busy,Link,Link,


## Set the Kafka broker hosts information

In the next cell, replace `YOUR_BROKER_HOSTS` with the broker hosts for your Kafka cluster. This value is used to read data from the Kafka cluster. To get the broker host information, use one of the following methods:

* From Bash or other Unix shell:

    ```bash
    CLUSTERNAME='the name of your HDInsight cluster'
    PASSWORD='the password for your cluster login account'
    curl -u admin:$PASSWORD -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2
    ```

* From Azure PowerShell:

    ```powershell
    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER" `
      -Credential $creds `
      -UseBasicParsing
    $respObj = ConvertFrom-Json $resp.Content
    $brokerHosts = $respObj.host_components.HostRoles.host_name[0..1]
    ($brokerHosts -join ":9092,") + ":9092"
    ```

In [2]:
// The Kafka broker hosts and topic used to read from Kafka
val kafkaBrokers="YOUR_BROKER_HOSTS"
val kafkaTopic="tripdata"

println("broker and topic set.")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1531345171959_0013,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
broker and topic set.
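
A quick sanity check on the broker string can catch a leftover placeholder or a missing port before the stream starts. The following is a minimal sketch in plain Scala. `parseBrokers` is a hypothetical helper, not part of Spark or the Kafka client, and the host names in the usage line are made up.

```scala
// Hypothetical helper: split a "host:port,host:port" string into (host, port) pairs.
// Throws IllegalArgumentException if an entry is not of the form host:port.
def parseBrokers(brokers: String): Seq[(String, Int)] =
  brokers.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
    entry.split(":") match {
      case Array(host, port) if host.nonEmpty && port.nonEmpty && port.forall(_.isDigit) =>
        (host, port.toInt)
      case _ =>
        throw new IllegalArgumentException(s"Expected host:port but found '$entry'")
    }
  }

// Example with made-up host names
val parsed = parseBrokers("wn0-kafka.example.net:9092,wn1-kafka.example.net:9092")
println(parsed.map(_._1).mkString(", "))
```

Calling `parseBrokers(kafkaBrokers)` while the value is still the unmodified placeholder throws, because the placeholder contains no `:port` part.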

## Configure the Cosmos DB connection information

In the following cell, you must provide the information used to connect to your Cosmos DB. Use the information in [Create a document database using Java and the Azure portal](https://docs.microsoft.com/en-us/azure/cosmos-db/create-sql-api-java) to create a database and collection, then retrieve the endpoint, master key, and preferred region information.

__NOTE__: When following the steps in [Create a document database using Java and the Azure portal](https://docs.microsoft.com/en-us/azure/cosmos-db/create-sql-api-java), you do not need to add sample data to the collection or build the code. You only need to create the database, collection, and retrieve the connection information.
    

In [3]:
// Import Necessary Libraries
import org.joda.time._
import org.joda.time.format._

// Current version of the connector
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider
import com.microsoft.azure.cosmosdb.spark.config.Config

val configMap = Map(
    "Endpoint" -> "YOUR_COSMOSDB_ENDPOINT",
    "Masterkey" -> "YOUR_MASTER_KEY",
    "Database" -> "kafkadata",
    // use a ';' to delimit multiple regions
    "PreferredRegions" -> "West US;",
    "Collection" -> "kafkacollection"
)

println("Cosmos DB configuration set.")

Cosmos DB configuration set.
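
Before starting the stream, it may help to confirm that no placeholder values remain in the configuration. The sketch below uses plain Scala only. `unsetKeys` and `regions` are hypothetical helpers written for this illustration, and the way `regions` splits the `PreferredRegions` value is an assumption based on the `;` delimiter described in the comment above, not a description of how the connector parses it.

```scala
// Hypothetical helper: return the keys whose values still look like "YOUR_..." placeholders.
def unsetKeys(config: Map[String, String]): Seq[String] =
  config.collect { case (key, value) if value.startsWith("YOUR_") => key }.toSeq.sorted

// Hypothetical helper: split the ';'-delimited PreferredRegions value into region names.
def regions(config: Map[String, String]): Seq[String] =
  config.getOrElse("PreferredRegions", "").split(";").toSeq.map(_.trim).filter(_.nonEmpty)

// Same keys and values as the unmodified cell above
val sampleConfig = Map(
  "Endpoint" -> "YOUR_COSMOSDB_ENDPOINT",
  "Masterkey" -> "YOUR_MASTER_KEY",
  "Database" -> "kafkadata",
  "PreferredRegions" -> "West US;",
  "Collection" -> "kafkacollection"
)

println(s"Still unset: ${unsetKeys(sampleConfig).mkString(", ")}")
println(s"Regions: ${regions(sampleConfig).mkString(", ")}")
```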

## Define the schema and source stream

The following cell creates the stream that reads from Kafka. Data read from Kafka contains several columns. In this case, only the `value` column is used, as it contains the taxi trip data written by the Stream-taxi-data-to-Kafka notebook. To make this data easier to work with, a schema is applied.

In [4]:
// Import the classes used for declaring schemas and working with JSON data
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Define a schema for the data
val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
// Reproduced here for readability
//val schema = (new StructType)
//   .add("dropoff_latitude", StringType)
//   .add("dropoff_longitude", StringType)
//   .add("extra", StringType)
//   .add("fare_amount", StringType)
//   .add("improvement_surcharge", StringType)
//   .add("lpep_dropoff_datetime", StringType)
//   .add("lpep_pickup_datetime", StringType)
//   .add("mta_tax", StringType)
//   .add("passenger_count", StringType)
//   .add("payment_type", StringType)
//   .add("pickup_latitude", StringType)
//   .add("pickup_longitude", StringType)
//   .add("ratecodeid", StringType)
//   .add("store_and_fwd_flag", StringType)
//   .add("tip_amount", StringType)
//   .add("tolls_amount", StringType)
//   .add("total_amount", StringType)
//   .add("trip_distance", StringType)
//   .add("trip_type", StringType)
//   .add("vendorid", StringType)

// Read from the Kafka stream source
val kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets","earliest").load()

// Select the value of the Kafka message and apply the trip schema to it
val taxiData = kafka.select(
    from_json(col("value").cast("string"), schema) as "trip")

// The output of this cell is similar to the following value:
// taxiData: org.apache.spark.sql.DataFrame = [trip: struct<dropoff_latitude: string, dropoff_longitude: string ... 18 more fields>]

taxiData: org.apache.spark.sql.DataFrame = [trip: struct<dropoff_latitude: string, dropoff_longitude: string ... 18 more fields>]
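
Because every field in the schema is a `StringType`, one way to avoid the long one-line definition is to build the schema from a list of field names. This is a sketch only. It assumes the Spark SQL classes are on the classpath, as they are in this notebook session, and the names `tripFields` and `tripSchema` are hypothetical.

```scala
import org.apache.spark.sql.types.{StringType, StructType}

// The 20 field names from the schema above, in the same order
val tripFields = Seq(
  "dropoff_latitude", "dropoff_longitude", "extra", "fare_amount",
  "improvement_surcharge", "lpep_dropoff_datetime", "lpep_pickup_datetime",
  "mta_tax", "passenger_count", "payment_type", "pickup_latitude",
  "pickup_longitude", "ratecodeid", "store_and_fwd_flag", "tip_amount",
  "tolls_amount", "total_amount", "trip_distance", "trip_type", "vendorid"
)

// Start from an empty StructType and add each name as a string field
val tripSchema = tripFields.foldLeft(new StructType) { (acc, name) => acc.add(name, StringType) }

println(tripSchema.fieldNames.mkString(", "))
```

The resulting `tripSchema` has the same field names and types as `schema`, so it could be passed to `from_json` in the same way.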

## Write the data to Cosmos DB

The following cell selects the trip data from the stream and writes it to Cosmos DB. This is the data structure that was created in the previous cell by applying a schema to the value data retrieved from Kafka.

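The cell waits for 10 seconds (10,000 ms) while the stream runs, so make sure that the Stream-taxi-data-to-Kafka notebook is actively streaming data into Kafka during this time. Note that `awaitTermination` with a timeout only stops waiting and does not stop the query, so the stream can keep running in the background until it is stopped or the Spark session ends.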

In [5]:
taxiData.select("trip").writeStream.format(classOf[CosmosDBSinkProvider].getName).outputMode("append").options(configMap).option("checkpointLocation", "cosmoscheckpointlocation").start.awaitTermination(10000)
println("Stream finished.")

Stream finished.

## To verify that data is in Cosmos DB

In the [Azure portal](https://portal.azure.com), select your Cosmos DB account, and then select __Document Explorer__. From the dropdown, select the database and collection that the data is written to. You may need to select __Refresh__ before the data appears. Select the id of one of the entries to view the data in Cosmos DB. The document should contain data similar to the following:

```json
{
  "trip": {
    "fare_amount": "14.5",
    "pickup_longitude": "-73.988777160644531",
    "lpep_dropoff_datetime": "2016-01-01T00:43:11.000",
    "lpep_pickup_datetime": "2016-01-01T00:28:24.000",
    "passenger_count": "2",
    "vendorid": "2",
    "tolls_amount": "0",
    "dropoff_latitude": "40.729816436767578",
    "improvement_surcharge": "0.3",
    "trip_distance": "3.66",
    "dropoff_longitude": "-73.996437072753906",
    "payment_type": "2",
    "store_and_fwd_flag": "N",
    "trip_type": "1",
    "ratecodeid": "1",
    "total_amount": "15.8",
    "pickup_latitude": "40.690895080566406",
    "extra": "0.5",
    "tip_amount": "0",
    "mta_tax": "0.5"
  },
  "id": "abfe6ff1-51a7-46a6-9600-1c330166cf12"
}
```
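
Every value in the document is stored as a string, because the schema declares each field as `StringType`. Any numeric work on the stored data therefore needs a conversion first. The sketch below, in plain Scala, converts the amounts from the sample document to `BigDecimal` and checks that the charges add up to `total_amount`. That the total equals the sum of these six fields is an assumption that happens to hold for this sample record, not a documented rule of the data set.

```scala
// Amounts copied from the sample document above; all are stored as strings
val trip = Map(
  "fare_amount" -> "14.5",
  "extra" -> "0.5",
  "mta_tax" -> "0.5",
  "tip_amount" -> "0",
  "tolls_amount" -> "0",
  "improvement_surcharge" -> "0.3",
  "total_amount" -> "15.8"
)

// Fields assumed to make up the total (an assumption for this illustration)
val components = Seq(
  "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge"
)

// Convert each string to BigDecimal to avoid floating-point rounding, then add them up
val sum = components.map(name => BigDecimal(trip(name))).sum
val consistent = sum == BigDecimal(trip("total_amount"))

println(s"components = $sum, total_amount = ${trip("total_amount")}, consistent = $consistent")
```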