KAFKA-10199: Change to RUNNING if no pending task to init exist #14214
@@ -53,9 +53,11 @@ public interface TasksRegistry {

     void addPendingTaskToCloseClean(final TaskId taskId);

-    Set<Task> drainPendingTaskToInit();
+    Set<Task> drainPendingTasksToInit();
Just renaming.

-    void addPendingTaskToInit(final Collection<Task> tasks);
+    void addPendingTasksToInit(final Collection<Task> tasks);
Just renaming.
-    void addPendingTaskToInit(final Collection<Task> tasks);
+    void addPendingTasksToInit(final Collection<Task> tasks);

+    boolean hasPendingTasksToInit();
Actual addition
     return !stateUpdater.restoresActiveTasks()
         && !tasks.hasPendingTasksToRecycle()
+        && !tasks.hasPendingTasksToInit();
Actual fix.
     // The state directory may still be locked by another thread, when the rebalance just happened.
     // Retry in the next iteration.
     log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
-    tasks.addPendingTaskToInit(Collections.singleton(task));
+    tasks.addPendingTasksToInit(Collections.singleton(task));
This is the code that required the fix.
Thanks for the fix, lgtm!
One thing I didn't see: what was the symptom of the bug? I thought the state was largely there to inform the user.
I am not sure that this is the cause of the issue that I see. But in any case, it is not correct to change the state to |
LGTM
A stream thread should only change to RUNNING if there are no active tasks in restoration in the state updater and no pending tasks to recycle or to init. Usually, all pending tasks to init are added to the state updater in the same poll iteration that handles the assignment. However, if a LockException occurs during the initialization of a task, the task is re-added to the tasks to init and initialization is retried in the next poll iteration. A LockException might occur when a state directory is still locked by another thread right after a rebalance.
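To make the scenario concrete, here is a minimal, self-contained sketch of the retry path and the extended check. It is not the Kafka Streams implementation: `RunningTransitionSketch`, `PendingTasks`, `initializeOrRetry`, and `mayChangeToRunning` are hypothetical names, tasks are reduced to plain string IDs, and a set of "locked" IDs stands in for a thrown LockException. Restoration is also ignored, so `restoresActiveTasks` is simply passed in as `false`.

```java
import java.util.HashSet;
import java.util.Set;

public class RunningTransitionSketch {

    /** Hypothetical stand-in for the pending-tasks-to-init part of TasksRegistry. */
    static final class PendingTasks {
        private final Set<String> pendingTasksToInit = new HashSet<>();

        void addPendingTasksToInit(final Set<String> tasks) {
            pendingTasksToInit.addAll(tasks);
        }

        // Returns a copy of the pending tasks and empties the registry.
        Set<String> drainPendingTasksToInit() {
            final Set<String> drained = new HashSet<>(pendingTasksToInit);
            pendingTasksToInit.clear();
            return drained;
        }

        boolean hasPendingTasksToInit() {
            return !pendingTasksToInit.isEmpty();
        }
    }

    /**
     * One poll iteration of task initialization. A task whose ID is in
     * lockedTaskIds models a state directory still locked by another thread
     * (where the real code would see a LockException): it is re-added to the
     * pending tasks so that initialization is retried in the next iteration.
     */
    static Set<String> initializeOrRetry(final PendingTasks tasks, final Set<String> lockedTaskIds) {
        final Set<String> initialized = new HashSet<>();
        for (final String taskId : tasks.drainPendingTasksToInit()) {
            if (lockedTaskIds.contains(taskId)) {
                tasks.addPendingTasksToInit(Set.of(taskId));
            } else {
                initialized.add(taskId);
            }
        }
        return initialized;
    }

    /** The three-part condition from this PR, with the first two parts passed in as flags. */
    static boolean mayChangeToRunning(final boolean restoresActiveTasks,
                                      final boolean hasPendingTasksToRecycle,
                                      final PendingTasks tasks) {
        return !restoresActiveTasks
            && !hasPendingTasksToRecycle
            && !tasks.hasPendingTasksToInit();
    }

    public static void main(final String[] args) {
        final PendingTasks tasks = new PendingTasks();
        tasks.addPendingTasksToInit(Set.of("0_0", "0_1"));

        // First iteration: the state directory of task 0_1 is still locked.
        initializeOrRetry(tasks, Set.of("0_1"));
        System.out.println(mayChangeToRunning(false, false, tasks)); // false

        // Second iteration: the lock has been released, so nothing is left pending.
        initializeOrRetry(tasks, Set.of());
        System.out.println(mayChangeToRunning(false, false, tasks)); // true
    }
}
```

Without the `hasPendingTasksToInit()` term, the check after the first iteration would pass even though task 0_1 has not yet been initialized, which is the premature transition to RUNNING described above.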