Skip to content

Commit

Permalink
Revert "okhttp: add max connection idle at OkHttpServer (#9494)" (#9528)
Browse files Browse the repository at this point in the history
This reverts commit 7291ad4.
  • Loading branch information
YifeiZhuang authored Sep 8, 2022
1 parent 53a2d50 commit 95b9d6d
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 119 deletions.
28 changes: 0 additions & 28 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.grpc.okhttp;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.DoNotCall;
Expand Down Expand Up @@ -64,10 +62,6 @@
public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpServerBuilder> {
private static final Logger log = Logger.getLogger(OkHttpServerBuilder.class.getName());
private static final int DEFAULT_FLOW_CONTROL_WINDOW = 65535;

static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE;
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);

private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
OkHttpChannelBuilder.DEFAULT_TRANSPORT_EXECUTOR_POOL;
Expand Down Expand Up @@ -116,7 +110,6 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia
int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
int maxInboundMetadataSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;

@VisibleForTesting
OkHttpServerBuilder(
Expand Down Expand Up @@ -185,27 +178,6 @@ public OkHttpServerBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit)
return this;
}

/**
* Sets a custom max connection idle time, connection being idle for longer than which will be
* gracefully terminated. Idleness duration is defined since the most recent time the number of
* outstanding RPCs became zero or the connection establishment. An unreasonably small value might
* be increased. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable
* max connection idle.
*/
@Override
public OkHttpServerBuilder maxConnectionIdle(long maxConnectionIdle, TimeUnit timeUnit) {
checkArgument(maxConnectionIdle > 0L, "max connection idle must be positive: %s",
maxConnectionIdle);
maxConnectionIdleInNanos = timeUnit.toNanos(maxConnectionIdle);
if (maxConnectionIdleInNanos >= AS_LARGE_AS_INFINITE) {
maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
}
if (maxConnectionIdleInNanos < MIN_MAX_CONNECTION_IDLE_NANO) {
maxConnectionIdleInNanos = MIN_MAX_CONNECTION_IDLE_NANO;
}
return this;
}

/**
* Sets a time waiting for read activity after sending a keepalive ping. If the time expires
* without any read activity on the connection, the connection is considered dead. An unreasonably
Expand Down
23 changes: 0 additions & 23 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.grpc.okhttp;

import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -30,7 +28,6 @@
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.MaxConnectionIdleManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.ServerTransport;
Expand Down Expand Up @@ -94,7 +91,6 @@ final class OkHttpServerTransport implements ServerTransport,
private ScheduledExecutorService scheduledExecutorService;
private Attributes attributes;
private KeepAliveManager keepAliveManager;
private MaxConnectionIdleManager maxConnectionIdleManager;

private final Object lock = new Object();
@GuardedBy("lock")
Expand Down Expand Up @@ -193,11 +189,6 @@ private void startIo(SerializingExecutor serializingExecutor) {
keepAliveManager.onTransportStarted();
}

if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
}

transportExecutor.execute(
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
} catch (Error | IOException | RuntimeException ex) {
Expand Down Expand Up @@ -320,9 +311,6 @@ private void terminated() {
if (keepAliveManager != null) {
keepAliveManager.onTransportTermination();
}
if (maxConnectionIdleManager != null) {
maxConnectionIdleManager.onTransportTermination();
}
transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
scheduledExecutorService =
config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
Expand Down Expand Up @@ -381,9 +369,6 @@ public OutboundFlowController.StreamState[] getActiveStreams() {
void streamClosed(int streamId, boolean flush) {
synchronized (lock) {
streams.remove(streamId);
if (maxConnectionIdleManager != null && streams.isEmpty()) {
maxConnectionIdleManager.onTransportIdle();
}
if (gracefulShutdown && streams.isEmpty()) {
frameWriter.close();
} else {
Expand Down Expand Up @@ -448,7 +433,6 @@ static final class Config {
final int flowControlWindow;
final int maxInboundMessageSize;
final int maxInboundMetadataSize;
final long maxConnectionIdleNanos;

public Config(
OkHttpServerBuilder builder,
Expand All @@ -468,7 +452,6 @@ public Config(
flowControlWindow = builder.flowControlWindow;
maxInboundMessageSize = builder.maxInboundMessageSize;
maxInboundMetadataSize = builder.maxInboundMetadataSize;
maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
}
}

Expand Down Expand Up @@ -714,9 +697,6 @@ public void headers(boolean outFinished,
authority == null ? null : asciiString(authority),
statsTraceCtx,
tracer);
if (maxConnectionIdleManager != null && streams.isEmpty()) {
maxConnectionIdleManager.onTransportActive();
}
streams.put(streamId, stream);
listener.streamCreated(streamForApp, method, metadata);
stream.onStreamAllocated();
Expand Down Expand Up @@ -973,9 +953,6 @@ private void respondWithHttpError(
synchronized (lock) {
Http2ErrorStreamState stream =
new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
if (maxConnectionIdleManager != null && streams.isEmpty()) {
maxConnectionIdleManager.onTransportActive();
}
streams.put(streamId, stream);
if (inFinished) {
stream.inboundDataReceived(new Buffer(), 0, true);
Expand Down
74 changes: 6 additions & 68 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
public class OkHttpServerTransportTest {
private static final int TIME_OUT_MS = 2000;
private static final int INITIAL_WINDOW_SIZE = 65535;
private static final long MAX_CONNECTION_IDLE = TimeUnit.SECONDS.toNanos(1);

private MockServerTransportListener mockTransportListener = new MockServerTransportListener();
private ServerTransportListener transportListener
Expand All @@ -106,11 +105,10 @@ public class OkHttpServerTransportTest {
private ExecutorService threadPool = Executors.newCachedThreadPool();
private HandshakerSocketFactory handshakerSocketFactory
= mock(HandshakerSocketFactory.class, delegatesTo(new PlaintextHandshakerSocketFactory()));
private final FakeClock fakeClock = new FakeClock();
private OkHttpServerBuilder serverBuilder
= new OkHttpServerBuilder(new InetSocketAddress(1234), handshakerSocketFactory)
.executor(new FakeClock().getScheduledExecutorService()) // Executor unused
.scheduledExecutorService(fakeClock.getScheduledExecutorService())
.scheduledExecutorService(new FakeClock().getScheduledExecutorService())
.transportExecutor(new Executor() {
@Override public void execute(Runnable runnable) {
if (runnable instanceof OkHttpServerTransport.FrameHandler) {
Expand All @@ -121,8 +119,7 @@ public class OkHttpServerTransportTest {
}
}
})
.flowControlWindow(INITIAL_WINDOW_SIZE)
.maxConnectionIdle(MAX_CONNECTION_IDLE, TimeUnit.NANOSECONDS);
.flowControlWindow(INITIAL_WINDOW_SIZE);

@Rule public final Timeout globalTimeout = Timeout.seconds(10);

Expand All @@ -149,63 +146,6 @@ public void startThenShutdown() throws Exception {
shutdownAndTerminate(/*lastStreamId=*/ 0);
}

