Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8179: add public ConsumerPartitionAssignor interface #7108

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -29,9 +29,9 @@
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.ElectionType;
Expand Down Expand Up @@ -2724,7 +2724,7 @@ void handleResponse(AbstractResponse abstractResponse) {
for (DescribedGroupMember groupMember : members) {
Set<TopicPartition> partitions = Collections.emptySet();
if (groupMember.memberAssignment().length > 0) {
final PartitionAssignor.Assignment assignment = ConsumerProtocol.
final ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.
ableegoldman marked this conversation as resolved.
Show resolved Hide resolved
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
partitions = new HashSet<>(assignment.partitions());
}
Expand Down
Expand Up @@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig {
* <code>partition.assignment.strategy</code>
*/
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name or class type of the assignor implementing the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. A custom assignor that implements ConsumerPartitionAssignor can be plugged in";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A custom assignor that implements ConsumerPartitionAssignor can be plugged in is this needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion from @guozhangwang to clarify which interface to implement. We do this for other configs (eg deserializer)


/**
* <code>auto.offset.reset</code>
Expand Down
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import java.util.Optional;

public class ConsumerGroupMetadata {
private String groupId;
private int generationId;
private String memberId;
Optional<String> groupInstanceId;

public ConsumerGroupMetadata(String groupId, int generationId, String memberId, Optional<String> groupInstanceId) {
this.groupId = groupId;
this.generationId = generationId;
this.memberId = memberId;
this.groupInstanceId = groupInstanceId;
}

public String groupId() {
return groupId;
}

public int generationId() {
return generationId;
}

public String memberId() {
return memberId;
}

public Optional<String> groupInstanceId() {
return groupInstanceId;
}

}
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

/**
* This interface is used to define custom partition assignment for use in
* {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
* to the topics they are interested in and forward their subscriptions to a Kafka broker serving
* as the group coordinator. The coordinator selects one member to perform the group assignment and
* propagates the subscriptions of all members to it. Then {@link #assign(Cluster, GroupSubscription)} is called
* to perform the assignment and the results are forwarded back to each respective members
*
* In some cases, it is useful to forward additional metadata to the assignor in order to make
* assignment decisions. For this, you can override {@link #subscriptionUserData(Set)} and provide custom
* userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
* can use this user data to forward the rackId belonging to each member.
*/
public interface ConsumerPartitionAssignor {

/**
* Return serialized data that will be included in the {@link Subscription} sent to the leader
* and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack information)
*
* @return optional join subscription user data
*/
default ByteBuffer subscriptionUserData(Set<String> topics) {
ableegoldman marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

/**
* Perform the group assignment given the member subscriptions and current cluster metadata.
* @param metadata Current topic/broker metadata known by consumer
* @param subscriptions Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)}
* @return A map from the members to their respective assignment. This should have one entry
ableegoldman marked this conversation as resolved.
Show resolved Hide resolved
* for each member in the input subscription map.
*/
GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions);

/**
* Callback which is invoked when a group member receives its assignment from the leader.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove which is

* @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)}
* @param metadata Additional metadata on the consumer (optional)
*/
default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
}

/**
* Indicate which rebalance protocol this assignor works with;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/protocol/protocols

* By default it should always work with {@link RebalanceProtocol#EAGER}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is not needed.

*/
default List<RebalanceProtocol> supportedProtocols() {
return Collections.singletonList(RebalanceProtocol.EAGER);
}

/**
* Return the version of the assignor which indicates how the user metadata encodings
* and the assignment algorithm gets evolved.
*/
default short version() {
return (short) 0;
}

/**
* Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky"). Note, this is not required
* to be the same as the class name specified in {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
* @return non-null unique name
*/
String name();

final class Subscription {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this class needs to be final?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems safer, maybe in case users thought they could pass more data to assignor by extending this rather than as serialized subscriptionUserData ... doesn't need to be I guess

private final List<String> topics;
private final ByteBuffer userData;
private final List<TopicPartition> ownedPartitions;
private Optional<String> groupInstanceId;

public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's a new API, do we need all these constructors?

this.topics = topics;
this.userData = userData;
this.ownedPartitions = ownedPartitions;
this.groupInstanceId = Optional.empty();
}

public Subscription(List<String> topics, ByteBuffer userData) {
this(topics, userData, Collections.emptyList());
}

public Subscription(List<String> topics) {
this(topics, null, Collections.emptyList());
}

public List<String> topics() {
return topics;
}

public ByteBuffer userData() {
return userData;
}

public List<TopicPartition> ownedPartitions() {
return ownedPartitions;
}

public void setGroupInstanceId(Optional<String> groupInstanceId) {
this.groupInstanceId = groupInstanceId;
}

public Optional<String> groupInstanceId() {
return groupInstanceId;
}
}

final class Assignment {
private List<TopicPartition> partitions;
private ByteBuffer userData;

public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
this.partitions = partitions;
this.userData = userData;
}

public Assignment(List<TopicPartition> partitions) {
this(partitions, null);
}

public List<TopicPartition> partitions() {
return partitions;
}

public ByteBuffer userData() {
return userData;
}
}

final class GroupSubscription {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the purposes of GroupSubscription and GroupAssignment classes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for possible future extensibility if we want to add new fields: with raw types (map) we'd have to change the signature of the API.

private final Map<String, Subscription> subscriptions;

public GroupSubscription(Map<String, Subscription> subscriptions) {
this.subscriptions = subscriptions;
}

public Map<String, Subscription> groupSubscription() {
return subscriptions;
}
}

final class GroupAssignment {
private final Map<String, Assignment> assignments;

public GroupAssignment(Map<String, Assignment> assignments) {
this.assignments = assignments;
}

public Map<String, Assignment> groupAssignment() {
return assignments;
}
}

enum RebalanceProtocol {
EAGER((byte) 0), COOPERATIVE((byte) 1);

private final byte id;

RebalanceProtocol(byte id) {
this.id = id;
}

public byte id() {
return id;
}

public static RebalanceProtocol forId(byte id) {
switch (id) {
case 0:
return EAGER;
case 1:
return COOPERATIVE;
default:
throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
}
}
}

}
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
Expand Down Expand Up @@ -581,7 +580,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final long requestTimeoutMs;
private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private List<PartitionAssignor> assignors;
private List<ConsumerPartitionAssignor> assignors;

// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
Expand Down Expand Up @@ -768,7 +767,7 @@ else if (enableAutoCommit)
heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation
this.assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
ConsumerPartitionAssignor.class);

// no coordinator will be constructed for the default (null) group id
this.coordinator = groupId == null ? null :
Expand Down Expand Up @@ -833,7 +832,7 @@ else if (enableAutoCommit)
long retryBackoffMs,
long requestTimeoutMs,
int defaultApiTimeoutMs,
List<PartitionAssignor> assignors,
List<ConsumerPartitionAssignor> assignors,
String groupId) {
this.log = logContext.logger(getClass());
this.clientId = clientId;
Expand Down
Expand Up @@ -365,18 +365,17 @@ private void prepopulateCurrentAssignments(Map<String, Subscription> subscriptio
}

@Override
public void onAssignment(Assignment assignment, int generation) {
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
memberAssignment = assignment.partitions();
this.generation = generation;
this.generation = metadata.generationId();
}

@Override
public Subscription subscription(Set<String> topics) {
public ByteBuffer subscriptionUserData(Set<String> topics) {
if (memberAssignment == null)
return new Subscription(new ArrayList<>(topics));
return null;

return new Subscription(new ArrayList<>(topics),
serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation))));
return serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation)));
}

@Override
Expand Down
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
Expand All @@ -33,7 +34,7 @@
* Abstract assignor implementation which does some common grunt work (in particular collecting
* partition counts which are always needed in assignors).
*/
public abstract class AbstractPartitionAssignor implements PartitionAssignor {
public abstract class AbstractPartitionAssignor implements ConsumerPartitionAssignor {
private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class);

/**
Expand All @@ -47,12 +48,8 @@ public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> pa
Map<String, Subscription> subscriptions);

@Override
public Subscription subscription(Set<String> topics) {
return new Subscription(new ArrayList<>(topics));
}

@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscriptions) {
Map<String, Subscription> subscriptions = groupSubscriptions.groupSubscription();
Set<String> allSubscribedTopics = new HashSet<>();
for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())
allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());
Expand All @@ -72,12 +69,7 @@ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription
Map<String, Assignment> assignments = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
return assignments;
}

@Override
public void onAssignment(Assignment assignment) {
// this assignor maintains no internal state, so nothing to do
return new GroupAssignment(assignments);
}

protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
Expand Down