Skip to content

Commit

Permalink
feat: add timeout to inflight queue waiting (#1957)
Browse files Browse the repository at this point in the history
* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparision in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: windows Instant resolution is different with
linux

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

* feat: Change new thread for each retry to be a thread pool to avoid
create/tear down too much threads if lots of retries happens

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that's accidentally
removed

* feat: throw error when use connection pool for explicit stream

* fix: Add precision truncation to the passed in value from JSON float and
double type.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the bom version

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlockissue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: fix one potential root cause of deadlock issue for non-multiplexing
case

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add timeout to inflight queue waiting, and also add some extra log

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] committed Jan 25, 2023
1 parent dcb234b commit 3159b12
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
Expand Up @@ -61,6 +61,9 @@
class ConnectionWorker implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());

// Maximum wait time on inflight quota before error out.
private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000;

private Lock lock;
private Condition hasMessageInWaitingQueue;
private Condition inflightReduced;
Expand Down Expand Up @@ -322,7 +325,14 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
this.inflightBytes += requestWrapper.messageSize;
waitingRequestQueue.addLast(requestWrapper);
hasMessageInWaitingQueue.signal();
maybeWaitForInflightQuota();
try {
maybeWaitForInflightQuota();
} catch (StatusRuntimeException ex) {
--this.inflightRequests;
waitingRequestQueue.pollLast();
this.inflightBytes -= requestWrapper.messageSize;
throw ex;
}
return requestWrapper.appendResult;
} finally {
this.lock.unlock();
Expand All @@ -347,6 +357,15 @@ private void maybeWaitForInflightQuota() {
.withCause(e)
.withDescription("Interrupted while waiting for quota."));
}
long current_wait_time = System.currentTimeMillis() - start_time;
if (current_wait_time > INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI) {
throw new StatusRuntimeException(
Status.fromCode(Code.CANCELLED)
.withDescription(
String.format(
"Interrupted while waiting for quota due to long waiting time %sms",
current_wait_time)));
}
}
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
}
Expand All @@ -373,7 +392,6 @@ public void close() {
log.fine("Waiting for append thread to finish. Stream: " + streamName);
try {
appendThread.join();
log.info("User close complete. Stream: " + streamName);
} catch (InterruptedException e) {
// Unexpected. Just swallow the exception with logging.
log.warning(
Expand All @@ -387,6 +405,7 @@ public void close() {
}

try {
log.fine("Begin shutting down user callback thread pool for stream " + streamName);
threadPool.shutdown();
threadPool.awaitTermination(3, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Expand All @@ -396,7 +415,10 @@ public void close() {
+ streamName
+ " is interrupted with exception: "
+ e.toString());
throw new IllegalStateException(
"Thread pool shutdown is interrupted for stream " + streamName);
}
log.info("User close finishes for stream " + streamName);
}

/*
Expand Down Expand Up @@ -858,6 +880,11 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) {
}
}

@VisibleForTesting
static void setMaxInflightQueueWaitTime(long waitTime) {
INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = waitTime;
}

@AutoValue
abstract static class TableSchemaAndTimestamp {
// Shows the timestamp updated schema is reported from response
Expand Down
Expand Up @@ -16,6 +16,9 @@
package com.google.cloud.bigquery.storage.v1;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
Expand All @@ -28,7 +31,9 @@
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -52,6 +57,7 @@ public class ConnectionWorkerTest {
@Before
public void setUp() throws Exception {
testBigQueryWrite = new FakeBigQueryWrite();
ConnectionWorker.setMaxInflightQueueWaitTime(300000);
serviceHelper =
new MockServiceHelper(
UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite));
Expand Down Expand Up @@ -281,6 +287,63 @@ public void testAppendInSameStream_switchSchema() throws Exception {
}
}

@Test
public void testAppendButInflightQueueFull() throws Exception {
ConnectionWorker connectionWorker =
new ConnectionWorker(
TEST_STREAM_1,
createProtoSchema("foo"),
6,
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);
ProtoSchema schema1 = createProtoSchema("foo");

long appendCount = 6;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

// In total insert 6 requests, since the max queue size is 5 we will stuck at the 6th request.
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
long startTime = System.currentTimeMillis();
// At the last request we wait more than 500 millisecond for inflight quota.
if (i == 5) {
assertThrows(
StatusRuntimeException.class,
() -> {
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
createFooProtoRows(new String[] {String.valueOf(5)}),
5);
});
long timeDiff = System.currentTimeMillis() - startTime;
assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 5);
assertTrue(timeDiff > 500);
} else {
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1);
}
}

for (int i = 0; i < appendCount - 1; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}
}

private AppendRowsResponse createAppendResponse(long offset) {
return AppendRowsResponse.newBuilder()
.setAppendResult(
Expand Down
Expand Up @@ -105,6 +105,7 @@ public StreamWriterTest() throws DescriptorValidationException {}
@Before
public void setUp() throws Exception {
testBigQueryWrite = new FakeBigQueryWrite();
ConnectionWorker.setMaxInflightQueueWaitTime(300000);
serviceHelper =
new MockServiceHelper(
UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite));
Expand Down

0 comments on commit 3159b12

Please sign in to comment.