feat: Add RetrySettings use to Write API samples. #2419

Merged · 4 commits · Feb 27, 2024
@@ -362,7 +362,7 @@ public Builder setCompressorName(String compressorName) {
* Enable client lib automatic retries on request level errors.
*
* <pre>
* Immeidate Retry code:
* Immediate Retry code:
* ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
* Backoff Retry code:
* RESOURCE_EXHAUSTED
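
The sample changes below all wire these retries up the same way. As a minimal consolidated sketch (the helper name is illustrative; the builder method and values are the ones this PR adds, and Duration is the threeten-backport type the samples import):

import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.threeten.bp.Duration;

public class RetryWiringSketch {
  // Illustrative helper: builds a JsonStreamWriter with the retry values used in these samples.
  static JsonStreamWriter newWriterWithRetries(String streamOrTableName, TableSchema schema)
      throws Exception {
    RetrySettings retrySettings =
        RetrySettings.newBuilder()
            .setInitialRetryDelay(Duration.ofMillis(500)) // first backoff delay
            .setRetryDelayMultiplier(1.1) // grow each subsequent delay by 10%
            .setMaxAttempts(5) // initial attempt plus up to four retries
            .setMaxRetryDelay(Duration.ofMinutes(1)) // cap any single delay at one minute
            .build();
    return JsonStreamWriter.newBuilder(streamOrTableName, schema)
        .setRetrySettings(retrySettings)
        .build();
  }
}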
@@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
@@ -37,6 +38,7 @@
import java.io.IOException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class JsonWriterStreamCdc {

@@ -108,6 +110,18 @@ private static void query(String query) {
public static void writeToDefaultStream(
String projectId, String datasetName, String tableName, JSONArray data)
throws DescriptorValidationException, InterruptedException, IOException {
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
// To use the UPSERT functionality, the table schema needs to be padded with an additional
// column "_change_type".
TableSchema tableSchema =
@@ -159,7 +173,9 @@ public static void writeToDefaultStream(
// Use the JSON stream writer to send records in JSON format.
TableName parentTable = TableName.of(projectId, datasetName, tableName);
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) {
JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema)
.setRetrySettings(retrySettings)
.build()) {

ApiFuture<AppendRowsResponse> future = writer.append(data);
// The append method is asynchronous. Rather than waiting for the method to complete,
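The JsonWriterStreamCdc change above keeps the comment that the table schema is padded with an additional "_change_type" column for UPSERT support. As a hypothetical illustration of what the appended rows might look like (the other column names and the UPSERT/DELETE values are assumptions, not part of this diff):

import org.json.JSONArray;
import org.json.JSONObject;

public class CdcRecordSketch {
  public static void main(String[] args) {
    // Hypothetical rows for a table with "id" and "name" columns plus the
    // padded "_change_type" column described in the sample above.
    JSONObject upsert =
        new JSONObject().put("id", 1).put("name", "Alice").put("_change_type", "UPSERT");
    JSONObject delete =
        new JSONObject().put("id", 2).put("_change_type", "DELETE");
    JSONArray data = new JSONArray().put(upsert).put(delete);
    // This array is the kind of payload writeToDefaultStream(...) passes to writer.append(data).
    System.out.println(data);
  }
}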
@@ -20,6 +20,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
@@ -36,12 +37,12 @@
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class ParallelWriteCommittedStream {

@@ -157,13 +158,26 @@ private void writeToStream(
lastMetricsSuccessCount = 0;
lastMetricsFailureCount = 0;
}
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(
writeStream.getTableSchema());
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
try (StreamWriter writer =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(protoSchema)
.setRetrySettings(retrySettings)
.setTraceId("SAMPLE:parallel_append")
.build()) {
while (System.currentTimeMillis() < deadlineMillis) {
@@ -18,6 +18,7 @@

// [START bigquerystorage_jsonstreamwriter_buffered]
import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
@@ -33,6 +34,7 @@
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteBufferedStream {

@@ -61,11 +63,25 @@ public static void writeBufferedStream(String projectId, String datasetName, Str
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build()) {
// Write two batches to the stream, each with 10 JSON records.
for (int i = 0; i < 2; i++) {
@@ -20,6 +20,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
@@ -37,6 +38,7 @@
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteCommittedStream {

@@ -113,11 +115,25 @@ void initialize(TableName parentTable, BigQueryWriteClient client)
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
.setRetrySettings(retrySettings)
.build();
}

@@ -17,10 +17,10 @@
package com.example.bigquerystorage;

// [START bigquerystorage_jsonstreamwriter_pending]

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
@@ -41,6 +41,7 @@
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WritePendingStream {

@@ -129,6 +130,19 @@ void initialize(TableName parentTable, BigQueryWriteClient client)
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();

// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

CreateWriteStreamRequest createWriteStreamRequest =
CreateWriteStreamRequest.newBuilder()
.setParent(parentTable.toString())
@@ -140,7 +154,9 @@ void initialize(TableName parentTable, BigQueryWriteClient client)
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build();
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build();
}

public void append(JSONArray data, long offset)
@@ -17,11 +17,11 @@
package com.example.bigquerystorage;

// [START bigquerystorage_jsonstreamwriter_default]

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
@@ -35,12 +35,9 @@
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
@@ -49,6 +46,7 @@
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WriteToDefaultStream {

@@ -97,7 +95,7 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
jsonArr.put(record);
}

writer.append(new AppendContext(jsonArr, 0));
writer.append(new AppendContext(jsonArr));
}

// Final cleanup for the stream during worker teardown.
@@ -130,26 +128,15 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRo
private static class AppendContext {

JSONArray data;
int retryCount = 0;

AppendContext(JSONArray data, int retryCount) {
AppendContext(JSONArray data) {
this.data = data;
this.retryCount = retryCount;
}
}

private static class DataWriter {

private static final int MAX_RETRY_COUNT = 3;
private static final int MAX_RECREATE_COUNT = 3;
private static final ImmutableList<Code> RETRIABLE_ERROR_CODES =
ImmutableList.of(
Code.INTERNAL,
Code.ABORTED,
Code.CANCELLED,
Code.FAILED_PRECONDITION,
Code.DEADLINE_EXCEEDED,
Code.UNAVAILABLE);

// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
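
The Phaser above is the mechanism the comment refers to for waiting on in-flight appends before shutdown. A sketch of that pattern (the method names of the sketch are illustrative; the sample's actual call sites are outside this diff):

import java.util.concurrent.Phaser;

public class InflightTrackingSketch {
  // Party count starts at 1 so the shutdown path itself is a registered party.
  private final Phaser inflightRequestCount = new Phaser(1);

  void beforeAppend() {
    inflightRequestCount.register(); // one extra party per outstanding append
  }

  void onAppendResponse() {
    inflightRequestCount.arriveAndDeregister(); // success or failure callback arrived
  }

  void awaitAllResponses() {
    inflightRequestCount.arriveAndAwaitAdvance(); // block until every append has responded
  }
}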
@@ -163,6 +150,19 @@ private static class DataWriter {

public void initialize(TableName parentTable)
throws DescriptorValidationException, IOException, InterruptedException {
// Configure in-stream automatic retry settings.
// Error codes that are immediately retried:
// * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
// Error codes that are retried with exponential backoff:
// * RESOURCE_EXHAUSTED
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
@@ -183,6 +183,7 @@ public void initialize(TableName parentTable)
// column, apply the default value to the missing value field.
.setDefaultMissingValueInterpretation(
AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE)
.setRetrySettings(retrySettings)
Contributor: You should remove the retry code down on Line 263. Also check for other writers.

Contributor (Author): Ack. Removed the retry logic, modified writers in the other samples, and added a comment to each RetrySettings creation.

.build();
}
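
The review thread above captures the intent of this PR: transient errors are now retried inside the writer instead of in each sample's callback. With the values configured here, the backoff applied to RESOURCE_EXHAUSTED works out roughly as follows (a sketch that ignores the jitter gax applies to each delay):

import org.threeten.bp.Duration;

public class BackoffScheduleSketch {
  public static void main(String[] args) {
    // Delay progression implied by the sample settings: initial 500 ms,
    // multiplier 1.1, capped at one minute, at most 5 attempts in total.
    Duration delay = Duration.ofMillis(500);
    Duration cap = Duration.ofMinutes(1);
    for (int attempt = 2; attempt <= 5; attempt++) {
      System.out.printf("delay before attempt %d: ~%d ms%n", attempt, delay.toMillis());
      long next = (long) (delay.toMillis() * 1.1);
      delay = next > cap.toMillis() ? cap : Duration.ofMillis(next);
    }
    // Prints roughly 500, 550, 605, 665 ms. Immediate-retry codes such as
    // ABORTED or UNAVAILABLE are retried without waiting for this backoff.
  }
}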

@@ -244,26 +245,6 @@ public void onSuccess(AppendRowsResponse response) {
}

public void onFailure(Throwable throwable) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
// see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
Status status = Status.fromThrowable(throwable);
if (appendContext.retryCount < MAX_RETRY_COUNT
&& RETRIABLE_ERROR_CODES.contains(status.getCode())) {
appendContext.retryCount++;
try {
// Since default stream appends are not ordered, we can simply retry the appends.
// Retrying with exclusive streams requires more careful consideration.
this.parent.append(appendContext);
// Mark the existing attempt as done since it's being retried.
done();
return;
} catch (Exception e) {
// Fall through to return error.
System.out.format("Failed to retry append: %s\n", e);
}
}

if (throwable instanceof AppendSerializationError) {
AppendSerializationError ase = (AppendSerializationError) throwable;
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
@@ -282,7 +263,7 @@ public void onFailure(Throwable throwable) {
// avoid potentially blocking while we are in a callback.
if (dataNew.length() > 0) {
try {
this.parent.append(new AppendContext(dataNew, 0));
this.parent.append(new AppendContext(dataNew));
} catch (DescriptorValidationException e) {
throw new RuntimeException(e);
} catch (IOException e) {