From cfe11249da59182e09e7469baaaf665f2dcb3482 Mon Sep 17 00:00:00 2001 From: cleroux Date: Fri, 27 Apr 2018 22:36:05 +0000 Subject: [PATCH] [STORM-3043] Fix NullPointerException when apply() returns null --- .../kafka/spout/SimpleRecordTranslator.java | 6 +- .../spout/ByTopicRecordTranslatorTest.java | 15 +++++ .../spout/SimpleRecordTranslatorTest.java | 59 +++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java index 46c28496bf3..41f9f4ea528 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java @@ -41,8 +41,12 @@ public SimpleRecordTranslator(Func, List> func, Fie @Override public List apply(ConsumerRecord record) { + List vals = func.apply(record); + if (vals == null) { + return null; + } KafkaTuple ret = new KafkaTuple(); - ret.addAll(func.apply(record)); + ret.addAll(vals); return ret.routedTo(stream); } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java index abc58f0e8e5..22845f01a93 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java @@ -49,6 +49,13 @@ public List apply(ConsumerRecord record) { return new Values(record.key(), record.value()); } }; + + public static Func, List> NULL_FUNC = new Func, List>() { + @Override + public List apply(ConsumerRecord record) { + return null; + } + }; @Test public void testBasic() { @@ -74,6 +81,14 @@ public void testBasic() { assertEquals(new Fields("key", "value"), trans.getFieldsFor("key-value-stream")); assertEquals(Arrays.asList("THE KEY", "THE VALUE"), trans.apply(cr3)); } + + @Test + public void testNullTranslation() { + ByTopicRecordTranslator trans = + new ByTopicRecordTranslator<>(NULL_FUNC, new Fields("key")); + ConsumerRecord cr = new ConsumerRecord<>("TOPIC 1", 100, 100, "THE KEY", "THE VALUE"); + assertEquals(null, trans.apply(cr)); + } @Test(expected = IllegalArgumentException.class) public void testFieldCollision() { diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java new file mode 100644 index 00000000000..03bbc78092e --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SimpleRecordTranslatorTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.junit.Test; + +public class SimpleRecordTranslatorTest { + public static Func, List> JUST_VALUE_FUNC = new Func, List>() { + @Override + public List apply(ConsumerRecord record) { + return new Values(record.value()); + } + }; + + public static Func, List> NULL_FUNC = new Func, List>() { + @Override + public List apply(ConsumerRecord record) { + return null; + } + }; + + @Test + public void testBasic() { + SimpleRecordTranslator trans = + new SimpleRecordTranslator<>(JUST_VALUE_FUNC, new Fields("value")); + assertEquals(Arrays.asList("default"), trans.streams()); + ConsumerRecord cr = new ConsumerRecord<>("TOPIC", 100, 100, "THE KEY", "THE VALUE"); + assertEquals(Arrays.asList("THE VALUE"), trans.apply(cr)); + } + + @Test + public void testNullTranslation() { + SimpleRecordTranslator trans = + new SimpleRecordTranslator<>(NULL_FUNC, new Fields("key")); + assertEquals(null, trans.apply(null)); + } +}