Skip to content

Commit

Permalink
NIFI-8021: Fixed bug in ConsumeKafka_2_6 and ConsumeKafkaRecord_2_6 w…
Browse files Browse the repository at this point in the history
…here explicit partition assignment causes issues with more than 1 concurrent task. Also fixed bug that prevented more nifi nodes than partitions because it didn't properly handle empty string for the list of partitions

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#4744.
  • Loading branch information
markap14 authored and driesva committed Mar 19, 2021
1 parent 4bed324 commit 7dd0d8c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,10 @@ private synchronized ConsumerPool getConsumerPool(final ProcessContext context)

final ConsumerPool consumerPool = createConsumerPool(context, getLogger());

final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
if (numAssignedPartitions > 0) {
final boolean explicitAssignment = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
if (explicitAssignment) {
final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());

// Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
// all of the partitions assigned.
final int partitionCount = consumerPool.getPartitionCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,10 @@ private synchronized ConsumerPool getConsumerPool(final ProcessContext context)

final ConsumerPool consumerPool = createConsumerPool(context, getLogger());

final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
if (numAssignedPartitions > 0) {
final boolean explicitAssignment = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());
if (explicitAssignment) {
final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());

// Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
// all of the partitions assigned.
final int partitionCount = consumerPool.getPartitionCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -102,7 +102,7 @@ public ConsumerPool(
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = demarcator;
Expand All @@ -119,6 +119,7 @@ public ConsumerPool(
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
}

public ConsumerPool(
Expand All @@ -136,7 +137,7 @@ public ConsumerPool(
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = demarcator;
Expand All @@ -153,6 +154,7 @@ public ConsumerPool(
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
}

public ConsumerPool(
Expand All @@ -171,7 +173,7 @@ public ConsumerPool(
final boolean separateByKey,
final String keyEncoding,
final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
Expand All @@ -188,6 +190,7 @@ public ConsumerPool(
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
}

public ConsumerPool(
Expand All @@ -206,7 +209,7 @@ public ConsumerPool(
final boolean separateByKey,
final String keyEncoding,
final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
Expand All @@ -223,6 +226,7 @@ public ConsumerPool(
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
enqueueLeases(partitionsToConsume);
}

public int getPartitionCount() {
Expand Down Expand Up @@ -282,16 +286,8 @@ public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessC
consumer.subscribe(topicPattern, lease);
}
} else {
final List<TopicPartition> topicPartitions = new ArrayList<>();

for (final String topic : topics) {
for (final int partition : partitionsToConsume) {
final TopicPartition topicPartition = new TopicPartition(topic, partition);
topicPartitions.add(topicPartition);
}
}

consumer.assign(topicPartitions);
logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
return null;
}
}
lease.setProcessSession(session, processContext);
Expand All @@ -300,6 +296,32 @@ public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessC
return lease;
}

private SimpleConsumerLease createConsumerLease(final int partition) {
final List<TopicPartition> topicPartitions = new ArrayList<>();
for (final String topic : topics) {
final TopicPartition topicPartition = new TopicPartition(topic, partition);
topicPartitions.add(topicPartition);
}

final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
consumerCreatedCountRef.incrementAndGet();
consumer.assign(topicPartitions);

final SimpleConsumerLease lease = new SimpleConsumerLease(consumer);
return lease;
}

private void enqueueLeases(final int[] partitionsToConsume) {
if (partitionsToConsume == null) {
return;
}

for (final int partition : partitionsToConsume) {
final SimpleConsumerLease lease = createConsumerLease(partition);
pooledLeases.add(lease);
}
}

/**
* Exposed as protected method for easier unit testing
*
Expand Down

0 comments on commit 7dd0d8c

Please sign in to comment.