Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: fix retriablestream deadlock #10386

Merged
merged 2 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,23 @@ public void run() {
abstract void postCommit();

/**
 * Calls commit() and, if successful, runs the post commit task on the callExecutor. The post
 * commit task is non-null only once. The post commit task cancels other non-winning streams on
 * separate transport threads, thus it must be run on the callExecutor to prevent deadlocks
 * between multiple stream transports (issues/10314).
 *
 * <p>This method should be called only in subListener callbacks. This guarantees that the
 * callExecutor schedules the task before the master listener closes, which is guarded by
 * inFlightSubStreams. That is because:
 * <ul>
 *   <li>For a successful winning stream, other streams won't attempt to close the master
 *       listener.
 *   <li>For a cancelled winning stream (noop), other streams won't attempt to close the master
 *       listener.
 *   <li>For a failed/closed winning stream, the last closed stream closes the master listener,
 *       and callExecutor scheduling happens-before that.
 * </ul>
 *
 * @param winningSubstream the substream to commit to
 */
private void commitAndRun(Substream winningSubstream) {
  Runnable postCommitTask = commit(winningSubstream);

  if (postCommitTask != null) {
    // Hand the task to the callExecutor exactly once. Running it inline as well would execute
    // the cancellations twice and would do so on the calling transport thread, which is the
    // lock-ordering hazard described above.
    callExecutor.execute(postCommitTask);
  }
}

Expand Down
125 changes: 123 additions & 2 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Test;
Expand Down Expand Up @@ -190,7 +192,7 @@ Status prestart() {

private RetriableStream<String> retriableStream =
newThrottledRetriableStream(null /* throttle */);
private final RetriableStream<String> hedgingStream =
private RetriableStream<String> hedgingStream =
newThrottledHedgingStream(null /* throttle */);

private ClientStreamTracer bufferSizeTracer;
Expand All @@ -206,9 +208,13 @@ method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_
}

/**
 * Creates a hedging stream for the given throttle, delegating to the two-argument overload with
 * {@link MoreExecutors#directExecutor()} as the executor.
 */
private RetriableStream<String> newThrottledHedgingStream(Throttle throttle) {
  Executor inlineExecutor = MoreExecutors.directExecutor();
  return newThrottledHedgingStream(throttle, inlineExecutor);
}

/**
 * Creates a {@link RecordedRetriableStream} configured with {@code HEDGING_POLICY}.
 *
 * @param throttle the throttle handed to the stream; callers in this file pass {@code null} for
 *     "no throttle"
 * @param executor the executor handed to the stream in place of a hard-coded direct executor
 *     (presumably used as the stream's call executor; confirm against the
 *     RecordedRetriableStream constructor)
 */
private RetriableStream<String> newThrottledHedgingStream(Throttle throttle, Executor executor) {
  return new RecordedRetriableStream(
      method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
      executor, fakeClock.getScheduledExecutorService(),
      null, HEDGING_POLICY, throttle);
}

Expand Down Expand Up @@ -2482,6 +2488,121 @@ public void hedging_pushback_negative() {
inOrder.verifyNoMoreInteractions();
}

// This is for hedging deadlock when multiple in-flight streams race when transports call back,
// particularly for OkHttp:
// e.g. stream1 subListener gets closed() and in turn creates another stream. This ends up with
// transport1 thread lock being held while waiting for transport2 lock for creating a new stream.
// Stream2 subListener gets headersRead() and then tries to commit and cancel all other drained
// streams, including the ones that are on transport1. This causes transport2 thread lock to be
// held while waiting for transport1 (cancel stream requires lock). Thus deadlock.
// Deadlock could also happen when two streams both get headersRead() at the same time.
// It is believed that retry does not have the issue because streams are created sequentially.
@Test(timeout = 15000)
public void hedging_deadlock() throws Exception {
  ClientStream mockStream1 = mock(ClientStream.class); //on transport1
  ClientStream mockStream2 = mock(ClientStream.class); //on transport2
  ClientStream mockStream3 = mock(ClientStream.class); //on transport2
  ClientStream mockStream4 = mock(ClientStream.class); //on transport1

  // Each lock stands in for the per-transport lock a real transport would hold while it
  // delivers listener callbacks or creates/cancels a stream.
  ReentrantLock transport1Lock = new ReentrantLock();
  ReentrantLock transport2Lock = new ReentrantLock();
  InOrder inOrder = inOrder(
      mockStream1, mockStream2, mockStream3,
      retriableStreamRecorder, masterListener);
  when(retriableStreamRecorder.newSubstream(anyInt()))
      .thenReturn(mockStream1)
      .thenReturn(mockStream2)
      .thenReturn(mockStream3)
      .thenAnswer(new Answer<ClientStream>() {

        @Override
        public ClientStream answer(InvocationOnMock invocation) throws Throwable {
          // Creating the fourth stream needs transport1's lock. It is acquired on whichever
          // thread triggers the creation and is released by that thread's cleanup below.
          transport1Lock.lock();
          return mockStream4;
        }
      });

  // Use the fake clock's executor so that tasks handed to the stream's executor are queued and
  // only run when the test drives the clock, instead of running inline on a transport thread.
  hedgingStream = newThrottledHedgingStream(null, fakeClock.getScheduledExecutorService());
  hedgingStream.start(masterListener);
  assertEquals(1, fakeClock.numPendingTasks());
  ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
      ArgumentCaptor.forClass(ClientStreamListener.class);
  inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
  inOrder.verify(mockStream1).isReady();
  inOrder.verifyNoMoreInteractions();

  fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
  assertEquals(1, fakeClock.numPendingTasks());
  ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
      ArgumentCaptor.forClass(ClientStreamListener.class);
  inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
  inOrder.verify(mockStream2).isReady();
  inOrder.verifyNoMoreInteractions();

  fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
  assertEquals(1, fakeClock.numPendingTasks());
  ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
      ArgumentCaptor.forClass(ClientStreamListener.class);
  inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
  inOrder.verify(mockStream3).isReady();
  inOrder.verifyNoMoreInteractions();

  // Cancelling a stream on transport2 requires transport2's lock.
  doAnswer(new Answer<Void>() {
    @Override
    @SuppressWarnings("LockNotBeforeTry")
    public Void answer(InvocationOnMock invocation) throws Throwable {
      transport2Lock.lock();
      transport2Lock.unlock();
      return null;
    }
  }).when(mockStream3).cancel(any(Status.class));

  doAnswer(new Answer<Void>() {
    @Override
    @SuppressWarnings("LockNotBeforeTry")
    public Void answer(InvocationOnMock invocation) throws Throwable {
      transport2Lock.lock();
      transport2Lock.unlock();
      return null;
    }
  }).when(mockStream2).cancel(any(Status.class));

  CountDownLatch latch = new CountDownLatch(1);
  Thread transport1Activity = new Thread(new Runnable() {
    @Override
    public void run() {
      // transport1 delivers headers for stream1 while holding its own lock.
      transport1Lock.lock();
      try {
        sublistenerCaptor1.getValue().headersRead(new Metadata());
        latch.countDown();
      } finally {
        transport1Lock.unlock();
      }
    }
  }, "Thread-transport1");
  transport1Activity.start();
  Thread transport2Activity = new Thread(new Runnable() {
    @Override
    public void run() {
      // transport2 reports stream2 as closed while holding its own lock.
      transport2Lock.lock();
      try {
        sublistenerCaptor2.getValue()
            .closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), REFUSED, new Metadata());
      } finally {
        transport2Lock.unlock();
        // closed() may have created mockStream4, in which case the stubbed newSubstream()
        // answer acquired transport1Lock on this thread. ReentrantLock is reentrant, so a
        // tryLock()/unlock() pair would leave that earlier hold in place and this thread would
        // exit still owning the lock, blocking Thread-transport1 forever if it has not yet
        // acquired it. Release the hold only if this thread actually owns it.
        if (transport1Lock.isHeldByCurrentThread()) {
          transport1Lock.unlock();
        }
      }
    }
  }, "Thread-transport2");
  transport2Activity.start();
  // Wait until stream1's headers have been processed, then run whatever was queued on the fake
  // clock's executor from this thread, which holds neither transport lock.
  latch.await();
  fakeClock.runDueTasks();
  transport2Activity.join();
  transport1Activity.join();
}

@Test
public void hedging_pushback_positive() {
ClientStream mockStream1 = mock(ClientStream.class);
Expand Down