From f6af33fe2a2e6ea3e93ae79b01d5900ff8554a63 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Mon, 20 Mar 2023 17:21:35 -0700 Subject: [PATCH 1/3] okhttp: forceful close after MAX_CONNECTION_AGE_GRACE_TIME --- .../io/grpc/okhttp/OkHttpServerTransport.java | 19 ++++++++++++++----- .../okhttp/OkHttpServerTransportTest.java | 18 ++++++++++++------ 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java index 1fd98079ede..805c88f458a 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java @@ -60,6 +60,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import okio.Buffer; import okio.BufferedSource; @@ -73,6 +74,9 @@ final class OkHttpServerTransport implements ServerTransport, ExceptionHandlingFrameWriter.TransportExceptionHandler, OutboundFlowController.Transport { private static final Logger log = Logger.getLogger(OkHttpServerTransport.class.getName()); private static final int GRACEFUL_SHUTDOWN_PING = 0x1111; + + private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1); + private static final int KEEPALIVE_PING = 0xDEAD; private static final ByteString HTTP_METHOD = ByteString.encodeUtf8(":method"); private static final ByteString CONNECT_METHOD = ByteString.encodeUtf8("CONNECT"); @@ -250,10 +254,10 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount @Override public void shutdown() { - shutdown(TimeUnit.SECONDS.toNanos(1L)); + shutdown(null); } - private void shutdown(Long graceTimeInNanos) { + private void shutdown(@Nullable Long graceTimeInNanos) { synchronized (lock) { if (gracefulShutdown || abruptShutdown) { return; @@ -267,7 +271,8 @@ private void shutdown(Long graceTimeInNanos) { // we also set a timer to limit the upper bound in case the PING is excessively stalled or // the client is malicious. secondGoawayTimer = scheduledExecutorService.schedule( - this::triggerGracefulSecondGoaway, graceTimeInNanos, TimeUnit.NANOSECONDS); + () -> triggerGracefulSecondGoaway(graceTimeInNanos), + GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS); frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]); frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING); frameWriter.flush(); @@ -275,7 +280,7 @@ private void shutdown(Long graceTimeInNanos) { } } - private void triggerGracefulSecondGoaway() { + private void triggerGracefulSecondGoaway(@Nullable Long gracePeriodNanos) { synchronized (lock) { if (secondGoawayTimer == null) { return; @@ -289,6 +294,10 @@ private void triggerGracefulSecondGoaway() { } else { frameWriter.flush(); } + if (gracePeriodNanos != null) { + forcefulCloseTimer = scheduledExecutorService.schedule( + this::triggerForcefulClose, gracePeriodNanos, TimeUnit.NANOSECONDS); + } } } @@ -926,7 +935,7 @@ public void ping(boolean ack, int payload1, int payload2) { return; } if (GRACEFUL_SHUTDOWN_PING == payload) { - triggerGracefulSecondGoaway(); + triggerGracefulSecondGoaway(null); return; } log.log(Level.INFO, "Received unexpected ping ack: " + payload); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java index b58f21b0a52..f087f7fe528 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java @@ -70,6 +70,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import okio.Buffer; import okio.BufferedSource; import okio.ByteString; @@ -170,7 +171,7 @@ public void maxConnectionAge() throws Exception { pingPong(); fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(6)); // > 1.1 * 5 fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1)); - verifyGracefulShutdown(1); + verifyGracefulShutdown(1, TimeUnit.SECONDS.toNanos(1)); } @Test @@ -204,7 +205,7 @@ public void maxConnectionIdleTimer() throws Exception { fakeClock.forwardNanos(MAX_CONNECTION_IDLE); fakeClock.forwardNanos(MAX_CONNECTION_IDLE); - verifyGracefulShutdown(1); + verifyGracefulShutdown(1, null); } @Test @@ -228,7 +229,7 @@ public void maxConnectionIdleTimer_respondWithError() throws Exception { pingPong(); fakeClock.forwardNanos(MAX_CONNECTION_IDLE); fakeClock.forwardNanos(MAX_CONNECTION_IDLE); - verifyGracefulShutdown(1); + verifyGracefulShutdown(1, null); } @Test @@ -402,7 +403,7 @@ public void activeRpc_delaysShutdownTermination() throws Exception { pingPong(); serverTransport.shutdown(); - verifyGracefulShutdown(1); + verifyGracefulShutdown(1, null); verify(transportListener, never()).transportTerminated(); MockStreamListener streamListener = mockTransportListener.newStreams.pop(); @@ -1219,7 +1220,7 @@ private Metadata metadata(String... keysAndValues) { return metadata; } - private void verifyGracefulShutdown(int lastStreamId) + private void verifyGracefulShutdown(int lastStreamId, @Nullable Long gracePeriodNanos) throws IOException { assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue(); verify(clientFramesRead).goAway(2147483647, ErrorCode.NO_ERROR, ByteString.EMPTY); @@ -1229,12 +1230,16 @@ private void verifyGracefulShutdown(int lastStreamId) clientFrameWriter.flush(); assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue(); verify(clientFramesRead).goAway(lastStreamId, ErrorCode.NO_ERROR, ByteString.EMPTY); + if (gracePeriodNanos != null) { + assertThat(fakeClock.forwardNanos(gracePeriodNanos)).isEqualTo(1); + assertThat(socket.isClosed()).isTrue(); + } } private void shutdownAndTerminate(int lastStreamId) throws IOException { assertThat(serverTransport.getActiveStreams().length).isEqualTo(0); serverTransport.shutdown(); - verifyGracefulShutdown(lastStreamId); + verifyGracefulShutdown(lastStreamId, null); assertThat(clientFrameReader.nextFrame(clientFramesRead)).isFalse(); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); } @@ -1369,6 +1374,7 @@ public synchronized void close() throws IOException { // PipedInputStream can only be woken by PipedOutputStream, so PipedOutputStream.close() is // a better imitation of Socket.close(). inputStreamSource.close(); + super.close(); } } From e3c569065dbd24009391940332355265db45d092 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 21 Mar 2023 11:19:32 -0700 Subject: [PATCH 2/3] graceful shutdown --- .../io/grpc/okhttp/OkHttpServerTransport.java | 22 +++++++++--------- .../okhttp/OkHttpServerTransportTest.java | 23 ++++++++----------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java index 805c88f458a..7ef5b84446f 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java @@ -60,7 +60,6 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import okio.Buffer; import okio.BufferedSource; @@ -136,6 +135,8 @@ final class OkHttpServerTransport implements ServerTransport, /** Non-{@code null} when waiting for forceful close GOAWAY to be sent. */ @GuardedBy("lock") private ScheduledFuture forcefulCloseTimer; + @GuardedBy("lock") + private Long gracefulShutdownPeriod = null; public OkHttpServerTransport(Config config, Socket bareSocket) { this.config = Preconditions.checkNotNull(config, "config"); @@ -233,8 +234,11 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) { long maxConnectionAgeInNanos = (long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos); + synchronized (lock) { + gracefulShutdownPeriod = config.maxConnectionAgeGraceInNanos; + } maxConnectionAgeMonitor = scheduledExecutorService.schedule( - new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)), + new LogExceptionRunnable(this::shutdown), maxConnectionAgeInNanos, TimeUnit.NANOSECONDS); } @@ -254,10 +258,6 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount @Override public void shutdown() { - shutdown(null); - } - - private void shutdown(@Nullable Long graceTimeInNanos) { synchronized (lock) { if (gracefulShutdown || abruptShutdown) { return; @@ -271,7 +271,7 @@ private void shutdown(@Nullable Long graceTimeInNanos) { // we also set a timer to limit the upper bound in case the PING is excessively stalled or // the client is malicious. secondGoawayTimer = scheduledExecutorService.schedule( - () -> triggerGracefulSecondGoaway(graceTimeInNanos), + this::triggerGracefulSecondGoaway, GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS); frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]); frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING); @@ -280,7 +280,7 @@ private void shutdown(@Nullable Long graceTimeInNanos) { } } - private void triggerGracefulSecondGoaway(@Nullable Long gracePeriodNanos) { + private void triggerGracefulSecondGoaway() { synchronized (lock) { if (secondGoawayTimer == null) { return; @@ -294,9 +294,9 @@ private void triggerGracefulSecondGoaway(@Nullable Long gracePeriodNanos) { } else { frameWriter.flush(); } - if (gracePeriodNanos != null) { + if (gracefulShutdownPeriod != null) { forcefulCloseTimer = scheduledExecutorService.schedule( - this::triggerForcefulClose, gracePeriodNanos, TimeUnit.NANOSECONDS); + this::triggerForcefulClose, gracefulShutdownPeriod, TimeUnit.NANOSECONDS); } } } @@ -935,7 +935,7 @@ public void ping(boolean ack, int payload1, int payload2) { return; } if (GRACEFUL_SHUTDOWN_PING == payload) { - triggerGracefulSecondGoaway(null); + triggerGracefulSecondGoaway(); return; } log.log(Level.INFO, "Received unexpected ping ack: " + payload); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java index f087f7fe528..92d6abb60eb 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java @@ -70,7 +70,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import okio.Buffer; import okio.BufferedSource; import okio.ByteString; @@ -156,7 +155,7 @@ public void startThenShutdown() throws Exception { @Test public void maxConnectionAge() throws Exception { serverBuilder.maxConnectionAge(5, TimeUnit.SECONDS) - .maxConnectionAgeGrace(1, TimeUnit.SECONDS); + .maxConnectionAgeGrace(3, TimeUnit.SECONDS); initTransport(); handshake(); clientFrameWriter.headers(1, Arrays.asList( @@ -170,8 +169,10 @@ public void maxConnectionAge() throws Exception { new Header("some-client-sent-trailer", "trailer-value"))); pingPong(); fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(6)); // > 1.1 * 5 - fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1)); - verifyGracefulShutdown(1, TimeUnit.SECONDS.toNanos(1)); + verifyGracefulShutdown(1); + pingPong(); + fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(3)); + assertThat(socket.isClosed()).isTrue(); } @Test @@ -205,7 +206,7 @@ public void maxConnectionIdleTimer() throws Exception { fakeClock.forwardNanos(MAX_CONNECTION_IDLE); fakeClock.forwardNanos(MAX_CONNECTION_IDLE); - verifyGracefulShutdown(1, null); + verifyGracefulShutdown(1); } @Test @@ -229,7 +230,7 @@ public void maxConnectionIdleTimer_respondWithError() throws Exception { pingPong(); fakeClock.forwardNanos(MAX_CONNECTION_IDLE); fakeClock.forwardNanos(MAX_CONNECTION_IDLE); - verifyGracefulShutdown(1, null); + verifyGracefulShutdown(1); } @Test @@ -403,7 +404,7 @@ public void activeRpc_delaysShutdownTermination() throws Exception { pingPong(); serverTransport.shutdown(); - verifyGracefulShutdown(1, null); + verifyGracefulShutdown(1); verify(transportListener, never()).transportTerminated(); MockStreamListener streamListener = mockTransportListener.newStreams.pop(); @@ -1220,7 +1221,7 @@ private Metadata metadata(String... keysAndValues) { return metadata; } - private void verifyGracefulShutdown(int lastStreamId, @Nullable Long gracePeriodNanos) + private void verifyGracefulShutdown(int lastStreamId) throws IOException { assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue(); verify(clientFramesRead).goAway(2147483647, ErrorCode.NO_ERROR, ByteString.EMPTY); @@ -1230,16 +1231,12 @@ private void verifyGracefulShutdown(int lastStreamId, @Nullable Long gracePeriod clientFrameWriter.flush(); assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue(); verify(clientFramesRead).goAway(lastStreamId, ErrorCode.NO_ERROR, ByteString.EMPTY); - if (gracePeriodNanos != null) { - assertThat(fakeClock.forwardNanos(gracePeriodNanos)).isEqualTo(1); - assertThat(socket.isClosed()).isTrue(); - } } private void shutdownAndTerminate(int lastStreamId) throws IOException { assertThat(serverTransport.getActiveStreams().length).isEqualTo(0); serverTransport.shutdown(); - verifyGracefulShutdown(lastStreamId, null); + verifyGracefulShutdown(lastStreamId); assertThat(clientFrameReader.nextFrame(clientFramesRead)).isFalse(); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); } From 6486b86531a8896d68cc75300426596cb2a41765 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Mon, 27 Mar 2023 11:06:24 -0700 Subject: [PATCH 3/3] server.shutdown should not trigger forcefulclosure --- .../java/io/grpc/okhttp/OkHttpServerTransport.java | 11 +++++++---- .../io/grpc/okhttp/OkHttpServerTransportTest.java | 10 ++++++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java index 7ef5b84446f..5ec393e1073 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java @@ -60,6 +60,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import okio.Buffer; import okio.BufferedSource; @@ -234,11 +235,8 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) { long maxConnectionAgeInNanos = (long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos); - synchronized (lock) { - gracefulShutdownPeriod = config.maxConnectionAgeGraceInNanos; - } maxConnectionAgeMonitor = scheduledExecutorService.schedule( - new LogExceptionRunnable(this::shutdown), + new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)), maxConnectionAgeInNanos, TimeUnit.NANOSECONDS); } @@ -258,11 +256,16 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount @Override public void shutdown() { + shutdown(null); + } + + private void shutdown(@Nullable Long gracefulShutdownPeriod) { synchronized (lock) { if (gracefulShutdown || abruptShutdown) { return; } gracefulShutdown = true; + this.gracefulShutdownPeriod = gracefulShutdownPeriod; if (frameWriter == null) { handshakeShutdown = true; GrpcUtil.closeQuietly(bareSocket); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java index 92d6abb60eb..816272fbc98 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java @@ -175,6 +175,16 @@ public void maxConnectionAge() throws Exception { assertThat(socket.isClosed()).isTrue(); } + @Test + public void maxConnectionAge_shutdown() throws Exception { + serverBuilder.maxConnectionAge(5, TimeUnit.SECONDS) + .maxConnectionAgeGrace(3, TimeUnit.SECONDS); + initTransport(); + handshake(); + shutdownAndTerminate(0); + assertThat(fakeClock.numPendingTasks()).isEqualTo(0); + } + @Test public void maxConnectionIdleTimer() throws Exception { initTransport();