Skip to content

Commit

Permalink
core: mitigate the race between channel termination and clientCallImp…
Browse files Browse the repository at this point in the history
…l start
  • Loading branch information
dapengzhang0 committed Jul 27, 2017
1 parent da47085 commit 6f09430
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 8 deletions.
75 changes: 68 additions & 7 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import io.grpc.Status;
import java.io.InputStream;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -149,6 +151,26 @@ public void start(final Listener<RespT> observer, Metadata headers) {
checkNotNull(observer, "observer");
checkNotNull(headers, "headers");

if (deadlineCancellationExecutor == null) {
stream = NoopClientStream.INSTANCE;
class ClosedDueToChannelTermination extends ContextRunnable {
ClosedDueToChannelTermination() {
super(context);
}

@Override
public void runInContext() {
closeObserver(
observer,
Status.UNAVAILABLE.withDescription("Channel is already terminated"),
new Metadata());
}
}

callExecutor.execute(new ClosedDueToChannelTermination());
return;
}

if (context.isCancelled()) {
// Context is already cancelled so no need to create a real stream, just notify the observer
// of cancellation via callback on the executor
Expand Down Expand Up @@ -234,9 +256,7 @@ public void runInContext() {
context.addListener(cancellationListener, directExecutor());
if (effectiveDeadline != null
// If the context has the effective deadline, we don't need to schedule an extra task.
&& context.getDeadline() != effectiveDeadline
// If the channel has been terminated, we don't need to schedule an extra task.
&& deadlineCancellationExecutor != null) {
&& context.getDeadline() != effectiveDeadline) {
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
}
if (cancelListenersShouldBeRemoved) {
Expand Down Expand Up @@ -310,11 +330,52 @@ public void run() {
}
}

private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
private ScheduledFuture<?> startDeadlineTimer(final Deadline deadline) {
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
return deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
try {
return deadlineCancellationExecutor
.schedule(new LogExceptionRunnable(new DeadlineTimer(remainingNanos)), remainingNanos,
TimeUnit.NANOSECONDS);
} catch (RejectedExecutionException e) {
class CancelledFuture implements ScheduledFuture<Void> {
@Override
public long getDelay(TimeUnit unit) {
return deadline.timeRemaining(unit);
}

@Override
public int compareTo(Delayed delayed) {
throw new UnsupportedOperationException();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return true;
}

@Override
public boolean isCancelled() {
return true;
}

@Override
public boolean isDone() {
return false;
}

@Override
public Void get() {
throw new CancellationException();
}

@Override
public Void get(long timeout, TimeUnit unit) {
throw new CancellationException();
}
}

return new CancelledFuture();
}
}

@Nullable
Expand Down
29 changes: 28 additions & 1 deletion core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -535,7 +536,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, Resp
CallOptions callOptions) {
Executor executor = callOptions.getExecutor();
if (executor == null) {
executor = ManagedChannelImpl.this.executor;
executor = new DirectFallbackExecutor();
}
return new ClientCallImpl<ReqT, RespT>(
method,
Expand All @@ -554,6 +555,32 @@ public String authority() {
}
}

private final class DirectFallbackExecutor implements Executor {
@Override
public void execute(final Runnable command) {
final boolean[] accepted = new boolean[1];
try {
ManagedChannelImpl.this.executor.execute(new Runnable() {
@Override
public void run() {
accepted[0] = true;
command.run();
}
});
} catch (RejectedExecutionException e) {
// If ManagedChannelImpl.this.executor is a direct executor, the command in the same thread
// may also throw a RejectedExecutionException.
if (accepted[0] || !terminated) {
throw e;
}
// This handles a race when a new call is created, the channel is terminated, then the call
// is started. The command is to close observer, but ManagedChannelImpl.this.executor is not
// available to use. We need close the observer anyway, so run the command directly.
command.run();
}
}
}

/**
* Terminate the channel if termination conditions are met.
*/
Expand Down

0 comments on commit 6f09430

Please sign in to comment.