fix: fix one potential root cause of deadlock in connection worker (#1955)

* feat: Split writer into connection worker and wrapper; this is a
prerequisite for the multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load API for connection worker for the multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destination

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests;
also fixed a tiny bug inside the fake BigQuery write impl for getting the
response from the offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some TODOs, and reject mixing a passed-in client with an internally created one

* feat: fix the bug where we may peek into the write_stream field even though
the proto schema may not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug where we may peek into the write_stream field even though
the proto schema may not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparison in the connection loop to ensure schema updates
for the same stream name can be surfaced

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix Windows build bug: Windows Instant resolution differs from
Linux

* fix: fix another failing test for the Windows build

* fix: fix another test failure for Windows build

* feat: Change the new-thread-per-retry approach to a thread pool to avoid
creating/tearing down too many threads when many retries happen

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that was accidentally
removed

* feat: throw an error when the connection pool is used for an explicit stream

* fix: Add precision truncation to passed-in values of JSON float and
double types.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the BOM version

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlock issue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: fix one potential root cause of the deadlock issue for the
non-multiplexing case

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
GaoleMeng and gcf-owl-bot[bot] committed Jan 25, 2023
1 parent 1ad4563 commit 598ce5e
Showing 4 changed files with 139 additions and 65 deletions.
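
The deadlock being fixed arises when user code retries an append from inside that append's completion callback. Below is a minimal sketch of the risky pattern, assuming an open StreamWriter built with a small inflight-request limit; the class and method names here are illustrative only, not code from this commit:

```java
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.common.util.concurrent.MoreExecutors;

final class RetryInCallbackSketch {

  // Assumes `writer` was built with a small inflight limit, e.g.
  // StreamWriter.newBuilder(stream, client).setMaxInflightRequests(5).build().
  static void appendWithNaiveRetry(StreamWriter writer, ProtoRows rows) {
    ApiFuture<AppendRowsResponse> future = writer.append(rows);
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<AppendRowsResponse>() {
          @Override
          public void onSuccess(AppendRowsResponse response) {}

          @Override
          public void onFailure(Throwable t) {
            // Retrying re-enters append(). append() blocks while the writer sits at
            // its inflight-request limit, and an inflight slot is only released once
            // the current callback returns, so this callback can wait on itself.
            writer.append(rows);
          }
        },
        MoreExecutors.directExecutor());
  }
}
```

Before this commit the callback ran on the connection's response-processing thread, so a blocked callback also stalled the very completion that would have freed an inflight slot. The ConnectionWorker change below moves callback execution onto a worker-owned thread pool.
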
4 changes: 2 additions & 2 deletions README.md
@@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
-implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.2'
+implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.3'
```

If you are using SBT, add this to your dependencies:

```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.2"
+libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.3"
```

## Authentication
ConnectionWorker.java
@@ -40,6 +40,8 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -63,6 +65,7 @@ public class ConnectionWorker implements AutoCloseable {
  private Condition hasMessageInWaitingQueue;
  private Condition inflightReduced;
  private static Duration maxRetryDuration = Duration.ofMinutes(5);
+  private ExecutorService threadPool = Executors.newFixedThreadPool(1);

/*
* The identifier of the current stream to write to. This stream name can change during
@@ -288,7 +291,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
      requestWrapper.appendResult.setException(
          new Exceptions.StreamWriterClosedException(
              Status.fromCode(Status.Code.FAILED_PRECONDITION)
-                  .withDescription("Connection is already closed"),
+                  .withDescription("Connection is already closed during append"),
              streamName,
              writerId));
      return requestWrapper.appendResult;
@@ -382,6 +385,18 @@ public void close() {
      this.client.awaitTermination(150, TimeUnit.SECONDS);
    } catch (InterruptedException ignored) {
    }
+
+    try {
+      threadPool.shutdown();
+      threadPool.awaitTermination(3, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      // Unexpected. Just swallow the exception with logging.
+      log.warning(
+          "Close on thread pool for "
+              + streamName
+              + " is interrupted with exception: "
+              + e.toString());
+    }
  }

/*
@@ -639,35 +654,44 @@ private void requestCallback(AppendRowsResponse response) {
    } finally {
      this.lock.unlock();
    }
-    if (response.hasError()) {
-      Exceptions.StorageException storageException =
-          Exceptions.toStorageException(response.getError(), null);
-      log.fine(String.format("Got error message: %s", response.toString()));
-      if (storageException != null) {
-        requestWrapper.appendResult.setException(storageException);
-      } else if (response.getRowErrorsCount() > 0) {
-        Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
-        for (int i = 0; i < response.getRowErrorsCount(); i++) {
-          RowError rowError = response.getRowErrors(i);
-          rowIndexToErrorMessage.put(Math.toIntExact(rowError.getIndex()), rowError.getMessage());
-        }
-        AppendSerializtionError exception =
-            new AppendSerializtionError(
-                response.getError().getCode(),
-                response.getError().getMessage(),
-                streamName,
-                rowIndexToErrorMessage);
-        requestWrapper.appendResult.setException(exception);
-      } else {
-        StatusRuntimeException exception =
-            new StatusRuntimeException(
-                Status.fromCodeValue(response.getError().getCode())
-                    .withDescription(response.getError().getMessage()));
-        requestWrapper.appendResult.setException(exception);
-      }
-    } else {
-      requestWrapper.appendResult.set(response);
-    }

+    // We need a separate thread pool to unblock the next request callback.
+    // Otherwise the user may call append inside the request callback, which may be blocked
+    // waiting on inflight quota, causing a deadlock as requests can't be popped out of the
+    // queue until the current request callback finishes.
+    threadPool.submit(
+        () -> {
+          if (response.hasError()) {
+            Exceptions.StorageException storageException =
+                Exceptions.toStorageException(response.getError(), null);
+            log.fine(String.format("Got error message: %s", response.toString()));
+            if (storageException != null) {
+              requestWrapper.appendResult.setException(storageException);
+            } else if (response.getRowErrorsCount() > 0) {
+              Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
+              for (int i = 0; i < response.getRowErrorsCount(); i++) {
+                RowError rowError = response.getRowErrors(i);
+                rowIndexToErrorMessage.put(
+                    Math.toIntExact(rowError.getIndex()), rowError.getMessage());
+              }
+              AppendSerializtionError exception =
+                  new AppendSerializtionError(
+                      response.getError().getCode(),
+                      response.getError().getMessage(),
+                      streamName,
+                      rowIndexToErrorMessage);
+              requestWrapper.appendResult.setException(exception);
+            } else {
+              StatusRuntimeException exception =
+                  new StatusRuntimeException(
+                      Status.fromCodeValue(response.getError().getCode())
+                          .withDescription(response.getError().getMessage()));
+              requestWrapper.appendResult.setException(exception);
+            }
+          } else {
+            requestWrapper.appendResult.set(response);
+          }
+        });
  }

private boolean isRetriableError(Throwable t) {
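
The requestCallback hunk above is the heart of the fix: the response is handed to a dedicated single-threaded executor before the user-visible future is completed, so the thread that processes responses never runs user callbacks. A minimal sketch of the handoff pattern follows; the names are hypothetical, and only the one-thread pool and the shutdown timeout mirror the diff:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: complete user-visible futures on a dedicated single-threaded executor so
// the thread that delivered the response can return to the queue immediately.
final class CallbackHandoff implements AutoCloseable {

  private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1);

  void onResponse(Runnable completeUserFuture) {
    // The response thread only enqueues; a user callback that blocks (for example on
    // inflight quota inside a retried append) no longer stalls response processing.
    callbackExecutor.submit(completeUserFuture);
  }

  @Override
  public void close() throws InterruptedException {
    callbackExecutor.shutdown();
    callbackExecutor.awaitTermination(3, TimeUnit.MINUTES);
  }
}
```

A single thread keeps completions in submission order for the connection, which a wider pool would not guarantee; the trade-off is that one slow user callback delays later completions, but it can no longer deadlock the worker. The test change below exercises exactly the retry-inside-callback pattern.
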
StreamWriterTest.java
@@ -20,7 +20,10 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

+import com.google.api.client.util.Sleeper;
import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
@@ -34,6 +37,7 @@
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
@@ -282,6 +286,64 @@ public void testAppendSuccess() throws Exception {
    writer.close();
  }

+  @Test
+  public void testAppendSuccess_RetryDirectlyInCallback() throws Exception {
+    // Set a relatively small inflight request limit.
+    StreamWriter writer =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setTraceId(TEST_TRACE_ID)
+            .setMaxRetryDuration(java.time.Duration.ofSeconds(5))
+            .setMaxInflightRequests(5)
+            .build();
+
+    // Fail the first request; in its request callback we will insert another
+    // 10 requests. Those requests can't be processed until the previous request
+    // callback has finished.
+    long appendCount = 20;
+    for (int i = 0; i < appendCount; i++) {
+      if (i == 0) {
+        testBigQueryWrite.addResponse(
+            createAppendResponseWithError(Status.INVALID_ARGUMENT.getCode(), "test message"));
+      }
+      testBigQueryWrite.addResponse(createAppendResponse(i));
+    }
+
+    // We will trigger 10 more requests in the request callback of the following request.
+    ProtoRows protoRows = createProtoRows(new String[] {String.valueOf(-1)});
+    ApiFuture<AppendRowsResponse> future = writer.append(protoRows, -1);
+    ApiFutures.addCallback(
+        future, new AppendCompleteCallback(writer, protoRows), MoreExecutors.directExecutor());
+
+    StatusRuntimeException actualError =
+        assertFutureException(StatusRuntimeException.class, future);
+
+    Sleeper.DEFAULT.sleep(1000);
+    writer.close();
+  }
+
+  static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
+
+    private final StreamWriter mainStreamWriter;
+    private final ProtoRows protoRows;
+    private int retryCount = 0;
+
+    public AppendCompleteCallback(StreamWriter mainStreamWriter, ProtoRows protoRows) {
+      this.mainStreamWriter = mainStreamWriter;
+      this.protoRows = protoRows;
+    }
+
+    public void onSuccess(AppendRowsResponse response) {
+      // Do nothing.
+    }
+
+    public void onFailure(Throwable throwable) {
+      for (int i = 0; i < 10; i++) {
+        this.mainStreamWriter.append(protoRows);
+      }
+    }
+  }
+
@Test
public void testUpdatedSchemaFetch_multiplexing() throws Exception {
testUpdatedSchemaFetch(/*enableMultiplexing=*/ true);
WriteToDefaultStream.java
@@ -39,8 +39,6 @@
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
@@ -188,16 +186,14 @@ static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse>

    private final DataWriter parent;
    private final AppendContext appendContext;
-    // Prepare a thread pool
-    static ExecutorService pool = Executors.newFixedThreadPool(50);

    public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
      this.parent = parent;
      this.appendContext = appendContext;
    }

    public void onSuccess(AppendRowsResponse response) {
-      System.out.format("Append success%n");
+      System.out.format("Append success\n");
      done();
    }

@@ -209,22 +205,17 @@ public void onFailure(Throwable throwable) {
      if (appendContext.retryCount < MAX_RETRY_COUNT
          && RETRIABLE_ERROR_CODES.contains(status.getCode())) {
        appendContext.retryCount++;
-        // Use a separate thread to avoid potentially blocking while we are in a callback.
-        pool.submit(
-            () -> {
-              try {
-                // Since default stream appends are not ordered, we can simply retry the
-                // appends.
-                // Retrying with exclusive streams requires more careful consideration.
-                this.parent.append(appendContext);
-              } catch (Exception e) {
-                // Fall through to return error.
-                System.out.format("Failed to retry append: %s%n", e);
-              }
-            });
-        // Mark the existing attempt as done since it's being retried.
-        done();
-        return;
+        try {
+          // Since default stream appends are not ordered, we can simply retry the appends.
+          // Retrying with exclusive streams requires more careful consideration.
+          this.parent.append(appendContext);
+          // Mark the existing attempt as done since it's being retried.
+          done();
+          return;
+        } catch (Exception e) {
+          // Fall through to return error.
+          System.out.format("Failed to retry append: %s\n", e);
+        }
}

if (throwable instanceof AppendSerializtionError) {
@@ -241,21 +232,19 @@ public void onFailure(Throwable throwable) {
}
}

-      // Mark the existing attempt as done since we got a response for it
-      done();
-
-      // Retry the remaining valid rows, but using a separate thread to
-      // avoid potentially blocking while we are in a callback.
      if (dataNew.length() > 0) {
-        pool.submit(
-            () -> {
-              try {
-                this.parent.append(new AppendContext(dataNew, 0));
-              } catch (Exception e2) {
-                System.out.format("Failed to retry append with filtered rows: %s%n", e2);
-              }
-            });
+        try {
+          this.parent.append(new AppendContext(dataNew, 0));
+        } catch (DescriptorValidationException e) {
+          throw new RuntimeException(e);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
      }
+      // Mark the existing attempt as done since we got a response for it
+      done();
      return;
}
}
Expand All @@ -267,7 +256,6 @@ public void onFailure(Throwable throwable) {
              (storageException != null) ? storageException : new RuntimeException(throwable);
        }
      }
-      System.out.format("Error that arrived: %s%n", throwable);
      done();
}
