## To use this notebook

Jupyter Notebooks allow you to modify and run the code in this document. To run a section (known as a 'cell'), select it and then press CTRL + ENTER, or select the play button on the toolbar above. Each cell already has some example output beneath it, so you can see what the results of running it will look like.

NOTE: You must run each cell in order, from top to bottom. Running cells out of order can result in an error.

## Create the Kafka topic

This notebook uses a Kafka topic named `sparktest`. The following cell will create this topic on the Kafka cluster.

NOTE: You must replace `YOUR_KAFKA_ZOOKEEPER_HOSTS` in the following cell with the Zookeeper information for your Kafka cluster. See https://github.com/Azure-Samples/hdinsight-spark-scala-kafka for information on how to get this value from the Kafka cluster.

In [1]:
%%sh
KAFKAZKHOSTS=YOUR_KAFKA_ZOOKEEPER_HOSTS
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic sparktest --zookeeper $KAFKAZKHOSTS

Created topic "sparktest".


## Load the Kafka streaming package

The following cell loads the Kafka streaming package for Spark from the Maven repository. Note the version number (`1.6.2`) at the end of the package coordinate on the second line: it must match the version of Spark that you are using, which for this notebook is Spark 1.6.2. The `_2.10` suffix in the artifact name is the Scala version that the package was built against.

In [2]:
%%configure
{ "conf": {"spark.jars.packages": "org.apache.spark:spark-streaming-kafka_2.10:1.6.2" }}
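The package coordinate has the form `groupId:artifactId:version`, where the artifact name carries the Scala binary version and the version is the Spark version. As a minimal sketch in plain Scala (not part of the original notebook, with hypothetical object and method names), the helper below assembles the coordinate from those two values so they are easy to keep in step. In a running notebook session, `sc.version` reports the Spark version to use.

```scala
// A minimal sketch (hypothetical helper, not from the original notebook):
// build the spark-streaming-kafka Maven coordinate from a Scala version and
// a Spark version, so the two can be kept in step.
object KafkaPackageCoordinate {
  // Reduce a full Scala version such as "2.10.5" to its binary version "2.10"
  def binaryVersion(fullScalaVersion: String): String =
    fullScalaVersion.split('.').take(2).mkString(".")

  // groupId:artifactId_<scala binary version>:<spark version>
  def coordinate(scalaBinaryVersion: String, sparkVersion: String): String =
    s"org.apache.spark:spark-streaming-kafka_$scalaBinaryVersion:$sparkVersion"

  def main(args: Array[String]): Unit = {
    // Example inputs; in a notebook session, sc.version gives the Spark version
    println(coordinate(binaryVersion("2.10.5"), "1.6.2"))
    // prints org.apache.spark:spark-streaming-kafka_2.10:1.6.2
  }
}
```

If you upgrade Spark, only the second argument changes; the Scala suffix changes only when the cluster's Spark build uses a different Scala line.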

## Import classes

The following cell imports the classes used in this example. The primary ones required to use Spark Streaming with Kafka are those in the `org.apache.spark.streaming` and `org.apache.kafka.clients` packages.

In [3]:
import java.util.HashMap
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.util.Random

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1478787730229_0004,spark,idle,Link,Link,✔


SparkContext available as 'sc'.
HiveContext available as 'sqlContext'.
import scala.util.Random

## Kafka configuration

In the following cell, replace the values of `kafkaZkHosts` and `kafkaBrokers` with the Zookeeper hosts and Kafka broker values for your Kafka on HDInsight cluster. See [https://github.com/Azure-Samples/hdinsight-spark-scala-kafka](https://github.com/Azure-Samples/hdinsight-spark-scala-kafka) for information on how to get these values from the Kafka cluster.

In [4]:
// The Kafka topic used to write, and then read from
val topic="sparktest"
// The Zookeeper hosts for the Kafka cluster. This is used when reading from Kafka
val kafkaZkHosts="###ADD YOUR KAFKA ZOOKEEPER HOST INFO HERE###"
// The Kafka broker hosts for the Kafka cluster
val kafkaBrokers="###ADD YOUR KAFKA BROKERS INFO HERE###"
// The consumer group used when reading from Kafka
val group="mygroup"
// Create a map containing the topic name and how many consumer threads to create when reading
val topicMap = Map(topic -> 5)
// The batching interval when reading from Kafka
val batchInterval = 2

// The number of messages to write to the Kafka topic
val numMsgs = 1000
// Sentences that will be randomly written to Kafka
val sentences: List[String] = List(
        "the cow jumped over the moon", 
        "an apple a day keeps the doctor away", 
        "four score and seven years ago", 
        "snow white and the seven dwarfs", 
        "i am at two with nature")

sentences: List[String] = List(the cow jumped over the moon, an apple a day keeps the doctor away, four score and seven years ago, snow white and the seven dwarfs, i am at two with nature)
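Both `kafkaZkHosts` and `kafkaBrokers` are comma-separated lists of `host:port` pairs. The sketch below, which assumes hypothetical host names and the default ports (2181 for ZooKeeper, 9092 for Kafka brokers), shows one way to build those strings in plain Scala; substitute the host names from your own cluster.

```scala
// A minimal sketch (not from the original notebook): build comma-separated
// "host:port" lists like those expected by kafkaZkHosts and kafkaBrokers.
// The host names below are hypothetical placeholders.
object KafkaHostStrings {
  val ZookeeperPort = 2181 // default ZooKeeper client port
  val BrokerPort = 9092    // default Kafka broker port

  // Join host names into "host1:port,host2:port,..."
  def hostList(hosts: Seq[String], port: Int): String =
    hosts.map(h => s"$h:$port").mkString(",")

  def main(args: Array[String]): Unit = {
    val zkHosts = hostList(Seq("zk0-example", "zk1-example", "zk2-example"), ZookeeperPort)
    val brokers = hostList(Seq("wn0-example", "wn1-example"), BrokerPort)
    println(zkHosts) // zk0-example:2181,zk1-example:2181,zk2-example:2181
    println(brokers) // wn0-example:9092,wn1-example:9092
  }
}
```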

## Create a StreamingContext

The following cell creates a new `StreamingContext` that reads messages from Kafka in batches of `batchInterval` seconds, splits each message into individual words, and keeps a running count of each word. The counts are saved to a temporary table named `wordcount`.

The `StreamingContext` runs in the background. The call to `awaitTerminationOrTimeout` only blocks the cell for `batchInterval * 5 * 1000` milliseconds (10 seconds with `batchInterval = 2`); when it returns `false`, as in the output below, the stream is still running. To stop the stream, use `StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }`.

In [5]:
// A function that creates a streaming context
def createStreamingContext(): StreamingContext = {
    // Create a new StreamingContext from the existing SparkContext (sc)
    val ssc = new StreamingContext(sc, Seconds(batchInterval))
    // Create the stream from the Kafka topic
    val messageStream = KafkaUtils.createStream(ssc, kafkaZkHosts, group, topicMap).map(_._2)
    // Split the data on space to extract the words
    val wordStream = messageStream.flatMap(_.split(" "))
    // A function to update/store the count for each word
    val updateCount = (values: Seq[Int], state: Option[Int]) => {
        val current = values.sum
        val previous = state.getOrElse(0)
        Some(current + previous)
    }
    // Get a running count
    val runningCountStream = wordStream.map { x => (x, 1) }.updateStateByKey(updateCount)
    // Save the data to a temporary table
    runningCountStream.foreachRDD { rdd =>
        sqlContext.createDataFrame(rdd).toDF("word", "count").registerTempTable("wordcount")
        rdd.take(1)
    }
    // Tell the stream to keep the data around for a minute, so it's there when we query later
    ssc.remember(Minutes(1))
    // Checkpoint for fault-tolerance
    ssc.checkpoint("/")
    // Return the StreamingContext
    ssc
}

// Stop any existing StreamingContext 
val stopActiveContext = true
if (stopActiveContext) {    
  StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
} 

// Get or create a StreamingContext
val ssc = StreamingContext.getActiveOrCreate(createStreamingContext)

// This starts the StreamingContext in the background. 
ssc.start()

// Block this cell for up to batchInterval * 5 * 1000 milliseconds; the stream keeps running afterwards
ssc.awaitTerminationOrTimeout(batchInterval * 5 * 1000)

res22: Boolean = false
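To see what `updateStateByKey` does with `updateCount`, the following minimal sketch replays the same logic on plain Scala collections, with no Spark or Kafka involved. Each inner list stands in for one micro-batch of messages (hypothetical sample data), and `step` is a hypothetical helper that mimics how Spark passes each key's new values and previous state to the update function.

```scala
// A minimal sketch that replays the notebook's word-count logic on plain
// Scala collections, without Spark. Batches are hypothetical sample data.
object RunningWordCount {
  // Same update function as in createStreamingContext
  val updateCount = (values: Seq[Int], state: Option[Int]) => {
    val current = values.sum
    val previous = state.getOrElse(0)
    Some(current + previous)
  }

  // Apply one batch to the running state, mimicking updateStateByKey:
  // every key already in the state or present in the batch is updated.
  def step(state: Map[String, Int], batch: Seq[String]): Map[String, Int] = {
    // Split on spaces and pair each word with 1, as wordStream.map does
    val newValues: Map[String, Seq[Int]] =
      batch.flatMap(_.split(" ")).map(w => (w, 1)).groupBy(_._1).map {
        case (w, pairs) => (w, pairs.map(_._2))
      }
    val keys = state.keySet ++ newValues.keySet
    keys.flatMap { k =>
      updateCount(newValues.getOrElse(k, Seq.empty), state.get(k)).map(v => (k, v))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batches = List(
      List("the cow jumped over the moon"),
      List("snow white and the seven dwarfs", "four score and seven years ago"))
    val finalCounts = batches.foldLeft(Map.empty[String, Int])((s, b) => step(s, b))
    println(finalCounts("the"))   // 3
    println(finalCounts("seven")) // 2
  }
}
```

Because `updateCount` always returns `Some`, a word stays in the state even in batches where it does not appear; returning `None` would drop it from the running count.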

## Send messages to Kafka

Now that the word-count stream is running, use the next cell to write random sentences to Kafka. The stream started in the previous cell picks them up and updates the running counts in the `wordcount` temporary table.

In [6]:
// Configure the producer properties - these are used to write to Kafka
val props = new HashMap[String, Object]()
// Set the broker hosts
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
// Configure the serializers
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
// Create the producer
val producer = new KafkaProducer[String, String](props)

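// Send numMsgs messages to the Kafka topic
(1 to numMsgs).foreach { _ =>
    // Randomly pick a sentence
    val sentence = sentences(Random.nextInt(sentences.length))
    // Create the record (null key, so the producer chooses the partition)
    val message = new ProducerRecord[String, String](topic, null, sentence)
    // Send the record to the topic; send() is asynchronous
    producer.send(message)
}
// Wait for buffered records to be sent, then release the producer's resources
producer.close()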
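Because each message is a randomly chosen sentence, the exact counts differ from run to run. The sketch below, in plain Scala with a seeded `Random` and hypothetical helper names (`pick`, `expectedCounts`), reproduces the producer's selection and tallies the word counts for a list of messages. If you record the sentences you send, the same tally indicates what the `wordcount` table should contain once the stream has processed them all. It is a local check, not part of the notebook's pipeline.

```scala
import scala.util.Random

// A minimal local sketch (not part of the original notebook): pick sentences
// the same way as the producer cell and tally the expected word counts.
object ExpectedWordCounts {
  val sentences: List[String] = List(
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
    "snow white and the seven dwarfs",
    "i am at two with nature")

  // Pick n sentences uniformly at random (hypothetical helper)
  def pick(n: Int, rng: Random): Seq[String] =
    Seq.fill(n)(sentences(rng.nextInt(sentences.length)))

  // Split on spaces, as the stream does, and count each word
  def expectedCounts(messages: Seq[String]): Map[String, Int] =
    messages.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit = {
    val messages = pick(1000, new Random(42)) // seeded, so the run is repeatable
    val counts = expectedCounts(messages)
    // Print the ten most frequent words, highest count first
    counts.toSeq.sortBy { case (w, c) => (-c, w) }.take(10).foreach {
      case (w, c) => println(s"$w\t$c")
    }
  }
}
```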

## View the word counts

Finally, the following cell selects the current word counts from the `wordcount` temporary table.

In [7]:
%%sql
select * from wordcount