From be192af84ba56961f9b43dee296ef4a9a080b3b4 Mon Sep 17 00:00:00 2001 From: Ritika Reddy Date: Wed, 8 May 2024 17:01:20 -0700 Subject: [PATCH 1/2] Remove rack awareness code --- .../AbstractUniformAssignmentBuilder.java | 199 ------- .../GeneralUniformAssignmentBuilder.java | 138 +---- .../OptimizedUniformAssignmentBuilder.java | 82 +-- .../GeneralUniformAssignmentBuilderTest.java | 485 +--------------- ...OptimizedUniformAssignmentBuilderTest.java | 533 +----------------- 5 files changed, 37 insertions(+), 1400 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java index 591a59bafa1e..bd9b0b00779f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java @@ -19,48 +19,19 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.server.common.TopicIdPartition; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; /** * The assignment builder is used to construct the target assignment based on the members' subscriptions. - * - * This class contains common utility methods and a class for obtaining and storing rack information. */ public abstract class AbstractUniformAssignmentBuilder { protected abstract GroupAssignment buildAssignment(); - /** - * Determines if rack-aware assignment is appropriate based on the provided rack information. - * - * @param memberRacks Racks where members are located. - * @param allPartitionRacks Racks where partitions are located. - * @param racksPerPartition Map of partitions to their associated racks. - * - * @return {@code true} if rack-aware assignment should be applied; {@code false} otherwise. - */ - protected static boolean useRackAwareAssignment( - Set memberRacks, - Set allPartitionRacks, - Map> racksPerPartition - ) { - if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, allPartitionRacks)) - return false; - else { - return !racksPerPartition.values().stream().allMatch(allPartitionRacks::equals); - } - } - /** * Adds the topic's partition to the member's target assignment. */ @@ -94,174 +65,4 @@ protected static Set topicIdPartitions( .mapToObj(i -> new TopicIdPartition(topic, i)) ).collect(Collectors.toSet()); } - - /** - * Processes partitions for the given topic Ids using the provided function. - * - * @param topicIds Collection of topic Ids. - * @param subscribedTopicDescriber Describer to fetch partition counts for topics. - * @param func Function to apply on each {@code TopicIdPartition}. - */ - protected static void processTopicIdPartitions( - Collection topicIds, - SubscribedTopicDescriber subscribedTopicDescriber, - Consumer func - ) { - topicIds.stream() - .flatMap(topic -> IntStream - .range(0, subscribedTopicDescriber.numPartitions(topic)) - .mapToObj(i -> new TopicIdPartition(topic, i)) - ).forEach(func); - } - - /** - * Represents the rack information of members and partitions along with utility methods - * to facilitate rack-aware assignment strategies for a given consumer group. - */ - protected static class RackInfo { - /** - * Map of every member to its rack. - */ - protected final Map memberRacks; - - /** - * Map of every partition to a list of its racks. - */ - protected final Map> partitionRacks; - - /** - * List of members with the same rack as the partition. - */ - protected final Map> membersWithSameRackAsPartition; - - /** - * Indicates if a rack aware assignment can be done. - * True if racks are defined for both members and partitions and there is an intersection between the sets. - */ - protected final boolean useRackStrategy; - - /** - * Constructs rack information based on the assignment specification and subscribed topics. - * - * @param assignmentSpec The current assignment specification. - * @param subscribedTopicDescriber Topic and partition metadata of the subscribed topics. - * @param topicIds List of topic Ids. - */ - public RackInfo( - AssignmentSpec assignmentSpec, - SubscribedTopicDescriber subscribedTopicDescriber, - Set topicIds - ) { - Map> membersByRack = new HashMap<>(); - assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> - assignmentMemberSpec.rackId().filter(r -> !r.isEmpty()).ifPresent( - rackId -> membersByRack.computeIfAbsent(rackId, __ -> new ArrayList<>()).add(memberId) - ) - ); - - Set allPartitionRacks; - Map> racksPerPartition; - - if (membersByRack.isEmpty()) { - allPartitionRacks = Collections.emptySet(); - racksPerPartition = Collections.emptyMap(); - } else { - racksPerPartition = new HashMap<>(); - allPartitionRacks = new HashSet<>(); - processTopicIdPartitions(topicIds, subscribedTopicDescriber, tp -> { - Set racks = subscribedTopicDescriber.racksForPartition(tp.topicId(), tp.partitionId()); - racksPerPartition.put(tp, racks); - if (!racks.isEmpty()) allPartitionRacks.addAll(racks); - }); - } - - if (useRackAwareAssignment(membersByRack.keySet(), allPartitionRacks, racksPerPartition)) { - this.memberRacks = new HashMap<>(assignmentSpec.members().size()); - membersByRack.forEach((rack, rackMembers) -> rackMembers.forEach(c -> memberRacks.put(c, rack))); - this.partitionRacks = racksPerPartition; - useRackStrategy = true; - } else { - this.memberRacks = Collections.emptyMap(); - this.partitionRacks = Collections.emptyMap(); - useRackStrategy = false; - } - - this.membersWithSameRackAsPartition = racksPerPartition.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().stream() - .flatMap(rack -> membersByRack.getOrDefault(rack, Collections.emptyList()).stream()) - .distinct() // Ensure that there are no duplicate members - .collect(Collectors.toList()) - )); - } - - /** - * Determines if there's a mismatch between the member's rack and the partition's replica racks. - * - *

Racks are considered mismatched under the following conditions: (returns {@code true}): - *

    - *
  • Member lacks an associated rack.
  • - *
  • Partition lacks associated replica racks.
  • - *
  • Member's rack isn't among the partition's replica racks.
  • - *
- * - * @param memberId The member Id. - * @param tp The topic partition. - * @return {@code true} for a mismatch; {@code false} if member and partition racks exist and align. - */ - protected boolean racksMismatch(String memberId, TopicIdPartition tp) { - String memberRack = memberRacks.get(memberId); - Set replicaRacks = partitionRacks.get(tp); - return memberRack == null || (replicaRacks == null || !replicaRacks.contains(memberRack)); - } - - /** - * Sort partitions in ascending order by number of members with matching racks. - * - * @param topicIdPartitions The partitions to be sorted. - * @return A sorted list of partitions with potential members in the same rack. - */ - protected List sortPartitionsByRackMembers(Collection topicIdPartitions) { - return topicIdPartitions.stream() - .filter(tp -> membersWithSameRackAsPartition.containsKey(tp) && !membersWithSameRackAsPartition.get(tp).isEmpty()) - .sorted(Comparator.comparing( - (TopicIdPartition tp) -> membersWithSameRackAsPartition.getOrDefault(tp, Collections.emptyList()).size()) - .thenComparing(TopicIdPartition::topicId) - .thenComparing(TopicIdPartition::partitionId)) - .collect(Collectors.toList()); - } - - /** - * @return List of members with the same rack as any of the provided partition's replicas. - * Members are sorted in ascending order of number of partitions in the assignment. - */ - protected List getSortedMembersWithMatchingRack( - TopicIdPartition topicIdPartition, - Map assignment - ) { - List membersList = membersWithSameRackAsPartition.getOrDefault( - topicIdPartition, - Collections.emptyList() - ); - - // Sort the list based on the size of each member's assignment. - membersList.sort((member1, member2) -> { - int sum1 = assignment.get(member1).targetPartitions().values().stream().mapToInt(Set::size).sum(); - int sum2 = assignment.get(member2).targetPartitions().values().stream().mapToInt(Set::size).sum(); - - return Integer.compare(sum1, sum2); - }); - - return membersList; - } - - @Override - public String toString() { - return "RackInfo(" + - "memberRacks=" + memberRacks + - ", partitionRacks=" + partitionRacks + - ")"; - } - } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java index e98bd4ea7a83..f8d165e1bd12 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java @@ -43,13 +43,11 @@ *
  • Balance: Ensure partitions are distributed equally among all members. * The difference in assignments sizes between any two members * should not exceed one partition.
  • - *
  • Rack Matching: When feasible, aim to assign partitions to members - * located on the same rack thus avoiding cross-zone traffic.
  • *
  • Stickiness: Minimize partition movements among members by retaining * as much of the existing assignment as possible.
  • * * This assignment builder prioritizes the above properties in the following order: - * Balance > Rack Matching > Stickiness. + * Balance > Stickiness. */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); @@ -79,11 +77,6 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu */ private final Map targetAssignment; - /** - * Rack information. - */ - private final RackInfo rackInfo; - /** * The partitions that still need to be assigned. */ @@ -104,14 +97,6 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu */ private final TreeSet sortedMembersByAssignmentSize; - /** - * Tracks the owner of each partition in the existing assignment of the member. - * - * Only populated when rack aware strategy is used. - * Contains partitions that weren't retained due to a rack mismatch. - */ - private final Map currentPartitionOwners; - /** * Tracks the owner of each partition in the target assignment. */ @@ -142,12 +127,10 @@ public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, Subscribed targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())); }) ); - this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscribedTopicIds); this.unassignedPartitions = new HashSet<>(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber)); this.assignedStickyPartitions = new HashSet<>(); this.assignmentManager = new AssignmentManager(this.members, this.subscribedTopicDescriber); this.sortedMembersByAssignmentSize = assignmentManager.sortMembersByAssignmentSize(members.keySet()); - this.currentPartitionOwners = new HashMap<>(); this.partitionOwnerInTargetAssignment = new HashMap<>(); this.partitionMovements = new PartitionMovements(); } @@ -156,10 +139,7 @@ public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, Subscribed * Here's the step-by-step breakdown of the assignment process: * *
  • Retain partitions from the existing assignments a.k.a sticky partitions.
  • - *
    • If a partition's rack mismatches with its owner, track it for future use.
    - *
  • If rack aware strategy is possible, allocate unassigned partitions to members in the same rack.
  • - *
  • Allocate all the remaining unassigned partitions to the members in a balanced manner. If possible, allocate - * the partition back to it's existing owner in case it was not retained earlier due to a rack mismatch.
  • + *
  • Allocate all the remaining unassigned partitions to the members in a balanced manner.
  • *
  • Iterate through the assignment until it is balanced.
  • */ @Override @@ -169,11 +149,9 @@ protected GroupAssignment buildAssignment() { return new GroupAssignment(Collections.emptyMap()); } - // When rack awareness is enabled, only sticky partitions with matching rack are retained. - // Otherwise, all existing partitions are retained until max assignment size. + // All existing partitions are retained until max assignment size. assignStickyPartitions(); - if (rackInfo.useRackStrategy) rackAwarePartitionAssignment(); unassignedPartitionsAssignment(); balance(); @@ -211,8 +189,6 @@ private List sortTopicIdPartitions(Collection Partitions from topics that are still present in both the new subscriptions and the topic metadata. - *
  • When using a rack-aware strategy, only partitions with member owners in the same rack are retained.
  • - *
  • Track current partition owners when there is a rack mismatch.
  • */ private void assignStickyPartitions() { members.forEach((memberId, assignmentMemberSpec) -> @@ -220,12 +196,8 @@ private void assignStickyPartitions() { if (assignmentMemberSpec.subscribedTopicIds().contains(topicId)) { currentAssignment.forEach(partition -> { TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); - if (rackInfo.useRackStrategy && rackInfo.racksMismatch(memberId, topicIdPartition)) { - currentPartitionOwners.put(topicIdPartition, memberId); - } else { - assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId); - assignedStickyPartitions.add(topicIdPartition); - } + assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId); + assignedStickyPartitions.add(topicIdPartition); }); } else { LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); @@ -234,33 +206,9 @@ private void assignStickyPartitions() { ); } - /** - * Allocates the unassigned partitions to members in the same rack, if available. - */ - private void rackAwarePartitionAssignment() { - // Sort partitions in ascending order by the number of potential members with matching racks. - // Only partitions with potential members in the same rack are returned. - List sortedPartitions = rackInfo.sortPartitionsByRackMembers(unassignedPartitions); - - sortedPartitions.forEach(partition -> { - List sortedMembersByAssignmentSize = rackInfo.getSortedMembersWithMatchingRack( - partition, - targetAssignment - ); - - for (String memberId : sortedMembersByAssignmentSize) { - if (assignmentManager.maybeAssignPartitionToMember(partition, memberId)) { - break; - } - } - }); - } - /** * Allocates the remaining unassigned partitions to members in a balanced manner. *
  • Partitions are sorted to maximize the probability of a balanced assignment.
  • - *
  • If there was an assignment that wasn't retained due to a rack mismatch, - * check if the partition can retain its existing assignment.
  • *
  • Sort members in ascending order of their current target assignment sizes * to ensure the least filled member gets the partition first.
  • */ @@ -268,13 +216,6 @@ private void unassignedPartitionsAssignment() { List sortedPartitions = sortTopicIdPartitions(unassignedPartitions); for (TopicIdPartition partition : sortedPartitions) { - if (rackInfo.useRackStrategy && currentPartitionOwners.containsKey(partition)) { - String prevOwner = currentPartitionOwners.get(partition); - if (assignmentManager.maybeAssignPartitionToMember(partition, prevOwner)) { - continue; - } - } - TreeSet sortedMembers = assignmentManager.sortMembersByAssignmentSize( membersPerTopic.get(partition.topicId()) ); @@ -405,10 +346,6 @@ private void balance() { * Performs reassignments of partitions to balance the load across members. * This method iteratively reassigns partitions until no further moves can improve the balance. * - * The process involves sorting reassignable partitions for efficiency and considering rack matching - * and sticky partition assignments. Reassignments are made based on the comparative load of members - * and the adherence to rack policies. - * * The method uses a do-while loop to ensure at least one pass over the partitions and continues * reassigning as long as there are modifications to the current assignments. It checks for balance * after each reassignment and exits if the balance is achieved. @@ -448,65 +385,12 @@ private void performReassignments() { "to a member", reassignablePartition) ); - boolean foundMatchingRackMember = false; - - // If rack strategy is used and the current assignment adheres to rack matching, - // check if another member in the same rack is better suited for this topicIdPartition. - if (rackInfo.useRackStrategy) { - String memberRack = rackInfo.memberRacks.get(currentTargetOwner); - Set partitionRacks = rackInfo.partitionRacks.get(reassignablePartition); - - if (partitionRacks.contains(memberRack)) { - for (String otherMember : rackInfo.getSortedMembersWithMatchingRack(reassignablePartition, targetAssignment)) { - // Only subscribed members eligible for re-balancing should be considered. - if (!sortedMembersByAssignmentSize.contains(otherMember) || !membersPerTopic.containsKey(reassignablePartition.topicId())) - continue; - - String otherMemberRack = rackInfo.memberRacks.get(otherMember); - if (otherMemberRack == null || !partitionRacks.contains(otherMemberRack)) - continue; - if (assignmentManager.targetAssignmentSize(currentTargetOwner) > assignmentManager.targetAssignmentSize(otherMember) + 1) { - reassignPartition(reassignablePartition, otherMember); - modified = true; - reassignmentOccurred = true; - foundMatchingRackMember = true; - break; - } - } - } - } - - // If rack-aware strategy is not used OR no other member with matching rack was better suited, - // First check if the topicIdPartition already belongs to its previous owner, if not and a previous owner - // exists, check if the topicIdPartition can be assigned to it. - // If not, assign to any other better suited member with the topic subscription. - if (!foundMatchingRackMember) { - boolean isPartitionSticky = assignedStickyPartitions.contains(reassignablePartition); - boolean isCurrentOwnerKnown = currentPartitionOwners.containsKey(reassignablePartition); - - if (rackInfo.useRackStrategy && !isPartitionSticky && isCurrentOwnerKnown) { - String currentOwner = currentPartitionOwners.get(reassignablePartition); - int currentMemberAssignmentSize = assignmentManager.targetAssignmentSize(currentTargetOwner); - int currentOwnerAssignmentSize = assignmentManager.targetAssignmentSize(currentOwner); - - // Check if reassignment is needed based on assignment sizes - if (currentMemberAssignmentSize > currentOwnerAssignmentSize + 1) { - reassignPartition(reassignablePartition, currentOwner); - modified = true; - reassignmentOccurred = true; - } - } - - // The topicIdPartition is already sticky and no other member with matching rack is better suited. - if (!reassignmentOccurred) { - for (String otherMember : membersPerTopic.get(reassignablePartition.topicId())) { - if (assignmentManager.targetAssignmentSize(currentTargetOwner) > assignmentManager.targetAssignmentSize(otherMember) + 1) { - reassignPartition(reassignablePartition); - modified = true; - reassignmentOccurred = true; - break; - } - } + for (String otherMember : membersPerTopic.get(reassignablePartition.topicId())) { + if (assignmentManager.targetAssignmentSize(currentTargetOwner) > assignmentManager.targetAssignmentSize(otherMember) + 1) { + reassignPartition(reassignablePartition); + modified = true; + reassignmentOccurred = true; + break; } } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java index b60ab450e903..a14363708e37 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java @@ -47,13 +47,11 @@ *
  • Balance: Ensure partitions are distributed equally among all members. * The difference in assignments sizes between any two members * should not exceed one partition.
  • - *
  • Rack Matching: When feasible, aim to assign partitions to members - * located on the same rack thus avoiding cross-zone traffic.
  • *
  • Stickiness: Minimize partition movements among members by retaining * as much of the existing assignment as possible.
  • * * The assignment builder prioritizes the properties in the following order: - * Balance > Rack Matching > Stickiness. + * Balance > Stickiness. */ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); @@ -73,11 +71,6 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment */ private final Set subscribedTopicIds; - /** - * Rack information and helper methods. - */ - private final RackInfo rackInfo; - /** * The number of members to receive an extra partition beyond the minimum quota. * Minimum Quota = Total Partitions / Total Members @@ -103,22 +96,12 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment */ private final Map targetAssignment; - /** - * Tracks the existing owner of each partition. - * Only populated when the rack awareness strategy is used. - */ - private final Map currentPartitionOwners; - OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { this.assignmentSpec = assignmentSpec; this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); - this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscribedTopicIds); this.potentiallyUnfilledMembers = new HashMap<>(); this.targetAssignment = new HashMap<>(); - // Without rack-aware strategy, tracking current owners of unassigned partitions is unnecessary - // as all sticky partitions are retained until a member meets its quota. - this.currentPartitionOwners = rackInfo.useRackStrategy ? new HashMap<>() : Collections.emptyMap(); } /** @@ -127,13 +110,10 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment *
  • Compute the quotas of partitions for each member based on the total partitions and member count.
  • *
  • Initialize unassigned partitions to all the topic partitions and * remove partitions from the list as and when they are assigned.
  • - *
  • For existing assignments, retain partitions based on the determined quota and member's rack compatibility.
  • - *
  • If a partition's rack mismatches with its owner, track it for future use.
  • + *
  • For existing assignments, retain partitions based on the determined quota.
  • *
  • Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.
  • - *
  • Proceed with a round-robin assignment adhering to rack awareness. + *
  • Proceed with a round-robin assignment according to quotas. * For each unassigned partition, locate the first compatible member from the potentially unfilled list.
  • - *
  • If no rack-compatible member is found, revert to the tracked current owner. - * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment.
  • */ @Override protected GroupAssignment buildAssignment() throws PartitionAssignorException { @@ -168,7 +148,6 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber); potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - if (rackInfo.useRackStrategy) rackAwarePartitionAssignment(); unassignedPartitionsRoundRobinAssignment(); if (!unassignedPartitions.isEmpty()) { @@ -181,13 +160,10 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { /** * Retains a set of partitions from the existing assignment and includes them in the target assignment. * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * In addition, if rack awareness is enabled, it is ensured that a partition's rack matches the member's rack. * *

    For each member: *

      - *
    1. Find the valid current assignment considering topic subscriptions, metadata and rack information.
    2. - *
    3. When rack aware strategy is used, only partitions with their rack matching their current - * owner's rack are returned in the valid assignment.
    4. + *
    5. Find the valid current assignment considering topic subscriptions and metadata
    6. *
    7. If the current assignment exists, retain partitions up to the minimum quota.
    8. *
    9. If the current assignment size is greater than the minimum quota and * there are members that could get an extra partition, assign the next partition as well.
    10. @@ -205,7 +181,6 @@ private Map assignStickyPartitions(int minQuota) { assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> { List validCurrentMemberAssignment = validCurrentMemberAssignment( - memberId, assignmentMemberSpec.assignedPartitions() ); @@ -252,16 +227,12 @@ private Map assignStickyPartitions(int minQuota) { * Filters the current assignment of partitions for a given member based on certain criteria. * * Any partition that still belongs to the member's subscribed topics list is considered valid. - * If rack aware strategy can be used: Only partitions with matching rack are valid and non-matching partitions are - * tracked with their current owner for future use. * - * @param memberId The Id of the member whose assignment is being validated. * @param currentMemberAssignment The map of topics to partitions currently assigned to the member. * * @return List of valid partitions after applying the filters. */ private List validCurrentMemberAssignment( - String memberId, Map> currentMemberAssignment ) { List validCurrentAssignmentList = new ArrayList<>(); @@ -269,11 +240,7 @@ private List validCurrentMemberAssignment( if (subscribedTopicIds.contains(topicId)) { partitions.forEach(partition -> { TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); - if (rackInfo.useRackStrategy && rackInfo.racksMismatch(memberId, topicIdPartition)) { - currentPartitionOwners.put(topicIdPartition, memberId); - } else { - validCurrentAssignmentList.add(topicIdPartition); - } + validCurrentAssignmentList.add(topicIdPartition); }); } else { LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); @@ -283,37 +250,8 @@ private List validCurrentMemberAssignment( return validCurrentAssignmentList; } - /** - * Allocates the unassigned partitions to unfilled members present in the same rack. - * Partitions with the least number of potential members in the same rack are allotted first. - * Members in the same rack with the least number of partitions in the target assignment - * are assigned partitions first. - */ - private void rackAwarePartitionAssignment() { - // Sort partitions in ascending order by number of potential members with matching racks. - // Partitions with no potential members in the same rack aren't included in this list. - List sortedPartitions = rackInfo.sortPartitionsByRackMembers(unassignedPartitions); - - sortedPartitions.forEach(partition -> { - List sortedMembersWithMatchingRack = rackInfo.getSortedMembersWithMatchingRack(partition, targetAssignment); - - for (String memberId : sortedMembersWithMatchingRack) { - if (potentiallyUnfilledMembers.containsKey(memberId) && maybeAssignPartitionToMember(memberId, partition)) { - unassignedPartitions.remove(partition); - break; - } - } - }); - } - /** * Allocates the unassigned partitions to unfilled members in a round-robin fashion. - * - * If the rack-aware strategy is enabled, partitions are attempted to be assigned back to their current owners first. - * This is because pure stickiness without rack matching is not considered initially. - * - * If a partition couldn't be assigned to its current owner due to the quotas OR - * if the rack-aware strategy is not enabled, the partitions are allocated to the unfilled members. */ private void unassignedPartitionsRoundRobinAssignment() { Queue roundRobinMembers = new LinkedList<>(potentiallyUnfilledMembers.keySet()); @@ -327,16 +265,6 @@ private void unassignedPartitionsRoundRobinAssignment() { for (TopicIdPartition topicIdPartition : sortedPartitionsList) { boolean assigned = false; - if (rackInfo.useRackStrategy && currentPartitionOwners.containsKey(topicIdPartition)) { - String currentOwner = currentPartitionOwners.get(topicIdPartition); - if (potentiallyUnfilledMembers.containsKey(currentOwner)) { - assigned = maybeAssignPartitionToMember(currentOwner, topicIdPartition); - if (!potentiallyUnfilledMembers.containsKey(currentOwner)) { - roundRobinMembers.remove(currentOwner); - } - } - } - for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) { String memberId = roundRobinMembers.poll(); if (potentiallyUnfilledMembers.containsKey(memberId)) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java index ad82771bd4dc..9f66ca0e00a0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java @@ -33,7 +33,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -79,7 +78,7 @@ public void testTwoMembersNoTopicSubscription() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); assertEquals(Collections.emptyMap(), groupAssignment.members()); @@ -113,14 +112,14 @@ public void testTwoMembersSubscribedToNonexistentTopics() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); assertThrows(PartitionAssignorException.class, () -> assignor.assign(assignmentSpec, subscribedTopicMetadata)); } @Test - public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { + public void testFirstAssignmentTwoMembersTwoTopics() { Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, @@ -149,7 +148,7 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -166,193 +165,6 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { assertAssignment(expectedAssignment, computedAssignment); } - @Test - public void testFirstAssignmentThreeMembersThreeTopicsNoPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 6, - Collections.emptyMap() - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 8, - Collections.emptyMap() - )); - topicMetadata.put(topic3Uuid, new TopicMetadata( - topic3Uuid, - topic3Name, - 2, - Collections.emptyMap() - )); - - Map members = new TreeMap<>(); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Arrays.asList(topic1Uuid, topic2Uuid), - Collections.emptyMap() - )); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Collections.singletonList(topic2Uuid), - Collections.emptyMap() - )); - members.put(memberC, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack3"), - Arrays.asList(topic1Uuid, topic3Uuid), - Collections.emptyMap() - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic2Uuid, 2, 4, 6), - mkTopicAssignment(topic1Uuid, 0, 4) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic2Uuid, 0, 1, 3, 5, 7) - )); - expectedAssignment.put(memberC, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 2, 3, 5), - mkTopicAssignment(topic3Uuid, 0, 1) - )); - - assertAssignment(expectedAssignment, computedAssignment); - } - - //checked alreaddy - @Test - public void testFirstAssignmentThreeMembersThreeTopicsWithMemberAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic3Uuid, new TopicMetadata( - topic3Uuid, - topic3Name, - 2, - mkMapOfPartitionRacks(2) - )); - - Map members = new TreeMap<>(); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Collections.singletonList(topic1Uuid), - Collections.emptyMap() - )); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic2Uuid), - Collections.emptyMap() - )); - members.put(memberC, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack3"), - Collections.singletonList(topic3Uuid), - Collections.emptyMap() - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 1, 2) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic2Uuid, 0, 1, 2) - )); - expectedAssignment.put(memberC, mkAssignment( - mkTopicAssignment(topic3Uuid, 0, 1) - )); - - assertAssignment(expectedAssignment, computedAssignment); - } - - @Test - public void testFirstAssignmentThreeMembersThreeTopicsWithSomeMemberAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic3Uuid, new TopicMetadata( - topic3Uuid, - topic3Name, - 2, - Collections.emptyMap() - )); - - Map members = new TreeMap<>(); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.empty(), - Arrays.asList(topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Collections.singletonList(topic2Uuid), - Collections.emptyMap() - )); - members.put(memberC, new AssignmentMemberSpec( - Optional.empty(), - Optional.empty(), - Collections.singletonList(topic1Uuid), - Collections.emptyMap() - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic2Uuid, 0), - mkTopicAssignment(topic3Uuid, 0, 1) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic2Uuid, 1, 2) - )); - expectedAssignment.put(memberC, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 1, 2) - )); - - assertAssignment(expectedAssignment, computedAssignment); - } - @Test public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Map topicMetadata = new HashMap<>(); @@ -389,7 +201,7 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -472,7 +284,7 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen currentAssignmentForC )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -493,151 +305,7 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen } @Test - public void testReassignmentOnRackChangesWithMemberAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic3Uuid, new TopicMetadata( - topic3Uuid, - topic3Name, - 3, - mkMapOfPartitionRacks(3) - )); - - // Initially A was in rack 1 and B was in rack 2, now let's switch them. - Map members = new TreeMap<>(); - - Map> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 1), - mkTopicAssignment(topic3Uuid, 0, 1, 2) - ) - ); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic3Uuid), - currentAssignmentForA - )); - - Map> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 2), - mkTopicAssignment(topic2Uuid, 0, 1, 2) - ) - ); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Arrays.asList(topic1Uuid, topic2Uuid), - currentAssignmentForB - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 2), - mkTopicAssignment(topic3Uuid, 0, 1, 2) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 0, 1, 2) - )); - - assertAssignment(expectedAssignment, computedAssignment); - } - - @Test - public void testReassignmentOnAddingPartitionsWithMemberAndPartitionRacks() { - // Initially they had 3 partitions each. - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 5, - mkMapOfPartitionRacks(5) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 5, - mkMapOfPartitionRacks(5) - )); - topicMetadata.put(topic3Uuid, new TopicMetadata( - topic3Uuid, - topic3Name, - 4, - mkMapOfPartitionRacks(4) - )); - topicMetadata.put(topic4Uuid, new TopicMetadata( - topic4Uuid, - topic4Name, - 4, - mkMapOfPartitionRacks(4) - )); - - Map members = new TreeMap<>(); - - Map> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 1, 2), - mkTopicAssignment(topic4Uuid, 0, 1, 2) - ) - ); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack0"), - Arrays.asList(topic1Uuid, topic4Uuid), - currentAssignmentForA - )); - - Map> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic2Uuid, 0, 1, 2), - mkTopicAssignment(topic3Uuid, 0, 1, 2) - ) - ); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Arrays.asList(topic3Uuid, topic2Uuid), - currentAssignmentForB - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 1, 2, 3, 4), - mkTopicAssignment(topic4Uuid, 0, 1, 2, 3) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic2Uuid, 0, 1, 2, 3, 4), - mkTopicAssignment(topic3Uuid, 0, 1, 2, 3) - )); - - assertAssignment(expectedAssignment, computedAssignment); - } - - @Test - public void testReassignmentWhenPartitionsAreAddedForTwoMembersNoMemberRack() { + public void testReassignmentWhenPartitionsAreAddedForTwoMembers() { // Simulating adding partitions to T1, T2, T3 - originally T1 -> 4, T2 -> 3, T3 -> 2, T4 -> 3 Map topicMetadata = new HashMap<>(); topicMetadata.put(topic1Uuid, new TopicMetadata( @@ -693,7 +361,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersNoMemberRack() { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -760,7 +428,7 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -780,76 +448,6 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop assertAssignment(expectedAssignment, computedAssignment); } - @Test - public void testReassignmentOnAddingMemberWithRackAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 6, - mkMapOfPartitionRacks(6) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 5, - mkMapOfPartitionRacks(5) - )); - - Map members = new TreeMap<>(); - - Map> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 1, 2, 3, 4, 5) - ) - ); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack0"), - Collections.singletonList(topic1Uuid), - currentAssignmentForA - )); - - Map> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic2Uuid, 0, 1, 2, 3, 4) - ) - ); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Collections.singletonList(topic2Uuid), - currentAssignmentForB - )); - - // New member added. - members.put(memberC, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic2Uuid), - Collections.emptyMap() - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 3, 4) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic2Uuid, 0, 1, 3, 4) - )); - expectedAssignment.put(memberC, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 2, 5), - mkTopicAssignment(topic2Uuid, 2) - )); - - assertAssignment(expectedAssignment, computedAssignment); - } - @Test public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeMembersThreeTopics() { Map topicMetadata = new HashMap<>(); @@ -897,7 +495,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM // Member C was removed - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -955,7 +553,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -970,65 +568,4 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith assertAssignment(expectedAssignment, computedAssignment); } - - @Test - public void testReassignmentWhenOneSubscriptionRemovedWithMemberAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 5, - mkMapOfPartitionRacks(5) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 2, - mkMapOfPartitionRacks(2) - )); - - // Initial subscriptions were [T1, T2]. - Map members = new TreeMap<>(); - - Map> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2, 4), - mkTopicAssignment(topic2Uuid, 0) - ) - ); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack0"), - Arrays.asList(topic1Uuid, topic2Uuid), - currentAssignmentForA - )); - - Map> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 3), - mkTopicAssignment(topic2Uuid, 1) - ) - ); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Collections.singletonList(topic2Uuid), - currentAssignmentForB - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 1, 2, 3, 4) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic2Uuid, 0, 1) - )); - - assertAssignment(expectedAssignment, computedAssignment); - } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index 4fdce379534b..929a1fcf54d0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -36,7 +36,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; -import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -77,7 +76,7 @@ public void testOneMemberNoTopicSubscription() { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); assertEquals(Collections.emptyMap(), groupAssignment.members()); @@ -107,7 +106,7 @@ public void testOneMemberSubscribedToNonexistentTopic() { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); assertThrows(PartitionAssignorException.class, () -> assignor.assign(assignmentSpec, subscribedTopicMetadata)); @@ -143,7 +142,7 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -162,253 +161,6 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { checkValidityAndBalance(members, computedAssignment); } - @Test - public void testFirstAssignmentTwoMembersTwoTopicsNoPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 3, - Collections.emptyMap() - )); - topicMetadata.put(topic3Uuid, new TopicMetadata( - topic3Uuid, - topic3Name, - 2, - Collections.emptyMap() - )); - - Map members = new TreeMap<>(); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Arrays.asList(topic1Uuid, topic3Uuid), - Collections.emptyMap() - )); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic3Uuid), - Collections.emptyMap() - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2), - mkTopicAssignment(topic3Uuid, 1) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic3Uuid, 0) - )); - - assertAssignment(expectedAssignment, computedAssignment); - checkValidityAndBalance(members, computedAssignment); - } - - @Test - public void testFirstAssignmentThreeMembersThreeTopicsWithMemberAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic3Uuid, new TopicMetadata( - topic3Uuid, - topic3Name, - 2, - mkMapOfPartitionRacks(2) - )); - - Map members = new TreeMap<>(); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - members.put(memberC, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack3"), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 0), - mkTopicAssignment(topic3Uuid, 0) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic2Uuid, 1), - mkTopicAssignment(topic3Uuid, 1) - )); - expectedAssignment.put(memberC, mkAssignment( - mkTopicAssignment(topic1Uuid, 2), - mkTopicAssignment(topic2Uuid, 2) - )); - - assertAssignment(expectedAssignment, computedAssignment); - checkValidityAndBalance(members, computedAssignment); - } - - @Test - public void testFirstAssignmentThreeMembersThreeTopicsWithMemberAndPartitionRacks2() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 2, - mkMapOfPartitionRacks(2) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 2, - mkMapOfPartitionRacks(2) - )); - topicMetadata.put(topic3Uuid, new TopicMetadata( - topic3Uuid, - topic3Name, - 2, - mkMapOfPartitionRacks(2) - )); - - Map members = new TreeMap<>(); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack0"), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - members.put(memberC, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 0) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic3Uuid, 0) - )); - expectedAssignment.put(memberC, mkAssignment( - mkTopicAssignment(topic2Uuid, 1), - mkTopicAssignment(topic3Uuid, 1) - )); - - assertAssignment(expectedAssignment, computedAssignment); - checkValidityAndBalance(members, computedAssignment); - } - - @Test - public void testFirstAssignmentThreeMembersThreeTopicsWithSomeMemberAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic3Uuid, new TopicMetadata( - topic3Uuid, - topic3Name, - 2, - Collections.emptyMap() - )); - - Map members = new TreeMap<>(); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - members.put(memberC, new AssignmentMemberSpec( - Optional.empty(), - Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), - Collections.emptyMap() - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 2), - mkTopicAssignment(topic3Uuid, 1) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 2), - mkTopicAssignment(topic2Uuid, 1) - )); - expectedAssignment.put(memberC, mkAssignment( - mkTopicAssignment(topic2Uuid, 0), - mkTopicAssignment(topic3Uuid, 0) - )); - - assertAssignment(expectedAssignment, computedAssignment); - checkValidityAndBalance(members, computedAssignment); - } - @Test public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Map topicMetadata = new HashMap<>(); @@ -439,7 +191,7 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -485,7 +237,7 @@ public void testValidityAndBalanceForLargeSampleSet() { )); } - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -537,7 +289,7 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -556,134 +308,6 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( checkValidityAndBalance(members, computedAssignment); } - @Test - public void testReassignmentOnRackChangesWithMemberAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 3, - mkMapOfPartitionRacks(3) - )); - - // Initially A was in rack 1 and B was in rack 2, now let's switch them. - Map members = new TreeMap<>(); - - Map> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 1), - mkTopicAssignment(topic2Uuid, 0) - ) - ); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic2Uuid), - currentAssignmentForA - )); - - Map> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 2), - mkTopicAssignment(topic2Uuid, 1, 2) - ) - ); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Arrays.asList(topic1Uuid, topic2Uuid), - currentAssignmentForB - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 2), - mkTopicAssignment(topic2Uuid, 2) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 1, 0) - )); - - assertAssignment(expectedAssignment, computedAssignment); - checkValidityAndBalance(members, computedAssignment); - } - - @Test - public void testReassignmentOnAddingPartitionsWithMemberAndPartitionRacks() { - // Initially T1,T2 had 3 partitions. - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 5, - mkMapOfPartitionRacks(5) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 5, - mkMapOfPartitionRacks(5) - )); - - Map members = new TreeMap<>(); - - Map> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 0, 2) - ) - ); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack0"), - Arrays.asList(topic1Uuid, topic2Uuid), - currentAssignmentForA - )); - - Map> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 2), - mkTopicAssignment(topic2Uuid, 1) - ) - ); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Arrays.asList(topic1Uuid, topic2Uuid), - currentAssignmentForB - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 3), - mkTopicAssignment(topic2Uuid, 0, 3, 2) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 4, 2), - mkTopicAssignment(topic2Uuid, 1, 4) - )); - - assertAssignment(expectedAssignment, computedAssignment); - checkValidityAndBalance(members, computedAssignment); - } - @Test public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { // Simulating adding partition to T1 and T2 - originally T1 -> 3 Partitions and T2 -> 3 Partitions @@ -729,7 +353,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -796,7 +420,7 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -817,81 +441,6 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe checkValidityAndBalance(members, computedAssignment); } - @Test - public void testReassignmentOnAddingMemberWithRackAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 3, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 3, - mkMapOfPartitionRacks(3) - )); - - Map members = new TreeMap<>(); - - Map> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 0, 2) - ) - ); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack0"), - Arrays.asList(topic1Uuid, topic2Uuid), - currentAssignmentForA - )); - - Map> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 2), - mkTopicAssignment(topic2Uuid, 1) - ) - ); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Arrays.asList(topic1Uuid, topic2Uuid), - currentAssignmentForB - )); - - // New member added. - members.put(memberC, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic2Uuid), - Collections.emptyMap() - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 0) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic2Uuid, 1) - )); - expectedAssignment.put(memberC, mkAssignment( - mkTopicAssignment(topic1Uuid, 2), - mkTopicAssignment(topic2Uuid, 2) - )); - - assertAssignment(expectedAssignment, computedAssignment); - checkValidityAndBalance(members, computedAssignment); - } - @Test public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeMembersTwoTopics() { Map topicMetadata = new HashMap<>(); @@ -934,7 +483,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM // Member C was removed - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -994,7 +543,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + AssignmentSpec assignmentSpec = new AssignmentSpec(members); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -1011,68 +560,6 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith checkValidityAndBalance(members, computedAssignment); } - @Test - public void testReassignmentWhenOneSubscriptionRemovedWithMemberAndPartitionRacks() { - Map topicMetadata = new HashMap<>(); - topicMetadata.put(topic1Uuid, new TopicMetadata( - topic1Uuid, - topic1Name, - 5, - mkMapOfPartitionRacks(3) - )); - topicMetadata.put(topic2Uuid, new TopicMetadata( - topic2Uuid, - topic2Name, - 2, - mkMapOfPartitionRacks(3) - )); - - // Initial subscriptions were [T1, T2]. - Map members = new TreeMap<>(); - - Map> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2, 4), - mkTopicAssignment(topic2Uuid, 0) - ) - ); - members.put(memberA, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack0"), - Collections.singletonList(topic1Uuid), - currentAssignmentForA - )); - - Map> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 3), - mkTopicAssignment(topic2Uuid, 1) - ) - ); - members.put(memberB, new AssignmentMemberSpec( - Optional.empty(), - Optional.of("rack1"), - Collections.singletonList(topic1Uuid), - currentAssignmentForB - )); - - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - - Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2, 4) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 3) - )); - - assertAssignment(expectedAssignment, computedAssignment); - checkValidityAndBalance(members, computedAssignment); - } - /** * Verifies that the given assignment is valid with respect to the given subscriptions. * Validity requirements: From 9cff2163322a1dfa8b1c53855a7461e7aae6445c Mon Sep 17 00:00:00 2001 From: Ritika Reddy Date: Mon, 13 May 2024 11:28:31 -0700 Subject: [PATCH 2/2] rebase --- .../GeneralUniformAssignmentBuilderTest.java | 19 +++++++++-------- ...OptimizedUniformAssignmentBuilderTest.java | 21 ++++++++++--------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java index 9f66ca0e00a0..3d21a4199430 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java @@ -33,6 +33,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; +import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -78,7 +79,7 @@ public void testTwoMembersNoTopicSubscription() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); assertEquals(Collections.emptyMap(), groupAssignment.members()); @@ -112,7 +113,7 @@ public void testTwoMembersSubscribedToNonexistentTopics() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); assertThrows(PartitionAssignorException.class, () -> assignor.assign(assignmentSpec, subscribedTopicMetadata)); @@ -148,7 +149,7 @@ public void testFirstAssignmentTwoMembersTwoTopics() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -201,7 +202,7 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -284,7 +285,7 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen currentAssignmentForC )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -361,7 +362,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembers() { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -428,7 +429,7 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -495,7 +496,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM // Member C was removed - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -553,7 +554,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index 929a1fcf54d0..95a16ce455f2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -36,6 +36,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; +import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -76,7 +77,7 @@ public void testOneMemberNoTopicSubscription() { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); assertEquals(Collections.emptyMap(), groupAssignment.members()); @@ -106,7 +107,7 @@ public void testOneMemberSubscribedToNonexistentTopic() { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); assertThrows(PartitionAssignorException.class, () -> assignor.assign(assignmentSpec, subscribedTopicMetadata)); @@ -142,7 +143,7 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -191,7 +192,7 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -237,7 +238,7 @@ public void testValidityAndBalanceForLargeSampleSet() { )); } - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -289,7 +290,7 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -353,7 +354,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -420,7 +421,7 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -483,7 +484,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM // Member C was removed - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); @@ -543,7 +544,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members); + AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);