Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned #15737

71 changes: 45 additions & 26 deletions tests/kafkatest/services/verifiable_consumer.py
Expand Up @@ -31,6 +31,12 @@ class ConsumerState:
Joined = 4


def _create_partition_from_dict(d):
    """Build a TopicPartition from a dict carrying "topic" and "partition" keys."""
    return TopicPartition(d["topic"], d["partition"])


class ConsumerEventHandler(object):

def __init__(self, node, verify_offsets, idx):
Expand All @@ -45,13 +51,17 @@ def __init__(self, node, verify_offsets, idx):
self.total_consumed = 0
self.verify_offsets = verify_offsets

def handle_shutdown_complete(self, node=None, logger=None):
    """Mark the consumer as dead and drop all local partition state.

    ``node`` and ``logger`` default to None for backward compatibility with
    callers that do not supply them; the shutdown is only logged when both
    are provided.
    """
    self.state = ConsumerState.Dead
    self.assignment = []
    self.position = {}

    if node is not None and logger is not None:
        logger.debug("Shut down %s" % node.account.hostname)

def handle_startup_complete(self, node, logger):
    """Record that the consumer process on ``node`` finished starting up."""
    self.state = ConsumerState.Started
    logger.debug("Started %s" % node.account.hostname)

def handle_offsets_committed(self, event, node, logger):
if event["success"]:
Expand All @@ -60,9 +70,7 @@ def handle_offsets_committed(self, event, node, logger):
logger.debug("%s: Offset commit failed for: %s" % (str(node.account), offset_commit))
continue

topic = offset_commit["topic"]
partition = offset_commit["partition"]
tp = TopicPartition(topic, partition)
tp = _create_partition_from_dict(offset_commit)
offset = offset_commit["offset"]
assert tp in self.assignment, \
"Committed offsets for partition %s not assigned (current assignment: %s)" % \
Expand All @@ -73,13 +81,14 @@ def handle_offsets_committed(self, event, node, logger):
(offset, self.position[tp], str(tp))
self.committed[tp] = offset

logger.debug("Offsets committed for %s" % node.account.hostname)

def handle_records_consumed(self, event, logger):
assert self.state == ConsumerState.Joined, \
"Consumed records should only be received when joined (current state: %s)" % str(self.state)

for record_batch in event["partitions"]:
tp = TopicPartition(topic=record_batch["topic"],
partition=record_batch["partition"])
tp = _create_partition_from_dict(record_batch)
min_offset = record_batch["minOffset"]
max_offset = record_batch["maxOffset"]

Expand All @@ -99,19 +108,20 @@ def handle_records_consumed(self, event, logger):
logger.warn(msg)
self.total_consumed += event["count"]

def handle_partitions_revoked(self, event, node, logger):
    """Eager rebalance protocol: every owned partition is revoked at once,
    so clear the locally tracked positions and enter the rebalancing state.
    """
    self.revoked_count += 1
    self.state = ConsumerState.Rebalancing
    self.position = {}
    logger.debug("All partitions revoked from %s" % node.account.hostname)

def handle_partitions_assigned(self, event, node, logger):
    """Eager rebalance protocol: the event carries the consumer's complete
    new assignment, so replace ``self.assignment`` wholesale.
    """
    self.assigned_count += 1
    self.state = ConsumerState.Joined
    new_assignment = [_create_partition_from_dict(p) for p in event["partitions"]]
    logger.debug("Partitions %s assigned to %s" % (new_assignment, node.account.hostname))
    self.assignment = new_assignment

def handle_kill_process(self, clean_shutdown):
Expand Down Expand Up @@ -140,22 +150,31 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
def __init__(self, node, verify_offsets, idx):
    # Delegates construction entirely to ConsumerEventHandler; the
    # incremental handler adds no state of its own.
    super().__init__(node, verify_offsets, idx)

def handle_partitions_revoked(self, event, node, logger):
    """Incremental (cooperative) rebalance protocol: only the partitions named
    in the event are revoked; the rest of the current assignment is kept.

    Raises AssertionError if the event revokes a partition that was never
    assigned to this consumer (KAFKA-16565) — list.remove would otherwise
    surface this as a bare ValueError.
    """
    self.revoked_count += 1
    self.state = ConsumerState.Rebalancing
    self.position = {}
    revoked = []

    for topic_partition in event["partitions"]:
        tp = _create_partition_from_dict(topic_partition)
        # Main functional change: verify the partition being removed from the
        # local state was actually assigned to this consumer beforehand.
        assert tp in self.assignment, \
            "Topic partition %s cannot be revoked from %s as it was not previously assigned to that consumer" % \
            (tp, node.account.hostname)
        self.assignment.remove(tp)
        revoked.append(tp)

    logger.debug("Partitions %s revoked from %s" % (revoked, node.account.hostname))

def handle_partitions_assigned(self, event, node, logger):
    """Incremental (cooperative) rebalance protocol: the event carries only
    newly added partitions, so extend the existing assignment rather than
    replacing it.
    """
    self.assigned_count += 1
    self.state = ConsumerState.Joined
    newly_assigned = [_create_partition_from_dict(p) for p in event["partitions"]]
    logger.debug("Partitions %s assigned to %s" % (newly_assigned, node.account.hostname))
    self.assignment.extend(newly_assigned)


class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
Expand Down Expand Up @@ -270,9 +289,9 @@ def _worker(self, idx, node):
with self.lock:
name = event["name"]
if name == "shutdown_complete":
handler.handle_shutdown_complete()
handler.handle_shutdown_complete(node, self.logger)
elif name == "startup_complete":
handler.handle_startup_complete()
handler.handle_startup_complete(node, self.logger)
elif name == "offsets_committed":
handler.handle_offsets_committed(event, node, self.logger)
self._update_global_committed(event)
Expand All @@ -282,9 +301,9 @@ def _worker(self, idx, node):
elif name == "record_data" and self.on_record_consumed:
self.on_record_consumed(event, node)
elif name == "partitions_revoked":
handler.handle_partitions_revoked(event)
handler.handle_partitions_revoked(event, node, self.logger)
elif name == "partitions_assigned":
handler.handle_partitions_assigned(event)
handler.handle_partitions_assigned(event, node, self.logger)
else:
self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event))

Expand All @@ -293,7 +312,7 @@ def _isEager(self):

def _update_global_position(self, consumed_event, node):
for consumed_partition in consumed_event["partitions"]:
tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"])
tp = _create_partition_from_dict(consumed_partition)
if tp in self.global_committed:
# verify that the position never gets behind the current commit.
if self.global_committed[tp] > consumed_partition["minOffset"]:
Expand All @@ -315,7 +334,7 @@ def _update_global_position(self, consumed_event, node):
def _update_global_committed(self, commit_event):
if commit_event["success"]:
for offset_commit in commit_event["offsets"]:
tp = TopicPartition(offset_commit["topic"], offset_commit["partition"])
tp = _create_partition_from_dict(offset_commit)
offset = offset_commit["offset"]
assert self.global_position[tp] >= offset, \
"Committed offset %d for partition %s is ahead of the current position %d" % \
Expand Down