From 6380f713b8f42fa3a58df2750ea2cf3b7397d29c Mon Sep 17 00:00:00 2001 From: "Tomoki Koga (gacchi)" Date: Wed, 28 Sep 2022 05:02:16 +0900 Subject: [PATCH] feat: ExecutorProvider can now be replaced (#1770) An arbitrary ExecutorProvider can be set to control the generation of gax threads. Fixes #1769 --- .../bigquery/storage/v1/JsonStreamWriter.java | 19 +++++++++++++++++++ .../bigquery/storage/v1/StreamWriter.java | 11 +++++++++++ .../storage/v1/JsonStreamWriterTest.java | 7 ++++++- 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index a0f3f807d7..85b8f31406 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind; @@ -83,6 +84,7 @@ private JsonStreamWriter(Builder builder) setStreamWriterSettings( builder.channelProvider, builder.credentialsProvider, + builder.executorProvider, builder.endpoint, builder.flowControlSettings, builder.traceId); @@ -270,6 +272,7 @@ public long getInflightWaitSeconds() { private void setStreamWriterSettings( @Nullable TransportChannelProvider channelProvider, @Nullable CredentialsProvider credentialsProvider, + @Nullable ExecutorProvider executorProvider, @Nullable String endpoint, @Nullable FlowControlSettings flowControlSettings, @Nullable String traceId) { @@ -279,6 +282,9 @@ private void setStreamWriterSettings( if (credentialsProvider != null) { streamWriterBuilder.setCredentialsProvider(credentialsProvider); } + if (executorProvider != null) { + streamWriterBuilder.setExecutorProvider(executorProvider); + } if (endpoint != null) { streamWriterBuilder.setEndpoint(endpoint); } @@ -366,6 +372,7 @@ public static final class Builder { private TransportChannelProvider channelProvider; private CredentialsProvider credentialsProvider; + private ExecutorProvider executorProvider; private FlowControlSettings flowControlSettings; private String endpoint; private boolean createDefaultStream = false; @@ -446,6 +453,18 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { return this; } + /** + * Setter for the underlying StreamWriter's ExecutorProvider. + * + * @param executorProvider + * @return + */ + public Builder setExecutorProvider(ExecutorProvider executorProvider) { + this.executorProvider = + Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null."); + return this; + } + /** * Setter for the underlying StreamWriter's FlowControlSettings. * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 46b9e141bf..9ae3440780 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auto.value.AutoOneOf; @@ -174,6 +175,7 @@ private StreamWriter(Builder builder) throws IOException { BigQueryWriteSettings.newBuilder() .setCredentialsProvider(builder.credentialsProvider) .setTransportChannelProvider(builder.channelProvider) + .setBackgroundExecutorProvider(builder.executorProvider) .setEndpoint(builder.endpoint) // (b/185842996): Temporily fix this by explicitly providing the header. .setHeaderProvider( @@ -383,6 +385,9 @@ public static final class Builder { private CredentialsProvider credentialsProvider = BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + private ExecutorProvider executorProvider = + BigQueryWriteSettings.defaultExecutorProviderBuilder().build(); + private FlowController.LimitExceededBehavior limitExceededBehavior = FlowController.LimitExceededBehavior.Block; @@ -459,6 +464,12 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { return this; } + /** {@code ExecutorProvider} to use to create Executor to run background jobs. */ + public Builder setExecutorProvider(ExecutorProvider executorProvider) { + this.executorProvider = executorProvider; + return this; + } + /** * Sets traceId for debuging purpose. TraceId must follow the format of * CustomerDomain:DebugString, e.g. DATAFLOW:job_id_x. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index c4f788482b..468df368c0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -138,7 +138,8 @@ private JsonStreamWriter.Builder getTestJsonStreamWriterBuilder( String testStream, TableSchema BQTableSchema) { return JsonStreamWriter.newBuilder(testStream, BQTableSchema, client) .setChannelProvider(channelProvider) - .setCredentialsProvider(NoCredentialsProvider.create()); + .setCredentialsProvider(NoCredentialsProvider.create()) + .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build()); } @Test @@ -403,6 +404,7 @@ public void testCreateDefaultStream() throws Exception { JsonStreamWriter.newBuilder(TEST_TABLE, client) .setChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) + .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build()) .build()) { assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName()); assertEquals("aa", writer.getLocation()); @@ -651,6 +653,7 @@ public void testWithIgnoreUnknownFields() throws Exception { .setChannelProvider(channelProvider) .setIgnoreUnknownFields(true) .setCredentialsProvider(NoCredentialsProvider.create()) + .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build()) .build()) { testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); JSONObject foo = new JSONObject(); @@ -672,6 +675,7 @@ public void testFlowControlSetting() throws Exception { JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema) .setChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) + .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build()) .setFlowControlSettings( FlowControlSettings.newBuilder() .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) @@ -709,6 +713,7 @@ public void testFlowControlSettingNoLimitBehavior() throws Exception { JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema) .setChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) + .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build()) .setFlowControlSettings( FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(1L).build()) .build()) {