Skip to content

Commit

Permalink
stub: Wait for onClose when blocking stub is interrupted
Browse files Browse the repository at this point in the history
Interceptors need to see the onClose to clean up properly.

This also changes an isInterrupted() to interrupted(), since previously
the interrupted flag was still set when InterruptedException was thrown.
This caused an infinite loop with the new code. Previously, all callers
immediately re-set the interrupted flag, so there was no issue.

Fixes grpc#5576
  • Loading branch information
ejona86 committed Apr 10, 2019
1 parent 75bc066 commit 8066a25
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 37 deletions.
82 changes: 54 additions & 28 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,30 @@ public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call
public static <ReqT, RespT> RespT blockingUnaryCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
boolean interrupt = false;
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
try {
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) {
try {
executor.waitAndDrain();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED
.withDescription("Call was interrupted")
.withCause(e)
.asRuntimeException();
interrupt = true;
call.cancel("Thread interrupted", e);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
return getUnchecked(responseFuture);
} catch (RuntimeException e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} catch (Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
}
}

Expand Down Expand Up @@ -208,7 +213,7 @@ private static <V> V getUnchecked(Future<V> future) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED
.withDescription("Call was interrupted")
.withDescription("Thread interrupted")
.withCause(e)
.asRuntimeException();
} catch (ExecutionException e) {
Expand Down Expand Up @@ -546,30 +551,52 @@ ClientCall.Listener<T> listener() {
return listener;
}

private Object waitForNext() throws InterruptedException {
private Object waitForNext() {
if (threadless == null) {
return buffer.take();
boolean interrupt = false;
try {
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
}
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
}
} else {
Object next = buffer.poll();
while (next == null) {
threadless.waitAndDrain();
next = buffer.poll();
boolean interrupt = false;
try {
Object next;
while ((next = buffer.poll()) == null) {
try {
threadless.waitAndDrain();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
return next;
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
}
return next;
}
}

@Override
public boolean hasNext() {
if (last == null) {
try {
// Will block here indefinitely waiting for content. RPC timeouts defend against permanent
// hangs here as the call will become closed.
last = waitForNext();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw Status.CANCELLED.withDescription("interrupted").withCause(ie).asRuntimeException();
}
while (last == null) {
// Will block here indefinitely waiting for content. RPC timeouts defend against permanent
// hangs here as the call will become closed.
last = waitForNext();
}
if (last instanceof StatusRuntimeException) {
// Rethrow the exception with a new stacktrace.
Expand Down Expand Up @@ -643,15 +670,14 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runn
* Must only be called by one thread at a time.
*/
public void waitAndDrain() throws InterruptedException {
final Thread currentThread = Thread.currentThread();
throwIfInterrupted(currentThread);
throwIfInterrupted();
Runnable runnable = poll();
if (runnable == null) {
waiter = currentThread;
waiter = Thread.currentThread();
try {
while ((runnable = poll()) == null) {
LockSupport.park(this);
throwIfInterrupted(currentThread);
throwIfInterrupted();
}
} finally {
waiter = null;
Expand All @@ -666,8 +692,8 @@ public void waitAndDrain() throws InterruptedException {
} while ((runnable = poll()) != null);
}

private static void throwIfInterrupted(Thread currentThread) throws InterruptedException {
if (currentThread.isInterrupted()) {
private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
Expand Down

0 comments on commit 8066a25

Please sign in to comment.