Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16566: Fix consumer static membership system test with new protocol #15738

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 26 additions & 7 deletions tests/kafkatest/tests/client/consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,26 +348,45 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
consumer.start()
self.await_members(consumer, len(consumer.nodes))

num_rebalances = consumer.num_rebalances()
conflict_consumer.start()
self.await_members(conflict_consumer, num_conflict_consumers)
self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
if group_protocol == consumer_group.classic_group_protocol:
# Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail.
self.await_members(conflict_consumer, num_conflict_consumers)
self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)

wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
timeout_sec=10,
err_msg="Timed out waiting for the fenced consumers to stop")
else:
# Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
self.await_consumed_messages(consumer)
assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
assert len(consumer.joined_nodes()) == len(consumer.nodes)
assert len(conflict_consumer.joined_nodes()) == 0
Comment on lines +364 to +366
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we anticipate any timing issues here? That is, will num_rebalances() and joined_nodes() be "guaranteed" to return the correct values immediately after the call to await_consumed_messages() is finished? Or do we want to wrap those assertions as wait_until()s to give them a few seconds to coalesce to the correct value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be no timing issues as I see it. For the consumer.joined_nodes there is a previous self.await_members, that ensures that we wait for the time needed for all the nodes to join. As for the conflict_consumer.joined_nodes(), its for nodes that never joined, we're just asserting that after the non-conflicting remained without rebalance, consuming (ensuring activity), the conflicting ones did not join. Makes sense?


# Stop existing nodes, so conflicting ones should be able to join.
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec+5,
err_msg="Timed out waiting for the consumer to shutdown")
conflict_consumer.start()
self.await_members(conflict_consumer, num_conflict_consumers)


else:
consumer.start()
conflict_consumer.start()

wait_until(lambda: len(consumer.joined_nodes()) + len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec,
err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from"
timeout_sec=self.session_timeout_sec*2,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to help a bit with the flaky behaviour, making it also consistent to how we wait for members in all other tests that rely on the await_members.

err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from "
"normal consumer group and %d from conflict consumer group" % \
(len(consumer.nodes), len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes()))
)
wait_until(lambda: len(consumer.dead_nodes()) + len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
timeout_sec=self.session_timeout_sec,
err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in"
timeout_sec=self.session_timeout_sec*2,
err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in "
"normal consumer group and %d dead in conflict consumer group" % \
(len(conflict_consumer.nodes), len(consumer.dead_nodes()), len(conflict_consumer.dead_nodes()))
)
Expand Down