Skip to content

Commit

Permalink
Merge 89096e0 into 83b7b27
Browse files Browse the repository at this point in the history
  • Loading branch information
aynroot committed Nov 4, 2020
2 parents 83b7b27 + 89096e0 commit af239f8
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 45 deletions.
10 changes: 7 additions & 3 deletions kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,15 +648,19 @@ def parse_member_metadata(cls, metadata):

@classmethod
def metadata(cls, topics):
if cls.member_assignment is None:
return cls._metadata(topics, cls.member_assignment, cls.generation)

@classmethod
def _metadata(cls, topics, member_assignment_partitions, generation=-1):
if member_assignment_partitions is None:
log.debug("No member assignment available")
user_data = b''
else:
log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation))
partitions_by_topic = defaultdict(list)
for topic_partition in cls.member_assignment: # pylint: disable=not-an-iterable
for topic_partition in member_assignment_partitions:
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), cls.generation)
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
user_data = data.encode()
return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)

Expand Down
75 changes: 33 additions & 42 deletions test/test_assignors.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def test_sticky_assignor1(mocker):
del subscriptions['C1']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand Down Expand Up @@ -154,7 +154,7 @@ def test_sticky_assignor2(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, [])
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand All @@ -167,7 +167,7 @@ def test_sticky_assignor2(mocker):
del subscriptions['C0']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand Down Expand Up @@ -326,7 +326,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(
member_metadata[member] = StickyPartitionAssignor._metadata(
topics, assignment[member].partitions() if member in assignment else []
)

Expand All @@ -338,7 +338,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand Down Expand Up @@ -367,7 +367,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand All @@ -382,7 +382,7 @@ def test_sticky_add_remove_topic_two_consumers(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
expected_assignment = {
Expand Down Expand Up @@ -413,7 +413,7 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker):
del subscriptions['C10']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand All @@ -435,7 +435,7 @@ def test_sticky_reassignment_after_one_consumer_added(mocker):
subscriptions['C10'] = {'t'}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(
member_metadata[member] = StickyPartitionAssignor._metadata(
topics, assignment[member].partitions() if member in assignment else []
)
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -462,7 +462,7 @@ def test_sticky_same_subscriptions(mocker):
del subscriptions['C5']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
Expand All @@ -488,7 +488,7 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker):

member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

for i in range(50):
member = 'C{}'.format(randint(1, n_consumers))
Expand Down Expand Up @@ -517,7 +517,7 @@ def test_new_subscription(mocker):
subscriptions['C0'].add('t1')
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, [])
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand All @@ -540,7 +540,7 @@ def test_move_existing_assignments(mocker):

member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, member_assignments[member])
member_metadata[member] = StickyPartitionAssignor._metadata(topics, member_assignments[member])

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand Down Expand Up @@ -570,7 +570,7 @@ def test_stickiness(mocker):
del subscriptions['C1']
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand Down Expand Up @@ -625,7 +625,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker):
}
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions())

cluster = create_cluster(mocker, topics={}, topics_partitions={})
sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -645,7 +645,7 @@ def test_conflicting_previous_assignments(mocker):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
# assume both C1 and C2 have partition 1 assigned to them in generation 1
member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand Down Expand Up @@ -676,7 +676,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu

member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, assignment[member].partitions())
member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions())

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
Expand All @@ -687,9 +687,9 @@ def test_assignment_with_multiple_generations1(mocker):
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})

member_metadata = {
'C1': build_metadata({'t'}, []),
'C2': build_metadata({'t'}, []),
'C3': build_metadata({'t'}, []),
'C1': StickyPartitionAssignor._metadata({'t'}, []),
'C2': StickyPartitionAssignor._metadata({'t'}, []),
'C3': StickyPartitionAssignor._metadata({'t'}, []),
}

assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -699,8 +699,8 @@ def test_assignment_with_multiple_generations1(mocker):
assert len(assignment1['C3'].assignment[0][1]) == 2

member_metadata = {
'C1': build_metadata({'t'}, assignment1['C1'].partitions()),
'C2': build_metadata({'t'}, assignment1['C2'].partitions()),
'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()),
'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions()),
}

assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -712,8 +712,8 @@ def test_assignment_with_multiple_generations1(mocker):
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()

member_metadata = {
'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
}

assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -727,9 +727,9 @@ def test_assignment_with_multiple_generations2(mocker):
cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})

member_metadata = {
'C1': build_metadata({'t'}, []),
'C2': build_metadata({'t'}, []),
'C3': build_metadata({'t'}, []),
'C1': StickyPartitionAssignor._metadata({'t'}, []),
'C2': StickyPartitionAssignor._metadata({'t'}, []),
'C3': StickyPartitionAssignor._metadata({'t'}, []),
}

assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -739,7 +739,7 @@ def test_assignment_with_multiple_generations2(mocker):
assert len(assignment1['C3'].assignment[0][1]) == 2

member_metadata = {
'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1),
'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1),
}

assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -749,9 +749,9 @@ def test_assignment_with_multiple_generations2(mocker):
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()

member_metadata = {
'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1),
'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions(), 1),
'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2),
'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1),
}

assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
Expand All @@ -778,7 +778,7 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
}
member_metadata = {}
for member in six.iterkeys(member_assignments):
member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member])
member_metadata[member] = StickyPartitionAssignor._metadata({'t'}, member_assignments[member], member_generations[member])

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment)
Expand All @@ -788,19 +788,10 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb
def make_member_metadata(subscriptions):
member_metadata = {}
for member, topics in six.iteritems(subscriptions):
member_metadata[member] = build_metadata(topics, [])
member_metadata[member] = StickyPartitionAssignor._metadata(topics, [])
return member_metadata


def build_metadata(topics, member_assignment_partitions, generation=-1):
partitions_by_topic = defaultdict(list)
for topic_partition in member_assignment_partitions:
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
user_data = data.encode()
return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data)


def assert_assignment(result_assignment, expected_assignment):
assert result_assignment == expected_assignment
assert set(result_assignment) == set(expected_assignment)
Expand Down

0 comments on commit af239f8

Please sign in to comment.