KAFKA-16876: TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit#22282

Open
gabriellefu wants to merge 4 commits into apache:trunk from gabriellefu:taskmanager

@gabriellefu gabriellefu commented May 13, 2026

  1. Fix TaskManager.handleRevocation to always suspend revoked tasks,
    even when prepareCommit throws (e.g. TaskMigratedException from
    producer.send during cache flush). Previously the exception propagated
    uncaught, skipping the suspend loop entirely. This left tasks in RUNNING
    state, which caused a downstream IllegalStateException when
    handleAssignment tried to close them.
  2. Wrap prepare/commit/postCommit in try-finally so the suspend loop and
    task unlock are guaranteed to execute regardless of where an exception
    occurs.
  3. Preserve all exceptions via addSuppressed instead of silently
    dropping later exceptions. The first exception remains the primary
    thrown exception for backward compatibility, but subsequent exceptions
    (e.g. the IllegalStateException from closing an unsuspended task) are
    now attached as suppressed exceptions instead of lost.
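
The three points above can be illustrated with a minimal, self-contained sketch. It is not the actual TaskManager code: the `Task` interface, `FakeTask`, and the simplified `handleRevocation` below are hypothetical stand-ins, and the commit and postCommit steps are left out. It only shows the shape of the pattern: the suspend loop sits in a `finally` block, the first exception stays primary, and later ones are attached with `Throwable.addSuppressed`.

```java
import java.util.List;

class RevocationSketch {

    /** Hypothetical stand-in for a Streams task; not the real Kafka interface. */
    interface Task {
        String id();
        void prepareCommit();
        void suspend();
    }

    /** Toy task that can be told to fail in prepareCommit or suspend. */
    static final class FakeTask implements Task {
        final String id;
        final boolean failOnPrepare;
        final boolean failOnSuspend;
        boolean suspended = false;

        FakeTask(final String id, final boolean failOnPrepare, final boolean failOnSuspend) {
            this.id = id;
            this.failOnPrepare = failOnPrepare;
            this.failOnSuspend = failOnSuspend;
        }

        @Override
        public String id() {
            return id;
        }

        @Override
        public void prepareCommit() {
            if (failOnPrepare) {
                throw new RuntimeException("prepareCommit failed for " + id);
            }
        }

        @Override
        public void suspend() {
            if (failOnSuspend) {
                throw new IllegalStateException("suspend failed for " + id);
            }
            suspended = true;
        }
    }

    /** Simplified revocation: every task gets a suspend attempt, whatever happens earlier. */
    static void handleRevocation(final List<? extends Task> revokedTasks) {
        RuntimeException firstException = null;
        try {
            for (final Task task : revokedTasks) {
                task.prepareCommit();
            }
        } catch (final RuntimeException e) {
            firstException = e;
        } finally {
            // Runs even if prepareCommit threw, so no task is left un-suspended by accident.
            for (final Task task : revokedTasks) {
                try {
                    task.suspend();
                } catch (final RuntimeException e) {
                    if (firstException == null) {
                        firstException = e;
                    } else {
                        // Keep the later failure visible instead of dropping it.
                        firstException.addSuppressed(e);
                    }
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    public static void main(final String[] args) {
        final FakeTask a = new FakeTask("0_0", true, false);
        final FakeTask b = new FakeTask("0_1", false, true);
        try {
            handleRevocation(List.of(a, b));
        } catch (final RuntimeException e) {
            System.out.println("primary: " + e.getMessage());
            for (final Throwable s : e.getSuppressed()) {
                System.out.println("suppressed: " + s.getMessage());
            }
        }
    }
}
```

In this sketch the caller still sees the prepareCommit failure as the thrown exception, while the suspend failure remains reachable through `getSuppressed()`.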

@github-actions github-actions Bot added triage PRs from the community streams labels May 13, 2026
@gabriellefu gabriellefu changed the title from "[KAFKA-16876] TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit" to "KAFKA-16876: Handle TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit" May 14, 2026
@gabriellefu gabriellefu changed the title from "KAFKA-16876: Handle TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit" to "KAFKA-16876: TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit" May 14, 2026
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
log.error("Exception caught while preparing to commit revoked tasks " + revokedActiveTasks, e);

Please replace with parametrised message.

dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
closeDirtyAndRevive(dirtyTasks, false);
} catch (final RuntimeException e) {
log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);

ditto

// the task is in RUNNING state
task.postCommit(false);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);

ditto

try {
task.suspend();
} catch (final RuntimeException e) {
log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);

ditto

@github-actions github-actions Bot removed the triage PRs from the community label May 16, 2026

2 participants