Skip to content

Commit

Permalink
feat: send feature flag when flow control is enabled (#1731)
Browse files Browse the repository at this point in the history
* feat: send feature flag when flow control is enabled

* address comment

* update
  • Loading branch information
mutianf committed May 2, 2023
1 parent b518d68 commit ba147c3
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 12 deletions.
Expand Up @@ -32,6 +32,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.auth.Credentials;
import com.google.bigtable.v2.FeatureFlags;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.cloud.bigtable.Version;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
Expand All @@ -50,7 +51,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -221,6 +225,8 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
readChangeStreamSettings;
private final UnaryCallSettings<PingAndWarmRequest, Void> pingAndWarmSettings;

private final FeatureFlags featureFlags;

private EnhancedBigtableStubSettings(Builder builder) {
super(builder);

Expand Down Expand Up @@ -259,6 +265,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
builder.generateInitialChangeStreamPartitionsSettings.build();
readChangeStreamSettings = builder.readChangeStreamSettings.build();
pingAndWarmSettings = builder.pingAndWarmSettings.build();
featureFlags = builder.featureFlags.build();
}

/** Create a new builder. */
Expand Down Expand Up @@ -598,6 +605,8 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
readChangeStreamSettings;
private final UnaryCallSettings.Builder<PingAndWarmRequest, Void> pingAndWarmSettings;

private FeatureFlags.Builder featureFlags;

/**
* Initializes a new Builder with sane defaults for all settings.
*
Expand All @@ -621,16 +630,6 @@ private Builder() {
setStreamWatchdogCheckInterval(baseDefaults.getStreamWatchdogCheckInterval());
setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider());

// Inject the UserAgent in addition to api-client header
Map<String, String> headers =
ImmutableMap.<String, String>builder()
.putAll(
BigtableStubSettings.defaultApiClientHeaderProviderBuilder().build().getHeaders())
// GrpcHeaderInterceptor treats the `user-agent` as a magic string
.put("user-agent", "bigtable-java/" + Version.VERSION)
.build();
setInternalHeaderProvider(FixedHeaderProvider.create(headers));

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();

Expand Down Expand Up @@ -729,6 +728,8 @@ private Builder() {
.setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT)
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
.build());

featureFlags = FeatureFlags.newBuilder();
}

private Builder(EnhancedBigtableStubSettings settings) {
Expand All @@ -753,6 +754,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
settings.generateInitialChangeStreamPartitionsSettings.toBuilder();
readChangeStreamSettings = settings.readChangeStreamSettings.toBuilder();
pingAndWarmSettings = settings.pingAndWarmSettings.toBuilder();
featureFlags = settings.featureFlags.toBuilder();
}
// <editor-fold desc="Private Helpers">

Expand Down Expand Up @@ -970,6 +972,34 @@ public EnhancedBigtableStubSettings build() {
BigtableChannelPrimer.create(credentials, projectId, instanceId, appProfileId));
this.setTransportChannelProvider(channelProviderBuilder.build());
}

if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
// only set mutate rows feature flag when this feature is enabled
featureFlags.setMutateRowsRateLimit(true);
}

// Serialize the web64 encode the bigtable feature flags
ByteArrayOutputStream boas = new ByteArrayOutputStream();
try {
featureFlags.build().writeTo(boas);
} catch (IOException e) {
throw new IllegalStateException(
"Unexpected IOException while serializing feature flags", e);
}
byte[] serializedFlags = boas.toByteArray();
byte[] encodedFlags = Base64.getUrlEncoder().encode(serializedFlags);

// Inject the UserAgent in addition to api-client header
Map<String, String> headers =
ImmutableMap.<String, String>builder()
.putAll(
BigtableStubSettings.defaultApiClientHeaderProviderBuilder().build().getHeaders())
// GrpcHeaderInterceptor treats the `user-agent` as a magic string
.put("user-agent", "bigtable-java/" + Version.VERSION)
.put("bigtable-features", new String(encodedFlags, StandardCharsets.UTF_8))
.build();
setInternalHeaderProvider(FixedHeaderProvider.create(headers));

return new EnhancedBigtableStubSettings(this);
}
// </editor-fold>
Expand Down
Expand Up @@ -831,8 +831,8 @@ public void testToString() {
nonStaticFields++;
}
}
// failure will signal about adding a new settings property
assertThat(SETTINGS_LIST.length).isEqualTo(nonStaticFields);
// failure will signal about adding a new settings property - feature flag field
assertThat(SETTINGS_LIST.length).isEqualTo(nonStaticFields - 1);
}

void checkToString(EnhancedBigtableStubSettings settings) {
Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.FeatureFlags;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
Expand All @@ -45,6 +46,7 @@
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
Expand Down Expand Up @@ -77,6 +79,7 @@
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -486,6 +489,45 @@ public void testCallContextPropagatedInReadBatcher()
}
}

@Test
public void testBulkMutationFlowControlFeatureFlagIsSet() throws Exception {
BulkMutation bulkMutation =
BulkMutation.create("my-table")
.add(RowMutationEntry.create("row-key").setCell("cf", "q", "value"));

// Test the header is set when the feature is enabled
EnhancedBigtableStubSettings.Builder settings = defaultSettings.toBuilder();
settings.bulkMutateRowsSettings().setServerInitiatedFlowControl(true);
EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build());
stub.bulkMutateRowsCallable().call(bulkMutation);
assertThat(metadataInterceptor.headers).hasSize(1);
Metadata metadata = metadataInterceptor.headers.take();
String encodedFlags =
metadata.get(Metadata.Key.of("bigtable-features", Metadata.ASCII_STRING_MARSHALLER));
byte[] decodedFlags = Base64.getDecoder().decode(encodedFlags);
FeatureFlags featureFlags = FeatureFlags.parseFrom(decodedFlags);
assertThat(featureFlags.getMutateRowsRateLimit()).isTrue();
}

@Test
public void testBulkMutationFlowControlFeatureFlagIsNotSet() throws Exception {
BulkMutation bulkMutation =
BulkMutation.create("my-table")
.add(RowMutationEntry.create("row-key").setCell("cf", "q", "value"));

EnhancedBigtableStubSettings.Builder settings = defaultSettings.toBuilder();
settings.bulkMutateRowsSettings().setServerInitiatedFlowControl(false);
EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build());
stub.bulkMutateRowsCallable().call(bulkMutation);
assertThat(metadataInterceptor.headers).hasSize(1);
Metadata metadata = metadataInterceptor.headers.take();
String encodedFlags =
metadata.get(Metadata.Key.of("bigtable-features", Metadata.ASCII_STRING_MARSHALLER));
byte[] decodedFlags = Base64.getDecoder().decode(encodedFlags);
FeatureFlags featureFlags = FeatureFlags.parseFrom(decodedFlags);
assertThat(featureFlags.getMutateRowsRateLimit()).isFalse();
}

private static class MetadataInterceptor implements ServerInterceptor {
final BlockingQueue<Metadata> headers = Queues.newLinkedBlockingDeque();

Expand Down

0 comments on commit ba147c3

Please sign in to comment.