Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.util.StreamDemarcator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.producer.Partitioner;

Expand All @@ -46,20 +44,18 @@
*/
class KafkaPublisher implements Closeable {

private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);

private final Producer<byte[], byte[]> kafkaProducer;

private long ackWaitTime = 30000;

private ProcessorLog processLog;
private final ComponentLog componentLog;

private final Partitioner partitioner;

private final int ackCheckSize;

KafkaPublisher(Properties kafkaProperties) {
this(kafkaProperties, 100);
KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
this(kafkaProperties, 100, componentLog);
}

/**
Expand All @@ -71,7 +67,7 @@ class KafkaPublisher implements Closeable {
* instance of {@link Properties} used to bootstrap
* {@link KafkaProducer}
*/
KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
Expand All @@ -85,6 +81,7 @@ class KafkaPublisher implements Closeable {
} catch (Exception e) {
throw new IllegalStateException("Failed to create partitioner", e);
}
this.componentLog = componentLog;
}

/**
Expand Down Expand Up @@ -220,28 +217,14 @@ public void close() {
this.kafkaProducer.close();
}

/**
* Will set {@link ProcessorLog} as an additional logger to forward log
* messages to NiFi bulletin
*/
void setProcessLog(ProcessorLog processLog) {
this.processLog = processLog;
}

/**
*
*/
private void warnOrError(String message, Exception e) {
if (e == null) {
logger.warn(message);
if (this.processLog != null) {
this.processLog.warn(message);
}
this.componentLog.warn(message);
} else {
logger.error(message, e);
if (this.processLog != null) {
this.processLog.error(message, e);
}
this.componentLog.error(message);
}
}

Expand All @@ -262,7 +245,7 @@ public int getLastMessageAcked() {
}

public boolean isAllAcked() {
return this.messagesSent - 1 == this.lastMessageAcked;
return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,7 @@ public Set<Relationship> getRelationships() {
@Override
protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
throws ProcessException {
KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context));
kafkaPublisher.setProcessLog(this.getLogger());
KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger());
return kafkaPublisher;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand All @@ -29,6 +30,7 @@
import java.util.Properties;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult;
import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
Expand Down Expand Up @@ -70,7 +72,7 @@ public void validateSuccessfulSendAsWhole() throws Exception {
String topicName = "validateSuccessfulSendAsWhole";

Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));

PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
KafkaPublisherResult result = publisher.publish(publishingContext);
Expand All @@ -96,7 +98,7 @@ public void validateSuccessfulSendAsDelimited() throws Exception {
String topicName = "validateSuccessfulSendAsDelimited";

Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));

PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
Expand Down Expand Up @@ -132,7 +134,7 @@ public void validateRetries() throws Exception {

Properties kafkaProperties = this.buildProducerProperties();

KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));

// simulates the first re-try
int lastAckedMessageIndex = 1;
Expand Down Expand Up @@ -179,7 +181,7 @@ public void validateRetriesWithWrongIndex() throws Exception {

Properties kafkaProperties = this.buildProducerProperties();

KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));

// simulates the first re-try
int lastAckedMessageIndex = 3;
Expand Down Expand Up @@ -221,7 +223,7 @@ public void validateWithMultiByteCharactersNoDelimiter() throws Exception {

Properties kafkaProperties = this.buildProducerProperties();

KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);

