KAFKA-10199: Fix restoration behavior for paused tasks #14437
@@ -1043,6 +1046,28 @@ public void shouldResumeActiveStatefulTask() throws Exception {
         verify(changelogReader, times(2)).enforceRestoreActive();
     }
 
+    @Test
+    public void shouldAwaitWhenAllTasksPaused() throws Exception {
This tests the absence of "restore" calls when all tasks are paused. This requires "smelly" constructs like `Mockito.reset` and `Thread.sleep`. We could alternatively not test this - the absence of something in a certain timeframe is not very amenable to unit testing.
Thanks for the PR, @lucasbru!
Here is my feedback!
+        verifyPausedTasks(task);
+
+        reset(changelogReader);
I think you do not need to reset the mock, since we guarantee that paused tasks that are added to the state updater are not restored. We are missing a test for that case, though.
I don't think this is quite true. Paused tasks will not be added to `updatingTasks`, but we may still go around the loop calling `restore` on a potentially empty set. This is testing that after a while, we do not go around the loop anymore. But it's actually not a good test, for the reason described above.
+        Thread.sleep(100);
+        verify(changelogReader, never()).restore(any());
Instead of this, we could expose the thread state on the implementation of the state updater, but not on the interface. With that and a wait-until verification, we could verify that the thread changes to the waiting state. We do something similar with `getActiveTasks()` or `getPausedTasks()`. That does also smell a bit, but it avoids the sleep and maybe smells a little bit less? 🙂
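To illustrate the wait-until idea suggested above, here is a minimal sketch of a polling helper that a test could use instead of a fixed `Thread.sleep`. The class name `WaitUntilSketch` and the method `waitUntil` are hypothetical and not part of this PR; the condition passed in would be whatever state the state updater implementation exposes.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the PR: polls a condition instead of sleeping for a fixed time.
final class WaitUntilSketch {

    private WaitUntilSketch() { }

    // Returns true as soon as the condition holds, or false once the timeout has elapsed.
    static boolean waitUntil(final BooleanSupplier condition, final Duration timeout)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        // Compare via subtraction so the check stays correct if nanoTime wraps around.
        while (deadline - System.nanoTime() > 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(5); // short back-off between checks
        }
        // One final check so a condition that became true right at the deadline is not missed.
        return condition.getAsBoolean();
    }
}
```

A test would then assert on something like `waitUntil(() -> updater.isIdle(), Duration.ofSeconds(5))`, where `updater.isIdle()` stands for the exposed thread state. Unlike the sleep-then-verify pattern, this checks that a state is eventually reached rather than that nothing happened within a time window.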
I like the idea!
@@ -307,7 +307,7 @@ private void waitIfAllChangelogsCompletelyRead() {
             tasksAndActionsLock.lock();
             try {
                 while (isRunning.get() &&
-                    changelogReader.allChangelogsCompleted() &&
+                    (changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty()) &&
Nice catch!
- If all tasks are paused, we should wait in the state updater
- If all tasks are paused, we should return immediately from `StoreChangelogReader`.

For testing, expose an `isIdle` state to check that we enter an idling state eventually.
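The waiting behavior described in this commit can be sketched roughly as follows. This is a simplified, self-contained model and not the actual state updater code: the class `IdleLoopSketch`, the condition `workAdded`, and the string task IDs are assumptions made for illustration, and the real loop condition contains further clauses that are cut off in the diff above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified model of a restore thread that blocks instead of busy-looping.
final class IdleLoopSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition workAdded = lock.newCondition();   // hypothetical condition name
    private final Set<String> updatingTasks = new HashSet<>(); // task IDs modeled as strings
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final AtomicBoolean idle = new AtomicBoolean(false);
    private volatile boolean allChangelogsCompleted = false;   // stands in for the changelog reader

    // Blocks while there is nothing to restore: either all changelogs are
    // completely read, or no task is currently updating (e.g. all are paused).
    void waitIfNothingToRestore() throws InterruptedException {
        lock.lock();
        try {
            while (isRunning.get() && (allChangelogsCompleted || updatingTasks.isEmpty())) {
                idle.set(true);
                workAdded.await();
            }
        } finally {
            idle.set(false);
            lock.unlock();
        }
    }

    // Adding an updating task wakes the waiting thread.
    void addUpdatingTask(final String taskId) {
        lock.lock();
        try {
            updatingTasks.add(taskId);
            workAdded.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Shutting down also wakes the waiting thread so it can exit.
    void shutdown() {
        lock.lock();
        try {
            isRunning.set(false);
            workAdded.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Exposed for tests only, in the spirit of the isIdle state mentioned above.
    boolean isIdle() {
        return idle.get();
    }
}
```

With only the first clause of the condition, an empty `updatingTasks` set would never satisfy the wait check, so the thread would keep spinning. The added `updatingTasks.isEmpty()` clause is what lets it park until a task is added or the updater shuts down.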
504e123 to 5b828ba
LGTM!
Thanks @lucasbru !
Build failures are unrelated.
* KAFKA-10199: Fix restoration behavior for paused tasks (apache#14437)

  State updater can get into a busy loop when all tasks are paused, because changelogReader will never return that all changelogs have been read completely. Fix this, by awaiting if updatingTasks is empty.

  Related and included: if we are restoring and all tasks are paused, we should return immediately from StoreChangelogReader.

  Reviewer: Bruno Cadonna <cadonna@apache.org>

* KAFKA-10199: Do not process when in PARTITIONS_REVOKED (apache#14265)

  When a Streams application is subscribed with a pattern to input topics and an input topic is deleted, the stream thread transitions to PARTITIONS_REVOKED and a rebalance is triggered. This happens inside the poll call. Sometimes, the poll call returns before a new assignment is received. That means Streams executes the poll loop in state PARTITIONS_REVOKED.

  With the state updater enabled, processing is also executed in states other than RUNNING, and so processing is also executed when the stream thread is in state PARTITIONS_REVOKED. However, that triggers an IllegalStateException with error message: No current assignment for partition TEST-TOPIC-A-0, which is a fatal error.

  This commit prevents processing when the stream thread is in state PARTITIONS_REVOKED.

  Reviewer: Lucas Brutschy <lbrutschy@confluent.io>

* KAFKA-15326: [8/N] Move consumer interaction out of processing methods (apache#14226)

  The process method inside the tasks needs to be called from within the processing threads. However, it currently interacts with the consumer in two ways:

  * It resumes processing when the PartitionGroup buffers are empty
  * It fetches the lag from the consumer

  We introduce updateLags() and resumePollingForPartitionsWithAvailableSpace() methods that call into the task from the polling thread, in order to set up the consumer correctly for the next poll, and extract metadata from the consumer after the poll.

  Reviewer: Bruno Cadonna <bruno@confluent.io>

---------

Co-authored-by: Lucas Brutschy <lbrutschy@confluent.io>
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
State updater can get into a busy loop when all tasks are paused, because changelogReader will never return that all changelogs have been read completely. Fix this, by awaiting if updatingTasks is empty. Related and included: if we are restoring and all tasks are paused, we should return immediately from StoreChangelogReader. Reviewer: Bruno Cadonna <cadonna@apache.org>
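The second part of the change, returning immediately when every restoring task is paused, can be pictured with the sketch below. It is a toy model under stated assumptions: `PausedRestoreSketch`, `TaskStub`, and the poll counter are all hypothetical and do not reflect the real `StoreChangelogReader` API.

```java
import java.util.Collection;

// Toy model of a changelog reader that skips polling when all tasks are paused.
final class PausedRestoreSketch {

    // Hypothetical stand-in for a restoring task; only the paused flag matters here.
    static final class TaskStub {
        final String id;
        final boolean paused;

        TaskStub(final String id, final boolean paused) {
            this.id = id;
            this.paused = paused;
        }
    }

    // Counts how often the (simulated) restore consumer would have been polled.
    private int polls = 0;

    // Returns the number of tasks restored in this round.
    int restore(final Collection<TaskStub> tasks) {
        int active = 0;
        for (final TaskStub task : tasks) {
            if (!task.paused) {
                active++;
            }
        }
        if (active == 0) {
            // Nothing can make progress (all paused, or no tasks at all): return without polling.
            return 0;
        }
        polls++; // stands in for polling the restore consumer
        return active;
    }

    int polls() {
        return polls;
    }
}
```

Calling `restore` with only paused stubs leaves the poll counter at zero, which is the property the early return is meant to guarantee.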
State updater can get into a busy loop when all tasks are paused, because `changelogReader` will never return that all changelogs have been read completely. Fix this, by awaiting if `updatingTasks` is empty.

Related and included: if we are restoring and all tasks are paused, we should return immediately from `StoreChangelogReader`.

Committer Checklist (excluded from commit message)