Skip to content

Commit

Permalink
KAFKA-16566: Fix consumer static membership system test with new protocol (#15738)
Browse files Browse the repository at this point in the history

Updates the consumer system test that was failing with the new protocol; the failure was related to static membership behaviour. The behaviour for static consumers that join with a conflicting group instance ID differs slightly between the classic and the new consumer protocols, so the expectations in the test needed to be updated.

If static members join with same instance id:

Classic protocol: all members join the group with the same group instance ID, and then the first one eventually fails (it receives a heartbeat (HB) error with FencedInstanceIdException).

Consumer protocol: a new member with an instance ID already in use is not able to join, and the first member remains active (the new member with the same instance ID receives an UnreleasedInstanceIdException in the response to the heartbeat (HB) it sends to join the group).

This PR keeps the single parametrized test that existed before, given that what is being tested, and part of the test itself, applies to all protocols. It only updates the expectations that differ, based on the protocol parameter.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Kirk True <ktrue@confluent.io>
  • Loading branch information
lianetm committed Apr 19, 2024
1 parent 8f2fca7 commit 9ea0503
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions tests/kafkatest/tests/client/consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,26 +348,45 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
consumer.start()
self.await_members(consumer, len(consumer.nodes))

num_rebalances = consumer.num_rebalances()
conflict_consumer.start()
self.await_members(conflict_consumer, num_conflict_consumers)
self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
if group_protocol == consumer_group.classic_group_protocol:
# Classic protocol: conflicting members should join, and the initial ones with conflicting instance id should fail.
self.await_members(conflict_consumer, num_conflict_consumers)
self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)

wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
timeout_sec=10,
err_msg="Timed out waiting for the fenced consumers to stop")
else:
# Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
self.await_consumed_messages(consumer)
assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
assert len(consumer.joined_nodes()) == len(consumer.nodes)
assert len(conflict_consumer.joined_nodes()) == 0

# Stop existing nodes, so conflicting ones should be able to join.
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec+5,
err_msg="Timed out waiting for the consumer to shutdown")
conflict_consumer.start()
self.await_members(conflict_consumer, num_conflict_consumers)


else:
consumer.start()
conflict_consumer.start()

wait_until(lambda: len(consumer.joined_nodes()) + len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec,
err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from"
timeout_sec=self.session_timeout_sec*2,
err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from "
"normal consumer group and %d from conflict consumer group" % \
(len(consumer.nodes), len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes()))
)
wait_until(lambda: len(consumer.dead_nodes()) + len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
timeout_sec=self.session_timeout_sec,
err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in"
timeout_sec=self.session_timeout_sec*2,
err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in "
"normal consumer group and %d dead in conflict consumer group" % \
(len(conflict_consumer.nodes), len(consumer.dead_nodes()), len(conflict_consumer.dead_nodes()))
)
Expand Down

0 comments on commit 9ea0503

Please sign in to comment.