Skip to content

Commit

Permalink
KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (#8668)
Browse files Browse the repository at this point in the history

The motivation and pseudocode for the algorithm are in the ticket.

Added a scale test with a large number of topic partitions and consumers and a 30s timeout.
With these changes, assignment with 2,000 consumers and 200 topics with 2,000 partitions each completes within a few seconds.

When the same test was ported to trunk, it took 2 minutes even with a 100x reduction in the number of topics (i.e., 2 minutes for 2,000 consumers and 2 topics with 2,000 partitions each).

This change should be cherry-picked to 2.6, 2.5, and 2.4.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
  • Loading branch information
A. Sophie Blee-Goldman committed Jun 1, 2020
1 parent 66fdb59 commit c6633a1
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 173 deletions.
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>

<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager).java"/>
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>

<suppress checks="NPathComplexity"
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer).java"/>
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor).java"/>

<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -62,16 +61,26 @@ protected MemberData memberData(Subscription subscription) {
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);

final Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);
adjustAssignment(subscriptions, assignments);
Map<TopicPartition, String> partitionsTransferringOwnership = super.partitionsTransferringOwnership == null ?
computePartitionsTransferringOwnership(subscriptions, assignments) :
super.partitionsTransferringOwnership;

adjustAssignment(assignments, partitionsTransferringOwnership);
return assignments;
}

// Following the cooperative rebalancing protocol requires removing partitions that must first be revoked from the assignment
private void adjustAssignment(final Map<String, Subscription> subscriptions,
final Map<String, List<TopicPartition>> assignments) {
/**
 * Removes every partition listed in {@code partitionsTransferringOwnership} from the
 * assignment list of the consumer it is mapped to. The {@code assignments} lists are
 * modified in place.
 *
 * @param assignments                     consumer id to its list of assigned partitions
 * @param partitionsTransferringOwnership partition to the consumer id whose assignment
 *                                        list should no longer contain that partition
 */
private void adjustAssignment(Map<String, List<TopicPartition>> assignments,
                              Map<TopicPartition, String> partitionsTransferringOwnership) {
    // The lambda parameter is typed as TopicPartition, so this resolves to List.remove(Object).
    partitionsTransferringOwnership.forEach(
        (partition, consumerId) -> assignments.get(consumerId).remove(partition));
}

private Map<TopicPartition, String> computePartitionsTransferringOwnership(Map<String, Subscription> subscriptions,
Map<String, List<TopicPartition>> assignments) {
Map<TopicPartition, String> allAddedPartitions = new HashMap<>();
Set<TopicPartition> allRevokedPartitions = new HashSet<>();

Expand All @@ -81,25 +90,20 @@ private void adjustAssignment(final Map<String, Subscription> subscriptions,
List<TopicPartition> ownedPartitions = subscriptions.get(consumer).ownedPartitions();
List<TopicPartition> assignedPartitions = entry.getValue();

List<TopicPartition> addedPartitions = new ArrayList<>(assignedPartitions);
addedPartitions.removeAll(ownedPartitions);
for (TopicPartition tp : addedPartitions) {
allAddedPartitions.put(tp, consumer);
Set<TopicPartition> ownedPartitionsSet = new HashSet<>(ownedPartitions);
for (TopicPartition tp : assignedPartitions) {
if (!ownedPartitionsSet.contains(tp))
allAddedPartitions.put(tp, consumer);
}

final Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
revokedPartitions.removeAll(assignedPartitions);
allRevokedPartitions.addAll(revokedPartitions);
}

// remove any partitions to be revoked from the current assignment
for (TopicPartition tp : allRevokedPartitions) {
// if partition is being migrated to another consumer, don't assign it there yet
if (allAddedPartitions.containsKey(tp)) {
String assignedConsumer = allAddedPartitions.get(tp);
assignments.get(assignedConsumer).remove(tp);
Set<TopicPartition> assignedPartitionsSet = new HashSet<>(assignedPartitions);
for (TopicPartition tp : ownedPartitions) {
if (!assignedPartitionsSet.contains(tp))
allRevokedPartitions.add(tp);
}
}
}

allAddedPartitions.keySet().retainAll(allRevokedPartitions);
return allAddedPartitions;
}
}
Loading

0 comments on commit c6633a1

Please sign in to comment.