Skip to content

Commit

Permalink
feat: add a flag to add / remove routing cookie from callable chain
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Dec 16, 2023
1 parent ccc2764 commit 62d740d
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ public static EnhancedBigtableStubSettings finalizeSettings(
// workaround JWT audience issues
patchCredentials(builder);

// patch cookies interceptor
InstantiatingGrpcChannelProvider.Builder transportProvider = null;
if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
transportProvider =
((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder();
InstantiatingGrpcChannelProvider.Builder transportProvider =
builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

if (builder.getEnableRoutingCookie() && transportProvider != null) {
// patch cookies interceptor
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
}

Expand Down Expand Up @@ -371,9 +373,12 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
ServerStreamingCallable<Query, RowT> withCookie = new CookiesServerStreamingCallable<>(traced);
ServerStreamingCallable<Query, RowT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
withCookie = new CookiesServerStreamingCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -411,7 +416,10 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

UnaryCallable<Query, RowT> withCookie = new CookiesUnaryCallable<>(traced);
UnaryCallable<Query, RowT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -654,7 +662,10 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

UnaryCallable<BulkMutation, Void> withCookie = new CookiesUnaryCallable<>(traced);
UnaryCallable<BulkMutation, Void> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -938,8 +949,10 @@ public Map<String, String> extract(
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

ServerStreamingCallable<String, ByteStringRange> withCookie =
new CookiesServerStreamingCallable<>(traced);
ServerStreamingCallable<String, ByteStringRange> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesServerStreamingCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -1021,8 +1034,10 @@ public Map<String, String> extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie =
new CookiesServerStreamingCallable<>(traced);
ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesServerStreamingCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand All @@ -1037,9 +1052,12 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
UnaryCallable<RequestT, ResponseT> withCookie = new CookiesUnaryCallable<>(traced);
UnaryCallable<RequestT, ResponseT> withCookie = traced;
if (settings.getEnableRoutingCookie()) {
// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
withCookie = new CookiesUnaryCallable<>(traced);
}

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private final Map<String, String> jwtAudienceMapping;
private final boolean enableRoutingCookie;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -252,6 +253,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
jwtAudienceMapping = builder.jwtAudienceMapping;
enableRoutingCookie = builder.enableRoutingCookie;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -313,6 +315,14 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Gets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -595,6 +605,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private Map<String, String> jwtAudienceMapping;
private boolean enableRoutingCookie;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand Down Expand Up @@ -627,6 +638,7 @@ private Builder() {
primedTableIds = ImmutableList.of();
jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING;
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;

// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();
Expand Down Expand Up @@ -745,6 +757,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
jwtAudienceMapping = settings.jwtAudienceMapping;
enableRoutingCookie = settings.enableRoutingCookie;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -893,6 +906,23 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Sets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
public Builder setEnableRoutingCookie(boolean enableRoutingCookie) {
this.enableRoutingCookie = enableRoutingCookie;
return this;
}

/**
* Gets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -1019,6 +1049,7 @@ public String toString() {
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRoutingCookie", enableRoutingCookie)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -83,6 +84,7 @@ public class CookiesHolderTest {

private Server server;
private final FakeService fakeService = new FakeService();
private BigtableDataSettings.Builder settings;
private BigtableDataClient client;
private final List<Metadata> serverMetadata = new ArrayList<>();

Expand Down Expand Up @@ -138,6 +140,8 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
.build())
.setRetryableCodes(StatusCode.Code.UNAVAILABLE);

this.settings = settings;

client = BigtableDataClient.create(settings.build());
}

Expand Down Expand Up @@ -379,7 +383,7 @@ public void sendHeaders(Metadata headers) {
}

@Test
public void testAllMethodsAreCalled() throws InterruptedException {
public void testAllMethodsAreCalled() {
// This test ensures that all methods respect the retry cookie except for the ones that are
// explicitly added to the methods list. It requires that any newly method is exercised in this
// test. This is enforced by introspecting grpc method descriptors.
Expand Down Expand Up @@ -422,6 +426,53 @@ public void testAllMethodsAreCalled() throws InterruptedException {
assertThat(methods).containsExactlyElementsIn(expected);
}

@Test
public void testDisableRoutingCookie() throws IOException {
// This test disables routing cookie in the client settings and ensures that none of the routing
// cookie
// is added.
settings.stubSettings().setEnableRoutingCookie(false);
try (BigtableDataClient client = BigtableDataClient.create(settings.build())) {
client.readRows(Query.create("fake-table")).iterator().hasNext();
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.mutateRow(RowMutation.create("fake-table", "key").setCell("cf", "q", "v"));
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.bulkMutateRows(
BulkMutation.create("fake-table")
.add(RowMutationEntry.create("key").setCell("cf", "q", "v")));
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.sampleRowKeys("fake-table");
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.checkAndMutateRow(
ConditionalRowMutation.create("fake-table", "key")
.then(Mutation.create().setCell("cf", "q", "v")));
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.readModifyWriteRow(
ReadModifyWriteRow.create("fake-table", "key").append("cf", "q", "v"));
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.generateInitialChangeStreamPartitions("fake-table").iterator().hasNext();
assertThat(fakeService.count.get()).isEqualTo(2);
fakeService.count.set(0);

client.readChangeStream(ReadChangeStreamQuery.create("fake-table")).iterator().hasNext();
assertThat(fakeService.count.get()).isEqualTo(2);

assertThat(methods).isEmpty();
}
}

static class FakeService extends BigtableGrpc.BigtableImplBase {

private boolean returnCookie = true;
Expand Down
Loading

0 comments on commit 62d740d

Please sign in to comment.