Skip to content

Commit

Permalink
fix: add client shutdown if request waiting in request queue for too …
Browse files Browse the repository at this point in the history
…long. (#2017)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destination

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting the
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparison in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: windows Instant resolution is different with
linux

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

* feat: Change new thread for each retry to be a thread pool to avoid
creating/tearing down too many threads if lots of retries happen

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that's accidentally
removed

* feat: throw error when use connection pool for explicit stream

* fix: Add precision truncation to the passed in value from JSON float and
double type.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the bom version

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlock issue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: fix one potential root cause of deadlock issue for non-multiplexing
case

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add timeout to inflight queue waiting, and also add some extra log

* feat: allow java client lib handle switch table schema for the same stream
name

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: close before retry connection

* fix: close before retry connection

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add client side timeout if inflight request wait too long

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] committed Mar 1, 2023
1 parent de00447 commit 91da88b
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 4 deletions.
Expand Up @@ -33,6 +33,7 @@
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
Expand Down Expand Up @@ -66,6 +67,14 @@ class ConnectionWorker implements AutoCloseable {
// Maximum wait time on inflight quota before error out.
private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000;

/*
* Maximum time waiting for request callback before shutting down the connection.
*
* We constantly check how long we have been waiting for the next request callback;
* if we have waited too long, we start shutting down the connection and cleaning up the queues.
*/
private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);

