Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.grpc.okhttp;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIME_NANOS;
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
Expand Down Expand Up @@ -105,6 +106,7 @@ public static OkHttpChannelBuilder forTarget(String target) {
}

private Executor transportExecutor;
private ScheduledExecutorService scheduledExecutorService;

private SSLSocketFactory sslSocketFactory;
private HostnameVerifier hostnameVerifier;
Expand Down Expand Up @@ -306,11 +308,28 @@ public final OkHttpChannelBuilder useTransportSecurity() {
return this;
}

/**
* Provides a custom scheduled executor service.
*
* <p>It's an optional parameter. If the user has not provided a scheduled executor service when
* the channel is built, the builder will use a static cached thread pool.
*
* @return this
*
* @since 1.11.0
*/
public final OkHttpChannelBuilder scheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService =
checkNotNull(scheduledExecutorService, "scheduledExecutorService");
return this;
}

@Override
@Internal
protected final ClientTransportFactory buildTransportFactory() {
boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED;
return new OkHttpTransportFactory(transportExecutor,
return new OkHttpTransportFactory(transportExecutor, scheduledExecutorService,
createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(),
enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory);
Expand Down Expand Up @@ -379,6 +398,7 @@ SSLSocketFactory createSocketFactory() {
static final class OkHttpTransportFactory implements ClientTransportFactory {
private final Executor executor;
private final boolean usingSharedExecutor;
private final boolean usingSharedScheduler;
private final TransportTracer.Factory transportTracerFactory;
@Nullable
private final SSLSocketFactory socketFactory;
Expand All @@ -390,11 +410,11 @@ static final class OkHttpTransportFactory implements ClientTransportFactory {
private final AtomicBackoff keepAliveTimeNanos;
private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls;
private final ScheduledExecutorService timeoutService =
SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
private final ScheduledExecutorService timeoutService;
private boolean closed;

private OkHttpTransportFactory(Executor executor,
@Nullable ScheduledExecutorService timeoutService,
@Nullable SSLSocketFactory socketFactory,
@Nullable HostnameVerifier hostnameVerifier,
ConnectionSpec connectionSpec,
Expand All @@ -404,6 +424,9 @@ private OkHttpTransportFactory(Executor executor,
long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory) {
usingSharedScheduler = timeoutService == null;
this.timeoutService = usingSharedScheduler
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService;
this.socketFactory = socketFactory;
this.hostnameVerifier = hostnameVerifier;
this.connectionSpec = connectionSpec;
Expand Down Expand Up @@ -471,7 +494,10 @@ public void close() {
return;
}
closed = true;
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);

if (usingSharedScheduler) {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
}

if (usingSharedExecutor) {
SharedResourceHolder.release(SHARED_EXECUTOR, (ExecutorService) executor);
Expand Down
35 changes: 35 additions & 0 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@

package io.grpc.okhttp;

import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;

import com.squareup.okhttp.ConnectionSpec;
import io.grpc.NameResolver;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -128,5 +134,34 @@ public void usePlaintextCreatesNullSocketFactory() {
builder.usePlaintext(true);
assertNull(builder.createSocketFactory());
}

@Test
public void scheduledExecutorService_default() {
OkHttpChannelBuilder builder = OkHttpChannelBuilder.forTarget("foo");
ClientTransportFactory clientTransportFactory = builder.buildTransportFactory();
assertSame(
SharedResourceHolder.get(TIMER_SERVICE),
clientTransportFactory.getScheduledExecutorService());

SharedResourceHolder.release(
TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService());
clientTransportFactory.close();
}

@Test
public void scheduledExecutorService_custom() {
OkHttpChannelBuilder builder = OkHttpChannelBuilder.forTarget("foo");
ScheduledExecutorService scheduledExecutorService =
new FakeClock().getScheduledExecutorService();

OkHttpChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
assertSame(builder, builder1);

ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory();

assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService());

clientTransportFactory.close();
}
}