-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14299: Handle TaskCorruptedException during initialization #12771
Conversation
State stores are initialized from the StreamThread even when the state updater thread is enabled. However, we were missing the corresponding handling of exceptions when thrown directly during the initialization. In particular, TaskCorruptedException would directly fall through to runLoop, and the task would fall out of book-keeping, since the exception is thrown when neither the StreamThread nor the StateUpdater is owning the task.
@cadonna Could you have a look please? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lucasbru Thanks for the PR!
Here my feedback!
Mockito.verify(tasks).addTask(statefulTask0); | ||
Mockito.verify(tasks).addTask(statefulTask1); | ||
assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks()); | ||
assertEquals("Tasks [0_1, 0_0] are corrupted and hence needs to be re-initialized", thrown.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
assertEquals("Tasks [0_1, 0_0] are corrupted and hence needs to be re-initialized", thrown.getMessage()); | |
assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", thrown.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions) | ||
.inState(State.CREATED) | ||
.withInputPartitions(taskId01Partitions).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please also add a stateful task that does not throw and verify whether the task is added to the state updater? That would verify that when at least one task throws an exception the good tasks are still added to the state updater.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
stateUpdater.add(task); | ||
} catch (final RuntimeException e) { | ||
// need to add task back to the bookkeeping to be handled by the stream thread | ||
tasks.addTask(task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if it wouldn't be better to add the entire task to the TaskCorruptedException
instead of only the task ID. In such a way, we wouldn't need to add a corrupted task back to the TasksRegistry
. Is there somewhere a check that avoids executing corrupted tasks? Before the state updater we did not execute tasks before all tasks were restored but that is not the case anymore with the state updater.
Modifying the TaskCorruptedException
would need a KIP. Instead of creating a KIP for modifying the TaskCorruptedException
, we should create one to move TaskCorruptedException
and TaskMigratedException
to internal packages since they are completely handled within Streams and they should never be handled by the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no check that avoids executing corrupted tasks. Here, I am just reusing Guozhangs trick in handleExceptionsFromStateUpdater
of adding the task back to the task registry, and re-throwing the exception so that runLoop
can retrieve the task back from the task registry. So since we re-throw immediately, we won't run into executing the corrupted task, but I agree, this could be refactored.
Having a KIP and adding the full task to TaskCorruptedException
could work. However, this is also a bit risky, because the task is not fully closed yet, so if you remove it from bookkeeping and only keep a reference in the exception, that can also easily lead to errors - for example if somebody decides to wrap the exception or re-throw it differently anywhere in the call stack, we will lose track of the task, and never close it - leaving traces everywhere.
I would suggest, if we want to clean this up at some point, to use TaskRegistry
as a place for all tasks, whether being in error state, owned by the state updater or owned by the main thread. This should also be reflected by getter methods such as TaskRegistry.allTasks
. In the original code path, allTasks
used to return any task, whether in error state, or currently restoring or actively processing. In the new code, it does not return restoring tasks any more, so some tasks seem to temporarily "disappear". This method is used in a lot of places, and I'm not sure if we have understood all the effects that this change has. For example, since allTasks
is used to update the threadMetadata
, the state updater code-path will return something different from the original code path for the public KafkaStreams.metadataForLocalThreads
API - with restoring tasks temporarily disappearing. They will also disappear from various log messages.
Either way, I would say such a clean-up is not in the scope of this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW w.r.t the idea of attaching the entire task instead of just its taskId, I agree with Lucas here -- seems risky, would likely end up being abused as things progress (by us not the users), but most importantly imo is that it's just bad practice. I'm not too well versed on anti patterns I'll admit (perks & downsides of being a physics major 😉), but I would think this counts as one (or should) -- we shouldn't use exceptions to pass around actual objects rather than simple info/metadata. Especially objects of uncertain status, questionable thread safety, and significant resources attached.
Just my two cents as someone who's coming into the restoration thread game very late -- given this fact I'm not particularly qualified to weigh in on what the right thing to do is here, though again I feel like Lucas's suggestion to maintain a/the TaskRegistry
as a central, source-of-truth type thing for tracking and organizing all the tasks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I agree with the risk of passing the entire task to an exception that does not have an explicit way how to cleanup itself like the shutdown()
methods in TaskManager
and StateUpdater
. However, I am not a fan of a central place for managing all tasks. We explicitly decided to have the invariant that a task that is managed by the state updater should not exist in the task manager and vice versa. That is to avoid concurrent modifications of the tasks by both threads.
We need to evaluate if in the places where all tasks are returned really all tasks need to be returned and if yes we also need to call getTasks()
on the state updater.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the guarantee of not processing corrupted tasks, I think we should be safe since maybeThrowTaskExceptions()
immediately throws the encountered exceptions. I just dislike the fact that we rely on the exceptions bubbling up all the way up to StreamThread#runLoop()
which might be broken by future changes. Maybe a set for broken tasks in the TasksRegistry
would improve the situation. But that is definitely not in the scope of this PR.
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
Outdated
Show resolved
Hide resolved
…nals/TaskManagerTest.java
Build failures are unrelated:
|
…che#12771) State stores are initialized from the StreamThread even when the state updater thread is enabled. However, we were missing the corresponding handling of exceptions when thrown directly during the initialization. In particular, TaskCorruptedException would directly fall through to runLoop, and the task would fall out of book-keeping, since the exception is thrown when neither the StreamThread nor the StateUpdater is owning the task. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
…che#12771) State stores are initialized from the StreamThread even when the state updater thread is enabled. However, we were missing the corresponding handling of exceptions when thrown directly during the initialization. In particular, TaskCorruptedException would directly fall through to runLoop, and the task would fall out of book-keeping, since the exception is thrown when neither the StreamThread nor the StateUpdater is owning the task. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
State stores are initialized from the
StreamThread
even when the state updater thread is enabled.However, we were missing the corresponding handling of exceptions when thrown directly during
the initialization. In particular,
TaskCorruptedException
would directly fall through torunLoop
, andthe task would fall out of bookkeeping, since the exception is thrown when neither the
StreamThread
nor theStateUpdater
is owning the task.This change is validated by a unit test. We will add an integration test once the code path is stable.
Committer Checklist (excluded from commit message)