From 9b95f3da608760b334bb8a9a272e64e0fb9fb3ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Sat, 12 Aug 2017 16:56:45 +0200 Subject: [PATCH 1/2] STORM-2666: Fix storm-kafka-client spout sometimes emitting messages that were already committed. Expand tests, add some runtime validation, minor refactoring to increase code readability. Ensure OffsetManager commits as many offsets as possible when an offset void (deleted offsets) occurs, rather than just up to the gap. --- .../apache/storm/kafka/spout/KafkaSpout.java | 58 ++-- .../KafkaSpoutRetryExponentialBackoff.java | 37 +-- .../kafka/spout/KafkaSpoutRetryService.java | 9 +- .../kafka/spout/internal/OffsetManager.java | 8 +- .../kafka/spout/KafkaSpoutCommitTest.java | 23 +- .../kafka/spout/KafkaSpoutRebalanceTest.java | 56 +++- ...KafkaSpoutRetryExponentialBackoffTest.java | 287 ++++++++++++++++++ .../spout/SingleTopicKafkaSpoutTest.java | 62 +++- .../spout/internal/OffsetManagerTest.java | 84 +++++ 9 files changed, 555 insertions(+), 69 deletions(-) create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index fbd869c5edb..4f9dacb335c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -34,7 +34,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; - +import org.apache.commons.lang.Validate; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.apache.kafka.clients.consumer.ConsumerRecords; @@ -147,10 +147,13 @@ private boolean isAtLeastOnce() { // =========== Consumer Rebalance Listener - On the same thread as the caller =========== private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + private Collection previousAssignment = new HashSet<>(); + @Override public void onPartitionsRevoked(Collection partitions) { LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + previousAssignment = partitions; if (isAtLeastOnce() && initialized) { initialized = false; commitOffsetsForAckedTuples(); @@ -175,20 +178,14 @@ private void initialize(Collection partitions) { * Emitted messages for partitions that are no longer assigned to this spout can't * be acked and should not be retried, hence remove them from emitted collection. */ - Set partitionsSet = new HashSet<>(partitions); - Iterator msgIdIterator = emitted.iterator(); - while (msgIdIterator.hasNext()) { - KafkaSpoutMessageId msgId = msgIdIterator.next(); - if (!partitionsSet.contains(msgId.getTopicPartition())) { - msgIdIterator.remove(); - } - } + emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition())); } - for (TopicPartition tp : partitions) { + Set newPartitions = new HashSet<>(partitions); + newPartitions.removeAll(previousAssignment); + for (TopicPartition tp : newPartitions) { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); final long fetchOffset = doSeek(tp, committedOffset); - // Add offset managers for the new partitions. 
// If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) { offsetManagers.put(tp, new OffsetManager(tp, fetchOffset)); @@ -202,18 +199,14 @@ private void initialize(Collection partitions) { * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset */ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { - long fetchOffset; if (committedOffset != null) { // offset was committed for this TopicPartition if (firstPollOffsetStrategy.equals(EARLIEST)) { kafkaConsumer.seekToBeginning(Collections.singleton(tp)); - fetchOffset = kafkaConsumer.position(tp); } else if (firstPollOffsetStrategy.equals(LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); - fetchOffset = kafkaConsumer.position(tp); } else { // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. 
- fetchOffset = committedOffset.offset() + 1; - kafkaConsumer.seek(tp, fetchOffset); + kafkaConsumer.seek(tp, committedOffset.offset() + 1); } } else { // no commits have ever been done, so start at the beginning or end depending on the strategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { @@ -221,9 +214,8 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); } - fetchOffset = kafkaConsumer.position(tp); } - return fetchOffset; + return kafkaConsumer.position(tp); } } @@ -231,7 +223,7 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { @Override public void nextTuple() { try { - if (initialized) { + if (initialized) { if (commit()) { commitOffsetsForAckedTuples(); } @@ -339,12 +331,15 @@ private void emit() { */ private boolean emitTupleIfNotEmitted(ConsumerRecord record) { final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); - final KafkaSpoutMessageId msgId = retryService.getMessageId(record); + final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset()); if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked LOG.trace("Tuple for record [{}] has already been acked. Skipping", record); } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { + Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp), + "The spout is about to emit a message that has already been committed." 
+ " This should never occur, and indicates a bug in the spout"); final List tuple = kafkaSpoutConfig.getTranslator().apply(record); if (isEmitTuple(tuple)) { final boolean isScheduled = retryService.isScheduled(msgId); @@ -408,6 +403,23 @@ private void commitOffsetsForAckedTuples() { for (Map.Entry tpOffset : nextCommitOffsets.entrySet()) { //Update the OffsetManager for each committed partition, and update numUncommittedOffsets final TopicPartition tp = tpOffset.getKey(); + long position = kafkaConsumer.position(tp); + long committedOffset = tpOffset.getValue().offset(); + if (position < committedOffset) { + /* + * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, + * lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked. + * The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple. + * Skip the consumer forward to the committed offset and drop the current waiting to emit list, + * since it'll likely contain committed offsets. + */ + LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]", + position, committedOffset); + kafkaConsumer.seek(tp, committedOffset); + waitingToEmit = null; + } + + final OffsetManager offsetManager = offsetManagers.get(tp); long numCommittedOffsets = offsetManager.commit(tpOffset.getValue()); numUncommittedOffsets -= numCommittedOffsets; @@ -437,6 +449,8 @@ public void ack(Object messageId) { LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); } } else { + Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." 
+ + " This should never occur barring errors in the RetryService implementation or the spout code."); offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); emitted.remove(msgId); } @@ -456,6 +470,8 @@ public void fail(Object messageId) { LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId); return; } + Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed." + + " This should never occur barring errors in the RetryService implementation or the spout code."); msgId.incrementNumFails(); if (!retryService.schedule(msgId)) { LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java index 6b53779563b..81ef9869533 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -32,7 +32,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.commons.lang.Validate; import org.apache.storm.utils.Time; /** @@ -49,6 +49,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService private final TimeInterval maxDelay; private final int maxRetries; + //This class assumes that there is at most one retry schedule per message id in this set at a time. 
private final Set retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR); private final Set toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups @@ -168,7 +169,7 @@ public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval this.delayPeriod = delayPeriod; this.maxRetries = maxRetries; this.maxDelay = maxDelay; - LOG.debug("Instantiated {}", this); + LOG.debug("Instantiated {}", this.toStringImpl()); } @Override @@ -196,13 +197,14 @@ public Map earliestRetriableOffsets() { @Override public boolean isReady(KafkaSpoutMessageId msgId) { boolean retry = false; - if (toRetryMsgs.contains(msgId)) { + if (isScheduled(msgId)) { final long currentTimeNanos = Time.nanoTime(); for (RetrySchedule retrySchedule : retrySchedules) { if (retrySchedule.retry(currentTimeNanos)) { if (retrySchedule.msgId.equals(msgId)) { retry = true; LOG.debug("Found entry to retry {}", retrySchedule); + break; //Stop searching if the message is known to be ready for retry } } else { LOG.debug("Entry to retry not found {}", retrySchedule); @@ -221,14 +223,14 @@ public boolean isScheduled(KafkaSpoutMessageId msgId) { @Override public boolean remove(KafkaSpoutMessageId msgId) { boolean removed = false; - if (toRetryMsgs.contains(msgId)) { + if (isScheduled(msgId)) { + toRetryMsgs.remove(msgId); for (Iterator iterator = retrySchedules.iterator(); iterator.hasNext(); ) { final RetrySchedule retrySchedule = iterator.next(); if (retrySchedule.msgId().equals(msgId)) { iterator.remove(); - toRetryMsgs.remove(msgId); removed = true; - break; + break; //There is at most one schedule per message id } } } @@ -261,15 +263,8 @@ public boolean schedule(KafkaSpoutMessageId msgId) { LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries); return false; } else { - if (toRetryMsgs.contains(msgId)) { - for (Iterator iterator = retrySchedules.iterator(); iterator.hasNext(); ) { - final RetrySchedule retrySchedule = 
iterator.next(); - if (retrySchedule.msgId().equals(msgId)) { - iterator.remove(); - toRetryMsgs.remove(msgId); - } - } - } + //Remove existing schedule for the message id + remove(msgId); final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId)); retrySchedules.add(retrySchedule); toRetryMsgs.add(msgId); @@ -294,9 +289,9 @@ public int readyMessageCount() { } @Override - public KafkaSpoutMessageId getMessageId(ConsumerRecord record) { - KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); - if (toRetryMsgs.contains(msgId)) { + public KafkaSpoutMessageId getMessageId(TopicPartition tp, long offset) { + KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(tp, offset); + if (isScheduled(msgId)) { for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) { if (originalMsgId.equals(msgId)) { return originalMsgId; @@ -308,6 +303,7 @@ public KafkaSpoutMessageId getMessageId(ConsumerRecord record) { // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE private long nextTime(KafkaSpoutMessageId msgId) { + Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once"); final long currentTimeNanos = Time.nanoTime(); final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ... ? 
currentTimeNanos + initialDelay.lengthNanos @@ -317,6 +313,11 @@ private long nextTime(KafkaSpoutMessageId msgId) { @Override public String toString() { + return toStringImpl(); + } + + private String toStringImpl() { + //This is here to avoid an overridable call in the constructor return "KafkaSpoutRetryExponentialBackoff{" + "delay=" + initialDelay + ", ratio=" + delayPeriod + diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java index 5147752b588..b70acd7c7b2 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java @@ -23,7 +23,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; /** * Represents the logic that manages the retrial of failed tuples. @@ -84,9 +84,10 @@ public interface KafkaSpoutRetryService extends Serializable { int readyMessageCount(); /** - * Gets the {@link KafkaSpoutMessageId} for the given record. - * @param record The record to fetch the id for + * Gets the {@link KafkaSpoutMessageId} for the record on the given topic partition and offset. + * @param topicPartition The topic partition of the record + * @param offset The offset of the record * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry. 
*/ - KafkaSpoutMessageId getMessageId(ConsumerRecord record); + KafkaSpoutMessageId getMessageId(TopicPartition topicPartition, long offset); } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 51390720714..b6d36d83b6f 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -92,7 +92,7 @@ public OffsetAndMetadata findNextCommitOffset() { first element after committedOffset in the ascending ordered emitted set. */ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset); - final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset); + final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { found = true; nextCommitMsg = currAckedMsg; @@ -103,9 +103,9 @@ public OffsetAndMetadata findNextCommitOffset() { } } } else { - //Received a redundant ack. Ignore and continue processing. - LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]", - tp, currOffset, committedOffset); + throw new IllegalStateException("The offset [" + currOffset + "] is below the current committed " + + "offset [" + committedOffset + "] for [" + tp + "]." 
+ + " This should not be possible, and likely indicates a bug in the spout's acking or emit logic."); } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index afc9b825b6b..c9c684f1e93 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -42,6 +42,8 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import org.mockito.stubbing.OngoingStubbing; + public class KafkaSpoutCommitTest { private final long offsetCommitPeriodMs = 2_000; @@ -94,7 +96,7 @@ public void testCommitSuccessWithOffsetVoids() { spout.ack(messageId); } - // Advance time and then trigger first call to kafka consumer commit; the commit will progress till offset 4 + // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9 Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); Map>> emptyConsumerRecords = Collections.emptyMap(); when(consumerMock.poll(anyLong())) @@ -105,25 +107,8 @@ public void testCommitSuccessWithOffsetVoids() { inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 4 was last committed offset - //the offset void should be bridged in the next commit - Map commits = commitCapture.getValue(); - assertTrue(commits.containsKey(partition)); - assertEquals(4, commits.get(partition).offset()); - - //Trigger second kafka consumer commit - reset(consumerMock); - when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(emptyConsumerRecords)); - Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); - spout.nextTuple(); - - inOrder = 
inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(commitCapture.capture()); - inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 9 was last committed offset - commits = commitCapture.getValue(); + Map commits = commitCapture.getValue(); assertTrue(commits.containsKey(partition)); assertEquals(9, commits.get(partition).offset()); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index ab570528de0..b16ba5d96ea 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -35,7 +35,6 @@ import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -55,6 +54,11 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.Matchers.eq; + +import java.util.HashSet; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; public class KafkaSpoutRebalanceTest { @@ -181,7 +185,7 @@ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class))) .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0)) .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0)); - + //Emit a message on each partition and revoke the first partition List 
emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); @@ -197,4 +201,52 @@ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0)); verify(retryServiceMock).schedule(emittedMessageIds.get(1)); } + + @Test + public void testReassignPartitionSeeksForOnlyNewPartitions() { + /* + * When partitions are reassigned, the spout should seek with the first poll offset strategy for new partitions. + * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those. + */ + + ArgumentCaptor rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + .when(subscriptionMock) + .subscribe(any(), rebalanceListenerCapture.capture(), any()); + KafkaSpout spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) + .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) + .build(), consumerFactory); + String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; + TopicPartition assignedPartition = new TopicPartition(topic, 1); + TopicPartition newPartition = new TopicPartition(topic, 2); + + //Setup spout with mock consumer so we can get at the rebalance listener + spout.open(conf, contextMock, collectorMock); + spout.activate(); + + //Assign partitions to the spout + ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); + Set assignedPartitions = new HashSet<>(); + assignedPartitions.add(assignedPartition); + consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); + reset(consumerMock); + + //Set up committed so it looks like some messages have been committed on each partition + long committedOffset = 500; + 
when(consumerMock.committed(assignedPartition)).thenReturn(new OffsetAndMetadata(committedOffset)); + when(consumerMock.committed(newPartition)).thenReturn(new OffsetAndMetadata(committedOffset)); + + //Now rebalance and add a new partition + consumerRebalanceListener.onPartitionsRevoked(assignedPartitions); + Set newAssignedPartitions = new HashSet<>(); + newAssignedPartitions.add(assignedPartition); + newAssignedPartitions.add(newPartition); + consumerRebalanceListener.onPartitionsAssigned(newAssignedPartitions); + + //This partition was previously assigned, so the consumer position shouldn't change + verify(consumerMock, never()).seek(eq(assignedPartition), anyLong()); + //This partition is new, and should start at the committed offset + verify(consumerMock).seek(newPartition, committedOffset + 1); + } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java new file mode 100644 index 00000000000..c543f8b7bca --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java @@ -0,0 +1,287 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Test; + +public class KafkaSpoutRetryExponentialBackoffTest { + + private final TopicPartition testTopic = new TopicPartition("topic", 0); + private final TopicPartition testTopic2 = new TopicPartition("other-topic", 0); + + private KafkaSpoutRetryExponentialBackoff createNoWaitRetryService() { + return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), 1, TimeInterval.seconds(0)); + } + + private KafkaSpoutRetryExponentialBackoff createOneSecondWaitRetryService() { + return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(1), TimeInterval.seconds(0), 1, TimeInterval.seconds(1)); + } + + @Test + public void testCanScheduleRetry() { + KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + boolean scheduled = retryService.schedule(msgId); + + assertThat("The service must schedule the message for retry", scheduled, is(true)); + KafkaSpoutMessageId retrievedMessageId = retryService.getMessageId(testTopic, offset); + assertThat("The service should return the original message id when asked for the same tp/offset twice", retrievedMessageId, sameInstance(msgId)); + assertThat(retryService.isScheduled(msgId), is(true)); + assertThat(retryService.isReady(msgId), is(true)); + assertThat(retryService.readyMessageCount(), is(1)); + 
assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset()))); + } + + @Test + public void testCanRescheduleRetry() { + try (SimulatedTime time = new SimulatedTime()) { + + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + Time.advanceTime(500); + boolean scheduled = retryService.schedule(msgId); + + assertThat("The service must be able to reschedule an already scheduled id", scheduled, is(true)); + Time.advanceTime(500); + assertThat("The message should not be ready for retry yet since it was rescheduled", retryService.isReady(msgId), is(false)); + assertThat(retryService.isScheduled(msgId), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); + assertThat(retryService.readyMessageCount(), is(0)); + Time.advanceTime(500); + assertThat("The message should be ready for retry once the full delay has passed", retryService.isReady(msgId), is(true)); + assertThat(retryService.isScheduled(msgId), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset()))); + assertThat(retryService.readyMessageCount(), is(1)); + } + } + + @Test + public void testCannotContainMultipleSchedulesForId() { + try (SimulatedTime time = new SimulatedTime()) { + + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + Time.advanceTime(500); + boolean scheduled = retryService.schedule(msgId); + + retryService.remove(msgId); + assertThat("The message should no longer be scheduled", retryService.isScheduled(msgId), is(false)); + Time.advanceTime(500); + assertThat("The message 
should not be ready for retry because it isn't scheduled", retryService.isReady(msgId), is(false)); + } + } + + @Test + public void testCanRemoveRetry() { + KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService(); + long offset = 0; + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + boolean removed = retryService.remove(msgId); + + assertThat(removed, is(true)); + assertThat(retryService.isScheduled(msgId), is(false)); + assertThat(retryService.isReady(msgId), is(false)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); + assertThat(retryService.readyMessageCount(), is(0)); + } + + @Test + public void testCanHandleMultipleTopics() { + try (SimulatedTime time = new SimulatedTime()) { + //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are messages from multiple partitions scheduled + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + + KafkaSpoutMessageId msgIdTp1 = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgIdTp2 = retryService.getMessageId(testTopic2, offset); + msgIdTp1.incrementNumFails(); + msgIdTp2.incrementNumFails(); + + boolean scheduledOne = retryService.schedule(msgIdTp1); + Time.advanceTime(500); + boolean scheduledTwo = retryService.schedule(msgIdTp2); + + //The retry schedules for two messages should be unrelated + assertThat(scheduledOne, is(true)); + assertThat(retryService.isScheduled(msgIdTp1), is(true)); + assertThat(scheduledTwo, is(true)); + assertThat(retryService.isScheduled(msgIdTp2), is(true)); + assertThat(retryService.isReady(msgIdTp1), is(false)); + assertThat(retryService.isReady(msgIdTp2), is(false)); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdTp1), is(true)); + assertThat(retryService.isReady(msgIdTp2), is(false)); + 
assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, offset))); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdTp2), is(true)); + Map earliestOffsets = new HashMap<>(); + earliestOffsets.put(testTopic, offset); + earliestOffsets.put(testTopic2, offset); + assertThat(retryService.earliestRetriableOffsets(), is(earliestOffsets)); + + //The service must be able to remove retry schedules for unnecessary partitions + retryService.retainAll(Collections.singleton(testTopic2)); + assertThat(retryService.isScheduled(msgIdTp1), is(false)); + assertThat(retryService.isScheduled(msgIdTp2), is(true)); + assertThat(retryService.isReady(msgIdTp1), is(false)); + assertThat(retryService.isReady(msgIdTp2), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic2, offset))); + } + } + + @Test + public void testCanHandleMultipleMessagesOnPartition() { + try (SimulatedTime time = new SimulatedTime()) { + //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are multiple messages scheduled on a partition + KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); + long offset = 0; + + KafkaSpoutMessageId msgIdEarliest = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgIdLatest = retryService.getMessageId(testTopic, offset + 1); + msgIdEarliest.incrementNumFails(); + msgIdLatest.incrementNumFails(); + + retryService.schedule(msgIdEarliest); + Time.advanceTime(500); + retryService.schedule(msgIdLatest); + + assertThat(retryService.isScheduled(msgIdEarliest), is(true)); + assertThat(retryService.isScheduled(msgIdLatest), is(true)); + + Time.advanceTime(500); + assertThat(retryService.isReady(msgIdEarliest), is(true)); + assertThat(retryService.isReady(msgIdLatest), is(false)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset()))); + 
+ Time.advanceTime(500); + assertThat(retryService.isReady(msgIdEarliest), is(true)); + assertThat(retryService.isReady(msgIdLatest), is(true)); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset()))); + + retryService.remove(msgIdEarliest); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdLatest.offset()))); + } + } + + @Test + public void testMaxRetries() { + try (SimulatedTime time = new SimulatedTime()) { + int maxRetries = 3; + KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), maxRetries, TimeInterval.seconds(0)); + long offset = 0; + + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + for (int i = 0; i < maxRetries; i++) { + msgId.incrementNumFails(); + } + + //Should be allowed to retry 3 times, in addition to original try + boolean scheduled = retryService.schedule(msgId); + + assertThat(scheduled, is(true)); + assertThat(retryService.isScheduled(msgId), is(true)); + + retryService.remove(msgId); + msgId.incrementNumFails(); + boolean rescheduled = retryService.schedule(msgId); + + assertThat("The message should not be allowed to retry once the limit is reached", rescheduled, is(false)); + assertThat(retryService.isScheduled(msgId), is(false)); + } + } + + @Test + public void testMaxDelay() { + try (SimulatedTime time = new SimulatedTime()) { + int maxDelaySecs = 2; + KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(500), TimeInterval.seconds(0), 1, TimeInterval.seconds(maxDelaySecs)); + long offset = 0; + + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + + retryService.schedule(msgId); + + Time.advanceTimeSecs(maxDelaySecs); + assertThat("The message should be ready for retry after the max delay", 
retryService.isReady(msgId), is(true)); + } + } + + private void validateBackoff(int expectedBackoffSeconds, KafkaSpoutMessageId msgId, KafkaSpoutRetryExponentialBackoff retryService) { + Time.advanceTimeSecs(expectedBackoffSeconds - 1); + assertThat("The message should not be ready for retry until the backoff has expired", retryService.isReady(msgId), is(false)); + Time.advanceTimeSecs(1); + assertThat(retryService.isReady(msgId), is(true)); + } + + @Test + public void testExponentialBackoff() { + try (SimulatedTime time = new SimulatedTime()) { + KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(4), Integer.MAX_VALUE, TimeInterval.seconds(Integer.MAX_VALUE)); + long offset = 0; + + KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + msgId.incrementNumFails(); + msgId.incrementNumFails(); //First failure is the initial delay, so not interesting + + //Expecting 4*2^(failCount-1) + List expectedBackoffsSecs = Arrays.asList(new Integer[]{8, 16, 32}); + + for (Integer expectedBackoffSecs : expectedBackoffsSecs) { + retryService.schedule(msgId); + + Time.advanceTimeSecs(expectedBackoffSecs - 1); + assertThat("The message should not be ready for retry until backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), is(false)); + Time.advanceTimeSecs(1); + assertThat("The message should be ready for retry once backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), is(true)); + + msgId.incrementNumFails(); + retryService.remove(msgId); + } + } + } + +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index cbbb39161bb..8d7da7f960f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -59,6 +59,11 @@ import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.hamcrest.Matchers; public class SingleTopicKafkaSpoutTest { @@ -76,6 +81,7 @@ public class SingleTopicKafkaSpoutTest { private KafkaConsumer consumerSpy; private KafkaConsumerFactory consumerFactory; private KafkaSpout spout; + private final int maxPollRecords = 10; @Before public void setUp() { @@ -84,6 +90,7 @@ public void setUp() { .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) + .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .build(); this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); this.consumerFactory = new KafkaConsumerFactory() { @@ -114,6 +121,59 @@ private void verifyAllMessagesCommitted(long messageCount) { assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1)); } + @Test + public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception { + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = maxPollRecords * 2; + prepareSpout(messageCount); + + //Emit all messages and fail the first one while acking the rest + for (int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + ArgumentCaptor messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collector, times(messageCount)).emit(anyObject(), 
anyObject(), messageIdCaptor.capture()); + List messageIds = messageIdCaptor.getAllValues(); + for (int i = 1; i < messageIds.size(); i++) { + spout.ack(messageIds.get(i)); + } + KafkaSpoutMessageId failedTuple = messageIds.get(0); + spout.fail(failedTuple); + + //Advance the time and replay the failed tuple. + reset(collector); + spout.nextTuple(); + ArgumentCaptor failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collector).emit(anyObject(), anyObject(), failedIdReplayCaptor.capture()); + + assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple)); + + /* Ack the tuple, and commit. + * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll. + */ + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs); + spout.ack(failedIdReplayCaptor.getValue()); + spout.nextTuple(); + verify(consumerSpy).commitSync(commitCapture.capture()); + + Map capturedCommit = commitCapture.getValue(); + TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); + assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp)); + assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount - 1)); + + /* Verify that the following acked (now committed) tuples are not emitted again + * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened, + * this verifies that the spout keeps the consumer position ahead of the committed offset when committing + */ + reset(collector); + //Just do a few polls to check that nothing more is emitted + for(int i = 0; i < 3; i++) { + spout.nextTuple(); + } + verify(collector, never()).emit(anyString(), anyList(), anyObject()); + } + } + @Test public void shouldContinueWithSlowDoubleAcks() throws Exception { try (SimulatedTime 
simulatedTime = new SimulatedTime()) { @@ -271,7 +331,7 @@ public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception { verifyAllMessagesCommitted(messageCount); } } - + @Test public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception { //The spout must reemit retriable tuples, even if they fail out of order. diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java new file mode 100644 index 00000000000..e8896c9bd7e --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.internal; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +import java.util.NoSuchElementException; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutMessageId; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class OffsetManagerTest { + + @Rule + public ExpectedException expect = ExpectedException.none(); + + @Test + public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() { + /*If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets + * Since the Kafka consumer should return offsets in order, we can assume that if a message is acked + * then any prior message will have been emitted at least once. + * If we see an acked message and some of the offsets preceding it were not emitted, they must have been compacted away and should be skipped. 
+ */ + + TopicPartition tp = new TopicPartition("test", 0); + OffsetManager manager = new OffsetManager(tp, 0); + + manager.addToEmitMsgs(0); + manager.addToEmitMsgs(1); + manager.addToEmitMsgs(2); + //3, 4 compacted away + manager.addToEmitMsgs(5); + manager.addToEmitMsgs(6); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0)); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 1)); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 2)); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6)); + + assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(2L)); + + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5)); + + assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", + manager.findNextCommitOffset().offset(), is(6L)); + } + + @Test + public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() { + + TopicPartition tp = new TopicPartition("test", 0); + OffsetManager manager = new OffsetManager(tp, 0); + + //0-4 compacted away + manager.addToEmitMsgs(5); + manager.addToEmitMsgs(6); + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6)); + + assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(), is(nullValue())); + + manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5)); + + assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", + manager.findNextCommitOffset().offset(), is(6L)); + } + +} From 54fdcbc03be47c95b8e9d177f525e031afb5e946 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Wed, 4 Oct 2017 20:46:46 +0200 Subject: [PATCH 2/2] Backport to Java 7 and revert changes to RetryService API --- .../apache/storm/kafka/spout/KafkaSpout.java | 10 ++++-- .../KafkaSpoutRetryExponentialBackoff.java | 5 +-- .../kafka/spout/KafkaSpoutRetryService.java | 13 ++++---- 
.../kafka/spout/KafkaSpoutRebalanceTest.java | 4 +-- ...KafkaSpoutRetryExponentialBackoffTest.java | 33 +++++++++++-------- .../spout/SingleTopicKafkaSpoutTest.java | 4 +-- 6 files changed, 41 insertions(+), 28 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 4f9dacb335c..3582bdbb894 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -178,7 +178,13 @@ private void initialize(Collection partitions) { * Emitted messages for partitions that are no longer assigned to this spout can't * be acked and should not be retried, hence remove them from emitted collection. */ - emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition())); + Iterator msgIdIterator = emitted.iterator(); + while (msgIdIterator.hasNext()) { + KafkaSpoutMessageId msgId = msgIdIterator.next(); + if (!partitions.contains(msgId.getTopicPartition())) { + msgIdIterator.remove(); + } + } } Set newPartitions = new HashSet<>(partitions); @@ -331,7 +337,7 @@ private void emit() { */ private boolean emitTupleIfNotEmitted(ConsumerRecord record) { final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); - final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset()); + final KafkaSpoutMessageId msgId = retryService.getMessageId(record); if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked LOG.trace("Tuple for record [{}] has already been acked. 
Skipping", record); } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java index 81ef9869533..68a6f3fe856 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -33,6 +33,7 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.Validate; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.storm.utils.Time; /** @@ -289,8 +290,8 @@ public int readyMessageCount() { } @Override - public KafkaSpoutMessageId getMessageId(TopicPartition tp, long offset) { - KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(tp, offset); + public KafkaSpoutMessageId getMessageId(ConsumerRecord record) { + KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); if (isScheduled(msgId)) { for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) { if (originalMsgId.equals(msgId)) { diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java index b70acd7c7b2..12d26dac65a 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java @@ -18,11 +18,11 @@ package org.apache.storm.kafka.spout; -import org.apache.kafka.common.TopicPartition; import java.io.Serializable; import java.util.Collection; import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.apache.kafka.common.TopicPartition; /** @@ -84,10 +84,11 @@ public interface KafkaSpoutRetryService extends Serializable { int readyMessageCount(); /** - * Gets the {@link KafkaSpoutMessageId} for the record on the given topic partition and offset. - * @param topicPartition The topic partition of the record - * @param offset The offset of the record - * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry. + * Gets the {@link KafkaSpoutMessageId} for the given record. + * + * @param record The record to fetch the id for + * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for + * retry. */ - KafkaSpoutMessageId getMessageId(TopicPartition topicPartition, long offset); + KafkaSpoutMessageId getMessageId(ConsumerRecord record); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index b16ba5d96ea..3e505067999 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -213,10 +213,10 @@ public void testReassignPartitionSeeksForOnlyNewPartitions() { Subscription subscriptionMock = mock(Subscription.class); doNothing() .when(subscriptionMock) - .subscribe(any(), rebalanceListenerCapture.capture(), any()); + .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class)); KafkaSpout spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) - .build(), consumerFactory); + .build(), consumerFactoryMock); String topic = 
SingleTopicKafkaSpoutConfiguration.TOPIC; TopicPartition assignedPartition = new TopicPartition(topic, 1); TopicPartition newPartition = new TopicPartition(topic, 2); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java index c543f8b7bca..f6de6a80261 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; import org.apache.storm.utils.Time; @@ -44,17 +45,21 @@ private KafkaSpoutRetryExponentialBackoff createOneSecondWaitRetryService() { return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(1), TimeInterval.seconds(0), 1, TimeInterval.seconds(1)); } + private ConsumerRecord createRecord(TopicPartition tp, long offset) { + return new ConsumerRecord<>(tp.topic(), tp.partition(), offset, null, null); + } + @Test public void testCanScheduleRetry() { KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService(); long offset = 0; - KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); msgId.incrementNumFails(); boolean scheduled = retryService.schedule(msgId); assertThat("The service must schedule the message for retry", scheduled, is(true)); - KafkaSpoutMessageId retrievedMessageId = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId retrievedMessageId = 
retryService.getMessageId(createRecord(testTopic, offset)); assertThat("The service should return the original message id when asked for the same tp/offset twice", retrievedMessageId, sameInstance(msgId)); assertThat(retryService.isScheduled(msgId), is(true)); assertThat(retryService.isReady(msgId), is(true)); @@ -68,7 +73,7 @@ public void testCanRescheduleRetry() { KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); long offset = 0; - KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); msgId.incrementNumFails(); retryService.schedule(msgId); @@ -79,7 +84,7 @@ public void testCanRescheduleRetry() { Time.advanceTime(500); assertThat("The message should not be ready for retry yet since it was rescheduled", retryService.isReady(msgId), is(false)); assertThat(retryService.isScheduled(msgId), is(true)); - assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); assertThat(retryService.readyMessageCount(), is(0)); Time.advanceTime(500); assertThat("The message should be ready for retry once the full delay has passed", retryService.isReady(msgId), is(true)); @@ -95,7 +100,7 @@ public void testCannotContainMultipleSchedulesForId() { KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); long offset = 0; - KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); msgId.incrementNumFails(); retryService.schedule(msgId); @@ -113,7 +118,7 @@ public void testCannotContainMultipleSchedulesForId() { public void testCanRemoveRetry() { KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService(); long offset = 0; - KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); 
+ KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); msgId.incrementNumFails(); retryService.schedule(msgId); @@ -122,7 +127,7 @@ public void testCanRemoveRetry() { assertThat(removed, is(true)); assertThat(retryService.isScheduled(msgId), is(false)); assertThat(retryService.isReady(msgId), is(false)); - assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); + assertThat(retryService.earliestRetriableOffsets(), is(Collections.emptyMap())); assertThat(retryService.readyMessageCount(), is(0)); } @@ -133,8 +138,8 @@ public void testCanHandleMultipleTopics() { KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); long offset = 0; - KafkaSpoutMessageId msgIdTp1 = retryService.getMessageId(testTopic, offset); - KafkaSpoutMessageId msgIdTp2 = retryService.getMessageId(testTopic2, offset); + KafkaSpoutMessageId msgIdTp1 = retryService.getMessageId(createRecord(testTopic, offset)); + KafkaSpoutMessageId msgIdTp2 = retryService.getMessageId(createRecord(testTopic2, offset)); msgIdTp1.incrementNumFails(); msgIdTp2.incrementNumFails(); @@ -179,8 +184,8 @@ public void testCanHandleMultipleMessagesOnPartition() { KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService(); long offset = 0; - KafkaSpoutMessageId msgIdEarliest = retryService.getMessageId(testTopic, offset); - KafkaSpoutMessageId msgIdLatest = retryService.getMessageId(testTopic, offset + 1); + KafkaSpoutMessageId msgIdEarliest = retryService.getMessageId(createRecord(testTopic, offset)); + KafkaSpoutMessageId msgIdLatest = retryService.getMessageId(createRecord(testTopic, offset + 1)); msgIdEarliest.incrementNumFails(); msgIdLatest.incrementNumFails(); @@ -213,7 +218,7 @@ public void testMaxRetries() { KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), maxRetries, TimeInterval.seconds(0)); long offset = 0; - 
KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); for (int i = 0; i < maxRetries; i++) { msgId.incrementNumFails(); } @@ -240,7 +245,7 @@ public void testMaxDelay() { KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(500), TimeInterval.seconds(0), 1, TimeInterval.seconds(maxDelaySecs)); long offset = 0; - KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); msgId.incrementNumFails(); retryService.schedule(msgId); @@ -263,7 +268,7 @@ public void testExponentialBackoff() { KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(4), Integer.MAX_VALUE, TimeInterval.seconds(Integer.MAX_VALUE)); long offset = 0; - KafkaSpoutMessageId msgId = retryService.getMessageId(testTopic, offset); + KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset)); msgId.incrementNumFails(); msgId.incrementNumFails(); //First failure is the initial delay, so not interesting diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 8d7da7f960f..75fa6b8c7ca 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -132,7 +132,7 @@ public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() spout.nextTuple(); } ArgumentCaptor messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector, times(messageCount)).emit(anyObject(), 
anyObject(), messageIdCaptor.capture()); + verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture()); List messageIds = messageIdCaptor.getAllValues(); for (int i = 1; i < messageIds.size(); i++) { spout.ack(messageIds.get(i)); @@ -144,7 +144,7 @@ public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() reset(collector); spout.nextTuple(); ArgumentCaptor failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector).emit(anyObject(), anyObject(), failedIdReplayCaptor.capture()); + verify(collector).emit(anyString(), anyList(), failedIdReplayCaptor.capture()); assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple));