KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned #15737

72 changes: 46 additions & 26 deletions tests/kafkatest/services/verifiable_consumer.py
@@ -31,6 +31,12 @@ class ConsumerState:
     Joined = 4


+def _create_partition_from_dict(d):
+    topic = d["topic"]
+    partition = d["partition"]
+    return TopicPartition(topic, partition)
+
+
 class ConsumerEventHandler(object):

     def __init__(self, node, verify_offsets, idx):
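The new helper centralizes the dict-to-TopicPartition conversion that the rest of this diff swaps in at each call site. A minimal sketch of its behavior, with an illustrative event dict (kafkatest's TopicPartition compares by topic and partition, as its use as a dict key elsewhere in this file implies):

    offset_commit = {"topic": "test_topic", "partition": 3, "offset": 42}
    tp = _create_partition_from_dict(offset_commit)
    # Only "topic" and "partition" are read; extra keys such as "offset" are ignored.
    assert tp == TopicPartition("test_topic", 3)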
@@ -45,13 +51,17 @@ def __init__(self, node, verify_offsets, idx):
         self.total_consumed = 0
         self.verify_offsets = verify_offsets

-    def handle_shutdown_complete(self):
+    def handle_shutdown_complete(self, node=None, logger=None):
         self.state = ConsumerState.Dead
         self.assignment = []
         self.position = {}

-    def handle_startup_complete(self):
+        if node is not None and logger is not None:
+            logger.debug("Shut down %s" % node.account.hostname)
+
+    def handle_startup_complete(self, node, logger):
         self.state = ConsumerState.Started
+        logger.debug("Started %s" % node.account.hostname)

     def handle_offsets_committed(self, event, node, logger):
         if event["success"]:
@@ -60,9 +70,7 @@ def handle_offsets_committed(self, event, node, logger):
                 logger.debug("%s: Offset commit failed for: %s" % (str(node.account), offset_commit))
                 continue

-            topic = offset_commit["topic"]
-            partition = offset_commit["partition"]
-            tp = TopicPartition(topic, partition)
+            tp = _create_partition_from_dict(offset_commit)
             offset = offset_commit["offset"]
             assert tp in self.assignment, \
                 "Committed offsets for partition %s not assigned (current assignment: %s)" % \
@@ -73,13 +81,14 @@ def handle_offsets_committed(self, event, node, logger):
                 (offset, self.position[tp], str(tp))
             self.committed[tp] = offset

+        logger.debug("Offsets committed for %s" % node.account.hostname)
+
     def handle_records_consumed(self, event, logger):
         assert self.state == ConsumerState.Joined, \
             "Consumed records should only be received when joined (current state: %s)" % str(self.state)

         for record_batch in event["partitions"]:
-            tp = TopicPartition(topic=record_batch["topic"],
-                                partition=record_batch["partition"])
+            tp = _create_partition_from_dict(record_batch)
             min_offset = record_batch["minOffset"]
             max_offset = record_batch["maxOffset"]

@@ -99,19 +108,20 @@ def handle_records_consumed(self, event, logger):
                 logger.warn(msg)
         self.total_consumed += event["count"]

-    def handle_partitions_revoked(self, event):
+    def handle_partitions_revoked(self, event, node, logger):
         self.revoked_count += 1
         self.state = ConsumerState.Rebalancing
         self.position = {}
+        logger.debug("All partitions revoked from %s" % node.account.hostname)

-    def handle_partitions_assigned(self, event):
+    def handle_partitions_assigned(self, event, node, logger):
         self.assigned_count += 1
         self.state = ConsumerState.Joined
         assignment = []
         for topic_partition in event["partitions"]:
-            topic = topic_partition["topic"]
-            partition = topic_partition["partition"]
-            assignment.append(TopicPartition(topic, partition))
+            tp = _create_partition_from_dict(topic_partition)
+            assignment.append(tp)
+        logger.debug("Partitions %s assigned to %s" % (assignment, node.account.hostname))
         self.assignment = assignment

     def handle_kill_process(self, clean_shutdown):
@@ -140,22 +150,32 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
     def __init__(self, node, verify_offsets, idx):
         super().__init__(node, verify_offsets, idx)

-    def handle_partitions_revoked(self, event):
+    def handle_partitions_revoked(self, event, node, logger):
         self.revoked_count += 1
         self.state = ConsumerState.Rebalancing
         self.position = {}
+        revoked = []

         for topic_partition in event["partitions"]:
-            topic = topic_partition["topic"]
-            partition = topic_partition["partition"]
-            self.assignment.remove(TopicPartition(topic, partition))
+            tp = _create_partition_from_dict(topic_partition)

-    def handle_partitions_assigned(self, event):
+            if tp in self.assignment:
+                self.assignment.remove(tp)
+                revoked.append(tp)
+            else:
+                logger.warn("Could not remove topic partition %s from assignment as it was not previously assigned to %s" % (tp, node.account.hostname))
Contributor (lianetm):
Do we understand why this situation is happening? Could it be related to the mismatched assignment failure we've seen elsewhere in the tests? My point is just to make sure we're not hiding the real failure with this change. I wouldn't expect the consumer to ever receive a partition to revoke if it was not previously assigned, right?
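A minimal sketch of the failure mode under discussion, with hypothetical partitions: the pre-fix handler called list.remove unconditionally, so an unexpected revocation surfaced as an opaque ValueError, the error named in the issue title.

    assignment = [TopicPartition("test_topic", 0)]
    # Revoking a partition that was never assigned:
    assignment.remove(TopicPartition("test_topic", 1))  # raises ValueError: list.remove(x): x not in list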

Contributor Author:
You're right, @lianetm: this fix could end up sweeping the problem under the rug, so to speak. I'll change the logic so that this case still results in an error, but with more information so we can debug.

Contributor Author:
@lianetm, I changed the logging to an assert that provides useful information for troubleshooting:

tp = _create_partition_from_dict(topic_partition)
assert tp in self.assignment, \
    "Topic partition %s cannot be revoked from %s as it was not previously assigned to that consumer" % \
    (tp, node.account.hostname)
self.assignment.remove(tp)

Contributor (lianetm):
Thanks! Better, I believe. Do we have a Jira to investigate the failure leading to this? It's concerning (and even more so if it's happening with the new protocol only).

Contributor Author:
@lianetm, I will file a Jira on this in the next day or two. Thanks!

Contributor Author:
Filed KAFKA-16623, FYI.


logger.debug("Partitions %s revoked from %s" % (revoked, node.account.hostname))

def handle_partitions_assigned(self, event, node, logger):
self.assigned_count += 1
self.state = ConsumerState.Joined
assignment = []
for topic_partition in event["partitions"]:
topic = topic_partition["topic"]
partition = topic_partition["partition"]
self.assignment.append(TopicPartition(topic, partition))
tp = _create_partition_from_dict(topic_partition)
assignment.append(tp)
logger.debug("Partitions %s assigned to %s" % (assignment, node.account.hostname))
self.assignment.extend(assignment)


class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
@@ -270,9 +290,9 @@ def _worker(self, idx, node):
             with self.lock:
                 name = event["name"]
                 if name == "shutdown_complete":
-                    handler.handle_shutdown_complete()
+                    handler.handle_shutdown_complete(node, self.logger)
                 elif name == "startup_complete":
-                    handler.handle_startup_complete()
+                    handler.handle_startup_complete(node, self.logger)
                 elif name == "offsets_committed":
                     handler.handle_offsets_committed(event, node, self.logger)
                     self._update_global_committed(event)
@@ -282,9 +302,9 @@ def _worker(self, idx, node):
                 elif name == "record_data" and self.on_record_consumed:
                     self.on_record_consumed(event, node)
                 elif name == "partitions_revoked":
-                    handler.handle_partitions_revoked(event)
+                    handler.handle_partitions_revoked(event, node, self.logger)
                 elif name == "partitions_assigned":
-                    handler.handle_partitions_assigned(event)
+                    handler.handle_partitions_assigned(event, node, self.logger)
                 else:
                     self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event))

@@ -293,7 +313,7 @@ def _isEager(self):

     def _update_global_position(self, consumed_event, node):
         for consumed_partition in consumed_event["partitions"]:
-            tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"])
+            tp = _create_partition_from_dict(consumed_partition)
             if tp in self.global_committed:
                 # verify that the position never gets behind the current commit.
                 if self.global_committed[tp] > consumed_partition["minOffset"]:
@@ -315,7 +335,7 @@ def _update_global_position(self, consumed_event, node):
     def _update_global_committed(self, commit_event):
         if commit_event["success"]:
             for offset_commit in commit_event["offsets"]:
-                tp = TopicPartition(offset_commit["topic"], offset_commit["partition"])
+                tp = _create_partition_from_dict(offset_commit)
                 offset = offset_commit["offset"]
                 assert self.global_position[tp] >= offset, \
                     "Committed offset %d for partition %s is ahead of the current position %d" % \
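For readers comparing the two handlers: the base (eager) handler replaces the whole assignment on each rebalance, whereas the incremental (cooperative) handler applies deltas, which is why it must add and remove partitions explicitly. A schematic sketch, with plain strings standing in for TopicPartition objects:

    # Eager protocol: every rebalance revokes everything, then assigns a full set.
    assignment = ["t-0", "t-1"]
    assignment = ["t-2"]            # handle_partitions_assigned: assignment is replaced

    # Cooperative protocol: only the delta moves.
    assignment = ["t-0", "t-1"]
    assignment.remove("t-1")        # handle_partitions_revoked: remove only what was revoked
    assignment.extend(["t-2"])      # handle_partitions_assigned: extend with new partitions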