14 changes: 12 additions & 2 deletions tests/kafkatest/services/verifiable_consumer.py
@@ -42,7 +42,8 @@ class ConsumerEventHandler(object):
 
     def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
                  revoked_count=0, assigned_count=0, assignment=None,
-                 position=None, committed=None, total_consumed=0):
+                 position=None, committed=None, total_consumed=0,
+                 shutdown_complete=False):
         self.node = node
         self.verify_offsets = verify_offsets
         self.idx = idx
@@ -53,11 +54,13 @@ def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
         self.position = position if position is not None else {}
         self.committed = committed if committed is not None else {}
         self.total_consumed = total_consumed
+        self.shutdown_complete = shutdown_complete
Contributor Author
ConsumerEventHandler instantiates every consumer in ConsumerState.Dead. During the test we could read that state before the background thread had emitted startup_complete, so the test concluded that a node had started and then shut down even though it had never left the default state.

We now keep an explicit shutdown_complete flag, so the fencing test only proceeds once each conflicting consumer has actually gone through the start→shutdown sequence.

```mermaid
stateDiagram-v2
    [*] --> Dead : Init (default)
    Dead --> Started : startup_complete
    Started --> Dead : shutdown_complete (flag = true)
```
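To make the race concrete, here is a minimal self-contained sketch (hypothetical, stripped-down names; not the actual ducktape code) of why reading the state alone is ambiguous while the explicit flag is not:

```python
from enum import Enum

class ConsumerState(Enum):
    Dead = 1
    Started = 2

class Handler:
    def __init__(self):
        # Default state is Dead, so "never started" and "shut down" look identical.
        self.state = ConsumerState.Dead
        # The flag is only ever set by an observed shutdown_complete event.
        self.shutdown_complete = False

    def handle_startup_complete(self):
        self.state = ConsumerState.Started

    def handle_shutdown_complete(self):
        self.state = ConsumerState.Dead
        self.shutdown_complete = True

handler = Handler()
# Before the background thread delivers startup_complete:
assert handler.state == ConsumerState.Dead  # ambiguous: reads as "already shut down"
assert not handler.shutdown_complete        # unambiguous: no shutdown event was seen
```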


     def handle_shutdown_complete(self, node=None, logger=None):
         self.state = ConsumerState.Dead
         self.assignment = []
         self.position = {}
+        self.shutdown_complete = True
 
         if node is not None and logger is not None:
             logger.debug("Shut down %s" % node.account.hostname)
@@ -277,7 +280,8 @@ def create_handler_helper(handler_class, node, idx, existing_handler=None):
                                      assignment=existing_handler.assignment,
                                      position=existing_handler.position,
                                      committed=existing_handler.committed,
-                                     total_consumed=existing_handler.total_consumed)
+                                     total_consumed=existing_handler.total_consumed,
+                                     shutdown_complete=existing_handler.shutdown_complete)
             else:
                 return handler_class(node, self.verify_offsets, idx)
         existing_handler = self.event_handlers[node] if node in self.event_handlers else None
@@ -292,6 +296,7 @@ def _worker(self, idx, node):
         with self.lock:
             self.event_handlers[node] = self.create_event_handler(idx, node)
             handler = self.event_handlers[node]
+            handler.shutdown_complete = False
 
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)

@@ -526,5 +531,10 @@ def alive_nodes(self):
         return [handler.node for handler in self.event_handlers.values()
                 if handler.state != ConsumerState.Dead]
 
+    def shutdown_complete_nodes(self):
+        with self.lock:
+            return [handler.node for handler in self.event_handlers.values()
+                    if handler.shutdown_complete]
+
     def is_consumer_group_protocol_enabled(self):
         return self.group_protocol and self.group_protocol.lower() == consumer_group.consumer_group_protocol
15 changes: 14 additions & 1 deletion tests/kafkatest/tests/client/consumer_test.py
@@ -18,6 +18,7 @@
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
 
 import signal
@@ -74,6 +75,14 @@ def setup_consumer(self, topic, **kwargs):
         self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
         return consumer
 
+    def await_conflict_consumers_fenced(self, conflict_consumer):
+        # Rely on explicit shutdown_complete events from the verifiable consumer to guarantee that each
+        # conflict member reached the fenced path rather than remaining in the default DEAD state prior to startup.
+        wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and
+                           len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
+                   timeout_sec=60,
+                   err_msg="Timed out waiting for conflict consumers to report shutdown completion after fencing")
+
     @cluster(num_nodes=7)
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
@@ -326,7 +335,11 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
             assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
             assert len(consumer.joined_nodes()) == len(consumer.nodes)
             assert len(conflict_consumer.joined_nodes()) == 0
 
+            # Conflict consumers will terminate due to a fatal UnreleasedInstanceIdException error.
+            # Wait for termination to complete to prevent conflict consumers from immediately
+            # re-joining the group while the existing nodes are shutting down.
+            self.await_conflict_consumers_fenced(conflict_consumer)
 
             # Stop existing nodes, so conflicting ones should be able to join.
             consumer.stop_all()
             wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),