Address all the comments
jerryshao committed Nov 11, 2014
1 parent 96c7a1d commit 4854ee9
Showing 2 changed files with 75 additions and 37 deletions.
ReliableKafkaReceiver.scala
@@ -18,21 +18,22 @@
package org.apache.spark.streaming.kafka

import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.Map
import scala.collection.mutable
import scala.reflect.{classTag, ClassTag}

import kafka.common.TopicAndPartition
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
import kafka.serializer.Decoder
import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
import org.I0Itec.zkclient.ZkClient

import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.storage.{StreamBlockId, StorageLevel}
import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
import org.apache.spark.util.Utils

private[streaming]
class ReliableKafkaReceiver[
@@ -45,27 +46,33 @@ class ReliableKafkaReceiver[
storageLevel: StorageLevel)
extends Receiver[Any](storageLevel) with Logging {

/** High level consumer to connect to Kafka */
/** High level consumer to connect to Kafka. */
private var consumerConnector: ConsumerConnector = null

/** zkClient to connect to Zookeeper to commit the offsets */
/** zkClient to connect to Zookeeper to commit the offsets. */
private var zkClient: ZkClient = null

private val groupId = kafkaParams("group.id")

private lazy val env = SparkEnv.get
private def conf() = SparkEnv.get.conf

private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

/** A HashMap to manage the offset for each topic/partition, this HashMap is called in
* synchronized block, so mutable HashMap will not meet concurrency issue */
private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
* synchronized blocks, so a mutable HashMap will not hit concurrency issues.
*/
private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null

/** A concurrent HashMap to store the stream block id and related offset snapshot */
private lazy val blockOffsetMap =
new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]
/** A concurrent HashMap to store the stream block id and related offset snapshot. */
private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null

private lazy val blockGeneratorListener = new BlockGeneratorListener {
/** Manage the BlockGenerator inside the receiver itself, for better control of
* block storing and offset committing.
*/
private var blockGenerator: BlockGenerator = null

/** A listener registered with the BlockGenerator to checkpoint Kafka offsets. */
private final class OffsetCheckpointListener extends BlockGeneratorListener {
override def onStoreData(data: Any, metadata: Any): Unit = {
if (metadata != null) {
val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
@@ -96,10 +103,6 @@ class ReliableKafkaReceiver[
}
}
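
The rest of OffsetCheckpointListener is collapsed by the diff view. Below is a sketch of what the snapshot-and-commit callbacks plausibly look like; the method signatures and the helper calls are assumptions inferred from the surrounding code, not taken from this commit.

override def onGenerateBlock(blockId: StreamBlockId): Unit = {
  // Snapshot the offsets under the same lock used when updating them,
  // then reset the per-partition map for the next block.
  topicPartitionOffsetMap.synchronized {
    blockOffsetMap.put(blockId, topicPartitionOffsetMap.toMap)
    topicPartitionOffsetMap.clear()
  }
}

override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
  // Store the block first, then commit the snapshotted offsets, so the
  // offsets recorded in Zookeeper never run ahead of the stored data.
  store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
  Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
  blockOffsetMap.remove(blockId)
}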

/** Manage the BlockGenerator in receiver itself for better managing block store and offset
* commit */
private var blockGenerator: BlockGenerator = null

override def onStop(): Unit = {
if (consumerConnector != null) {
consumerConnector.shutdown()
@@ -111,13 +114,33 @@
zkClient = null
}

blockGenerator.stop()
if (blockGenerator != null) {
blockGenerator.stop()
blockGenerator = null
}

if (topicPartitionOffsetMap != null) {
topicPartitionOffsetMap.clear()
topicPartitionOffsetMap = null
}

if (blockOffsetMap != null) {
blockOffsetMap.clear()
blockOffsetMap = null
}
}

override def onStart(): Unit = {
logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
// Initialize the topic-partition / offset hash map.
topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

// Initialize the stream block id / offset snapshot hash map.
blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

// Initialize the block generator for storing Kafka messages.
blockGenerator = new BlockGenerator(new OffsetCheckpointListener, streamId, conf())

if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
@@ -133,7 +156,7 @@

val consumerConfig = new ConsumerConfig(props)

assert(consumerConfig.autoCommitEnable == false)
assert(!consumerConfig.autoCommitEnable)
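
The construction of props is collapsed above this hunk. Presumably it copies kafkaParams into a java.util.Properties and forces auto commit off, which is what the assert above double-checks. A hedged sketch, not the commit's exact code:

// Sketch: build consumer properties from kafkaParams and force auto commit
// off, since this receiver commits offsets itself after blocks are stored.
val props = new Properties()
kafkaParams.foreach { case (k, v) => props.put(k, v) }
props.setProperty(AUTO_OFFSET_COMMIT, "false")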

logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
consumerConnector = Consumer.create(consumerConfig)
@@ -156,41 +179,45 @@
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)

val executorPool = Executors.newFixedThreadPool(topics.values.sum)
val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")

try {
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream =>
executorPool.submit(new Runnable {
override def run(): Unit = {
logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
try {
for (msgAndMetadata <- stream) {
val topicAndPartition = TopicAndPartition(
msgAndMetadata.topic, msgAndMetadata.partition)
val metadata = (topicAndPartition, msgAndMetadata.offset)

blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
}
} catch {
case e: Throwable => logError("Error handling message; existing", e)
}
}
})
executorPool.submit(new MessageHandler(stream))
}
}
} finally {
executorPool.shutdown()
}
}
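
Replacing Executors.newFixedThreadPool with Utils.newDaemonFixedThreadPool gives the handler threads a recognizable name and daemon status. Spark's actual helper is not shown in this diff; a minimal sketch of what such a factory might look like:

import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Illustrative helper: a fixed-size pool of named daemon threads, so the
// message handlers cannot keep the JVM alive after the receiver stops.
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ExecutorService = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }
  Executors.newFixedThreadPool(nThreads, factory)
}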

/** An inner class to handle received Kafka messages. */
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
override def run(): Unit = {
logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
try {
for (msgAndMetadata <- stream) {
val topicAndPartition = TopicAndPartition(
msgAndMetadata.topic, msgAndMetadata.partition)
val metadata = (topicAndPartition, msgAndMetadata.offset)

blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
}
} catch {
case e: Throwable => logError("Error handling message; exiting", e)
}
}
}

/**
* Commit the offset of each Kafka topic/partition; the commit mechanism follows
* Kafka 0.8.x's metadata schema in Zookeeper.
*/
private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
if (zkClient == null) {
logError(s"zkClient $zkClient should be initialized at started")
val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
return
}

@@ -205,7 +232,7 @@
s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
}

logInfo(s"Committed offset ${offset} for topic ${topicAndPart.topic}, " +
logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
s"partition ${topicAndPart.partition}")
}
}
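
The Zookeeper write inside this loop is collapsed above; only its catch clause and logging are visible. A hedged reconstruction of the write, based on the kafka.utils imports at the top of the file and Kafka 0.8.x's /consumers/<group>/offsets/<topic>/<partition> layout:

for ((topicAndPart, offset) <- offsetMap) {
  // Sketch: update the persistent offset node for this topic/partition.
  val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
  val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
  ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
}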
ReliableKafkaStreamSuite.scala
@@ -69,13 +69,17 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
ssc.start()
ssc.awaitTermination(3000)

// A basic sanity check for ReliableKafkaReceiver.
// Verify that the number of received messages equals the number sent.
assert(sent.size === result.size)
// Verify that each received message matches the data that was sent.
sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }

ssc.stop()
}

test("Verify the offset commit") {
// Verify the correctness of the offset commit mechanism.
val sparkConf = new SparkConf()
.setMaster(master)
.setAppName(framework)
@@ -97,8 +101,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
"group.id" -> groupId,
"auto.offset.reset" -> "smallest")

// Verify that the offset of this group/topic/partition is 0 before starting.
assert(getCommitOffset(groupId, topic, 0) === 0L)

// Do this to consume all the messages of this group/topic.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
@@ -109,6 +115,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
ssc.awaitTermination(3000)
ssc.stop()

// Verify that the committed offset equals the total number of messages sent.
assert(getCommitOffset(groupId, topic, 0) === 29L)
}

@@ -136,8 +143,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
"group.id" -> groupId,
"auto.offset.reset" -> "smallest")

// Before starting, verify that all the group/topic/partition offsets are 0.
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }

// Consume all the data sent to the broker, which will potentially commit the offsets internally.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
@@ -148,9 +157,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
ssc.awaitTermination(3000)
ssc.stop()

// Verify that the committed offset for each group/topic equals the expected value.
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
}

/** Get the committed partition offset from Zookeeper. */
private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
assert(zkClient != null, "Zookeeper client is not initialized")

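The method body is collapsed here. A plausible sketch of the Zookeeper read it performs, assumed from Kafka 0.8.x's ZkUtils API rather than copied from the commit:

// Hedged sketch: read the committed offset for this group/topic/partition
// from Zookeeper, defaulting to 0 when no offset node exists yet.
val topicDirs = new ZKGroupTopicDirs(groupId, topic)
val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong).getOrElse(0L)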
