From 20a82602ffda6795ced9d34e413464d7720f660c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Tue, 14 Mar 2017 22:12:50 +0100 Subject: [PATCH] STORM-2413: Make new Kafka spout respect tuple retry limit --- .../apache/storm/kafka/spout/KafkaSpout.java | 2 +- .../kafka/spout/KafkaSpoutMessageId.java | 4 +-- .../KafkaSpoutRetryExponentialBackoff.java | 14 ++++++++ .../kafka/spout/KafkaSpoutRetryService.java | 13 +++++++- .../kafka/spout/KafkaSpoutRebalanceTest.java | 13 ++++++-- .../spout/SingleTopicKafkaSpoutTest.java | 32 ++++++++++++++++--- .../KafkaSpoutTopologyMainNamedTopics.java | 1 - .../src/test/resources/log4j2.xml | 32 +++++++++++++++++++ 8 files changed, 100 insertions(+), 11 deletions(-) create mode 100755 external/storm-kafka-client/src/test/resources/log4j2.xml diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 72b8d14c5a8..09795ed7930 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -332,7 +332,7 @@ private void emit() { */ private boolean emitTupleIfNotEmitted(ConsumerRecord record) { final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); - final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); + final KafkaSpoutMessageId msgId = retryService.getMessageId(record); if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked LOG.trace("Tuple for record [{}] has already been acked. Skipping", record); } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java index dd6411ade84..1a60723b51d 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -22,8 +22,8 @@ import org.apache.kafka.common.TopicPartition; public class KafkaSpoutMessageId { - private transient TopicPartition topicPart; - private transient long offset; + private final transient TopicPartition topicPart; + private final transient long offset; private transient int numFails = 0; /** * true if the record was emitted using a form of collector.emit(...). false diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java index 60c34dc6350..60a707d00ca 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.storm.utils.Time; import org.slf4j.Logger; @@ -287,6 +288,19 @@ public int readyMessageCount() { return count; } + @Override + public KafkaSpoutMessageId getMessageId(ConsumerRecord record) { + KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); + if (toRetryMsgs.contains(msgId)) { + for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) { + if (originalMsgId.equals(msgId)) { + return originalMsgId; + } + } + } + return msgId; + } + // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE private long nextTime(KafkaSpoutMessageId msgId) { final long currentTimeNanos = Time.nanoTime(); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java index 04e4ae7aca7..a1caf2ce870 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Map; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.consumer.ConsumerRecord; /** * Represents the logic that manages the retrial of failed tuples. @@ -75,6 +76,16 @@ public interface KafkaSpoutRetryService extends Serializable { * Returns false is this message is not scheduled for retrial */ boolean isScheduled(KafkaSpoutMessageId msgId); - + + /** + * @return The number of messages that are ready for retry + */ int readyMessageCount(); + + /** + * Gets the {@link KafkaSpoutMessageId} for the given record. + * @param record The record to fetch the id for + * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry. + */ + KafkaSpoutMessageId getMessageId(ConsumerRecord record); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index cdae4dd55fc..2d55520d643 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -58,15 +58,17 @@ public class KafkaSpoutRebalanceTest { private ArgumentCaptor> commitCapture; private final long offsetCommitPeriodMs = 2_000; - private final TopologyContext contextMock = mock(TopologyContext.class); - private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); private final Map conf = new HashMap<>(); + private TopologyContext contextMock; + private SpoutOutputCollector collectorMock; private KafkaConsumer consumerMock; private KafkaConsumerFactory consumerFactory; @Before public void setUp() { MockitoAnnotations.initMocks(this); + contextMock = mock(TopologyContext.class); + collectorMock = mock(SpoutOutputCollector.class); consumerMock = mock(KafkaConsumer.class); consumerFactory = (kafkaSpoutConfig) -> consumerMock; } @@ -159,9 +161,16 @@ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); TopicPartition assignedPartition = new TopicPartition(topic, 2); + when(retryServiceMock.getMessageId(anyObject())) + .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0)) + .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0)); + //Emit a message on each partition and revoke the first partition List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + //Check that only two message ids were generated + verify(retryServiceMock, times(2)).getMessageId(anyObject()); + //Fail both emitted tuples spout.fail(emittedMessageIds.get(0)); spout.fail(emittedMessageIds.get(1)); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index ecbab87ebf9..7f0973b6d2f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -17,13 +17,13 @@ */ package org.apache.storm.kafka.spout; - import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -57,7 +57,6 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; - public class SingleTopicKafkaSpoutTest { @Rule @@ -73,12 +72,15 @@ public class SingleTopicKafkaSpoutTest { private KafkaConsumer consumerSpy; private KafkaConsumerFactory consumerFactory; private KafkaSpout spout; + private int maxRetries = 3; @Before public void setUp() { MockitoAnnotations.initMocks(this); KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) + .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), + maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) .build(); this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy; @@ -90,8 +92,8 @@ void populateTopicData(String topicName, int msgCount) throws InterruptedExcepti for (int i = 0; i < msgCount; i++) { ProducerRecord producerRecord = new ProducerRecord<>( - topicName, Integer.toString(i), - Integer.toString(i)); + topicName, Integer.toString(i), + Integer.toString(i)); kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord); } } @@ -300,4 +302,26 @@ public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception { expectedReemitIds.add(capturedMessageIds.get(2)); assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds)); } + + @Test + public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception { + //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted + int messageCount = 1; + initializeSpout(messageCount); + + //Emit and fail the same tuple until we've reached retry limit + for (int i = 0; i <= maxRetries; i++) { + ArgumentCaptor messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + spout.nextTuple(); + verify(collector).emit(anyObject(), anyObject(), messageIdFailed.capture()); + KafkaSpoutMessageId msgId = messageIdFailed.getValue(); + spout.fail(msgId); + assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1)); + reset(collector); + } + + //Verify that the tuple is not emitted again + spout.nextTuple(); + verify(collector, never()).emit(anyObject(), anyObject(), anyObject()); + } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java index 2492e330fa4..9459d4bd38c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java @@ -42,7 +42,6 @@ public class KafkaSpoutTopologyMainNamedTopics { private static final String TOPIC_0_1_STREAM = "test_0_1_stream"; private static final String[] TOPICS = new String[]{"test","test1","test2"}; - public static void main(String[] args) throws Exception { new KafkaSpoutTopologyMainNamedTopics().runMain(args); } diff --git a/external/storm-kafka-client/src/test/resources/log4j2.xml b/external/storm-kafka-client/src/test/resources/log4j2.xml new file mode 100755 index 00000000000..393dd2ccfbc --- /dev/null +++ b/external/storm-kafka-client/src/test/resources/log4j2.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file