Skip to content

Commit

Permalink
core: fix the race between channel shutdown and clientCallImpl start
Browse files Browse the repository at this point in the history
  • Loading branch information
dapengzhang0 committed Aug 14, 2017
1 parent 34f31fe commit d839724
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 48 deletions.
35 changes: 21 additions & 14 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@
import java.io.InputStream;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -65,7 +64,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method;
private final Executor callExecutor;
private final Context context;
private volatile ScheduledFuture<?> deadlineCancellationFuture;
private volatile Future<?> deadlineCancellationFuture;
private final boolean unaryRequest;
private final CallOptions callOptions;
private ClientStream stream;
Expand All @@ -74,14 +73,14 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private boolean halfCloseCalled;
private final ClientTransportProvider clientTransportProvider;
private final CancellationListener cancellationListener = new ContextCancellationListener();
private ScheduledExecutorService deadlineCancellationExecutor;
private final DeadlineHandler deadlineHandler;
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();

ClientCallImpl(
MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
ClientTransportProvider clientTransportProvider,
ScheduledExecutorService deadlineCancellationExecutor) {
DeadlineHandler deadlineHandler) {
this.method = method;
// If we know that the executor is a direct executor, we don't need to wrap it with a
// SerializingExecutor. This is purely for performance reasons.
Expand All @@ -95,7 +94,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|| method.getType() == MethodType.SERVER_STREAMING;
this.callOptions = callOptions;
this.clientTransportProvider = clientTransportProvider;
this.deadlineCancellationExecutor = deadlineCancellationExecutor;
this.deadlineHandler = deadlineHandler;
}

