From 179193a18ba31c4c82946dc48671512fdac335c8 Mon Sep 17 00:00:00 2001 From: Evan Greco Date: Mon, 13 Nov 2023 10:49:36 -0800 Subject: [PATCH] feat: Add integration tests with RetrySettings enabled. (#2275) * Add integration tests with RetrySettings enabled. Initially, these tests are not run automatically as the running Service Account requires permissions on special GCP projects that inject instream errors into streams to test retries. There will be a following nightly build that will run these tests. --------- Co-authored-by: Evan Greco Co-authored-by: Owl Bot --- .kokoro/build.sh | 3 + README.md | 2 +- .../bigquery/storage/v1/ConnectionWorker.java | 5 +- .../storage/v1/ConnectionWorkerPoolTest.java | 8 +- .../it/ITBigQueryWriteNonQuotaRetryTest.java | 107 +++++++++++ .../v1/it/ITBigQueryWriteQuotaRetryTest.java | 107 +++++++++++ .../storage/v1/it/WriteRetryTestUtil.java | 173 ++++++++++++++++++ 7 files changed, 398 insertions(+), 7 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 53e7c2e74b..2a2637e002 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,6 +65,9 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ + -Dit.test=!ITBigQueryWrite*RetryTest \ + -Dsurefire.failIfNoSpecifiedTests=false \ + -Dfailsafe.failIfNoSpecifiedTests=false \ -fae \ verify RETURN_CODE=$? diff --git a/README.md b/README.md index ccf1fd95ae..5ac5115b67 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.26.0') +implementation platform('com.google.cloud:libraries-bom:26.27.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 52373596ce..98825fadd8 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -985,14 +985,14 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r Long offset = requestWrapper.message.hasOffset() ? requestWrapper.message.getOffset().getValue() : -1; if (isDefaultStreamName(streamName) || offset == -1) { - log.fine( + log.info( String.format( "Retrying default stream message in stream %s for in-stream error: %s, retry count:" + " %s", streamName, errorCode, requestWrapper.retryCount)); addMessageToFrontOfWaitingQueue(requestWrapper); } else { - log.fine( + log.info( String.format( "Retrying exclusive message in stream %s at offset %d for in-stream error: %s, retry" + " count: %s", @@ -1089,6 +1089,7 @@ private void requestCallback(AppendRowsResponse response) { // Retries need to happen on the same thread as queue locking may occur if (response.hasError()) { if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) { + log.info("Attempting to retry on error: " + response.getError().toString()); return; } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 0724f33546..ccae51dcbd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -389,15 +389,15 @@ public void testCloseExternalClient() // Create some stream writers. List streamWriterList = new ArrayList<>(); for (int i = 0; i < 4; i++) { - StreamWriter sw = + streamWriterList.add( StreamWriter.newBuilder( String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", i), externalClient) + .setEnableConnectionPool(true) .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) - .setEnableConnectionPool(true) - .build(); - streamWriterList.add(sw); + .setLocation("us") + .build()); } for (long i = 0; i < appendCount; i++) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java new file mode 100644 index 0000000000..3493fb0255 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1.it; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.logging.Logger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** Integration tests for BigQuery Write API. */ +public class ITBigQueryWriteNonQuotaRetryTest { + private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); + private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String TABLE = "testtable"; + private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; + // This project is configured on the server to inject INTERNAL in-stream errors every + // 10 messages. This is done to verify in-stream message retries. + private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test"; + private static BigQueryWriteClient client; + private static BigQuery bigquery; + + @BeforeClass + public static void beforeClass() throws IOException { + client = BigQueryWriteClient.create(); + + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + bigquery = bigqueryHelper.getOptions().getService(); + DatasetInfo datasetInfo = + DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); + bigquery.create(datasetInfo); + LOG.info("Created test dataset: " + DATASET); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); + bigquery.create(tableInfo); + } + + @AfterClass + public static void afterClass() { + if (client != null) { + client.close(); + } + + if (bigquery != null) { + RemoteBigQueryHelper.forceDelete(bigquery, DATASET); + LOG.info("Deleted test dataset: " + DATASET); + } + } + + @Test + public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() + throws IOException, InterruptedException, DescriptorValidationException { + WriteRetryTestUtil.runExclusiveRetryTest( + bigquery, + client, + DATASET, + NON_QUOTA_RETRY_PROJECT_ID, + WriteStream.Type.COMMITTED, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); + } + + @Test + public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() + throws IOException, InterruptedException, DescriptorValidationException { + WriteRetryTestUtil.runDefaultRetryTest( + bigquery, + client, + DATASET, + NON_QUOTA_RETRY_PROJECT_ID, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java new file mode 100644 index 0000000000..f567ca5487 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1.it; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.logging.Logger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** Integration tests for BigQuery Write API. */ +public class ITBigQueryWriteQuotaRetryTest { + private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); + private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String TABLE = "testtable"; + private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; + // This project is configured on the server to inject RESOURCE_EXHAUSTED in-stream errors every + // 10 messages. This is done to verify in-stream message retries. + private static final String QUOTA_RETRY_PROJECT_ID = "bq-writeapi-java-quota-retry"; + private static BigQueryWriteClient client; + private static BigQuery bigquery; + + @BeforeClass + public static void beforeClass() throws IOException { + client = BigQueryWriteClient.create(); + + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + bigquery = bigqueryHelper.getOptions().getService(); + DatasetInfo datasetInfo = + DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); + bigquery.create(datasetInfo); + LOG.info("Created test dataset: " + DATASET); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); + bigquery.create(tableInfo); + } + + @AfterClass + public static void afterClass() { + if (client != null) { + client.close(); + } + + if (bigquery != null) { + RemoteBigQueryHelper.forceDelete(bigquery, DATASET); + LOG.info("Deleted test dataset: " + DATASET); + } + } + + @Test + public void testJsonStreamWriterCommittedStreamWithQuotaRetry() + throws IOException, InterruptedException, DescriptorValidationException { + WriteRetryTestUtil.runExclusiveRetryTest( + bigquery, + client, + DATASET, + QUOTA_RETRY_PROJECT_ID, + WriteStream.Type.COMMITTED, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); + } + + @Test + public void testJsonStreamWriterDefaultStreamWithQuotaRetry() + throws IOException, InterruptedException, DescriptorValidationException { + WriteRetryTestUtil.runDefaultRetryTest( + bigquery, + client, + DATASET, + QUOTA_RETRY_PROJECT_ID, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java new file mode 100644 index 0000000000..44d3dd307a --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java @@ -0,0 +1,173 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1.it; + +import static org.junit.Assert.assertFalse; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Assert; +import org.threeten.bp.Duration; + +public class WriteRetryTestUtil { + private static final Logger LOG = + Logger.getLogger( + com.google.cloud.bigquery.storage.v1.it.ITBigQueryWriteQuotaRetryTest.class.getName()); + + public static void runExclusiveRetryTest( + BigQuery bigquery, + BigQueryWriteClient client, + String dataset, + String projectId, + WriteStream.Type streamType, + int requestCount, + int rowBatchSize) + throws IOException, InterruptedException, DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "RetryTest"; + TableId tableId = TableId.of(dataset, tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema schema = Schema.of(col1); + TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(projectId, dataset, tableName); + + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream(WriteStream.newBuilder().setType(streamType).build()) + .build()); + ArrayList> allResponses = new ArrayList<>(requestCount); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < requestCount; k++) { + JSONObject row = new JSONObject(); + row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatchSize; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + requestCount); + allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatchSize)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < requestCount; i++) { + LOG.info("Waiting for request " + i); + try { + Assert.assertEquals( + allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatchSize); + } catch (ExecutionException ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } + + public static void runDefaultRetryTest( + BigQuery bigquery, + BigQueryWriteClient client, + String dataset, + String projectId, + int requestCount, + int rowBatchSize) + throws IOException, InterruptedException, DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "JsonTableDefaultStream"; + TableFieldSchema TEST_STRING = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("test_str") + .build(); + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(dataset, tableName), + StandardTableDefinition.of( + Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) + .build(); + + bigquery.create(tableInfo); + TableName parent = TableName.of(projectId, dataset, tableName); + + ArrayList> allResponses = new ArrayList<>(requestCount); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(parent.toString(), tableSchema) + .setIgnoreUnknownFields(true) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < requestCount; k++) { + JSONObject row = new JSONObject(); + row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatchSize; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + requestCount); + allResponses.add(jsonStreamWriter.append(jsonArr)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < requestCount; i++) { + LOG.info("Waiting for request " + i); + try { + assertFalse(allResponses.get(i).get().hasError()); + } catch (Exception ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } +}