Skip to content

Commit

Permalink
fix: we should isolate the client used in StreamWriter and the client used in ConnectionWorker (#1933)
Browse files Browse the repository at this point in the history

* .

* .

* .

* .

* .

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* .

* .

* .

* .

* .

* .

* .

* .

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] committed Jan 19, 2023
1 parent 53820c6 commit 3530672
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 79 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.1'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.2'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.1"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.2"
```

## Authentication
Expand Down
12 changes: 12 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -86,4 +86,16 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
<to>ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</to>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
<to>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</to>
</difference>
</differences>
Expand Up @@ -171,11 +171,6 @@ public class ConnectionWorker implements AutoCloseable {
*/
private BigQueryWriteClient client;

/*
* If true, the client above is created by this writer and should be closed.
*/
private boolean ownsBigQueryWriteClient = false;

/*
* Wraps the underlying bi-directional stream connection with server.
*/
Expand Down Expand Up @@ -209,8 +204,7 @@ public ConnectionWorker(
Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
boolean ownsBigQueryWriteClient)
BigQueryWriteSettings clientSettings)
throws IOException {
this.lock = new ReentrantLock();
this.hasMessageInWaitingQueue = lock.newCondition();
Expand All @@ -229,8 +223,8 @@ public ConnectionWorker(
this.traceId = traceId;
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.client = client;
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
// Always recreate a client for connection worker.
this.client = BigQueryWriteClient.create(clientSettings);

this.appendThread =
new Thread(
Expand Down Expand Up @@ -382,13 +376,11 @@ public void close() {
log.warning(
"Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString());
}
if (this.ownsBigQueryWriteClient) {
this.client.close();
try {
// Backend request has a 2 minute timeout, so wait a little longer than that.
this.client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
}
this.client.close();
try {
// Backend request has a 2 minute timeout, so wait a little longer than that.
this.client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
}
}

Expand Down
Expand Up @@ -149,12 +149,7 @@ public class ConnectionWorkerPool {
/*
* A client used to interact with BigQuery.
*/
private BigQueryWriteClient client;

/*
* If true, the client above is created by this writer and should be closed.
*/
private boolean ownsBigQueryWriteClient = false;
private BigQueryWriteSettings clientSettings;

/**
* The current maximum connection count. This value is gradually increased till the user defined
Expand Down Expand Up @@ -204,15 +199,13 @@ public ConnectionWorkerPool(
java.time.Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
boolean ownsBigQueryWriteClient) {
BigQueryWriteSettings clientSettings) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
this.client = client;
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
this.clientSettings = clientSettings;
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
}

Expand Down Expand Up @@ -308,6 +301,7 @@ private ConnectionWorker createOrReuseConnectionWorker(
}
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {

// Stick to the original connection if all the connections are overwhelmed.
if (existingConnectionWorker != null) {
return existingConnectionWorker;
Expand Down Expand Up @@ -355,8 +349,6 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
testValueCreateConnectionCount.getAndIncrement();
}
// currently we use different header for the client in each connection worker to be different
// as the backend require the header to have the same write_stream field as request body.
ConnectionWorker connectionWorker =
new ConnectionWorker(
streamName,
Expand All @@ -366,8 +358,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
maxRetryDuration,
limitExceededBehavior,
traceId,
client,
ownsBigQueryWriteClient);
clientSettings);
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
Expand Down Expand Up @@ -402,8 +393,11 @@ public void close(StreamWriter streamWriter) {
log.info(
String.format(
"During closing of writeStream for %s with writer id %s, we decided to close %s "
+ "connections",
streamWriter.getStreamName(), streamWriter.getWriterId(), connectionToRemove.size()));
+ "connections, pool size after removal $s",
streamWriter.getStreamName(),
streamWriter.getWriterId(),
connectionToRemove.size(),
connectionToWriteStream.size() - 1));
connectionToWriteStream.keySet().removeAll(connectionToRemove);
} finally {
lock.unlock();
Expand Down Expand Up @@ -447,16 +441,12 @@ String getTraceId() {
return traceId;
}

boolean ownsBigQueryWriteClient() {
return ownsBigQueryWriteClient;
}

FlowController.LimitExceededBehavior limitExceededBehavior() {
return limitExceededBehavior;
}

BigQueryWriteClient bigQueryWriteClient() {
return client;
BigQueryWriteSettings bigQueryWriteSettings() {
return clientSettings;
}

static String toTableName(String streamName) {
Expand Down
Expand Up @@ -184,7 +184,7 @@ public static SingleConnectionOrConnectionPool ofConnectionPool(
private StreamWriter(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
boolean ownsBigQueryWriteClient = builder.client == null;
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
if (!builder.enableConnectionPool) {
this.location = builder.location;
this.singleConnectionOrConnectionPool =
Expand All @@ -197,8 +197,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
getBigQueryWriteClient(builder),
ownsBigQueryWriteClient));
clientSettings));
} else {
if (!isDefaultStream(streamName)) {
log.warning(
Expand All @@ -208,7 +207,9 @@ private StreamWriter(Builder builder) throws IOException {
"Trying to enable connection pool in non-default stream.");
}

BigQueryWriteClient client = getBigQueryWriteClient(builder);
// We need a client to perform some getWriteStream calls.
BigQueryWriteClient client =
builder.client != null ? builder.client : new BigQueryWriteClient(clientSettings);
String location = builder.location;
if (location == null || location.isEmpty()) {
// Location is not passed in, try to fetch from RPC
Expand Down Expand Up @@ -256,14 +257,11 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
client,
ownsBigQueryWriteClient);
client.getSettings());
}));
validateFetchedConnectonPool(builder);
// Shut down the passed in client. Internally we will create another client inside connection
// pool for every new connection worker.
if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
&& ownsBigQueryWriteClient) {
// If the client is not from outside, then shutdown the client we created.
if (builder.client == null) {
client.shutdown();
try {
client.awaitTermination(150, TimeUnit.SECONDS);
Expand Down Expand Up @@ -293,19 +291,16 @@ static boolean isDefaultStream(String streamName) {
return streamMatcher.find();
}

private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException {
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setBackgroundExecutorProvider(builder.executorProvider)
.setEndpoint(builder.endpoint)
.build();
testOnlyClientCreatedTimes++;
return BigQueryWriteClient.create(stubSettings);
private BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
if (builder.client != null) {
return builder.client.getSettings();
} else {
return builder.client;
return BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setBackgroundExecutorProvider(builder.executorProvider)
.setEndpoint(builder.endpoint)
.build();
}
}

Expand All @@ -316,10 +311,6 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().ownsBigQueryWriteClient(),
builder.client == null)) {
paramsValidatedFailed = "Whether using passed in clients";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
Expand Down
Expand Up @@ -48,7 +48,7 @@ public class ConnectionWorkerPoolTest {
private FakeBigQueryWrite testBigQueryWrite;
private FakeScheduledExecutorService fakeExecutor;
private static MockServiceHelper serviceHelper;
private BigQueryWriteClient client;
private BigQueryWriteSettings clientSettings;

private static final String TEST_TRACE_ID = "home:job1";
private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default";
Expand All @@ -63,12 +63,11 @@ public void setUp() throws Exception {
serviceHelper.start();
fakeExecutor = new FakeScheduledExecutorService();
testBigQueryWrite.setExecutor(fakeExecutor);
client =
BigQueryWriteClient.create(
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(serviceHelper.createChannelProvider())
.build());
clientSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(serviceHelper.createChannelProvider())
.build();
ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.5);
ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.6);
}
Expand Down Expand Up @@ -325,6 +324,56 @@ public void testToTableName() {
IllegalArgumentException.class, () -> ConnectionWorkerPool.toTableName("projects/p/"));
}

@Test
public void testCloseExternalClient()
    throws IOException, InterruptedException, ExecutionException {
  // Verifies that closing an externally supplied BigQueryWriteClient does not break
  // pooled writers: each ConnectionWorker recreates its own client from the settings,
  // so appends must keep succeeding after the external client is shut down.
  // Try append 100 requests.
  long appendCount = 100L;
  // Stage one fake append response per expected request (two rounds of appendCount).
  for (long i = 0; i < appendCount * 2; i++) {
    testBigQueryWrite.addResponse(createAppendResponse(i));
  }
  // Staged reply for the GetWriteStream call the pool issues to resolve the stream
  // location — presumably consumed during the first writer's build; TODO confirm.
  testBigQueryWrite.addResponse(WriteStream.newBuilder().setLocation("us").build());
  List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
  // Client owned by the caller (the test), not by the StreamWriters.
  BigQueryWriteClient externalClient =
      BigQueryWriteClient.create(
          BigQueryWriteSettings.newBuilder()
              .setCredentialsProvider(NoCredentialsProvider.create())
              .setTransportChannelProvider(serviceHelper.createChannelProvider())
              .build());
  // Create some stream writers that all share the external client, with the
  // connection pool enabled (each targets a different table's _default stream).
  List<StreamWriter> streamWriterList = new ArrayList<>();
  for (int i = 0; i < 4; i++) {
    StreamWriter sw =
        StreamWriter.newBuilder(
                String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", i),
                externalClient)
            .setWriterSchema(createProtoSchema())
            .setTraceId(TEST_TRACE_ID)
            .setEnableConnectionPool(true)
            .build();
    streamWriterList.add(sw);
  }

  for (long i = 0; i < appendCount; i++) {
    // Round-robin the requests across the writers, i.e. across different tables.
    StreamWriter sw = streamWriterList.get((int) (i % streamWriterList.size()));
    futures.add(sw.append(createProtoRows(new String[] {String.valueOf(i)}), i));
  }
  // Close the external client mid-test; pooled connection workers hold their own
  // clients, so in-flight and future appends should be unaffected.
  externalClient.close();
  externalClient.awaitTermination(1, TimeUnit.MINUTES);
  // Send more requests, the connections should still work.
  for (long i = appendCount; i < appendCount * 2; i++) {
    StreamWriter sw = streamWriterList.get((int) (i % streamWriterList.size()));
    futures.add(sw.append(createProtoRows(new String[] {String.valueOf(i)}), i));
  }
  // Each staged response carried offset == i, so results must come back in order.
  for (int i = 0; i < appendCount * 2; i++) {
    AppendRowsResponse response = futures.get(i).get();
    assertThat(response.getAppendResult().getOffset().getValue()).isEqualTo(i);
  }
  assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount * 2);
}

private AppendRowsResponse createAppendResponse(long offset) {
return AppendRowsResponse.newBuilder()
.setAppendResult(
Expand All @@ -333,9 +382,11 @@ private AppendRowsResponse createAppendResponse(long offset) {
}

private StreamWriter getTestStreamWriter(String streamName) throws IOException {
return StreamWriter.newBuilder(streamName, client)
return StreamWriter.newBuilder(streamName)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.build();
}

Expand Down Expand Up @@ -380,7 +431,6 @@ ConnectionWorkerPool createConnectionWorkerPool(
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client,
/*ownsBigQueryWriteClient=*/ false);
clientSettings);
}
}
Expand Up @@ -309,8 +309,7 @@ private ConnectionWorker createConnectionWorker(
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client,
/*ownsBigQueryWriteClient=*/ false);
client.getSettings());
}

private ProtoSchema createProtoSchema(String protoName) {
Expand Down
Expand Up @@ -287,7 +287,7 @@ public void readRowsNoRetryForResourceExhaustedWithRetryInfo()
throws ExecutionException, InterruptedException {
RetryInfo retryInfo =
RetryInfo.newBuilder()
.setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build())
.setRetryDelay(Duration.newBuilder().setSeconds(2).setNanos(456).build())
.build();

Metadata metadata = new Metadata();
Expand Down

0 comments on commit 3530672

Please sign in to comment.