Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle retry info so client respect the delay server sets #2026

Merged
merged 14 commits into from
Dec 19, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ Metadata injectCookiesInRequestHeaders(Metadata headers) {
* COOKIE_KEY_PREFIX to cookies. Values in trailers will override the value set in initial
* metadata for the same keys.
*/
void extractCookiesFromMetadata(@Nullable Metadata trailers) {
if (trailers == null) {
void extractCookiesFromMetadata(@Nullable Metadata metadata) {
if (metadata == null) {
return;
}
for (String key : trailers.keys()) {
for (String key : metadata.keys()) {
if (key.startsWith(COOKIE_KEY_PREFIX)) {
Metadata.Key<String> metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
String value = trailers.get(metadataKey);
String value = metadata.get(metadataKey);
cookies.put(metadataKey, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.api.gax.tracing.SpanName;
Expand Down Expand Up @@ -107,6 +108,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.retrying.RetryInfoRetryAlgorithm;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -185,11 +187,13 @@ public static EnhancedBigtableStubSettings finalizeSettings(
// workaround JWT audience issues
patchCredentials(builder);

// patch cookies interceptor
InstantiatingGrpcChannelProvider.Builder transportProvider = null;
if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
transportProvider =
((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder();
InstantiatingGrpcChannelProvider.Builder transportProvider =
builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

if (builder.getEnableRoutingCookie() && transportProvider != null) {
// patch cookies interceptor
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
}

Expand Down Expand Up @@ -371,11 +375,7 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
ServerStreamingCallable<Query, RowT> withCookie = new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -411,9 +411,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

UnaryCallable<Query, RowT> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -485,7 +483,7 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
withRetries(retrying1, innerSettings);

return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}
Expand Down Expand Up @@ -568,7 +566,7 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
withRetries(withBigtableTracer, settings.sampleRowKeysSettings());

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
Expand Down Expand Up @@ -607,7 +605,7 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.mutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
Expand All @@ -631,19 +629,25 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();

UnaryCallable<MutateRowsRequest, Void> withCookie = baseCallable;

if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(baseCallable);
}

UnaryCallable<MutateRowsRequest, Void> flowControlCallable = null;
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
flowControlCallable =
new DynamicFlowControlCallable(
baseCallable,
withCookie,
bulkMutationFlowController,
bulkMutationDynamicFlowControlStats,
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
}
UnaryCallable<BulkMutation, Void> userFacing =
new BulkMutateRowsUserFacingCallable(
flowControlCallable != null ? flowControlCallable : baseCallable, requestContext);
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);

SpanName spanName = getSpanName("MutateRows");

Expand All @@ -654,9 +658,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

UnaryCallable<BulkMutation, Void> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -760,11 +762,18 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));
RetryAlgorithm<Void> retryAlgorithm;
ExponentialRetryAlgorithm exponentialRetryAlgorithm =
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock());
if (settings.getEnableRetryInfo()) {
retryAlgorithm =
new RetryAlgorithm<>(new RetryInfoRetryAlgorithm<>(), exponentialRetryAlgorithm);
} else {
retryAlgorithm =
new RetryAlgorithm<>(new ApiResultRetryAlgorithm<>(), exponentialRetryAlgorithm);
mutianf marked this conversation as resolved.
Show resolved Hide resolved
}

RetryingExecutorWithContext<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

Expand Down Expand Up @@ -810,7 +819,7 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.checkAndMutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -851,8 +860,7 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.readModifyWriteRowSettings());

return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -932,16 +940,13 @@ public Map<String, String> extract(
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<String, ByteStringRange> retrying =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
withRetries(withBigtableTracer, innerSettings);

SpanName span = getSpanName("GenerateInitialChangeStreamPartitions");
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

ServerStreamingCallable<String, ByteStringRange> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -1010,7 +1015,7 @@ public Map<String, String> extract(
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<ReadChangeStreamRequest, ChangeStreamRecordT> readChangeStreamCallable =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
withRetries(withBigtableTracer, innerSettings);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT>
readChangeStreamUserCallable =
Expand All @@ -1021,10 +1026,7 @@ public Map<String, String> extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand All @@ -1037,11 +1039,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
UnaryCallable<RequestT, ResponseT> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
Expand All @@ -1062,6 +1060,38 @@ public Map<String, String> extract(PingAndWarmRequest request) {
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying;
if (settings.getEnableRetryInfo()) {
retrying = com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(innerCallable, unaryCallSettings, clientContext);
} else {
retrying =
Callables.retrying(innerCallable, unaryCallSettings, clientContext);
}
if (settings.getEnableRoutingCookie()) {
return new CookiesUnaryCallable<>(retrying);
}
return retrying;
}

private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withRetries(
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {

ServerStreamingCallable<RequestT, ResponseT> retrying;
if (settings.getEnableRetryInfo()) {
retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);
} else {
retrying = Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);
}
if (settings.getEnableRoutingCookie()) {
return new CookiesServerStreamingCallable<>(retrying);
}
return retrying;
}
// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand Down