In [None]:
%AddDeps org.apache.spark spark-streaming-kafka-0-10_2.11 2.0.2

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.util.LongAccumulator
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Kafka consumer settings used to connect to Message Hub.
// Authentication goes through the sasl.jaas.config attribute, which is new in Kafka 0.10.2.
// >>>>> REPLACE $MH_USER_ID$ and $MH_PASSWORD$ with your Message Hub credentials
val kafkaParams: Map[String, Object] = {
  // Both record keys and record values are read as plain strings
  val stringDeserializer = classOf[StringDeserializer]

  // SASL PLAIN login entry carrying the Message Hub credentials
  val jaasConfig =
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$MH_USER_ID$\" password=\"$MH_PASSWORD$\";"

  Map[String, Object](
    // Broker address and consumer identity
    ("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093"),
    ("group.id", "streaminganalysis2"),
    // Deserialization
    ("key.deserializer", stringDeserializer),
    ("value.deserializer", stringDeserializer),
    // Transport security and authentication
    ("security.protocol", "SASL_SSL"),
    ("sasl.mechanism", "PLAIN"),
    ("ssl.protocol", "TLSv1.2"),
    ("ssl.enabled.protocols", "TLSv1.2"),
    ("ssl.endpoint.identification.algorithm", "HTTPS"),
    ("sasl.jaas.config", jaasConfig),
    // Offset handling
    ("auto.offset.reset", "latest"),
    ("enable.auto.commit", java.lang.Boolean.FALSE)
  )
}

// Name of the topic that carries the car events
val topics: Array[String] = Array("carevents")


/**
 * Holder for the single LongAccumulator that sums the number of refills
 * reported in the incoming Kafka stream.
 */
object RefillsAccumulator {
  // Stays null until the first getInstance call creates the accumulator
  @volatile private var shared: LongAccumulator = null

  /**
   * Returns the shared accumulator, creating it on first use.
   *
   * @param sc SparkContext used to create the accumulator when none exists yet
   * @return the accumulator named "RefillsAccumulator"
   */
  def getInstance(sc: SparkContext): LongAccumulator = {
    if (shared != null) {
      shared
    } else {
      synchronized {
        // Checked again under the lock so only one caller creates the accumulator
        if (shared == null) {
          shared = sc.longAccumulator("RefillsAccumulator")
        }
        shared
      }
    }
  }
}


/**
 * Builds the StreamingContext that consumes car events from Kafka and keeps a
 * running total of the refills they report.
 *
 * The subscription uses the top-level `topics` and `kafkaParams` values.
 * Events that cannot be parsed (null message value, no "refills" field, or a
 * "refills" value that is not an integer) are skipped instead of failing the batch.
 *
 * @param sc the SparkContext the streaming context is built on
 * @return the configured StreamingContext; it is not started here
 */
def creatingFunc(sc: SparkContext): StreamingContext = {
  // Batch interval is 5 seconds
  val ssc = new StreamingContext(sc, Seconds(5))

  // Subscribe to the topic and generate a stream of messages every batch interval
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  // The stream consists of Kafka ConsumerRecord elements. Just want the value from the messages.
  // Records without a value are dropped here so the string handling below never sees null.
  val events = stream.map(record => record.value).filter(_ != null)

  // Events look like this:
  //  {  "vehicleNumber" : "car8",  "fuelRemaining" : 4.3912726149684245,  "fuelCapacity" : 11.0,  "distance" : 77060.38816165098,  "refills" : 0,  "ts" : "2017-04-03T08:51:38.903"}

  // Strip out the JSON formatting characters to give plain strings to look like this:
  //  vehicleNumber:car8,fuelRemaining:4.3912726149684245,fuelCapacity:11.0,distance:77060.38816165098,refills:0,ts:2017-04-03T08:51:38.903
  val eventsAsStrings = events.map(_.replaceAll("[{ }\"\u00A0]", ""))

  // Split to give an array of strings per value to look like this:
  //  Array(vehicleNumber:car8, fuelRemaining:4.3912726149684245, fuelCapacity:11.0, distance:77060.38816165098, refills:0, ts:2017-04-03T08:51:38.903)
  val eventsAsPerValueStrings = eventsAsStrings.map(_.split(','))

  // Split each string into another array of strings to look like this:
  //  Array(Array(vehicleNumber, car8),
  //        Array(fuelRemaining, 4.3912726149684245),
  //        Array(fuelCapacity, 11.0),
  //        Array(distance, 77060.38816165098),
  //        Array(refills, 0),
  //        Array(ts, 2017-04-03T08, 51, 38.903))
  val eventsAsSeparateStrings = eventsAsPerValueStrings.map(_.map(_.split(':')))

  // And finally take each array of strings with exactly two members, convert into a Tuple, and then a Map to look like this:
  //  Map(vehicleNumber -> car8,
  //      fuelRemaining -> 4.3912726149684245,
  //      fuelCapacity -> 11.0,
  //      distance -> 77060.38816165098,
  //      refills -> 0)
  // (the ts entry splits into more than two members, so it is left out)
  val eventsAsMaps = eventsAsSeparateStrings.map(_.collect { case Array(key, value) => key -> value }.toMap)

  // Extract the refills information from the map and create a stream consisting only of Long integers.
  // An event with no "refills" entry, or one that is not a valid integer, contributes nothing
  // rather than throwing and failing the whole batch.
  val refills = eventsAsMaps.flatMap { fields =>
    fields.get("refills").flatMap(raw => scala.util.Try(raw.toLong).toOption).toList
  }

  // Accumulate the number of refills over time and print the running total for every batch
  refills.foreachRDD { rdd =>
    val refillsAccumulator = RefillsAccumulator.getInstance(rdd.sparkContext)
    rdd.foreach(count => refillsAccumulator.add(count))
    println(s"Refills: ${refillsAccumulator.value}")
  }

  ssc
}

// Main initialisation, picking up the SparkContext and the StreamingContext if they already exist.
// creatingFunc is handed over as the factory for a new StreamingContext.
val sc = SparkContext.getOrCreate()
val ssc = StreamingContext.getActiveOrCreate(() => creatingFunc(sc))

// Start the processing, then wait for the streaming context to terminate
ssc.start()
ssc.awaitTermination()

In [None]:
// Stops the active StreamingContext, if there is one, without stopping the SparkContext.
// May be helpful when stopping and restarting the analytics.
for (active <- StreamingContext.getActive) {
  active.stop(stopSparkContext = false)
}
println("Done")