From 7040b4b1d1c4f1b8e61cc5c8205e09876c0386c0 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 1 Aug 2016 10:56:46 -0400 Subject: [PATCH 1/2] NIFI-2444 NIFI-2445 fixed PublishKafka - fixed the logging issue NIFI-2444 by ensuring the ProcessLog is added to KafkaPublisher - fixed KafkaPublisher's isAllAcked operation to ensure that it properly reports that the flow file has failed. - added additional test --- .../nifi/processors/kafka/KafkaPublisher.java | 9 +---- .../kafka/pubsub/KafkaPublisher.java | 12 ++---- .../processors/kafka/pubsub/PublishKafka.java | 4 +- .../kafka/pubsub/PublishKafkaTest.java | 32 +++++++++++++++- .../kafka/pubsub/StubPublishKafka.java | 38 ++++++++++++++----- 5 files changed, 68 insertions(+), 27 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index 6acdd62691ca..29347994076b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -33,10 +33,9 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.stream.io.util.StreamDemarcator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import kafka.producer.Partitioner; @@ -46,8 +45,6 @@ */ class KafkaPublisher implements Closeable { - private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); - private final Producer kafkaProducer; private long ackWaitTime = 30000; @@ -233,12 +230,10 @@ void 
setProcessLog(ProcessorLog processLog) { */ private void warnOrError(String message, Exception e) { if (e == null) { - logger.warn(message); if (this.processLog != null) { this.processLog.warn(message); } } else { - logger.error(message, e); if (this.processLog != null) { this.processLog.error(message, e); } @@ -262,7 +257,7 @@ public int getLastMessageAcked() { } public boolean isAllAcked() { - return this.messagesSent - 1 == this.lastMessageAcked; + return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; } @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java index ed7e21d571ae..3c9d278cda85 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java @@ -31,24 +31,20 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.stream.io.util.StreamDemarcator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor * with sending contents of the {@link FlowFile}s to Kafka. 
*/ class KafkaPublisher implements Closeable { - - private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); - private final Producer kafkaProducer; private volatile long ackWaitTime = 30000; - private volatile ProcessorLog processLog; + private volatile ComponentLog processLog; private final int ackCheckSize; @@ -212,12 +208,10 @@ void setProcessLog(ProcessorLog processLog) { */ private void warnOrError(String message, Exception e) { if (e == null) { - logger.warn(message); if (this.processLog != null) { this.processLog.warn(message); } } else { - logger.error(message, e); if (this.processLog != null) { this.processLog.error(message, e); } @@ -244,7 +238,7 @@ public int getLastMessageAcked() { } public boolean isAllAcked() { - return this.messagesSent - 1 == this.lastMessageAcked; + return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; } @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index 6235f0b170f7..ddd6d3f738a0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -228,7 +228,9 @@ protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSessi kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); - return new KafkaPublisher(kafkaProperties); + KafkaPublisher 
publisher = new KafkaPublisher(kafkaProperties); + publisher.setProcessLog(this.getLogger()); + return publisher; } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java index af550b4a17fa..be9757820c08 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java @@ -153,7 +153,7 @@ public void validateOnSendFailureAndThenResendSuccessA() throws Exception { runner.setProperty(PublishKafka.KEY, "key1"); runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis"); + runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis"); final String text = "Hello World\nGoodbye\nfail\n2"; runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); @@ -164,6 +164,7 @@ public void validateOnSendFailureAndThenResendSuccessA() throws Exception { Producer producer = putKafka.getProducer(); verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); runner.shutdown(); + putKafka.destroy(); } @SuppressWarnings("unchecked") @@ -191,6 +192,35 @@ public void validateOnSendFailureAndThenResendSuccessB() throws Exception { runner.shutdown(); } + @SuppressWarnings("unchecked") + @Test + public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception { + String topicName = "validateSendFailureAndThenResendSuccess"; + StubPublishKafka putKafka = new StubPublishKafka(100); + + TestRunner runner = TestRunners.newTestRunner(putKafka); + 
runner.setProperty(PublishKafka.TOPIC, topicName); + runner.setProperty(PublishKafka.CLIENT_ID, "foo"); + runner.setProperty(PublishKafka.KEY, "key1"); + runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); + runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); + runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis"); + + final String text = "futurefail\nHello World\nGoodbye\n2"; + runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); + runner.run(1, false); + MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0); + assertNotNull(ff); + runner.enqueue(ff); + + runner.run(1, false); + assertEquals(0, runner.getQueueSize().getObjectCount()); + Producer producer = putKafka.getProducer(); + // 5 sends due to duplication + verify(producer, times(5)).send(Mockito.any(ProducerRecord.class)); + runner.shutdown(); + } + @SuppressWarnings("unchecked") @Test public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java index df38aee73386..0893614a5ee5 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java @@ -22,14 +22,17 @@ import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.Properties; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import 
java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -46,6 +49,8 @@ public class StubPublishKafka extends PublishKafka { private final int ackCheckSize; + private final ExecutorService executor = Executors.newCachedThreadPool(); + StubPublishKafka(int ackCheckSize) { this.ackCheckSize = ackCheckSize; } @@ -54,6 +59,10 @@ public Producer getProducer() { return producer; } + public void destroy() { + this.executor.shutdownNow(); + } + @SuppressWarnings("unchecked") @Override protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) @@ -67,6 +76,7 @@ protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSessi f.setAccessible(true); f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue()); publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class); + publisher.setAckWaitTime(15000); producer = mock(Producer.class); this.instrumentProducer(producer, false); Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer"); @@ -85,21 +95,31 @@ protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSessi @SuppressWarnings("unchecked") private void instrumentProducer(Producer producer, boolean failRandomly) { + when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer>() { - @SuppressWarnings("rawtypes") @Override public Future answer(InvocationOnMock invocation) throws Throwable { ProducerRecord record = (ProducerRecord) 
invocation.getArguments()[0]; - String value = new String(record.value(), StandardCharsets.UTF_8); + final String value = new String(record.value(), StandardCharsets.UTF_8); if ("fail".equals(value) && !StubPublishKafka.this.failed) { StubPublishKafka.this.failed = true; throw new RuntimeException("intentional"); } - Future future = mock(Future.class); - if ("futurefail".equals(value) && !StubPublishKafka.this.failed) { - StubPublishKafka.this.failed = true; - when(future.get(Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenThrow(ExecutionException.class); - } + + Future future = executor.submit(new Callable() { + @Override + public RecordMetadata call() throws Exception { + if ("futurefail".equals(value) && !StubPublishKafka.this.failed) { + // System.out.println("FAIL"); + StubPublishKafka.this.failed = true; + throw new TopicAuthorizationException("Unauthorized"); + } else { + TopicPartition partition = new TopicPartition("foo", 0); + RecordMetadata meta = new RecordMetadata(partition, 0, 0); + return meta; + } + } + }); return future; } }); From 7a9cbe13af348b07e84ff61b29f711848561680d Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 2 Aug 2016 18:27:55 -0400 Subject: [PATCH 2/2] NIFI-2444-0.x addressed PR comments --- .../nifi/processors/kafka/KafkaPublisher.java | 26 +++++-------------- .../nifi/processors/kafka/PutKafka.java | 3 +-- .../processors/kafka/KafkaPublisherTest.java | 12 +++++---- .../kafka/pubsub/KafkaPublisher.java | 26 +++++-------------- .../processors/kafka/pubsub/PublishKafka.java | 3 +-- .../kafka/pubsub/KafkaPublisherTest.java | 14 +++++----- .../kafka/pubsub/StubPublishKafka.java | 6 ++++- 7 files changed, 36 insertions(+), 54 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index 
29347994076b..b83edded8683 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -34,7 +34,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.stream.io.util.StreamDemarcator; import kafka.producer.Partitioner; @@ -49,14 +48,14 @@ class KafkaPublisher implements Closeable { private long ackWaitTime = 30000; - private ProcessorLog processLog; + private final ComponentLog componentLog; private final Partitioner partitioner; private final int ackCheckSize; - KafkaPublisher(Properties kafkaProperties) { - this(kafkaProperties, 100); + KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { + this(kafkaProperties, 100, componentLog); } /** @@ -68,7 +67,7 @@ class KafkaPublisher implements Closeable { * instance of {@link Properties} used to bootstrap * {@link KafkaProducer} */ - KafkaPublisher(Properties kafkaProperties, int ackCheckSize) { + KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.kafkaProducer = new KafkaProducer<>(kafkaProperties); @@ -82,6 +81,7 @@ class KafkaPublisher implements Closeable { } catch (Exception e) { throw new IllegalStateException("Failed to create partitioner", e); } + this.componentLog = componentLog; } /** @@ -217,26 +217,14 @@ public void close() { this.kafkaProducer.close(); } - /** - * Will set {@link ProcessorLog} as an additional logger to forward log - * messages to NiFi bulletin - */ - 
void setProcessLog(ProcessorLog processLog) { - this.processLog = processLog; - } - /** * */ private void warnOrError(String message, Exception e) { if (e == null) { - if (this.processLog != null) { - this.processLog.warn(message); - } + this.componentLog.warn(message); } else { - if (this.processLog != null) { - this.processLog.error(message, e); - } + this.componentLog.error(message, e); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 3f6aec43524e..12a9b8916a39 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -390,8 +390,7 @@ public Set getRelationships() { @Override protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException { - KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context)); - kafkaPublisher.setProcessLog(this.getLogger()); + KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger()); return kafkaPublisher; } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java index c4fc9a8108d8..5bb7c3cd580e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java @@ -19,6 +19,7 @@ 
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -29,6 +30,7 @@ import java.util.Properties; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; import org.apache.nifi.processors.kafka.test.EmbeddedKafka; import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; @@ -70,7 +72,7 @@ public void validateSuccessfulSendAsWhole() throws Exception { String topicName = "validateSuccessfulSendAsWhole"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); KafkaPublisherResult result = publisher.publish(publishingContext); @@ -96,7 +98,7 @@ public void validateSuccessfulSendAsDelimited() throws Exception { String topicName = "validateSuccessfulSendAsDelimited"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); @@ -132,7 +134,7 @@ public void validateRetries() throws Exception { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 1; @@ -179,7 +181,7 
@@ public void validateRetriesWithWrongIndex() throws Exception { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 3; @@ -221,7 +223,7 @@ public void validateWithMultiByteCharactersNoDelimiter() throws Exception { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publisher.publish(publishingContext); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java index 3c9d278cda85..0ba381a0021c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java @@ -32,7 +32,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.stream.io.util.StreamDemarcator; /** @@ -44,12 +43,12 @@ class KafkaPublisher implements Closeable { private volatile long ackWaitTime = 30000; - private volatile ComponentLog processLog; + private final ComponentLog componentLog; private final int ackCheckSize; - KafkaPublisher(Properties kafkaProperties) { - this(kafkaProperties, 100); + 
KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { + this(kafkaProperties, 100, componentLog); } /** @@ -61,9 +60,10 @@ class KafkaPublisher implements Closeable { * instance of {@link Properties} used to bootstrap * {@link KafkaProducer} */ - KafkaPublisher(Properties kafkaProperties, int ackCheckSize) { + KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { this.kafkaProducer = new KafkaProducer<>(kafkaProperties); this.ackCheckSize = ackCheckSize; + this.componentLog = componentLog; } /** @@ -195,26 +195,14 @@ public void close() { this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS); } - /** - * Will set {@link ProcessorLog} as an additional logger to forward log - * messages to NiFi bulletin - */ - void setProcessLog(ProcessorLog processLog) { - this.processLog = processLog; - } - /** * */ private void warnOrError(String message, Exception e) { if (e == null) { - if (this.processLog != null) { - this.processLog.warn(message); - } + this.componentLog.warn(message); } else { - if (this.processLog != null) { - this.processLog.error(message, e); - } + this.componentLog.error(message, e); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index ddd6d3f738a0..6703c04a5bbd 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -228,8 +228,7 @@ protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSessi kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); 
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); - publisher.setProcessLog(this.getLogger()); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger()); return publisher; } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java index 2c45d373d278..6b8b042d20bb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -32,6 +33,7 @@ import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult; import org.apache.nifi.processors.kafka.test.EmbeddedKafka; import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; @@ -71,7 +73,7 @@ public void validateSuccessfulSendAsWhole() throws Exception { String topicName = "validateSuccessfulSendAsWhole"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new 
KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); KafkaPublisherResult result = publisher.publish(publishingContext); @@ -97,7 +99,7 @@ public void validateSuccessfulSendAsDelimited() throws Exception { String topicName = "validateSuccessfulSendAsDelimited"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); @@ -133,7 +135,7 @@ public void validateRetries() throws Exception { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 1; @@ -180,7 +182,7 @@ public void validateRetriesWithWrongIndex() throws Exception { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 3; @@ -221,7 +223,7 @@ public void validateWithMultiByteCharactersNoDelimiter() throws Exception { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publisher.publish(publishingContext); @@ -240,7 +242,7 @@ public void 
validateWithNonDefaultPartitioner() throws Exception { Properties kafkaProperties = this.buildProducerProperties(); kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName()); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8)); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java index 0893614a5ee5..780ea4ac5523 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; @@ -83,6 +84,10 @@ protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSessi kf.setAccessible(true); kf.set(publisher, producer); + Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog"); + componentLogF.setAccessible(true); + componentLogF.set(publisher, mock(ComponentLog.class)); + Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize"); ackCheckSizeField.setAccessible(true); 
ackCheckSizeField.set(publisher, this.ackCheckSize); @@ -110,7 +115,6 @@ public Future answer(InvocationOnMock invocation) throws Throwab @Override public RecordMetadata call() throws Exception { if ("futurefail".equals(value) && !StubPublishKafka.this.failed) { - // System.out.println("FAIL"); StubPublishKafka.this.failed = true; throw new TopicAuthorizationException("Unauthorized"); } else {