From 328016eaeebe1e5865eb9ae456cb2f33b6a8a495 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Sat, 12 Dec 2020 13:54:56 +0100 Subject: [PATCH] NIFI-8085 Use poll(Duration) in ConsumeKafka_2_x processors --- .../processors/kafka/pubsub/ConsumerLease.java | 3 ++- .../processors/kafka/pubsub/ConsumerPoolTest.java | 15 +++++++-------- .../processors/kafka/pubsub/ConsumerLease.java | 3 ++- .../processors/kafka/pubsub/ConsumerPoolTest.java | 13 +++++++------ 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 3ecec49a9daa..729c8010c8d7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -48,6 +48,7 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -178,7 +179,7 @@ void poll() { * This behavior has been fixed via Kafka KIP-62 and available from Kafka client 0.10.1.0. */ try { - final ConsumerRecords records = kafkaConsumer.poll(10); + final ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(10)); lastPollEmpty = records.count() == 0; processRecords(records); } catch (final ProcessException pe) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java index 3414420cf725..195d2cbf410a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java @@ -24,8 +24,6 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processors.kafka.pubsub.ConsumerLease; -import org.apache.nifi.processors.kafka.pubsub.ConsumerPool; import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats; import org.apache.nifi.provenance.ProvenanceReporter; import org.junit.Before; @@ -33,6 +31,7 @@ import org.mockito.Mockito; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -43,7 +42,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -112,7 +111,7 @@ protected Consumer createKafkaConsumer() { @Test public void validatePoolSimpleCreateClose() throws Exception { - when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); } @@ -144,7 +143,7 @@ public void validatePoolSimpleCreatePollClose() throws Exception { }; final ConsumerRecords firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); - when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); lease.commit(); @@ -160,7 +159,7 @@ public void validatePoolSimpleCreatePollClose() throws Exception { @Test public void validatePoolSimpleBatchCreateClose() throws Exception { - when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); for (int i = 0; i < 100; i++) { try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { for (int j = 0; j < 100; j++) { @@ -187,7 +186,7 @@ public void validatePoolBatchCreatePollClose() throws Exception { }; final ConsumerRecords firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); - when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); lease.commit(); @@ -204,7 +203,7 @@ public void validatePoolBatchCreatePollClose() throws Exception { @Test public void validatePoolConsumerFails() throws Exception { - when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops")); + when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops")); try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { try { lease.poll(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index c3846a2b13fd..e3e612436654 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -48,6 +48,7 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -178,7 +179,7 @@ void poll() { * This behavior has been fixed via Kafka KIP-62 and available from Kafka client 0.10.1.0. */ try { - final ConsumerRecords records = kafkaConsumer.poll(10); + final ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(10)); lastPollEmpty = records.count() == 0; processRecords(records); } catch (final ProcessException pe) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java index 18e188cc3361..195d2cbf410a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java @@ -31,6 +31,7 @@ import org.mockito.Mockito; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -41,7 +42,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -110,7 +111,7 @@ protected Consumer createKafkaConsumer() { @Test public void validatePoolSimpleCreateClose() throws Exception { - when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); } @@ -142,7 +143,7 @@ public void validatePoolSimpleCreatePollClose() throws Exception { }; final ConsumerRecords firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); - when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); lease.commit(); @@ -158,7 +159,7 @@ public void validatePoolSimpleCreatePollClose() throws Exception { @Test public void validatePoolSimpleBatchCreateClose() throws Exception { - when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); for (int i = 0; i < 100; i++) { try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { for (int j = 0; j < 100; j++) { @@ -185,7 +186,7 @@ public void validatePoolBatchCreatePollClose() throws Exception { }; final ConsumerRecords firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); - when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + when(consumer.poll(any(Duration.class))).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); lease.commit(); @@ -202,7 +203,7 @@ public void validatePoolBatchCreatePollClose() throws Exception { @Test public void validatePoolConsumerFails() throws Exception { - when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops")); + when(consumer.poll(any(Duration.class))).thenThrow(new KafkaException("oops")); try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { try { lease.poll();