private Lock lock;
private Condition hasMessageInWaitingQueue;
private Condition inflightReduced;
Expand Down Expand Up @@ -273,7 +282,6 @@ public void run() {
log.warning(
"Exception thrown from append loop, thus stream writer is shutdown due to exception: "
+ e.toString());
e.printStackTrace();
lock.lock();
try {
connectionFinalStatus = e;
Expand Down Expand Up @@ -507,7 +515,7 @@ public void close() {
} finally {
this.lock.unlock();
}
log.fine("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
log.info("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
try {
appendThread.join();
} catch (InterruptedException e) {
Expand All @@ -525,6 +533,7 @@ public void close() {
// Backend request has a 2 minute timeout, so wait a little longer than that.
this.client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
log.warning("Client await termination timeout in writer id " + writerId);
}

try {
Expand Down Expand Up @@ -569,6 +578,11 @@ private void appendLoop() {
this.lock.lock();
try {
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
// Check whether we should error out the current append loop.
if (inflightRequestQueue.size() > 0) {
throwIfWaitCallbackTooLong(inflightRequestQueue.getFirst().requestCreationTimeStamp);
}

// Copy the streamConnectionIsConnected guarded by lock to a local variable.
// In addition, only reconnect if there is a retriable error.
streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null;
Expand All @@ -583,6 +597,7 @@ private void appendLoop() {
}
while (!this.waitingRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
requestWrapper.trySetRequestInsertQueueTime();
this.inflightRequestQueue.addLast(requestWrapper);
localQueue.addLast(requestWrapper);
}
Expand Down Expand Up @@ -703,6 +718,17 @@ private void appendLoop() {
log.info("Append thread is done. Stream: " + streamName + " id: " + writerId);
}

/**
 * Throws when the time elapsed since {@code timeToCheck} exceeds
 * {@code MAXIMUM_REQUEST_CALLBACK_WAIT_TIME}.
 *
 * @param timeToCheck the instant from which the wait is measured
 * @throws RuntimeException if the elapsed time is strictly greater than the configured maximum
 */
private void throwIfWaitCallbackTooLong(Instant timeToCheck) {
  Duration waited = Duration.between(timeToCheck, Instant.now());
  if (waited.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) <= 0) {
    // Still within the allowed window; nothing to report.
    return;
  }
  // Report the elapsed time in milliseconds so the value agrees with the "ms" suffix in the
  // message. Formatting the Duration directly would print ISO-8601 text such as "PT15M1S".
  throw new RuntimeException(
      String.format(
          "Request has waited in inflight queue for %sms for writer %s, "
              + "which is over maximum wait time %s",
          waited.toMillis(), writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME.toString()));
}

/*
* Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue.
*
Expand Down Expand Up @@ -740,6 +766,7 @@ private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
}
this.lock.lock();
try {
log.warning("Donecallback is not triggered within timeout frame for writer " + writerId);
if (connectionFinalStatus == null) {
connectionFinalStatus =
new StatusRuntimeException(
Expand Down Expand Up @@ -883,7 +910,7 @@ private boolean isConnectionErrorRetriable(Throwable t) {
}

private void doneCallback(Throwable finalStatus) {
log.fine(
log.info(
"Received done callback. Stream: "
+ streamName
+ " worker id: "
Expand Down Expand Up @@ -923,7 +950,9 @@ private void doneCallback(Throwable finalStatus) {
"Connection finished with error "
+ finalStatus.toString()
+ " for stream "
+ streamName);
+ streamName
+ " with write id: "
+ writerId);
}
}
} finally {
Expand Down Expand Up @@ -955,12 +984,21 @@ static final class AppendRequestAndResponse {
// The writer that issues the call of the request.
final StreamWriter streamWriter;

Instant requestCreationTimeStamp;

/**
 * Bundles an append request with a newly created future for its result.
 *
 * @param message the append request; its proto rows' serialized size is recorded as the message
 *     size
 * @param streamWriter the writer that issued the request
 */
AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) {
  this.message = message;
  this.streamWriter = streamWriter;
  // Record the serialized size of the request's proto rows once, at construction time.
  this.messageSize = message.getProtoRows().getSerializedSize();
  this.appendResult = SettableApiFuture.create();
}

/**
 * Stamps this request with the current time on the first call only; any later call leaves the
 * recorded timestamp untouched.
 */
void trySetRequestInsertQueueTime() {
  if (requestCreationTimeStamp != null) {
    // Already stamped by an earlier call; keep the original value.
    return;
  }
  requestCreationTimeStamp = Instant.now();
}
}

/** Returns the current workload of this worker. */
Expand Down Expand Up @@ -1051,6 +1089,11 @@ static void setMaxInflightQueueWaitTime(long waitTime) {
INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = waitTime;
}

/**
 * Overrides the maximum time a request may wait for its callback before the append loop errors
 * out. The value is stored in a static field, so it applies to every worker in the process.
 *
 * @param waitTime the new maximum wait time
 */
@VisibleForTesting
static void setMaxInflightRequestWaitTime(Duration waitTime) {
  ConnectionWorker.MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime;
}

@AutoValue
abstract static class TableSchemaAndTimestamp {

Expand Down
Expand Up @@ -61,6 +61,7 @@ public class ConnectionWorkerTest {
public void setUp() throws Exception {
testBigQueryWrite = new FakeBigQueryWrite();
ConnectionWorker.setMaxInflightQueueWaitTime(300000);
ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofMinutes(10));
serviceHelper =
new MockServiceHelper(
UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite));
Expand Down Expand Up @@ -607,4 +608,106 @@ public void testLoadIsOverWhelmed() {
Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100);
assertThat(load2.isOverwhelmed()).isFalse();
}

/**
 * Verifies that when responses take longer than the configured maximum callback wait time, every
 * pending append fails with the "waited in inflight queue" error, and that a subsequent append
 * fails with the same error.
 */
@Test
public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
  ProtoSchema schema1 = createProtoSchema("foo");
  // Allow only 1 second of waiting while the fake server sleeps 3 seconds per response.
  ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1));
  StreamWriter sw1 =
      StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
  ConnectionWorker connectionWorker =
      new ConnectionWorker(
          TEST_STREAM_1,
          null,
          createProtoSchema("foo"),
          100000,
          100000,
          Duration.ofSeconds(100),
          FlowController.LimitExceededBehavior.Block,
          TEST_TRACE_ID,
          client.getSettings());
  testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));

  long appendCount = 10;
  for (int i = 0; i < appendCount; i++) {
    testBigQueryWrite.addResponse(createAppendResponse(i));
  }

  // In total insert appendCount (10) requests.
  List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
  for (int i = 0; i < appendCount; i++) {
    futures.add(
        sendTestMessage(
            connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
    // Expected value goes first so a failure message reports expected/actual correctly.
    assertEquals(i + 1, connectionWorker.getLoad().inFlightRequestsCount());
  }

  for (int i = 0; i < appendCount; i++) {
    int finalI = i;
    ExecutionException ex =
        assertThrows(
            ExecutionException.class,
            () -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
    assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
  }

  // The future append will directly fail.
  ExecutionException ex =
      assertThrows(
          ExecutionException.class,
          () ->
              sendTestMessage(
                      connectionWorker,
                      sw1,
                      createFooProtoRows(new String[] {String.valueOf(100)}),
                      100)
                  .get());
  assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
}

/**
 * Verifies that an idle period longer than the maximum callback wait time does not fail the
 * worker when the inflight queue is empty: appends sent after the idle period still succeed.
 */
@Test
public void testLongTimeIdleWontFail() throws Exception {
  ProtoSchema schema1 = createProtoSchema("foo");
  ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1));
  StreamWriter sw1 =
      StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
  ConnectionWorker connectionWorker =
      new ConnectionWorker(
          TEST_STREAM_1,
          null,
          createProtoSchema("foo"),
          100000,
          100000,
          Duration.ofSeconds(100),
          FlowController.LimitExceededBehavior.Block,
          TEST_TRACE_ID,
          client.getSettings());

  long appendCount = 10;
  for (int i = 0; i < appendCount * 2; i++) {
    testBigQueryWrite.addResponse(createAppendResponse(i));
  }

  // Insert the first batch of appendCount (10) requests.
  List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
  for (int i = 0; i < appendCount; i++) {
    futures.add(
        sendTestMessage(
            connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
  }
  // Sleep 2 seconds (longer than the 1 second max wait) to make sure request queue is empty.
  Thread.sleep(2000);
  // Expected value goes first so a failure message reports expected/actual correctly.
  assertEquals(0, connectionWorker.getLoad().inFlightRequestsCount());
  // Insert the second batch after the idle period; offsets continue from appendCount.
  for (int i = 0; i < appendCount; i++) {
    futures.add(
        sendTestMessage(
            connectionWorker,
            sw1,
            createFooProtoRows(new String[] {String.valueOf(i)}),
            i + appendCount));
  }
  for (int i = 0; i < appendCount * 2; i++) {
    assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
  }
}
}

0 comments on commit 91da88b

Please sign in to comment.