Skip to content

Commit

Permalink
feat: add public api to stream writer to set the maximum wait time
Browse files Browse the repository at this point in the history
  • Loading branch information
GaoleMeng committed Mar 30, 2023
1 parent a989ac6 commit 1726c85
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 1 deletion.
Expand Up @@ -75,7 +75,7 @@ class ConnectionWorker implements AutoCloseable {
* We will constantly checking how much time we have been waiting for the next request callback
* if we wait too much time we will start shutting down the connections and clean up the queues.
*/
private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);
static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);

private Lock lock;
private Condition hasMessageInWaitingQueue;
Expand Down
Expand Up @@ -518,6 +518,16 @@ public synchronized TableSchema getUpdatedSchema() {
: null;
}

/**
* Sets the maximum time a request is allowed to be waiting in request waiting queue. Under very
* low chance, it's possible for append request to be waiting indefintely for request callback
* when Google networking SDK does not detect the networking breakage. The default timeout is 15
* minutes. We are investigating the root cause for callback not triggered by networking SDK.
*/
public static void setMaxRequestCallbackWaitTime(Duration waitTime) {
ConnectionWorker.MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime;
}

long getCreationTimestamp() {
return creationTimestamp;
}
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigquery.storage.v1;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -113,6 +114,7 @@ public StreamWriterTest() throws DescriptorValidationException {}
@Before
public void setUp() throws Exception {
testBigQueryWrite = new FakeBigQueryWrite();
StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(10000));
ConnectionWorker.setMaxInflightQueueWaitTime(300000);
serviceHelper =
new MockServiceHelper(
Expand Down Expand Up @@ -947,6 +949,35 @@ public void testMessageTooLarge() throws Exception {
writer.close();
}

@Test
public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(1));
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));

long appendCount = 10;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

// In total insert 5 requests,
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
}

for (int i = 0; i < appendCount; i++) {
int finalI = i;
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
}
}

@Test
public void testAppendWithResetSuccess() throws Exception {
try (StreamWriter writer = getTestStreamWriter()) {
Expand Down

0 comments on commit 1726c85

Please sign in to comment.