From 7e04fc42efc830ae85736f9e3008f3b68c3a140a Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Mon, 26 Feb 2018 17:38:11 -0800 Subject: [PATCH] core: allow application to provide all threads - okhttp channel --- .../io/grpc/okhttp/OkHttpChannelBuilder.java | 34 +++++++++++++++--- .../grpc/okhttp/OkHttpChannelBuilderTest.java | 35 +++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java index 068ab0e87bb..8e3efb26585 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java @@ -16,6 +16,7 @@ package io.grpc.okhttp; +import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIME_NANOS; import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; @@ -105,6 +106,7 @@ public static OkHttpChannelBuilder forTarget(String target) { } private Executor transportExecutor; + private ScheduledExecutorService scheduledExecutorService; private SSLSocketFactory sslSocketFactory; private HostnameVerifier hostnameVerifier; @@ -306,11 +308,28 @@ public final OkHttpChannelBuilder useTransportSecurity() { return this; } + /** + * Provides a custom scheduled executor service. + * + *

It's an optional parameter. If the user has not provided a scheduled executor service when + * the channel is built, the builder will use a static cached thread pool. + * + * @return this + * + * @since 1.11.0 + */ + public final OkHttpChannelBuilder scheduledExecutorService( + ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = + checkNotNull(scheduledExecutorService, "scheduledExecutorService"); + return this; + } + @Override @Internal protected final ClientTransportFactory buildTransportFactory() { boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED; - return new OkHttpTransportFactory(transportExecutor, + return new OkHttpTransportFactory(transportExecutor, scheduledExecutorService, createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(), enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, transportTracerFactory); @@ -379,6 +398,7 @@ SSLSocketFactory createSocketFactory() { static final class OkHttpTransportFactory implements ClientTransportFactory { private final Executor executor; private final boolean usingSharedExecutor; + private final boolean usingSharedScheduler; private final TransportTracer.Factory transportTracerFactory; @Nullable private final SSLSocketFactory socketFactory; @@ -390,11 +410,11 @@ static final class OkHttpTransportFactory implements ClientTransportFactory { private final AtomicBackoff keepAliveTimeNanos; private final long keepAliveTimeoutNanos; private final boolean keepAliveWithoutCalls; - private final ScheduledExecutorService timeoutService = - SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); + private final ScheduledExecutorService timeoutService; private boolean closed; private OkHttpTransportFactory(Executor executor, + @Nullable ScheduledExecutorService timeoutService, @Nullable SSLSocketFactory socketFactory, @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec, @@ -404,6 +424,9 @@ private OkHttpTransportFactory(Executor executor, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, TransportTracer.Factory transportTracerFactory) { + usingSharedScheduler = timeoutService == null; + this.timeoutService = usingSharedScheduler + ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService; this.socketFactory = socketFactory; this.hostnameVerifier = hostnameVerifier; this.connectionSpec = connectionSpec; @@ -471,7 +494,10 @@ public void close() { return; } closed = true; - SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService); + + if (usingSharedScheduler) { + SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService); + } if (usingSharedExecutor) { SharedResourceHolder.release(SHARED_EXECUTOR, (ExecutorService) executor); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java index 1f362eb6cba..4eca7a13a78 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java @@ -16,14 +16,20 @@ package io.grpc.okhttp; +import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import com.squareup.okhttp.ConnectionSpec; import io.grpc.NameResolver; +import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.FakeClock; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SharedResourceHolder; import java.net.InetSocketAddress; +import java.util.concurrent.ScheduledExecutorService; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -128,5 +134,34 @@ public void usePlaintextCreatesNullSocketFactory() { builder.usePlaintext(true); assertNull(builder.createSocketFactory()); } + + @Test + public void scheduledExecutorService_default() { + OkHttpChannelBuilder builder = OkHttpChannelBuilder.forTarget("foo"); + ClientTransportFactory clientTransportFactory = builder.buildTransportFactory(); + assertSame( + SharedResourceHolder.get(TIMER_SERVICE), + clientTransportFactory.getScheduledExecutorService()); + + SharedResourceHolder.release( + TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService()); + clientTransportFactory.close(); + } + + @Test + public void scheduledExecutorService_custom() { + OkHttpChannelBuilder builder = OkHttpChannelBuilder.forTarget("foo"); + ScheduledExecutorService scheduledExecutorService = + new FakeClock().getScheduledExecutorService(); + + OkHttpChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService); + assertSame(builder, builder1); + + ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory(); + + assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService()); + + clientTransportFactory.close(); + } }