diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 53e7c2e74b..2a2637e002 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -65,6 +65,9 @@ integration) -DtrimStackTrace=false \ -Dclirr.skip=true \ -Denforcer.skip=true \ + -Dit.test=!ITBigQueryWrite*RetryTest \ + -Dsurefire.failIfNoSpecifiedTests=false \ + -Dfailsafe.failIfNoSpecifiedTests=false \ -fae \ verify RETURN_CODE=$? diff --git a/README.md b/README.md index ccf1fd95ae..5ac5115b67 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.26.0') +implementation platform('com.google.cloud:libraries-bom:26.27.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 52373596ce..98825fadd8 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -985,14 +985,14 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r Long offset = requestWrapper.message.hasOffset() ? requestWrapper.message.getOffset().getValue() : -1; if (isDefaultStreamName(streamName) || offset == -1) { - log.fine( + log.info( String.format( "Retrying default stream message in stream %s for in-stream error: %s, retry count:" + " %s", streamName, errorCode, requestWrapper.retryCount)); addMessageToFrontOfWaitingQueue(requestWrapper); } else { - log.fine( + log.info( String.format( "Retrying exclusive message in stream %s at offset %d for in-stream error: %s, retry" + " count: %s", @@ -1089,6 +1089,7 @@ private void requestCallback(AppendRowsResponse response) { // Retries need to happen on the same thread as queue locking may occur if (response.hasError()) { if (retryOnRetryableError(Code.values()[response.getError().getCode()], requestWrapper)) { + log.info("Attempting to retry on error: " + response.getError().toString()); return; } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 0724f33546..ccae51dcbd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -389,15 +389,15 @@ public void testCloseExternalClient() // Create some stream writers. List streamWriterList = new ArrayList<>(); for (int i = 0; i < 4; i++) { - StreamWriter sw = + streamWriterList.add( StreamWriter.newBuilder( String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", i), externalClient) + .setEnableConnectionPool(true) .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) - .setEnableConnectionPool(true) - .build(); - streamWriterList.add(sw); + .setLocation("us") + .build()); } for (long i = 0; i < appendCount; i++) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java new file mode 100644 index 0000000000..3493fb0255 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1.it; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.logging.Logger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** Integration tests for BigQuery Write API. */ +public class ITBigQueryWriteNonQuotaRetryTest { + private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); + private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String TABLE = "testtable"; + private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; + // This project is configured on the server to inject INTERNAL in-stream errors every + // 10 messages. This is done to verify in-stream message retries. + private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test"; + private static BigQueryWriteClient client; + private static BigQuery bigquery; + + @BeforeClass + public static void beforeClass() throws IOException { + client = BigQueryWriteClient.create(); + + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + bigquery = bigqueryHelper.getOptions().getService(); + DatasetInfo datasetInfo = + DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); + bigquery.create(datasetInfo); + LOG.info("Created test dataset: " + DATASET); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); + bigquery.create(tableInfo); + } + + @AfterClass + public static void afterClass() { + if (client != null) { + client.close(); + } + + if (bigquery != null) { + RemoteBigQueryHelper.forceDelete(bigquery, DATASET); + LOG.info("Deleted test dataset: " + DATASET); + } + } + + @Test + public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry() + throws IOException, InterruptedException, DescriptorValidationException { + WriteRetryTestUtil.runExclusiveRetryTest( + bigquery, + client, + DATASET, + NON_QUOTA_RETRY_PROJECT_ID, + WriteStream.Type.COMMITTED, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); + } + + @Test + public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry() + throws IOException, InterruptedException, DescriptorValidationException { + WriteRetryTestUtil.runDefaultRetryTest( + bigquery, + client, + DATASET, + NON_QUOTA_RETRY_PROJECT_ID, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java new file mode 100644 index 0000000000..f567ca5487 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1.it; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.logging.Logger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** Integration tests for BigQuery Write API. */ +public class ITBigQueryWriteQuotaRetryTest { + private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName()); + private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String TABLE = "testtable"; + private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; + // This project is configured on the server to inject RESOURCE_EXHAUSTED in-stream errors every + // 10 messages. This is done to verify in-stream message retries. + private static final String QUOTA_RETRY_PROJECT_ID = "bq-writeapi-java-quota-retry"; + private static BigQueryWriteClient client; + private static BigQuery bigquery; + + @BeforeClass + public static void beforeClass() throws IOException { + client = BigQueryWriteClient.create(); + + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + bigquery = bigqueryHelper.getOptions().getService(); + DatasetInfo datasetInfo = + DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); + bigquery.create(datasetInfo); + LOG.info("Created test dataset: " + DATASET); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build()))) + .build(); + bigquery.create(tableInfo); + } + + @AfterClass + public static void afterClass() { + if (client != null) { + client.close(); + } + + if (bigquery != null) { + RemoteBigQueryHelper.forceDelete(bigquery, DATASET); + LOG.info("Deleted test dataset: " + DATASET); + } + } + + @Test + public void testJsonStreamWriterCommittedStreamWithQuotaRetry() + throws IOException, InterruptedException, DescriptorValidationException { + WriteRetryTestUtil.runExclusiveRetryTest( + bigquery, + client, + DATASET, + QUOTA_RETRY_PROJECT_ID, + WriteStream.Type.COMMITTED, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); + } + + @Test + public void testJsonStreamWriterDefaultStreamWithQuotaRetry() + throws IOException, InterruptedException, DescriptorValidationException { + WriteRetryTestUtil.runDefaultRetryTest( + bigquery, + client, + DATASET, + QUOTA_RETRY_PROJECT_ID, + /* requestCount=*/ 901, + /* rowBatchSize=*/ 1); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java new file mode 100644 index 0000000000..44d3dd307a --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/WriteRetryTestUtil.java @@ -0,0 +1,173 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1.it; + +import static org.junit.Assert.assertFalse; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Assert; +import org.threeten.bp.Duration; + +public class WriteRetryTestUtil { + private static final Logger LOG = + Logger.getLogger( + com.google.cloud.bigquery.storage.v1.it.ITBigQueryWriteQuotaRetryTest.class.getName()); + + public static void runExclusiveRetryTest( + BigQuery bigquery, + BigQueryWriteClient client, + String dataset, + String projectId, + WriteStream.Type streamType, + int requestCount, + int rowBatchSize) + throws IOException, InterruptedException, DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "RetryTest"; + TableId tableId = TableId.of(dataset, tableName); + Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build(); + Schema schema = Schema.of(col1); + TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(projectId, dataset, tableName); + + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream(WriteStream.newBuilder().setType(streamType).build()) + .build()); + ArrayList> allResponses = new ArrayList<>(requestCount); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < requestCount; k++) { + JSONObject row = new JSONObject(); + row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatchSize; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + requestCount); + allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatchSize)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < requestCount; i++) { + LOG.info("Waiting for request " + i); + try { + Assert.assertEquals( + allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatchSize); + } catch (ExecutionException ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } + + public static void runDefaultRetryTest( + BigQuery bigquery, + BigQueryWriteClient client, + String dataset, + String projectId, + int requestCount, + int rowBatchSize) + throws IOException, InterruptedException, DescriptorValidationException { + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + String tableName = "JsonTableDefaultStream"; + TableFieldSchema TEST_STRING = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("test_str") + .build(); + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build(); + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(dataset, tableName), + StandardTableDefinition.of( + Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build()))) + .build(); + + bigquery.create(tableInfo); + TableName parent = TableName.of(projectId, dataset, tableName); + + ArrayList> allResponses = new ArrayList<>(requestCount); + try (JsonStreamWriter jsonStreamWriter = + JsonStreamWriter.newBuilder(parent.toString(), tableSchema) + .setIgnoreUnknownFields(true) + .setRetrySettings(retrySettings) + .build()) { + for (int k = 0; k < requestCount; k++) { + JSONObject row = new JSONObject(); + row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + JSONArray jsonArr = new JSONArray(); + // 3MB batch. + for (int j = 0; j < rowBatchSize; j++) { + jsonArr.put(row); + } + LOG.info("Appending: " + k + "/" + requestCount); + allResponses.add(jsonStreamWriter.append(jsonArr)); + } + LOG.info("Waiting for all responses to come back"); + for (int i = 0; i < requestCount; i++) { + LOG.info("Waiting for request " + i); + try { + assertFalse(allResponses.get(i).get().hasError()); + } catch (Exception ex) { + Assert.fail("Unexpected error " + ex); + } + } + } + } +}