From d70b475456e2d7cb83e80b45560f48b58dffd4d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Sun, 6 May 2018 10:16:25 +0200 Subject: [PATCH] STORM-3059: Fix NPE when the processing guarantee is not AT_LEAST_ONCE and the spout filters out a null tuple --- .../apache/storm/kafka/spout/KafkaSpout.java | 8 ++- .../storm/kafka/NullRecordTranslator.java | 42 +++++++++++++ .../KafkaSpoutMessagingGuaranteeTest.java | 59 ++++++++++++++----- .../kafka/spout/KafkaSpoutNullTupleTest.java | 36 +---------- 4 files changed, 95 insertions(+), 50 deletions(-) create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index e7f51568293..e8ecb3e2e73 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -503,9 +503,11 @@ private boolean emitOrRetryTuple(ConsumerRecord record) { /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately * to allow its offset to be commited to Kafka*/ LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record); - msgId.setNullTuple(true); - offsetManagers.get(tp).addToEmitMsgs(msgId.offset()); - ack(msgId); + if (isAtLeastOnceProcessing()) { + msgId.setNullTuple(true); + offsetManagers.get(tp).addToEmitMsgs(msgId.offset()); + ack(msgId); + } } } return false; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java new file mode 100644 index 00000000000..f2b2f98c0b0 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/NullRecordTranslator.java @@ -0,0 +1,42 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka; + +import java.util.Collections; +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.kafka.spout.RecordTranslator; +import org.apache.storm.tuple.Fields; + +public class NullRecordTranslator implements RecordTranslator { + + @Override + public List apply(ConsumerRecord record) { + return null; + + } + + @Override + public Fields getFieldsFor(String stream) { + return new Fields("topic", "key", "value"); + } + + @Override + public List streams() { + return Collections.singletonList("default"); + } +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index 082cc58fd4c..ca162374009 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -21,8 +21,10 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; @@ -44,6 +46,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.NullRecordTranslator; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.kafka.spout.internal.CommitMetadataManager; import org.apache.storm.spout.SpoutOutputCollector; @@ -64,7 +67,7 @@ public class KafkaSpoutMessagingGuaranteeTest { @Captor private ArgumentCaptor> commitCapture; - + private final TopologyContext contextMock = mock(TopologyContext.class); private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); private final Map conf = new HashMap<>(); @@ -96,7 +99,7 @@ public void testAtMostOnceModeCommitsBeforeEmit() throws Exception { inOrder.verify(consumerMock).poll(anyLong()); inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList()); - + CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE); Map committedOffsets = commitCapture.getValue(); assertThat(committedOffsets.get(partition).offset(), is(0L)); @@ -191,7 +194,7 @@ public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { .setTupleTrackingEnforced(true) .build(); try (SimulatedTime time = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); @@ -204,13 +207,13 @@ public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue())); spout.ack(msgIdCaptor.getValue()); - + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); - + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.>>emptyMap())); - + spout.nextTuple(); - + verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher>() { @Override public boolean matches(Object arg) { @@ -228,27 +231,27 @@ public void testNoGuaranteeModeCommitsPolledTuples() throws Exception { .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) .setTupleTrackingEnforced(true) .build(); - + try (SimulatedTime time = new SimulatedTime()) { - KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); spout.nextTuple(); - + when(consumerMock.position(partition)).thenReturn(1L); ArgumentCaptor msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue())); - + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); - + spout.nextTuple(); - + verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class)); - + CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE); Map committedOffsets = commitCapture.getValue(); assertThat(committedOffsets.get(partition).offset(), is(1L)); @@ -256,4 +259,32 @@ public void testNoGuaranteeModeCommitsPolledTuples() throws Exception { } } + private void doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee processingGuaranteee) { + //STORM-3059 + KafkaSpoutConfig spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(processingGuaranteee) + .setTupleTrackingEnforced(true) + .setRecordTranslator(new NullRecordTranslator()) + .build(); + + KafkaSpout spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); + + spout.nextTuple(); + + verify(collectorMock, never()).emit(anyString(), anyList(), any()); + } + + @Test + public void testAtMostOnceModeCanFilterNullTuples() { + doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE); + } + + @Test + public void testNoGuaranteeModeCanFilterNullTuples() { + doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE); + } + } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java index 159366bc6ce..54393f77b8f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java @@ -18,13 +18,10 @@ package org.apache.storm.kafka.spout; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Time; import org.junit.Test; -import java.util.List; import java.util.regex.Pattern; import static org.mockito.Matchers.any; @@ -33,6 +30,8 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import org.apache.storm.kafka.NullRecordTranslator; + public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest { public KafkaSpoutNullTupleTest() { @@ -42,11 +41,10 @@ public KafkaSpoutNullTupleTest() { @Override KafkaSpoutConfig createSpoutConfig() { - return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(), Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) - .setRecordTranslator(new NullRecordExtractor()) + .setRecordTranslator(new NullRecordTranslator()) .build(); } @@ -72,32 +70,4 @@ public void testShouldCommitAllMessagesIfNotSetToEmitNullTuples() throws Excepti verifyAllMessagesCommitted(messageCount); } - private class NullRecordExtractor implements RecordTranslator { - - @Override - public List apply(ConsumerRecord record) { - return null; - - } - - @Override - public Fields getFieldsFor(String stream) { - return new Fields("topic", "key", "value"); - } - - /** - * @return the list of streams that this will handle. - */ - @Override - public List streams() { - return null; - } - - @Override - public Object apply(Object record) { - return null; - } - } - - }