Skip to content
Permalink
Browse files
feat: retry certain RESOURCE_EXHAUSTED errors observed during ReadRow…
…s and report retry attempts (#1257)

The BigQuery Storage Read service will start returning a retryable RESOURCE_EXHAUSTED error in the next few weeks when a read session's parallelism is considered excessive, so this PR expands the retry handling logic for ReadRows with two changes:
1. If a ReadRows request fails with a RESOURCE_EXHAUSTED error and the error has an associated RetryInfo, it is now considered to be retryable and retry delay is set according to the RetryInfo.
2. If the client decides to retry, it now notifies the user through the provided RetryAttemptListener object. This is useful as a negative feedback mechanism for future SplitReadStream requests, which in turn reduces the likelihood of receiving the new retryable RESOURCE_EXHAUSTED error.
  • Loading branch information
esert-g committed Aug 24, 2021
1 parent 0edb25d commit d56e1caf91297d7c2e1e4a9ce1463c04e44619c0
Showing with 731 additions and 26 deletions.
  1. +41 −0 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java
  2. +3 −1 ...-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java
  3. +33 −1 ...loud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java
  4. +17 −3 ...uerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java
  5. +31 −4 ...age/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java
  6. +3 −1 ...igquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java
  7. +33 −1 ...querystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java
  8. +14 −2 ...age/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java
  9. +32 −4 ...rc/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java
  10. +3 −1 ...d-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java
  11. +33 −1 ...bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java
  12. +17 −3 ...torage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java
  13. +31 −4 ...rc/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java
  14. +80 −0 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java
  15. +120 −0 ...ud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java
  16. +120 −0 ...erystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java
  17. +120 −0 ...gquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java
@@ -15,12 +15,53 @@
*/
package com.google.cloud.bigquery.storage.util;

import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.threeten.bp.Duration;

/** Static utility methods for working with Errors returned from the service. */
public class Errors {
private Errors() {};

/**
 * Outcome of a retryability check: whether the failed call may be retried and, when the error
 * carried one, the delay to wait before retrying.
 */
public static class IsRetryableStatusResult {
  /** Whether the checked status is retryable; {@code false} unless set otherwise. */
  public boolean isRetryable;

  /** Delay to wait before retrying, or {@code null} when no explicit delay is known. */
  public Duration retryDelay;
}

private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

/**
 * Decides whether the error described by the Status is retryable and, if known, how long to wait
 * before retrying.
 *
 * <p>Generally, internal errors are not considered retryable, however there are certain transient
 * network issues that appear as internal but are in fact retryable; those are detected by {@code
 * isRetryableInternalStatus} and reported without an explicit retry delay.
 *
 * <p>Resource exhausted errors are only considered retryable if metadata contains a serialized
 * RetryInfo object that has a retry delay. That delay is copied into the result.
 *
 * @param status the status of the failed call
 * @param metadata the metadata that accompanied the status; may be null
 * @return the retry decision; {@code retryDelay} is non-null only for the resource exhausted case
 */
public static IsRetryableStatusResult isRetryableStatus(Status status, Metadata metadata) {
  IsRetryableStatusResult outcome = new IsRetryableStatusResult();
  outcome.isRetryable = isRetryableInternalStatus(status);
  if (outcome.isRetryable) {
    // Retryable internal error: no server-provided delay to report.
    return outcome;
  }

  boolean carriesRetryInfo =
      status.getCode() == Status.Code.RESOURCE_EXHAUSTED
          && metadata != null
          && metadata.containsKey(KEY_RETRY_INFO);
  if (!carriesRetryInfo) {
    return outcome;
  }

  RetryInfo info = metadata.get(KEY_RETRY_INFO);
  if (info.hasRetryDelay()) {
    // The service told us exactly how long to back off; pass that along to the caller.
    outcome.isRetryable = true;
    outcome.retryDelay =
        Duration.ofSeconds(info.getRetryDelay().getSeconds(), info.getRetryDelay().getNanos());
  }
  return outcome;
}

/**
* Returns true iff the Status indicates an internal error that is retryable.
*
@@ -126,7 +126,9 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) {
*/
protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException {
this.settings = settings;
this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings());
this.stub =
EnhancedBigQueryReadStub.create(
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
@@ -27,6 +27,8 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;

@@ -69,6 +71,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
return getTypedStubSettings().readRowsSettings();
}

/** Callback through which the client reports that a failed ReadRows request is being retried. */
public interface RetryAttemptListener {
  /**
   * Called when the client decides to retry a failed ReadRows request.
   *
   * @param prevStatus status of the attempt that failed
   * @param prevMetadata metadata extracted from the failure of that attempt
   */
  void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

/**
* Sets the listener that is notified about ReadRows retry attempts.
*
* <p>If a non-null readRowsRetryAttemptListener is provided, the client calls its onRetryAttempt
* method before a failed ReadRows request is retried. This can be used as a negative feedback
* mechanism for future decisions to split read streams, because some retried failures are caused
* by resource exhaustion, which increased parallelism only makes worse.
*
* @param readRowsRetryAttemptListener the listener to notify; may be null
*/
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
}

/**
* Returns the listener notified about ReadRows retry attempts, or null if none has been set.
*/
public RetryAttemptListener getReadRowsRetryAttemptListener() {
return readRowsRetryAttemptListener;
}

/** Returns the object with the settings used for calls to splitReadStream. */
public UnaryCallSettings<SplitReadStreamRequest, SplitReadStreamResponse>
splitReadStreamSettings() {
@@ -176,6 +198,14 @@ public Builder applyToAllUnaryMethods(
return this;
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

/**
* Records the ReadRows retry attempt listener to be applied to the settings created by this
* builder.
*
* @param readRowsRetryAttemptListener the listener to record; may be null
* @return this builder, to allow call chaining
*/
public Builder setReadRowsRetryAttemptListener(
RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
return this;
}

/** Returns the builder for the settings used for calls to createReadSession. */
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
createReadSessionSettings() {
@@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods(

@Override
public BigQueryReadSettings build() throws IOException {
return new BigQueryReadSettings(this);
BigQueryReadSettings settings = new BigQueryReadSettings(this);
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
return settings;
}
}
}
@@ -31,6 +31,7 @@
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
@@ -54,10 +55,18 @@ public class EnhancedBigQueryReadStub implements BackgroundResource {
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
private final GrpcBigQueryReadStub stub;
private final BigQueryReadStubSettings stubSettings;
private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener;
private final ClientContext context;

/**
* Creates a stub from the given settings without a ReadRows retry attempt listener.
*
* <p>Delegates to the two-argument overload, passing null as the listener.
*
* @param settings the stub settings to build from
* @return the newly created stub
* @throws IOException propagated from the two-argument overload
*/
public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings)
throws IOException {
return create(settings, null);
}

public static EnhancedBigQueryReadStub create(
EnhancedBigQueryReadStubSettings settings,
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener)
throws IOException {
// Configure the base settings.
BigQueryReadStubSettings.Builder baseSettingsBuilder =
BigQueryReadStubSettings.newBuilder()
@@ -88,14 +97,19 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s
BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext);
return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext);
return new EnhancedBigQueryReadStub(
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
}

@InternalApi("Visible for testing")
EnhancedBigQueryReadStub(
GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) {
GrpcBigQueryReadStub stub,
BigQueryReadStubSettings stubSettings,
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener,
ClientContext context) {
this.stub = stub;
this.stubSettings = stubSettings;
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
this.context = context;
}

@@ -123,7 +137,7 @@ public Map<String, String> extract(ReadRowsRequest request) {

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
@@ -21,6 +21,8 @@
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import io.grpc.Metadata;
import io.grpc.Status;
import org.threeten.bp.Duration;

@@ -30,17 +32,41 @@
// Duration to sleep on if the error is DEADLINE_EXCEEDED.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener;

/** Creates a retry algorithm with no retry attempt listener (passes null to the other constructor). */
public ApiResultRetryAlgorithm() {
this(null);
}

/**
 * Creates a retry algorithm that reports retry attempts to the given listener.
 *
 * @param retryAttemptListener the listener to store for retry notifications; may be null
 */
public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) {
  // The superclass no-arg constructor is invoked implicitly.
  this.retryAttemptListener = retryAttemptListener;
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (Errors.isRetryableInternalStatus(status)) {
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata);
if (result.isRetryable) {
// If result.retryDelay isn't null, we know exactly how long we must wait, so both regular
// and randomized delays are the same.
Duration retryDelay = result.retryDelay;
Duration randomizedRetryDelay = result.retryDelay;
if (retryDelay == null) {
retryDelay = prevSettings.getRetryDelay();
randomizedRetryDelay = DEADLINE_SLEEP_DURATION;
}
if (retryAttemptListener != null) {
retryAttemptListener.onRetryAttempt(status, metadata);
}
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
.setRetryDelay(retryDelay)
.setRpcTimeout(prevSettings.getRpcTimeout())
.setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
.setRandomizedRetryDelay(randomizedRetryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
.build();
@@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (Errors.isRetryableInternalStatus(status)) {
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
if (Errors.isRetryableStatus(status, metadata).isRetryable) {
return true;
}
}
@@ -141,7 +141,9 @@ public static final BigQueryStorageClient create(EnhancedBigQueryStorageStub stu
*/
protected BigQueryStorageClient(BigQueryStorageSettings settings) throws IOException {
this.settings = settings;
this.stub = EnhancedBigQueryStorageStub.create(settings.getTypedStubSettings());
this.stub =
EnhancedBigQueryStorageStub.create(
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
@@ -37,6 +37,8 @@
import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1beta1.stub.EnhancedBigQueryStorageStubSettings;
import com.google.protobuf.Empty;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;

@@ -78,6 +80,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
return getTypedStubSettings().readRowsSettings();
}

/** Callback through which the client reports that a failed ReadRows request is being retried. */
public interface RetryAttemptListener {
  /**
   * Called before a failed ReadRows request is retried.
   *
   * @param prevStatus status of the attempt that failed
   * @param prevMetadata metadata associated with the attempt that failed
   */
  void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

/**
* Sets the listener that is notified about ReadRows retry attempts.
*
* <p>If a non-null readRowsRetryAttemptListener is provided, the client calls its onRetryAttempt
* method before a failed ReadRows request is retried. This can be used as a negative feedback
* mechanism for future decisions to split read streams, because some retried failures are caused
* by resource exhaustion, which increased parallelism only makes worse.
*
* @param readRowsRetryAttemptListener the listener to notify; may be null
*/
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
}

/**
* Returns the listener notified about ReadRows retry attempts, or null if none has been set.
*/
public RetryAttemptListener getReadRowsRetryAttemptListener() {
return readRowsRetryAttemptListener;
}

/** Returns the object with the settings used for calls to batchCreateReadSessionStreams. */
public UnaryCallSettings<
BatchCreateReadSessionStreamsRequest, BatchCreateReadSessionStreamsResponse>
@@ -197,6 +219,14 @@ public Builder applyToAllUnaryMethods(
return this;
}

private RetryAttemptListener readRowsRetryAttemptListener = null;

/**
* Records the ReadRows retry attempt listener to be applied to the settings created by this
* builder.
*
* @param readRowsRetryAttemptListener the listener to record; may be null
* @return this builder, to allow call chaining
*/
public Builder setReadRowsRetryAttemptListener(
RetryAttemptListener readRowsRetryAttemptListener) {
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
return this;
}

/** Returns the builder for the settings used for calls to createReadSession. */
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
createReadSessionSettings() {
@@ -229,7 +259,9 @@ public UnaryCallSettings.Builder<FinalizeStreamRequest, Empty> finalizeStreamSet

@Override
public BigQueryStorageSettings build() throws IOException {
return new BigQueryStorageSettings(this);
BigQueryStorageSettings settings = new BigQueryStorageSettings(this);
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
return settings;
}
}
}
@@ -31,6 +31,7 @@
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsResponse;
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
@@ -58,10 +59,18 @@ public class EnhancedBigQueryStorageStub implements BackgroundResource {
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
private final GrpcBigQueryStorageStub stub;
private final BigQueryStorageStubSettings stubSettings;
private final BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener;
private final ClientContext context;

/**
* Creates a stub from the given settings without a ReadRows retry attempt listener.
*
* <p>Delegates to the two-argument overload, passing null as the listener.
*
* @param settings the stub settings to build from
* @return the newly created stub
* @throws IOException propagated from the two-argument overload
*/
public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSettings settings)
throws IOException {
return create(settings, null);
}

public static EnhancedBigQueryStorageStub create(
EnhancedBigQueryStorageStubSettings settings,
BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener)
throws IOException {
// Configure the base settings.
BigQueryStorageStubSettings.Builder baseSettingsBuilder =
BigQueryStorageStubSettings.newBuilder()
@@ -107,16 +116,19 @@ public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSett
BigQueryStorageStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigQueryStorageStub stub = new GrpcBigQueryStorageStub(baseSettings, clientContext);
return new EnhancedBigQueryStorageStub(stub, baseSettings, clientContext);
return new EnhancedBigQueryStorageStub(
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
}

@InternalApi("Visible for testing")
EnhancedBigQueryStorageStub(
GrpcBigQueryStorageStub stub,
BigQueryStorageStubSettings stubSettings,
BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener,
ClientContext context) {
this.stub = stub;
this.stubSettings = stubSettings;
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
this.context = context;
}

@@ -145,7 +157,7 @@ public Map<String, String> extract(ReadRowsRequest request) {

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =

0 comments on commit d56e1ca

Please sign in to comment.