feat: Exponentially backoff on INTERNAL errors for Default streams #2358

Merged · 10 commits · Dec 14, 2023
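For context on how callers opt into this behavior: retries in the StreamWriter are driven by gax `RetrySettings`. Below is a minimal sketch of configuring a default-stream writer with retry settings, assuming the `setRetrySettings` builder method available in recent releases; the table path and settings values are hypothetical, not recommendations.

```java
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import java.io.IOException;
import org.threeten.bp.Duration;

public class DefaultStreamBackoffExample {
  // Hypothetical table path; replace with your own project/dataset/table.
  private static final String DEFAULT_STREAM =
      "projects/my-project/datasets/my_dataset/tables/my_table/_default";

  static StreamWriter createWriter(ProtoSchema schema) throws IOException {
    // With retry settings configured, RESOURCE_EXHAUSTED (all stream types)
    // and, after this change, INTERNAL (default streams only) are retried
    // with exponential backoff instead of retrying immediately.
    RetrySettings retrySettings =
        RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(500))
            .setRetryDelayMultiplier(1.3)
            .setMaxAttempts(5)
            .build();
    return StreamWriter.newBuilder(DEFAULT_STREAM)
        .setWriterSchema(schema)
        .setRetrySettings(retrySettings)
        .build();
  }
}
```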
README.md: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.27.0')
+implementation platform('com.google.cloud:libraries-bom:26.29.0')

implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
@@ -970,7 +970,7 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r
lock.lock();
try {
requestWrapper.retryCount++;
-if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) {
+if (this.retrySettings != null && useBackoffForError(errorCode, streamName)) {
// Trigger exponential backoff in append loop when request is resent for quota errors.
// createNextAttempt correctly initializes the retry delay; createFirstAttempt does not
// include a positive delay, just 0.
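To make the comment above concrete, here is a small standalone sketch of gax's `ExponentialRetryAlgorithm`: `createFirstAttempt()` carries a zero retry delay, while `createNextAttempt()` yields the first positive delay and multiplies it on each subsequent call. The settings values are illustrative, not the writer's defaults.

```java
import com.google.api.gax.core.NanoClock;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedAttemptSettings;
import org.threeten.bp.Duration;

public class BackoffDelayDemo {
  public static void main(String[] args) {
    RetrySettings settings =
        RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(500))
            .setRetryDelayMultiplier(2.0)
            .setMaxRetryDelay(Duration.ofSeconds(30))
            .build();
    ExponentialRetryAlgorithm algorithm =
        new ExponentialRetryAlgorithm(settings, NanoClock.getDefaultClock());

    // The first attempt has no delay (PT0S); the first createNextAttempt call
    // yields the initial delay, and later calls multiply it. This is why the
    // retry loop seeds itself with createNextAttempt, not createFirstAttempt.
    TimedAttemptSettings attempt = algorithm.createFirstAttempt();
    System.out.println("first attempt delay: " + attempt.getRetryDelay());
    for (int i = 1; i <= 4; i++) {
      attempt = algorithm.createNextAttempt(attempt);
      System.out.println("retry " + i + " delay: " + attempt.getRetryDelay());
    }
  }
}
```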
@@ -1148,6 +1148,17 @@ private boolean isConnectionErrorRetriable(Code statusCode) {
|| statusCode == Code.DEADLINE_EXCEEDED;
}

private boolean useBackoffForError(Code statusCode, String streamName) {
// Default streams use backoff for INTERNAL errors, as THROTTLED errors are more likely with
// default streams. RESOURCE_EXHAUSTED errors trigger backoff for every stream type.
if (isDefaultStreamName(streamName)) {
if (statusCode == Code.INTERNAL) {
return true;
}
}
return statusCode == Code.RESOURCE_EXHAUSTED;
}
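`isDefaultStreamName` is not shown in this hunk. As a hedged illustration only: default write streams are addressed by a name ending in `/_default` (the integration tests below append exactly that suffix), so a hypothetical equivalent of the check might look like:

```java
// Hypothetical stand-in for the helper referenced above: default write
// streams are addressed as "projects/.../tables/<table>/_default".
private static boolean isDefaultStreamName(String streamName) {
  return streamName != null && streamName.endsWith("/_default");
}
```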

private void doneCallback(Throwable finalStatus) {
log.info(
"Received done callback. Stream: "
@@ -2047,6 +2047,44 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except
}
}

@Test
public void testAppendInternalErrorRetryExponentialBackoff() throws Exception {
StreamWriter writer = getTestStreamWriterRetryEnabled();

testBigQueryWrite.addResponse(
new DummyResponseSupplierWillFailThenSucceed(
new FakeBigQueryWriteImpl.Response(createAppendResponse(0)),
/* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1,
com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build()));

ApiFuture<AppendRowsResponse> future =
writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0);

ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
future.get();
});
assertEquals(
Status.Code.INTERNAL, ((StatusRuntimeException) ex.getCause()).getStatus().getCode());

ArrayList<Instant> instants = testBigQueryWrite.getLatestRequestReceivedInstants();
Instant previousInstant = instants.get(0);
// Include initial attempt
assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1);
double minExpectedDelay = INITIAL_RETRY_MILLIS * 0.95;
for (int i = 1; i < instants.size(); i++) {
Instant currentInstant = instants.get(i);
double differenceInMillis =
java.time.Duration.between(previousInstant, currentInstant).toMillis();
assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS);
assertThat(differenceInMillis).isGreaterThan(minExpectedDelay);
minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER;
previousInstant = currentInstant;
}
}
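As a worked example of the assertion loop above, with hypothetical values INITIAL_RETRY_MILLIS = 500 and RETRY_MULTIPLIER = 1.3 (the test's real constants live in its fixtures), the minimum acceptable gap between consecutive requests grows geometrically:

```java
public class MinDelayProgression {
  public static void main(String[] args) {
    // Hypothetical constants; the test's actual values come from its setup.
    long initialRetryMillis = 500;
    double retryMultiplier = 1.3;
    // 0.95 factor tolerates jitter on the first measured gap.
    double minExpectedDelay = initialRetryMillis * 0.95;
    for (int retry = 1; retry <= 3; retry++) {
      System.out.printf("retry %d: gap must exceed %.2f ms%n", retry, minExpectedDelay);
      minExpectedDelay *= retryMultiplier; // 475.00 -> 617.50 -> 802.75
    }
  }
}
```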

@Test
public void testAppendSuccessAndNonRetryableError() throws Exception {
StreamWriter writer = getTestStreamWriterRetryEnabled();
@@ -1608,53 +1608,4 @@ public void testLargeRequest() throws IOException, InterruptedException, Executi
assertEquals("50", queryIter.next().get(0).getStringValue());
}
}

@Test
public void testDefaultRequestLimit()
throws IOException, InterruptedException, ExecutionException {
DatasetId datasetId =
DatasetId.of("bq-write-api-java-retry-test", RemoteBigQueryHelper.generateDatasetName());
DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build();
bigquery.create(datasetInfo);
try {
String tableName = "no_error_table";
TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema originalSchema = Schema.of(col1);
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName);
try (StreamWriter streamWriter =
StreamWriter.newBuilder(parent.toString() + "/_default")
.setWriterSchema(CreateProtoSchemaWithColField())
.build()) {
ApiFuture<AppendRowsResponse> response =
streamWriter.append(
CreateProtoRows(
new String[] {new String(new char[19 * 1024 * 1024]).replace("\0", "a")}));
try {
response.get();
Assert.fail("Large request should fail with InvalidArgumentError");
} catch (ExecutionException ex) {
assertEquals(io.grpc.StatusRuntimeException.class, ex.getCause().getClass());
io.grpc.StatusRuntimeException actualError =
(io.grpc.StatusRuntimeException) ex.getCause();
// This verifies that the Beam connector can consume this custom exception's grpc
// StatusCode
// TODO(yiru): temp fix to unblock test, while final fix is being rolled out.
if (actualError.getStatus().getCode() != Code.INTERNAL) {
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
assertThat(
actualError
.getStatus()
.getDescription()
.contains("AppendRows request too large: 19923131 limit 10485760"));
}
}
}
} finally {
RemoteBigQueryHelper.forceDelete(bigquery, datasetId.toString());
}
}
}
@@ -16,21 +16,38 @@

package com.google.cloud.bigquery.storage.v1.it;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

@@ -80,6 +97,15 @@ public static void afterClass() {
}
}

ProtoRows CreateProtoRows(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
FooType foo = FooType.newBuilder().setFoo(message).build();
rows.addSerializedRows(foo.toByteString());
}
return rows.build();
}

@Test
public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
@@ -104,4 +130,88 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry()
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}

// Moved to ITBigQueryWriteNonQuotaRetryTest from ITBigQueryWriteManualClientTest, as it
// requires the error-injecting project used by this file (bq-write-api-java-retry-test).
@Test
public void testDefaultRequestLimit()
throws IOException, InterruptedException, ExecutionException {
DatasetId datasetId =
DatasetId.of(NON_QUOTA_RETRY_PROJECT_ID, RemoteBigQueryHelper.generateDatasetName());
DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build();
bigquery.create(datasetInfo);
try {
String tableName = "no_error_table";
TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema originalSchema = Schema.of(col1);
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
ProtoSchema schema =
ProtoSchema.newBuilder()
.setProtoDescriptor(
DescriptorProto.newBuilder()
.setName("testProto")
.addField(
FieldDescriptorProto.newBuilder()
.setName("col1")
.setNumber(1)
.setType(FieldDescriptorProto.Type.TYPE_STRING)
.build())
.build())
.build();
TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName);
try (StreamWriter streamWriter =
StreamWriter.newBuilder(parent.toString() + "/_default")
.setWriterSchema(schema)
.build()) {
ApiFuture<AppendRowsResponse> response =
streamWriter.append(
CreateProtoRows(
new String[] {new String(new char[19 * 1024 * 1024]).replace("\0", "a")}));
try {
AppendRowsResponse resp = response.get();
LOG.info(
"Message succeded. Dataset info: "
+ datasetInfo.toString()
+ " tableinfo: "
+ tableInfo.toString()
+ " parent: "
+ parent
+ "streamWriter: "
+ streamWriter.toString()
+ "response: "
+ resp);
Assert.fail("Large request should fail with InvalidArgumentError");
} catch (ExecutionException ex) {
LOG.info(
"Message failed. Dataset info: "
+ datasetInfo.toString()
+ " tableinfo: "
+ tableInfo.toString()
+ " parent: "
+ parent
+ "streamWriter: "
+ streamWriter);
assertEquals(io.grpc.StatusRuntimeException.class, ex.getCause().getClass());
io.grpc.StatusRuntimeException actualError =
(io.grpc.StatusRuntimeException) ex.getCause();
// This verifies that the Beam connector can consume this custom exception's grpc
// StatusCode
// TODO(yiru): temp fix to unblock test, while final fix is being rolled out.
if (actualError.getStatus().getCode() != Code.INTERNAL) {
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
assertThat(
actualError
.getStatus()
.getDescription()
.contains("AppendRows request too large: 19923131 limit 10485760"))
.isTrue();
}
}
}
} finally {
RemoteBigQueryHelper.forceDelete(bigquery, datasetId.toString());
}
}
}