Skip to content

Commit

Permalink
netty: Reduce race window size between GOAWAY and new streams
Browse files Browse the repository at this point in the history
The race between new streams and transport shutdown is #2562, but it is still
far from being generally solved. This reduces the race window of new streams
from (transport selection → stream created on network thread) to (transport
selection → stream enqueued on network thread). Since only a single thread now
needs to do work in the stream creation race window, the window should be
dramatically smaller.

This only reduces GOAWAY races when the server performs a graceful shutdown
(using two GOAWAYs), as that is the only non-racy way on-the-wire to shut down a
connection in HTTP/2.
  • Loading branch information
ejona86 committed Apr 10, 2020
1 parent 4974b51 commit 9ead606
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 3 deletions.
Expand Up @@ -43,14 +43,25 @@ public void notifyReady() {
listener.transportReady();
}

public void notifyShutdown(Status s) {
/**
* Marks transport as shutdown, but does not set the error status. This must eventually be
* followed by a call to notifyShutdown.
*/
public void notifyGracefulShutdown(Status s) {
if (transportShutdown) {
return;
}
transportShutdown = true;
listener.transportShutdown(s);
}

/**
 * Marks the transport as shut down (if it is not already) and records {@code s} as the shutdown
 * status. Only the first recorded status is kept; later calls leave it unchanged.
 *
 * @param s the shutdown status to record
 */
public void notifyShutdown(Status s) {
  // notifyGracefulShutdown already informs the listener (at most once), so the listener must
  // not be called again from here.
  notifyGracefulShutdown(s);
  if (shutdownStatus != null) {
    return;
  }
  shutdownStatus = s;
  shutdownThrowable = s.asException();
}

public void notifyInUse(boolean inUse) {
Expand Down
13 changes: 12 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Expand Up @@ -755,10 +755,21 @@ public boolean visit(Http2Stream stream) throws Http2Exception {

/**
* Handler for a GOAWAY being received. Fails any streams created after the
* last known stream. May only be called during a read.
*/
private void goingAway(Status status) {
lifecycleManager.notifyGracefulShutdown(status);
// Try to allocate as many in-flight streams as possible, to reduce race window of
// https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
// gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
// after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
// processed and thus this processing must be in-line before processing additional reads.

// This can cause reentrancy, but should be minor since it is normal to handle writes in
// response to a read. Also, the call stack is rather shallow at this point
clientWriteQueue.drainNow();
lifecycleManager.notifyShutdown(status);

final Status goAwayStatus = lifecycleManager.getShutdownStatus();
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
try {
Expand Down
13 changes: 13 additions & 0 deletions netty/src/main/java/io/grpc/netty/WriteQueue.java
Expand Up @@ -101,6 +101,19 @@ void enqueue(Runnable runnable, boolean flush) {
}
}

/**
 * Runs any enqueued work immediately on the calling thread. This lets writes be triggered
 * before additional reads are performed. The caller must be on the channel's event loop. The
 * work queue is not guaranteed to be empty once this method returns.
 */
void drainNow() {
  Preconditions.checkState(channel.eventLoop().inEventLoop(), "must be on the event loop");
  boolean hasPendingWork = queue.peek() != null;
  if (hasPendingWork) {
    flush();
  }
}

/**
* Process the queue of commands and dispatch them to the stream. This method is only
* called in the event loop
Expand Down
13 changes: 13 additions & 0 deletions netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
Expand Up @@ -361,6 +361,19 @@ public void receivedGoAwayShouldNotAffectEarlyStreamId() throws Exception {
assertTrue(future.isDone());
}

@Test
public void receivedGoAwayShouldNotAffectRacingQueuedStreamId() throws Exception {
  // The create-stream command is only queued here; it has not been executed yet.
  ChannelFuture createFuture =
      writeQueue().enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState), true);

  channelRead(goAwayFrame(streamId));

  // Neither overload of closed() may have been invoked on the stream listener, and the queued
  // command must have completed.
  verify(streamListener, never())
      .closed(any(Status.class), any(RpcProgress.class), any(Metadata.class));
  verify(streamListener, never()).closed(any(Status.class), any(Metadata.class));
  assertTrue(createFuture.isDone());
}

@Test
public void receivedResetWithRefuseCode() throws Exception {
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
Expand Down
4 changes: 4 additions & 0 deletions netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
Expand Up @@ -203,6 +203,10 @@ public int compareTo(Delayed o) {
}
}

/** Returns the {@link WriteQueue} held by this test base. */
protected final WriteQueue writeQueue() {
  return this.writeQueue;
}

/** Returns the handler instance held by this test base. */
protected final T handler() {
  return this.handler;
}
Expand Down

0 comments on commit 9ead606

Please sign in to comment.