From cb8aaf8e3e9d0cc06cfa8b83ad40e093f5ebe2bc Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Tue, 25 Nov 2025 15:37:49 -0800 Subject: [PATCH] core: Fix shutdown failing accepted RPCs This fixes a race where RPCs could fail with "UNAVAILABLE: Channel shutdown invoked" even though they were created before channel.shutdown(). This basically adopts the internalStart() logic from DelayedStream, although the stream is a bit different because it has APIs that can be called before start() and doesn't need to handle cancel() without start(). The ManagedChannelImpltest had the number of due tasks increase because start() running earlier creates a DelayedStream. Previously the stream wasn't created until runDueTasks() so the mockPicker had already been installed and it could use a real stream from the beginning. But that's specific to the test; in practice it'd be a delayed stream before and after this change. See #12536 --- .../io/grpc/internal/DelayedClientCall.java | 31 +++++++++++++------ .../grpc/internal/DelayedClientCallTest.java | 4 ++- .../grpc/internal/ManagedChannelImplTest.java | 2 +- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientCall.java b/core/src/main/java/io/grpc/internal/DelayedClientCall.java index 253237c3c7d..b568bb12c46 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientCall.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientCall.java @@ -64,6 +64,8 @@ public class DelayedClientCall extends ClientCall { * order, but also used if an error occurs before {@code realCall} is set. */ private Listener listener; + // No need to synchronize; start() synchronization provides a happens-before + private Metadata startHeaders; // Must hold {@code this} lock when setting. private ClientCall realCall; @GuardedBy("this") @@ -161,13 +163,23 @@ public void run() { */ // When this method returns, passThrough is guaranteed to be true public final Runnable setCall(ClientCall call) { + Listener savedDelayedListener; synchronized (this) { // If realCall != null, then either setCall() or cancel() has been called. if (realCall != null) { return null; } setRealCall(checkNotNull(call, "call")); + // start() not yet called + if (delayedListener == null) { + assert pendingRunnables.isEmpty(); + pendingRunnables = null; + passThrough = true; + return null; + } + savedDelayedListener = this.delayedListener; } + internalStart(savedDelayedListener); return new ContextRunnable(context) { @Override public void runInContext() { @@ -176,8 +188,15 @@ public void runInContext() { }; } + private void internalStart(Listener listener) { + Metadata savedStartHeaders = this.startHeaders; + this.startHeaders = null; + context.run(() -> realCall.start(listener, savedStartHeaders)); + } + @Override public final void start(Listener listener, final Metadata headers) { + checkNotNull(headers, "headers"); checkState(this.listener == null, "already started"); Status savedError; boolean savedPassThrough; @@ -188,6 +207,7 @@ public final void start(Listener listener, final Metadata headers) { savedPassThrough = passThrough; if (!savedPassThrough) { listener = delayedListener = new DelayedListener<>(listener); + startHeaders = headers; } } if (savedError != null) { @@ -196,15 +216,7 @@ public final void start(Listener listener, final Metadata headers) { } if (savedPassThrough) { realCall.start(listener, headers); - } else { - final Listener finalListener = listener; - delayOrExecute(new Runnable() { - @Override - public void run() { - realCall.start(finalListener, headers); - } - }); - } + } // else realCall.start() will be called by setCall } // When this method returns, passThrough is guaranteed to be true @@ -253,6 +265,7 @@ public void run() { if (listenerToClose != null) { callExecutor.execute(new CloseListenerRunnable(listenerToClose, status)); } + internalStart(listenerToClose); // listener instance doesn't matter drainPendingCalls(); } callCancelled(); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java index 45682b3a385..557ddfe5ace 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientCallTest.java @@ -151,10 +151,12 @@ public void startThenSetCall() { delayedClientCall.request(1); Runnable r = delayedClientCall.setCall(mockRealCall); assertThat(r).isNotNull(); - r.run(); @SuppressWarnings("unchecked") ArgumentCaptor> listenerCaptor = ArgumentCaptor.forClass(Listener.class); + // start() must be called before setCall() returns (not in runnable), to ensure the in-use + // counts keeping the channel alive after shutdown() don't momentarily decrease to zero. verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class)); + r.run(); Listener realCallListener = listenerCaptor.getValue(); verify(mockRealCall).request(1); realCallListener.onMessage(1); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index efc582703ba..91a9f506bc8 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -1343,7 +1343,7 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata header PickResult.withSubchannel(subchannel)); updateBalancingStateSafely(helper, READY, mockPicker); - assertEquals(2, executor.runDueTasks()); + assertEquals(3, executor.runDueTasks()); verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class)); verify(mockTransport).newStream(