# Spark structured streaming with Kafka on HDInsight

This notebook demonstrates how to use Spark Structured Streaming with Kafka on HDInsight. It uses data on taxi trips, which is provided by New York City.

The data set used by this notebook is from [2016 Green Taxi Trip Data](https://data.cityofnewyork.us/Transportation/2016-Green-Taxi-Trip-Data/hvrh-b6nb).

## To use this notebook

Jupyter Notebooks allow you to modify and run the code in this document. To run a section (known as a 'cell'), select it and then press CTRL + ENTER, or select the play button on the toolbar above. Each section already has example output beneath it, so you can see what the results of running a cell look like.

__NOTE__: You must run each cell in order, from top to bottom. Running cells out of order can result in an error.

## Requirements

* An Azure Virtual Network
* A Spark (2.2.0) on HDInsight 3.6 cluster, inside the virtual network
* A Kafka on HDInsight 3.6 cluster, inside the virtual network

## Load packages

Run the next cell to load packages used by this notebook:

* spark-sql-kafka-0-10_2.11, version 2.2.0 - Used to read and write data with Kafka.

__NOTE__: The first time you run this block, it may take a minute or longer. This happens because the Spark cluster must retrieve the packages from the Maven repository on the internet.

In [1]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1522866477790_0008,spark,idle,Link,Link,
7,application_1522866477790_0012,spark,idle,Link,Link,


## Create the Kafka topic

In the next cell, you must provide the Zookeeper host information for your Kafka cluster. Use the following steps to get this information:

* From __Bash__ or other Unix shell:

    ```bash
CLUSTERNAME='the name of your HDInsight cluster'
PASSWORD='the password for your cluster login account'
curl -u admin:$PASSWORD -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2
    ```

* From __Azure PowerShell__:

    ```powershell
$creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
$clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
$resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" `
    -Credential $creds `
    -UseBasicParsing
$respObj = ConvertFrom-Json $resp.Content
$zkHosts = $respObj.host_components.HostRoles.host_name[0..1]
($zkHosts -join ":2181,") + ":2181"
    ```

The return value is similar to the following example:

`zk0-kafka.ztgnbfvxu2mudoa5h5zzc1uncg.cx.internal.cloudapp.net:2181,zk1-kafka.ztgnbfvxu2mudoa5h5zzc1uncg.cx.internal.cloudapp.net:2181`
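
Both sets of steps above take the first two hosts returned by the cluster and append the port to each one. As an illustration only, the same string assembly can be sketched in plain Scala. The `hostList` helper and the shortened host names are hypothetical and are not part of this notebook; a real cluster returns fully qualified names like the one shown above.

```scala
// Hypothetical helper: join the first `limit` host names into a
// comma-separated host:port list, as the jq/cut and PowerShell steps do.
def hostList(hosts: Seq[String], port: Int, limit: Int = 2): String =
  hosts.take(limit).map(h => s"$h:$port").mkString(",")

// Placeholder host names, for illustration only
val zkHosts = Seq(
  "zk0-kafka.example.internal",
  "zk1-kafka.example.internal",
  "zk2-kafka.example.internal"
)

// ZooKeeper listens on port 2181
println(hostList(zkHosts, 2181))
```

Only two hosts are kept because the client needs just enough entries to reach the cluster; the list does not have to name every node.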

Replace `YOUR_ZOOKEEPER_HOSTS` in the next cell with the returned value, and then run the cell.

In [None]:
%%bash 
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper 'YOUR_ZOOKEEPER_HOSTS'

## Retrieve data on taxi trips

Run the next cell to load data on taxi trips in New York City. The data is loaded into a dataframe and then the dataframe is displayed as the cell output.

In [3]:
import spark.implicits._
// Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
val result = scala.io.Source.fromURL(url).mkString

// Create a dataframe from the JSON data
val taxiDF = spark.read.json(Seq(result).toDS)

// Display the dataframe containing trip data
taxiDF.show()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8,application_1522866477790_0013,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
+------------------+-------------------+-----+-----------+---------------------+---------------------+--------------------+-------+---------------+------------+------------------+-------------------+----------+------------------+----------+------------+------------+-------------+---------+--------+
|  dropoff_latitude|  dropoff_longitude|extra|fare_amount|improvement_surcharge|lpep_dropoff_datetime|lpep_pickup_datetime|mta_tax|passenger_count|payment_type|   pickup_latitude|   pickup_longitude|ratecodeid|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|trip_distance|trip_type|vendorid|
+------------------+-------------------+-----+-----------+---------------------+---------------------+--------------------+-------+---------------+------------+------------------+-------------------+----------+------------------+----------+------------+------------+-------------+---------+--------+
|40.698043823242188|-73.924278259277344|  0.5|          8|       

## Set the Kafka broker hosts information

In the next cell, replace `YOUR_KAFKA_BROKER_HOSTS` with the broker hosts for your Kafka cluster. This value is used to write data to the Kafka cluster. To get the broker host information, use one of the following methods:

* From __Bash__ or other Unix shell:

    ```bash
CLUSTERNAME='the name of your HDInsight cluster'
PASSWORD='the password for your cluster login account'
curl -u admin:$PASSWORD -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2
    ```

* From __Azure PowerShell__:

    ```powershell
$creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
$clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
$resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER" `
  -Credential $creds -UseBasicParsing
$respObj = ConvertFrom-Json $resp.Content
$brokerHosts = $respObj.host_components.HostRoles.host_name[0..1]
($brokerHosts -join ":9092,") + ":9092"
    ```

In [4]:
// The Kafka broker hosts and topic used to write to Kafka
val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS"
val kafkaTopic="tripdata"

println("Finished setting Kafka broker and topic configuration.")

Finished setting Kafka broker and topic configuration.

## Send the data to Kafka

In the following cell, the `vendorid` field is used as the key value for the Kafka message. The key is used by Kafka when partitioning data. All of the fields are stored in the Kafka message as a JSON string value.

Run the following cell to save the data to Kafka using a batch query.
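
To make the role of the key concrete: for a keyed message, Kafka's default partitioner hashes the key bytes and takes the result modulo the number of partitions, so rows with the same `vendorid` go to the same partition. The sketch below mimics that idea in plain Scala. It is a simplification: `partitionFor` is a hypothetical helper, and it uses `java.util.Arrays.hashCode` as a stand-in for the murmur2 hash that Kafka uses, so the partition numbers it prints will not match the ones Kafka assigns.

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

// Hypothetical helper, for illustration only. Kafka's default partitioner uses
// a murmur2 hash; Arrays.hashCode is a stand-in, so the numbers differ from Kafka's.
def partitionFor(key: String, numPartitions: Int): Int = {
  val hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8))
  // Mask the sign bit so the value is non-negative before taking the modulus
  (hash & 0x7fffffff) % numPartitions
}

// The tripdata topic in this notebook is created with 8 partitions
val numPartitions = 8
Seq("1", "2", "1").foreach { vendorId =>
  println(s"vendorid=$vendorId -> partition ${partitionFor(vendorId, numPartitions)}")
}
```

One consequence of this design: if the data contains only a few distinct `vendorid` values, the messages are concentrated in a few of the eight partitions rather than spread across all of them.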

In [5]:
// Select the vendorid as the key and save the JSON string as the value.
val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()
println("Data sent to Kafka")

Data sent to Kafka

## Declare a schema

The following cell demonstrates how to use a schema when reading JSON data from Kafka.

In [7]:
// Import bits used for declaring schemas and working with JSON data
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Define a schema for the data
val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
// Reproduced here for readability
//val schema = (new StructType)
//   .add("dropoff_latitude", StringType)
//   .add("dropoff_longitude", StringType)
//   .add("extra", StringType)
//   .add("fare_amount", StringType)
//   .add("improvement_surcharge", StringType)
//   .add("lpep_dropoff_datetime", StringType)
//   .add("lpep_pickup_datetime", StringType)
//   .add("mta_tax", StringType)
//   .add("passenger_count", StringType)
//   .add("payment_type", StringType)
//   .add("pickup_latitude", StringType)
//   .add("pickup_longitude", StringType)
//   .add("ratecodeid", StringType)
//   .add("store_and_fwd_flag", StringType)
//   .add("tip_amount", StringType)
//   .add("tolls_amount", StringType)
//   .add("total_amount", StringType)
//   .add("trip_distance", StringType)
//   .add("trip_type", StringType)
//   .add("vendorid", StringType)

println("Schema declared")

Schema declared

## Select data and start the stream

The following cell demonstrates how to retrieve data from Kafka using a batch query, and then write the results out to HDFS on the Spark cluster. In this example, the select retrieves the message (`value` field) from Kafka and applies the schema to it. The data is then written to HDFS (WASB or ADL) in Parquet format.


In [8]:
// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
// Select data and write to file
val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()
println("Wrote data to file")

Wrote data to file

You can verify that the files were created by running the following cell. It lists the files in the `/example/batchtripdata` directory.

In [10]:
%%bash
hdfs dfs -ls /example/batchtripdata

Found 9 items
-rw-r--r--   1 livy supergroup          0 2018-04-05 19:06 /example/batchtripdata/_SUCCESS
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:06 /example/batchtripdata/part-00000-7b04ccb7-e0cb-46b2-8285-2d26ca6eabd3-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:06 /example/batchtripdata/part-00001-7b04ccb7-e0cb-46b2-8285-2d26ca6eabd3-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:06 /example/batchtripdata/part-00002-7b04ccb7-e0cb-46b2-8285-2d26ca6eabd3-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:06 /example/batchtripdata/part-00003-7b04ccb7-e0cb-46b2-8285-2d26ca6eabd3-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:06 /example/batchtripdata/part-00004-7b04ccb7-e0cb-46b2-8285-2d26ca6eabd3-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:06 /example/batchtripdata/part-00005-7b04ccb7-e0cb-46b2-8285-2d26ca6eabd3-c000.snappy.par

While the previous example used a batch query, the following cell demonstrates how to do the same thing using a streaming query. 

In [11]:
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
println("Wrote data to file")

Wrote data to file
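
Note that `awaitTermination(30000)` only blocks for up to 30 seconds. When the timeout expires the call returns, but the streaming query itself keeps running in the session. The following is a minimal sketch of stopping it, assuming the streaming cell above has already run in this session and that no other streaming queries need to stay active:

```scala
// Assumes the notebook's SparkSession (`spark`) with the streaming query above still active
// List the streaming queries that are still running in this session
spark.streams.active.foreach(q => println(s"Active query: ${q.id}"))

// Stop every active streaming query in this session
spark.streams.active.foreach(_.stop())
println(s"Active queries remaining: ${spark.streams.active.length}")
```

Because the query was started with a `checkpointLocation`, a later run that uses the same checkpoint directory can resume from the offsets already processed.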

Run the following cell to verify that the files were written by the streaming query.

In [12]:
%%bash
hdfs dfs -ls /example/streamingtripdata

Found 9 items
drwxr-xr-x   - livy supergroup          0 2018-04-05 19:07 /example/streamingtripdata/_spark_metadata
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:07 /example/streamingtripdata/part-00000-2d412a48-069f-4337-8e09-91d9696953ee-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:07 /example/streamingtripdata/part-00001-d54b3575-a3dd-46dd-9f9c-86500eacbedd-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:07 /example/streamingtripdata/part-00002-bfbb884e-b4af-4ef4-bc6b-f30ed0760e1f-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:07 /example/streamingtripdata/part-00003-00e5d3e6-b42c-4822-a05a-2b82d2b15c3b-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:07 /example/streamingtripdata/part-00004-6e91284f-12e7-4a8c-8742-e8b803516329-c000.snappy.parquet
-rw-r--r--   1 livy supergroup       2160 2018-04-05 19:07 /example/streamingtripdata/part-00005-9cd6dafd-4ac2-403
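
The Parquet files store each Kafka message as a single struct column named `trip`. As a sketch, assuming the earlier cells have run in the same session (so that `spark` is available and `/example/streamingtripdata` exists), the data can be read back and flattened into one column per schema field. The names `savedTrips` and `flatTrips` are introduced here for illustration.

```scala
// Assumes the notebook's SparkSession (`spark`) and the output of the streaming cell above
val savedTrips = spark.read.parquet("/example/streamingtripdata")

// Expand the `trip` struct into one column per field of the declared schema
val flatTrips = savedTrips.select("trip.*")

flatTrips.printSchema()
flatTrips.select("vendorid", "trip_distance", "total_amount").show(5)
```

Every field comes back as a string because the schema declared earlier uses `StringType` throughout; cast the columns before doing numeric work on them.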

For more information, see the [Structured Streaming + Kafka integration guide](https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html).