From 642f0fd3755830b4d2fdeb38bfa689676d13e7a5 Mon Sep 17 00:00:00 2001 From: wanghongfeng Date: Fri, 11 Sep 2015 21:38:32 +0800 Subject: [PATCH 1/5] support commit offset after consumed --- .../scala/kafka/consumer/ConsumerConfig.scala | 4 ++ .../kafka/consumer/ConsumerIterator.scala | 41 ++++++++++++++++--- .../scala/kafka/consumer/KafkaStream.scala | 4 +- .../consumer/ZookeeperConsumerConnector.scala | 5 ++- 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 9ebbee6c16dc8..666086798295e 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -181,6 +181,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */ val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy) + + /** commit offset after consumed */ + val commitAfterConsumed = props.getBoolean("manual.commit.after.consumed",false); + validate(this) } diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 78fbf75651583..fb1570135e1c2 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -23,6 +23,13 @@ import kafka.serializer.Decoder import java.util.concurrent.atomic.AtomicReference import kafka.message.{MessageAndOffset, MessageAndMetadata} import kafka.common.{KafkaException, MessageSizeTooLargeException} +import kafka.common.TopicAndPartition +import kafka.utils.Pool +import kafka.common.TopicAndPartition +import kafka.common.TopicAndPartition +import org.apache.kafka.common.TopicPartition +import kafka.common.TopicAndPartition +import 
kafka.common.TopicAndPartition /** @@ -34,19 +41,26 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val clientId: String) + val clientId: String,val commitAfterConsumed: Boolean) extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) private var currentTopicInfo: PartitionTopicInfo = null private var consumedOffset: Long = -1L private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId) + private val topicPartition2Info = new Pool[TopicAndPartition, PartitionTopicInfo] + private val iteratorOffsetMap = new Pool[TopicAndPartition, Long] override def next(): MessageAndMetadata[K, V] = { val item = super.next() if(consumedOffset < 0) throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset)) - currentTopicInfo.resetConsumeOffset(consumedOffset) + if(!commitAfterConsumed){ + currentTopicInfo.resetConsumeOffset(consumedOffset) + }else{ + iteratorOffsetMap.put(TopicAndPartition(item.topic,item.partition), consumedOffset); + } + topicPartition2Info.put(TopicAndPartition(currentTopicInfo.topic,currentTopicInfo.partitionId), currentTopicInfo); val topic = currentTopicInfo.topic trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() @@ -77,9 +91,11 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk val cdcFetchOffset = currentDataChunk.fetchOffset val ctiConsumeOffset = currentTopicInfo.getConsumeOffset if (ctiConsumeOffset < cdcFetchOffset) { - error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" +// error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" +// 
.format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) +// currentTopicInfo.resetConsumeOffset(cdcFetchOffset) + info("consumed offset: %d doesn't match fetch offset: %d for %s;\n " .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) - currentTopicInfo.resetConsumeOffset(cdcFetchOffset) } localCurrent = currentDataChunk.messages.iterator @@ -93,7 +109,9 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk } var item = localCurrent.next() // reject the messages that have already been consumed - while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) { + + var iteratorOffset = iteratorOffsetMap.get(TopicAndPartition(currentTopicInfo.topic, currentTopicInfo.partitionId)); + while (item.offset < iteratorOffset && localCurrent.hasNext) { item = localCurrent.next() } consumedOffset = item.nextOffset @@ -109,6 +127,19 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk current.set(null) } } + + def resetConsumeOffset(topic:String,partitionId:Int,offset:Long) { + if (commitAfterConsumed) { + val targetTopicPartitionInfo = topicPartition2Info.get(TopicAndPartition(topic,partitionId)); + targetTopicPartitionInfo.resetConsumeOffset(offset); + } + } + + def resetConsumeOffset() { + if (!commitAfterConsumed) + currentTopicInfo.resetConsumeOffset(consumedOffset) + } + } class ConsumerTimeoutException() extends RuntimeException() diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala index 805e91677034e..a6c8c8592ea1d 100644 --- a/core/src/main/scala/kafka/consumer/KafkaStream.scala +++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala @@ -26,11 +26,11 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val clientId: String) + val clientId: String,val commitAfterConsumed: Boolean) 
extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] { private val iter: ConsumerIterator[K,V] = - new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId) + new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId,commitAfterConsumed); /** * Create an iterator over messages in the stream. diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 3e1718bc7ca6c..edcbe087df157 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -230,7 +230,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, threadIdSet.map(_ => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V]( - queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) + queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId,config.commitAfterConsumed) (queue, stream) }) ).flatten.toList @@ -279,6 +279,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) { if (checkpointedZkOffsets.get(topicPartition) != offset) { + info("offset committing " + topicPartition +" offset = "+ offset.toString()); val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic) updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString) checkpointedZkOffsets.put(topicPartition, offset) @@ -922,7 +923,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.consumerTimeoutMs, keyDecoder, valueDecoder, - config.clientId) + config.clientId,config.commitAfterConsumed) (queue, stream) }).toList From 
c9a095278ece479c5b9b9a76f7899919030234fc Mon Sep 17 00:00:00 2001 From: wanghongfeng Date: Sat, 12 Sep 2015 02:26:01 +0800 Subject: [PATCH 2/5] modify test case --- .../scala/unit/kafka/consumer/ConsumerIteratorTest.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index c0355cc0135c6..c906d9737686f 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -76,7 +76,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { consumerConfig.consumerTimeoutMs, new StringDecoder(), new StringDecoder(), - clientId = "") + clientId = "", + false) val receivedMessages = (0 until 5).map(i => iter.next.message).toList assertFalse(iter.hasNext) @@ -99,7 +100,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { ConsumerConfig.ConsumerTimeoutMs, new FailDecoder(), new FailDecoder(), - clientId = "", + clientId = "", + false) val receivedMessages = (0 until 5).map{ i => assertTrue(iter.hasNext) From 8df79ad7f27d05a215bc1e9c3442c7243a59bec6 Mon Sep 17 00:00:00 2001 From: wanghongfeng Date: Sat, 12 Sep 2015 13:29:31 +0800 Subject: [PATCH 3/5] modify ConsumerIterator to go through unit test --- .../kafka/consumer/ConsumerIterator.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index fb1570135e1c2..75713503a2fa1 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -91,11 +91,14 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk val cdcFetchOffset = currentDataChunk.fetchOffset val ctiConsumeOffset = 
currentTopicInfo.getConsumeOffset if (ctiConsumeOffset < cdcFetchOffset) { -// error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" -// .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) -// currentTopicInfo.resetConsumeOffset(cdcFetchOffset) - info("consumed offset: %d doesn't match fetch offset: %d for %s;\n " - .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) + if (!commitAfterConsumed) { + error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" + .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) + currentTopicInfo.resetConsumeOffset(cdcFetchOffset) + } else { + info("consumed offset: %d doesn't match fetch offset: %d for %s;\n " + .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) + } } localCurrent = currentDataChunk.messages.iterator @@ -110,7 +113,10 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk var item = localCurrent.next() // reject the messages that have already been consumed - var iteratorOffset = iteratorOffsetMap.get(TopicAndPartition(currentTopicInfo.topic, currentTopicInfo.partitionId)); + var iteratorOffset = iteratorOffsetMap.get(TopicAndPartition(currentTopicInfo.topic, currentTopicInfo.partitionId)); + if(!commitAfterConsumed){ + iteratorOffset = currentTopicInfo.getConsumeOffset; + } while (item.offset < iteratorOffset && localCurrent.hasNext) { item = localCurrent.next() } From 9c19b28c726455c3183f2764f82b3aeb6d490c30 Mon Sep 17 00:00:00 2001 From: wanghongfeng Date: Sat, 12 Sep 2015 23:47:38 +0800 Subject: [PATCH 4/5] modify consumerConfig comment --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 666086798295e..c5ef1f228eb34 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ 
b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -182,7 +182,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy) - /** commit offset after consumed */ + /** Switch to commit offsets only after messages have been consumed; default false. */ val commitAfterConsumed = props.getBoolean("manual.commit.after.consumed",false); validate(this) From 3de4ccd14bd7a1c91487b4b9b50fec18e12214ac Mon Sep 17 00:00:00 2001 From: wanghongfeng Date: Thu, 17 Sep 2015 11:38:23 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E6=97=A5=E5=BF=97=E7=BA=A7=E5=88=AB?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=8Cpom=E6=96=87=E4=BB=B6=E6=8F=90?= =?UTF-8?q?=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/consumer/ConsumerIterator.scala | 2 +- pom.xml | 91 +++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 pom.xml diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 75713503a2fa1..4a26e8bb55b75 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -96,7 +96,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) currentTopicInfo.resetConsumeOffset(cdcFetchOffset) } else { - info("consumed offset: %d doesn't match fetch offset: %d for %s;\n " + debug("consumed offset: %d doesn't match fetch offset: %d for %s;\n " .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) } } diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000000000..20f4672f8c963 --- /dev/null +++ b/pom.xml @@ -0,0 +1,91 @@ + + + 4.0.0 + com.xiaoju.nova.strategy + kafka_0.8.2.1_nova + 1.0.0-SNAPSHOT + POM was created from install:install-file + + 
+ org.objenesis + objenesis + 1.2 + test + + + org.slf4j + slf4j-log4j12 + 1.7.6 + test + + + junit + junit + 4.1 + test + + + com.yammer.metrics + metrics-core + 2.2.0 + compile + + + org.scala-lang + scala-library + 2.10.4 + compile + + + org.scalatest + scalatest_2.10 + 1.9.1 + test + + + org.easymock + easymock + 3.0 + test + + + org.apache.kafka + kafka-clients + 0.8.2.1 + compile + + + org.apache.zookeeper + zookeeper + 3.4.6 + compile + + + net.sf.jopt-simple + jopt-simple + 3.2 + compile + + + com.101tec + zkclient + 0.3 + compile + + + + + + nexus-releases-id + Nexus Release Repository + http://10.10.65.4:8081/nexus/content/repositories/releases/ + + + nexus-snapshots-id + Nexus Snapshot Repository + http://10.10.65.4:8081/nexus/content/repositories/snapshots/ + + + +