From 551d51eec7d9f5f052f930de1cb14e680b4c7f35 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Thu, 5 Aug 2021 18:22:37 -0700 Subject: [PATCH 1/2] core: fix bug RetriableStream cancel() racing with start() (#8386) There is a bug in the scenario of the following sequence of events: - `call.start()` - received retryable status and about to retry - The retry attempt Substream is created but not yet `drain()` - `call.cancel()` But `stream.cancel()` cannot be called prior to `stream.start()`, otherwise retry will cause a failure with IllegalStateException. The current RetriableStream code must be fixed to not cancel a stream until `start()` is called in `drain()`. --- .../io/grpc/internal/RetriableStream.java | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 23725788466..396c7cedfe2 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -104,6 +104,7 @@ abstract class RetriableStream implements ClientStream { @GuardedBy("lock") private FutureCanceller scheduledHedging; private long nextBackoffIntervalNanos; + private Status cancellationStatus; RetriableStream( MethodDescriptor method, Metadata headers, @@ -244,14 +245,16 @@ private void drain(Substream substream) { int index = 0; int chunk = 0x80; List list = null; + boolean streamStarted = false; while (true) { State savedState; synchronized (lock) { savedState = state; - if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { - // committed but not me + if (savedState.winningSubstream != null && savedState.winningSubstream != substream + && streamStarted) { + // committed but not me, to be cancelled break; } if (index == savedState.buffer.size()) { // I'm drained @@ -275,17 +278,22 @@ private void drain(Substream substream) { for (BufferEntry bufferEntry : list) { savedState = state; - if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { - // committed but not me + if (savedState.winningSubstream != null && savedState.winningSubstream != substream + && streamStarted) { + // committed but not me, to be cancelled break; } - if (savedState.cancelled) { + if (savedState.cancelled && streamStarted) { checkState( savedState.winningSubstream == substream, "substream should be CANCELLED_BECAUSE_COMMITTED already"); + substream.stream.cancel(cancellationStatus); return; } bufferEntry.runWith(substream); + if (bufferEntry instanceof RetriableStream.StartEntry) { + streamStarted = true; + } } } @@ -299,6 +307,13 @@ private void drain(Substream substream) { @Nullable abstract Status prestart(); + class StartEntry implements BufferEntry { + @Override + public void runWith(Substream substream) { + substream.stream.start(new Sublistener(substream)); + } + } + /** Starts the first PRC attempt. */ @Override public final void start(ClientStreamListener listener) { @@ -311,13 +326,6 @@ public final void start(ClientStreamListener listener) { return; } - class StartEntry implements BufferEntry { - @Override - public void runWith(Substream substream) { - substream.stream.start(new Sublistener(substream)); - } - } - synchronized (lock) { state.buffer.add(new StartEntry()); } @@ -450,11 +458,18 @@ public final void cancel(Status reason) { return; } - state.winningSubstream.stream.cancel(reason); + Substream winningSubstreamToCancel = null; synchronized (lock) { - // This is not required, but causes a short-circuit in the draining process. + if (state.drainedSubstreams.contains(state.winningSubstream)) { + winningSubstreamToCancel = state.winningSubstream; + } else { // the winningSubstream will be cancelled while draining + cancellationStatus = reason; + } state = state.cancelled(); } + if (winningSubstreamToCancel != null) { + winningSubstreamToCancel.stream.cancel(reason); + } } private void delayOrExecute(BufferEntry bufferEntry) { From db2900a2138fb06198f8563efac0aec9a6c13ab9 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Fri, 6 Aug 2021 18:32:55 -0700 Subject: [PATCH 2/2] core: fix RetriableStream edge case bug introduced in #8386 (#8393) While adding regression tests to #8386, I found a bug in an edge case: while retry attempt is draining the last buffered entry, if it is in the mean time committed and then we cancel the call, the stream will never be cancelled. See the regression test case `commitAndCancelWhileDraining()`. --- .../io/grpc/internal/RetriableStream.java | 38 +++--- .../io/grpc/internal/RetriableStreamTest.java | 126 ++++++++++++++++++ 2 files changed, 146 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 396c7cedfe2..d19a260049b 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -252,10 +252,14 @@ private void drain(Substream substream) { synchronized (lock) { savedState = state; - if (savedState.winningSubstream != null && savedState.winningSubstream != substream - && streamStarted) { - // committed but not me, to be cancelled - break; + if (streamStarted) { + if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { + // committed but not me, to be cancelled + break; + } + if (savedState.cancelled) { + break; + } } if (index == savedState.buffer.size()) { // I'm drained state = savedState.substreamDrained(substream); @@ -277,27 +281,25 @@ private void drain(Substream substream) { } for (BufferEntry bufferEntry : list) { - savedState = state; - if (savedState.winningSubstream != null && savedState.winningSubstream != substream - && streamStarted) { - // committed but not me, to be cancelled - break; - } - if (savedState.cancelled && streamStarted) { - checkState( - savedState.winningSubstream == substream, - "substream should be CANCELLED_BECAUSE_COMMITTED already"); - substream.stream.cancel(cancellationStatus); - return; - } bufferEntry.runWith(substream); if (bufferEntry instanceof RetriableStream.StartEntry) { streamStarted = true; } + if (streamStarted) { + savedState = state; + if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { + // committed but not me, to be cancelled + break; + } + if (savedState.cancelled) { + break; + } + } } } - substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED); + substream.stream.cancel( + state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED); } /** diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 26c6fcf9b4e..95d2c2ba8b5 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -749,6 +749,91 @@ public void request(int numMessages) { inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class)); } + @Test + public void cancelWhileDraining() { + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = + mock( + ClientStream.class, + delegatesTo( + new NoopClientStream() { + @Override + public void request(int numMessages) { + retriableStream.cancel( + Status.CANCELLED.withDescription("cancelled while requesting")); + } + })); + + InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + retriableStream.start(masterListener); + inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + retriableStream.request(3); + inOrder.verify(mockStream1).request(3); + + // retry + doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); + sublistenerCaptor1.getValue().closed( + Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); + fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + + inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); + inOrder.verify(mockStream2).request(3); + inOrder.verify(retriableStreamRecorder).postCommit(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + inOrder.verify(mockStream2).cancel(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()) + .isEqualTo("Stream thrown away because RetriableStream committed"); + verify(masterListener).closed( + statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while requesting"); + } + + @Test + public void cancelWhileRetryStart() { + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = + mock( + ClientStream.class, + delegatesTo( + new NoopClientStream() { + @Override + public void start(ClientStreamListener listener) { + retriableStream.cancel( + Status.CANCELLED.withDescription("cancelled while retry start")); + } + })); + + InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + retriableStream.start(masterListener); + inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + + // retry + doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); + sublistenerCaptor1.getValue().closed( + Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); + fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + + inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); + inOrder.verify(retriableStreamRecorder).postCommit(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + inOrder.verify(mockStream2).cancel(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()) + .isEqualTo("Stream thrown away because RetriableStream committed"); + verify(masterListener).closed( + statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while retry start"); + } + @Test public void operationsAfterImmediateCommit() { ArgumentCaptor sublistenerCaptor1 = @@ -916,6 +1001,47 @@ public void start(ClientStreamListener listener) { verify(mockStream3).request(1); } + @Test + public void commitAndCancelWhileDraining() { + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = + mock( + ClientStream.class, + delegatesTo( + new NoopClientStream() { + @Override + public void start(ClientStreamListener listener) { + // commit while draining + listener.headersRead(new Metadata()); + // cancel while draining + retriableStream.cancel( + Status.CANCELLED.withDescription("cancelled while drained")); + } + })); + + when(retriableStreamRecorder.newSubstream(anyInt())) + .thenReturn(mockStream1, mockStream2); + + retriableStream.start(masterListener); + + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream1).start(sublistenerCaptor1.capture()); + + ClientStreamListener listener1 = sublistenerCaptor1.getValue(); + + // retry + listener1.closed( + Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); + fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); + + verify(mockStream2).start(any(ClientStreamListener.class)); + verify(retriableStreamRecorder).postCommit(); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(mockStream2).cancel(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); + assertThat(statusCaptor.getValue().getDescription()).isEqualTo("cancelled while drained"); + } @Test public void perRpcBufferLimitExceeded() {