private final class ContextCancellationListener implements CancellationListener {
Expand All @@ -117,6 +116,17 @@ interface ClientTransportProvider {
ClientTransport get(PickSubchannelArgs args);
}


/**
* Handler for RPC deadline.
*/
interface DeadlineHandler {
/**
* Schedules a task for deadline cancellation.
*/
Future<?> scheduleDeadlineCancellation(Runnable command, Deadline deadline);
}

ClientCallImpl<ReqT, RespT> setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
this.decompressorRegistry = decompressorRegistry;
return this;
Expand Down Expand Up @@ -234,9 +244,7 @@ public void runInContext() {
context.addListener(cancellationListener, directExecutor());
if (effectiveDeadline != null
// If the context has the effective deadline, we don't need to schedule an extra task.
&& context.getDeadline() != effectiveDeadline
// If the channel has been terminated, we don't need to schedule an extra task.
&& deadlineCancellationExecutor != null) {
&& context.getDeadline() != effectiveDeadline) {
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
}
if (cancelListenersShouldBeRemoved) {
Expand Down Expand Up @@ -288,7 +296,7 @@ private static void logIfContextNarrowedTimeout(long effectiveTimeout,

private void removeContextListenerAndCancelDeadlineFuture() {
context.removeListener(cancellationListener);
ScheduledFuture<?> f = deadlineCancellationFuture;
Future<?> f = deadlineCancellationFuture;
if (f != null) {
f.cancel(false);
}
Expand All @@ -310,11 +318,10 @@ public void run() {
}
}

private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
private Future<?> startDeadlineTimer(final Deadline deadline) {
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
return deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
return deadlineHandler.scheduleDeadlineCancellation(
new LogExceptionRunnable(new DeadlineTimer(remainingNanos)), deadline);
}

@Nullable
Expand Down
190 changes: 184 additions & 6 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.grpc.CompressorRegistry;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
Expand All @@ -46,6 +47,7 @@
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import io.grpc.internal.ClientCallImpl.DeadlineHandler;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand All @@ -54,9 +56,13 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -90,6 +96,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
private final LoadBalancer.Factory loadBalancerFactory;
private final ClientTransportFactory transportFactory;
private final Executor executor;
private final Executor directFallbackExecutor = new DirectFallbackExecutor();
private final DeadlineHandler deadlineHandler = new DeadlineHandlerImpl();
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> oobExecutorPool;
private final LogId logId = LogId.allocate(getClass().getName());
Expand All @@ -107,6 +115,9 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI

private final BackoffPolicy.Provider backoffPolicyProvider;

private final ReadWriteLock shutdownReadWriteLock = new ReadWriteLock();
private boolean shutdownWriteLockUsed;

/**
* We delegate to this channel, so that we can have interceptors as necessary. If there aren't
* any interceptors this will just be {@link RealChannel}.
Expand Down Expand Up @@ -457,6 +468,30 @@ public ManagedChannelImpl shutdown() {
return this;
}

runShutdownTask();
return this;
}

private void runShutdownTask() {
checkState(shutdown.get(), "shutdown must be true");

if (shutdownReadWriteLock.tryToAcquireWriteLock()) {
if (!shutdownWriteLockUsed) {
shutdownWriteLockUsed = true;
shutdownReadWriteLock.releaseWriteLock();
// Continue the following shutdown process.
} else {
shutdownReadWriteLock.releaseWriteLock();
return;
}
} else {
// Either the read or the write lock is currently acquired. Quit here, and let one of the read
// lock acquisitions or the write lock acquisition take over the following shutdown process.
return;
}

// The following is the shutdown process. It can be run only once.

// Put gotoState(SHUTDOWN) as early into the channelExecutor's queue as possible.
// delayedTransport.shutdown() may also add some tasks into the queue. But some things inside
// delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the
Expand All @@ -478,7 +513,6 @@ public void run() {
}
}).drain();
log.log(Level.FINE, "[{0}] Shutting down", getLogId());
return this;
}

/**
Expand Down Expand Up @@ -535,18 +569,18 @@ public String authority() {

private class RealChannel extends Channel {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
Executor executor = callOptions.getExecutor();
if (executor == null) {
executor = ManagedChannelImpl.this.executor;
executor = directFallbackExecutor;
}
return new ClientCallImpl<ReqT, RespT>(
method,
executor,
callOptions,
transportProvider,
terminated ? null : transportFactory.getScheduledExecutorService())
terminated ? null : deadlineHandler)
.setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry);
}
Expand All @@ -558,6 +592,151 @@ public String authority() {
}
}


/**
* Maintains a pair of associated locks. The read lock can be acquired many times as long as the
* write lock is not currently acquired. The write lock can be acquired only when no lock is
* currently acquired. Each acquisition of the locks must be released once and only once later.
* The lock is not reentrant. The lock is irrelevant to the threads in which you try to acquire or
* release.
*/
private static final class ReadWriteLock {
/**
* A positive number counts the times the read lock has been acquired, and Long.MIN_VALUE or a
* negative number close to Long.MIN_VALUE means a write lock is currently acquired.
*/
final AtomicLong readRef = new AtomicLong();

/**
* Try to acquire the read lock. Return true if succeeded; and false if not able to acquire it
* right now, meaning that the write lock is currently acquired. The method does not block.
*/
boolean tryToAcquireReadLock() {
return readRef.getAndIncrement() >= 0L;
}

/**
* Release an acquisition of the read lock.
*
* @throws IllegalStateException if the read lock is not currently acquired.
*/
void releaseReadLock() {
checkState(readRef.getAndDecrement() > 0L);
}

/**
* Try to acquire the write lock. Return true if succeeded; and false if not able to acquire it
* right now, meaning that either the read or the write lock is currently acquired. The method
* does not block.
*/
boolean tryToAcquireWriteLock() {
return readRef.compareAndSet(0L, Long.MIN_VALUE);
}

/**
* Release an acquisition of the write lock.
*
* @throws IllegalStateException if the write lock is not currently acquired.
*/
void releaseWriteLock() {
checkState(readRef.getAndSet(0L) < 0L);
}
}

private final class DirectFallbackExecutor implements Executor {
@Override
public void execute(final Runnable command) {
if (shutdown.get()) {
command.run();
return;
}

if (!shutdownReadWriteLock.tryToAcquireReadLock()) {
// The write lock is currently acquired, so shutdown is in progress.
command.run();
return;
}
if (shutdownWriteLockUsed) {
// The write lock had been acquired and then released, so shutdown is in progress.
shutdownReadWriteLock.releaseReadLock();
command.run();
return;
}

final boolean[] accepted = new boolean[1];
try {
final Thread thread = Thread.currentThread();
executor.execute(new Runnable() {
@Override
public void run() {
if (Thread.currentThread() == thread && !accepted[0]) {
// Just in case the executor is a direct one.
accepted[0] = true;
shutdownReadWriteLock.releaseReadLock();
if (shutdown.get()) {
// May take over the shutdown process.
runShutdownTask();
}
}
command.run();
}
});
} catch (RuntimeException e) {
// TODO(zdapeng): Handle exception properly
throw e;
} finally {
if (!accepted[0]) {
accepted[0] = true;
shutdownReadWriteLock.releaseReadLock();
if (shutdown.get()) {
// May take over the shutdown process.
runShutdownTask();
}
}
}
}
}

private final class DeadlineHandlerImpl implements DeadlineHandler {
@Override
public Future<?> scheduleDeadlineCancellation(Runnable command, Deadline deadline) {
if (shutdown.get()) {
return cancelledFuture(command);
}

if (!shutdownReadWriteLock.tryToAcquireReadLock()) {
// The write lock is currently acquired, so shutdown is in progress.
command.run();
return cancelledFuture(command);
}
if (shutdownWriteLockUsed) {
// The write lock had been acquired and then released, so shutdown is in progress.
shutdownReadWriteLock.releaseReadLock();
return cancelledFuture(command);
}

try {
return transportFactory.getScheduledExecutorService()
.schedule(command, deadline.timeRemaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
} catch (RejectedExecutionException e) {
// TODO(zdapeng): Handle Exception properly
throw e;
} finally {
shutdownReadWriteLock.releaseReadLock();
if (shutdown.get()) {
// May take over the shutdown process.
runShutdownTask();
}
}
}
}

private static Future<Void> cancelledFuture(Runnable command) {
Future<Void> future = new FutureTask<Void>(command, null);
future.cancel(false);
return future;
}

/**
* Terminate the channel if termination conditions are met.
*/
Expand Down Expand Up @@ -708,8 +887,7 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, Stri
// TODO(ejona): can we be even stricter? Like terminating?
checkState(!terminated, "Channel is terminated");
final OobChannel oobChannel = new OobChannel(
authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
channelExecutor);
authority, oobExecutorPool, deadlineHandler, channelExecutor);
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory,
transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
Expand Down
Loading

0 comments on commit d839724

Please sign in to comment.