From 409ad1051fc44a0ce8acf1a59ba8dac5261a688e Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 24 Mar 2016 07:37:58 -0400 Subject: [PATCH 1/4] NIFI-1684 fixed NPE, added tests --- .../kafka/SplittableMessageContext.java | 22 +++++-- .../kafka/SplittableMessageContextTest.java | 64 +++++++++++++++++++ 2 files changed, 79 insertions(+), 7 deletions(-) create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java index 99674044835a..d597a0589bde 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java @@ -46,6 +46,9 @@ final class SplittableMessageContext { * - "(\\W)\\Z". */ SplittableMessageContext(String topicName, byte[] keyBytes, String delimiterPattern) { + if (topicName == null || topicName.trim().length() == 0){ + throw new IllegalArgumentException("'topicName' must not be null or empty"); + } this.topicName = topicName; this.keyBytes = keyBytes; this.delimiterPattern = delimiterPattern != null ? delimiterPattern : "(\\W)\\Z"; @@ -60,20 +63,25 @@ public String toString() { } /** - * + * Will set failed segments from an array of integers */ void setFailedSegments(int... failedSegments) { - this.failedSegments = new BitSet(); - for (int failedSegment : failedSegments) { - this.failedSegments.set(failedSegment); + if (failedSegments != null) { + this.failedSegments = new BitSet(); + for (int failedSegment : failedSegments) { + this.failedSegments.set(failedSegment); + } } } /** - * + * Will set failed segments from an array of bytes that will be used to + * construct the final {@link BitSet} representing failed segments */ void setFailedSegmentsAsByteArray(byte[] failedSegments) { - this.failedSegments = BitSet.valueOf(failedSegments); + if (failedSegments != null) { + this.failedSegments = BitSet.valueOf(failedSegments); + } } /** @@ -102,7 +110,7 @@ String getDelimiterPattern() { * Returns the key bytes as String */ String getKeyBytesAsString() { - return new String(this.keyBytes); + return this.keyBytes != null ? new String(this.keyBytes) : null; } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java new file mode 100644 index 000000000000..b12464a32786 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.nio.charset.StandardCharsets; + +import org.junit.Test; + +public class SplittableMessageContextTest { + + @Test(expected = IllegalArgumentException.class) + public void failNullEmptyTopic() { + new SplittableMessageContext(null, null, null); + } + + @Test + public void validateFullSetting() { + SplittableMessageContext ctx = new SplittableMessageContext("foo", "hello".getBytes(), "\n"); + ctx.setFailedSegments(1, 3, 6); + assertEquals("\n", ctx.getDelimiterPattern()); + assertEquals("hello", new String(ctx.getKeyBytes(), StandardCharsets.UTF_8)); + assertEquals("foo", ctx.getTopicName()); + assertEquals("topic: 'foo'; delimiter: '\n'", ctx.toString()); + } + + + @Test + public void validateToString() { + SplittableMessageContext ctx = new SplittableMessageContext("foo", null, null); + assertEquals("topic: 'foo'; delimiter: '(\\W)\\Z'", ctx.toString()); + } + + @Test + public void validateNoNPEandNoSideffectsOnSetsGets() { + SplittableMessageContext ctx = new SplittableMessageContext("foo", null, null); + ctx.setFailedSegments(null); + assertNull(ctx.getFailedSegments()); + + ctx.setFailedSegmentsAsByteArray(null); + assertNull(ctx.getFailedSegments()); + + assertEquals("(\\W)\\Z", ctx.getDelimiterPattern());; + assertNull(ctx.getKeyBytes()); + assertNull(ctx.getKeyBytesAsString()); + assertEquals("foo", ctx.getTopicName()); + } +} From 3ff3cd2e9821573bcbbf83fe23aa860463696b6e Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 24 Mar 2016 14:35:40 -0400 Subject: [PATCH 2/4] NIFI-1684 fixed NPE in PutKafka when retrieving key attribute bytes --- .../main/java/org/apache/nifi/processors/kafka/PutKafka.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 4510038cd202..513f4f315061 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -408,7 +408,7 @@ private SplittableMessageContext buildMessageContext(FlowFile flowFile, ProcessC String failedSegmentsString = flowFile.getAttribute(ATTR_FAILED_SEGMENTS); if (flowFile.getAttribute(ATTR_PROC_ID) != null && flowFile.getAttribute(ATTR_PROC_ID).equals(this.getIdentifier()) && failedSegmentsString != null) { topicName = flowFile.getAttribute(ATTR_TOPIC); - key = flowFile.getAttribute(ATTR_KEY).getBytes(); + key = flowFile.getAttribute(ATTR_KEY) == null ? null : flowFile.getAttribute(ATTR_KEY).getBytes(); delimiterPattern = flowFile.getAttribute(ATTR_DELIMITER); } else { failedSegmentsString = null; From 40e77e01d7cc779cce3b6b242fd578c86adcc1fd Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 24 Mar 2016 15:29:35 -0400 Subject: [PATCH 3/4] NIFI-1684 added penalization on failure --- .../main/java/org/apache/nifi/processors/kafka/PutKafka.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 513f4f315061..44f04aab6a55 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -306,7 +306,7 @@ public void process(InputStream contentStream) throws IOException { session.transfer(flowFile, REL_SUCCESS); } else { flowFile = session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(failedSegmentsRef.get(), messageContext)); - session.transfer(flowFile, REL_FAILURE); + session.transfer(session.penalize(flowFile), REL_FAILURE); } } else { From c1a0168eb9d53467f1716f58e1471e915a568c8b Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Sat, 26 Mar 2016 10:47:13 -0400 Subject: [PATCH 4/4] NIFI-1684 fixed random partitioner initialization --- .../main/java/org/apache/nifi/processors/kafka/PutKafka.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 44f04aab6a55..6d1749367370 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -55,8 +55,6 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import kafka.producer.DefaultPartitioner; - @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a " @@ -454,7 +452,7 @@ private Properties buildKafkaConfigProperties(final ProcessContext context) { if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { partitionerClass = Partitioners.RoundRobinPartitioner.class.getName(); } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { - partitionerClass = DefaultPartitioner.class.getName(); + partitionerClass = Partitioners.RandomPartitioner.class.getName(); } properties.setProperty("partitioner.class", partitionerClass);