-
Notifications
You must be signed in to change notification settings - Fork 25.5k
[ML] Ensure queued AbstractRunnables are notified when executor stops #135966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Refactor notifyQueueRunnables() to allow PriorityProcessWorkerExecutorService to notify the AbstractRunnable contained within queued OrderedRunnables - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests Closes elastic#134651
Pinging @elastic/ml-core (Team:ML) |
Hi @DonalEvans, I've created a changelog YAML for you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work, left one suggestion
} | ||
} | ||
|
||
protected abstract void notifyIfAbstractRunnable(T runnable, Exception ex, String msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about having AbstractProcessWorkerExecutorService
contain the logic for notifyIfAbstractRunnable
and relying on an abstract method like getAsAbstractRunnable
which either returns the AbstractRunnable
or null
. That way child classes don't need to call back into the parent to do the notification, they would only return the abstract runnable if it was one.
Something like:
protected abstract AbstractRunnable getAsAbstractRunnable(T runnable);
private void notifyIfAbstractRunnable(T runnable, Exception ex, String msg) {
var abstractRunnable = getAsAbstractRunnable(runnable);
if (abstractRunnable != null) {
notifyAbstractRunnable(ex, msg, abstractRunnable);
}
}
Then PriorityProcessWorkerExecutorService
would have something like:
@Override
protected AbstractRunnable getAsAbstractRunnable(OrderedRunnable orderedRunnable, Exception ex, String msg) {
// The runnable contained within OrderedRunnable is always an AbstractRunnable, so no need to check the type
return orderedRunnable.runnable();
}
AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs.
Closes #134651