From 3530672f0bddfacb973fb0fc1d30aabb4ffefccb Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 18 Jan 2023 17:19:39 -0800 Subject: [PATCH] fix: we should isolate the client used in StreamWriter and the client used in ConnectionWorker (#1933) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * . * . * . * . * . * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * . * . * . * . * . * . * . * . * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- README.md | 4 +- .../clirr-ignored-differences.xml | 12 ++++ .../bigquery/storage/v1/ConnectionWorker.java | 24 +++---- .../storage/v1/ConnectionWorkerPool.java | 34 ++++----- .../bigquery/storage/v1/StreamWriter.java | 43 +++++------- .../storage/v1/ConnectionWorkerPoolTest.java | 70 ++++++++++++++++--- .../storage/v1/ConnectionWorkerTest.java | 3 +- .../v1beta2/BigQueryReadClientTest.java | 2 +- 8 files changed, 113 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index fa5a31e151..5c8c5d12da 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.1' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.2' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.1" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.2" ``` ## Authentication diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index b33dced533..92f0b258c1 100644 --- 
a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -86,4 +86,16 @@ com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean) + + 7005 + com/google/cloud/bigquery/storage/v1/ConnectionWorker + ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean) + ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings) + + + 7005 + com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool + ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean) + ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings) + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 50086e95e2..4a32f57239 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -171,11 +171,6 @@ public class ConnectionWorker implements AutoCloseable { */ private 
BigQueryWriteClient client; - /* - * If true, the client above is created by this writer and should be closed. - */ - private boolean ownsBigQueryWriteClient = false; - /* * Wraps the underlying bi-directional stream connection with server. */ @@ -209,8 +204,7 @@ public ConnectionWorker( Duration maxRetryDuration, FlowController.LimitExceededBehavior limitExceededBehavior, String traceId, - BigQueryWriteClient client, - boolean ownsBigQueryWriteClient) + BigQueryWriteSettings clientSettings) throws IOException { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); @@ -229,8 +223,8 @@ public ConnectionWorker( this.traceId = traceId; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); - this.client = client; - this.ownsBigQueryWriteClient = ownsBigQueryWriteClient; + // Always recreate a client for connection worker. + this.client = BigQueryWriteClient.create(clientSettings); this.appendThread = new Thread( @@ -382,13 +376,11 @@ public void close() { log.warning( "Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString()); } - if (this.ownsBigQueryWriteClient) { - this.client.close(); - try { - // Backend request has a 2 minute timeout, so wait a little longer than that. - this.client.awaitTermination(150, TimeUnit.SECONDS); - } catch (InterruptedException ignored) { - } + this.client.close(); + try { + // Backend request has a 2 minute timeout, so wait a little longer than that. 
+ this.client.awaitTermination(150, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 121b1d0e28..40f21b72cb 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -149,12 +149,7 @@ public class ConnectionWorkerPool { /* * A client used to interact with BigQuery. */ - private BigQueryWriteClient client; - - /* - * If true, the client above is created by this writer and should be closed. - */ - private boolean ownsBigQueryWriteClient = false; + private BigQueryWriteSettings clientSettings; /** * The current maximum connection count. This value is gradually increased till the user defined @@ -204,15 +199,13 @@ public ConnectionWorkerPool( java.time.Duration maxRetryDuration, FlowController.LimitExceededBehavior limitExceededBehavior, String traceId, - BigQueryWriteClient client, - boolean ownsBigQueryWriteClient) { + BigQueryWriteSettings clientSettings) { this.maxInflightRequests = maxInflightRequests; this.maxInflightBytes = maxInflightBytes; this.maxRetryDuration = maxRetryDuration; this.limitExceededBehavior = limitExceededBehavior; this.traceId = traceId; - this.client = client; - this.ownsBigQueryWriteClient = ownsBigQueryWriteClient; + this.clientSettings = clientSettings; this.currentMaxConnectionCount = settings.minConnectionsPerRegion(); } @@ -308,6 +301,7 @@ private ConnectionWorker createOrReuseConnectionWorker( } return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema()); } else { + // Stick to the original connection if all the connections are overwhelmed. 
if (existingConnectionWorker != null) { return existingConnectionWorker; @@ -355,8 +349,6 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w // Though atomic integer is super lightweight, add extra if check in case adding future logic. testValueCreateConnectionCount.getAndIncrement(); } - // currently we use different header for the client in each connection worker to be different - // as the backend require the header to have the same write_stream field as request body. ConnectionWorker connectionWorker = new ConnectionWorker( streamName, @@ -366,8 +358,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w maxRetryDuration, limitExceededBehavior, traceId, - client, - ownsBigQueryWriteClient); + clientSettings); connectionWorkerPool.add(connectionWorker); log.info( String.format( @@ -402,8 +393,11 @@ public void close(StreamWriter streamWriter) { log.info( String.format( "During closing of writeStream for %s with writer id %s, we decided to close %s " - + "connections", - streamWriter.getStreamName(), streamWriter.getWriterId(), connectionToRemove.size())); + + "connections, pool size after removal %s", + streamWriter.getStreamName(), + streamWriter.getWriterId(), + connectionToRemove.size(), + connectionToWriteStream.size() - 1)); connectionToWriteStream.keySet().removeAll(connectionToRemove); } finally { lock.unlock(); } @@ -447,16 +441,12 @@ String getTraceId() { return traceId; } - boolean ownsBigQueryWriteClient() { return ownsBigQueryWriteClient; } - FlowController.LimitExceededBehavior limitExceededBehavior() { return limitExceededBehavior; } - BigQueryWriteClient bigQueryWriteClient() { - return client; + BigQueryWriteSettings bigQueryWriteSettings() { + return clientSettings; } static String toTableName(String streamName) { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index ff7dad474d..edc7240ad7 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -184,7 +184,7 @@ public static SingleConnectionOrConnectionPool ofConnectionPool( private StreamWriter(Builder builder) throws IOException { this.streamName = builder.streamName; this.writerSchema = builder.writerSchema; - boolean ownsBigQueryWriteClient = builder.client == null; + BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder); if (!builder.enableConnectionPool) { this.location = builder.location; this.singleConnectionOrConnectionPool = @@ -197,8 +197,7 @@ private StreamWriter(Builder builder) throws IOException { builder.maxRetryDuration, builder.limitExceededBehavior, builder.traceId, - getBigQueryWriteClient(builder), - ownsBigQueryWriteClient)); + clientSettings)); } else { if (!isDefaultStream(streamName)) { log.warning( @@ -208,7 +207,9 @@ private StreamWriter(Builder builder) throws IOException { "Trying to enable connection pool in non-default stream."); } - BigQueryWriteClient client = getBigQueryWriteClient(builder); + // We need a client to perform some getWriteStream calls. + BigQueryWriteClient client = + builder.client != null ? builder.client : new BigQueryWriteClient(clientSettings); String location = builder.location; if (location == null || location.isEmpty()) { // Location is not passed in, try to fetch from RPC @@ -256,14 +257,11 @@ private StreamWriter(Builder builder) throws IOException { builder.maxRetryDuration, builder.limitExceededBehavior, builder.traceId, - client, - ownsBigQueryWriteClient); + client.getSettings()); })); validateFetchedConnectonPool(builder); - // Shut down the passed in client. 
Internally we will create another client inside connection - // pool for every new connection worker. - if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient() - && ownsBigQueryWriteClient) { + // If the client is not from outside, then shutdown the client we created. + if (builder.client == null) { client.shutdown(); try { client.awaitTermination(150, TimeUnit.SECONDS); @@ -293,19 +291,16 @@ static boolean isDefaultStream(String streamName) { return streamMatcher.find(); } - private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException { - if (builder.client == null) { - BigQueryWriteSettings stubSettings = - BigQueryWriteSettings.newBuilder() - .setCredentialsProvider(builder.credentialsProvider) - .setTransportChannelProvider(builder.channelProvider) - .setBackgroundExecutorProvider(builder.executorProvider) - .setEndpoint(builder.endpoint) - .build(); - testOnlyClientCreatedTimes++; - return BigQueryWriteClient.create(stubSettings); + private BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException { + if (builder.client != null) { + return builder.client.getSettings(); } else { - return builder.client; + return BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(builder.credentialsProvider) + .setTransportChannelProvider(builder.channelProvider) + .setBackgroundExecutorProvider(builder.executorProvider) + .setEndpoint(builder.endpoint) + .build(); } } @@ -316,10 +311,6 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) { this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), builder.traceId)) { paramsValidatedFailed = "Trace id"; - } else if (!Objects.equals( - this.singleConnectionOrConnectionPool.connectionWorkerPool().ownsBigQueryWriteClient(), - builder.client == null)) { - paramsValidatedFailed = "Whether using passed in clients"; } else if (!Objects.equals( 
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(), builder.limitExceededBehavior)) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 961ad3fdc1..415c35329a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -48,7 +48,7 @@ public class ConnectionWorkerPoolTest { private FakeBigQueryWrite testBigQueryWrite; private FakeScheduledExecutorService fakeExecutor; private static MockServiceHelper serviceHelper; - private BigQueryWriteClient client; + private BigQueryWriteSettings clientSettings; private static final String TEST_TRACE_ID = "home:job1"; private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default"; @@ -63,12 +63,11 @@ public void setUp() throws Exception { serviceHelper.start(); fakeExecutor = new FakeScheduledExecutorService(); testBigQueryWrite.setExecutor(fakeExecutor); - client = - BigQueryWriteClient.create( - BigQueryWriteSettings.newBuilder() - .setCredentialsProvider(NoCredentialsProvider.create()) - .setTransportChannelProvider(serviceHelper.createChannelProvider()) - .build()); + clientSettings = + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .build(); ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.5); ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.6); } @@ -325,6 +324,56 @@ public void testToTableName() { IllegalArgumentException.class, () -> ConnectionWorkerPool.toTableName("projects/p/")); } + @Test + public void testCloseExternalClient() + throws IOException, 
InterruptedException, ExecutionException { + // Try append 100 requests. + long appendCount = 100L; + // testBigQueryWrite is used to queue fake server responses for the appends below. + for (long i = 0; i < appendCount * 2; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + testBigQueryWrite.addResponse(WriteStream.newBuilder().setLocation("us").build()); + List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>(); + BigQueryWriteClient externalClient = + BigQueryWriteClient.create( + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .build()); + // Create some stream writers. + List<StreamWriter> streamWriterList = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + StreamWriter sw = + StreamWriter.newBuilder( + String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", i), + externalClient) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .setEnableConnectionPool(true) + .build(); + streamWriterList.add(sw); + } + + for (long i = 0; i < appendCount; i++) { + StreamWriter sw = streamWriterList.get((int) (i % streamWriterList.size())); + // Round-robin the requests across the different tables. + futures.add(sw.append(createProtoRows(new String[] {String.valueOf(i)}), i)); + } + externalClient.close(); + externalClient.awaitTermination(1, TimeUnit.MINUTES); + // Send more requests, the connections should still work.
+ for (long i = appendCount; i < appendCount * 2; i++) { + StreamWriter sw = streamWriterList.get((int) (i % streamWriterList.size())); + futures.add(sw.append(createProtoRows(new String[] {String.valueOf(i)}), i)); + } + for (int i = 0; i < appendCount * 2; i++) { + AppendRowsResponse response = futures.get(i).get(); + assertThat(response.getAppendResult().getOffset().getValue()).isEqualTo(i); + } + assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount * 2); + } + private AppendRowsResponse createAppendResponse(long offset) { return AppendRowsResponse.newBuilder() .setAppendResult( @@ -333,9 +382,11 @@ private AppendRowsResponse createAppendResponse(long offset) { } private StreamWriter getTestStreamWriter(String streamName) throws IOException { - return StreamWriter.newBuilder(streamName, client) + return StreamWriter.newBuilder(streamName) .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setChannelProvider(serviceHelper.createChannelProvider()) .build(); } @@ -380,7 +431,6 @@ ConnectionWorkerPool createConnectionWorkerPool( maxRetryDuration, FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, - client, - /*ownsBigQueryWriteClient=*/ false); + clientSettings); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 8db4b072b1..3d3d3f5a7c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -309,8 +309,7 @@ private ConnectionWorker createConnectionWorker( maxRetryDuration, FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, - client, - /*ownsBigQueryWriteClient=*/ false); + 
client.getSettings()); } private ProtoSchema createProtoSchema(String protoName) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java index a551146bbc..3323e4e7ab 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java @@ -287,7 +287,7 @@ public void readRowsNoRetryForResourceExhaustedWithRetryInfo() throws ExecutionException, InterruptedException { RetryInfo retryInfo = RetryInfo.newBuilder() - .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .setRetryDelay(Duration.newBuilder().setSeconds(2).setNanos(456).build()) .build(); Metadata metadata = new Metadata();