From c387c62f2b2fa4761c650b47b861b7db7ae5a79b Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Thu, 15 Mar 2018 14:26:12 -0700 Subject: [PATCH 1/5] netty: http2 server transport graceful shutdown --- .../io/grpc/netty/NettyServerHandler.java | 176 +++++++++-- .../io/grpc/netty/NettyServerHandlerTest.java | 281 +++++++++++++++++- 2 files changed, 415 insertions(+), 42 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 21c1e6de718..d8343da993e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -82,6 +82,7 @@ import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -96,6 +97,10 @@ class NettyServerHandler extends AbstractNettyHandler { private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName()); private static final long KEEPALIVE_PING = 0xDEADL; + private static final long MAX_CONNECTION_AGE_PING = 0xA9EL; + private static final long MAX_CONNECTION_IDLE_PING = 0x1D1EL; + private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10); + private static final SystemTicker SYSTEM_TICKER = new SystemTicker(); private final Http2Connection.PropertyKey streamKey; private final ServerTransportListener transportListener; @@ -121,6 +126,12 @@ class NettyServerHandler extends AbstractNettyHandler { private MaxConnectionIdleManager maxConnectionIdleManager; @CheckForNull private ScheduledFuture maxConnectionAgeMonitor; + @CheckForNull + private GracefulShutdownRunner maxAgeShutdownRunner; + @CheckForNull + private GracefulShutdownRunner maxIdleShutdownRunner; + + private Ticker ticker = SYSTEM_TICKER; static NettyServerHandler newHandler( ServerTransportListener transportListener, @@ -250,18 +261,9 @@ private NettyServerHandler( maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) { @Override void close(ChannelHandlerContext ctx) { - goAway( - ctx, - Integer.MAX_VALUE, - Http2Error.NO_ERROR.code(), - ByteBufUtil.writeAscii(ctx.alloc(), "max_idle"), - ctx.newPromise()); - ctx.flush(); - try { - NettyServerHandler.this.close(ctx, ctx.newPromise()); - } catch (Exception e) { - onError(ctx, /* outbound= */ true, e); - } + maxIdleShutdownRunner = new GracefulShutdownRunner( + ctx, MAX_CONNECTION_IDLE_PING, "max_idle", null); + maxIdleShutdownRunner.run(); } }; } @@ -321,26 +323,9 @@ public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { new LogExceptionRunnable(new Runnable() { @Override public void run() { - // send GO_AWAY - ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "max_age"); - goAway( - ctx, - Integer.MAX_VALUE, - Http2Error.NO_ERROR.code(), - debugData, - ctx.newPromise()); - - // gracefully shutdown with specified grace time - long savedGracefulShutdownTime = gracefulShutdownTimeoutMillis(); - try { - gracefulShutdownTimeoutMillis( - TimeUnit.NANOSECONDS.toMillis(maxConnectionAgeGraceInNanos)); - close(ctx, ctx.newPromise()); - } catch (Exception e) { - onError(ctx, /* outbound= */ true, e); - } finally { - gracefulShutdownTimeoutMillis(savedGracefulShutdownTime); - } + maxAgeShutdownRunner = new GracefulShutdownRunner( + ctx, MAX_CONNECTION_AGE_PING, "max_age", maxConnectionAgeGraceInNanos); + maxAgeShutdownRunner.run(); } }), maxConnectionAgeInNanos, @@ -787,6 +772,12 @@ public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exce logger.log(Level.FINE, String.format("Window: %d", decoder().flowController().initialWindowSize(connection().connectionStream()))); } + } else if (data == MAX_CONNECTION_AGE_PING) { + checkNotNull(maxAgeShutdownRunner, "maxAgeShutdownRunner"); + maxAgeShutdownRunner.secondGoAwayAndClose(); + } else if (data == MAX_CONNECTION_IDLE_PING) { + checkNotNull(maxIdleShutdownRunner, "maxIdleShutdownRunner"); + maxIdleShutdownRunner.secondGoAwayAndClose(); } else if (data != KEEPALIVE_PING) { logger.warning("Received unexpected ping ack. No ping outstanding"); } @@ -803,7 +794,6 @@ private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger @Override public void ping() { ChannelFuture pingFuture = encoder().writePing( - // slice KEEPALIVE_PING because tls handler may modify the reader index ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise()); ctx.flush(); if (transportTracer != null) { @@ -837,6 +827,105 @@ public void onPingTimeout() { } } + private final class GracefulShutdownRunner { + + ChannelHandlerContext ctx; + long payload; + String goAwayMessage; + + /** + * The grace time between starting graceful shutdown and closing the netty channel, + * {@code null} is unspecified. + */ + @CheckForNull + Long graceTimeInNanos; + + /** + * True if ping is Acked or ping is timeout. + */ + boolean pingAckedOrTimeout; + + /** + * Deadline of the shutdown. + */ + long deadline; + Future pingFuture; + + GracefulShutdownRunner( + ChannelHandlerContext ctx, long payload, String goAwayMessage, + @Nullable Long graceTimeInNanos) { + this.ctx = ctx; + this.payload = payload; + this.goAwayMessage = goAwayMessage; + this.graceTimeInNanos = graceTimeInNanos; + } + + /** + * Sends out first GOAWAY and ping, and schedules second GOAWAY and close. + */ + void run() { + goAway( + ctx, + Integer.MAX_VALUE, + Http2Error.NO_ERROR.code(), + ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), + ctx.newPromise()); + ctx.flush(); + + long gracefulShutdownPingTimeout = GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS; + if (graceTimeInNanos != null) { + deadline = ticker.read() + graceTimeInNanos; + gracefulShutdownPingTimeout = Math.min(gracefulShutdownPingTimeout, graceTimeInNanos); + } + pingFuture = ctx.executor().schedule( + new Runnable() { + @Override + public void run() { + secondGoAwayAndClose(); + } + }, + gracefulShutdownPingTimeout, + TimeUnit.NANOSECONDS); + + encoder().writePing(ctx, false /* isAck */, payload, ctx.newPromise()); + ctx.flush(); + } + + void secondGoAwayAndClose() { + if (pingAckedOrTimeout) { + return; + } + pingAckedOrTimeout = true; + + checkNotNull(pingFuture, "pingFuture"); + pingFuture.cancel(false); + + // send the second GOAWAY with last stream id + goAway( + ctx, + connection().remote().lastStreamCreated(), + Http2Error.NO_ERROR.code(), + ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), + ctx.newPromise()); + ctx.flush(); + + // gracefully shutdown with specified grace time + long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis(); + long gracefulShutdownTimeoutMillis = savedGracefulShutdownTimeMillis; + if (graceTimeInNanos != null) { + gracefulShutdownTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(deadline - ticker.read()); + } + try { + gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis); + close(ctx, ctx.newPromise()); + } catch (Exception e) { + onError(ctx, /* outbound= */ true, e); + } finally { + gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis); + } + } + } + // Use a frame writer so that we know when frames are through flow control and actually being // written. private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter { @@ -871,4 +960,25 @@ public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2 padding, endStream, promise); } } + + @VisibleForTesting + void setTickerForTest(Ticker ticker) { + this.ticker = ticker; + } + + // TODO(zsurocking): Classes below are copied from Deadline.java. We should consider share the + // code. + + /** Time source representing nanoseconds since fixed but arbitrary point in time. */ + abstract static class Ticker { + /** Returns the number of nanoseconds since this source's epoch. */ + public abstract long read(); + } + + private static class SystemTicker extends Ticker { + @Override + public long read() { + return System.nanoTime(); + } + } } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index b53a690104a..0e0f34cfda5 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -67,6 +67,7 @@ import io.grpc.internal.StreamListener; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; +import io.grpc.netty.NettyServerHandler.Ticker; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -198,6 +199,14 @@ protected void manualSetUp() throws Exception { // Simulate receipt of initial remote settings. ByteBuf serializedSettings = serializeSettings(new Http2Settings()); channelRead(serializedSettings); + + handler().setTickerForTest( + new Ticker() { + @Override + public long read() { + return fakeClock().getTicker().read(); + } + }); } @Test @@ -697,24 +706,112 @@ public void noGoAwaySentBeforeMaxConnectionIdleReached() throws Exception { } @Test - public void maxConnectionIdle_goAwaySent() throws Exception { + public void maxConnectionIdle_goAwaySent_pingAck() throws Exception { + maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); + manualSetUp(); + assertTrue(channel().isOpen()); + + fakeClock().forwardNanos(maxConnectionIdleInNanos); + + // first GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x1D1EL), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + channelRead(pingFrame(true /* isAck */, 0x1D1EL)); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // channel closed + assertTrue(!channel().isOpen()); + } + + @Test + public void maxConnectionIdle_goAwaySent_pingTimeout() throws Exception { + maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); + manualSetUp(); + assertTrue(channel().isOpen()); + + fakeClock().forwardNanos(maxConnectionIdleInNanos); + + // first GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x1D1EL), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + fakeClock().forwardTime(10, TimeUnit.SECONDS); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // channel closed + assertTrue(!channel().isOpen()); + } + + @Test + public void maxConnectionIdle_activeThenRst_pingAck() throws Exception { maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); manualSetUp(); + createStream(); + + fakeClock().forwardNanos(maxConnectionIdleInNanos); + + // GO_AWAY not sent when active + verifyWrite(never()).writeGoAway( + any(ChannelHandlerContext.class), any(Integer.class), any(Long.class), any(ByteBuf.class), + any(ChannelPromise.class)); assertTrue(channel().isOpen()); + channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code())); + fakeClock().forwardNanos(maxConnectionIdleInNanos); - // GO_AWAY sent + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x1D1EL), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + fakeClock().forwardTime(10, TimeUnit.SECONDS); + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); } @Test - public void maxConnectionIdle_activeThenRst() throws Exception { + public void maxConnectionIdle_activeThenRst_pingTimeoutk() throws Exception { maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); manualSetUp(); createStream(); @@ -731,11 +828,24 @@ public void maxConnectionIdle_activeThenRst() throws Exception { fakeClock().forwardNanos(maxConnectionIdleInNanos); - // GO_AWAY sent + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x1D1EL), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + channelRead(pingFrame(true /* isAck */, 0x1D1EL)); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); } @@ -755,18 +865,68 @@ public void noGoAwaySentBeforeMaxConnectionAgeReached() throws Exception { } @Test - public void maxConnectionAge_goAwaySent() throws Exception { + public void maxConnectionAge_goAwaySent_pingAck() throws Exception { + maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); manualSetUp(); assertTrue(channel().isOpen()); fakeClock().forwardNanos(maxConnectionAgeInNanos); - // GO_AWAY sent + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + channelRead(pingFrame(true /* isAck */, 0xA9EL)); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // channel closed + assertTrue(!channel().isOpen()); + } + + @Test + public void maxConnectionAge_goAwaySent_pingTimeout() throws Exception { + + maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); + manualSetUp(); + assertTrue(channel().isOpen()); + + fakeClock().forwardNanos(maxConnectionAgeInNanos); + + // first GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + fakeClock().forwardTime(10, TimeUnit.SECONDS); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); } @@ -780,32 +940,135 @@ public void maxConnectionAgeGrace_channelStillOpenDuringGracePeriod() throws Exc fakeClock().forwardNanos(maxConnectionAgeInNanos); + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); fakeClock().forwardTime(20, TimeUnit.MINUTES); + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); // channel not closed yet assertTrue(channel().isOpen()); } @Test - public void maxConnectionAgeGrace_channelClosedAfterGracePeriod() throws Exception { + public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingTimeout() + throws Exception { maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); - maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); + maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); // greater than ping timeout manualSetUp(); createStream(); fakeClock().forwardNanos(maxConnectionAgeInNanos); + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(2)); + + // second GOA_WAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); assertTrue(channel().isOpen()); - fakeClock().forwardNanos(maxConnectionAgeGraceInNanos); + fakeClock().forwardTime(2, TimeUnit.MILLISECONDS); + + // channel closed + assertTrue(!channel().isOpen()); + } + + @Test + public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingAck() + throws Exception { + maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); + maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); // greater than ping timeout + manualSetUp(); + createStream(); + fakeClock().forwardNanos(maxConnectionAgeInNanos); + + // first GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + long pingRoundTripMillis = 100; // less than ping timeout + fakeClock().forwardTime(pingRoundTripMillis, TimeUnit.MILLISECONDS); + channelRead(pingFrame(true /* isAck */, 0xA9EL)); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + fakeClock().forwardNanos( + maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(pingRoundTripMillis) + - TimeUnit.MILLISECONDS.toNanos(2)); + + assertTrue(channel().isOpen()); + + fakeClock().forwardTime(2, TimeUnit.MILLISECONDS); + + // channel closed + assertTrue(!channel().isOpen()); + } + + @Test + public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withGraceLessThanPingTimeout() + throws Exception { + maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); + maxConnectionAgeGraceInNanos = TimeUnit.SECONDS.toNanos(5L); // less than ping timeout + manualSetUp(); + createStream(); + + fakeClock().forwardNanos(maxConnectionAgeInNanos); + + // first GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + + fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(2)); + + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + fakeClock().forwardTime(2, TimeUnit.MILLISECONDS); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); } From 915b63838fc1a5a0827318530f22a762ee39a86e Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Mon, 26 Mar 2018 16:34:08 -0700 Subject: [PATCH 2/5] partially fix comments --- .../io/grpc/netty/NettyServerHandler.java | 53 ++++++++----------- .../io/grpc/netty/NettyServerHandlerTest.java | 30 +++++------ 2 files changed, 36 insertions(+), 47 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index d8343da993e..a5c98656976 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -97,8 +97,7 @@ class NettyServerHandler extends AbstractNettyHandler { private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName()); private static final long KEEPALIVE_PING = 0xDEADL; - private static final long MAX_CONNECTION_AGE_PING = 0xA9EL; - private static final long MAX_CONNECTION_IDLE_PING = 0x1D1EL; + private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L; private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10); private static final SystemTicker SYSTEM_TICKER = new SystemTicker(); @@ -127,9 +126,7 @@ class NettyServerHandler extends AbstractNettyHandler { @CheckForNull private ScheduledFuture maxConnectionAgeMonitor; @CheckForNull - private GracefulShutdownRunner maxAgeShutdownRunner; - @CheckForNull - private GracefulShutdownRunner maxIdleShutdownRunner; + private GracefulShutdown gracefulShutdown; private Ticker ticker = SYSTEM_TICKER; @@ -261,9 +258,11 @@ private NettyServerHandler( maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) { @Override void close(ChannelHandlerContext ctx) { - maxIdleShutdownRunner = new GracefulShutdownRunner( - ctx, MAX_CONNECTION_IDLE_PING, "max_idle", null); - maxIdleShutdownRunner.run(); + if (gracefulShutdown == null) { + gracefulShutdown = new GracefulShutdown("max_idle", null); + gracefulShutdown.start(ctx); + ctx.flush(); + } } }; } @@ -323,9 +322,11 @@ public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { new LogExceptionRunnable(new Runnable() { @Override public void run() { - maxAgeShutdownRunner = new GracefulShutdownRunner( - ctx, MAX_CONNECTION_AGE_PING, "max_age", maxConnectionAgeGraceInNanos); - maxAgeShutdownRunner.run(); + if (gracefulShutdown != null) { + gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos); + gracefulShutdown.start(ctx); + ctx.flush(); + } } }), maxConnectionAgeInNanos, @@ -772,12 +773,9 @@ public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exce logger.log(Level.FINE, String.format("Window: %d", decoder().flowController().initialWindowSize(connection().connectionStream()))); } - } else if (data == MAX_CONNECTION_AGE_PING) { - checkNotNull(maxAgeShutdownRunner, "maxAgeShutdownRunner"); - maxAgeShutdownRunner.secondGoAwayAndClose(); - } else if (data == MAX_CONNECTION_IDLE_PING) { - checkNotNull(maxIdleShutdownRunner, "maxIdleShutdownRunner"); - maxIdleShutdownRunner.secondGoAwayAndClose(); + } else if (data == GRACEFUL_SHUTDOWN_PING) { + checkNotNull(gracefulShutdown, "gracefulShutdownRunner"); + gracefulShutdown.secondGoAwayAndClose(ctx); } else if (data != KEEPALIVE_PING) { logger.warning("Received unexpected ping ack. No ping outstanding"); } @@ -827,10 +825,7 @@ public void onPingTimeout() { } } - private final class GracefulShutdownRunner { - - ChannelHandlerContext ctx; - long payload; + private final class GracefulShutdown { String goAwayMessage; /** @@ -851,11 +846,8 @@ private final class GracefulShutdownRunner { long deadline; Future pingFuture; - GracefulShutdownRunner( - ChannelHandlerContext ctx, long payload, String goAwayMessage, + GracefulShutdown(String goAwayMessage, @Nullable Long graceTimeInNanos) { - this.ctx = ctx; - this.payload = payload; this.goAwayMessage = goAwayMessage; this.graceTimeInNanos = graceTimeInNanos; } @@ -863,14 +855,13 @@ private final class GracefulShutdownRunner { /** * Sends out first GOAWAY and ping, and schedules second GOAWAY and close. */ - void run() { + void start(final ChannelHandlerContext ctx) { goAway( ctx, Integer.MAX_VALUE, Http2Error.NO_ERROR.code(), ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), ctx.newPromise()); - ctx.flush(); long gracefulShutdownPingTimeout = GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS; if (graceTimeInNanos != null) { @@ -881,17 +872,16 @@ void run() { new Runnable() { @Override public void run() { - secondGoAwayAndClose(); + secondGoAwayAndClose(ctx); } }, gracefulShutdownPingTimeout, TimeUnit.NANOSECONDS); - encoder().writePing(ctx, false /* isAck */, payload, ctx.newPromise()); - ctx.flush(); + encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise()); } - void secondGoAwayAndClose() { + void secondGoAwayAndClose(ChannelHandlerContext ctx) { if (pingAckedOrTimeout) { return; } @@ -907,7 +897,6 @@ void secondGoAwayAndClose() { Http2Error.NO_ERROR.code(), ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), ctx.newPromise()); - ctx.flush(); // gracefully shutdown with specified grace time long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis(); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 0e0f34cfda5..b119ef3f4b5 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -719,7 +719,7 @@ public void maxConnectionIdle_goAwaySent_pingAck() throws Exception { any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0x1D1EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); verifyWrite(never()).writeGoAway( eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); @@ -730,7 +730,7 @@ public void maxConnectionIdle_goAwaySent_pingAck() throws Exception { any(ChannelPromise.class)); assertTrue(channel().isOpen()); - channelRead(pingFrame(true /* isAck */, 0x1D1EL)); + channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); // second GO_AWAY sent verifyWrite().writeGoAway( @@ -754,7 +754,7 @@ public void maxConnectionIdle_goAwaySent_pingTimeout() throws Exception { any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0x1D1EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); verifyWrite(never()).writeGoAway( eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); @@ -794,7 +794,7 @@ public void maxConnectionIdle_activeThenRst_pingAck() throws Exception { any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0x1D1EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); verifyWrite(never()).writeGoAway( eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); @@ -834,13 +834,13 @@ public void maxConnectionIdle_activeThenRst_pingTimeoutk() throws Exception { any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0x1D1EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); verifyWrite(never()).writeGoAway( eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); assertTrue(channel().isOpen()); - channelRead(pingFrame(true /* isAck */, 0x1D1EL)); + channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); // second GO_AWAY sent verifyWrite().writeGoAway( @@ -879,18 +879,18 @@ public void maxConnectionAge_goAwaySent_pingAck() throws Exception { any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); verifyWrite(never()).writeGoAway( eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); - channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack + channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); // irrelevant ping Ack verifyWrite(never()).writeGoAway( eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); assertTrue(channel().isOpen()); - channelRead(pingFrame(true /* isAck */, 0xA9EL)); + channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); // second GO_AWAY sent verifyWrite().writeGoAway( @@ -915,7 +915,7 @@ public void maxConnectionAge_goAwaySent_pingTimeout() throws Exception { any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); verifyWrite(never()).writeGoAway( eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); @@ -946,7 +946,7 @@ public void maxConnectionAgeGrace_channelStillOpenDuringGracePeriod() throws Exc any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); verifyWrite(never()).writeGoAway( eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); @@ -977,7 +977,7 @@ public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingTimeout( any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); verifyWrite(never()).writeGoAway( eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); @@ -1012,14 +1012,14 @@ public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingAck() any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); verifyWrite(never()).writeGoAway( eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); long pingRoundTripMillis = 100; // less than ping timeout fakeClock().forwardTime(pingRoundTripMillis, TimeUnit.MILLISECONDS); - channelRead(pingFrame(true /* isAck */, 0xA9EL)); + channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); // second GO_AWAY sent verifyWrite().writeGoAway( @@ -1054,7 +1054,7 @@ public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withGraceLessTha any(ChannelPromise.class)); // ping sent verifyWrite().writePing( - eq(ctx()), eq(false), eq(0xA9EL), any(ChannelPromise.class)); + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(2)); From 41022679b938971af3294215f4debb8bb8f09b6a Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Mon, 26 Mar 2018 17:21:37 -0700 Subject: [PATCH 3/5] remove ticker --- .../io/grpc/netty/NettyServerHandler.java | 38 +------------ .../io/grpc/netty/NettyServerHandlerTest.java | 55 ++----------------- 2 files changed, 9 insertions(+), 84 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index a5c98656976..a37c4cbefaf 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -99,7 +99,6 @@ class NettyServerHandler extends AbstractNettyHandler { private static final long KEEPALIVE_PING = 0xDEADL; private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L; private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10); - private static final SystemTicker SYSTEM_TICKER = new SystemTicker(); private final Http2Connection.PropertyKey streamKey; private final ServerTransportListener transportListener; @@ -128,8 +127,6 @@ class NettyServerHandler extends AbstractNettyHandler { @CheckForNull private GracefulShutdown gracefulShutdown; - private Ticker ticker = SYSTEM_TICKER; - static NettyServerHandler newHandler( ServerTransportListener transportListener, ChannelPromise channelUnused, @@ -322,7 +319,7 @@ public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { new LogExceptionRunnable(new Runnable() { @Override public void run() { - if (gracefulShutdown != null) { + if (gracefulShutdown == null) { gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos); gracefulShutdown.start(ctx); ctx.flush(); @@ -840,10 +837,6 @@ private final class GracefulShutdown { */ boolean pingAckedOrTimeout; - /** - * Deadline of the shutdown. - */ - long deadline; Future pingFuture; GracefulShutdown(String goAwayMessage, @@ -864,10 +857,6 @@ void start(final ChannelHandlerContext ctx) { ctx.newPromise()); long gracefulShutdownPingTimeout = GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS; - if (graceTimeInNanos != null) { - deadline = ticker.read() + graceTimeInNanos; - gracefulShutdownPingTimeout = Math.min(gracefulShutdownPingTimeout, graceTimeInNanos); - } pingFuture = ctx.executor().schedule( new Runnable() { @Override @@ -875,7 +864,7 @@ public void run() { secondGoAwayAndClose(ctx); } }, - gracefulShutdownPingTimeout, + GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS); encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise()); @@ -902,7 +891,7 @@ void secondGoAwayAndClose(ChannelHandlerContext ctx) { long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis(); long gracefulShutdownTimeoutMillis = savedGracefulShutdownTimeMillis; if (graceTimeInNanos != null) { - gracefulShutdownTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(deadline - ticker.read()); + gracefulShutdownTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos); } try { gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis); @@ -949,25 +938,4 @@ public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2 padding, endStream, promise); } } - - @VisibleForTesting - void setTickerForTest(Ticker ticker) { - this.ticker = ticker; - } - - // TODO(zsurocking): Classes below are copied from Deadline.java. We should consider share the - // code. - - /** Time source representing nanoseconds since fixed but arbitrary point in time. */ - abstract static class Ticker { - /** Returns the number of nanoseconds since this source's epoch. */ - public abstract long read(); - } - - private static class SystemTicker extends Ticker { - @Override - public long read() { - return System.nanoTime(); - } - } } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index b119ef3f4b5..8be0ba6eca2 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -67,7 +67,6 @@ import io.grpc.internal.StreamListener; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; -import io.grpc.netty.NettyServerHandler.Ticker; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -199,14 +198,6 @@ protected void manualSetUp() throws Exception { // Simulate receipt of initial remote settings. ByteBuf serializedSettings = serializeSettings(new Http2Settings()); channelRead(serializedSettings); - - handler().setTickerForTest( - new Ticker() { - @Override - public long read() { - return fakeClock().getTicker().read(); - } - }); } @Test @@ -884,7 +875,7 @@ public void maxConnectionAge_goAwaySent_pingAck() throws Exception { eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); - channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); // irrelevant ping Ack + channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack verifyWrite(never()).writeGoAway( eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); @@ -982,12 +973,15 @@ public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingTimeout( eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); - fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(2)); + fakeClock().forwardNanos(TimeUnit.SECONDS.toNanos(10)); - // second GOA_WAY sent + // second GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + + fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - 2); + assertTrue(channel().isOpen()); fakeClock().forwardTime(2, TimeUnit.MILLISECONDS); @@ -1026,49 +1020,12 @@ public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingAck() eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); - fakeClock().forwardNanos( - maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(pingRoundTripMillis) - - TimeUnit.MILLISECONDS.toNanos(2)); - - assertTrue(channel().isOpen()); - - fakeClock().forwardTime(2, TimeUnit.MILLISECONDS); - - // channel closed - assertTrue(!channel().isOpen()); - } - - @Test - public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withGraceLessThanPingTimeout() - throws Exception { - maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); - maxConnectionAgeGraceInNanos = TimeUnit.SECONDS.toNanos(5L); // less than ping timeout - manualSetUp(); - createStream(); - - fakeClock().forwardNanos(maxConnectionAgeInNanos); - - // first GO_AWAY sent - verifyWrite().writeGoAway( - eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), - any(ChannelPromise.class)); - // ping sent - verifyWrite().writePing( - eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); - fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(2)); - verifyWrite(never()).writeGoAway( - eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), - any(ChannelPromise.class)); assertTrue(channel().isOpen()); fakeClock().forwardTime(2, TimeUnit.MILLISECONDS); - // second GO_AWAY sent - verifyWrite().writeGoAway( - eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), - any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); } From 0f9e258199551dec2472fb704c62349966011f83 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Mon, 26 Mar 2018 17:26:50 -0700 Subject: [PATCH 4/5] assertion --- netty/src/main/java/io/grpc/netty/NettyServerHandler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index a37c4cbefaf..26e4c13e780 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -771,7 +771,9 @@ public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exce decoder().flowController().initialWindowSize(connection().connectionStream()))); } } else if (data == GRACEFUL_SHUTDOWN_PING) { - checkNotNull(gracefulShutdown, "gracefulShutdownRunner"); + if (gracefulShutdown == null) { + throw new AssertionError("gracefulShutdown is null"); + } gracefulShutdown.secondGoAwayAndClose(ctx); } else if (data != KEEPALIVE_PING) { logger.warning("Received unexpected ping ack. No ping outstanding"); From 61179519e9a1b0386e79d3bd5a8067e17e04c533 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 28 Mar 2018 14:54:56 -0700 Subject: [PATCH 5/5] log warning instead of throw --- netty/src/main/java/io/grpc/netty/NettyServerHandler.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 26e4c13e780..47c628e0cca 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -772,9 +772,11 @@ public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exce } } else if (data == GRACEFUL_SHUTDOWN_PING) { if (gracefulShutdown == null) { - throw new AssertionError("gracefulShutdown is null"); + // this should never happen + logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null"); + } else { + gracefulShutdown.secondGoAwayAndClose(ctx); } - gracefulShutdown.secondGoAwayAndClose(ctx); } else if (data != KEEPALIVE_PING) { logger.warning("Received unexpected ping ack. No ping outstanding"); }