@Test
public void maxConnectionIdleTimer() throws Exception {
initTransport();
handshake();
clientFrameWriter.headers(1, Arrays.asList(
HTTP_SCHEME_HEADER,
METHOD_HEADER,
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
CONTENT_TYPE_HEADER,
TE_HEADER));
clientFrameWriter.synStream(true, false, 1, -1, Arrays.asList(
new Header("some-client-sent-trailer", "trailer-value")));
pingPong();

MockStreamListener streamListener = mockTransportListener.newStreams.pop();
assertThat(streamListener.messages.peek()).isNull();
assertThat(streamListener.halfClosedCalled).isTrue();

streamListener.stream.close(Status.OK, new Metadata());

List<Header> responseTrailers = Arrays.asList(
new Header(":status", "200"),
CONTENT_TYPE_HEADER,
new Header("grpc-status", "0"));
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead)
.headers(false, true, 1, -1, responseTrailers, HeadersMode.HTTP_20_HEADERS);

fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
verifyGracefulShutdown(1);
}

@Test
public void maxConnectionIdleTimer_respondWithError() throws Exception {
initTransport();
handshake();

clientFrameWriter.headers(1, Arrays.asList(
HTTP_SCHEME_HEADER,
METHOD_HEADER,
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
CONTENT_TYPE_HEADER,
TE_HEADER,
new Header("host", "example.com:80"),
new Header("host", "example.com:80")));
clientFrameWriter.flush();

verifyHttpError(
1, 400, Status.Code.INTERNAL, "Multiple host headers disallowed. RFC7230 section 5.4");

fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
verifyGracefulShutdown(1);
}

@Test
public void startThenShutdownTwice() throws Exception {
initTransport();
Expand Down Expand Up @@ -376,8 +316,7 @@ public void activeRpc_delaysShutdownTermination() throws Exception {
clientFrameWriter.data(true, 1, requestMessageFrame, (int) requestMessageFrame.size());
pingPong();

serverTransport.shutdown();
verifyGracefulShutdown(1);
shutdownAndVerifyGraceful(1);
verify(transportListener, never()).transportTerminated();

MockStreamListener streamListener = mockTransportListener.newStreams.pop();
Expand Down Expand Up @@ -1099,8 +1038,8 @@ private Metadata metadata(String... keysAndValues) {
return metadata;
}

private void verifyGracefulShutdown(int lastStreamId)
throws IOException {
private void shutdownAndVerifyGraceful(int lastStreamId) throws IOException {
serverTransport.shutdown();
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead).goAway(2147483647, ErrorCode.NO_ERROR, ByteString.EMPTY);
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
Expand All @@ -1113,8 +1052,7 @@ private void verifyGracefulShutdown(int lastStreamId)

private void shutdownAndTerminate(int lastStreamId) throws IOException {
assertThat(serverTransport.getActiveStreams().length).isEqualTo(0);
serverTransport.shutdown();
verifyGracefulShutdown(lastStreamId);
shutdownAndVerifyGraceful(lastStreamId);
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isFalse();
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
}
Expand Down

0 comments on commit 95b9d6d

Please sign in to comment.