From 650e81ee343991d1c78ee544419c7e3854367bf9 Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Thu, 16 Apr 2020 17:31:29 +0200 Subject: [PATCH 1/8] [KIP-54] Implement sticky partition assignment strategy without py27 support --- kafka/coordinator/assignors/abstract.py | 3 +- kafka/coordinator/assignors/range.py | 2 +- kafka/coordinator/assignors/roundrobin.py | 2 +- .../coordinator/assignors/sticky/__init__.py | 0 .../assignors/sticky/partition_movements.py | 152 ++++ .../assignors/sticky/sticky_assignor.py | 681 ++++++++++++++ kafka/coordinator/consumer.py | 5 +- requirements-dev.txt | 1 + setup.py | 4 + test/test_assignors.py | 843 +++++++++++++++++- test/test_coordinator.py | 19 +- test/test_partition_movements.py | 23 + 12 files changed, 1712 insertions(+), 23 deletions(-) create mode 100644 kafka/coordinator/assignors/sticky/__init__.py create mode 100644 kafka/coordinator/assignors/sticky/partition_movements.py create mode 100644 kafka/coordinator/assignors/sticky/sticky_assignor.py create mode 100644 test/test_partition_movements.py diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py index a1fef3840..5f3431bce 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/kafka/coordinator/assignors/abstract.py @@ -44,7 +44,7 @@ def metadata(self, topics): pass @abc.abstractmethod - def on_assignment(self, assignment): + def on_assignment(self, assignment, generation): """Callback that runs on each assignment. This method can be used to update internal state, if any, of the @@ -52,5 +52,6 @@ def on_assignment(self, assignment): Arguments: assignment (MemberAssignment): the member's assignment + generation (int): generation id """ pass diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py index 299e39c48..6612d7cd8 100644 --- a/kafka/coordinator/assignors/range.py +++ b/kafka/coordinator/assignors/range.py @@ -73,5 +73,5 @@ def metadata(cls, topics): return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') @classmethod - def on_assignment(cls, assignment): + def on_assignment(cls, assignment, generation): pass diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py index 2d24a5c8b..2292ebdc9 100644 --- a/kafka/coordinator/assignors/roundrobin.py +++ b/kafka/coordinator/assignors/roundrobin.py @@ -92,5 +92,5 @@ def metadata(cls, topics): return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') @classmethod - def on_assignment(cls, assignment): + def on_assignment(cls, assignment, generation): pass diff --git a/kafka/coordinator/assignors/sticky/__init__.py b/kafka/coordinator/assignors/sticky/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kafka/coordinator/assignors/sticky/partition_movements.py b/kafka/coordinator/assignors/sticky/partition_movements.py new file mode 100644 index 000000000..ab0cd3337 --- /dev/null +++ b/kafka/coordinator/assignors/sticky/partition_movements.py @@ -0,0 +1,152 @@ +import logging +from collections import defaultdict +from copy import deepcopy +from typing import Dict, Set, List, Tuple, NamedTuple + +from kafka.structs import TopicPartition + +log = logging.getLogger(__name__) + + +class ConsumerPair(NamedTuple): + """ + Represents a pair of Kafka consumer ids involved in a partition reassignment. 
+ Each ConsumerPair corresponds to a particular partition or topic, indicates that the particular partition or some + partition of the particular topic was moved from the source consumer to the destination consumer + during the rebalance. This class helps in determining whether a partition reassignment results in cycles among + the generated graph of consumer pairs. + """ + src_member_id: str + dst_member_id: str + + +def is_sublist(source: List, target: Tuple) -> bool: + """Checks if one list is a sublist of another. + + Arguments: + source: the list in which to search for the occurrence of target. + target: the list to search for as a sublist of source + + Returns: + true if target is in source; false otherwise + """ + for index in (i for i, e in enumerate(source) if e == target[0]): + if tuple(source[index: index + len(target)]) == target: + return True + return False + + +class PartitionMovements: + """ + This class maintains some data structures to simplify lookup of partition movements among consumers. + At each point of time during a partition rebalance it keeps track of partition movements + corresponding to each topic, and also possible movement (in form a ConsumerPair object) for each partition. + """ + + def __init__(self): + self.partition_movements_by_topic: Dict[str, Dict[ConsumerPair, Set[TopicPartition]]] = defaultdict( + lambda: defaultdict(set) + ) + self.partition_movements: Dict[TopicPartition, ConsumerPair] = {} + + def move_partition(self, partition: TopicPartition, old_consumer: str, new_consumer: str): + pair = ConsumerPair(src_member_id=old_consumer, dst_member_id=new_consumer) + if partition in self.partition_movements: + # this partition has previously moved + existing_pair = self._remove_movement_record_of_partition(partition) + assert existing_pair.dst_member_id == old_consumer + if existing_pair.src_member_id != new_consumer: + # the partition is not moving back to its previous consumer + self._add_partition_movement_record( + partition, ConsumerPair(src_member_id=existing_pair.src_member_id, dst_member_id=new_consumer) + ) + else: + self._add_partition_movement_record(partition, pair) + + def get_partition_to_be_moved(self, partition: TopicPartition, old_consumer: str, new_consumer: str): + if partition.topic not in self.partition_movements_by_topic: + return partition + if partition in self.partition_movements: + # this partition has previously moved + assert old_consumer == self.partition_movements[partition].dst_member_id + old_consumer = self.partition_movements[partition].src_member_id + reverse_pair = ConsumerPair(src_member_id=new_consumer, dst_member_id=old_consumer) + if reverse_pair not in self.partition_movements_by_topic[partition.topic]: + return partition + + return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair])) + + def are_sticky(self) -> bool: + for topic, movements in self.partition_movements_by_topic.items(): + movement_pairs = set(movements.keys()) + if self._has_cycles(movement_pairs): + log.error( + f"Stickiness is violated for topic {topic}\n" + f"Partition movements for this topic occurred among the following consumer pairs:\n" + f"{movement_pairs}" + ) + return False + return True + + def _remove_movement_record_of_partition(self, partition: TopicPartition) -> ConsumerPair: + pair = self.partition_movements[partition] + del self.partition_movements[partition] + + self.partition_movements_by_topic[partition.topic][pair].remove(partition) + if not self.partition_movements_by_topic[partition.topic][pair]: + 
del self.partition_movements_by_topic[partition.topic][pair] + if not self.partition_movements_by_topic[partition.topic]: + del self.partition_movements_by_topic[partition.topic] + + return pair + + def _add_partition_movement_record(self, partition: TopicPartition, pair: ConsumerPair): + self.partition_movements[partition] = pair + self.partition_movements_by_topic[partition.topic][pair].add(partition) + + def _has_cycles(self, consumer_pairs: Set[ConsumerPair]): + cycles = set() + for pair in consumer_pairs: + reduced_pairs = deepcopy(consumer_pairs) + reduced_pairs.remove(pair) + path = [pair.src_member_id] + if self._is_linked(pair.dst_member_id, pair.src_member_id, reduced_pairs, path) and not self._is_subcycle( + path, cycles + ): + cycles.add(tuple(path)) + log.error(f"A cycle of length {len(path) - 1} was found: {path}") + + # for now we want to make sure there is no partition movements of the same topic between a pair of consumers. + # the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized + # tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases. + for cycle in cycles: + if len(cycle) == 3: # indicates a cycle of length 2 + return True + return False + + @staticmethod + def _is_subcycle(cycle: List[str], cycles: Set[Tuple[str]]) -> bool: + super_cycle = deepcopy(cycle) + super_cycle = super_cycle[:-1] + super_cycle.extend(cycle) + for found_cycle in cycles: + if len(found_cycle) == len(cycle) and is_sublist(super_cycle, found_cycle): + return True + return False + + def _is_linked(self, src: str, dst: str, pairs: Set[ConsumerPair], current_path: List[str]): + if src == dst: + return False + if not pairs: + return False + if ConsumerPair(src, dst) in pairs: + current_path.append(src) + current_path.append(dst) + return True + for pair in pairs: + if pair.src_member_id == src: + reduced_set = deepcopy(pairs) + reduced_set.remove(pair) + current_path.append(pair.src_member_id) + return self._is_linked(pair.dst_member_id, dst, reduced_set, current_path) + return False diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py new file mode 100644 index 000000000..86cfbaee1 --- /dev/null +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -0,0 +1,681 @@ +import logging +from collections import defaultdict +from copy import deepcopy +from typing import List, Dict, NamedTuple, Set, Tuple + +from sortedcontainers import SortedSet, SortedDict, SortedList + +from kafka.cluster import ClusterMetadata +from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor +from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements +from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from kafka.coordinator.protocol import Schema +from kafka.protocol.struct import Struct +from kafka.protocol.types import String, Array, Int32 +from kafka.structs import TopicPartition + +log = logging.getLogger(__name__) + + +class ConsumerGenerationPair(NamedTuple): + consumer: str + generation: int + + +def has_identical_list_elements(list_: List[List]) -> bool: + """Checks if all lists in the collection have the same members + + Arguments: + list_: collection of lists + + Returns: + true if all lists in the collection have the same members; false otherwise + """ + if not list_: + return True + for i in range(1, len(list_)): + if list_[i] != 
list_[i - 1]: + return False + return True + + +def subscriptions_comparator_key(element: Tuple[str, Tuple[TopicPartition]]) -> Tuple[int, str]: + return len(element[1]), element[0] + + +def partitions_comparator_key(element: Tuple[TopicPartition, Tuple[str]]) -> Tuple[int, str, int]: + return len(element[1]), element[0].topic, element[0].partition + + +def remove_if_present(collection, element): + try: + collection.remove(element) + except (ValueError, KeyError): + pass + + +class StickyAssignorMemberMetadataV1(NamedTuple): + subscription: List[str] + partitions: List[TopicPartition] + generation: int + + +class StickyAssignorUserDataV1(Struct): + """ + Used for preserving consumer's previously assigned partitions + list and sending it as user data to the leader during a rebalance + """ + + SCHEMA = Schema( + ("previous_assignment", Array(("topic", String("utf-8")), ("partitions", Array(Int32)))), ("generation", Int32) + ) + + +class StickyAssignmentExecutor: + def __init__(self, cluster: ClusterMetadata, members: Dict[str, StickyAssignorMemberMetadataV1]): + self.members = members + # a mapping between consumers and their assigned partitions that is updated during assignment procedure + self.current_assignment: Dict[str, List[TopicPartition]] = defaultdict(list) + # an assignment from a previous generation + self.previous_assignment: Dict[TopicPartition, ConsumerGenerationPair] = {} + # a mapping between partitions and their assigned consumers + self.current_partition_consumer: Dict[TopicPartition, str] = {} + # a flag indicating that there were no previous assignments performed ever + self.is_fresh_assignment = False + # a mapping of all topic partitions to all consumers that can be assigned to them + self.partition_to_all_potential_consumers: Dict[TopicPartition, List[str]] = {} + # a mapping of all consumers to all potential topic partitions that can be assigned to them + self.consumer_to_all_potential_partitions: Dict[str, List[TopicPartition]] = {} + # an ascending sorted set of consumers based on how many topic partitions are already assigned to them + self.sorted_current_subscriptions: Set[Tuple[str, Tuple[TopicPartition]]] = set() + # an ascending sorted list of topic partitions based on how many consumers can potentially use them + self.sorted_partitions: List[TopicPartition] = [] + # all partitions that need to be assigned + self.unassigned_partitions: List[TopicPartition] = [] + # a flag indicating that a certain partition cannot remain assigned to its current consumer because the consumer + # is no longer subscribed to its topic + self.revocation_required = False + + self.partition_movements = PartitionMovements() + self._initialize(cluster) + + def perform_initial_assignment(self): + self._populate_sorted_partitions() + self._populate_partitions_to_reassign() + + def balance(self): + self._initialize_current_subscriptions() + initializing = len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) == 0 + + # assign all unassigned partitions + for partition in self.unassigned_partitions: + # skip if there is no potential consumer for the partition + if not self.partition_to_all_potential_consumers[partition]: + continue + self._assign_partition(partition) + + # narrow down the reassignment scope to only those partitions that can actually be reassigned + fixed_partitions: Set[TopicPartition] = set() + for partition in self.partition_to_all_potential_consumers.keys(): + if not self._can_partition_participate_in_reassignment(partition): + 
fixed_partitions.add(partition) + for fixed_partition in fixed_partitions: + remove_if_present(self.sorted_partitions, fixed_partition) + remove_if_present(self.unassigned_partitions, fixed_partition) + + # narrow down the reassignment scope to only those consumers that are subject to reassignment + fixed_assignments: Dict[str, List[TopicPartition]] = {} + for consumer in self.consumer_to_all_potential_partitions.keys(): + if not self._can_consumer_participate_in_reassignment(consumer): + self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) + fixed_assignments[consumer] = self.current_assignment[consumer] + del self.current_assignment[consumer] + + # create a deep copy of the current assignment so we can revert to it + # if we do not get a more balanced assignment later + prebalance_assignment = deepcopy(self.current_assignment) + prebalance_partition_consumers = deepcopy(self.current_partition_consumer) + + # if we don't already need to revoke something due to subscription changes, + # first try to balance by only moving newly added partitions + if not self.revocation_required: + self._perform_reassignments(self.unassigned_partitions) + reassignment_performed = self._perform_reassignments(self.sorted_partitions) + + # if we are not preserving existing assignments and we have made changes to the current assignment + # make sure we are getting a more balanced assignment; otherwise, revert to previous assignment + if ( + not initializing + and reassignment_performed + and self._get_balance_score(self.current_assignment) >= self._get_balance_score(prebalance_assignment) + ): + self.current_assignment = prebalance_assignment + self.current_partition_consumer.clear() + self.current_partition_consumer.update(prebalance_partition_consumers) + + # add the fixed assignments (those that could not change) back + for consumer, partitions in fixed_assignments.items(): + self.current_assignment[consumer] = partitions + self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) + + def get_final_assignment(self, member_id) -> List[Tuple[str, List[int]]]: + assignment: Dict[str, List[int]] = defaultdict(lambda: SortedList()) + for topic_partition in self.current_assignment[member_id]: + assignment[topic_partition.topic].add(topic_partition.partition) + # noinspection PyTypeChecker + return list(assignment.items()) + + def _initialize(self, cluster: ClusterMetadata): + self._init_current_assignments(self.members) + + for topic in cluster.topics(): + partitions = cluster.partitions_for_topic(topic) + if partitions is None: + log.warning("No partition metadata for topic %s", topic) + continue + for p in partitions: + partition = TopicPartition(topic=topic, partition=p) + self.partition_to_all_potential_consumers[partition] = [] + for consumer_id, member_metadata in self.members.items(): + self.consumer_to_all_potential_partitions[consumer_id] = [] + for topic in member_metadata.subscription: + if cluster.partitions_for_topic(topic) is None: + log.warning(f"No partition metadata for topic {topic}") + continue + for p in cluster.partitions_for_topic(topic): + partition = TopicPartition(topic=topic, partition=p) + self.consumer_to_all_potential_partitions[consumer_id].append(partition) + self.partition_to_all_potential_consumers[partition].append(consumer_id) + if consumer_id not in self.current_assignment: + self.current_assignment[consumer_id] = [] + + def _init_current_assignments(self, members: Dict[str, StickyAssignorMemberMetadataV1]): + # we need to process 
subscriptions' user data with each consumer's reported generation in mind + # higher generations overwrite lower generations in case of a conflict + # note that a conflict could exists only if user data is for different generations + + # for each partition we create a sorted map of its consumers by generation + sorted_partition_consumers_by_generation: Dict[TopicPartition, SortedDict[int, str]] = {} + for consumer, member_metadata in members.items(): + for partitions in member_metadata.partitions: + if partitions in sorted_partition_consumers_by_generation: + consumers: Dict[int, str] = sorted_partition_consumers_by_generation[partitions] + if member_metadata.generation and member_metadata.generation in consumers: + # same partition is assigned to two consumers during the same rebalance. + # log a warning and skip this record + log.warning( + f"Partition {partitions} is assigned to multiple consumers " + f"following sticky assignment generation {member_metadata.generation}." + ) + else: + consumers[member_metadata.generation] = consumer + else: + sorted_consumers = SortedDict() + sorted_consumers[member_metadata.generation] = consumer + sorted_partition_consumers_by_generation[partitions] = sorted_consumers + + # previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition + # current and previous consumers are the last two consumers of each partition in the above sorted map + for partitions, consumers in sorted_partition_consumers_by_generation.items(): + generations = list(reversed(consumers.keys())) + self.current_assignment[consumers[generations[0]]].append(partitions) + # now update previous assignment if any + if len(generations) > 1: + self.previous_assignment[partitions] = ConsumerGenerationPair( + consumer=consumers[generations[1]], generation=generations[1] + ) + + self.is_fresh_assignment = len(self.current_assignment) == 0 + + for consumer_id, partitions in self.current_assignment.items(): + for partition in partitions: + self.current_partition_consumer[partition] = consumer_id + + def _are_subscriptions_identical(self) -> bool: + """ + Returns: + true, if both potential consumers of partitions and potential partitions that consumers can + consume are the same + """ + if not has_identical_list_elements(list(self.partition_to_all_potential_consumers.values())): + return False + return has_identical_list_elements(list(self.consumer_to_all_potential_partitions.values())) + + def _populate_sorted_partitions(self): + # an ascending sorted set of topic partitions based on how many consumers can potentially use them + sorted_all_partitions: SortedSet[Tuple[TopicPartition, List[str]]] = SortedSet( + iterable=[(tp, tuple(consumers)) for tp, consumers in self.partition_to_all_potential_consumers.items()], + key=partitions_comparator_key, + ) + + self.sorted_partitions = [] + if not self.is_fresh_assignment and self._are_subscriptions_identical(): + # if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics) + # then we just need to simply list partitions in a round robin fashion (from consumers with + # most assigned partitions to those with least) + assignments = deepcopy(self.current_assignment) + for consumer_id, partitions in assignments.items(): + to_remove = [] + for partition in partitions: + if partition not in self.partition_to_all_potential_consumers: + to_remove.append(partition) + for partition in to_remove: + partitions.remove(partition) + + sorted_consumers: SortedSet[Tuple[str, 
Tuple[TopicPartition]]] = SortedSet( + iterable=[(consumer, tuple(partitions)) for consumer, partitions in assignments.items()], + key=subscriptions_comparator_key, + ) + # at this point, sorted_consumers contains an ascending-sorted list of consumers based on + # how many valid partitions are currently assigned to them + while sorted_consumers: + # take the consumer with the most partitions + consumer, _ = sorted_consumers.pop() + # currently assigned partitions to this consumer + remaining_partitions = assignments[consumer] + # from partitions that had a different consumer before, + # keep only those that are assigned to this consumer now + previous_partitions = set(self.previous_assignment.keys()).intersection(set(remaining_partitions)) + if previous_partitions: + # if there is a partition of this consumer that was assigned to another consumer before + # mark it as good options for reassignment + partition = previous_partitions.pop() + remaining_partitions.remove(partition) + self.sorted_partitions.append(partition) + sorted_consumers.add((consumer, tuple(assignments[consumer]))) + elif remaining_partitions: + # otherwise, mark any other one of the current partitions as a reassignment candidate + self.sorted_partitions.append(remaining_partitions.pop()) + sorted_consumers.add((consumer, tuple(assignments[consumer]))) + + while sorted_all_partitions: + partition = sorted_all_partitions.pop(0)[0] + if partition not in self.sorted_partitions: + self.sorted_partitions.append(partition) + else: + while sorted_all_partitions: + self.sorted_partitions.append(sorted_all_partitions.pop(0)[0]) + + def _populate_partitions_to_reassign(self): + self.unassigned_partitions = deepcopy(self.sorted_partitions) + + assignments_to_remove = [] + for consumer_id, partitions in self.current_assignment.items(): + if consumer_id not in self.members: + # if a consumer that existed before (and had some partition assignments) is now removed, + # remove it from current_assignment + for partition in partitions: + del self.current_partition_consumer[partition] + assignments_to_remove.append(consumer_id) + else: + # otherwise (the consumer still exists) + partitions_to_remove = [] + for partition in partitions: + if partition not in self.partition_to_all_potential_consumers: + # if this topic partition of this consumer no longer exists + # remove it from current_assignment of the consumer + partitions_to_remove.append(partition) + elif partition.topic not in self.members[consumer_id].subscription: + # if this partition cannot remain assigned to its current consumer because the consumer + # is no longer subscribed to its topic remove it from current_assignment of the consumer + partitions_to_remove.append(partition) + self.revocation_required = True + else: + # otherwise, remove the topic partition from those that need to be assigned only if + # its current consumer is still subscribed to its topic (because it is already assigned + # and we would want to preserve that assignment as much as possible) + self.unassigned_partitions.remove(partition) + for partition in partitions_to_remove: + self.current_assignment[consumer_id].remove(partition) + del self.current_partition_consumer[partition] + for consumer_id in assignments_to_remove: + del self.current_assignment[consumer_id] + + def _initialize_current_subscriptions(self): + self.sorted_current_subscriptions = SortedSet( + iterable=[(consumer, tuple(partitions)) for consumer, partitions in self.current_assignment.items()], + key=subscriptions_comparator_key, + ) + + 
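+    # Illustrative note on the ordering built above (the consumer ids and partitions in this
+    # comment are hypothetical, not taken from this patch): subscriptions_comparator_key sorts
+    # the set ascending by (number of assigned partitions, consumer id). For example, with
+    # current_assignment == {'C1': [t0p0, t0p1], 'C2': [t1p0]} the set iterates as
+    # ('C2', (t1p0,)), ('C1', (t0p0, t0p1)), so index 0 always yields the least-loaded consumer
+    # and index -1 the most-loaded one, which the two helpers below rely on.
+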
def _get_consumer_with_least_subscriptions(self) -> str: + return self.sorted_current_subscriptions[0][0] + + def _get_consumer_with_most_subscriptions(self) -> str: + return self.sorted_current_subscriptions[-1][0] + + def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer: str): + self.sorted_current_subscriptions.remove((consumer, tuple(self.current_assignment[consumer]))) + + def _add_consumer_to_current_subscriptions_and_maintain_order(self, consumer: str): + self.sorted_current_subscriptions.add((consumer, tuple(self.current_assignment[consumer]))) + + def _is_balanced(self) -> bool: + """Determines if the current assignment is a balanced one""" + if ( + len(self.current_assignment[self._get_consumer_with_least_subscriptions()]) + >= len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) - 1 + ): + # if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true + return True + + # create a mapping from partitions to the consumer assigned to them + all_assigned_partitions: Dict[TopicPartition, str] = {} + for consumer_id, consumer_partitions in self.current_assignment.items(): + for partition in consumer_partitions: + if partition in all_assigned_partitions: + log.error(f"{partition} is assigned to more than one consumer.") + all_assigned_partitions[partition] = consumer_id + + # for each consumer that does not have all the topic partitions it can get + # make sure none of the topic partitions it could but did not get cannot be moved to it + # (because that would break the balance) + for consumer, _ in self.sorted_current_subscriptions: + consumer_partition_count = len(self.current_assignment[consumer]) + # skip if this consumer already has all the topic partitions it can get + if consumer_partition_count == len(self.consumer_to_all_potential_partitions[consumer]): + continue + + # otherwise make sure it cannot get any more + for partition in self.consumer_to_all_potential_partitions[consumer]: + if partition not in self.current_assignment[consumer]: + other_consumer = all_assigned_partitions[partition] + other_consumer_partition_count = len(self.current_assignment[other_consumer]) + if consumer_partition_count < other_consumer_partition_count: + return False + return True + + def _assign_partition(self, partition: TopicPartition): + for consumer, _ in self.sorted_current_subscriptions: + if partition in self.consumer_to_all_potential_partitions[consumer]: + self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) + self.current_assignment[consumer].append(partition) + self.current_partition_consumer[partition] = consumer + self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) + break + + def _can_partition_participate_in_reassignment(self, partition: TopicPartition) -> bool: + return len(self.partition_to_all_potential_consumers[partition]) >= 2 + + def _can_consumer_participate_in_reassignment(self, consumer: str) -> bool: + current_partitions = self.current_assignment[consumer] + current_assignment_size = len(current_partitions) + max_assignment_size = len(self.consumer_to_all_potential_partitions[consumer]) + if current_assignment_size > max_assignment_size: + log.error(f"The consumer {consumer} is assigned more partitions than the maximum possible.") + if current_assignment_size < max_assignment_size: + # if a consumer is not assigned all its potential partitions it is subject to reassignment + return True + for partition in current_partitions: + # if any 
of the partitions assigned to a consumer is subject to reassignment the consumer itself + # is subject to reassignment + if self._can_partition_participate_in_reassignment(partition): + return True + return False + + def _perform_reassignments(self, reassignable_partitions: List[TopicPartition]) -> bool: + reassignment_performed = False + + # repeat reassignment until no partition can be moved to improve the balance + while True: + modified = False + # reassign all reassignable partitions until the full list is processed or a balance is achieved + # (starting from the partition with least potential consumers and if needed) + for partition in reassignable_partitions: + if self._is_balanced(): + break + # the partition must have at least two potential consumers + if len(self.partition_to_all_potential_consumers[partition]) <= 1: + log.error(f"Expected more than one potential consumer for partition {partition}") + # the partition must have a current consumer + consumer = self.current_partition_consumer.get(partition) + if consumer is None: + log.error(f"Expected partition {partition} to be assigned to a consumer") + + if ( + partition in self.previous_assignment + and len(self.current_assignment[consumer]) + > len(self.current_assignment[self.previous_assignment[partition].consumer]) + 1 + ): + self._reassign_partition_to_consumer( + partition, self.previous_assignment[partition].consumer, + ) + reassignment_performed = True + modified = True + continue + + # check if a better-suited consumer exist for the partition; if so, reassign it + for other_consumer in self.partition_to_all_potential_consumers[partition]: + if len(self.current_assignment[consumer]) > len(self.current_assignment[other_consumer]) + 1: + self._reassign_partition(partition) + reassignment_performed = True + modified = True + break + + if not modified: + break + return reassignment_performed + + def _reassign_partition(self, partition: TopicPartition): + new_consumer = None + for another_consumer, _ in self.sorted_current_subscriptions: + if partition in self.consumer_to_all_potential_partitions[another_consumer]: + new_consumer = another_consumer + break + assert new_consumer is not None + self._reassign_partition_to_consumer(partition, new_consumer) + + def _reassign_partition_to_consumer(self, partition: TopicPartition, new_consumer: str): + consumer = self.current_partition_consumer[partition] + # find the correct partition movement considering the stickiness requirement + partition_to_be_moved = self.partition_movements.get_partition_to_be_moved(partition, consumer, new_consumer) + self._move_partition(partition_to_be_moved, new_consumer) + + def _move_partition(self, partition: TopicPartition, new_consumer: str): + old_consumer = self.current_partition_consumer[partition] + self._remove_consumer_from_current_subscriptions_and_maintain_order(old_consumer) + self._remove_consumer_from_current_subscriptions_and_maintain_order(new_consumer) + + self.partition_movements.move_partition(partition, old_consumer, new_consumer) + + self.current_assignment[old_consumer].remove(partition) + self.current_assignment[new_consumer].append(partition) + self.current_partition_consumer[partition] = new_consumer + + self._add_consumer_to_current_subscriptions_and_maintain_order(new_consumer) + self._add_consumer_to_current_subscriptions_and_maintain_order(old_consumer) + + @staticmethod + def _get_balance_score(assignment: Dict[str, List[TopicPartition]]) -> int: + """Calculates a balance score of a give assignment + as the sum of 
assigned partitions size difference of all consumer pairs. + A perfectly balanced assignment (with all consumers getting the same number of partitions) + has a balance score of 0. Lower balance score indicates a more balanced assignment. + + Arguments: + assignment (dict): {consumer: list of assigned topic partitions} + + Returns: + the balance score of the assignment + """ + score = 0 + consumer2assignment: Dict[str, int] = {} + for consumer_id, partitions in assignment.items(): + consumer2assignment[consumer_id] = len(partitions) + + consumers_to_explore = set(consumer2assignment.keys()) + for consumer_id in consumer2assignment.keys(): + if consumer_id in consumers_to_explore: + consumers_to_explore.remove(consumer_id) + for other_consumer_id in consumers_to_explore: + score += abs(consumer2assignment[consumer_id] - consumer2assignment[other_consumer_id]) + return score + + +class StickyPartitionAssignor(AbstractPartitionAssignor): + """ + https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy + + The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either: + - the numbers of topic partitions assigned to consumers differ by at most one; or + - each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it. + + Second, it preserved as many existing assignment as possible when a reassignment occurs. + This helps in saving some of the overhead processing when topic partitions move from one consumer to another. + + Starting fresh it would work by distributing the partitions over consumers as evenly as possible. + Even though this may sound similar to how round robin assignor works, the second example below shows that it is not. + During a reassignment it would perform the reassignment in such a way that in the new assignment + - topic partitions are still distributed as evenly as possible, and + - topic partitions stay with their previously assigned consumers as much as possible. + + The first goal above takes precedence over the second one. + + Example 1. + Suppose there are three consumers C0, C1, C2, + four topics t0, t1, t2, t3, and each topic has 2 partitions, + resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. + Each consumer is subscribed to all three topics. + + The assignment with both sticky and round robin assignors will be: + - C0: [t0p0, t1p1, t3p0] + - C1: [t0p1, t2p0, t3p1] + - C2: [t1p0, t2p1] + + Now, let's assume C1 is removed and a reassignment is about to happen. The round robin assignor would produce: + - C0: [t0p0, t1p0, t2p0, t3p0] + - C2: [t0p1, t1p1, t2p1, t3p1] + + while the sticky assignor would result in: + - C0 [t0p0, t1p1, t3p0, t2p0] + - C2 [t1p0, t2p1, t0p1, t3p1] + preserving all the previous assignments (unlike the round robin assignor). + + + Example 2. + There are three consumers C0, C1, C2, + and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively. + Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. + C0 is subscribed to t0; + C1 is subscribed to t0, t1; + and C2 is subscribed to t0, t1, t2. 
+ + The round robin assignor would come up with the following assignment: + - C0 [t0p0] + - C1 [t1p0] + - C2 [t1p1, t2p0, t2p1, t2p2] + + which is not as balanced as the assignment suggested by sticky assignor: + - C0 [t0p0] + - C1 [t1p0, t1p1] + - C2 [t2p0, t2p1, t2p2] + + Now, if consumer C0 is removed, these two assignors would produce the following assignments. + Round Robin (preserves 3 partition assignments): + - C1 [t0p0, t1p1] + - C2 [t1p0, t2p0, t2p1, t2p2] + + Sticky (preserves 5 partition assignments): + - C1 [t1p0, t1p1, t0p0] + - C2 [t2p0, t2p1, t2p2] + """ + + DEFAULT_GENERATION_ID = -1 + + name = "sticky" + version = 0 + + member_assignment: List[TopicPartition] = None + generation: int = DEFAULT_GENERATION_ID + + _latest_partition_movements = None + + @classmethod + def assign(cls, cluster: ClusterMetadata, members: dict): + """Performs group assignment given cluster metadata and member subscriptions + + Arguments: + cluster (ClusterMetadata): cluster metadata + members (dict of {member_id: MemberMetadata}): decoded metadata for each member in the group. + + Returns: + dict: {member_id: MemberAssignment} + """ + members_metadata: Dict[str, StickyAssignorMemberMetadataV1] = {} + for consumer, member_metadata in members.items(): + members_metadata[consumer] = cls.parse_member_metadata(member_metadata) + + executor = StickyAssignmentExecutor(cluster, members_metadata) + executor.perform_initial_assignment() + executor.balance() + + cls._latest_partition_movements = executor.partition_movements + + assignment = {} + for member_id in members: + assignment[member_id] = ConsumerProtocolMemberAssignment( + cls.version, sorted(executor.get_final_assignment(member_id)), b'' + ) + return assignment + + @classmethod + def parse_member_metadata(cls, metadata) -> StickyAssignorMemberMetadataV1: + """ + Parses member metadata into a python object. + This implementation only serializes and deserializes the StickyAssignorMemberMetadataV1 user data, + since no StickyAssignor written in Python was deployed ever in the wild with version V0, meaning that + there is no need to support backward compatibility with V0. + + Arguments: + metadata (MemberMetadata): decoded metadata for a member of the group. 
+ + Returns: + parsed metadata (StickyAssignorMemberMetadataV1) + """ + user_data = metadata.user_data + if not user_data: + return StickyAssignorMemberMetadataV1( + partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + ) + + try: + decoded_user_data = StickyAssignorUserDataV1.decode(user_data) + except Exception as e: + # ignore the consumer's previous assignment if it cannot be parsed + log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args + return StickyAssignorMemberMetadataV1( + partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + ) + + member_partitions: List[TopicPartition] = [] + for topic, partitions in decoded_user_data.previous_assignment: + member_partitions.extend([TopicPartition(topic, partition) for partition in partitions]) + return StickyAssignorMemberMetadataV1( + # pylint: disable=no-member + partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription + ) + + @classmethod + def metadata(cls, topics): + if cls.member_assignment is None: + log.debug("No member assignment available") + user_data = b'' + else: + log.debug(f"Member assignment is available, generating the metadata: generation {cls.generation}") + partitions_by_topic = defaultdict(list) + for topic_partition in cls.member_assignment: + partitions_by_topic[topic_partition.topic].append(topic_partition.partition) + data = StickyAssignorUserDataV1(partitions_by_topic.items(), cls.generation) + user_data = data.encode() + return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data) + + @classmethod + def on_assignment(cls, assignment, generation: int): + """Callback that runs on each assignment. Updates assignor's state. 
+ + Arguments: + assignment: MemberAssignment + generation: generation id (if present) + """ + log.debug(f"On assignment: assignment={assignment}, generation={generation}") + cls.member_assignment: List[TopicPartition] = assignment.partitions() + cls.generation = generation diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index fda80aa67..0d9496721 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -6,6 +6,7 @@ import logging import time +from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.vendor import six from kafka.coordinator.base import BaseCoordinator, Generation @@ -31,7 +32,7 @@ class ConsumerCoordinator(BaseCoordinator): 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': None, - 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor), + 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor), 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, 'max_poll_interval_ms': 300000, @@ -233,7 +234,7 @@ def _on_join_complete(self, generation, member_id, protocol, # give the assignor a chance to update internal state # based on the received assignment - assignor.on_assignment(assignment) + assignor.on_assignment(assignment, generation) # reschedule the auto commit starting from now self.next_auto_commit_deadline = time.time() + self.auto_commit_interval diff --git a/requirements-dev.txt b/requirements-dev.txt index d2830905b..ace1f8c93 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,3 +15,4 @@ pytest-mock==1.10.0 sphinx-rtd-theme==0.2.4 crc32c==1.7 py==1.8.0 +sortedcontainers==2.1.0 diff --git a/setup.py b/setup.py index 8bc484c9a..35c0e754a 100644 --- a/setup.py +++ b/setup.py @@ -7,6 +7,7 @@ # since we can't import something we haven't built yet :) exec(open('kafka/version.py').read()) + class Tox(Command): user_options = [] @@ -58,5 +59,8 @@ def run(cls): "Programming Language :: Python :: 3.7", "Programming Language :: Python :: Implementation :: PyPy", "Topic :: Software Development :: Libraries :: Python Modules", + ], + install_requires=[ + "sortedcontainers==2.1.0" ] ) diff --git a/test/test_assignors.py b/test/test_assignors.py index 0821caf83..1cbe7f10a 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -1,28 +1,45 @@ # pylint: skip-file from __future__ import absolute_import +from collections import defaultdict +from random import randint, sample +from typing import Set, Dict, List + import pytest +from kafka.structs import TopicPartition from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment +from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor, StickyAssignorUserDataV1 +from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata + +@pytest.fixture(autouse=True) +def reset_sticky_assignor(): + yield + StickyPartitionAssignor.member_assignment = None + StickyPartitionAssignor.generation = -1 -@pytest.fixture -def cluster(mocker): + +def create_cluster(mocker, topics, topics_partitions=None, topic_partitions_lambda=None): cluster = mocker.MagicMock() - cluster.partitions_for_topic.return_value = set([0, 1, 2]) + cluster.topics.return_value = topics + if topics_partitions is not None: + 
cluster.partitions_for_topic.return_value = topics_partitions + if topic_partitions_lambda is not None: + cluster.partitions_for_topic.side_effect = topic_partitions_lambda return cluster -def test_assignor_roundrobin(cluster): +def test_assignor_roundrobin(mocker): assignor = RoundRobinPartitionAssignor member_metadata = { - 'C0': assignor.metadata(set(['t0', 't1'])), - 'C1': assignor.metadata(set(['t0', 't1'])), + 'C0': assignor.metadata({'t0', 't1'}), + 'C1': assignor.metadata({'t0', 't1'}), } + cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2}) ret = assignor.assign(cluster, member_metadata) expected = { 'C0': ConsumerProtocolMemberAssignment( @@ -36,14 +53,15 @@ def test_assignor_roundrobin(cluster): assert ret[member].encode() == expected[member].encode() -def test_assignor_range(cluster): +def test_assignor_range(mocker): assignor = RangePartitionAssignor member_metadata = { - 'C0': assignor.metadata(set(['t0', 't1'])), - 'C1': assignor.metadata(set(['t0', 't1'])), + 'C0': assignor.metadata({'t0', 't1'}), + 'C1': assignor.metadata({'t0', 't1'}), } + cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2}) ret = assignor.assign(cluster, member_metadata) expected = { 'C0': ConsumerProtocolMemberAssignment( @@ -55,3 +73,808 @@ def test_assignor_range(cluster): assert set(ret) == set(expected) for member in ret: assert ret[member].encode() == expected[member].encode() + + +def test_sticky_assignor1(mocker): + """ + Given: there are three consumers C0, C1, C2, + four topics t0, t1, t2, t3, and each topic has 2 partitions, + resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. + Each consumer is subscribed to all three topics. + Then: perform fresh assignment + Expected: the assignment is + - C0: [t0p0, t1p1, t3p0] + - C1: [t0p1, t2p0, t3p1] + - C2: [t1p0, t2p1] + Then: remove C1 consumer and perform the reassignment + Expected: the new assignment is + - C0 [t0p0, t1p1, t2p0, t3p0] + - C2 [t0p1, t1p0, t2p1, t3p1] + """ + cluster = create_cluster(mocker, topics={'t0', 't1', 't2', 't3'}, topics_partitions={0, 1}) + + subscriptions = { + 'C0': {'t0', 't1', 't2', 't3'}, + 'C1': {'t0', 't1', 't2', 't3'}, + 'C2': {'t0', 't1', 't2', 't3'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''), + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''), + 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t2', [1])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + del subscriptions['C1'] + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C0': ConsumerProtocolMemberAssignment( + StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t2', [0]), ('t3', [0])], b'' + ), + 'C2': ConsumerProtocolMemberAssignment( + StickyPartitionAssignor.version, [('t0', [1]), ('t1', [0]), ('t2', [1]), ('t3', [1])], b'' + ), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_assignor2(mocker): + """ + Given: there are three consumers C0, C1, C2, + and 
three topics t0, t1, t2, with 1, 2, and 3 partitions respectively. + Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. + C0 is subscribed to t0; + C1 is subscribed to t0, t1; + and C2 is subscribed to t0, t1, t2. + Then: perform the assignment + Expected: the assignment is + - C0 [t0p0] + - C1 [t1p0, t1p1] + - C2 [t2p0, t2p1, t2p2] + Then: remove C0 and perform the assignment + Expected: the assignment is + - C1 [t0p0, t1p0, t1p1] + - C2 [t2p0, t2p1, t2p2] + """ + + partitions = {'t0': {0}, 't1': {0, 1}, 't2': {0, 1, 2}} + cluster = create_cluster(mocker, topics={'t0', 't1', 't2'}, topic_partitions_lambda=lambda t: partitions[t]) + + subscriptions = { + 'C0': {'t0'}, + 'C1': {'t0', 't1'}, + 'C2': {'t0', 't1', 't2'}, + } + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, []) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0])], b''), + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1])], b''), + 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + del subscriptions['C0'] + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''), + 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_one_consumer_no_topic(mocker): + cluster = create_cluster(mocker, topics={}, topics_partitions={}) + + subscriptions = { + 'C': set(), + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_one_consumer_nonexisting_topic(mocker): + cluster = create_cluster(mocker, topics={}, topics_partitions={}) + + subscriptions = { + 'C': {'t'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_one_consumer_one_topic(mocker): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2}) + + subscriptions = { + 'C': {'t'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_should_only_assign_partitions_from_subscribed_topics(mocker): + cluster = create_cluster(mocker, topics={'t', 'other-t'}, topics_partitions={0, 1, 2}) + + subscriptions = { + 'C': 
{'t'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_one_consumer_multiple_topics(mocker): + cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2}) + + subscriptions = { + 'C': {'t1', 't2'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_two_consumers_one_topic_one_partition(mocker): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0}) + + subscriptions = { + 'C1': {'t'}, + 'C2': {'t'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''), + 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_two_consumers_one_topic_two_partitions(mocker): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1}) + + subscriptions = { + 'C1': {'t'}, + 'C2': {'t'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''), + 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [1])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_multiple_consumers_mixed_topic_subscriptions(mocker): + partitions = {'t1': {0, 1, 2}, 't2': {0, 1}} + cluster = create_cluster(mocker, topics={'t1', 't2'}, topic_partitions_lambda=lambda t: partitions[t]) + + subscriptions = { + 'C1': {'t1'}, + 'C2': {'t1', 't2'}, + 'C3': {'t1'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), + 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1])], b''), + 'C3': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_add_remove_consumer_one_topic(mocker): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2}) + + subscriptions = { + 'C1': {'t'}, + } + member_metadata = make_member_metadata(subscriptions) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + } + assert_assignment(assignment, expected_assignment) + + subscriptions = { + 'C1': {'t'}, + 'C2': {'t'}, + } + member_metadata = {} + for member, topics in subscriptions.items(): + 
member_metadata[member] = build_metadata( + topics, assignment[member].partitions() if member in assignment else [] + ) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + + subscriptions = { + 'C2': {'t'}, + } + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + assert len(assignment['C2'].assignment[0][1]) == 3 + + +def test_sticky_add_remove_topic_two_consumers(mocker): + cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2}) + + subscriptions = { + 'C1': {'t1'}, + 'C2': {'t1'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), + 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + subscriptions = { + 'C1': {'t1', 't2'}, + 'C2': {'t1', 't2'}, + } + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''), + 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + subscriptions = { + 'C1': {'t2'}, + 'C2': {'t2'}, + } + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [1])], b''), + 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 2])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_sticky_reassignment_after_one_consumer_leaves(mocker): + partitions = dict([(f't{i}', set(range(i))) for i in range(1, 20)]) + cluster = create_cluster( + mocker, topics=set([f't{i}' for i in range(1, 20)]), topic_partitions_lambda=lambda t: partitions[t] + ) + + subscriptions = {} + for i in range(1, 20): + topics = set() + for j in range(1, i + 1): + topics.add(f't{j}') + subscriptions[f'C{i}'] = topics + + member_metadata = make_member_metadata(subscriptions) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + + del subscriptions['C10'] + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + +def test_sticky_reassignment_after_one_consumer_added(mocker): + cluster = 
create_cluster(mocker, topics={'t'}, topics_partitions=set(range(20))) + + subscriptions = defaultdict(set) + for i in range(1, 10): + subscriptions[f'C{i}'] = {'t'} + + member_metadata = make_member_metadata(subscriptions) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + + subscriptions['C10'] = {'t'} + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata( + topics, assignment[member].partitions() if member in assignment else [] + ) + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + +def test_sticky_same_subscriptions(mocker): + partitions = dict([(f't{i}', set(range(i))) for i in range(1, 15)]) + cluster = create_cluster( + mocker, topics=set([f't{i}' for i in range(1, 15)]), topic_partitions_lambda=lambda t: partitions[t] + ) + + subscriptions = defaultdict(set) + for i in range(1, 9): + for j in range(1, len(partitions.keys()) + 1): + subscriptions[f'C{i}'].add(f't{j}') + + member_metadata = make_member_metadata(subscriptions) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + + del subscriptions['C5'] + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + +def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): + n_topics = 40 + n_consumers = 200 + + all_topics = set([f't{i}' for i in range(1, n_topics + 1)]) + partitions = dict([(t, set(range(1, randint(0, 10) + 1))) for t in all_topics]) + cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t]) + + subscriptions = defaultdict(set) + for i in range(1, n_consumers + 1): + for j in range(0, randint(1, 20)): + subscriptions[f'C{i}'].add(f't{randint(1, n_topics)}') + + member_metadata = make_member_metadata(subscriptions) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + + for i in range(50): + member = f'C{randint(1, n_consumers)}' + if member in subscriptions: + del subscriptions[member] + del member_metadata[member] + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + +def test_new_subscription(mocker): + cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4'}, topics_partitions={0}) + + subscriptions = defaultdict(set) + for i in range(3): + for j in range(i, 3 * i - 2 + 1): + subscriptions[f'C{i}'].add(f't{j}') + + member_metadata = make_member_metadata(subscriptions) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + + subscriptions['C0'].add('t1') + member_metadata = {} + for member, topics in subscriptions.items(): + 
member_metadata[member] = build_metadata(topics, []) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + +def test_move_existing_assignments(mocker): + cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4', 't5', 't6'}, topics_partitions={0}) + + subscriptions = { + 'C1': {'t1', 't2'}, + 'C2': {'t1', 't2', 't3', 't4'}, + 'C3': {'t2', 't3', 't4', 't5', 't6'}, + } + member_assignments = { + 'C1': [TopicPartition('t1', 0)], + 'C2': [TopicPartition('t2', 0), TopicPartition('t3', 0)], + 'C3': [TopicPartition('t4', 0), TopicPartition('t5', 0), TopicPartition('t6', 0)], + } + + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, member_assignments[member]) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + + +def test_stickiness(mocker): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2}) + subscriptions = { + 'C1': {'t'}, + 'C2': {'t'}, + 'C3': {'t'}, + 'C4': {'t'}, + } + member_metadata = make_member_metadata(subscriptions) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + partitions_assigned = {} + for consumer, consumer_assignment in assignment.items(): + assert ( + len(consumer_assignment.partitions()) <= 1 + ), f'Consumer {consumer} is assigned more topic partitions than expected.' + if len(consumer_assignment.partitions()) == 1: + partitions_assigned[consumer] = consumer_assignment.partitions()[0] + + # removing the potential group leader + del subscriptions['C1'] + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + for consumer, consumer_assignment in assignment.items(): + assert ( + len(consumer_assignment.partitions()) <= 1 + ), f'Consumer {consumer} is assigned more topic partitions than expected.' 
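+        # Stickiness check: partitions_assigned recorded which partition each consumer owned
+        # before C1 was removed; any consumer still present must keep that same partition
+        # after the rebalance.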
+ assert ( + consumer not in partitions_assigned or partitions_assigned[consumer] in consumer_assignment.partitions() + ), f'Stickiness was not honored for consumer {consumer}' + + +def test_assignment_updated_for_deleted_topic(mocker): + def topic_partitions(topic): + if topic == 't1': + return {0} + if topic == 't3': + return set(range(100)) + + cluster = create_cluster(mocker, topics={'t1', 't3'}, topic_partitions_lambda=topic_partitions) + + subscriptions = { + 'C': {'t1', 't2', 't3'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t3', list(range(100)))], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2}) + + subscriptions = { + 'C': {'t'}, + } + member_metadata = make_member_metadata(subscriptions) + + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + subscriptions = { + 'C': {}, + } + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) + + cluster = create_cluster(mocker, topics={}, topics_partitions={}) + sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + expected_assignment = { + 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''), + } + assert_assignment(sticky_assignment, expected_assignment) + + +def test_conflicting_previous_assignments(mocker): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1}) + + subscriptions = { + 'C1': {'t'}, + 'C2': {'t'}, + } + member_metadata = {} + for member, topics in subscriptions.items(): + # assume both C1 and C2 have partition 1 assigned to them in generation 1 + member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + + +@pytest.mark.parametrize( + 'execution_number,n_topics,n_consumers', [(i, randint(10, 20), randint(20, 40)) for i in range(100)] +) +def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_number, n_topics, n_consumers): + all_topics = set([f't{i}' for i in range(1, n_topics + 1)]) + partitions = dict([(t, set(range(1, i + 1))) for i, t in enumerate(all_topics)]) + cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t]) + + subscriptions = defaultdict(set) + for i in range(n_consumers): + topics_sample = sample(all_topics, randint(1, len(all_topics) - 1)) + subscriptions[f'C{i}'].update(topics_sample) + + member_metadata = make_member_metadata(subscriptions) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + + subscriptions = defaultdict(set) + for i in range(n_consumers): + topics_sample = sample(all_topics, randint(1, len(all_topics) - 1)) + subscriptions[f'C{i}'].update(topics_sample) + + member_metadata = {} + for member, topics 
in subscriptions.items(): + member_metadata[member] = build_metadata(topics, assignment[member].partitions()) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance(subscriptions, assignment) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + +def test_assignment_with_multiple_generations1(mocker): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) + + member_metadata = { + 'C1': build_metadata({'t'}, []), + 'C2': build_metadata({'t'}, []), + 'C3': build_metadata({'t'}, []), + } + + assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) + assert len(assignment1['C1'].assignment[0][1]) == 2 + assert len(assignment1['C2'].assignment[0][1]) == 2 + assert len(assignment1['C3'].assignment[0][1]) == 2 + + member_metadata = { + 'C1': build_metadata({'t'}, assignment1['C1'].partitions()), + 'C2': build_metadata({'t'}, assignment1['C2'].partitions()), + } + + assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2) + assert len(assignment2['C1'].assignment[0][1]) == 3 + assert len(assignment2['C2'].assignment[0][1]) == 3 + assert all([partition in assignment2['C1'].assignment[0][1] for partition in assignment1['C1'].assignment[0][1]]) + assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]]) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + member_metadata = { + 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2), + 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1), + } + + assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3) + assert len(assignment3['C2'].assignment[0][1]) == 3 + assert len(assignment3['C3'].assignment[0][1]) == 3 + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + +def test_assignment_with_multiple_generations2(mocker): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) + + member_metadata = { + 'C1': build_metadata({'t'}, []), + 'C2': build_metadata({'t'}, []), + 'C3': build_metadata({'t'}, []), + } + + assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) + assert len(assignment1['C1'].assignment[0][1]) == 2 + assert len(assignment1['C2'].assignment[0][1]) == 2 + assert len(assignment1['C3'].assignment[0][1]) == 2 + + member_metadata = { + 'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1), + } + + assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance({'C2': {'t'}}, assignment2) + assert len(assignment2['C2'].assignment[0][1]) == 6 + assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]]) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + member_metadata = { + 'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1), + 'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2), + 'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1), + } + + assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 
'C3': {'t'}}, assignment3) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert set(assignment3['C1'].assignment[0][1]) == set(assignment1['C1'].assignment[0][1]) + assert set(assignment3['C2'].assignment[0][1]) == set(assignment1['C2'].assignment[0][1]) + assert set(assignment3['C3'].assignment[0][1]) == set(assignment1['C3'].assignment[0][1]) + + +@pytest.mark.parametrize('execution_number', range(50)) +def test_assignment_with_conflicting_previous_generations(mocker, execution_number): + cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) + + member_assignments = { + 'C1': [TopicPartition('t', p) for p in {0, 1, 4}], + 'C2': [TopicPartition('t', p) for p in {0, 2, 3}], + 'C3': [TopicPartition('t', p) for p in {3, 4, 5}], + } + member_generations = { + 'C1': 1, + 'C2': 1, + 'C3': 2, + } + member_metadata = {} + for member in member_assignments.keys(): + member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member]) + + assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment) + assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + + +def make_member_metadata(subscriptions: Dict[str, Set[str]]): + member_metadata = {} + for member, topics in subscriptions.items(): + member_metadata[member] = build_metadata(topics, []) + return member_metadata + + +def build_metadata(topics, member_assignment_partitions, generation=-1): + partitions_by_topic = defaultdict(list) + for topic_partition in member_assignment_partitions: + partitions_by_topic[topic_partition.topic].append(topic_partition.partition) + data = StickyAssignorUserDataV1(partitions_by_topic.items(), generation) + user_data = data.encode() + return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data) + + +def assert_assignment(result_assignment, expected_assignment): + assert result_assignment == expected_assignment + assert set(result_assignment) == set(expected_assignment) + for member in result_assignment: + assert result_assignment[member].encode() == expected_assignment[member].encode() + + +def verify_validity_and_balance(subscriptions: Dict[str, Set[str]], assignment): + """ + Verifies that the given assignment is valid with respect to the given subscriptions + Validity requirements: + - each consumer is subscribed to topics of all partitions assigned to it, and + - each partition is assigned to no more than one consumer + Balance requirements: + - the assignment is fully balanced (the numbers of topic partitions assigned to consumers differ by at most one), or + - there is no topic partition that can be moved from one consumer to another with 2+ fewer topic partitions + + :param subscriptions topic subscriptions of each consumer + :param assignment: given assignment for balance check + """ + assert subscriptions.keys() == assignment.keys() + + consumers = sorted(list(assignment.keys())) + for i in range(len(consumers)): + consumer = consumers[i] + partitions = assignment[consumer].partitions() + for partition in partitions: + assert partition.topic in subscriptions[consumer], ( + f'Error: Partition {partition} is assigned to consumer {consumers[i]}, ' + f'but it is not subscribed to topic {partition.topic}\n' + f'Subscriptions: {subscriptions}\n' + f'Assignments: {assignment}' + ) + if i == len(consumers) - 1: + continue + + for j in range(i + 1, len(consumers)): + 
other_consumer = consumers[j] + other_partitions = assignment[other_consumer].partitions() + partitions_intersection = set(partitions).intersection(set(other_partitions)) + assert partitions_intersection == set(), ( + f'Error: Consumers {consumer} and {other_consumer} have common partitions ' + f'assigned to them: {partitions_intersection}\n' + f'Subscriptions: {subscriptions}\n' + f'Assignments: {assignment}' + ) + + if abs(len(partitions) - len(other_partitions)) <= 1: + continue + + assignments_by_topic = group_partitions_by_topic(partitions) + other_assignments_by_topic = group_partitions_by_topic(other_partitions) + if len(partitions) > len(other_partitions): + for topic in assignments_by_topic.keys(): + assert topic not in other_assignments_by_topic, ( + f'Error: Some partitions can be moved from {consumer} ({len(partitions)} partitions) ' + f'to {other_consumer} ({len(other_partitions)} partitions) ' + f'to achieve a better balance\n' + f'Subscriptions: {subscriptions}\n' + f'Assignments: {assignment}' + ) + if len(other_partitions) > len(partitions): + for topic in other_assignments_by_topic.keys(): + assert topic not in assignments_by_topic, ( + f'Error: Some partitions can be moved from {other_consumer} ({len(other_partitions)} partitions) ' + f'to {consumer} ({len(partitions)} partitions) ' + f'to achieve a better balance\n' + f'Subscriptions: {subscriptions}\n' + f'Assignments: {assignment}' + ) + + +def group_partitions_by_topic(partitions: List[TopicPartition]) -> Dict[str, Set[int]]: + result = defaultdict(set) + for p in partitions: + result[p.topic].add(p.partition) + return result diff --git a/test/test_coordinator.py b/test/test_coordinator.py index ea8f84bb6..d09a50819 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -9,6 +9,7 @@ SubscriptionState, ConsumerRebalanceListener) from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.coordinator.base import Generation, MemberState, HeartbeatThread from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.protocol import ( @@ -77,6 +78,10 @@ def test_group_protocols(coordinator): RoundRobinPartitionAssignor.version, ['foobar'], b'')), + ('sticky', ConsumerProtocolMemberMetadata( + StickyPartitionAssignor.version, + ['foobar'], + b'')), ] @@ -95,7 +100,7 @@ def test_pattern_subscription(coordinator, api_version): [(0, 'fizz', []), (0, 'foo1', [(0, 0, 0, [], [])]), (0, 'foo2', [(0, 0, 1, [], [])])])) - assert coordinator._subscription.subscription == set(['foo1', 'foo2']) + assert coordinator._subscription.subscription == {'foo1', 'foo2'} # 0.9 consumers should trigger dynamic partition assignment if api_version >= (0, 9): @@ -103,14 +108,14 @@ def test_pattern_subscription(coordinator, api_version): # earlier consumers get all partitions assigned locally else: - assert set(coordinator._subscription.assignment.keys()) == set([ - TopicPartition('foo1', 0), - TopicPartition('foo2', 0)]) + assert set(coordinator._subscription.assignment.keys()) == {TopicPartition('foo1', 0), + TopicPartition('foo2', 0)} def test_lookup_assignor(coordinator): assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor assert coordinator._lookup_assignor('range') is RangePartitionAssignor + assert coordinator._lookup_assignor('sticky') is StickyPartitionAssignor assert 
coordinator._lookup_assignor('foobar') is None @@ -124,7 +129,7 @@ def test_join_complete(mocker, coordinator): coordinator._on_join_complete( 0, 'member-foo', 'roundrobin', assignment.encode()) assert assignor.on_assignment.call_count == 1 - assignor.on_assignment.assert_called_with(assignment) + assignor.on_assignment.assert_called_with(assignment, 0) def test_subscription_listener(mocker, coordinator): @@ -141,9 +146,7 @@ def test_subscription_listener(mocker, coordinator): coordinator._on_join_complete( 0, 'member-foo', 'roundrobin', assignment.encode()) assert listener.on_partitions_assigned.call_count == 1 - listener.on_partitions_assigned.assert_called_with(set([ - TopicPartition('foobar', 0), - TopicPartition('foobar', 1)])) + listener.on_partitions_assigned.assert_called_with({TopicPartition('foobar', 0), TopicPartition('foobar', 1)}) def test_subscription_listener_failure(mocker, coordinator): diff --git a/test/test_partition_movements.py b/test/test_partition_movements.py new file mode 100644 index 000000000..bc990bf3d --- /dev/null +++ b/test/test_partition_movements.py @@ -0,0 +1,23 @@ +from kafka.structs import TopicPartition + +from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements + + +def test_empty_movements_are_sticky(): + partition_movements = PartitionMovements() + assert partition_movements.are_sticky() + + +def test_sticky_movements(): + partition_movements = PartitionMovements() + partition_movements.move_partition(TopicPartition('t', 1), 'C1', 'C2') + partition_movements.move_partition(TopicPartition('t', 1), 'C2', 'C3') + partition_movements.move_partition(TopicPartition('t', 1), 'C3', 'C1') + assert partition_movements.are_sticky() + + +def test_should_detect_non_sticky_assignment(): + partition_movements = PartitionMovements() + partition_movements.move_partition(TopicPartition('t', 1), 'C1', 'C2') + partition_movements.move_partition(TopicPartition('t', 2), 'C2', 'C1') + assert not partition_movements.are_sticky() From c9149850887b66a74a18626c9fd7e885e9e3c4d9 Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Mon, 27 Apr 2020 13:33:00 +0200 Subject: [PATCH 2/8] [KIP-54] Remove python3 typing hints --- .../assignors/sticky/partition_movements.py | 45 +++---- .../assignors/sticky/sticky_assignor.py | 114 +++++++++--------- test/test_assignors.py | 7 +- 3 files changed, 77 insertions(+), 89 deletions(-) diff --git a/kafka/coordinator/assignors/sticky/partition_movements.py b/kafka/coordinator/assignors/sticky/partition_movements.py index ab0cd3337..f1e146092 100644 --- a/kafka/coordinator/assignors/sticky/partition_movements.py +++ b/kafka/coordinator/assignors/sticky/partition_movements.py @@ -1,26 +1,21 @@ import logging -from collections import defaultdict +from collections import defaultdict, namedtuple from copy import deepcopy -from typing import Dict, Set, List, Tuple, NamedTuple - -from kafka.structs import TopicPartition log = logging.getLogger(__name__) -class ConsumerPair(NamedTuple): - """ - Represents a pair of Kafka consumer ids involved in a partition reassignment. - Each ConsumerPair corresponds to a particular partition or topic, indicates that the particular partition or some - partition of the particular topic was moved from the source consumer to the destination consumer - during the rebalance. This class helps in determining whether a partition reassignment results in cycles among - the generated graph of consumer pairs. 
- """ - src_member_id: str - dst_member_id: str +ConsumerPair = namedtuple("ConsumerPair", ["src_member_id", "dst_member_id"]) +""" +Represents a pair of Kafka consumer ids involved in a partition reassignment. +Each ConsumerPair corresponds to a particular partition or topic, indicates that the particular partition or some +partition of the particular topic was moved from the source consumer to the destination consumer +during the rebalance. This class helps in determining whether a partition reassignment results in cycles among +the generated graph of consumer pairs. +""" -def is_sublist(source: List, target: Tuple) -> bool: +def is_sublist(source, target): """Checks if one list is a sublist of another. Arguments: @@ -44,12 +39,12 @@ class PartitionMovements: """ def __init__(self): - self.partition_movements_by_topic: Dict[str, Dict[ConsumerPair, Set[TopicPartition]]] = defaultdict( + self.partition_movements_by_topic = defaultdict( lambda: defaultdict(set) ) - self.partition_movements: Dict[TopicPartition, ConsumerPair] = {} + self.partition_movements = {} - def move_partition(self, partition: TopicPartition, old_consumer: str, new_consumer: str): + def move_partition(self, partition, old_consumer, new_consumer): pair = ConsumerPair(src_member_id=old_consumer, dst_member_id=new_consumer) if partition in self.partition_movements: # this partition has previously moved @@ -63,7 +58,7 @@ def move_partition(self, partition: TopicPartition, old_consumer: str, new_consu else: self._add_partition_movement_record(partition, pair) - def get_partition_to_be_moved(self, partition: TopicPartition, old_consumer: str, new_consumer: str): + def get_partition_to_be_moved(self, partition, old_consumer, new_consumer): if partition.topic not in self.partition_movements_by_topic: return partition if partition in self.partition_movements: @@ -76,7 +71,7 @@ def get_partition_to_be_moved(self, partition: TopicPartition, old_consumer: str return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair])) - def are_sticky(self) -> bool: + def are_sticky(self): for topic, movements in self.partition_movements_by_topic.items(): movement_pairs = set(movements.keys()) if self._has_cycles(movement_pairs): @@ -88,7 +83,7 @@ def are_sticky(self) -> bool: return False return True - def _remove_movement_record_of_partition(self, partition: TopicPartition) -> ConsumerPair: + def _remove_movement_record_of_partition(self, partition): pair = self.partition_movements[partition] del self.partition_movements[partition] @@ -100,11 +95,11 @@ def _remove_movement_record_of_partition(self, partition: TopicPartition) -> Con return pair - def _add_partition_movement_record(self, partition: TopicPartition, pair: ConsumerPair): + def _add_partition_movement_record(self, partition, pair): self.partition_movements[partition] = pair self.partition_movements_by_topic[partition.topic][pair].add(partition) - def _has_cycles(self, consumer_pairs: Set[ConsumerPair]): + def _has_cycles(self, consumer_pairs): cycles = set() for pair in consumer_pairs: reduced_pairs = deepcopy(consumer_pairs) @@ -125,7 +120,7 @@ def _has_cycles(self, consumer_pairs: Set[ConsumerPair]): return False @staticmethod - def _is_subcycle(cycle: List[str], cycles: Set[Tuple[str]]) -> bool: + def _is_subcycle(cycle, cycles): super_cycle = deepcopy(cycle) super_cycle = super_cycle[:-1] super_cycle.extend(cycle) @@ -134,7 +129,7 @@ def _is_subcycle(cycle: List[str], cycles: Set[Tuple[str]]) -> bool: return True return False - def _is_linked(self, src: 
str, dst: str, pairs: Set[ConsumerPair], current_path: List[str]): + def _is_linked(self, src, dst, pairs, current_path): if src == dst: return False if not pairs: diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 86cfbaee1..9914bcf9a 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -1,7 +1,6 @@ import logging -from collections import defaultdict +from collections import defaultdict, namedtuple from copy import deepcopy -from typing import List, Dict, NamedTuple, Set, Tuple from sortedcontainers import SortedSet, SortedDict, SortedList @@ -16,13 +15,10 @@ log = logging.getLogger(__name__) +ConsumerGenerationPair = namedtuple("ConsumerGenerationPair", ["consumer", "generation"]) -class ConsumerGenerationPair(NamedTuple): - consumer: str - generation: int - -def has_identical_list_elements(list_: List[List]) -> bool: +def has_identical_list_elements(list_): """Checks if all lists in the collection have the same members Arguments: @@ -39,11 +35,11 @@ def has_identical_list_elements(list_: List[List]) -> bool: return True -def subscriptions_comparator_key(element: Tuple[str, Tuple[TopicPartition]]) -> Tuple[int, str]: +def subscriptions_comparator_key(element): return len(element[1]), element[0] -def partitions_comparator_key(element: Tuple[TopicPartition, Tuple[str]]) -> Tuple[int, str, int]: +def partitions_comparator_key(element): return len(element[1]), element[0].topic, element[0].partition @@ -54,10 +50,8 @@ def remove_if_present(collection, element): pass -class StickyAssignorMemberMetadataV1(NamedTuple): - subscription: List[str] - partitions: List[TopicPartition] - generation: int +StickyAssignorMemberMetadataV1 = namedtuple("StickyAssignorMemberMetadataV1", + ["subscription", "partitions", "generation"]) class StickyAssignorUserDataV1(Struct): @@ -72,26 +66,26 @@ class StickyAssignorUserDataV1(Struct): class StickyAssignmentExecutor: - def __init__(self, cluster: ClusterMetadata, members: Dict[str, StickyAssignorMemberMetadataV1]): + def __init__(self, cluster, members): self.members = members # a mapping between consumers and their assigned partitions that is updated during assignment procedure - self.current_assignment: Dict[str, List[TopicPartition]] = defaultdict(list) + self.current_assignment = defaultdict(list) # an assignment from a previous generation - self.previous_assignment: Dict[TopicPartition, ConsumerGenerationPair] = {} + self.previous_assignment = {} # a mapping between partitions and their assigned consumers - self.current_partition_consumer: Dict[TopicPartition, str] = {} + self.current_partition_consumer = {} # a flag indicating that there were no previous assignments performed ever self.is_fresh_assignment = False # a mapping of all topic partitions to all consumers that can be assigned to them - self.partition_to_all_potential_consumers: Dict[TopicPartition, List[str]] = {} + self.partition_to_all_potential_consumers = {} # a mapping of all consumers to all potential topic partitions that can be assigned to them - self.consumer_to_all_potential_partitions: Dict[str, List[TopicPartition]] = {} + self.consumer_to_all_potential_partitions = {} # an ascending sorted set of consumers based on how many topic partitions are already assigned to them - self.sorted_current_subscriptions: Set[Tuple[str, Tuple[TopicPartition]]] = set() + self.sorted_current_subscriptions = set() # an ascending sorted list of topic 
partitions based on how many consumers can potentially use them - self.sorted_partitions: List[TopicPartition] = [] + self.sorted_partitions = [] # all partitions that need to be assigned - self.unassigned_partitions: List[TopicPartition] = [] + self.unassigned_partitions = [] # a flag indicating that a certain partition cannot remain assigned to its current consumer because the consumer # is no longer subscribed to its topic self.revocation_required = False @@ -115,7 +109,7 @@ def balance(self): self._assign_partition(partition) # narrow down the reassignment scope to only those partitions that can actually be reassigned - fixed_partitions: Set[TopicPartition] = set() + fixed_partitions = set() for partition in self.partition_to_all_potential_consumers.keys(): if not self._can_partition_participate_in_reassignment(partition): fixed_partitions.add(partition) @@ -124,7 +118,7 @@ def balance(self): remove_if_present(self.unassigned_partitions, fixed_partition) # narrow down the reassignment scope to only those consumers that are subject to reassignment - fixed_assignments: Dict[str, List[TopicPartition]] = {} + fixed_assignments = {} for consumer in self.consumer_to_all_potential_partitions.keys(): if not self._can_consumer_participate_in_reassignment(consumer): self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) @@ -158,14 +152,14 @@ def balance(self): self.current_assignment[consumer] = partitions self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) - def get_final_assignment(self, member_id) -> List[Tuple[str, List[int]]]: - assignment: Dict[str, List[int]] = defaultdict(lambda: SortedList()) + def get_final_assignment(self, member_id): + assignment = defaultdict(lambda: SortedList()) for topic_partition in self.current_assignment[member_id]: assignment[topic_partition.topic].add(topic_partition.partition) # noinspection PyTypeChecker return list(assignment.items()) - def _initialize(self, cluster: ClusterMetadata): + def _initialize(self, cluster): self._init_current_assignments(self.members) for topic in cluster.topics(): @@ -189,17 +183,17 @@ def _initialize(self, cluster: ClusterMetadata): if consumer_id not in self.current_assignment: self.current_assignment[consumer_id] = [] - def _init_current_assignments(self, members: Dict[str, StickyAssignorMemberMetadataV1]): + def _init_current_assignments(self, members): # we need to process subscriptions' user data with each consumer's reported generation in mind # higher generations overwrite lower generations in case of a conflict # note that a conflict could exists only if user data is for different generations # for each partition we create a sorted map of its consumers by generation - sorted_partition_consumers_by_generation: Dict[TopicPartition, SortedDict[int, str]] = {} + sorted_partition_consumers_by_generation = {} for consumer, member_metadata in members.items(): for partitions in member_metadata.partitions: if partitions in sorted_partition_consumers_by_generation: - consumers: Dict[int, str] = sorted_partition_consumers_by_generation[partitions] + consumers = sorted_partition_consumers_by_generation[partitions] if member_metadata.generation and member_metadata.generation in consumers: # same partition is assigned to two consumers during the same rebalance. 
# log a warning and skip this record @@ -231,7 +225,7 @@ def _init_current_assignments(self, members: Dict[str, StickyAssignorMemberMetad for partition in partitions: self.current_partition_consumer[partition] = consumer_id - def _are_subscriptions_identical(self) -> bool: + def _are_subscriptions_identical(self): """ Returns: true, if both potential consumers of partitions and potential partitions that consumers can @@ -243,7 +237,7 @@ def _are_subscriptions_identical(self) -> bool: def _populate_sorted_partitions(self): # an ascending sorted set of topic partitions based on how many consumers can potentially use them - sorted_all_partitions: SortedSet[Tuple[TopicPartition, List[str]]] = SortedSet( + sorted_all_partitions = SortedSet( iterable=[(tp, tuple(consumers)) for tp, consumers in self.partition_to_all_potential_consumers.items()], key=partitions_comparator_key, ) @@ -262,7 +256,7 @@ def _populate_sorted_partitions(self): for partition in to_remove: partitions.remove(partition) - sorted_consumers: SortedSet[Tuple[str, Tuple[TopicPartition]]] = SortedSet( + sorted_consumers = SortedSet( iterable=[(consumer, tuple(partitions)) for consumer, partitions in assignments.items()], key=subscriptions_comparator_key, ) @@ -337,19 +331,19 @@ def _initialize_current_subscriptions(self): key=subscriptions_comparator_key, ) - def _get_consumer_with_least_subscriptions(self) -> str: + def _get_consumer_with_least_subscriptions(self): return self.sorted_current_subscriptions[0][0] - def _get_consumer_with_most_subscriptions(self) -> str: + def _get_consumer_with_most_subscriptions(self): return self.sorted_current_subscriptions[-1][0] - def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer: str): + def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer): self.sorted_current_subscriptions.remove((consumer, tuple(self.current_assignment[consumer]))) - def _add_consumer_to_current_subscriptions_and_maintain_order(self, consumer: str): + def _add_consumer_to_current_subscriptions_and_maintain_order(self, consumer): self.sorted_current_subscriptions.add((consumer, tuple(self.current_assignment[consumer]))) - def _is_balanced(self) -> bool: + def _is_balanced(self): """Determines if the current assignment is a balanced one""" if ( len(self.current_assignment[self._get_consumer_with_least_subscriptions()]) @@ -359,7 +353,7 @@ def _is_balanced(self) -> bool: return True # create a mapping from partitions to the consumer assigned to them - all_assigned_partitions: Dict[TopicPartition, str] = {} + all_assigned_partitions = {} for consumer_id, consumer_partitions in self.current_assignment.items(): for partition in consumer_partitions: if partition in all_assigned_partitions: @@ -384,7 +378,7 @@ def _is_balanced(self) -> bool: return False return True - def _assign_partition(self, partition: TopicPartition): + def _assign_partition(self, partition): for consumer, _ in self.sorted_current_subscriptions: if partition in self.consumer_to_all_potential_partitions[consumer]: self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) @@ -393,10 +387,10 @@ def _assign_partition(self, partition: TopicPartition): self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) break - def _can_partition_participate_in_reassignment(self, partition: TopicPartition) -> bool: + def _can_partition_participate_in_reassignment(self, partition): return len(self.partition_to_all_potential_consumers[partition]) >= 2 - def 
_can_consumer_participate_in_reassignment(self, consumer: str) -> bool: + def _can_consumer_participate_in_reassignment(self, consumer): current_partitions = self.current_assignment[consumer] current_assignment_size = len(current_partitions) max_assignment_size = len(self.consumer_to_all_potential_partitions[consumer]) @@ -412,7 +406,7 @@ def _can_consumer_participate_in_reassignment(self, consumer: str) -> bool: return True return False - def _perform_reassignments(self, reassignable_partitions: List[TopicPartition]) -> bool: + def _perform_reassignments(self, reassignable_partitions): reassignment_performed = False # repeat reassignment until no partition can be moved to improve the balance @@ -455,7 +449,7 @@ def _perform_reassignments(self, reassignable_partitions: List[TopicPartition]) break return reassignment_performed - def _reassign_partition(self, partition: TopicPartition): + def _reassign_partition(self, partition): new_consumer = None for another_consumer, _ in self.sorted_current_subscriptions: if partition in self.consumer_to_all_potential_partitions[another_consumer]: @@ -464,13 +458,13 @@ def _reassign_partition(self, partition: TopicPartition): assert new_consumer is not None self._reassign_partition_to_consumer(partition, new_consumer) - def _reassign_partition_to_consumer(self, partition: TopicPartition, new_consumer: str): + def _reassign_partition_to_consumer(self, partition, new_consumer): consumer = self.current_partition_consumer[partition] # find the correct partition movement considering the stickiness requirement partition_to_be_moved = self.partition_movements.get_partition_to_be_moved(partition, consumer, new_consumer) self._move_partition(partition_to_be_moved, new_consumer) - def _move_partition(self, partition: TopicPartition, new_consumer: str): + def _move_partition(self, partition, new_consumer): old_consumer = self.current_partition_consumer[partition] self._remove_consumer_from_current_subscriptions_and_maintain_order(old_consumer) self._remove_consumer_from_current_subscriptions_and_maintain_order(new_consumer) @@ -485,7 +479,7 @@ def _move_partition(self, partition: TopicPartition, new_consumer: str): self._add_consumer_to_current_subscriptions_and_maintain_order(old_consumer) @staticmethod - def _get_balance_score(assignment: Dict[str, List[TopicPartition]]) -> int: + def _get_balance_score(assignment): """Calculates a balance score of a give assignment as the sum of assigned partitions size difference of all consumer pairs. 
A perfectly balanced assignment (with all consumers getting the same number of partitions) @@ -498,16 +492,16 @@ def _get_balance_score(assignment: Dict[str, List[TopicPartition]]) -> int: the balance score of the assignment """ score = 0 - consumer2assignment: Dict[str, int] = {} + consumer_to_assignment = {} for consumer_id, partitions in assignment.items(): - consumer2assignment[consumer_id] = len(partitions) + consumer_to_assignment[consumer_id] = len(partitions) - consumers_to_explore = set(consumer2assignment.keys()) - for consumer_id in consumer2assignment.keys(): + consumers_to_explore = set(consumer_to_assignment.keys()) + for consumer_id in consumer_to_assignment.keys(): if consumer_id in consumers_to_explore: consumers_to_explore.remove(consumer_id) for other_consumer_id in consumers_to_explore: - score += abs(consumer2assignment[consumer_id] - consumer2assignment[other_consumer_id]) + score += abs(consumer_to_assignment[consumer_id] - consumer_to_assignment[other_consumer_id]) return score @@ -584,13 +578,13 @@ class StickyPartitionAssignor(AbstractPartitionAssignor): name = "sticky" version = 0 - member_assignment: List[TopicPartition] = None - generation: int = DEFAULT_GENERATION_ID + member_assignment = None + generation = DEFAULT_GENERATION_ID _latest_partition_movements = None @classmethod - def assign(cls, cluster: ClusterMetadata, members: dict): + def assign(cls, cluster, members): """Performs group assignment given cluster metadata and member subscriptions Arguments: @@ -600,7 +594,7 @@ def assign(cls, cluster: ClusterMetadata, members: dict): Returns: dict: {member_id: MemberAssignment} """ - members_metadata: Dict[str, StickyAssignorMemberMetadataV1] = {} + members_metadata = {} for consumer, member_metadata in members.items(): members_metadata[consumer] = cls.parse_member_metadata(member_metadata) @@ -618,7 +612,7 @@ def assign(cls, cluster: ClusterMetadata, members: dict): return assignment @classmethod - def parse_member_metadata(cls, metadata) -> StickyAssignorMemberMetadataV1: + def parse_member_metadata(cls, metadata): """ Parses member metadata into a python object. This implementation only serializes and deserializes the StickyAssignorMemberMetadataV1 user data, @@ -646,7 +640,7 @@ def parse_member_metadata(cls, metadata) -> StickyAssignorMemberMetadataV1: partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription ) - member_partitions: List[TopicPartition] = [] + member_partitions = [] for topic, partitions in decoded_user_data.previous_assignment: member_partitions.extend([TopicPartition(topic, partition) for partition in partitions]) return StickyAssignorMemberMetadataV1( @@ -669,7 +663,7 @@ def metadata(cls, topics): return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data) @classmethod - def on_assignment(cls, assignment, generation: int): + def on_assignment(cls, assignment, generation): """Callback that runs on each assignment. Updates assignor's state. 
Arguments: @@ -677,5 +671,5 @@ def on_assignment(cls, assignment, generation: int): generation: generation id (if present) """ log.debug(f"On assignment: assignment={assignment}, generation={generation}") - cls.member_assignment: List[TopicPartition] = assignment.partitions() + cls.member_assignment = assignment.partitions() cls.generation = generation diff --git a/test/test_assignors.py b/test/test_assignors.py index 1cbe7f10a..2c09c0ccf 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -3,7 +3,6 @@ from collections import defaultdict from random import randint, sample -from typing import Set, Dict, List import pytest @@ -785,7 +784,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb assert StickyPartitionAssignor._latest_partition_movements.are_sticky() -def make_member_metadata(subscriptions: Dict[str, Set[str]]): +def make_member_metadata(subscriptions): member_metadata = {} for member, topics in subscriptions.items(): member_metadata[member] = build_metadata(topics, []) @@ -808,7 +807,7 @@ def assert_assignment(result_assignment, expected_assignment): assert result_assignment[member].encode() == expected_assignment[member].encode() -def verify_validity_and_balance(subscriptions: Dict[str, Set[str]], assignment): +def verify_validity_and_balance(subscriptions, assignment): """ Verifies that the given assignment is valid with respect to the given subscriptions Validity requirements: @@ -873,7 +872,7 @@ def verify_validity_and_balance(subscriptions: Dict[str, Set[str]], assignment): ) -def group_partitions_by_topic(partitions: List[TopicPartition]) -> Dict[str, Set[int]]: +def group_partitions_by_topic(partitions): result = defaultdict(set) for p in partitions: result[p.topic].add(p.partition) From 32f35be78374677f1489a6d3d1f4c2040c20f97c Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Mon, 27 Apr 2020 13:45:58 +0200 Subject: [PATCH 3/8] [KIP-54] Use six instead of direct usages of for keys(), values() and items() --- .../assignors/sticky/partition_movements.py | 4 +- .../assignors/sticky/sticky_assignor.py | 44 ++++++++------- test/test_assignors.py | 55 ++++++++++--------- 3 files changed, 54 insertions(+), 49 deletions(-) diff --git a/kafka/coordinator/assignors/sticky/partition_movements.py b/kafka/coordinator/assignors/sticky/partition_movements.py index f1e146092..1e4dfcc65 100644 --- a/kafka/coordinator/assignors/sticky/partition_movements.py +++ b/kafka/coordinator/assignors/sticky/partition_movements.py @@ -2,6 +2,8 @@ from collections import defaultdict, namedtuple from copy import deepcopy +from kafka.vendor import six + log = logging.getLogger(__name__) @@ -72,7 +74,7 @@ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer): return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair])) def are_sticky(self): - for topic, movements in self.partition_movements_by_topic.items(): + for topic, movements in six.iteritems(self.partition_movements_by_topic): movement_pairs = set(movements.keys()) if self._has_cycles(movement_pairs): log.error( diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 9914bcf9a..543e5aafc 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -12,6 +12,7 @@ from kafka.protocol.struct import Struct from kafka.protocol.types import String, Array, Int32 from kafka.structs import TopicPartition +from kafka.vendor 
import six log = logging.getLogger(__name__) @@ -110,7 +111,7 @@ def balance(self): # narrow down the reassignment scope to only those partitions that can actually be reassigned fixed_partitions = set() - for partition in self.partition_to_all_potential_consumers.keys(): + for partition in six.iterkeys(self.partition_to_all_potential_consumers): if not self._can_partition_participate_in_reassignment(partition): fixed_partitions.add(partition) for fixed_partition in fixed_partitions: @@ -119,7 +120,7 @@ def balance(self): # narrow down the reassignment scope to only those consumers that are subject to reassignment fixed_assignments = {} - for consumer in self.consumer_to_all_potential_partitions.keys(): + for consumer in six.iterkeys(self.consumer_to_all_potential_partitions): if not self._can_consumer_participate_in_reassignment(consumer): self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) fixed_assignments[consumer] = self.current_assignment[consumer] @@ -148,7 +149,7 @@ def balance(self): self.current_partition_consumer.update(prebalance_partition_consumers) # add the fixed assignments (those that could not change) back - for consumer, partitions in fixed_assignments.items(): + for consumer, partitions in six.iteritems(fixed_assignments): self.current_assignment[consumer] = partitions self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) @@ -156,8 +157,7 @@ def get_final_assignment(self, member_id): assignment = defaultdict(lambda: SortedList()) for topic_partition in self.current_assignment[member_id]: assignment[topic_partition.topic].add(topic_partition.partition) - # noinspection PyTypeChecker - return list(assignment.items()) + return six.viewitems(assignment) def _initialize(self, cluster): self._init_current_assignments(self.members) @@ -170,7 +170,7 @@ def _initialize(self, cluster): for p in partitions: partition = TopicPartition(topic=topic, partition=p) self.partition_to_all_potential_consumers[partition] = [] - for consumer_id, member_metadata in self.members.items(): + for consumer_id, member_metadata in six.iteritems(self.members): self.consumer_to_all_potential_partitions[consumer_id] = [] for topic in member_metadata.subscription: if cluster.partitions_for_topic(topic) is None: @@ -190,7 +190,7 @@ def _init_current_assignments(self, members): # for each partition we create a sorted map of its consumers by generation sorted_partition_consumers_by_generation = {} - for consumer, member_metadata in members.items(): + for consumer, member_metadata in six.iteritems(members): for partitions in member_metadata.partitions: if partitions in sorted_partition_consumers_by_generation: consumers = sorted_partition_consumers_by_generation[partitions] @@ -210,7 +210,7 @@ def _init_current_assignments(self, members): # previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition # current and previous consumers are the last two consumers of each partition in the above sorted map - for partitions, consumers in sorted_partition_consumers_by_generation.items(): + for partitions, consumers in six.iteritems(sorted_partition_consumers_by_generation): generations = list(reversed(consumers.keys())) self.current_assignment[consumers[generations[0]]].append(partitions) # now update previous assignment if any @@ -221,7 +221,7 @@ def _init_current_assignments(self, members): self.is_fresh_assignment = len(self.current_assignment) == 0 - for consumer_id, partitions in self.current_assignment.items(): + for consumer_id, 
partitions in six.iteritems(self.current_assignment): for partition in partitions: self.current_partition_consumer[partition] = consumer_id @@ -231,14 +231,16 @@ def _are_subscriptions_identical(self): true, if both potential consumers of partitions and potential partitions that consumers can consume are the same """ - if not has_identical_list_elements(list(self.partition_to_all_potential_consumers.values())): + if not has_identical_list_elements(list(six.itervalues(self.partition_to_all_potential_consumers))): return False - return has_identical_list_elements(list(self.consumer_to_all_potential_partitions.values())) + return has_identical_list_elements(list(six.itervalues(self.consumer_to_all_potential_partitions))) def _populate_sorted_partitions(self): # an ascending sorted set of topic partitions based on how many consumers can potentially use them sorted_all_partitions = SortedSet( - iterable=[(tp, tuple(consumers)) for tp, consumers in self.partition_to_all_potential_consumers.items()], + iterable=[ + (tp, tuple(consumers)) for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers) + ], key=partitions_comparator_key, ) @@ -248,7 +250,7 @@ def _populate_sorted_partitions(self): # then we just need to simply list partitions in a round robin fashion (from consumers with # most assigned partitions to those with least) assignments = deepcopy(self.current_assignment) - for consumer_id, partitions in assignments.items(): + for consumer_id, partitions in six.iteritems(assignments): to_remove = [] for partition in partitions: if partition not in self.partition_to_all_potential_consumers: @@ -257,7 +259,7 @@ def _populate_sorted_partitions(self): partitions.remove(partition) sorted_consumers = SortedSet( - iterable=[(consumer, tuple(partitions)) for consumer, partitions in assignments.items()], + iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(assignments)], key=subscriptions_comparator_key, ) # at this point, sorted_consumers contains an ascending-sorted list of consumers based on @@ -269,7 +271,7 @@ def _populate_sorted_partitions(self): remaining_partitions = assignments[consumer] # from partitions that had a different consumer before, # keep only those that are assigned to this consumer now - previous_partitions = set(self.previous_assignment.keys()).intersection(set(remaining_partitions)) + previous_partitions = set(six.iterkeys(self.previous_assignment)).intersection(set(remaining_partitions)) if previous_partitions: # if there is a partition of this consumer that was assigned to another consumer before # mark it as good options for reassignment @@ -294,7 +296,7 @@ def _populate_partitions_to_reassign(self): self.unassigned_partitions = deepcopy(self.sorted_partitions) assignments_to_remove = [] - for consumer_id, partitions in self.current_assignment.items(): + for consumer_id, partitions in six.iteritems(self.current_assignment): if consumer_id not in self.members: # if a consumer that existed before (and had some partition assignments) is now removed, # remove it from current_assignment @@ -327,7 +329,7 @@ def _populate_partitions_to_reassign(self): def _initialize_current_subscriptions(self): self.sorted_current_subscriptions = SortedSet( - iterable=[(consumer, tuple(partitions)) for consumer, partitions in self.current_assignment.items()], + iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(self.current_assignment)], key=subscriptions_comparator_key, ) @@ -354,7 +356,7 @@ def _is_balanced(self): # 
create a mapping from partitions to the consumer assigned to them all_assigned_partitions = {} - for consumer_id, consumer_partitions in self.current_assignment.items(): + for consumer_id, consumer_partitions in six.iteritems(self.current_assignment): for partition in consumer_partitions: if partition in all_assigned_partitions: log.error(f"{partition} is assigned to more than one consumer.") @@ -493,7 +495,7 @@ def _get_balance_score(assignment): """ score = 0 consumer_to_assignment = {} - for consumer_id, partitions in assignment.items(): + for consumer_id, partitions in six.iteritems(assignment): consumer_to_assignment[consumer_id] = len(partitions) consumers_to_explore = set(consumer_to_assignment.keys()) @@ -595,7 +597,7 @@ def assign(cls, cluster, members): dict: {member_id: MemberAssignment} """ members_metadata = {} - for consumer, member_metadata in members.items(): + for consumer, member_metadata in six.iteritems(members): members_metadata[consumer] = cls.parse_member_metadata(member_metadata) executor = StickyAssignmentExecutor(cluster, members_metadata) @@ -658,7 +660,7 @@ def metadata(cls, topics): partitions_by_topic = defaultdict(list) for topic_partition in cls.member_assignment: partitions_by_topic[topic_partition.topic].append(topic_partition.partition) - data = StickyAssignorUserDataV1(partitions_by_topic.items(), cls.generation) + data = StickyAssignorUserDataV1(six.iteritems(partitions_by_topic), cls.generation) user_data = data.encode() return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data) diff --git a/test/test_assignors.py b/test/test_assignors.py index 2c09c0ccf..89c981e67 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -11,6 +11,7 @@ from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor, StickyAssignorUserDataV1 from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata +from kafka.vendor import six @pytest.fixture(autouse=True) @@ -109,7 +110,7 @@ def test_sticky_assignor1(mocker): del subscriptions['C1'] member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -152,7 +153,7 @@ def test_sticky_assignor2(mocker): 'C2': {'t0', 't1', 't2'}, } member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, []) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -165,7 +166,7 @@ def test_sticky_assignor2(mocker): del subscriptions['C0'] member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -324,7 +325,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): 'C2': {'t'}, } member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata( topics, assignment[member].partitions() if member in assignment else [] ) @@ -336,7 +337,7 @@ def 
test_sticky_add_remove_consumer_one_topic(mocker): 'C2': {'t'}, } member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -365,7 +366,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker): 'C2': {'t1', 't2'}, } member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -380,7 +381,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker): 'C2': {'t2'}, } member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -411,7 +412,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker): del subscriptions['C10'] member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -433,7 +434,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker): subscriptions['C10'] = {'t'} member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata( topics, assignment[member].partitions() if member in assignment else [] ) @@ -450,7 +451,7 @@ def test_sticky_same_subscriptions(mocker): subscriptions = defaultdict(set) for i in range(1, 9): - for j in range(1, len(partitions.keys()) + 1): + for j in range(1, len(six.viewkeys(partitions)) + 1): subscriptions[f'C{i}'].add(f't{j}') member_metadata = make_member_metadata(subscriptions) @@ -460,7 +461,7 @@ def test_sticky_same_subscriptions(mocker): del subscriptions['C5'] member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) @@ -486,7 +487,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): verify_validity_and_balance(subscriptions, assignment) member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, assignment[member].partitions()) for i in range(50): @@ -515,7 +516,7 @@ def test_new_subscription(mocker): subscriptions['C0'].add('t1') member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, []) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -538,7 +539,7 @@ def test_move_existing_assignments(mocker): } member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, member_assignments[member]) assignment = 
StickyPartitionAssignor.assign(cluster, member_metadata) @@ -558,7 +559,7 @@ def test_stickiness(mocker): assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) partitions_assigned = {} - for consumer, consumer_assignment in assignment.items(): + for consumer, consumer_assignment in six.iteritems(assignment): assert ( len(consumer_assignment.partitions()) <= 1 ), f'Consumer {consumer} is assigned more topic partitions than expected.' @@ -568,14 +569,14 @@ def test_stickiness(mocker): # removing the potential group leader del subscriptions['C1'] member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() - for consumer, consumer_assignment in assignment.items(): + for consumer, consumer_assignment in six.iteritems(assignment): assert ( len(consumer_assignment.partitions()) <= 1 ), f'Consumer {consumer} is assigned more topic partitions than expected.' @@ -623,7 +624,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): 'C': {}, } member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions()) cluster = create_cluster(mocker, topics={}, topics_partitions={}) @@ -642,7 +643,7 @@ def test_conflicting_previous_assignments(mocker): 'C2': {'t'}, } member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): # assume both C1 and C2 have partition 1 assigned to them in generation 1 member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1) @@ -674,7 +675,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu subscriptions[f'C{i}'].update(topics_sample) member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, assignment[member].partitions()) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -776,7 +777,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb 'C3': 2, } member_metadata = {} - for member in member_assignments.keys(): + for member in six.iterkeys(member_assignments): member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member]) assignment = StickyPartitionAssignor.assign(cluster, member_metadata) @@ -786,7 +787,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb def make_member_metadata(subscriptions): member_metadata = {} - for member, topics in subscriptions.items(): + for member, topics in six.iteritems(subscriptions): member_metadata[member] = build_metadata(topics, []) return member_metadata @@ -795,7 +796,7 @@ def build_metadata(topics, member_assignment_partitions, generation=-1): partitions_by_topic = defaultdict(list) for topic_partition in member_assignment_partitions: partitions_by_topic[topic_partition.topic].append(topic_partition.partition) - data = StickyAssignorUserDataV1(partitions_by_topic.items(), generation) + data = 
StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation) user_data = data.encode() return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data) @@ -820,9 +821,9 @@ def verify_validity_and_balance(subscriptions, assignment): :param subscriptions topic subscriptions of each consumer :param assignment: given assignment for balance check """ - assert subscriptions.keys() == assignment.keys() + assert six.viewkeys(subscriptions) == six.viewkeys(assignment) - consumers = sorted(list(assignment.keys())) + consumers = sorted(six.viewkeys(assignment)) for i in range(len(consumers)): consumer = consumers[i] partitions = assignment[consumer].partitions() @@ -853,7 +854,7 @@ def verify_validity_and_balance(subscriptions, assignment): assignments_by_topic = group_partitions_by_topic(partitions) other_assignments_by_topic = group_partitions_by_topic(other_partitions) if len(partitions) > len(other_partitions): - for topic in assignments_by_topic.keys(): + for topic in six.iterkeys(assignments_by_topic): assert topic not in other_assignments_by_topic, ( f'Error: Some partitions can be moved from {consumer} ({len(partitions)} partitions) ' f'to {other_consumer} ({len(other_partitions)} partitions) ' @@ -862,7 +863,7 @@ def verify_validity_and_balance(subscriptions, assignment): f'Assignments: {assignment}' ) if len(other_partitions) > len(partitions): - for topic in other_assignments_by_topic.keys(): + for topic in six.iterkeys(other_assignments_by_topic): assert topic not in assignments_by_topic, ( f'Error: Some partitions can be moved from {other_consumer} ({len(other_partitions)} partitions) ' f'to {consumer} ({len(partitions)} partitions) ' From d16d91f859345b310518938958baaf2ba6779c93 Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Mon, 27 Apr 2020 19:15:12 +0200 Subject: [PATCH 4/8] [KIP-54] f-strings -> string.format --- .../assignors/sticky/partition_movements.py | 8 +-- .../assignors/sticky/sticky_assignor.py | 18 ++--- test/test_assignors.py | 72 +++++++++---------- 3 files changed, 49 insertions(+), 49 deletions(-) diff --git a/kafka/coordinator/assignors/sticky/partition_movements.py b/kafka/coordinator/assignors/sticky/partition_movements.py index 1e4dfcc65..8851e4cda 100644 --- a/kafka/coordinator/assignors/sticky/partition_movements.py +++ b/kafka/coordinator/assignors/sticky/partition_movements.py @@ -78,9 +78,9 @@ def are_sticky(self): movement_pairs = set(movements.keys()) if self._has_cycles(movement_pairs): log.error( - f"Stickiness is violated for topic {topic}\n" - f"Partition movements for this topic occurred among the following consumer pairs:\n" - f"{movement_pairs}" + "Stickiness is violated for topic {}\n" + "Partition movements for this topic occurred among the following consumer pairs:\n" + "{}".format(topic, movement_pairs) ) return False return True @@ -111,7 +111,7 @@ def _has_cycles(self, consumer_pairs): path, cycles ): cycles.add(tuple(path)) - log.error(f"A cycle of length {len(path) - 1} was found: {path}") + log.error("A cycle of length {} was found: {}".format(len(path) - 1, path)) # for now we want to make sure there is no partition movements of the same topic between a pair of consumers. 
# the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 543e5aafc..f729edab4 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -174,7 +174,7 @@ def _initialize(self, cluster): self.consumer_to_all_potential_partitions[consumer_id] = [] for topic in member_metadata.subscription: if cluster.partitions_for_topic(topic) is None: - log.warning(f"No partition metadata for topic {topic}") + log.warning("No partition metadata for topic {}".format(topic)) continue for p in cluster.partitions_for_topic(topic): partition = TopicPartition(topic=topic, partition=p) @@ -198,8 +198,8 @@ def _init_current_assignments(self, members): # same partition is assigned to two consumers during the same rebalance. # log a warning and skip this record log.warning( - f"Partition {partitions} is assigned to multiple consumers " - f"following sticky assignment generation {member_metadata.generation}." + "Partition {} is assigned to multiple consumers " + "following sticky assignment generation {}.".format(partitions, member_metadata.generation) ) else: consumers[member_metadata.generation] = consumer @@ -359,7 +359,7 @@ def _is_balanced(self): for consumer_id, consumer_partitions in six.iteritems(self.current_assignment): for partition in consumer_partitions: if partition in all_assigned_partitions: - log.error(f"{partition} is assigned to more than one consumer.") + log.error("{} is assigned to more than one consumer.".format(partition)) all_assigned_partitions[partition] = consumer_id # for each consumer that does not have all the topic partitions it can get @@ -397,7 +397,7 @@ def _can_consumer_participate_in_reassignment(self, consumer): current_assignment_size = len(current_partitions) max_assignment_size = len(self.consumer_to_all_potential_partitions[consumer]) if current_assignment_size > max_assignment_size: - log.error(f"The consumer {consumer} is assigned more partitions than the maximum possible.") + log.error("The consumer {} is assigned more partitions than the maximum possible.".format(consumer)) if current_assignment_size < max_assignment_size: # if a consumer is not assigned all its potential partitions it is subject to reassignment return True @@ -421,11 +421,11 @@ def _perform_reassignments(self, reassignable_partitions): break # the partition must have at least two potential consumers if len(self.partition_to_all_potential_consumers[partition]) <= 1: - log.error(f"Expected more than one potential consumer for partition {partition}") + log.error("Expected more than one potential consumer for partition {}".format(partition)) # the partition must have a current consumer consumer = self.current_partition_consumer.get(partition) if consumer is None: - log.error(f"Expected partition {partition} to be assigned to a consumer") + log.error("Expected partition {} to be assigned to a consumer".format(partition)) if ( partition in self.previous_assignment @@ -656,7 +656,7 @@ def metadata(cls, topics): log.debug("No member assignment available") user_data = b'' else: - log.debug(f"Member assignment is available, generating the metadata: generation {cls.generation}") + log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation)) partitions_by_topic = defaultdict(list) for topic_partition in 
cls.member_assignment: partitions_by_topic[topic_partition.topic].append(topic_partition.partition) @@ -672,6 +672,6 @@ def on_assignment(cls, assignment, generation): assignment: MemberAssignment generation: generation id (if present) """ - log.debug(f"On assignment: assignment={assignment}, generation={generation}") + log.debug("On assignment: assignment={}, generation={}".format(assignment, generation)) cls.member_assignment = assignment.partitions() cls.generation = generation diff --git a/test/test_assignors.py b/test/test_assignors.py index 89c981e67..016ff8e26 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -393,17 +393,17 @@ def test_sticky_add_remove_topic_two_consumers(mocker): def test_sticky_reassignment_after_one_consumer_leaves(mocker): - partitions = dict([(f't{i}', set(range(i))) for i in range(1, 20)]) + partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 20)]) cluster = create_cluster( - mocker, topics=set([f't{i}' for i in range(1, 20)]), topic_partitions_lambda=lambda t: partitions[t] + mocker, topics=set(['t{}'.format(i) for i in range(1, 20)]), topic_partitions_lambda=lambda t: partitions[t] ) subscriptions = {} for i in range(1, 20): topics = set() for j in range(1, i + 1): - topics.add(f't{j}') - subscriptions[f'C{i}'] = topics + topics.add('t{}'.format(j)) + subscriptions['C{}'.format(i)] = topics member_metadata = make_member_metadata(subscriptions) @@ -425,7 +425,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker): subscriptions = defaultdict(set) for i in range(1, 10): - subscriptions[f'C{i}'] = {'t'} + subscriptions['C{}'.format(i)] = {'t'} member_metadata = make_member_metadata(subscriptions) @@ -444,15 +444,15 @@ def test_sticky_reassignment_after_one_consumer_added(mocker): def test_sticky_same_subscriptions(mocker): - partitions = dict([(f't{i}', set(range(i))) for i in range(1, 15)]) + partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 15)]) cluster = create_cluster( - mocker, topics=set([f't{i}' for i in range(1, 15)]), topic_partitions_lambda=lambda t: partitions[t] + mocker, topics=set(['t{}'.format(i) for i in range(1, 15)]), topic_partitions_lambda=lambda t: partitions[t] ) subscriptions = defaultdict(set) for i in range(1, 9): for j in range(1, len(six.viewkeys(partitions)) + 1): - subscriptions[f'C{i}'].add(f't{j}') + subscriptions['C{}'.format(i)].add('t{}'.format(j)) member_metadata = make_member_metadata(subscriptions) @@ -472,14 +472,14 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): n_topics = 40 n_consumers = 200 - all_topics = set([f't{i}' for i in range(1, n_topics + 1)]) + all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)]) partitions = dict([(t, set(range(1, randint(0, 10) + 1))) for t in all_topics]) cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t]) subscriptions = defaultdict(set) for i in range(1, n_consumers + 1): for j in range(0, randint(1, 20)): - subscriptions[f'C{i}'].add(f't{randint(1, n_topics)}') + subscriptions['C{}'.format(i)].add('t{}'.format(randint(1, n_topics))) member_metadata = make_member_metadata(subscriptions) @@ -491,7 +491,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): member_metadata[member] = build_metadata(topics, assignment[member].partitions()) for i in range(50): - member = f'C{randint(1, n_consumers)}' + member = 'C{}'.format(randint(1, n_consumers)) if member in subscriptions: del subscriptions[member] del 
member_metadata[member] @@ -507,7 +507,7 @@ def test_new_subscription(mocker): subscriptions = defaultdict(set) for i in range(3): for j in range(i, 3 * i - 2 + 1): - subscriptions[f'C{i}'].add(f't{j}') + subscriptions['C{}'.format(i)].add('t{}'.format(j)) member_metadata = make_member_metadata(subscriptions) @@ -562,7 +562,7 @@ def test_stickiness(mocker): for consumer, consumer_assignment in six.iteritems(assignment): assert ( len(consumer_assignment.partitions()) <= 1 - ), f'Consumer {consumer} is assigned more topic partitions than expected.' + ), 'Consumer {} is assigned more topic partitions than expected.'.format(consumer) if len(consumer_assignment.partitions()) == 1: partitions_assigned[consumer] = consumer_assignment.partitions()[0] @@ -579,10 +579,10 @@ def test_stickiness(mocker): for consumer, consumer_assignment in six.iteritems(assignment): assert ( len(consumer_assignment.partitions()) <= 1 - ), f'Consumer {consumer} is assigned more topic partitions than expected.' + ), 'Consumer {} is assigned more topic partitions than expected.'.format(consumer) assert ( consumer not in partitions_assigned or partitions_assigned[consumer] in consumer_assignment.partitions() - ), f'Stickiness was not honored for consumer {consumer}' + ), 'Stickiness was not honored for consumer {}'.format(consumer) def test_assignment_updated_for_deleted_topic(mocker): @@ -655,14 +655,14 @@ def test_conflicting_previous_assignments(mocker): 'execution_number,n_topics,n_consumers', [(i, randint(10, 20), randint(20, 40)) for i in range(100)] ) def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_number, n_topics, n_consumers): - all_topics = set([f't{i}' for i in range(1, n_topics + 1)]) + all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)]) partitions = dict([(t, set(range(1, i + 1))) for i, t in enumerate(all_topics)]) cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t]) subscriptions = defaultdict(set) for i in range(n_consumers): topics_sample = sample(all_topics, randint(1, len(all_topics) - 1)) - subscriptions[f'C{i}'].update(topics_sample) + subscriptions['C{}'.format(i)].update(topics_sample) member_metadata = make_member_metadata(subscriptions) @@ -672,7 +672,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu subscriptions = defaultdict(set) for i in range(n_consumers): topics_sample = sample(all_topics, randint(1, len(all_topics) - 1)) - subscriptions[f'C{i}'].update(topics_sample) + subscriptions['C{}'.format(i)].update(topics_sample) member_metadata = {} for member, topics in six.iteritems(subscriptions): @@ -829,10 +829,10 @@ def verify_validity_and_balance(subscriptions, assignment): partitions = assignment[consumer].partitions() for partition in partitions: assert partition.topic in subscriptions[consumer], ( - f'Error: Partition {partition} is assigned to consumer {consumers[i]}, ' - f'but it is not subscribed to topic {partition.topic}\n' - f'Subscriptions: {subscriptions}\n' - f'Assignments: {assignment}' + 'Error: Partition {} is assigned to consumer {}, ' + 'but it is not subscribed to topic {}\n' + 'Subscriptions: {}\n' + 'Assignments: {}'.format(partition, consumers[i], partition.topic, subscriptions, assignment) ) if i == len(consumers) - 1: continue @@ -842,10 +842,10 @@ def verify_validity_and_balance(subscriptions, assignment): other_partitions = assignment[other_consumer].partitions() partitions_intersection = 
set(partitions).intersection(set(other_partitions)) assert partitions_intersection == set(), ( - f'Error: Consumers {consumer} and {other_consumer} have common partitions ' - f'assigned to them: {partitions_intersection}\n' - f'Subscriptions: {subscriptions}\n' - f'Assignments: {assignment}' + 'Error: Consumers {} and {} have common partitions ' + 'assigned to them: {}\n' + 'Subscriptions: {}\n' + 'Assignments: {}'.format(consumer, other_consumer, partitions_intersection, subscriptions, assignment) ) if abs(len(partitions) - len(other_partitions)) <= 1: @@ -856,20 +856,20 @@ def verify_validity_and_balance(subscriptions, assignment): if len(partitions) > len(other_partitions): for topic in six.iterkeys(assignments_by_topic): assert topic not in other_assignments_by_topic, ( - f'Error: Some partitions can be moved from {consumer} ({len(partitions)} partitions) ' - f'to {other_consumer} ({len(other_partitions)} partitions) ' - f'to achieve a better balance\n' - f'Subscriptions: {subscriptions}\n' - f'Assignments: {assignment}' + 'Error: Some partitions can be moved from {} ({} partitions) ' + 'to {} ({} partitions) ' + 'to achieve a better balance\n' + 'Subscriptions: {}\n' + 'Assignments: {}'.format(consumer, len(partitions), other_consumer, len(other_partitions), subscriptions, assignment) ) if len(other_partitions) > len(partitions): for topic in six.iterkeys(other_assignments_by_topic): assert topic not in assignments_by_topic, ( - f'Error: Some partitions can be moved from {other_consumer} ({len(other_partitions)} partitions) ' - f'to {consumer} ({len(partitions)} partitions) ' - f'to achieve a better balance\n' - f'Subscriptions: {subscriptions}\n' - f'Assignments: {assignment}' + 'Error: Some partitions can be moved from {} ({} partitions) ' + 'to {} ({} partitions) ' + 'to achieve a better balance\n' + 'Subscriptions: {}\n' + 'Assignments: {}'.format(other_consumer, len(other_partitions), consumer, len(partitions), subscriptions, assignment) ) From 6d5e61218e38c4bb301d3f62ba0bf86c32d9e16a Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Tue, 26 May 2020 17:14:07 +0200 Subject: [PATCH 5/8] [KIP-54] Disable no-member and not-an-iterable checks for assignment variables --- kafka/coordinator/assignors/sticky/sticky_assignor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index f729edab4..4d85bdb29 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -643,7 +643,7 @@ def parse_member_metadata(cls, metadata): ) member_partitions = [] - for topic, partitions in decoded_user_data.previous_assignment: + for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member member_partitions.extend([TopicPartition(topic, partition) for partition in partitions]) return StickyAssignorMemberMetadataV1( # pylint: disable=no-member @@ -658,7 +658,7 @@ def metadata(cls, topics): else: log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation)) partitions_by_topic = defaultdict(list) - for topic_partition in cls.member_assignment: + for topic_partition in cls.member_assignment: # pylint: disable=not-an-iterable partitions_by_topic[topic_partition.topic].append(topic_partition.partition) data = StickyAssignorUserDataV1(six.iteritems(partitions_by_topic), cls.generation) user_data = data.encode() From 
ae5f2aeaf90e8fe07a11f9f99eb9da9ffcf30894 Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Tue, 22 Sep 2020 15:01:07 +0200 Subject: [PATCH 6/8] [KIP-54] Split on_assignment method to allow backwards compatibility. --- kafka/coordinator/assignors/abstract.py | 3 +-- kafka/coordinator/assignors/range.py | 2 +- kafka/coordinator/assignors/roundrobin.py | 2 +- .../assignors/sticky/sticky_assignor.py | 14 ++++++++++--- kafka/coordinator/consumer.py | 4 +++- test/test_coordinator.py | 21 ++++++++++++++++--- 6 files changed, 35 insertions(+), 11 deletions(-) diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py index 5f3431bce..a1fef3840 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/kafka/coordinator/assignors/abstract.py @@ -44,7 +44,7 @@ def metadata(self, topics): pass @abc.abstractmethod - def on_assignment(self, assignment, generation): + def on_assignment(self, assignment): """Callback that runs on each assignment. This method can be used to update internal state, if any, of the @@ -52,6 +52,5 @@ def on_assignment(self, assignment, generation): Arguments: assignment (MemberAssignment): the member's assignment - generation (int): generation id """ pass diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py index 6612d7cd8..299e39c48 100644 --- a/kafka/coordinator/assignors/range.py +++ b/kafka/coordinator/assignors/range.py @@ -73,5 +73,5 @@ def metadata(cls, topics): return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') @classmethod - def on_assignment(cls, assignment, generation): + def on_assignment(cls, assignment): pass diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py index 2292ebdc9..2d24a5c8b 100644 --- a/kafka/coordinator/assignors/roundrobin.py +++ b/kafka/coordinator/assignors/roundrobin.py @@ -92,5 +92,5 @@ def metadata(cls, topics): return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') @classmethod - def on_assignment(cls, assignment, generation): + def on_assignment(cls, assignment): pass diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 4d85bdb29..11a1cbfe2 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -665,13 +665,21 @@ def metadata(cls, topics): return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data) @classmethod - def on_assignment(cls, assignment, generation): + def on_assignment(cls, assignment): """Callback that runs on each assignment. Updates assignor's state. Arguments: assignment: MemberAssignment - generation: generation id (if present) """ - log.debug("On assignment: assignment={}, generation={}".format(assignment, generation)) + log.debug("On assignment: assignment={}".format(assignment)) cls.member_assignment = assignment.partitions() + + @classmethod + def on_generation_assignment(cls, generation): + """Callback that runs on each assignment. Updates assignor's generation id. 
+ + Arguments: + generation: generation id + """ + log.debug("On generation assignment: generation={}".format(generation)) cls.generation = generation diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 0d9496721..c8b4e8bfa 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -234,7 +234,9 @@ def _on_join_complete(self, generation, member_id, protocol, # give the assignor a chance to update internal state # based on the received assignment - assignor.on_assignment(assignment, generation) + assignor.on_assignment(assignment) + if assignor.name == 'sticky': + assignor.on_generation_assignment(generation) # reschedule the auto commit starting from now self.next_auto_commit_deadline = time.time() + self.auto_commit_interval diff --git a/test/test_coordinator.py b/test/test_coordinator.py index d09a50819..a35cdd1a0 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -126,10 +126,25 @@ def test_join_complete(mocker, coordinator): mocker.spy(assignor, 'on_assignment') assert assignor.on_assignment.call_count == 0 assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') - coordinator._on_join_complete( - 0, 'member-foo', 'roundrobin', assignment.encode()) + coordinator._on_join_complete(0, 'member-foo', 'roundrobin', assignment.encode()) + assert assignor.on_assignment.call_count == 1 + assignor.on_assignment.assert_called_with(assignment) + + +def test_join_complete_with_sticky_assignor(mocker, coordinator): + coordinator._subscription.subscribe(topics=['foobar']) + assignor = StickyPartitionAssignor() + coordinator.config['assignors'] = (assignor,) + mocker.spy(assignor, 'on_assignment') + mocker.spy(assignor, 'on_generation_assignment') + assert assignor.on_assignment.call_count == 0 + assert assignor.on_generation_assignment.call_count == 0 + assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') + coordinator._on_join_complete(0, 'member-foo', 'sticky', assignment.encode()) assert assignor.on_assignment.call_count == 1 - assignor.on_assignment.assert_called_with(assignment, 0) + assert assignor.on_generation_assignment.call_count == 1 + assignor.on_assignment.assert_called_with(assignment) + assignor.on_generation_assignment.assert_called_with(0) def test_subscription_listener(mocker, coordinator): From 53cc810a7d89714846d969dc248d7cd9455e27ca Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Tue, 22 Sep 2020 15:01:21 +0200 Subject: [PATCH 7/8] [KIP-54] Fix import order --- kafka/coordinator/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index c8b4e8bfa..971f5e802 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -6,12 +6,12 @@ import logging import time -from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.vendor import six from kafka.coordinator.base import BaseCoordinator, Generation from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.coordinator.protocol import ConsumerProtocol import kafka.errors as Errors from kafka.future import Future From e4c05f8432c87b03b817ac0cf3585d30dd8fedc3 Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Wed, 23 Sep 2020 15:59:06 +0200 Subject: [PATCH 8/8] [KIP-54] Remove 
extra dependency on sortedcontainers --- .../assignors/sticky/sorted_set.py | 63 +++++++++++++++++++ .../assignors/sticky/sticky_assignor.py | 42 ++++++------- requirements-dev.txt | 1 - 3 files changed, 82 insertions(+), 24 deletions(-) create mode 100644 kafka/coordinator/assignors/sticky/sorted_set.py diff --git a/kafka/coordinator/assignors/sticky/sorted_set.py b/kafka/coordinator/assignors/sticky/sorted_set.py new file mode 100644 index 000000000..6a454a42d --- /dev/null +++ b/kafka/coordinator/assignors/sticky/sorted_set.py @@ -0,0 +1,63 @@ +class SortedSet: + def __init__(self, iterable=None, key=None): + self._key = key if key is not None else lambda x: x + self._set = set(iterable) if iterable is not None else set() + + self._cached_last = None + self._cached_first = None + + def first(self): + if self._cached_first is not None: + return self._cached_first + + first = None + for element in self._set: + if first is None or self._key(first) > self._key(element): + first = element + self._cached_first = first + return first + + def last(self): + if self._cached_last is not None: + return self._cached_last + + last = None + for element in self._set: + if last is None or self._key(last) < self._key(element): + last = element + self._cached_last = last + return last + + def pop_last(self): + value = self.last() + self._set.remove(value) + self._cached_last = None + return value + + def add(self, value): + if self._cached_last is not None and self._key(value) > self._key(self._cached_last): + self._cached_last = value + if self._cached_first is not None and self._key(value) < self._key(self._cached_first): + self._cached_first = value + + return self._set.add(value) + + def remove(self, value): + if self._cached_last is not None and self._cached_last == value: + self._cached_last = None + if self._cached_first is not None and self._cached_first == value: + self._cached_first = None + + return self._set.remove(value) + + def __contains__(self, value): + return value in self._set + + def __iter__(self): + return iter(sorted(self._set, key=self._key)) + + def _bool(self): + return len(self._set) != 0 + + __nonzero__ = _bool + __bool__ = _bool diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 11a1cbfe2..782708686 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -2,11 +2,10 @@ from collections import defaultdict, namedtuple from copy import deepcopy -from sortedcontainers import SortedSet, SortedDict, SortedList - from kafka.cluster import ClusterMetadata from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements +from kafka.coordinator.assignors.sticky.sorted_set import SortedSet from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment from kafka.coordinator.protocol import Schema from kafka.protocol.struct import Struct @@ -82,7 +81,7 @@ def __init__(self, cluster, members): # a mapping of all consumers to all potential topic partitions that can be assigned to them self.consumer_to_all_potential_partitions = {} # an ascending sorted set of consumers based on how many topic partitions are already assigned to them - self.sorted_current_subscriptions = set() + self.sorted_current_subscriptions = SortedSet() # an ascending sorted list of topic partitions based on how many consumers can 
potentially use them self.sorted_partitions = [] # all partitions that need to be assigned @@ -154,9 +153,10 @@ def balance(self): self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) def get_final_assignment(self, member_id): - assignment = defaultdict(lambda: SortedList()) + assignment = defaultdict(list) for topic_partition in self.current_assignment[member_id]: - assignment[topic_partition.topic].add(topic_partition.partition) + assignment[topic_partition.topic].append(topic_partition.partition) + assignment = {k: sorted(v) for k, v in six.iteritems(assignment)} return six.viewitems(assignment) def _initialize(self, cluster): @@ -188,7 +188,7 @@ def _init_current_assignments(self, members): # higher generations overwrite lower generations in case of a conflict # note that a conflict could exists only if user data is for different generations - # for each partition we create a sorted map of its consumers by generation + # for each partition we create a map of its consumers by generation sorted_partition_consumers_by_generation = {} for consumer, member_metadata in six.iteritems(members): for partitions in member_metadata.partitions: @@ -204,14 +204,13 @@ def _init_current_assignments(self, members): else: consumers[member_metadata.generation] = consumer else: - sorted_consumers = SortedDict() - sorted_consumers[member_metadata.generation] = consumer + sorted_consumers = {member_metadata.generation: consumer} sorted_partition_consumers_by_generation[partitions] = sorted_consumers # previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition # current and previous consumers are the last two consumers of each partition in the above sorted map for partitions, consumers in six.iteritems(sorted_partition_consumers_by_generation): - generations = list(reversed(consumers.keys())) + generations = sorted(consumers.keys(), reverse=True) self.current_assignment[consumers[generations[0]]].append(partitions) # now update previous assignment if any if len(generations) > 1: @@ -236,13 +235,10 @@ def _are_subscriptions_identical(self): return has_identical_list_elements(list(six.itervalues(self.consumer_to_all_potential_partitions))) def _populate_sorted_partitions(self): - # an ascending sorted set of topic partitions based on how many consumers can potentially use them - sorted_all_partitions = SortedSet( - iterable=[ - (tp, tuple(consumers)) for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers) - ], - key=partitions_comparator_key, - ) + # set of topic partitions with their respective potential consumers + all_partitions = set((tp, tuple(consumers)) + for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers)) + partitions_sorted_by_num_of_potential_consumers = sorted(all_partitions, key=partitions_comparator_key) self.sorted_partitions = [] if not self.is_fresh_assignment and self._are_subscriptions_identical(): @@ -266,7 +262,7 @@ def _populate_sorted_partitions(self): # how many valid partitions are currently assigned to them while sorted_consumers: # take the consumer with the most partitions - consumer, _ = sorted_consumers.pop() + consumer, _ = sorted_consumers.pop_last() # currently assigned partitions to this consumer remaining_partitions = assignments[consumer] # from partitions that had a different consumer before, @@ -284,13 +280,13 @@ def _populate_sorted_partitions(self): self.sorted_partitions.append(remaining_partitions.pop()) sorted_consumers.add((consumer, tuple(assignments[consumer]))) - 
while sorted_all_partitions: - partition = sorted_all_partitions.pop(0)[0] + while partitions_sorted_by_num_of_potential_consumers: + partition = partitions_sorted_by_num_of_potential_consumers.pop(0)[0] if partition not in self.sorted_partitions: self.sorted_partitions.append(partition) else: - while sorted_all_partitions: - self.sorted_partitions.append(sorted_all_partitions.pop(0)[0]) + while partitions_sorted_by_num_of_potential_consumers: + self.sorted_partitions.append(partitions_sorted_by_num_of_potential_consumers.pop(0)[0]) def _populate_partitions_to_reassign(self): self.unassigned_partitions = deepcopy(self.sorted_partitions) @@ -334,10 +330,10 @@ def _initialize_current_subscriptions(self): ) def _get_consumer_with_least_subscriptions(self): - return self.sorted_current_subscriptions[0][0] + return self.sorted_current_subscriptions.first()[0] def _get_consumer_with_most_subscriptions(self): - return self.sorted_current_subscriptions[-1][0] + return self.sorted_current_subscriptions.last()[0] def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer): self.sorted_current_subscriptions.remove((consumer, tuple(self.current_assignment[consumer]))) diff --git a/requirements-dev.txt b/requirements-dev.txt index ace1f8c93..d2830905b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,4 +15,3 @@ pytest-mock==1.10.0 sphinx-rtd-theme==0.2.4 crc32c==1.7 py==1.8.0 -sortedcontainers==2.1.0
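
Note (not part of the patch series): a minimal usage sketch of the pieces introduced above. The SortedSet demo exercises the helper exactly as defined in the final patch (which drops the sortedcontainers dependency); the consumer snippet assumes kafka-python's existing partition_assignment_strategy consumer option and uses hypothetical topic, group, and broker names.

    from kafka import KafkaConsumer
    from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
    from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

    # The assignor tracks (consumer, assigned_partitions) tuples ordered by how many
    # partitions each consumer currently holds; SortedSet supports that via a key function.
    subscriptions = SortedSet(
        iterable=[('C1', (0, 1)), ('C2', (0,))],
        key=lambda entry: len(entry[1]),
    )
    assert subscriptions.first() == ('C2', (0,))       # consumer with the fewest partitions
    assert subscriptions.pop_last() == ('C1', (0, 1))  # consumer with the most partitions, removed
    subscriptions.add(('C3', ()))

    # Opting a consumer group into sticky assignment (hypothetical names; requires a reachable broker):
    consumer = KafkaConsumer(
        'example-topic',
        bootstrap_servers='localhost:9092',
        group_id='example-group',
        partition_assignment_strategy=[StickyPartitionAssignor],
    )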