core: SynchronizationContext exposed by LoadBalancer.Helper #4971
 * submitted.
 */
public final ScheduledContext scheduleNow(Runnable task) {
  return schedule(task, 0, TimeUnit.NANOSECONDS);
I'm not excited about making an easy way to make a zero-delay task. This just abuses the scheduled executor and is a strong code smell to me.
I can instead abstract it and make schedule() call scheduleNow() when delay <= 0. Is it better?
No. It would be surprising for a task to suddenly run in the current thread when the delay is 0. They are fundamentally different.
It's the same as the current runSerialized(). I still don't understand the issue.
I'm not sure what you're saying is the same. Today runSerialized() runs on the current thread:
https://github.com/grpc/grpc-java/blob/v1.15.0/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java#L1236-L1238
And any schedule() would run on a separate thread. I'm against having schedule() turn into running on the current thread based on the timeout.
Fair enough. I have decoupled schedule() from scheduleNow().
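The distinction settled in this thread can be sketched as follows: a scheduled task always goes through the timer thread before being handed to the serializing executor, even when the delay is zero, and the returned handle offers only cancellation and a status check. This is a minimal sketch under stated assumptions, not the code in this PR: SketchHandle, ScheduleSketch, and their methods are hypothetical names, and a plain java.util.concurrent.Executor stands in for the synchronization context.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical handle: cancellation and status only, no blocking get(). */
final class SketchHandle {
  private static final int PENDING = 0;
  private static final int STARTED = 1;
  private static final int CANCELLED = 2;
  private final AtomicInteger state = new AtomicInteger(PENDING);
  private volatile ScheduledFuture<?> future;

  void setFuture(ScheduledFuture<?> f) {
    future = f;
  }

  /** Called right before the task runs; loses the race if cancel() got there first. */
  boolean tryStart() {
    return state.compareAndSet(PENDING, STARTED);
  }

  /** Has no effect once the task has started or was already cancelled. */
  void cancel() {
    if (state.compareAndSet(PENDING, CANCELLED)) {
      ScheduledFuture<?> f = future;
      if (f != null) {
        f.cancel(false);
      }
    }
  }

  boolean isPending() {
    return state.get() == PENDING;
  }
}

final class ScheduleSketch {
  /** The timer thread fires first, then hands the task to the serializing executor. */
  static SketchHandle schedule(Runnable task, long delay, TimeUnit unit,
      Executor serializing, ScheduledExecutorService timer) {
    SketchHandle handle = new SketchHandle();
    Runnable guarded = () -> {
      if (handle.tryStart()) {
        task.run();
      }
    };
    handle.setFuture(timer.schedule(() -> serializing.execute(guarded), delay, unit));
    return handle;
  }

  /** Returns true if a zero-delay task ran on a thread other than the caller's. */
  static boolean zeroDelayRunsOffCallerThread() {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    try {
      CountDownLatch ran = new CountDownLatch(1);
      Thread[] runner = new Thread[1];
      schedule(() -> {
        runner[0] = Thread.currentThread();
        ran.countDown();
      }, 0, TimeUnit.NANOSECONDS, Runnable::run, timer);
      return ran.await(5, TimeUnit.SECONDS) && runner[0] != Thread.currentThread();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      timer.shutdownNow();
    }
  }

  /** Returns true if cancelling before the delay elapses keeps the task from running. */
  static boolean cancelPreventsRun() {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    try {
      AtomicInteger runs = new AtomicInteger();
      SketchHandle handle =
          schedule(runs::incrementAndGet, 1, TimeUnit.HOURS, Runnable::run, timer);
      handle.cancel();
      return !handle.isPending() && runs.get() == 0;
    } finally {
      timer.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("zero delay runs off caller thread: " + zeroDelayRunsOffCallerThread());
    System.out.println("cancel prevents run: " + cancelPreventsRun());
  }
}
```

The demo passes Runnable::run as the serializing executor only to keep the example short; in the design under review that role would be played by the synchronization context itself.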
/**
 * Returns the current time in nanos from the same clock that {@link #schedule} uses.
 */
public abstract long currentTimeNanos();
This is a weird API to expose, since it will agree with neither currentTimeMillis nor nanoTime. Based on the documentation it would appear to be similar to nanoTime(), but the actual implementation uses an epoch of 1970 like currentTimeMillis, except if currentTimeMillis and nanoTime get out-of-sync. It seems this should just be nanoTime().
(I don't care really if it has a different offset than nanoTime(), but aligning it to 1970 seems like a bad idea since it can't be guaranteed to align with 1970.)
/**
 * Schedules a task to run as soon as possible.
 *
 * <p>Non-reentrancy is guaranteed. Although task may run inline, but if this method is called
s/Although task may run inline, but if/If/ . That seems more clear.
public abstract ScheduledContext scheduleNow(Runnable task);

/**
 * Schedules a task to be run after a delay. Unlike {@link #scheduleNow}, the task will typically
The "typically" is hard to reason about. Could we just say, "Unlike {@link #scheduleNow}, will never be run inline."? (Or maybe even, just "Will never be run inline.")
And make it a semi-concrete class. And it absorbs ChannelExecutor. TODO: unit test on new methods on SynchronizationContext.
private final PriorityBlockingQueue<ScheduledTask> tasks =
    new PriorityBlockingQueue<ScheduledTask>();
// Must keep the ordering of tasks as they are required by ControlPlaneScheduler.scheduleNow().
private final LinkedBlockingQueue<ScheduledTask> tasks = new LinkedBlockingQueue<>();
Can we simply use two queues instead? One for pending (ready to be executed) tasks and one for scheduled (for a future time) tasks? We'd keep the previous PriorityBlockingQueue and then just add a LinkedBlockingQueue for execute(). That more closely matches what would happen in practice and makes the code more clear.
Done.
 *
 * <p>The default implementation logs a warning.
 */
protected void handleUncaughtThrowable(Throwable t) {
Nit: the class could be made final and be passed a Thread.UncaughtExceptionHandler (with a note that the thread will not die after executing the handler, which is different from its documentation).
Done.
/**
 * Enqueues a task that will be run when {@link #drain} is called.
 */
public final void executeLater(Runnable runnable) {
It might be good to point out this is useful for adding things from within a lock and then calling drain outside the lock.
Done.
Thanks @ejona86. All comments are addressed.
 * what's documented on {@link UncaughtExceptionHandler#uncaughtException}, the thread is
 * not terminated when the handler is called.
 */
public SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler) {
We could provide a zero-arg version that just logs by default. But we can do that at any time. This seems fine for now.
Provides a SynchronizationContext for scheduling tasks, with and without delay, from LoadBalancer implementations. This absorbs and extends the internal utility ChannelExecutor. It supersedes Helper.runSerialized(), which is now deprecated.

Motivation
I see multiple cases that schedule tasks with a delay while requiring the task to run in the "Channel Executor". There has been repeated work to wrap scheduled tasks and handle races between cancellation and task run (see the diff in GrpclbState.java for example). The LoadBalancer implementation (e.g., GrpclbLoadBalancer) also has to acquire the ScheduledExecutorService from somewhere and release it upon shutdown.

The upcoming HealthCheckLoadBalancer (#4932), which would use a back-off policy to retry health-checking streams, would have to do all the things above. At this point I think we need to provide something that combines runSerialized() with a scheduled executor with the same synchronization guarantees.

Design details
SynchronizationContext is similar to ScheduledExecutorService but tailored for use in LoadBalancer and potentially other cases outside of LoadBalancer. It offers task queuing and serialization and delayed scheduling. It guarantees non-reentrancy and happens-before among tasks. It owns no thread, but runs tasks on the caller's or caller-provided threads.

All channel-level state mutations and callback methods on LoadBalancer are done in a SynchronizationContext, which was previously referred to as "Channel Executor".

SynchronizationContext.schedule() returns a ScheduledHandle for status checking and cancellation. ScheduledFuture from ScheduledExecutorService.schedule() is too broad for our use cases (e.g., the blocking get() should never be used).

SynchronizationContext.schedule() requires a ScheduledExecutorService, which is now available through Helper.getScheduledExecutorService(). LoadBalancers don't need to worry about where to get a ScheduledExecutorService any more.

Alternatives
Alternatively, we could keep Helper.runSerialized() and add something like Helper.runSerializedWithDelay(), but having them on their own interface allows a clean fake implementation by FakeClock for tests, and allows other components (potentially InternalSubchannel for reconnection backoff) to use it too.

Instead of asking the caller of schedule() to provide the ScheduledExecutorService, we considered having SynchronizationContext take a ScheduledExecutorService at construction. It would be inconvenient for LoadBalancer implementations that don't use schedule(), as they would be forced to provide a fake ScheduledExecutorService (which is cumbersome).

Instead of making SynchronizationContext a (semi-)concrete class, we considered making it a pure abstract class. However, we found it nontrivial to implement execute() correctly with the non-reentrancy guarantee.
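
To illustrate why execute() is easy to get wrong, here is a minimal sketch of a queue-and-drain executor that guarantees non-reentrancy. It is an illustration under stated assumptions rather than the implementation in this PR: MiniSyncContext and NonReentrantDemo are hypothetical names, and uncaught throwables are simply printed instead of being passed to a handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified stand-in for a serializing, non-reentrant executor. */
final class MiniSyncContext {
  private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
  // The thread currently draining the queue, or null if nobody is draining.
  private final AtomicReference<Thread> drainingThread = new AtomicReference<>();

  /** Enqueues a task without running it; a later drain() will run it. */
  void executeLater(Runnable task) {
    queue.add(task);
  }

  /** Runs queued tasks in order, unless some thread is already draining. */
  void drain() {
    do {
      if (!drainingThread.compareAndSet(null, Thread.currentThread())) {
        return; // The thread that holds the drain will pick up the queued tasks.
      }
      try {
        Runnable task;
        while ((task = queue.poll()) != null) {
          try {
            task.run();
          } catch (Throwable t) {
            // Assumption for this sketch: report and keep draining.
            System.err.println("Uncaught throwable in task: " + t);
          }
        }
      } finally {
        drainingThread.set(null);
      }
      // A task may have been enqueued after the last poll() but before the
      // release above, so re-check the queue before giving up.
    } while (!queue.isEmpty());
  }

  /** Enqueues the task and drains; the task may run inline, but never nested. */
  void execute(Runnable task) {
    executeLater(task);
    drain();
  }
}

final class NonReentrantDemo {
  static List<String> run() {
    MiniSyncContext ctx = new MiniSyncContext();
    List<String> log = new ArrayList<>();
    ctx.execute(() -> {
      log.add("outer-start");
      // Called from inside a running task: queued, not run inline.
      ctx.execute(() -> log.add("inner"));
      log.add("outer-end");
    });
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [outer-start, outer-end, inner]
  }
}
```

The subtle part is the re-check after releasing the drain: without it, a task enqueued by another thread between the final poll() and the release could be left in the queue with no thread responsible for running it.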