Skip to content

Commit

Permalink
feat: support setting an async executor provider (#1263)
Browse files Browse the repository at this point in the history
* feat: support setting an async executor provider

* fix: set defaults in builder
  • Loading branch information
olavloite committed Jun 30, 2021
1 parent 3f0fa63 commit 369c8a7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,17 @@ public ServiceRpc create(SpannerOptions options) {
private static final AtomicInteger DEFAULT_POOL_COUNT = new AtomicInteger();

/** {@link ExecutorProvider} that is used for {@link AsyncResultSet}. */
interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseable {
public interface CloseableExecutorProvider extends ExecutorProvider, AutoCloseable {
/** Overridden to suppress the throws declaration of the super interface. */
@Override
void close();
}

static class FixedCloseableExecutorProvider implements CloseableExecutorProvider {
/**
* Implementation of {@link CloseableExecutorProvider} that uses a fixed single {@link
* ScheduledExecutorService}.
*/
public static class FixedCloseableExecutorProvider implements CloseableExecutorProvider {
private final ScheduledExecutorService executor;

private FixedCloseableExecutorProvider(ScheduledExecutorService executor) {
Expand All @@ -500,7 +504,7 @@ public boolean shouldAutoClose() {
}

/** Creates a FixedCloseableExecutorProvider. */
static FixedCloseableExecutorProvider create(ScheduledExecutorService executor) {
public static FixedCloseableExecutorProvider create(ScheduledExecutorService executor) {
return new FixedCloseableExecutorProvider(executor);
}
}
Expand All @@ -516,8 +520,19 @@ static CloseableExecutorProvider createDefaultAsyncExecutorProvider() {
return createAsyncExecutorProvider(8, 60L, TimeUnit.SECONDS);
}

@VisibleForTesting
static CloseableExecutorProvider createAsyncExecutorProvider(
/**
* Creates a {@link CloseableExecutorProvider} that can be used as an {@link ExecutorProvider} for
* the async API. The {@link ExecutorProvider} will lazily create up to poolSize threads. The
* backing threads will automatically be shutdown if they have not been used during the keep-alive
* time. The backing threads are created as daemon threads.
*
* @param poolSize the maximum number of threads to create in the pool
* @param keepAliveTime the time that an unused thread in the pool should be kept alive
* @param unit the time unit used for the keepAliveTime
* @return a {@link CloseableExecutorProvider} that can be used for {@link
* SpannerOptions.Builder#setAsyncExecutorProvider(CloseableExecutorProvider)}
*/
public static CloseableExecutorProvider createAsyncExecutorProvider(
int poolSize, long keepAliveTime, TimeUnit unit) {
String format =
String.format("spanner-async-pool-%d-thread-%%d", DEFAULT_POOL_COUNT.incrementAndGet());
Expand Down Expand Up @@ -1018,6 +1033,26 @@ public Builder setCompressorName(@Nullable String compressorName) {
return this;
}

/**
* Sets the {@link ExecutorProvider} to use for high-level async calls that need an executor,
* such as fetching results for an {@link AsyncResultSet}.
*
* <p>Async methods will use a sensible default if no custom {@link ExecutorProvider} has been
* set. The default {@link ExecutorProvider} uses a cached thread pool containing a maximum of 8
* threads. The pool is lazily initialized and will not create any threads if the user
* application does not use any async methods. It will also scale down the thread usage if the
* async load allows for that.
*
* <p>Call {@link SpannerOptions#createAsyncExecutorProvider(int, long, TimeUnit)} to create a
* provider with a custom pool size or call {@link
* FixedCloseableExecutorProvider#create(ScheduledExecutorService)} to create a {@link
* CloseableExecutorProvider} from a standard Java {@link ScheduledExecutorService}.
*/
public Builder setAsyncExecutorProvider(CloseableExecutorProvider provider) {
this.asyncExecutorProvider = provider;
return this;
}

/**
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
Expand Down Expand Up @@ -1198,7 +1233,7 @@ public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
return options;
}

CloseableExecutorProvider getAsyncExecutorProvider() {
public CloseableExecutorProvider getAsyncExecutorProvider() {
return asyncExecutorProvider;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
Expand All @@ -29,6 +31,7 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.TransportOptions;
import com.google.cloud.spanner.SpannerOptions.FixedCloseableExecutorProvider;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
Expand All @@ -54,6 +57,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -892,4 +896,16 @@ public void testSpannerCallContextTimeoutConfigurator_WithTimeouts() {
.getTimeout())
.isEqualTo(Duration.ofSeconds(6L));
}

@Test
public void testCustomAsyncExecutorProvider() {
ScheduledExecutorService service = mock(ScheduledExecutorService.class);
SpannerOptions options =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
.setAsyncExecutorProvider(FixedCloseableExecutorProvider.create(service))
.build();
assertSame(service, options.getAsyncExecutorProvider().getExecutor());
}
}

0 comments on commit 369c8a7

Please sign in to comment.