From 17526f7a5582fd0008b26c08b4689af411ef60fa Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 8 Apr 2022 11:29:02 -0700 Subject: [PATCH 1/2] okhttp: Use ObjectPool for executors internally in Builder This matches what we do in ManagedChannelImplBuilder and NettyChannelBuilder. It also fixes a (probably unimportant) bug where the factory returned from swapChannelCredentials() didn't have its references to the executors so could not outlive the parent factory. --- .../io/grpc/okhttp/OkHttpChannelBuilder.java | 62 +++++++++---------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java index df5f8781fee..ffc29138b95 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java @@ -37,13 +37,15 @@ import io.grpc.internal.AtomicBackoff; import io.grpc.internal.ClientTransportFactory; import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.FixedObjectPool; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.ManagedChannelImplBuilder; import io.grpc.internal.ManagedChannelImplBuilder.ChannelBuilderDefaultPortProvider; import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder; -import io.grpc.internal.SharedResourceHolder; +import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourceHolder.Resource; +import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; import io.grpc.okhttp.internal.CipherSuite; import io.grpc.okhttp.internal.ConnectionSpec; @@ -137,6 +139,8 @@ public void close(Executor executor) { ((ExecutorService) executor).shutdown(); } }; + private static final ObjectPool DEFAULT_TRANSPORT_EXECUTOR_POOL = + SharedResourcePool.forResource(SHARED_EXECUTOR); /** Creates a new builder for the given server host and port. */ public static OkHttpChannelBuilder forAddress(String host, int port) { @@ -168,8 +172,9 @@ public static OkHttpChannelBuilder forTarget(String target, ChannelCredentials c return new OkHttpChannelBuilder(target, creds, result.callCredentials, result.factory); } - private Executor transportExecutor; - private ScheduledExecutorService scheduledExecutorService; + private ObjectPool transportExecutorPool = DEFAULT_TRANSPORT_EXECUTOR_POOL; + private ObjectPool scheduledExecutorServicePool = + SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); private SocketFactory socketFactory; private SSLSocketFactory sslSocketFactory; @@ -247,7 +252,11 @@ OkHttpChannelBuilder setTransportTracerFactory(TransportTracer.Factory transport * to shutdown the executor when appropriate. */ public OkHttpChannelBuilder transportExecutor(@Nullable Executor transportExecutor) { - this.transportExecutor = transportExecutor; + if (transportExecutor == null) { + this.transportExecutorPool = DEFAULT_TRANSPORT_EXECUTOR_POOL; + } else { + this.transportExecutorPool = new FixedObjectPool<>(transportExecutor); + } return this; } @@ -468,8 +477,8 @@ public OkHttpChannelBuilder useTransportSecurity() { */ public OkHttpChannelBuilder scheduledExecutorService( ScheduledExecutorService scheduledExecutorService) { - this.scheduledExecutorService = - checkNotNull(scheduledExecutorService, "scheduledExecutorService"); + this.scheduledExecutorServicePool = + new FixedObjectPool<>(checkNotNull(scheduledExecutorService, "scheduledExecutorService")); return this; } @@ -508,8 +517,8 @@ public OkHttpChannelBuilder maxInboundMessageSize(int max) { OkHttpTransportFactory buildTransportFactory() { boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED; return new OkHttpTransportFactory( - transportExecutor, - scheduledExecutorService, + transportExecutorPool, + scheduledExecutorServicePool, socketFactory, createSslSocketFactory(), hostnameVerifier, @@ -712,9 +721,10 @@ public SslSocketFactoryResult withCallCredentials(CallCredentials callCreds) { */ @Internal static final class OkHttpTransportFactory implements ClientTransportFactory { + private final ObjectPool executorPool; final Executor executor; - private final boolean usingSharedExecutor; - private final boolean usingSharedScheduler; + private final ObjectPool timeoutServicePool; + private final ScheduledExecutorService timeoutService; final TransportTracer.Factory transportTracerFactory; final SocketFactory socketFactory; @Nullable final SSLSocketFactory sslSocketFactory; @@ -729,13 +739,12 @@ static final class OkHttpTransportFactory implements ClientTransportFactory { final int flowControlWindow; private final boolean keepAliveWithoutCalls; final int maxInboundMetadataSize; - private final ScheduledExecutorService timeoutService; final boolean useGetForSafeMethods; private boolean closed; private OkHttpTransportFactory( - Executor executor, - @Nullable ScheduledExecutorService timeoutService, + ObjectPool executorPool, + ObjectPool timeoutServicePool, @Nullable SocketFactory socketFactory, @Nullable SSLSocketFactory sslSocketFactory, @Nullable HostnameVerifier hostnameVerifier, @@ -749,9 +758,10 @@ private OkHttpTransportFactory( int maxInboundMetadataSize, TransportTracer.Factory transportTracerFactory, boolean useGetForSafeMethods) { - usingSharedScheduler = timeoutService == null; - this.timeoutService = usingSharedScheduler - ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService; + this.executorPool = executorPool; + this.executor = executorPool.getObject(); + this.timeoutServicePool = timeoutServicePool; + this.timeoutService = timeoutServicePool.getObject(); this.socketFactory = socketFactory; this.sslSocketFactory = sslSocketFactory; this.hostnameVerifier = hostnameVerifier; @@ -766,15 +776,8 @@ private OkHttpTransportFactory( this.maxInboundMetadataSize = maxInboundMetadataSize; this.useGetForSafeMethods = useGetForSafeMethods; - usingSharedExecutor = executor == null; this.transportTracerFactory = Preconditions.checkNotNull(transportTracerFactory, "transportTracerFactory"); - if (usingSharedExecutor) { - // The executor was unspecified, using the shared executor. - this.executor = SharedResourceHolder.get(SHARED_EXECUTOR); - } else { - this.executor = executor; - } } @Override @@ -821,8 +824,8 @@ public SwapChannelCredentialsResult swapChannelCredentials(ChannelCredentials ch return null; } ClientTransportFactory factory = new OkHttpTransportFactory( - executor, - timeoutService, + executorPool, + timeoutServicePool, socketFactory, result.factory, hostnameVerifier, @@ -846,13 +849,8 @@ public void close() { } closed = true; - if (usingSharedScheduler) { - SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService); - } - - if (usingSharedExecutor) { - SharedResourceHolder.release(SHARED_EXECUTOR, executor); - } + executorPool.returnObject(executor); + timeoutServicePool.returnObject(timeoutService); } } } From 1b70f03ffee1cd87b0080bf9b3f2be89fd274cfd Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 8 Apr 2022 11:52:35 -0700 Subject: [PATCH 2/2] okhttp: Allow keepalive scheduled executor to be overridden Users should be able to inject all executors. The transport shouldn't be hard-coded to create the TIMER_SERVICE, especially since a scheduler is already available to the builder. --- .../io/grpc/okhttp/OkHttpChannelBuilder.java | 16 ++++++++-------- .../io/grpc/okhttp/OkHttpClientTransport.java | 9 +++------ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java index ffc29138b95..e4df0b50e24 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java @@ -723,8 +723,8 @@ public SslSocketFactoryResult withCallCredentials(CallCredentials callCreds) { static final class OkHttpTransportFactory implements ClientTransportFactory { private final ObjectPool executorPool; final Executor executor; - private final ObjectPool timeoutServicePool; - private final ScheduledExecutorService timeoutService; + private final ObjectPool scheduledExecutorServicePool; + final ScheduledExecutorService scheduledExecutorService; final TransportTracer.Factory transportTracerFactory; final SocketFactory socketFactory; @Nullable final SSLSocketFactory sslSocketFactory; @@ -744,7 +744,7 @@ static final class OkHttpTransportFactory implements ClientTransportFactory { private OkHttpTransportFactory( ObjectPool executorPool, - ObjectPool timeoutServicePool, + ObjectPool scheduledExecutorServicePool, @Nullable SocketFactory socketFactory, @Nullable SSLSocketFactory sslSocketFactory, @Nullable HostnameVerifier hostnameVerifier, @@ -760,8 +760,8 @@ private OkHttpTransportFactory( boolean useGetForSafeMethods) { this.executorPool = executorPool; this.executor = executorPool.getObject(); - this.timeoutServicePool = timeoutServicePool; - this.timeoutService = timeoutServicePool.getObject(); + this.scheduledExecutorServicePool = scheduledExecutorServicePool; + this.scheduledExecutorService = scheduledExecutorServicePool.getObject(); this.socketFactory = socketFactory; this.sslSocketFactory = sslSocketFactory; this.hostnameVerifier = hostnameVerifier; @@ -812,7 +812,7 @@ public void run() { @Override public ScheduledExecutorService getScheduledExecutorService() { - return timeoutService; + return scheduledExecutorService; } @Nullable @@ -825,7 +825,7 @@ public SwapChannelCredentialsResult swapChannelCredentials(ChannelCredentials ch } ClientTransportFactory factory = new OkHttpTransportFactory( executorPool, - timeoutServicePool, + scheduledExecutorServicePool, socketFactory, result.factory, hostnameVerifier, @@ -850,7 +850,7 @@ public void close() { closed = true; executorPool.returnObject(executor); - timeoutServicePool.returnObject(timeoutService); + scheduledExecutorServicePool.returnObject(scheduledExecutorService); } } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index d3ae4066559..3194f0c02e7 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -17,7 +17,6 @@ package io.grpc.okhttp; import static com.google.common.base.Preconditions.checkState; -import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO; @@ -52,7 +51,6 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger; import io.grpc.internal.SerializingExecutor; -import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler; @@ -162,6 +160,7 @@ private static Map buildErrorCodeToStatusMap() { private final Executor executor; // Wrap on executor, to guarantee some operations be executed serially. private final SerializingExecutor serializingExecutor; + private final ScheduledExecutorService scheduler; private final int maxMessageSize; private int connectionUnacknowledgedBytesRead; private ClientFrameHandler clientFrameHandler; @@ -191,7 +190,6 @@ private static Map buildErrorCodeToStatusMap() { @GuardedBy("lock") private final Deque pendingStreams = new LinkedList<>(); private final ConnectionSpec connectionSpec; - private ScheduledExecutorService scheduler; private KeepAliveManager keepAliveManager; private boolean enableKeepAlive; private long keepAliveTimeNanos; @@ -262,6 +260,8 @@ private OkHttpClientTransport( this.initialWindowSize = transportFactory.flowControlWindow; this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor"); serializingExecutor = new SerializingExecutor(transportFactory.executor); + this.scheduler = Preconditions.checkNotNull( + transportFactory.scheduledExecutorService, "scheduledExecutorService"); // Client initiated streams are odd, server initiated ones are even. Server should not need to // use it. We start clients at 3 to avoid conflicting with HTTP negotiation. nextStreamId = 3; @@ -472,7 +472,6 @@ public Runnable start(Listener listener) { this.listener = Preconditions.checkNotNull(listener, "listener"); if (enableKeepAlive) { - scheduler = SharedResourceHolder.get(TIMER_SERVICE); keepAliveManager = new KeepAliveManager( new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls); @@ -949,8 +948,6 @@ private void stopIfNecessary() { if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); - // KeepAliveManager should stop using the scheduler after onTransportTermination gets called. - scheduler = SharedResourceHolder.release(TIMER_SERVICE, scheduler); } if (ping != null) {