-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resume streaming tasks on Overlord switch #13223
Resume streaming tasks on Overlord switch #13223
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approach looks good. Left some comments. Please also add tests when the change is finalized.
// can lead to tasks being in a state where they are active but do not read. | ||
// If this is the first run, resume all existing active tasks to be safe | ||
if (getState().isFirstRunOnly()) { | ||
Map<String, ListenableFuture<Boolean>> activeTaskToResumeFutureMap = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if the term "active" task is used widely in the codebase. It seems ambiguous, especially considering that the next line mentions activelyReadingTasks
.
I would suggest simply using tasksToResume
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was supposed to mean a map from the set of activeTasks to their resumeFutures.
tasksToResume does seem simpler, I'll make the change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
String taskId = entry.getKey(); | ||
ListenableFuture<Boolean> future = entry.getValue(); | ||
future.addListener( | ||
new Runnable() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: prefer lambdas as they are easier to read.
log.info("Resumed task [%s]", taskId); | ||
} else { | ||
log.warn("Failed to resume task [%s]", taskId); | ||
killTask(taskId, "Could not resume task"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make the kill message more informative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
public void run() | ||
{ | ||
try { | ||
if (entry.getValue().get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do publishing tasks and other non-paused tasks return true
for a resume call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Also, PUBLISHING tasks are not resumed. Only READING / PAUSED tasks
@@ -1950,6 +1950,45 @@ public Boolean apply(SeekableStreamIndexTaskRunner.Status status) | |||
// make sure the checkpoints are consistent with each other and with the metadata store | |||
|
|||
verifyAndMergeCheckpoints(taskGroupsToVerify.values()); | |||
|
|||
// A pause from the previous Overlord's supervisor, immediately before leader change |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please put the changes in a separate method as this method is already very cumbersome to read.
Style: return early rather than indenting the whole code block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@kfaraz thank you for the review! I've added a unit test as well |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor suggestion, otherwise looks good.
Thanks for improving the supervisor behaviour, @AmatyaAvadhanula !
} | ||
|
||
/** | ||
* If this is the first run, resume all tasks in the set of activelyReadingTaskGroups |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments!
.../main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
Outdated
Show resolved
Hide resolved
Build seems to be failing due to low code coverage. |
The test for KafkaSupervisor is in a different package than the one for SeekableStreamSupervisor |
…blestream/supervisor/SeekableStreamSupervisor.java Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
…blestream/supervisor/SeekableStreamSupervisor.java Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
…ksOnOverlordSwitch
…ksOnOverlordSwitch
|
||
// A pause from the previous Overlord's supervisor, immediately before leader change, | ||
// can lead to tasks being in a state where they are active but do not read. | ||
resumeAllActivelyReadingTasks(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how frequently is discoverTasks
going to be called? This wouldn't just be called after leader re-election, right? There is also code to get the status of all tasks and then doing an action based on what the current status is. can the resuming go inside that block of code itself? So if the status returned is paused, only then we will be calling resumeAsync
instead of doing it for each task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While discoverTasks is called in every RunNotice, there is a check within this method to resume only during the first run or return otherwise.
It is called here as the pendingCompletionTaskGroups are determined here using the status i.e whether it is publishing or not. These tasks are not affected, and only the activelyReadingTaskGroups are resumed
} else { | ||
log.warn("Failed to resume task [%s] in first supervisor run.", taskId); | ||
killTask(taskId, | ||
"Killing forcefully as task could not be resumed in the first supervisor run after Overlord change."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
first supervisor run after Overlord change
is this really true?
…ksOnOverlordSwitch
Fixes issues where tasks stop ingesting but continue running when overlord leader changes occur.
Description
When an overlord switch occurs right after it has issued a pause to a streaming ingestion task, the task remains in a paused state even after the overlord re-election(s).
The task however continues to run until its duration elapses. This leads to a continuous increase in lag for partitions corresponding to the task(s) until its completion, when a new task can begin ingesting.
In the first supervisor run after a restart, resume all active tasks at the end of
discoverTasks
.A resume doesn't affect an actively reading task adversely and publishing tasks are not resumed.
Release note
For tips about how to write a good release note, see Release notes.
Key changed/added classes in this PR
SeekableStreamSupervisor
This PR has: