From 0ec4d8b9fac2f0222122e1d8cb833ef824d84389 Mon Sep 17 00:00:00 2001 From: Hugo Louro Date: Thu, 23 Mar 2017 08:16:40 -0700 Subject: [PATCH] STORM-2432: Storm-Kafka-Client Trident Spout Seeks Incorrect Offset With UNCOMMITTED_LATEST Strategy - Add check to execute first poll offset strategy only for the first poll rather than always --- .../spout/trident/KafkaTridentSpoutEmitter.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java index 860785306b3..87af0cf1315 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java @@ -61,6 +61,7 @@ public class KafkaTridentSpoutEmitter implements IOpaquePartitionedTrident // Bookkeeping private final KafkaTridentSpoutManager kafkaManager; + private Set firstPoll = new HashSet<>(); // set of topic-partitions for which first poll has already occurred // Declare some KafkaTridentSpoutManager references for convenience private final long pollTimeoutMs; @@ -149,7 +150,7 @@ private void emitTuples(TridentCollector collector, ConsumerRecords record /** * Determines the offset of the next fetch. For failed batches lastBatchMeta is not null and contains the fetch - * offset of the failed batch. In this scenario the next fetch will take place at the offset of the failed batch. + * offset of the failed batch. In this scenario the next fetch will take place at offset of the failed batch + 1. * When the previous batch is successful, lastBatchMeta is null, and the offset of the next fetch is either the * offset of the last commit to kafka, or if no commit was yet made, the offset dictated by * {@link KafkaSpoutConfig.FirstPollOffsetStrategy} @@ -159,9 +160,10 @@ private void emitTuples(TridentCollector collector, ConsumerRecords record private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) { if (lastBatchMeta != null) { kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch - LOG.debug("Seeking fetch offset to next offset after last offset from previous batch"); - } else { - LOG.debug("Seeking fetch offset from firstPollOffsetStrategy and last commit to Kafka"); + LOG.debug("Seeking fetch offset to next offset after last offset from previous batch for topic-partition [{}]", tp); + } else if (isFirstPoll(tp)) { + LOG.debug("Seeking fetch offset from firstPollOffsetStrategy and last commit to Kafka for topic-partition [{}]", tp); + firstPoll.add(tp); final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); if (committedOffset != null) { // offset was committed for this TopicPartition if (firstPollOffsetStrategy.equals(EARLIEST)) { @@ -185,6 +187,10 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBa return fetchOffset; } + private boolean isFirstPoll(TopicPartition tp) { + return !firstPoll.contains(tp); + } + // returns paused topic-partitions. private Collection pauseTopicPartitions(TopicPartition excludedTp) { final Set pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment());