diff --git a/README.md b/README.md index ba5c85d2f4..53f4d9a837 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.27.0') +implementation platform('com.google.cloud:libraries-bom:26.29.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 94a0278069..09c51663c2 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -970,7 +970,7 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r lock.lock(); try { requestWrapper.retryCount++; - if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) { + if (this.retrySettings != null && useBackoffForError(errorCode, streamName)) { // Trigger exponential backoff in append loop when request is resent for quota errors. // createNextAttempt correctly initializes the retry delay; createfirstAttempt does not // include a positive delay, just 0. @@ -1148,6 +1148,17 @@ private boolean isConnectionErrorRetriable(Code statusCode) { || statusCode == Code.DEADLINE_EXCEEDED; } + private boolean useBackoffForError(Code statusCode, String streamName) { + // Default stream uses backoff for INTERNAL, as THROTTLED errors are more likely with default + // streams. RESOURCE_EXHAUSTED streams are used for backoff for each stream type. + if (isDefaultStreamName(streamName)) { + if (statusCode == Code.INTERNAL) { + return true; + } + } + return statusCode == Code.RESOURCE_EXHAUSTED; + } + private void doneCallback(Throwable finalStatus) { log.info( "Received done callback. Stream: " diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 77d7b04e13..b053abdd5f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -2047,6 +2047,44 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except } } + @Test + public void testAppendInternalErrorRetryExponentialBackoff() throws Exception { + StreamWriter writer = getTestStreamWriterRetryEnabled(); + + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(0)), + /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1, + com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build())); + + ApiFuture future = + writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0); + + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + future.get(); + }); + assertEquals( + Status.Code.INTERNAL, ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + + ArrayList instants = testBigQueryWrite.getLatestRequestReceivedInstants(); + Instant previousInstant = instants.get(0); + // Include initial attempt + assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1); + double minExpectedDelay = INITIAL_RETRY_MILLIS * 0.95; + for (int i = 1; i < instants.size(); i++) { + Instant currentInstant = instants.get(i); + double differenceInMillis = + java.time.Duration.between(previousInstant, currentInstant).toMillis(); + assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS); + assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); + minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER; + previousInstant = currentInstant; + } + } + @Test public void testAppendSuccessAndNonRetryableError() throws Exception { StreamWriter writer = getTestStreamWriterRetryEnabled(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 8db590b396..cfbd9a9efd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -1608,53 +1608,4 @@ public void testLargeRequest() throws IOException, InterruptedException, Executi assertEquals("50", queryIter.next().get(0).getStringValue()); } } - - @Test - public void testDefaultRequestLimit() - throws IOException, InterruptedException, ExecutionException { - DatasetId datasetId = - DatasetId.of("bq-write-api-java-retry-test", RemoteBigQueryHelper.generateDatasetName()); - DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build(); - bigquery.create(datasetInfo); - try { - String tableName = "no_error_table"; - TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName); - Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); - Schema originalSchema = Schema.of(col1); - TableInfo tableInfo = - TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build(); - bigquery.create(tableInfo); - TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName); - try (StreamWriter streamWriter = - StreamWriter.newBuilder(parent.toString() + "/_default") - .setWriterSchema(CreateProtoSchemaWithColField()) - .build()) { - ApiFuture response = - streamWriter.append( - CreateProtoRows( - new String[] {new String(new char[19 * 1024 * 1024]).replace("\0", "a")})); - try { - response.get(); - Assert.fail("Large request should fail with InvalidArgumentError"); - } catch (ExecutionException ex) { - assertEquals(io.grpc.StatusRuntimeException.class, ex.getCause().getClass()); - io.grpc.StatusRuntimeException actualError = - (io.grpc.StatusRuntimeException) ex.getCause(); - // This verifies that the Beam connector can consume this custom exception's grpc - // StatusCode - // TODO(yiru): temp fix to unblock test, while final fix is being rolled out. - if (actualError.getStatus().getCode() != Code.INTERNAL) { - assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode()); - assertThat( - actualError - .getStatus() - .getDescription() - .contains("AppendRows request too large: 19923131 limit 10485760")); - } - } - } - } finally { - RemoteBigQueryHelper.forceDelete(bigquery, datasetId.toString()); - } - } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java index 3493fb0255..61aef2d6aa 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -16,21 +16,38 @@ package com.google.cloud.bigquery.storage.v1.it; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; + +import com.google.api.core.ApiFuture; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.ProtoSchema; +import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.protobuf.DescriptorProtos.DescriptorProto; +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; import com.google.protobuf.Descriptors.DescriptorValidationException; +import io.grpc.Status.Code; import java.io.IOException; +import java.util.concurrent.ExecutionException; import java.util.logging.Logger; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -80,6 +97,15 @@ public static void afterClass() { } } + ProtoRows CreateProtoRows(String[] messages) { + ProtoRows.Builder rows = ProtoRows.newBuilder(); + for (String message : messages) { + FooType foo = FooType.newBuilder().setFoo(message).build(); + rows.addSerializedRows(foo.toByteString()); + } + return rows.build(); + } + @Test public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() throws IOException, InterruptedException, DescriptorValidationException { @@ -104,4 +130,88 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() /* requestCount=*/ 901, /* rowBatchSize=*/ 1); } + + // Moved to ITBigQueryWriteNonQuotaRetryTest from ITBigQueryWriteManualClientTest, as it requires + // usage of the project this file uses to inject errors (bq-write-api-java-retry-test). + @Test + public void testDefaultRequestLimit() + throws IOException, InterruptedException, ExecutionException { + DatasetId datasetId = + DatasetId.of(NON_QUOTA_RETRY_PROJECT_ID, RemoteBigQueryHelper.generateDatasetName()); + DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build(); + bigquery.create(datasetInfo); + try { + String tableName = "no_error_table"; + TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema originalSchema = Schema.of(col1); + TableInfo tableInfo = + TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build(); + bigquery.create(tableInfo); + ProtoSchema schema = + ProtoSchema.newBuilder() + .setProtoDescriptor( + DescriptorProto.newBuilder() + .setName("testProto") + .addField( + FieldDescriptorProto.newBuilder() + .setName("col1") + .setNumber(1) + .setType(FieldDescriptorProto.Type.TYPE_STRING) + .build()) + .build()) + .build(); + TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName); + try (StreamWriter streamWriter = + StreamWriter.newBuilder(parent.toString() + "/_default") + .setWriterSchema(schema) + .build()) { + ApiFuture response = + streamWriter.append( + CreateProtoRows( + new String[] {new String(new char[19 * 1024 * 1024]).replace("\0", "a")})); + try { + AppendRowsResponse resp = response.get(); + LOG.info( + "Message succeded. Dataset info: " + + datasetInfo.toString() + + " tableinfo: " + + tableInfo.toString() + + " parent: " + + parent + + "streamWriter: " + + streamWriter.toString() + + "response: " + + resp); + Assert.fail("Large request should fail with InvalidArgumentError"); + } catch (ExecutionException ex) { + LOG.info( + "Message failed. Dataset info: " + + datasetInfo.toString() + + " tableinfo: " + + tableInfo.toString() + + " parent: " + + parent + + "streamWriter: " + + streamWriter); + assertEquals(io.grpc.StatusRuntimeException.class, ex.getCause().getClass()); + io.grpc.StatusRuntimeException actualError = + (io.grpc.StatusRuntimeException) ex.getCause(); + // This verifies that the Beam connector can consume this custom exception's grpc + // StatusCode + // TODO(yiru): temp fix to unblock test, while final fix is being rolled out. + if (actualError.getStatus().getCode() != Code.INTERNAL) { + assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode()); + assertThat( + actualError + .getStatus() + .getDescription() + .contains("AppendRows request too large: 19923131 limit 10485760")); + } + } + } + } finally { + RemoteBigQueryHelper.forceDelete(bigquery, datasetId.toString()); + } + } }