Skip to content

Commit

Permalink
feat: allow java client to handle schema change during same stream na…
Browse files Browse the repository at this point in the history
…me (#1964)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparision in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: windows Instant resolution is different with
linux

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

* feat: Change new thread for each retry to be a thread pool to avoid
create/tear down too much threads if lots of retries happens

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that's accidentally
removed

* feat: throw error when use connection pool for explicit stream

* fix: Add precision truncation to the passed in value from JSON float and
double type.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the bom version

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlockissue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: fix one potential root cause of deadlock issue for non-multiplexing
case

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add timeout to inflight queue waiting, and also add some extra log

* feat: allow java client lib handle switch table schema for the same stream
name

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] committed Jan 31, 2023
1 parent 3159b12 commit 305f71e
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 31 deletions.
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.4.0')
implementation platform('com.google.cloud:libraries-bom:26.5.0')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.3'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.3"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4"
```

## Authentication
Expand Down
2 changes: 1 addition & 1 deletion google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -115,7 +115,7 @@
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
<method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</method>
</difference>
<difference>
<differenceType>7009</differenceType>
Expand Down
Expand Up @@ -222,7 +222,6 @@ public ConnectionWorker(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("Writer schema must be provided when building this writer."));
}
this.writerSchema = writerSchema;
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.limitExceededBehavior = limitExceededBehavior;
Expand Down Expand Up @@ -432,7 +431,7 @@ private void appendLoop() {

// Indicate whether we are at the first request after switching destination.
// True means the schema and other metadata are needed.
boolean firstRequestForDestinationSwitch = true;
boolean firstRequestForTableOrSchemaSwitch = true;
// Represent whether we have entered multiplexing.
boolean isMultiplexing = false;

Expand Down Expand Up @@ -483,25 +482,35 @@ private void appendLoop() {
resetConnection();
// Set firstRequestInConnection to indicate the next request to be sent should include
// metedata. Reset everytime after reconnection.
firstRequestForDestinationSwitch = true;
firstRequestForTableOrSchemaSwitch = true;
}
while (!localQueue.isEmpty()) {
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();

// Consider we enter multiplexing if we met a different non empty stream name.
if (!originalRequest.getWriteStream().isEmpty()
&& !streamName.isEmpty()
&& !originalRequest.getWriteStream().equals(streamName)) {
// Always respect the first writer schema seen by the loop.
if (writerSchema == null) {
writerSchema = originalRequest.getProtoRows().getWriterSchema();
}
// Consider we enter multiplexing if we met a different non empty stream name or we meet
// a new schema for the same stream name.
// For the schema comparision we don't use message differencer to speed up the comparing
// process. `equals(...)` can bring us false positive, e.g. two repeated field can be
// considered the same but is not considered equals(). However as long as it's never provide
// false negative we will always correctly pass writer schema to backend.
if ((!originalRequest.getWriteStream().isEmpty()
&& !streamName.isEmpty()
&& !originalRequest.getWriteStream().equals(streamName))
|| (originalRequest.getProtoRows().hasWriterSchema()
&& !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
streamName = originalRequest.getWriteStream();
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
firstRequestForDestinationSwitch = true;
firstRequestForTableOrSchemaSwitch = true;
}

if (firstRequestForDestinationSwitch) {
if (firstRequestForTableOrSchemaSwitch) {
// If we are at the first request for every table switch, including the first request in
// the connection, we will attach both stream name and table schema to the request.
// We don't support change of schema change during multiplexing for the saeme stream name.
destinationSet.add(streamName);
if (this.traceId != null) {
originalRequestBuilder.setTraceId(this.traceId);
Expand All @@ -511,17 +520,11 @@ private void appendLoop() {
originalRequestBuilder.clearWriteStream();
}

// We don't use message differencer to speed up the comparing process.
// `equals(...)` can bring us false positive, e.g. two repeated field can be considered the
// same but is not considered equals(). However as long as it's never provide false negative
// we will always correctly pass writer schema to backend.
if (firstRequestForDestinationSwitch
|| !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema)) {
writerSchema = originalRequest.getProtoRows().getWriterSchema();
} else {
// During non table/schema switch requests, clear writer schema.
if (!firstRequestForTableOrSchemaSwitch) {
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
}
firstRequestForDestinationSwitch = false;
firstRequestForTableOrSchemaSwitch = false;

// Send should only throw an exception if there is a problem with the request. The catch
// block will handle this case, and return the exception with the result.
Expand Down
Expand Up @@ -247,10 +247,10 @@ public void testAppendInSameStream_switchSchema() throws Exception {
// We will get the request as the pattern of:
// (writer_stream: t1, schema: schema1)
// (writer_stream: _, schema: _)
// (writer_stream: _, schema: schema3)
// (writer_stream: _, schema: _)
// (writer_stream: _, schema: schema1)
// (writer_stream: _, schema: _)
// (writer_stream: t1, schema: schema3)
// (writer_stream: t1, schema: _)
// (writer_stream: t1, schema: schema1)
// (writer_stream: t1, schema: _)
switch (i % 4) {
case 0:
if (i == 0) {
Expand All @@ -261,19 +261,23 @@ public void testAppendInSameStream_switchSchema() throws Exception {
.isEqualTo("foo");
break;
case 1:
assertThat(serverRequest.getWriteStream()).isEmpty();
if (i == 1) {
assertThat(serverRequest.getWriteStream()).isEmpty();
} else {
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
}
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
case 2:
assertThat(serverRequest.getWriteStream()).isEmpty();
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
// Schema is populated after table switch.
assertThat(
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
.isEqualTo("bar");
break;
case 3:
assertThat(serverRequest.getWriteStream()).isEmpty();
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
Expand Down

0 comments on commit 305f71e

Please sign in to comment.