From 0da02dfcd14d33be03d0c5a6a6d6ce8136cae6d3 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 14:09:43 -0700 Subject: [PATCH 01/11] new ConsumerPartitionAssignor interface --- .../kafka/clients/admin/KafkaAdminClient.java | 4 +- .../consumer/ConsumerPartitionAssignor.java | 234 ++++++++++++++++++ .../kafka/clients/consumer/KafkaConsumer.java | 7 +- .../clients/consumer/StickyAssignor.java | 11 +- .../internals/AbstractPartitionAssignor.java | 13 +- .../internals/ConsumerCoordinator.java | 76 ++---- .../consumer/internals/ConsumerProtocol.java | 95 +++---- .../consumer/internals/PartitionAssignor.java | 142 +---------- .../consumer/internals/SubscriptionState.java | 8 + .../clients/admin/KafkaAdminClientTest.java | 6 +- .../clients/consumer/KafkaConsumerTest.java | 70 +++--- .../clients/consumer/RangeAssignorTest.java | 2 +- .../consumer/RoundRobinAssignorTest.java | 2 +- .../clients/consumer/StickyAssignorTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 31 +-- .../internals/ConsumerProtocolTest.java | 38 +-- .../group/GroupMetadataManagerTest.scala | 2 +- .../internals/StreamsPartitionAssignor.java | 17 +- .../StreamsPartitionAssignorTest.java | 122 ++++----- .../streams/tests/StreamsUpgradeTest.java | 18 +- 20 files changed, 465 insertions(+), 435 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 349fc2a4d3d4..f2ee21e9da61 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -29,9 +29,9 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; import org.apache.kafka.clients.admin.internals.AdminMetadataManager; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ElectionType; @@ -2724,7 +2724,7 @@ void handleResponse(AbstractResponse abstractResponse) { for (DescribedGroupMember groupMember : members) { Set partitions = Collections.emptySet(); if (groupMember.memberAssignment().length > 0) { - final PartitionAssignor.Assignment assignment = ConsumerProtocol. + final ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol. deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); partitions = new HashSet<>(assignment.partitions()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java new file mode 100644 index 000000000000..e4455387d1da --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.TopicPartition; + +/** + * This interface is used to define custom partition assignment for use in + * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe + * to the topics they are interested in and forward their subscriptions to a Kafka broker serving + * as the group coordinator. The coordinator selects one member to perform the group assignment and + * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, GroupSubscription)} is called + * to perform the assignment and the results are forwarded back to each respective members + * + * In some cases, it is useful to forward additional metadata to the assignor in order to make + * assignment decisions. For this, you can override {@link #subscriptionUserData(Set)} and provide custom + * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation + * can use this user data to forward the rackId belonging to each member. + */ +public interface ConsumerPartitionAssignor { + + /** + * Return serialized data that will be included in the serializable subscription object sent in the + * joinGroup and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack information) + * + * @return Non-null optional join subscription user data + */ + default ByteBuffer subscriptionUserData(Set topics) { + return null; + } + + /** + * Perform the group assignment given the member subscriptions and current cluster metadata. + * @param metadata Current topic/broker metadata known by consumer + * @param subscriptions Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)} + * @return A map from the members to their respective assignment. This should have one entry + * for all members who in the input subscription map. + */ + GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions); + + /** + * Callback which is invoked when a group member receives its assignment from the leader. + * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)} + */ + void onAssignment(Assignment assignment); + + /** + * Callback which is invoked when a group member receives its assignment from the leader. + * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)} + * @param metadata Additional metadata on the consumer (optional) + */ + default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + onAssignment(assignment); + } + + /** + * Indicate which rebalance protocol this assignor works with; + * By default it should always work with {@link RebalanceProtocol#EAGER}. + */ + default List supportedProtocols() { + return Collections.singletonList(RebalanceProtocol.EAGER); + } + + /** + * Return the version of the assignor which indicates how the user metadata encodings + * and the assignment algorithm gets evolved. + */ + default short version() { + return (short) 0; + } + + /** + * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky") + * @return non-null unique name + */ + String name(); + + final class Subscription { + private final List topics; + private final ByteBuffer userData; + private final List ownedPartitions; + private Optional groupInstanceId; + + public Subscription(List topics, ByteBuffer userData, List ownedPartitions) { + this.topics = topics; + this.userData = userData; + this.ownedPartitions = ownedPartitions; + this.groupInstanceId = Optional.empty(); + } + + public Subscription(List topics, ByteBuffer userData) { + this(topics, userData, Collections.emptyList()); + } + + public Subscription(List topics) { + this(topics, null, Collections.emptyList()); + } + + public List topics() { + return topics; + } + + public ByteBuffer userData() { + return userData; + } + + public List ownedPartitions() { + return ownedPartitions; + } + + public void setGroupInstanceId(Optional groupInstanceId) { + this.groupInstanceId = groupInstanceId; + } + + public Optional groupInstanceId() { + return groupInstanceId; + } + } + + final class Assignment { + private List partitions; + private ByteBuffer userData; + + public Assignment(List partitions, ByteBuffer userData) { + this.partitions = partitions; + this.userData = userData; + } + + public Assignment(List partitions) { + this(partitions, null); + } + + public List partitions() { + return partitions; + } + + public ByteBuffer userData() { + return userData; + } + } + + class ConsumerGroupMetadata { + private String groupId; + private int generationId; + private String memberId; + private Optional groupInstanceId; + + public ConsumerGroupMetadata(String groupId, int generationId, String memberId, Optional groupInstanceId) { + this.groupId = groupId; + this.generationId = generationId; + this.memberId = memberId; + this.groupInstanceId = groupInstanceId; + } + + public ConsumerGroupMetadata(int generationId) { + this(null, generationId, null, Optional.empty()); + } + + public int generationId() { + return generationId; + } + } + + final class GroupSubscription { + private final Map subscriptions; + + public GroupSubscription(Map subscriptions) { + this.subscriptions = subscriptions; + } + + public Map groupSubscription() { + return subscriptions; + } + } + + final class GroupAssignment { + private final Map assignments; + + public GroupAssignment(Map assignments) { + this.assignments = assignments; + } + + public Map groupAssignment() { + return assignments; + } + } + + enum RebalanceProtocol { + EAGER((byte) 0), COOPERATIVE((byte) 1); + + private final byte id; + + RebalanceProtocol(byte id) { + this.id = id; + } + + public byte id() { + return id; + } + + public static RebalanceProtocol forId(byte id) { + switch (id) { + case 0: + return EAGER; + case 1: + return COOPERATIVE; + default: + throw new IllegalArgumentException("Unknown rebalance protocol id: " + id); + } + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index fa5cc99a1887..30944b330da6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry; import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -581,7 +580,7 @@ public class KafkaConsumer implements Consumer { private final long requestTimeoutMs; private final int defaultApiTimeoutMs; private volatile boolean closed = false; - private List assignors; + private List assignors; // currentThread holds the threadId of the current thread accessing KafkaConsumer // and is used to prevent multi-threaded access @@ -768,7 +767,7 @@ else if (enableAutoCommit) heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation this.assignors = config.getConfiguredInstances( ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, - PartitionAssignor.class); + ConsumerPartitionAssignor.class); // no coordinator will be constructed for the default (null) group id this.coordinator = groupId == null ? null : @@ -833,7 +832,7 @@ else if (enableAutoCommit) long retryBackoffMs, long requestTimeoutMs, int defaultApiTimeoutMs, - List assignors, + List assignors, String groupId) { this.log = logContext.logger(getClass()); this.clientId = clientId; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index 3c7d010f5f7a..3311cd890940 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -365,18 +365,17 @@ private void prepopulateCurrentAssignments(Map subscriptio } @Override - public void onAssignment(Assignment assignment, int generation) { + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { memberAssignment = assignment.partitions(); - this.generation = generation; + this.generation = metadata.generationId(); } @Override - public Subscription subscription(Set topics) { + public ByteBuffer subscriptionUserData(Set topics) { if (memberAssignment == null) - return new Subscription(new ArrayList<>(topics)); + return null; - return new Subscription(new ArrayList<>(topics), - serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation)))); + return serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation))); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java index 2487daa4eaa5..ae047a1cd156 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; @@ -33,7 +34,7 @@ * Abstract assignor implementation which does some common grunt work (in particular collecting * partition counts which are always needed in assignors). */ -public abstract class AbstractPartitionAssignor implements PartitionAssignor { +public abstract class AbstractPartitionAssignor implements ConsumerPartitionAssignor { private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class); /** @@ -47,12 +48,8 @@ public abstract Map> assign(Map pa Map subscriptions); @Override - public Subscription subscription(Set topics) { - return new Subscription(new ArrayList<>(topics)); - } - - @Override - public Map assign(Cluster metadata, Map subscriptions) { + public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscriptions) { + Map subscriptions = groupSubscriptions.groupSubscription(); Set allSubscribedTopics = new HashSet<>(); for (Map.Entry subscriptionEntry : subscriptions.entrySet()) allSubscribedTopics.addAll(subscriptionEntry.getValue().topics()); @@ -72,7 +69,7 @@ public Map assign(Cluster metadata, Map assignments = new HashMap<>(); for (Map.Entry> assignmentEntry : rawAssignments.entrySet()) assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue())); - return assignments; + return new GroupAssignment(assignments); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 4866986d37e8..156bf5fe3ad3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -18,13 +18,16 @@ import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.RebalanceProtocol; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -78,7 +81,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final GroupRebalanceConfig rebalanceConfig; private final Logger log; - private final List assignors; + private final List assignors; private final ConsumerMetadata metadata; private final ConsumerCoordinatorMetrics sensors; private final SubscriptionState subscriptions; @@ -128,7 +131,7 @@ private boolean sameRequest(final Set currentRequest, final Gene public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, LogContext logContext, ConsumerNetworkClient client, - List assignors, + List assignors, ConsumerMetadata metadata, SubscriptionState subscriptions, Metrics metrics, @@ -170,13 +173,13 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, if (!assignors.isEmpty()) { List supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols()); - for (PartitionAssignor assignor : assignors) { + for (ConsumerPartitionAssignor assignor : assignors) { supportedProtocols.retainAll(assignor.supportedProtocols()); } if (supportedProtocols.isEmpty()) { throw new IllegalArgumentException("Specified assignors " + - assignors.stream().map(PartitionAssignor::name).collect(Collectors.toSet()) + + assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) + " do not have commonly supported rebalance protocol"); } @@ -201,8 +204,10 @@ protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() { this.joinedSubscription = subscriptions.subscription(); JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(); - for (PartitionAssignor assignor : assignors) { - Subscription subscription = assignor.subscription(joinedSubscription); + for (ConsumerPartitionAssignor assignor : assignors) { + Subscription subscription = new Subscription(new ArrayList<>(joinedSubscription), + assignor.subscriptionUserData(joinedSubscription), + subscriptions.assignedPartitionsList()); ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription); protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol() @@ -220,8 +225,8 @@ public void updatePatternSubscription(Cluster cluster) { metadata.requestUpdateForNewTopics(); } - private PartitionAssignor lookupAssignor(String name) { - for (PartitionAssignor assignor : this.assignors) { + private ConsumerPartitionAssignor lookupAssignor(String name) { + for (ConsumerPartitionAssignor assignor : this.assignors) { if (assignor.name().equals(name)) return assignor; } @@ -261,7 +266,7 @@ protected void onJoinComplete(int generation, if (!isLeader) assignmentSnapshot = null; - PartitionAssignor assignor = lookupAssignor(assignmentStrategy); + ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy); if (assignor == null) throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); @@ -285,7 +290,8 @@ protected void onJoinComplete(int generation, maybeUpdateJoinedSubscription(assignedPartitions); // give the assignor a chance to update internal state based on the received assignment - assignor.onAssignment(assignment, generation); + ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(generation); + assignor.onAssignment(assignment, metadata); // reschedule the auto commit starting from now if (autoCommitEnabled) @@ -314,10 +320,6 @@ protected void onJoinComplete(int generation, case COOPERATIVE: assignAndRevoke(listener, assignedPartitions, ownedPartitions); - if (assignment.error() == ConsumerProtocol.AssignmentError.NEED_REJOIN) { - requestRejoin(); - } - break; } @@ -470,7 +472,7 @@ private void updateGroupSubscription(Set topics) { protected Map performAssignment(String leaderId, String assignmentStrategy, List allSubscriptions) { - PartitionAssignor assignor = lookupAssignor(assignmentStrategy); + ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy); if (assignor == null) throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); @@ -494,14 +496,14 @@ protected Map performAssignment(String leaderId, log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions); - Map assignments = assignor.assign(metadata.fetch(), subscriptions); + Map assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment(); switch (protocol) { case EAGER: break; case COOPERATIVE: - adjustAssignment(ownedPartitions, assignments); + // TODO need to validate assignment -- make sure no partitions to be revoked were also assigned during this rebalance break; } @@ -547,40 +549,6 @@ protected Map performAssignment(String leaderId, return groupAssignment; } - private void adjustAssignment(final Map ownedPartitions, - final Map assignments) { - boolean revocationsNeeded = false; - Set assignedPartitions = new HashSet<>(); - for (final Map.Entry entry : assignments.entrySet()) { - final Assignment assignment = entry.getValue(); - assignedPartitions.addAll(assignment.partitions()); - - // update the assignment if the partition is owned by another different owner - List updatedPartitions = assignment.partitions().stream() - .filter(tp -> ownedPartitions.containsKey(tp) && !entry.getKey().equals(ownedPartitions.get(tp))) - .collect(Collectors.toList()); - if (!updatedPartitions.equals(assignment.partitions())) { - assignment.updatePartitions(updatedPartitions); - revocationsNeeded = true; - } - } - - // for all owned but not assigned partitions, blindly add them to assignment - for (final Map.Entry entry : ownedPartitions.entrySet()) { - final TopicPartition tp = entry.getKey(); - if (!assignedPartitions.contains(tp)) { - assignments.get(entry.getValue()).partitions().add(tp); - } - } - - // if revocations are triggered, tell everyone to re-join immediately. - if (revocationsNeeded) { - for (final Assignment assignment : assignments.values()) { - assignment.setError(ConsumerProtocol.AssignmentError.NEED_REJOIN); - } - } - } - @Override protected void onJoinPrepare(int generation, String memberId) { // commit offsets prior to rebalance if auto-commit enabled diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index d05d5b06906a..e852e62cd46b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.clients.consumer.internals; +import java.util.Collections; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; @@ -49,7 +52,6 @@ * Topic => String * Partitions => [int32] * UserData => Bytes - * ErrorCode => [int16] * * * Version 0 format: @@ -85,11 +87,11 @@ public class ConsumerProtocol { public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions"; public static final String USER_DATA_KEY_NAME = "user_data"; - public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Assignment error code"); - public static final short CONSUMER_PROTOCOL_V0 = 0; public static final short CONSUMER_PROTOCOL_V1 = 1; + public static final short CONSUMER_PROTOCL_LATEST_VERSION = CONSUMER_PROTOCOL_V1; + public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema( new Field(VERSION_KEY_NAME, Type.INT16)); private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA) @@ -116,36 +118,9 @@ public class ConsumerProtocol { public static final Schema ASSIGNMENT_V1 = new Schema( new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)), - new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES), - ERROR_CODE); - - public enum AssignmentError { - NONE(0), - NEED_REJOIN(1); + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES)); - private final short code; - - AssignmentError(final int code) { - this.code = (short) code; - } - - public short code() { - return code; - } - - public static AssignmentError fromCode(final short code) { - switch (code) { - case 0: - return NONE; - case 1: - return NEED_REJOIN; - default: - throw new IllegalArgumentException("Unknown error code: " + code); - } - } - } - - public static ByteBuffer serializeSubscriptionV0(PartitionAssignor.Subscription subscription) { + public static ByteBuffer serializeSubscriptionV0(Subscription subscription) { Struct struct = new Struct(SUBSCRIPTION_V0); struct.set(USER_DATA_KEY_NAME, subscription.userData()); struct.set(TOPICS_KEY_NAME, subscription.topics().toArray()); @@ -157,7 +132,7 @@ public static ByteBuffer serializeSubscriptionV0(PartitionAssignor.Subscription return buffer; } - public static ByteBuffer serializeSubscriptionV1(PartitionAssignor.Subscription subscription) { + public static ByteBuffer serializeSubscriptionV1(Subscription subscription) { Struct struct = new Struct(SUBSCRIPTION_V1); struct.set(USER_DATA_KEY_NAME, subscription.userData()); struct.set(TOPICS_KEY_NAME, subscription.topics().toArray()); @@ -178,8 +153,12 @@ public static ByteBuffer serializeSubscriptionV1(PartitionAssignor.Subscription return buffer; } - public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) { - switch (subscription.version()) { + public static ByteBuffer serializeSubscription(Subscription subscription) { + return serializeSubscription(subscription, CONSUMER_PROTOCL_LATEST_VERSION); + } + + public static ByteBuffer serializeSubscription(Subscription subscription, short version) { + switch (version) { case CONSUMER_PROTOCOL_V0: return serializeSubscriptionV0(subscription); @@ -192,17 +171,17 @@ public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription su } } - public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) { + public static Subscription deserializeSubscriptionV0(ByteBuffer buffer) { Struct struct = SUBSCRIPTION_V0.read(buffer); ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); List topics = new ArrayList<>(); for (Object topicObj : struct.getArray(TOPICS_KEY_NAME)) topics.add((String) topicObj); - return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData); + return new Subscription(topics, userData, Collections.emptyList()); } - public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) { + public static Subscription deserializeSubscriptionV1(ByteBuffer buffer) { Struct struct = SUBSCRIPTION_V1.read(buffer); ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); List topics = new ArrayList<>(); @@ -218,10 +197,10 @@ public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffe } } - return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions); + return new Subscription(topics, userData, ownedPartitions); } - public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) { + public static Subscription deserializeSubscription(ByteBuffer buffer) { Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); Short version = header.getShort(VERSION_KEY_NAME); @@ -241,7 +220,7 @@ public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer } } - public static ByteBuffer serializeAssignmentV0(PartitionAssignor.Assignment assignment) { + public static ByteBuffer serializeAssignmentV0(Assignment assignment) { Struct struct = new Struct(ASSIGNMENT_V0); struct.set(USER_DATA_KEY_NAME, assignment.userData()); List topicAssignments = new ArrayList<>(); @@ -261,7 +240,7 @@ public static ByteBuffer serializeAssignmentV0(PartitionAssignor.Assignment assi return buffer; } - public static ByteBuffer serializeAssignmentV1(PartitionAssignor.Assignment assignment) { + public static ByteBuffer serializeAssignmentV1(Assignment assignment) { Struct struct = new Struct(ASSIGNMENT_V1); struct.set(USER_DATA_KEY_NAME, assignment.userData()); List topicAssignments = new ArrayList<>(); @@ -273,7 +252,6 @@ public static ByteBuffer serializeAssignmentV1(PartitionAssignor.Assignment assi topicAssignments.add(topicAssignment); } struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray()); - struct.set(ERROR_CODE, assignment.error().code); ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V1.sizeOf() + ASSIGNMENT_V1.sizeOf(struct)); CONSUMER_PROTOCOL_HEADER_V1.writeTo(buffer); @@ -282,8 +260,12 @@ public static ByteBuffer serializeAssignmentV1(PartitionAssignor.Assignment assi return buffer; } - public static ByteBuffer serializeAssignment(PartitionAssignor.Assignment assignment) { - switch (assignment.version()) { + public static ByteBuffer serializeAssignment(Assignment assignment) { + return serializeAssignment(assignment, CONSUMER_PROTOCL_LATEST_VERSION); + } + + public static ByteBuffer serializeAssignment(Assignment assignment, short version) { + switch (version) { case CONSUMER_PROTOCOL_V0: return serializeAssignmentV0(assignment); @@ -296,7 +278,7 @@ public static ByteBuffer serializeAssignment(PartitionAssignor.Assignment assign } } - public static PartitionAssignor.Assignment deserializeAssignmentV0(ByteBuffer buffer) { + public static Assignment deserializeAssignmentV0(ByteBuffer buffer) { Struct struct = ASSIGNMENT_V0.read(buffer); ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); List partitions = new ArrayList<>(); @@ -307,27 +289,14 @@ public static PartitionAssignor.Assignment deserializeAssignmentV0(ByteBuffer bu partitions.add(new TopicPartition(topic, (Integer) partitionObj)); } } - return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V0, partitions, userData); + return new Assignment(partitions, userData); } - public static PartitionAssignor.Assignment deserializeAssignmentV1(ByteBuffer buffer) { - Struct struct = ASSIGNMENT_V1.read(buffer); - ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); - List partitions = new ArrayList<>(); - for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) { - Struct assignment = (Struct) structObj; - String topic = assignment.getString(TOPIC_KEY_NAME); - for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) { - partitions.add(new TopicPartition(topic, (Integer) partitionObj)); - } - } - - AssignmentError error = AssignmentError.fromCode(struct.get(ERROR_CODE)); - - return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V1, partitions, userData, error); + public static Assignment deserializeAssignmentV1(ByteBuffer buffer) { + return deserializeAssignmentV0(buffer); } - public static PartitionAssignor.Assignment deserializeAssignment(ByteBuffer buffer) { + public static Assignment deserializeAssignment(ByteBuffer buffer) { Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); Short version = header.getShort(VERSION_KEY_NAME); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java index c26f68462ea4..b3f2ada5c419 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java @@ -18,18 +18,12 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.types.SchemaException; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0; -import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V1; - /** * This interface is used to define custom partition assignment for use in * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe @@ -43,6 +37,7 @@ * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation * can use this user data to forward the rackId belonging to each member. */ +@Deprecated public interface PartitionAssignor { /** @@ -79,21 +74,6 @@ default void onAssignment(Assignment assignment, int generation) { onAssignment(assignment); } - /** - * Indicate which rebalance protocol this assignor works with; - * By default it should always work with {@link RebalanceProtocol#EAGER}. - */ - default List supportedProtocols() { - return Collections.singletonList(RebalanceProtocol.EAGER); - } - - /** - * Return the version of the assignor which indicates how the user metadata encodings - * and the assignment algorithm gets evolved. - */ - default short version() { - return (short) 0; - } /** * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky") @@ -101,156 +81,52 @@ default short version() { */ String name(); - enum RebalanceProtocol { - EAGER((byte) 0), COOPERATIVE((byte) 1); - - private final byte id; - - RebalanceProtocol(byte id) { - this.id = id; - } - - public byte id() { - return id; - } - - public static RebalanceProtocol forId(byte id) { - switch (id) { - case 0: - return EAGER; - case 1: - return COOPERATIVE; - default: - throw new IllegalArgumentException("Unknown rebalance protocol id: " + id); - } - } - } - class Subscription { - private final Short version; private final List topics; private final ByteBuffer userData; - private final List ownedPartitions; - private Optional groupInstanceId; - Subscription(Short version, - List topics, - ByteBuffer userData, - List ownedPartitions) { - this.version = version; + public Subscription(List topics, ByteBuffer userData) { this.topics = topics; this.userData = userData; - this.ownedPartitions = ownedPartitions; - this.groupInstanceId = Optional.empty(); - - if (version < CONSUMER_PROTOCOL_V0) - throw new SchemaException("Unsupported subscription version: " + version); - - if (version < CONSUMER_PROTOCOL_V1 && !ownedPartitions.isEmpty()) - throw new IllegalArgumentException("Subscription version smaller than 1 should not have owned partitions"); - } - - Subscription(Short version, List topics, ByteBuffer userData) { - this(version, topics, userData, Collections.emptyList()); - } - - public Subscription(List topics, ByteBuffer userData, List ownedPartitions) { - this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions); - } - - public Subscription(List topics, ByteBuffer userData) { - this(CONSUMER_PROTOCOL_V1, topics, userData); } public Subscription(List topics) { this(topics, ByteBuffer.wrap(new byte[0])); } - Short version() { - return version; - } - public List topics() { return topics; } - public List ownedPartitions() { - return ownedPartitions; - } - public ByteBuffer userData() { return userData; } - public void setGroupInstanceId(Optional groupInstanceId) { - this.groupInstanceId = groupInstanceId; - } - - public Optional groupInstanceId() { - return groupInstanceId; - } - @Override public String toString() { return "Subscription(" + - "version=" + version + - ", topics=" + topics + - ", ownedPartitions=" + ownedPartitions + - ", group.instance.id=" + groupInstanceId + ")"; + "topics=" + topics + + ')'; } } class Assignment { - private final Short version; - private List partitions; + private final List partitions; private final ByteBuffer userData; - private ConsumerProtocol.AssignmentError error; - Assignment(Short version, List partitions, ByteBuffer userData, ConsumerProtocol.AssignmentError error) { - this.version = version; + public Assignment(List partitions, ByteBuffer userData) { this.partitions = partitions; this.userData = userData; - this.error = error; - - if (version < CONSUMER_PROTOCOL_V0) - throw new SchemaException("Unsupported subscription version: " + version); - - if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.AssignmentError.NONE) - throw new IllegalArgumentException("Assignment version smaller than 1 should not have error code."); - } - - Assignment(Short version, List partitions, ByteBuffer userData) { - this(version, partitions, userData, ConsumerProtocol.AssignmentError.NONE); - } - - public Assignment(List partitions, ByteBuffer userData) { - this(CONSUMER_PROTOCOL_V1, partitions, userData); } public Assignment(List partitions) { this(partitions, ByteBuffer.wrap(new byte[0])); } - Short version() { - return version; - } - public List partitions() { return partitions; } - public ConsumerProtocol.AssignmentError error() { - return error; - } - - public void updatePartitions(List partitions) { - this.partitions = partitions; - } - - public void setError(ConsumerProtocol.AssignmentError error) { - this.error = error; - } - public ByteBuffer userData() { return userData; } @@ -258,10 +134,8 @@ public ByteBuffer userData() { @Override public String toString() { return "Assignment(" + - "version=" + version + - ", partitions=" + partitions + - ", error=" + error + - ')'; + "partitions=" + partitions + + ')'; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 3f1cf98be9ee..af834ce71b1d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import java.util.ArrayList; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; @@ -387,6 +388,13 @@ public synchronized Set assignedPartitions() { return new HashSet<>(this.assignment.partitionSet()); } + /** + * @return a modifiable copy of the currently assigned partitions as a list + */ + public synchronized List assignedPartitionsList() { + return new ArrayList<>(this.assignment.partitionSet()); + } + /** * Provides the number of assigned partitions in a thread safe manner. * @return the number of assigned partitions. diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 711c8f92de84..769f58c3caf4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -21,9 +21,9 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.KafkaException; @@ -1222,7 +1222,7 @@ public void testDescribeConsumerGroups() throws Exception { topicPartitions.add(1, myTopicPartition1); topicPartitions.add(2, myTopicPartition2); - final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions)); + final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions)); byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()]; memberAssignment.get(memberAssignmentBytes); @@ -1282,7 +1282,7 @@ public void testDescribeMultipleConsumerGroups() throws Exception { topicPartitions.add(1, myTopicPartition1); topicPartitions.add(2, myTopicPartition2); - final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions)); + final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions)); byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()]; memberAssignment.get(memberAssignmentBytes); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index c1adf1932ec6..ce9931a1c6e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; @@ -28,7 +29,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.Fetcher; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -396,7 +396,7 @@ public void verifyHeartbeatSent() throws Exception { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -430,7 +430,7 @@ public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -465,7 +465,7 @@ public void verifyPollTimesOutDuringMetadataUpdate() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final PartitionAssignor assignor = new RoundRobinAssignor(); + final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -489,7 +489,7 @@ public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final PartitionAssignor assignor = new RoundRobinAssignor(); + final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -512,7 +512,7 @@ public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singleton(tp0)); @@ -587,7 +587,7 @@ public void testMissingOffsetNoResetPolicy() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId); @@ -611,7 +611,7 @@ public void testResetToCommittedOffset() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId); @@ -636,7 +636,7 @@ public void testResetUsingAutoResetPolicy() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId); @@ -663,7 +663,7 @@ public void testOffsetIsValidAfterSeek() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, Optional.empty()); @@ -686,7 +686,7 @@ public void testCommitsFetchedDuringAssign() { initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singletonList(tp0)); @@ -724,7 +724,7 @@ public void testAutoCommitSentBeforePositionUpdate() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -764,7 +764,7 @@ public void testRegexSubscription() { initMetadata(client, partitionCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null); @@ -782,7 +782,7 @@ public void testRegexSubscription() { @Test public void testChangingRegexSubscription() { - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); String otherTopic = "other"; TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0); @@ -828,7 +828,7 @@ public void testWakeupWithFetchDataAvailable() throws Exception { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -878,7 +878,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final PartitionAssignor assignor = new RoundRobinAssignor(); + final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -908,7 +908,7 @@ public void fetchResponseWithUnexpectedPartitionIsIgnored() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer)); @@ -948,7 +948,7 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { initMetadata(client, tpCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -1062,7 +1062,7 @@ public void testSubscriptionChangesWithAutoCommitDisabled() { initMetadata(client, tpCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); @@ -1124,7 +1124,7 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() { initMetadata(client, tpCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -1180,7 +1180,7 @@ public void testManualAssignmentChangeWithAutoCommitDisabled() { initMetadata(client, tpCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); @@ -1234,7 +1234,7 @@ public void testOffsetOfPausedPartitions() { initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -1429,7 +1429,7 @@ public void shouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -1457,7 +1457,7 @@ public boolean matches(AbstractRequest body) { coordinator); // join group - final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic))); + final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(singletonList(topic))); // This member becomes the leader String memberId = "memberId"; @@ -1512,7 +1512,7 @@ private void consumerCloseTest(final long closeTimeoutMs, initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty()); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -1649,7 +1649,7 @@ private KafkaConsumer consumerWithPendingAuthenticationError() { initMetadata(client, singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); client.createPendingAuthenticationError(node, 0); return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); @@ -1675,7 +1675,7 @@ private ConsumerMetadata createMetadata(SubscriptionState subscription) { subscription, new LogContext(), new ClusterResourceListeners()); } - private Node prepareRebalance(MockClient client, Node node, final Set subscribedTopics, PartitionAssignor assignor, List partitions, Node coordinator) { + private Node prepareRebalance(MockClient client, Node node, final Set subscribedTopics, ConsumerPartitionAssignor assignor, List partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); @@ -1692,7 +1692,7 @@ public boolean matches(AbstractRequest body) { assertTrue(protocolIterator.hasNext()); ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata()); - PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata); + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata); return subscribedTopics.equals(new HashSet<>(subscription.topics())); } }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator); @@ -1703,7 +1703,7 @@ public boolean matches(AbstractRequest body) { return coordinator; } - private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List partitions, Node coordinator) { + private Node prepareRebalance(MockClient client, Node node, ConsumerPartitionAssignor assignor, List partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); @@ -1764,7 +1764,7 @@ private OffsetCommitResponse offsetCommitResponse(Map re return new OffsetCommitResponse(responseData); } - private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) { + private JoinGroupResponse joinGroupFollowerResponse(ConsumerPartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) { return new JoinGroupResponse( new JoinGroupResponseData() .setErrorCode(error.code()) @@ -1777,7 +1777,7 @@ private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, } private SyncGroupResponse syncGroupResponse(List partitions, Errors error) { - ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); + ByteBuffer buf = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(partitions)); return new SyncGroupResponse( new SyncGroupResponseData() .setErrorCode(error.code()) @@ -1848,7 +1848,7 @@ private KafkaConsumer newConsumer(Time time, KafkaClient client, SubscriptionState subscription, ConsumerMetadata metadata, - PartitionAssignor assignor, + ConsumerPartitionAssignor assignor, boolean autoCommitEnabled, Optional groupInstanceId) { return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId); @@ -1865,7 +1865,7 @@ private KafkaConsumer newConsumer(Time time, KafkaClient client, SubscriptionState subscription, ConsumerMetadata metadata, - PartitionAssignor assignor, + ConsumerPartitionAssignor assignor, boolean autoCommitEnabled, String groupId, Optional groupInstanceId) { @@ -1885,7 +1885,7 @@ private KafkaConsumer newConsumer(Time time, Deserializer keyDeserializer = new StringDeserializer(); Deserializer valueDeserializer = new StringDeserializer(); - List assignors = singletonList(assignor); + List assignors = singletonList(assignor); ConsumerInterceptors interceptors = new ConsumerInterceptors<>(Collections.emptyList()); Metrics metrics = new Metrics(); @@ -1985,7 +1985,7 @@ public void testSubscriptionOnInvalidTopic() { initMetadata(client, Collections.singletonMap(topic, 1)); Cluster cluster = metadata.fetch(); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); String invalidTopicName = "topic abc"; // Invalid topic name due to space diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java index f08ca144bf65..118e60a3d128 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.consumer; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.junit.Test; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java index fa6840649a16..02fb9ffba403 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.MemberInfo; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.junit.Test; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index 89f0d37ff0cc..6dd306200006 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -30,7 +30,7 @@ import java.util.Set; import org.apache.kafka.clients.consumer.StickyAssignor.ConsumerUserData; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index a0878fb8899f..a81e73e022df 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; @@ -125,9 +126,9 @@ public class ConsumerCoordinatorTest { private final MockTime time = new MockTime(); private GroupRebalanceConfig rebalanceConfig; - private final PartitionAssignor.RebalanceProtocol protocol; + private final ConsumerPartitionAssignor.RebalanceProtocol protocol; private final MockPartitionAssignor partitionAssignor; - private final List assignors; + private final List assignors; private MockClient client; private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap() { { @@ -144,7 +145,7 @@ public class ConsumerCoordinatorTest { private MockCommitCallback mockOffsetCommitCallback; private ConsumerCoordinator coordinator; - public ConsumerCoordinatorTest(final PartitionAssignor.RebalanceProtocol protocol) { + public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) { this.protocol = protocol; this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol)); this.assignors = Collections.singletonList(partitionAssignor); @@ -153,7 +154,7 @@ public ConsumerCoordinatorTest(final PartitionAssignor.RebalanceProtocol protoco @Parameterized.Parameters(name = "rebalance protocol = {0}") public static Collection data() { final List values = new ArrayList<>(); - for (final PartitionAssignor.RebalanceProtocol protocol: PartitionAssignor.RebalanceProtocol.values()) { + for (final ConsumerPartitionAssignor.RebalanceProtocol protocol: ConsumerPartitionAssignor.RebalanceProtocol.values()) { values.add(new Object[]{protocol}); } return values; @@ -198,20 +199,20 @@ public void teardown() { @Test public void testSelectRebalanceProtcol() { - List assignors = new ArrayList<>(); - assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.EAGER))); - assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.COOPERATIVE))); + List assignors = new ArrayList<>(); + assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER))); + assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE))); // no commonly supported protocols assertThrows(IllegalArgumentException.class, () -> buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)); assignors.clear(); - assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE))); - assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE))); + assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE))); + assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE))); // select higher indexed (more advanced) protocols try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) { - assertEquals(PartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol()); + assertEquals(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol()); } } @@ -553,7 +554,7 @@ public boolean matches(AbstractRequest body) { final int addCount = 1; // with eager protocol we will call revoke on the old assignment as well - if (protocol == PartitionAssignor.RebalanceProtocol.EAGER) { + if (protocol == ConsumerPartitionAssignor.RebalanceProtocol.EAGER) { revokeCount += 1; } @@ -670,7 +671,7 @@ public boolean matches(AbstractRequest body) { JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next(); ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata()); - PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata); + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata); metadata.rewind(); return subscription.topics().containsAll(updatedSubscription); } @@ -2326,7 +2327,7 @@ public boolean matches(AbstractRequest body) { private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig, final Metrics metrics, - final List assignors, + final List assignors, final boolean autoCommitEnabled) { return new ConsumerCoordinator( rebalanceConfig, @@ -2385,7 +2386,7 @@ private JoinGroupResponse joinGroupLeaderResponse(int generationId, Errors error) { List metadata = new ArrayList<>(); for (Map.Entry> subscriptionEntry : subscriptions.entrySet()) { - PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue()); + ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue()); ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription); metadata.add(new JoinGroupResponseData.JoinGroupResponseMember() .setMemberId(subscriptionEntry.getKey()) @@ -2416,7 +2417,7 @@ private JoinGroupResponse joinGroupFollowerResponse(int generationId, String mem } private SyncGroupResponse syncGroupResponse(List partitions, Errors error) { - ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); + ByteBuffer buf = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(partitions)); return new SyncGroupResponse( new SyncGroupResponseData() .setErrorCode(error.code()) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java index 9e601b034e52..3e25ac8dabf5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; @@ -85,8 +85,8 @@ public void serializeDeserializeNullSubscriptionUserData() { @Test public void deserializeOldSubscriptionVersion() { - Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null); - ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); + Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null); + ByteBuffer buffer = ConsumerProtocol.serializeSubscriptionV0(subscription); Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); assertEquals(parsedSubscription.topics(), parsedSubscription.topics()); assertNull(parsedSubscription.userData()); @@ -95,7 +95,7 @@ public void deserializeOldSubscriptionVersion() { @Test public void deserializeNewSubscriptionWithOldVersion() { - Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2)); + Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2)); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); // ignore the version assuming it is the old byte code, as it will blindly deserialize as V0 Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); @@ -160,29 +160,6 @@ public void deserializeNullAssignmentUserData() { assertNull(parsedAssignment.userData()); } - @Test - public void deserializeOldAssignmentVersion() { - List partitions = Arrays.asList(tp1, tp2); - ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 0, partitions, null)); - Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer); - assertEquals(toSet(partitions), toSet(parsedAssignment.partitions())); - assertNull(parsedAssignment.userData()); - assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error()); - } - - @Test - public void deserializeNewAssignmentWithOldVersion() { - List partitions = Collections.singletonList(tp1); - ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 1, partitions, null, ConsumerProtocol.AssignmentError.NEED_REJOIN)); - // ignore the version assuming it is the old byte code, as it will blindly deserialize as 0 - Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); - header.getShort(VERSION_KEY_NAME); - Assignment parsedAssignment = ConsumerProtocol.deserializeAssignmentV0(buffer); - assertEquals(toSet(partitions), toSet(parsedAssignment.partitions())); - assertNull(parsedAssignment.userData()); - assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error()); - } - @Test public void deserializeFutureAssignmentVersion() { // verify that a new version which adds a field is still parseable @@ -191,7 +168,6 @@ public void deserializeFutureAssignmentVersion() { Schema assignmentSchemaV100 = new Schema( new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)), new Field(USER_DATA_KEY_NAME, Type.BYTES), - ERROR_CODE, new Field("foo", Type.STRING)); Struct assignmentV100 = new Struct(assignmentSchemaV100); @@ -200,7 +176,6 @@ public void deserializeFutureAssignmentVersion() { .set(ConsumerProtocol.TOPIC_KEY_NAME, tp1.topic()) .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{tp1.partition()})}); assignmentV100.set(USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0])); - assignmentV100.set(ERROR_CODE.name, ConsumerProtocol.AssignmentError.NEED_REJOIN.code()); assignmentV100.set("foo", "bar"); Struct headerV100 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA); @@ -212,8 +187,7 @@ public void deserializeFutureAssignmentVersion() { buffer.flip(); - PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer); + Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer); assertEquals(toSet(Collections.singletonList(tp1)), toSet(assignment.partitions())); - assertEquals(ConsumerProtocol.AssignmentError.NEED_REJOIN, assignment.error()); } } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 6761b0cad94c..4e0671667790 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -31,8 +31,8 @@ import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManag import kafka.server.HostedPartition import kafka.utils.{KafkaScheduler, MockTime, TestUtils} import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription import org.apache.kafka.clients.consumer.internals.ConsumerProtocol -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 714bb0ea2104..7e07a5365699 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -16,8 +16,9 @@ */ package org.apache.kafka.streams.processor.internals; +import java.nio.ByteBuffer; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; @@ -56,7 +57,7 @@ import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; -public class StreamsPartitionAssignor implements PartitionAssignor, Configurable { +public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable { final static int UNKNOWN = -1; private final static int VERSION_ONE = 1; @@ -309,7 +310,7 @@ public String name() { } @Override - public Subscription subscription(final Set topics) { + public ByteBuffer subscriptionUserData(Set topics) { // Adds the following information to subscription // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) // 2. Task ids of previously running tasks @@ -327,7 +328,7 @@ public Subscription subscription(final Set topics) { taskManager.updateSubscriptionsFromMetadata(topics); - return new Subscription(new ArrayList<>(topics), data.encode()); + return data.encode(); } private Map errorAssignment(final Map clientsMetadata, @@ -371,8 +372,8 @@ private Map errorAssignment(final Map * 3. within each client, tasks are assigned to consumer clients in round-robin manner. */ @Override - public Map assign(final Cluster metadata, - final Map subscriptions) { + public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscriptions) { + final Map subscriptions = groupSubscriptions.groupSubscription(); // construct the client metadata from the decoded subscription info final Map clientMetadataMap = new HashMap<>(); final Set futureConsumers = new HashSet<>(); @@ -446,7 +447,7 @@ public Map assign(final Cluster metadata, !metadata.topics().contains(topic)) { log.error("Missing source topic {} durign assignment. Returning error {}.", topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name()); - return errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code); + return new GroupAssignment(errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code)); } } for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) { @@ -644,7 +645,7 @@ public Map assign(final Cluster metadata, assignment = computeNewAssignment(clientMetadataMap, partitionsForTask, partitionsByHostState, minReceivedMetadataVersion); } - return assignment; + return new GroupAssignment(assignment); } private Map computeNewAssignment(final Map clientsMetadata, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 5fa6653ba3ac..5d5611acd735 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -16,7 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -146,7 +147,7 @@ private void mockTaskManager(final Set prevTasks, EasyMock.replay(taskManager); } - private Map subscriptions; + private Map subscriptions; @Before public void setUp() { @@ -200,7 +201,9 @@ public void testSubscription() { mockTaskManager(prevTasks, cachedTasks, processId, builder); configurePartitionAssignor(Collections.emptyMap()); - final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); + + final Set topics = Utils.mkSet("topic1", "topic2"); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); Collections.sort(subscription.topics()); assertEquals(asList("topic1", "topic2"), subscription.topics()); @@ -236,16 +239,16 @@ public void testAssignBasic() { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode())); subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode())); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)), @@ -320,13 +323,13 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode())); - final Map assignments = partitionAssignor.assign(localMetadata, subscriptions); + final Map assignments = partitionAssignor.assign(localMetadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions assertEquals(Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), Utils.mkSet(t1p1, t2p1, t1p3, t2p3)), @@ -365,10 +368,10 @@ public void testAssignWithPartialTopology() { // will throw exception if it fails subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode() )); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assignment info final AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")); @@ -399,12 +402,12 @@ public void testAssignEmptyMetadata() { configurePartitionAssignor(Collections.emptyMap()); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode() )); // initially metadata is empty - Map assignments = partitionAssignor.assign(emptyMetadata, subscriptions); + Map assignments = partitionAssignor.assign(emptyMetadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions assertEquals(Collections.emptySet(), @@ -417,7 +420,7 @@ public void testAssignEmptyMetadata() { assertEquals(0, allActiveTasks.size()); // then metadata gets populated - assignments = partitionAssignor.assign(metadata, subscriptions); + assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)), Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()))); @@ -455,16 +458,16 @@ public void testAssignWithNewTasks() { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, emptyTasks, userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, emptyTasks, userEndPoint).encode())); subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, emptyTasks, userEndPoint).encode())); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and @@ -521,16 +524,16 @@ public void testAssignWithStates() { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, emptyTasks, emptyTasks, userEndPoint).encode())); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match assertEquals(2, assignments.get("consumer10").partitions().size()); @@ -609,16 +612,16 @@ public void testAssignWithStandbyReplicas() { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, userEndPoint).encode())); subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode())); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // the first consumer final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); @@ -666,7 +669,7 @@ public void testOnAssignment() { standbyTasks.put(task2, Utils.mkSet(t3p2)); final AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState); - final PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode()); + final ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode()); final Capture capturedCluster = EasyMock.newCapture(); taskManager.setPartitionsByHostState(hostState); @@ -704,10 +707,10 @@ public void testAssignWithInternalTopics() { partitionAssignor.setInternalTopicManager(internalTopicManager); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()) ); - partitionAssignor.assign(metadata, subscriptions); + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check prepared internal topics assertEquals(1, internalTopicManager.readyTopics.size()); @@ -738,10 +741,10 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { partitionAssignor.setInternalTopicManager(internalTopicManager); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()) ); - partitionAssignor.assign(metadata, subscriptions); + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check prepared internal topics assertEquals(2, internalTopicManager.readyTopics.size()); @@ -790,11 +793,11 @@ public void shouldGenerateTasksForAllCreatedPartitions() { partitionAssignor.setInternalTopicManager(mockInternalTopicManager); subscriptions.put(client, - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( asList("topic1", "topic3"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()) ); - final Map assignment = partitionAssignor.assign(metadata, subscriptions); + final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); final Map expectedCreatedInternalTopics = new HashMap<>(); expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4); @@ -841,7 +844,8 @@ public void shouldAddUserDefinedEndPointToSubscription() { uuid1, builder); configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint)); - final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input")); + final Set topics = Utils.mkSet("input"); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData()); assertEquals("localhost:8080", subscriptionInfo.userEndPoint()); } @@ -863,11 +867,11 @@ public void shouldMapUserEndPointToTopicPartitions() { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer1", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()) ); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); - final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1"); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + final ConsumerPartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData()); final Set topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); assertEquals( @@ -961,11 +965,11 @@ public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTas partitionAssignor.setInternalTopicManager(mockInternalTopicManager); subscriptions.put(client, - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("unknownTopic"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()) ); - final Map assignment = partitionAssignor.assign(metadata, subscriptions); + final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); assertThat(mockInternalTopicManager.readyTopics.isEmpty(), equalTo(true)); @@ -1015,18 +1019,18 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { mockClientSupplier.restoreConsumer)); subscriptions.put("consumer1", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()) ); subscriptions.put("consumer2", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode()) ); final Set allPartitions = Utils.mkSet(t1p0, t1p1, t1p2); - final Map assign = partitionAssignor.assign(metadata, subscriptions); - final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1"); + final Map assign = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + final ConsumerPartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData()); final Set consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); final Set consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090)); @@ -1109,12 +1113,12 @@ public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion, final int otherVersion) { subscriptions.put("consumer1", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()) ); subscriptions.put("consumer2", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() ) @@ -1126,7 +1130,7 @@ private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions UUID.randomUUID(), builder); partitionAssignor.configure(configProps()); - final Map assignment = partitionAssignor.assign(metadata, subscriptions); + final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); assertThat(assignment.size(), equalTo(2)); assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion)); @@ -1142,7 +1146,8 @@ public void shouldDownGradeSubscriptionToVersion1() { builder); configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100)); - final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + final Set topics = Utils.mkSet("topic1"); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics),partitionAssignor.subscriptionUserData(topics)); assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1)); } @@ -1180,7 +1185,8 @@ private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue builder); configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue)); - final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + final Set topics = Utils.mkSet("topic1"); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2)); } @@ -1200,12 +1206,12 @@ public void shouldReturnUnchangedAssignmentForOldInstancesAndEmptyAssignmentForF }; subscriptions.put("consumer1", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode()) ); subscriptions.put("future-consumer", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), encodeFutureSubscription()) ); @@ -1216,7 +1222,7 @@ public void shouldReturnUnchangedAssignmentForOldInstancesAndEmptyAssignmentForF UUID.randomUUID(), builder); partitionAssignor.configure(configProps()); - final Map assignment = partitionAssignor.assign(metadata, subscriptions); + final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); assertThat(assignment.size(), equalTo(2)); assertThat( @@ -1252,12 +1258,12 @@ private ByteBuffer encodeFutureSubscription() { private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) { subscriptions.put("consumer1", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()) ); subscriptions.put("future-consumer", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), encodeFutureSubscription()) ); @@ -1270,24 +1276,24 @@ private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMi partitionAssignor.configure(configProps()); try { - partitionAssignor.assign(metadata, subscriptions); + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); fail("Should have thrown IllegalStateException"); } catch (final IllegalStateException expected) { // pass } } - private PartitionAssignor.Assignment createAssignment(final Map> firstHostState) { + private ConsumerPartitionAssignor.Assignment createAssignment(final Map> firstHostState) { final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), Collections.emptyMap(), firstHostState); - return new PartitionAssignor.Assignment( + return new ConsumerPartitionAssignor.Assignment( Collections.emptyList(), info.encode()); } private AssignmentInfo checkAssignment(final Set expectedTopics, - final PartitionAssignor.Assignment assignment) { + final ConsumerPartitionAssignor.Assignment assignment) { // This assumed 1) DefaultPartitionGrouper is used, and 2) there is an only one topic group. diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 27bee810d85f..06adf62f0d97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -18,8 +18,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -113,7 +113,7 @@ public FutureStreamsPartitionAssignor() { } @Override - public Subscription subscription(final Set topics) { + public ByteBuffer subscriptionUserData(final Set topics) { // Adds the following information to subscription // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) // 2. Task ids of previously running tasks @@ -133,11 +133,11 @@ public Subscription subscription(final Set topics) { taskManager.updateSubscriptionsFromMetadata(topics); - return new Subscription(new ArrayList<>(topics), data.encode()); + return data.encode(); } @Override - public void onAssignment(final PartitionAssignor.Assignment assignment) { + public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment) { try { super.onAssignment(assignment); return; @@ -183,15 +183,15 @@ public void onAssignment(final PartitionAssignor.Assignment assignment) { } @Override - public Map assign(final Cluster metadata, - final Map subscriptions) { + public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) { + final Map subscriptions = groupSubscription.groupSubscription(); Map assignment = null; final Map downgradedSubscriptions = new HashMap<>(); for (final Subscription subscription : subscriptions.values()) { final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); if (info.version() < SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1) { - assignment = super.assign(metadata, subscriptions); + assignment = super.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); break; } } @@ -219,7 +219,7 @@ public Map assign(final Cluster metadata, info.userEndPoint()) .encode())); } - assignment = super.assign(metadata, downgradedSubscriptions); + assignment = super.assign(metadata, new GroupSubscription(downgradedSubscriptions)).groupAssignment(); bumpUsedVersion = true; bumpSupportedVersion = true; } @@ -238,7 +238,7 @@ public Map assign(final Cluster metadata, .encode())); } - return newAssignment; + return new GroupAssignment(newAssignment); } } From 432df4ba5a5d7840bf9d7cd6e1aa77f2834162d8 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 14:20:16 -0700 Subject: [PATCH 02/11] checkstyle --- .../clients/consumer/KafkaConsumerTest.java | 19 +++++++++---------- .../internals/ConsumerProtocolTest.java | 1 - .../internals/StreamsPartitionAssignor.java | 4 ++-- .../StreamsPartitionAssignorTest.java | 2 +- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index ce9931a1c6e9..1227c27ad603 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.MockClient; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; @@ -512,7 +511,7 @@ public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singleton(tp0)); @@ -587,7 +586,7 @@ public void testMissingOffsetNoResetPolicy() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId); @@ -611,7 +610,7 @@ public void testResetToCommittedOffset() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId); @@ -636,7 +635,7 @@ public void testResetUsingAutoResetPolicy() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId); @@ -663,7 +662,7 @@ public void testOffsetIsValidAfterSeek() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, Optional.empty()); @@ -686,7 +685,7 @@ public void testCommitsFetchedDuringAssign() { initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singletonList(tp0)); @@ -724,7 +723,7 @@ public void testAutoCommitSentBeforePositionUpdate() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -764,7 +763,7 @@ public void testRegexSubscription() { initMetadata(client, partitionCounts); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null); @@ -782,7 +781,7 @@ public void testRegexSubscription() { @Test public void testChangingRegexSubscription() { - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); String otherTopic = "other"; TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java index 3e25ac8dabf5..8cf07a53e33a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java @@ -39,7 +39,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME; import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_DATA_KEY_NAME; import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; import static org.apache.kafka.test.TestUtils.toSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 7e07a5365699..e90d66c6855c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -310,7 +310,7 @@ public String name() { } @Override - public ByteBuffer subscriptionUserData(Set topics) { + public ByteBuffer subscriptionUserData(final Set topics) { // Adds the following information to subscription // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) // 2. Task ids of previously running tasks @@ -372,7 +372,7 @@ private Map errorAssignment(final Map * 3. within each client, tasks are assigned to consumer clients in round-robin manner. */ @Override - public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscriptions) { + public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscriptions) { final Map subscriptions = groupSubscriptions.groupSubscription(); // construct the client metadata from the decoded subscription info final Map clientMetadataMap = new HashMap<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 5d5611acd735..dc6ff6763b6f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -1147,7 +1147,7 @@ public void shouldDownGradeSubscriptionToVersion1() { configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100)); final Set topics = Utils.mkSet("topic1"); - final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics),partitionAssignor.subscriptionUserData(topics)); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1)); } From 2c75910942be13e4571edc40ce9e2d3e457f78f5 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 14:37:56 -0700 Subject: [PATCH 03/11] add generation to ConsumerGroupMetadata --- .../consumer/ConsumerPartitionAssignor.java | 18 +++++++++++------- .../internals/ConsumerCoordinator.java | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index e4455387d1da..f0cc4004f481 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -161,20 +161,24 @@ public ByteBuffer userData() { } class ConsumerGroupMetadata { - private String groupId; - private int generationId; private String memberId; + private int generationId; + private String groupId; private Optional groupInstanceId; - public ConsumerGroupMetadata(String groupId, int generationId, String memberId, Optional groupInstanceId) { - this.groupId = groupId; - this.generationId = generationId; + ConsumerGroupMetadata(String memberId, int generationId, String groupId, Optional groupInstanceId) { this.memberId = memberId; + this.generationId = generationId; + this.groupId = groupId; this.groupInstanceId = groupInstanceId; } - public ConsumerGroupMetadata(int generationId) { - this(null, generationId, null, Optional.empty()); + public ConsumerGroupMetadata(String memberId, int generationId) { + this(memberId, generationId, "", Optional.empty()); + } + + public String memberId() { + return memberId; } public int generationId() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 156bf5fe3ad3..3869e19530df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -290,7 +290,7 @@ protected void onJoinComplete(int generation, maybeUpdateJoinedSubscription(assignedPartitions); // give the assignor a chance to update internal state based on the received assignment - ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(generation); + ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(memberId, generation); assignor.onAssignment(assignment, metadata); // reschedule the auto commit starting from now From 1eeb5050dd03cde06971fe2ef0b7fcfaae24d2b3 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 14:46:43 -0700 Subject: [PATCH 04/11] fix spotbugs --- .../clients/consumer/ConsumerPartitionAssignor.java | 12 ++---------- .../consumer/internals/ConsumerCoordinator.java | 3 --- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index f0cc4004f481..7af93572468c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -44,7 +44,7 @@ public interface ConsumerPartitionAssignor { * Return serialized data that will be included in the serializable subscription object sent in the * joinGroup and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack information) * - * @return Non-null optional join subscription user data + * @return Optional join subscription user data */ default ByteBuffer subscriptionUserData(Set topics) { return null; @@ -163,18 +163,10 @@ public ByteBuffer userData() { class ConsumerGroupMetadata { private String memberId; private int generationId; - private String groupId; - private Optional groupInstanceId; - ConsumerGroupMetadata(String memberId, int generationId, String groupId, Optional groupInstanceId) { + public ConsumerGroupMetadata(String memberId, int generationId) { this.memberId = memberId; this.generationId = generationId; - this.groupId = groupId; - this.groupInstanceId = groupInstanceId; - } - - public ConsumerGroupMetadata(String memberId, int generationId) { - this(memberId, generationId, "", Optional.empty()); } public String memberId() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 3869e19530df..66aafda12cfd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -478,14 +478,11 @@ protected Map performAssignment(String leaderId, Set allSubscribedTopics = new HashSet<>(); Map subscriptions = new HashMap<>(); - // collect all the owned partitions - Map ownedPartitions = new HashMap<>(); for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) { Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata())); subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId())); subscriptions.put(memberSubscription.memberId(), subscription); allSubscribedTopics.addAll(subscription.topics()); - ownedPartitions.putAll(subscription.ownedPartitions().stream().collect(Collectors.toMap(item -> item, item -> memberSubscription.memberId()))); } // the leader will begin watching for changes to any of the topics the group is interested in, From a5e3eaa0950729bf7545c1ff824a792f6c013a54 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 15:04:24 -0700 Subject: [PATCH 05/11] add groupId and groupInstanceId back --- .../consumer/ConsumerPartitionAssignor.java | 23 +++++++++++++++---- .../internals/ConsumerCoordinator.java | 2 +- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index 7af93572468c..5046dbf3f4ae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -161,21 +161,34 @@ public ByteBuffer userData() { } class ConsumerGroupMetadata { - private String memberId; + private String groupId; private int generationId; + private String memberId; + Optional groupInstanceId; - public ConsumerGroupMetadata(String memberId, int generationId) { - this.memberId = memberId; + public ConsumerGroupMetadata(String groupId, int generationId, String memberId, Optional groupInstanceId) { + this.groupId = groupId; this.generationId = generationId; + this.memberId = memberId; + this.groupInstanceId = groupInstanceId; } - public String memberId() { - return memberId; + public String groupId() { + return groupId; } public int generationId() { return generationId; } + + public String memberId() { + return memberId; + } + + public Optional groupInstanceId() { + return groupInstanceId; + } + } final class GroupSubscription { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 66aafda12cfd..948ac114ac87 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -290,7 +290,7 @@ protected void onJoinComplete(int generation, maybeUpdateJoinedSubscription(assignedPartitions); // give the assignor a chance to update internal state based on the received assignment - ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(memberId, generation); + ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId); assignor.onAssignment(assignment, metadata); // reschedule the auto commit starting from now From 6016faa21ae5058c00fffd5d9d9aef64df41b523 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 15:13:40 -0700 Subject: [PATCH 06/11] use empty bytebuffer --- .../kafka/clients/consumer/ConsumerPartitionAssignor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index 5046dbf3f4ae..e7a339108d77 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -44,10 +44,10 @@ public interface ConsumerPartitionAssignor { * Return serialized data that will be included in the serializable subscription object sent in the * joinGroup and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack information) * - * @return Optional join subscription user data + * @return non-null optional join subscription user data */ default ByteBuffer subscriptionUserData(Set topics) { - return null; + return ByteBuffer.wrap(new byte[0]); } /** @@ -114,7 +114,7 @@ public Subscription(List topics, ByteBuffer userData) { } public Subscription(List topics) { - this(topics, null, Collections.emptyList()); + this(topics, ByteBuffer.wrap(new byte[0]), Collections.emptyList()); } public List topics() { @@ -148,7 +148,7 @@ public Assignment(List partitions, ByteBuffer userData) { } public Assignment(List partitions) { - this(partitions, null); + this(partitions, ByteBuffer.wrap(new byte[0])); } public List partitions() { From a24cbddb5816ef43e8281b8b1afbb1a523cc819c Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 16:05:26 -0700 Subject: [PATCH 07/11] github comments --- .../apache/kafka/clients/consumer/ConsumerConfig.java | 2 +- .../clients/consumer/ConsumerPartitionAssignor.java | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 7eb34d4aeee7..8a18bd5a77af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig { * partition.assignment.strategy */ public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy"; - private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used"; + private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name or class type of the assignor implementing the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. A custom assignor that implements ConsumerPartitionAssignor can be plugged in"; /** * auto.offset.reset diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index e7a339108d77..77acf79f08b9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -59,19 +59,12 @@ default ByteBuffer subscriptionUserData(Set topics) { */ GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions); - /** - * Callback which is invoked when a group member receives its assignment from the leader. - * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)} - */ - void onAssignment(Assignment assignment); - /** * Callback which is invoked when a group member receives its assignment from the leader. * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)} * @param metadata Additional metadata on the consumer (optional) */ default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { - onAssignment(assignment); } /** @@ -91,7 +84,8 @@ default short version() { } /** - * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky") + * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky"). Note, this is not required + * to be the same as the class name specified in {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG} * @return non-null unique name */ String name(); From 379a7eeb6deefcbe0c212890c553c4e0e7eaac34 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 16:32:54 -0700 Subject: [PATCH 08/11] remove onAssignment overload --- .../consumer/internals/AbstractPartitionAssignor.java | 5 ----- .../processor/internals/StreamsPartitionAssignor.java | 2 +- .../processor/internals/StreamsPartitionAssignorTest.java | 4 ++-- .../org/apache/kafka/streams/tests/StreamsUpgradeTest.java | 4 ++-- 4 files changed, 5 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java index ae047a1cd156..3b966b0736bb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java @@ -72,11 +72,6 @@ public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscript return new GroupAssignment(assignments); } - @Override - public void onAssignment(Assignment assignment) { - // this assignor maintains no internal state, so nothing to do - } - protected static void put(Map> map, K key, V value) { List list = map.computeIfAbsent(key, k -> new ArrayList<>()); list.add(value); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index e90d66c6855c..4772fb586076 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -778,7 +778,7 @@ List> interleaveTasksByGroupId(final Collection taskIds, fi * @throws TaskAssignmentException if there is no task id for one of the partitions specified */ @Override - public void onAssignment(final Assignment assignment) { + public void onAssignment(final Assignment assignment, ConsumerGroupMetadata metadata) { final List partitions = new ArrayList<>(assignment.partitions()); partitions.sort(PARTITION_COMPARATOR); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index dc6ff6763b6f..616deaf39040 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -680,7 +680,7 @@ public void testOnAssignment() { EasyMock.expectLastCall(); EasyMock.replay(taskManager); - partitionAssignor.onAssignment(assignment); + partitionAssignor.onAssignment(assignment, null); EasyMock.verify(taskManager); @@ -989,7 +989,7 @@ public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() { EasyMock.expectLastCall(); EasyMock.replay(taskManager); - partitionAssignor.onAssignment(createAssignment(hostState)); + partitionAssignor.onAssignment(createAssignment(hostState), null); EasyMock.verify(taskManager); } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 06adf62f0d97..4cf7d8e19916 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -137,9 +137,9 @@ public ByteBuffer subscriptionUserData(final Set topics) { } @Override - public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment) { + public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, ConsumerGroupMetadata metadata) { try { - super.onAssignment(assignment); + super.onAssignment(assignment, metadata); return; } catch (final TaskAssignmentException cannotProcessFutureVersion) { // continue From 651a2f9f218ed38d146d5dec77ac04aabd667387 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 18:37:05 -0700 Subject: [PATCH 09/11] github feedback --- .../clients/consumer/ConsumerPartitionAssignor.java | 6 +++--- .../clients/consumer/internals/ConsumerCoordinator.java | 9 --------- .../processor/internals/StreamsPartitionAssignor.java | 2 +- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 2 +- 4 files changed, 5 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index 77acf79f08b9..754bf8eed709 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -41,8 +41,8 @@ public interface ConsumerPartitionAssignor { /** - * Return serialized data that will be included in the serializable subscription object sent in the - * joinGroup and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack information) + * Return serialized data that will be included in the {@link Subscription} sent to the leader + * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack information) * * @return non-null optional join subscription user data */ @@ -55,7 +55,7 @@ default ByteBuffer subscriptionUserData(Set topics) { * @param metadata Current topic/broker metadata known by consumer * @param subscriptions Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)} * @return A map from the members to their respective assignment. This should have one entry - * for all members who in the input subscription map. + * for each member in the input subscription map. */ GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 948ac114ac87..c0d348a8de8f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -495,15 +495,6 @@ protected Map performAssignment(String leaderId, Map assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment(); - switch (protocol) { - case EAGER: - break; - - case COOPERATIVE: - // TODO need to validate assignment -- make sure no partitions to be revoked were also assigned during this rebalance - break; - } - // user-customized assignor may have created some topics that are not in the subscription list // and assign their partitions to the members; in this case we would like to update the leader's // own metadata with the newly added topics so that it will not trigger a subsequent rebalance diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 4772fb586076..41005dad6277 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -778,7 +778,7 @@ List> interleaveTasksByGroupId(final Collection taskIds, fi * @throws TaskAssignmentException if there is no task id for one of the partitions specified */ @Override - public void onAssignment(final Assignment assignment, ConsumerGroupMetadata metadata) { + public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) { final List partitions = new ArrayList<>(assignment.partitions()); partitions.sort(PARTITION_COMPARATOR); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 4cf7d8e19916..681601e533bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -137,7 +137,7 @@ public ByteBuffer subscriptionUserData(final Set topics) { } @Override - public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, ConsumerGroupMetadata metadata) { + public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, final ConsumerGroupMetadata metadata) { try { super.onAssignment(assignment, metadata); return; From ecfce0a2ec81e93be3d4de9990a54cbf1bacbaee Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jul 2019 18:47:58 -0700 Subject: [PATCH 10/11] default userdata to null --- .../kafka/clients/consumer/ConsumerPartitionAssignor.java | 8 ++++---- .../clients/consumer/internals/ConsumerProtocolTest.java | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index 754bf8eed709..584bef76d32b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -44,10 +44,10 @@ public interface ConsumerPartitionAssignor { * Return serialized data that will be included in the {@link Subscription} sent to the leader * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack information) * - * @return non-null optional join subscription user data + * @return optional join subscription user data */ default ByteBuffer subscriptionUserData(Set topics) { - return ByteBuffer.wrap(new byte[0]); + return null; } /** @@ -108,7 +108,7 @@ public Subscription(List topics, ByteBuffer userData) { } public Subscription(List topics) { - this(topics, ByteBuffer.wrap(new byte[0]), Collections.emptyList()); + this(topics, null, Collections.emptyList()); } public List topics() { @@ -142,7 +142,7 @@ public Assignment(List partitions, ByteBuffer userData) { } public Assignment(List partitions) { - this(partitions, ByteBuffer.wrap(new byte[0])); + this(partitions, null); } public List partitions() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java index 8cf07a53e33a..3cf7be51665c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java @@ -53,7 +53,7 @@ public class ConsumerProtocolTest { @Test public void serializeDeserializeMetadata() { - Subscription subscription = new Subscription(Arrays.asList("foo", "bar")); + Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0])); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); assertEquals(subscription.topics(), parsedSubscription.topics()); @@ -63,7 +63,7 @@ public void serializeDeserializeMetadata() { @Test public void serializeDeserializeMetadataAndGroupInstanceId() { - Subscription subscription = new Subscription(Arrays.asList("foo", "bar")); + Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0])); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); @@ -144,7 +144,7 @@ public void deserializeFutureSubscriptionVersion() { @Test public void serializeDeserializeAssignment() { List partitions = Arrays.asList(tp1, tp2); - ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment(partitions)); + ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment(partitions, ByteBuffer.wrap(new byte[0]))); Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer); assertEquals(toSet(partitions), toSet(parsedAssignment.partitions())); assertEquals(0, parsedAssignment.userData().limit()); From 20e8f3bc5998d975a2888493d114914fe101d291 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 25 Jul 2019 11:05:30 -0700 Subject: [PATCH 11/11] moved consumergroupmetadata to consumer package --- .../consumer/ConsumerGroupMetadata.java | 50 +++++++++++++++++++ .../consumer/ConsumerPartitionAssignor.java | 31 ------------ .../internals/ConsumerCoordinator.java | 2 +- .../internals/StreamsPartitionAssignor.java | 1 + .../streams/tests/StreamsUpgradeTest.java | 1 + 5 files changed, 53 insertions(+), 32 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java new file mode 100644 index 000000000000..e17894b213d0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import java.util.Optional; + +public class ConsumerGroupMetadata { + private String groupId; + private int generationId; + private String memberId; + Optional groupInstanceId; + + public ConsumerGroupMetadata(String groupId, int generationId, String memberId, Optional groupInstanceId) { + this.groupId = groupId; + this.generationId = generationId; + this.memberId = memberId; + this.groupInstanceId = groupInstanceId; + } + + public String groupId() { + return groupId; + } + + public int generationId() { + return generationId; + } + + public String memberId() { + return memberId; + } + + public Optional groupInstanceId() { + return groupInstanceId; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index 584bef76d32b..72d5d6e806bd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -154,37 +154,6 @@ public ByteBuffer userData() { } } - class ConsumerGroupMetadata { - private String groupId; - private int generationId; - private String memberId; - Optional groupInstanceId; - - public ConsumerGroupMetadata(String groupId, int generationId, String memberId, Optional groupInstanceId) { - this.groupId = groupId; - this.generationId = generationId; - this.memberId = memberId; - this.groupInstanceId = groupInstanceId; - } - - public String groupId() { - return groupId; - } - - public int generationId() { - return generationId; - } - - public String memberId() { - return memberId; - } - - public Optional groupInstanceId() { - return groupInstanceId; - } - - } - final class GroupSubscription { private final Map subscriptions; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index c0d348a8de8f..a28119dd4b0f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -19,7 +19,7 @@ import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 41005dad6277..fa5f5115d65c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Configurable; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 681601e533bc..0b2d0b319aa5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.Cluster;