Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a flag to add / remove routing cookie from callable chain #2032

Merged
merged 6 commits into from Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -55,14 +55,14 @@ Metadata injectCookiesInRequestHeaders(Metadata headers) {
* COOKIE_KEY_PREFIX to cookies. Values in trailers will override the value set in initial
* metadata for the same keys.
*/
void extractCookiesFromMetadata(@Nullable Metadata trailers) {
if (trailers == null) {
void extractCookiesFromMetadata(@Nullable Metadata metadata) {
if (metadata == null) {
return;
}
for (String key : trailers.keys()) {
for (String key : metadata.keys()) {
if (key.startsWith(COOKIE_KEY_PREFIX)) {
Metadata.Key<String> metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
String value = trailers.get(metadataKey);
String value = metadata.get(metadataKey);
cookies.put(metadataKey, value);
}
}
Expand Down
Expand Up @@ -37,6 +37,7 @@
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.api.gax.tracing.SpanName;
Expand Down Expand Up @@ -185,11 +186,14 @@ public static EnhancedBigtableStubSettings finalizeSettings(
// workaround JWT audience issues
patchCredentials(builder);

// patch cookies interceptor
InstantiatingGrpcChannelProvider.Builder transportProvider = null;
if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
transportProvider =
((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder();
InstantiatingGrpcChannelProvider.Builder transportProvider =
builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

if (builder.getEnableRoutingCookie() && transportProvider != null) {
// TODO: this also need to be added to BigtableClientFactory
// patch cookies interceptor
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
}

Expand Down Expand Up @@ -371,11 +375,7 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
ServerStreamingCallable<Query, RowT> withCookie = new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -411,9 +411,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

UnaryCallable<Query, RowT> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -485,7 +483,7 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
withRetries(retrying1, innerSettings);

return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}
Expand Down Expand Up @@ -568,7 +566,7 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
withRetries(withBigtableTracer, settings.sampleRowKeysSettings());

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
Expand Down Expand Up @@ -607,7 +605,7 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.mutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
Expand All @@ -631,19 +629,25 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();

UnaryCallable<MutateRowsRequest, Void> withCookie = baseCallable;

if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(baseCallable);
}

UnaryCallable<MutateRowsRequest, Void> flowControlCallable = null;
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
flowControlCallable =
new DynamicFlowControlCallable(
baseCallable,
withCookie,
bulkMutationFlowController,
bulkMutationDynamicFlowControlStats,
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
}
UnaryCallable<BulkMutation, Void> userFacing =
new BulkMutateRowsUserFacingCallable(
flowControlCallable != null ? flowControlCallable : baseCallable, requestContext);
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);

SpanName spanName = getSpanName("MutateRows");

Expand All @@ -654,9 +658,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

UnaryCallable<BulkMutation, Void> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -810,7 +812,7 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.checkAndMutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -851,8 +853,7 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.readModifyWriteRowSettings());

return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -932,16 +933,13 @@ public Map<String, String> extract(
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<String, ByteStringRange> retrying =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
withRetries(withBigtableTracer, innerSettings);

SpanName span = getSpanName("GenerateInitialChangeStreamPartitions");
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

ServerStreamingCallable<String, ByteStringRange> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -1010,7 +1008,7 @@ public Map<String, String> extract(
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<ReadChangeStreamRequest, ChangeStreamRecordT> readChangeStreamCallable =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
withRetries(withBigtableTracer, innerSettings);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT>
readChangeStreamUserCallable =
Expand All @@ -1021,10 +1019,7 @@ public Map<String, String> extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand All @@ -1037,11 +1032,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
UnaryCallable<RequestT, ResponseT> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
Expand All @@ -1062,6 +1053,27 @@ public Map<String, String> extract(PingAndWarmRequest request) {
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, unaryCallSettings, clientContext);
if (settings.getEnableRoutingCookie()) {
return new CookiesUnaryCallable<>(retrying);
}
return retrying;
}

private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withRetries(
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {
ServerStreamingCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);
if (settings.getEnableRoutingCookie()) {
return new CookiesServerStreamingCallable<>(retrying);
}
return retrying;
}
// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.BatchingCallSettings;
import com.google.api.gax.batching.BatchingSettings;
Expand Down Expand Up @@ -211,6 +212,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private final Map<String, String> jwtAudienceMapping;
private final boolean enableRoutingCookie;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -252,6 +254,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
jwtAudienceMapping = builder.jwtAudienceMapping;
enableRoutingCookie = builder.enableRoutingCookie;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -313,6 +316,15 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Gets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
@BetaApi("Routing cookie is not currently stable and may change in the future")
public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -595,6 +607,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private Map<String, String> jwtAudienceMapping;
private boolean enableRoutingCookie;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand Down Expand Up @@ -627,6 +640,7 @@ private Builder() {
primedTableIds = ImmutableList.of();
jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING;
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;

// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();
Expand Down Expand Up @@ -745,6 +759,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
jwtAudienceMapping = settings.jwtAudienceMapping;
enableRoutingCookie = settings.enableRoutingCookie;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -893,6 +908,25 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Sets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
@BetaApi("Routing cookie is not currently stable and may change in the future")
public Builder setEnableRoutingCookie(boolean enableRoutingCookie) {
this.enableRoutingCookie = enableRoutingCookie;
return this;
}

/**
* Gets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
@BetaApi("Routing cookie is not currently stable and may change in the future")
public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -1019,6 +1053,7 @@ public String toString() {
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRoutingCookie", enableRoutingCookie)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down
Expand Up @@ -61,6 +61,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -322,6 +323,7 @@ public void reversed() {
}

@Test
@Ignore("Test taking too long to run, ignore for now")
public void reversedWithForcedResumption() throws IOException, InterruptedException {
assume()
.withMessage("reverse scans are not supported in the emulator")
Expand Down