-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core: ManagedChannelImpl2. #2530
Conversation
1. Adapt to LoadBalancer2 interface. Channel holds on to a single DelayedClientTransport2. 2. Lock-free: every channel state mutation, including Subchannel mutations, calling into LoadBalancer, idleness and shutdown, is made from channelExecutor. 3. Idleness grace period is no longer needed.
// 2a terminating <- true | ||
// 2b loadBalancer <- null | ||
// 2c nameResolver <- null | ||
// 2d loadBalancer.shutdown() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: Step 2d should happen before 2b?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes :). Fixed.
|
||
// Must be called from channelExecutor | ||
private void maybeShutdownNowSubchannels() { | ||
Status nowStatus = Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
|
||
private final AtomicBoolean shutdown = new AtomicBoolean(false); | ||
// Must be mutated from channelExecutor | ||
private boolean shutdownNowed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: is this correct that shutdownNowed
doesn't need to be volatile since both read and write happen from channelExecutor
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Updated the comment to make it clear.
|
||
@Override | ||
public void run() { | ||
if (cancelled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm not sure, but should we also return immediately when shutdown.get()
returns true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessarily.
If shutdown()
's task is scheduled before this task, the former will set cancelled
to true
, thus the code below will not be run.
Otherwise, this task will still run, which is totally fine.
|
||
@Override | ||
public void transportTerminated() { | ||
subchannelImpl.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(still learning the code..) so we want to shut down the sub-channel when an oob-channel is terminated, not when the transport is shut down?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shut down the sub-channel when the delayed transport (which is essentially the buffer) is terminated (which means the buffer has completed drained). If the buffer is not empty, sub-channel needs to remain open to serve the pending RPCs in the buffer.
|
||
@GuardedBy("shutdownLock") | ||
boolean shutdownRequested; | ||
@GuardedBy("shutdownLock") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we update delayedShutdownTask
at line 839 without acquiring shutdownLock
. should we remove this @GuardedBy
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should actually be protected, because ManagedChannel.shutdown()
is thread-safe.
I have updated the code below accordingly.
} | ||
log.log(Level.FINE, "[{0}] resolved address: {1}, config={2}", | ||
new Object[] {getLogId(), servers, config}); | ||
channelExecutor.executeLater(new Runnable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure that's better, but we can use runSerialized
here..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No we can't, because the helper is not accessible here.
We can, however, use runSerialized
in place of channelExecutor.executeLater()
in LbHelperImpl
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn't the helper accessible here? It looks like it is to me. Maybe the code changed and now it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, with a small API tweak now it's accessible.
} | ||
|
||
@Override | ||
public ManagedChannel shutdownNow() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we set shutdown
to true here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Fixed.
there was an unsed field in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @zhangkun83 ! Looks good to me!
|
||
private final SharedResourceHolder.Resource<ScheduledExecutorService> timerService; | ||
private final Supplier<Stopwatch> stopwatchSupplier; | ||
/** The timout before entering idle mode. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: s/timout/timeout/
} | ||
// Cancel the timer now, so that a racing due timer will not put Channel on idleness | ||
// when the caller of exitIdleMode() is about to use the returned loadBalancer. | ||
cancelIdleTimer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: not sure that's better, but we can also do
if (inUseStateAggregator.isInUse()) {
cancelIdleTimer();
} else {
rescheduleIdleTimer();
}
since rescheduleIdleTimer
calls cancelIdleTimer
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applied suggested change.
// Should not be possible. | ||
throw new IllegalArgumentException(e); | ||
} | ||
if (targetUri != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: can remove this line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes and removed.
"invalid idleTimeoutMillis %s", idleTimeoutMillis); | ||
this.idleTimeoutMillis = idleTimeoutMillis; | ||
} | ||
this.decompressorRegistry = decompressorRegistry; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add checkNotNull
to decompressorRegistry
and compressorRegistry
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I have also added checkNotNull
for several other fields.
nameResolver = null; | ||
} | ||
|
||
// Until LoadBalancer is shutdown, it may still create new subchannels. We catch them |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Do we need to set shutdownNowed
to true here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, since we only want to set shutdownNowed if the client requested an immediate shutdown. If shutdownNow wasn't called, we want to wait until the RPCs naturally complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not through it yet, but needing to make an appointment. The maybeShutdownNowSubchannels() comment is the main thing that needs to be changed.
} | ||
|
||
@Override | ||
public void transportInUse(boolean inUse) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ignoring this method points out that the OobChannel won't have some possibly-useful features. And the addition of the Executor
shows that the configuration is rather limited. As time goes on, we may need to tweak this part. (But it's fine right now.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly.
channelExecutor.executeLater(new Runnable() { | ||
@Override | ||
public void run() { | ||
maybeTerminateChannel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this be handled by delayedTransport's termination listener? If it is needed here, why isn't it needed in shutdownNow()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. It's not needed here. Deleted.
for (InternalSubchannel subchannel : subchannels) { | ||
subchannel.shutdownNow(SHUTDOWN_NOW_STATUS); | ||
} | ||
subchannels.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these and the oobChannels really be removed? Don't we need to wait until the transports process the shutdownNow before claiming to be terminated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch! Removed the two clear()
lines and added tests to cover this case.
nameResolver = null; | ||
} | ||
|
||
// Until LoadBalancer is shutdown, it may still create new subchannels. We catch them |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, since we only want to set shutdownNowed if the client requested an immediate shutdown. If shutdownNow wasn't called, we want to wait until the RPCs naturally complete.
shutdownRequested = true; | ||
} | ||
ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor; | ||
// Add a delay to shutdown to deal with the race between 1) a transport being picked and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We maybe should start thinking about making newStream (or maybe start) atomic with transport selection. If clientTransportProvider
created the stream as well as chose the transport, then it could deal with this race by retrying. Transports would need to help out by returning differently if they are shutdown, but we do things like that frequently already with only volatile reads in the fast path.
I don't think we need to do that now (in this PR), but this race will now be happening frequently enough that it's probably time to address it directly. And we can reuse some of the same tricks we're using for the LB rework.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have filed #2562 to track this.
ObjectPool is simpler to use and more test-friendly than the raw SharedResourceHolder. Also assign ManagedChannel's executor to directExecutor after returning the real executor to pool.
terminatedLatch.countDown(); | ||
executorPool.returnObject(executor); | ||
// Needed for delivering rejections to new calls after OobChannel is terminated. | ||
executor = MoreExecutors.directExecutor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm uncomfortable with calling executor
after termination, as generally it is a bug. It also seems insufficient, as there is still the race between executor
being read during newCall
and ClientCallImpl.start()
. If this is temporary until we fix the real bug, then I'm fine with it. Ditto in OobChannel.
The comment here also seems strange. Is the reference to OobChannel
because it was a copy/paste that wasn't fully updated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If newCall()
is called after the channel is terminated, the call should fail, but how should we deliver the failure? Currently we deliver the failure to the call listener, which requires the app executor. As we shouldn't use the original app executor after it's returned to the pool, the direct executor seems to be the best bet. The other option is to let newCall()
throw.
Yes I should've done s/OobChannel/Channel
after pasting it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
This is pre-existing. I would feel very comfortable leaving it as equally broken as the current code (
ManagedChannelImpl
) and fixing it later. There are so many things happening already in this PR. I'm okay with a temporary hack in order to let tests work or similar. -
the app's executor shouldn't be called at all after termination. That applies equally to the pool, but the pool is under our control and so we have more freedom, whereas the app's executor interaction is part of externally-visible behavior. As soon as the Channel becomes terminated the application is free to shutdown their executor. Calling the listener in the same thread seems appropriate, but using
directExecutor
here is missing the larger problem. TheappExecutor
is saved inClientCall
which is created duringnewCall()
, but betweennewCall()
andClientCall.start()
the Channel could become terminated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. I never thought of the race between termination and ClientCall.start()
. I have reverted to the preexisting state and captured this case in #1981
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finally finished the full pass (although I didn't really look at the tests more than a glance).
@Override | ||
public void run() { | ||
if (terminating) { | ||
internalSubchannel.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This runSerialized()
block looks subtle. The runSerialized()
does not guarantee that the Runnable
has executed before it returns. So we could return a subchannel here that is non-yet-shutdown even when terminating. But that subchannel won't (can't?) be used because the delayed transport is already terminated and so has no RPCs and is not accepting any more RPCs (or passing-through any more RPCs).
It seems a comment would go a long way, if nothing else to prevent future bugs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a comment:
Because runSerialized() doesn't guarantee the runnable has been executed upon when returning, the subchannel may still be returned to the balancer without being shutdown even if "terminating" is already true. The subchannel will not be used in this case, because delayed transport has terminated when "terminating" becomes true, and no more requests will be sent to balancer beyond this point.
} | ||
log.log(Level.FINE, "[{0}] resolved address: {1}, config={2}", | ||
new Object[] {getLogId(), servers, config}); | ||
channelExecutor.executeLater(new Runnable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn't the helper accessible here? It looks like it is to me. Maybe the code changed and now it is.
Because it's handled by delayedTransport termination.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have addressed all comments. PTAL.
terminatedLatch.countDown(); | ||
executorPool.returnObject(executor); | ||
// Needed for delivering rejections to new calls after OobChannel is terminated. | ||
executor = MoreExecutors.directExecutor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. I never thought of the race between termination and ClientCall.start()
. I have reverted to the preexisting state and captured this case in #1981
channelExecutor.executeLater(new Runnable() { | ||
@Override | ||
public void run() { | ||
maybeTerminateChannel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. It's not needed here. Deleted.
} | ||
log.log(Level.FINE, "[{0}] resolved address: {1}, config={2}", | ||
new Object[] {getLogId(), servers, config}); | ||
channelExecutor.executeLater(new Runnable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, with a small API tweak now it's accessible.
} | ||
|
||
@Override | ||
public void transportInUse(boolean inUse) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly.
@Override | ||
public void run() { | ||
if (terminating) { | ||
internalSubchannel.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a comment:
Because runSerialized() doesn't guarantee the runnable has been executed upon when returning, the subchannel may still be returned to the balancer without being shutdown even if "terminating" is already true. The subchannel will not be used in this case, because delayed transport has terminated when "terminating" becomes true, and no more requests will be sent to balancer beyond this point.
for (InternalSubchannel subchannel : subchannels) { | ||
subchannel.shutdownNow(SHUTDOWN_NOW_STATUS); | ||
} | ||
subchannels.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch! Removed the two clear()
lines and added tests to cover this case.
} | ||
// Cancel the timer now, so that a racing due timer will not put Channel on idleness | ||
// when the caller of exitIdleMode() is about to use the returned loadBalancer. | ||
cancelIdleTimer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applied suggested change.
"invalid idleTimeoutMillis %s", idleTimeoutMillis); | ||
this.idleTimeoutMillis = idleTimeoutMillis; | ||
} | ||
this.decompressorRegistry = decompressorRegistry; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I have also added checkNotNull
for several other fields.
// Should not be possible. | ||
throw new IllegalArgumentException(e); | ||
} | ||
if (targetUri != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes and removed.
shutdownRequested = true; | ||
} | ||
ScheduledExecutorService scheduledExecutorCopy = scheduledExecutor; | ||
// Add a delay to shutdown to deal with the race between 1) a transport being picked and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have filed #2562 to track this.
Adapt to LoadBalancer2 interface. Channel holds on to a single
DelayedClientTransport2.
Lock-free: every channel state mutation, including Subchannel
mutations, calling into LoadBalancer, idleness and shutdown, is made
from channelExecutor.
Idleness grace period is no longer needed.
As usual, the first commit is a pure fork of ManagedChannelImpl and its tests, although the change may be so drastic so that reviewing the whole PR may be easier.
/cc @lukaszx0 @kkaneda