Skip to content

Conversation

@mjsax
Copy link
Member

@mjsax mjsax commented Dec 17, 2025

This bug was introduced via KAFKA-18015.

Reviewers: Lucas Brutschy lbrutschy@confluent.io

…ater

This bug was introduced via KAFKA-18015.
@mjsax mjsax added the streams label Dec 17, 2025
@github-actions github-actions bot added the small Small PRs label Dec 17, 2025
@lucasbru lucasbru requested a review from Copilot December 17, 2025 16:01
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a bug where active tasks undergoing reset-by-duration were incorrectly being handed back to the state updater. The state updater is responsible for restoring task state, not managing active tasks during offset reset operations. The incorrect behavior was introduced in KAFKA-18015.

Key Changes:

  • Removed erroneous stateUpdater.add(task) call from maybeInitTaskTimeoutsOrThrow method

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link

@Pankraz76 Pankraz76 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Comment on lines 1850 to 1851
final Task task = getActiveTask(partition);
task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final Task task = getActiveTask(partition);
task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException);
getActiveTask(partition).maybeInitTaskTimeoutOrThrow(nowMs, timeoutException);

coupling now seems obsolete, considering cohesion.

) {
for (final TopicPartition partition : partitions) {
final Task task = getActiveTask(partition);
task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ty for boiling down.

Now we have reached final version, can not take away anymore.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants