Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle retry info so client respect the delay server sets #2026

Merged
merged 14 commits into from
Dec 19, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
Expand Down Expand Up @@ -108,6 +109,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -762,11 +764,19 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);

BasicResultRetryAlgorithm<Void> resultRetryAlgorithm;
if (settings.getEnableRetryInfo()) {
resultRetryAlgorithm = new RetryInfoRetryAlgorithm<>();
} else {
resultRetryAlgorithm = new ApiResultRetryAlgorithm<>();
}

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
resultRetryAlgorithm,
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));

RetryingExecutorWithContext<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

Expand Down Expand Up @@ -1056,8 +1066,14 @@ public Map<String, String> extract(PingAndWarmRequest request) {

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, unaryCallSettings, clientContext);
UnaryCallable<RequestT, ResponseT> retrying;
if (settings.getEnableRetryInfo()) {
retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
innerCallable, unaryCallSettings, clientContext);
} else {
retrying = Callables.retrying(innerCallable, unaryCallSettings, clientContext);
}
if (settings.getEnableRoutingCookie()) {
return new CookiesUnaryCallable<>(retrying);
}
Expand All @@ -1067,8 +1083,15 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withRetries(
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {
ServerStreamingCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);

ServerStreamingCallable<RequestT, ResponseT> retrying;
if (settings.getEnableRetryInfo()) {
retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
innerCallable, serverStreamingCallSettings, clientContext);
} else {
retrying = Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);
}
if (settings.getEnableRoutingCookie()) {
return new CookiesServerStreamingCallable<>(retrying);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private ImmutableList<String> primedTableIds;
private final Map<String, String> jwtAudienceMapping;
private final boolean enableRoutingCookie;
private final boolean enableRetryInfo;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -255,6 +256,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
primedTableIds = builder.primedTableIds;
jwtAudienceMapping = builder.jwtAudienceMapping;
enableRoutingCookie = builder.enableRoutingCookie;
enableRetryInfo = builder.enableRetryInfo;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -325,6 +327,15 @@ public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/**
* Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on server
* returned RetryInfo value. Otherwise, client uses {@link RetrySettings}.
*/
@BetaApi("RetryInfo is not currently stable and may change in the future")
public boolean getEnableRetryInfo() {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return enableRetryInfo;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -608,6 +619,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private ImmutableList<String> primedTableIds;
private Map<String, String> jwtAudienceMapping;
private boolean enableRoutingCookie;
private boolean enableRetryInfo;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand Down Expand Up @@ -641,6 +653,7 @@ private Builder() {
jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING;
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;
this.enableRetryInfo = true;

// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();
Expand Down Expand Up @@ -760,6 +773,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
primedTableIds = settings.primedTableIds;
jwtAudienceMapping = settings.jwtAudienceMapping;
enableRoutingCookie = settings.enableRoutingCookie;
enableRetryInfo = settings.enableRetryInfo;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -927,6 +941,25 @@ public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/**
* Sets if RetryInfo is enabled. If true, client bases retry decision and back off time on
* server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}.
*/
@BetaApi("RetryInfo is not currently stable and may change in the future")
public Builder setEnableRetryInfo(boolean enableRetryInfo) {
this.enableRetryInfo = enableRetryInfo;
return this;
}

/**
* Gets if RetryInfo is enabled. If true, client bases retry decision and back off time on
* server returned RetryInfo value. Otherwise, client uses {@link RetrySettings}.
*/
@BetaApi("RetryInfo is not currently stable and may change in the future")
public boolean getEnableRetryInfo() {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return enableRetryInfo;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -1054,6 +1087,7 @@ public String toString() {
.add("primedTableIds", primedTableIds)
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRoutingCookie", enableRoutingCookie)
.add("enableRetryInfo", enableRetryInfo)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ private void handleAttemptError(Throwable rpcError) {
FailedMutation failedMutation = FailedMutation.create(origIndex, entryError);
allFailures.add(failedMutation);

if (!failedMutation.getError().isRetryable()) {
if (!com.google.cloud.bigtable.gaxx.retrying.ApiException.isRetryable2(
failedMutation.getError())
&& !failedMutation.getError().isRetryable()) {
permanentFailures.add(failedMutation);
} else {
// Schedule the mutation entry for the next RPC by adding it to the request builder and
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;

// TODO: move this to gax later
@InternalApi
public class ApiException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be ApiExceptions. Also please add a private ctor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general the pattern is that the instance class is singular and the helper utility with static methods is plural.
In this case you are adding latter. Eventually when you upstream, you will make it an instance method and it will be in ApiException. But in the transitional state I think plural makes sense


private ApiException() {}

// TODO: this should replace the existing ApiException#isRetryable() method,
// but that cant be done in bigtable, so this lives here for now.
public static boolean isRetryable2(Throwable e) {
if (RetryInfoRetryAlgorithm.extractRetryDelay(e) != null) {
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.StreamingRetryAlgorithm;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import java.util.Collection;

// TODO: remove this once ApiResultRetryAlgorithm is added to gax.
/**
Expand All @@ -48,23 +45,14 @@ public static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> retrying(

UnaryCallSettings<?, ?> settings = callSettings;

if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

RetryAlgorithm<ResponseT> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<ResponseT>(),
new RetryInfoRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
ScheduledRetryingExecutor<ResponseT> retryingExecutor =
ScheduledRetryingExecutor<ResponseT> executor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
return new RetryingCallable<>(
clientContext.getDefaultCallContext(), innerCallable, retryingExecutor);

return new RetryingCallable<>(clientContext.getDefaultCallContext(), innerCallable, executor);
}

public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> retrying(
Expand All @@ -73,18 +61,10 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
ClientContext clientContext) {

ServerStreamingCallSettings<RequestT, ResponseT> settings = callSettings;
if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new RetryInfoRetryAlgorithm<>(),
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
Expand All @@ -93,11 +73,4 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
return new RetryingServerStreamingCallable<>(
innerCallable, retryingExecutor, settings.getResumptionStrategy());
}

private static boolean areRetriesDisabled(
Collection<StatusCode.Code> retryableCodes, RetrySettings retrySettings) {
return retrySettings.getMaxAttempts() == 1
|| retryableCodes.isEmpty()
|| (retrySettings.getMaxAttempts() == 0 && retrySettings.getTotalTimeout().isZero());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.retrying;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.util.Durations;
import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.threeten.bp.Duration;

// TODO move this algorithm to gax
/**
* This retry algorithm checks the metadata of an exception for additional error details. If the
* metadata has a RetryInfo field, use the retry delay to set the wait time between attempts.
*/
@InternalApi
public class RetryInfoRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<ResponseT> {

@VisibleForTesting
public static final Metadata.Key<RetryInfo> RETRY_INFO_KEY =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
Duration retryDelay = extractRetryDelay(prevThrowable);
if (retryDelay != null) {
return prevSettings
.toBuilder()
.setRandomizedRetryDelay(retryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.build();
}
return null;
}

/** Returns true if previousThrowable is an {@link ApiException} that is retryable. */
@Override
public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) {
return shouldRetry(null, previousThrowable, previousResponse);
}

/**
* If {@link RetryingContext#getRetryableCodes()} is not null: Returns true if the status code of
* previousThrowable is in the list of retryable code of the {@link RetryingContext}.
*
* <p>Otherwise it returns the result of {@link #shouldRetry(Throwable, Object)}.
*/
@Override
public boolean shouldRetry(
@Nullable RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
if (extractRetryDelay(previousThrowable) != null) {
// First check if server wants us to retry
return true;
}
if (context != null && context.getRetryableCodes() != null) {
// Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list
// of codes that should be retried.
return ((previousThrowable instanceof ApiException)
&& context
.getRetryableCodes()
.contains(((ApiException) previousThrowable).getStatusCode().getCode()));
}
// Server didn't have retry information and there's no retry context, use the local status
// code config.
return previousThrowable instanceof ApiException
&& ((ApiException) previousThrowable).isRetryable();
}

@Nullable
static Duration extractRetryDelay(@Nullable Throwable throwable) {
if (throwable == null) {
return null;
}
Metadata trailers = Status.trailersFromThrowable(throwable);
if (trailers == null) {
return null;
}
RetryInfo retryInfo = trailers.get(RETRY_INFO_KEY);
if (retryInfo == null) {
return null;
}
if (!retryInfo.hasRetryDelay()) {
return null;
}
return Duration.ofMillis(Durations.toMillis(retryInfo.getRetryDelay()));
}
}