Skip to content

Commit

Permalink
feat: ExecutorProvider can now be replaced (#1770)
Browse files Browse the repository at this point in the history
An arbitrary ExecutorProvider can be set to control the generation of gax threads.

Fixes #1769
  • Loading branch information
tomoponzoo committed Sep 27, 2022
1 parent 419637e commit 6380f71
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
Expand Down Expand Up @@ -83,6 +84,7 @@ private JsonStreamWriter(Builder builder)
setStreamWriterSettings(
builder.channelProvider,
builder.credentialsProvider,
builder.executorProvider,
builder.endpoint,
builder.flowControlSettings,
builder.traceId);
Expand Down Expand Up @@ -270,6 +272,7 @@ public long getInflightWaitSeconds() {
private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
@Nullable CredentialsProvider credentialsProvider,
@Nullable ExecutorProvider executorProvider,
@Nullable String endpoint,
@Nullable FlowControlSettings flowControlSettings,
@Nullable String traceId) {
Expand All @@ -279,6 +282,9 @@ private void setStreamWriterSettings(
if (credentialsProvider != null) {
streamWriterBuilder.setCredentialsProvider(credentialsProvider);
}
if (executorProvider != null) {
streamWriterBuilder.setExecutorProvider(executorProvider);
}
if (endpoint != null) {
streamWriterBuilder.setEndpoint(endpoint);
}
Expand Down Expand Up @@ -366,6 +372,7 @@ public static final class Builder {

private TransportChannelProvider channelProvider;
private CredentialsProvider credentialsProvider;
private ExecutorProvider executorProvider;
private FlowControlSettings flowControlSettings;
private String endpoint;
private boolean createDefaultStream = false;
Expand Down Expand Up @@ -446,6 +453,18 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
return this;
}

/**
* Setter for the underlying StreamWriter's ExecutorProvider.
*
* @param executorProvider
* @return
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider =
Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
return this;
}

/**
* Setter for the underlying StreamWriter's FlowControlSettings.
*
Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
Expand Down Expand Up @@ -174,6 +175,7 @@ private StreamWriter(Builder builder) throws IOException {
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setBackgroundExecutorProvider(builder.executorProvider)
.setEndpoint(builder.endpoint)
// (b/185842996): Temporily fix this by explicitly providing the header.
.setHeaderProvider(
Expand Down Expand Up @@ -383,6 +385,9 @@ public static final class Builder {
private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();

private ExecutorProvider executorProvider =
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();

private FlowController.LimitExceededBehavior limitExceededBehavior =
FlowController.LimitExceededBehavior.Block;

Expand Down Expand Up @@ -459,6 +464,12 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
return this;
}

/** {@code ExecutorProvider} to use to create Executor to run background jobs. */
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return this;
}

/**
* Sets traceId for debuging purpose. TraceId must follow the format of
* CustomerDomain:DebugString, e.g. DATAFLOW:job_id_x.
Expand Down
Expand Up @@ -138,7 +138,8 @@ private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder(
String testStream, TableSchema BQTableSchema) {
return JsonStreamWriter.newBuilder(testStream, BQTableSchema, client)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create());
.setCredentialsProvider(NoCredentialsProvider.create())
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build());
}

@Test
Expand Down Expand Up @@ -403,6 +404,7 @@ public void testCreateDefaultStream() throws Exception {
JsonStreamWriter.newBuilder(TEST_TABLE, client)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build())
.build()) {
assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName());
assertEquals("aa", writer.getLocation());
Expand Down Expand Up @@ -651,6 +653,7 @@ public void testWithIgnoreUnknownFields() throws Exception {
.setChannelProvider(channelProvider)
.setIgnoreUnknownFields(true)
.setCredentialsProvider(NoCredentialsProvider.create())
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build())
.build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
JSONObject foo = new JSONObject();
Expand All @@ -672,6 +675,7 @@ public void testFlowControlSetting() throws Exception {
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build())
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
Expand Down Expand Up @@ -709,6 +713,7 @@ public void testFlowControlSettingNoLimitBehavior() throws Exception {
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build())
.setFlowControlSettings(
FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(1L).build())
.build()) {
Expand Down

0 comments on commit 6380f71

Please sign in to comment.