publisher.publish(publishingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,24 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.util.StreamDemarcator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
* with sending contents of the {@link FlowFile}s to Kafka.
*/
class KafkaPublisher implements Closeable {

private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);

private final Producer<byte[], byte[]> kafkaProducer;

private volatile long ackWaitTime = 30000;

private volatile ProcessorLog processLog;
private final ComponentLog componentLog;

private final int ackCheckSize;

KafkaPublisher(Properties kafkaProperties) {
this(kafkaProperties, 100);
KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
this(kafkaProperties, 100, componentLog);
}

/**
Expand All @@ -65,9 +60,10 @@ class KafkaPublisher implements Closeable {
* instance of {@link Properties} used to bootstrap
* {@link KafkaProducer}
*/
KafkaPublisher(Properties kafkaProperties, int ackCheckSize) {
KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
this.ackCheckSize = ackCheckSize;
this.componentLog = componentLog;
}

/**
Expand Down Expand Up @@ -199,28 +195,14 @@ public void close() {
this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
}

/**
* Will set {@link ProcessorLog} as an additional logger to forward log
* messages to NiFi bulletin
*/
void setProcessLog(ProcessorLog processLog) {
this.processLog = processLog;
}

/**
*
*/
private void warnOrError(String message, Exception e) {
if (e == null) {
logger.warn(message);
if (this.processLog != null) {
this.processLog.warn(message);
}
this.componentLog.warn(message);
} else {
logger.error(message, e);
if (this.processLog != null) {
this.processLog.error(message, e);
}
this.componentLog.error(message);
}
}

Expand All @@ -244,7 +226,7 @@ public int getLastMessageAcked() {
}

public boolean isAllAcked() {
return this.messagesSent - 1 == this.lastMessageAcked;
return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSessi
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
return new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger());
return publisher;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand All @@ -32,6 +33,7 @@
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
Expand Down Expand Up @@ -71,7 +73,7 @@ public void validateSuccessfulSendAsWhole() throws Exception {
String topicName = "validateSuccessfulSendAsWhole";

Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));

PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
KafkaPublisherResult result = publisher.publish(publishingContext);
Expand All @@ -97,7 +99,7 @@ public void validateSuccessfulSendAsDelimited() throws Exception {
String topicName = "validateSuccessfulSendAsDelimited";

Properties kafkaProperties = this.buildProducerProperties();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));

PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
Expand Down Expand Up @@ -133,7 +135,7 @@ public void validateRetries() throws Exception {

Properties kafkaProperties = this.buildProducerProperties();

KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));

// simulates the first re-try
int lastAckedMessageIndex = 1;
Expand Down Expand Up @@ -180,7 +182,7 @@ public void validateRetriesWithWrongIndex() throws Exception {

Properties kafkaProperties = this.buildProducerProperties();

KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));

// simulates the first re-try
int lastAckedMessageIndex = 3;
Expand Down Expand Up @@ -221,7 +223,7 @@ public void validateWithMultiByteCharactersNoDelimiter() throws Exception {

Properties kafkaProperties = this.buildProducerProperties();

KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);

publisher.publish(publishingContext);
Expand All @@ -240,7 +242,7 @@ public void validateWithNonDefaultPartitioner() throws Exception {

Properties kafkaProperties = this.buildProducerProperties();
kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName());
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
runner.setProperty(PublishKafka.KEY, "key1");
runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis");

final String text = "Hello World\nGoodbye\nfail\n2";
runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
Expand All @@ -164,6 +164,7 @@ public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
Producer<byte[], byte[]> producer = putKafka.getProducer();
verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
runner.shutdown();
putKafka.destroy();
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -191,6 +192,35 @@ public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
runner.shutdown();
}

@SuppressWarnings("unchecked")
@Test
public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception {
String topicName = "validateSendFailureAndThenResendSuccess";
StubPublishKafka putKafka = new StubPublishKafka(100);

TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka.TOPIC, topicName);
runner.setProperty(PublishKafka.CLIENT_ID, "foo");
runner.setProperty(PublishKafka.KEY, "key1");
runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");

final String text = "futurefail\nHello World\nGoodbye\n2";
runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
assertNotNull(ff);
runner.enqueue(ff);

runner.run(1, false);
assertEquals(0, runner.getQueueSize().getObjectCount());
Producer<byte[], byte[]> producer = putKafka.getProducer();
// 6 sends due to duplication
verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
runner.shutdown();
}

@SuppressWarnings("unchecked")
@Test
public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
Expand Down
Loading