Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 102 additions & 31 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
Expand All @@ -96,6 +97,8 @@
class NettyServerHandler extends AbstractNettyHandler {
private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
private static final long KEEPALIVE_PING = 0xDEADL;
private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);

private final Http2Connection.PropertyKey streamKey;
private final ServerTransportListener transportListener;
Expand All @@ -121,6 +124,8 @@ class NettyServerHandler extends AbstractNettyHandler {
private MaxConnectionIdleManager maxConnectionIdleManager;
@CheckForNull
private ScheduledFuture<?> maxConnectionAgeMonitor;
@CheckForNull
private GracefulShutdown gracefulShutdown;

static NettyServerHandler newHandler(
ServerTransportListener transportListener,
Expand Down Expand Up @@ -250,17 +255,10 @@ private NettyServerHandler(
maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) {
@Override
void close(ChannelHandlerContext ctx) {
goAway(
ctx,
Integer.MAX_VALUE,
Http2Error.NO_ERROR.code(),
ByteBufUtil.writeAscii(ctx.alloc(), "max_idle"),
ctx.newPromise());
ctx.flush();
try {
NettyServerHandler.this.close(ctx, ctx.newPromise());
} catch (Exception e) {
onError(ctx, /* outbound= */ true, e);
if (gracefulShutdown == null) {
gracefulShutdown = new GracefulShutdown("max_idle", null);
gracefulShutdown.start(ctx);
ctx.flush();
}
}
};
Expand Down Expand Up @@ -321,25 +319,10 @@ public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
new LogExceptionRunnable(new Runnable() {
@Override
public void run() {
// send GO_AWAY
ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "max_age");
goAway(
ctx,
Integer.MAX_VALUE,
Http2Error.NO_ERROR.code(),
debugData,
ctx.newPromise());

// gracefully shutdown with specified grace time
long savedGracefulShutdownTime = gracefulShutdownTimeoutMillis();
try {
gracefulShutdownTimeoutMillis(
TimeUnit.NANOSECONDS.toMillis(maxConnectionAgeGraceInNanos));
close(ctx, ctx.newPromise());
} catch (Exception e) {
onError(ctx, /* outbound= */ true, e);
} finally {
gracefulShutdownTimeoutMillis(savedGracefulShutdownTime);
if (gracefulShutdown == null) {
gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
gracefulShutdown.start(ctx);
ctx.flush();
}
}
}),
Expand Down Expand Up @@ -787,6 +770,13 @@ public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exce
logger.log(Level.FINE, String.format("Window: %d",
decoder().flowController().initialWindowSize(connection().connectionStream())));
}
} else if (data == GRACEFUL_SHUTDOWN_PING) {
if (gracefulShutdown == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I remember why I put that comment. This is doing a check based on what is received. In your earlier response you said:

It will never throw a RuntimeException unless there's a bug in the code.

But the bug could be in the remote code, not in this code. That's why I had suggested a warning.

// this should never happen
logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
} else {
gracefulShutdown.secondGoAwayAndClose(ctx);
}
} else if (data != KEEPALIVE_PING) {
logger.warning("Received unexpected ping ack. No ping outstanding");
}
Expand All @@ -803,7 +793,6 @@ private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger
@Override
public void ping() {
ChannelFuture pingFuture = encoder().writePing(
// slice KEEPALIVE_PING because tls handler may modify the reader index
ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
ctx.flush();
if (transportTracer != null) {
Expand Down Expand Up @@ -837,6 +826,88 @@ public void onPingTimeout() {
}
}

private final class GracefulShutdown {
String goAwayMessage;

/**
* The grace time between starting graceful shutdown and closing the netty channel,
* {@code null} is unspecified.
*/
@CheckForNull
Long graceTimeInNanos;

/**
* True if ping is Acked or ping is timeout.
*/
boolean pingAckedOrTimeout;

Future<?> pingFuture;

GracefulShutdown(String goAwayMessage,
@Nullable Long graceTimeInNanos) {
this.goAwayMessage = goAwayMessage;
this.graceTimeInNanos = graceTimeInNanos;
}

/**
* Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
*/
void start(final ChannelHandlerContext ctx) {
goAway(
ctx,
Integer.MAX_VALUE,
Http2Error.NO_ERROR.code(),
ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
ctx.newPromise());

long gracefulShutdownPingTimeout = GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS;
pingFuture = ctx.executor().schedule(
new Runnable() {
@Override
public void run() {
secondGoAwayAndClose(ctx);
}
},
GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
TimeUnit.NANOSECONDS);

encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
}

void secondGoAwayAndClose(ChannelHandlerContext ctx) {
if (pingAckedOrTimeout) {
return;
}
pingAckedOrTimeout = true;

checkNotNull(pingFuture, "pingFuture");
pingFuture.cancel(false);

// send the second GOAWAY with last stream id
goAway(
ctx,
connection().remote().lastStreamCreated(),
Http2Error.NO_ERROR.code(),
ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
ctx.newPromise());

// gracefully shutdown with specified grace time
long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
long gracefulShutdownTimeoutMillis = savedGracefulShutdownTimeMillis;
if (graceTimeInNanos != null) {
gracefulShutdownTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
}
try {
gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis);
close(ctx, ctx.newPromise());
} catch (Exception e) {
onError(ctx, /* outbound= */ true, e);
} finally {
gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
}
}
}

// Use a frame writer so that we know when frames are through flow control and actually being
// written.
private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
Expand Down
Loading