From b53cf6001cbcd4acdff48f60690ac64cbdce3821 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 2 May 2017 00:18:47 -0400 Subject: [PATCH] NIFI-3739: Pass the proper InputStream to RecordSetWriterFactory in order to obtain RecordSetWriter; also fix error handling so that we don't kill kafka client if unable to create writer, since we roll back the offsets --- .../nifi/processors/kafka/pubsub/ConsumerLease.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index b2665aeda096..ac24e1f4b9ad 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -171,6 +171,8 @@ void poll() { final ConsumerRecords records = kafkaConsumer.poll(10); lastPollEmpty = records.count() == 0; processRecords(records); + } catch (final ProcessException pe) { + throw pe; } catch (final Throwable t) { this.poison(); throw t; @@ -405,11 +407,15 @@ private void rollback(final TopicPartition topicPartition) { } private void writeRecordData(final ProcessSession session, final List> records, final TopicPartition topicPartition) { + if (records.isEmpty()) { + return; + } + FlowFile flowFile = session.create(); try { final RecordSetWriter writer; try { - writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(new byte[0])); + writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(records.get(0).value())); } catch (final Exception e) { logger.error( "Failed to obtain a Record Writer for serializing Kafka messages. This generally happens because the "