From 2d5b8c7267eca6e0f234eeaf9e3553bdfba75da9 Mon Sep 17 00:00:00 2001 From: gardellajuanpablo Date: Fri, 29 Sep 2017 10:49:21 -0300 Subject: [PATCH] NIFI-4330 ConsumeKafka* throw NullPointerException if Kafka message has a null value It is possible null values to be stored in Kafka topics. Fixed handle this scenario. Notice without this fix, the consumer is unable to consume more messages (at least without removing messages from the queue). --- .../kafka/pubsub/ConsumerLease.java | 19 ++++++++++++++----- .../kafka/pubsub/ConsumerLease.java | 19 ++++++++++++++----- .../kafka/pubsub/ConsumerLease.java | 14 ++++++++++---- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 1f58ab3965ed..711c2bc6d42e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -387,9 +387,12 @@ private void writeData(final ProcessSession session, ConsumerRecord { - out.write(record.value()); - }); + final byte[] value = record.value(); + if (value != null) { + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + } tracker.updateFlowFile(flowFile); populateAttributes(tracker); session.transfer(tracker.flowFile, REL_SUCCESS); @@ -418,7 +421,10 @@ private void writeDemarcatedData(final ProcessSession session, final List consumerRec attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic()); FlowFile failureFlowFile = session.create(); - failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); + final byte[] value = consumerRecord.value(); + if (value != null) { + failureFlowFile = session.write(failureFlowFile, out -> out.write(value)); + } failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index de196480d1b4..2d893d8f6fec 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -396,9 +396,12 @@ private void writeData(final ProcessSession session, ConsumerRecord { - out.write(record.value()); - }); + final byte[] value = record.value(); + if (value != null) { + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + } tracker.updateFlowFile(flowFile); populateAttributes(tracker); session.transfer(tracker.flowFile, REL_SUCCESS); @@ -436,7 +439,10 @@ private void writeDemarcatedData(final ProcessSession session, final List consumerRec FlowFile failureFlowFile = session.create(); - failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value())); + final byte[] value = consumerRecord.value(); + if (value != null) { + failureFlowFile = session.write(failureFlowFile, out -> out.write(value)); + } failureFlowFile = session.putAllAttributes(failureFlowFile, attributes); final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 53e7a23752a9..43dfd17dfa27 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -358,9 +358,12 @@ private void writeData(final ProcessSession session, ConsumerRecord { - out.write(record.value()); - }); + final byte[] value = record.value(); + if (value != null) { + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + } tracker.updateFlowFile(flowFile); populateAttributes(tracker); session.transfer(tracker.flowFile, REL_SUCCESS); @@ -387,7 +390,10 @@ private void writeData(final ProcessSession session, final List