From 2ce48f1032199494b35ce3997ec7ea0f425fa4f1 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Fri, 28 Aug 2020 11:54:15 -0400 Subject: [PATCH 1/8] feat: extend channel priming logic to also send fake requests --- .../data/v2/internal/RefreshChannel.java | 54 ------- .../data/v2/stub/BigtableChannelPrimer.java | 132 ++++++++++++++++++ .../data/v2/stub/EnhancedBigtableStub.java | 127 ++++++++++------- .../v2/stub/EnhancedBigtableStubSettings.java | 31 ++-- .../data/v2/internal/RefreshChannelTest.java | 38 ----- .../data/v2/stub/ChannelPrimingTest.java | 129 +++++++++++++++++ .../v2/stub/metrics/MetricsTracerTest.java | 13 +- 7 files changed, 364 insertions(+), 160 deletions(-) delete mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java delete mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ChannelPrimingTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java deleted file mode 100644 index e34ecd750d..0000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.internal; - -import com.google.api.core.BetaApi; -import com.google.api.core.InternalApi; -import com.google.api.gax.grpc.ChannelPrimer; -import io.grpc.ConnectivityState; -import io.grpc.ManagedChannel; -import java.util.concurrent.TimeUnit; - -/** - * Establish a connection to the Cloud Bigtable service on managedChannel - * - *

This class is considered an internal implementation detail and not meant to be used by - * applications. - */ -@BetaApi("This API depends on gRPC experimental API") -@InternalApi -public final class RefreshChannel implements ChannelPrimer { - - /** - * primeChannel establishes a connection to Cloud Bigtable service. This typically take less than - * 1s. In case of service failure, an upper limit of 10s prevents primeChannel from looping - * forever. - */ - @Override - public void primeChannel(ManagedChannel managedChannel) { - for (int i = 0; i < 10; i++) { - ConnectivityState connectivityState = managedChannel.getState(true); - if (connectivityState == ConnectivityState.READY) { - break; - } - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - break; - } - } - } -} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java new file mode 100644 index 0000000000..ab6b0c2c7b --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -0,0 +1,132 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.data.v2.models.Filters.FILTERS; + +import com.google.api.core.ApiFuture; +import com.google.api.core.BetaApi; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.ChannelPrimer; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.FixedWatchdogProvider; +import com.google.auth.Credentials; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.common.collect.ImmutableList; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import org.threeten.bp.Duration; + +/** + * A channel warmer that ensures that a Bigtable channel is ready to be used before being added to + * the active {@link com.google.api.gax.grpc.ChannelPool}. + */ +@BetaApi() +class BigtableChannelPrimer implements ChannelPrimer { + private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString()); + + EnhancedBigtableStubSettings settingsTemplate; + List tableIds; + + static BigtableChannelPrimer create(Credentials credentials, String projectId, String instanceId, String appProfileId, List tableIds) { + EnhancedBigtableStubSettings.Builder builder = EnhancedBigtableStubSettings.newBuilder() + .setProjectId(projectId) + .setInstanceId(instanceId) + .setAppProfileId(appProfileId) + .setCredentialsProvider(FixedCredentialsProvider.create(credentials)) + .setExecutorProvider( + InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(1) + .build() + ) + // Disable watchdog creation - its unnecessary. + .setStreamWatchdogProvider(FixedWatchdogProvider.create(null)); + + // Disable retries for priming request + Duration timeout = Duration.ofSeconds(1); + builder.readRowSettings().setRetrySettings( + builder.readRowSettings().getRetrySettings().toBuilder() + .setMaxAttempts(0) + .setJittered(false) + .setInitialRpcTimeout(timeout) + .setMaxRpcTimeout(timeout) + .setTotalTimeout(timeout) + .build() + ); + return new BigtableChannelPrimer(builder.build(), tableIds); + } + private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate, List tableIds) { + this.settingsTemplate = settingsTemplate; + this.tableIds = ImmutableList.copyOf(tableIds); + } + + @Override + public void primeChannel(ManagedChannel managedChannel) { + try { + primeChannelUnsafe(managedChannel); + } catch (IOException|RuntimeException e) { + LOG.warning(String.format("Unexpected error while trying to prime the channel: %s", e)); + } + } + + private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException { + if (tableIds.isEmpty()) { + waitForChannelReady(managedChannel); + } else { + sendPrimeRequests(managedChannel); + } + } + + void waitForChannelReady(ManagedChannel managedChannel) { + for (int i = 0; i < 10; i++) { + ConnectivityState connectivityState = managedChannel.getState(true); + if (connectivityState == ConnectivityState.READY) { + break; + } + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + } + + void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { + // Wrap the channel in a temporary stub + EnhancedBigtableStubSettings primingSettings = settingsTemplate.toBuilder() + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create(managedChannel) + ) + ).build(); + + try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) { + Map> primeFutures = new HashMap<>(); + + // Prime all of the table ids in parallel + for (String tableId : tableIds) { + ApiFuture f = stub.readRowCallable() + .futureCall(Query.create(tableId).filter(FILTERS.block())); + + primeFutures.put(tableId, f); + } + + // Wait for all of the prime requests to complete. + for (Map.Entry> entry : primeFutures.entrySet()) { + try { + entry.getValue().get(); + } catch (Throwable e) { + LOG.warning(String + .format("Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage())); + } + } + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 8d9d2fc70c..bcae63d274 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -20,10 +20,12 @@ import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatcherImpl; import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallSettings; import com.google.api.gax.grpc.GrpcRawCallableFactory; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; import com.google.api.gax.retrying.RetryAlgorithm; import com.google.api.gax.retrying.RetryingExecutorWithContext; @@ -38,6 +40,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.api.gax.tracing.TracedUnaryCallable; +import com.google.auth.Credentials; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.CheckAndMutateRowRequest; import com.google.bigtable.v2.CheckAndMutateRowResponse; @@ -120,65 +123,85 @@ public class EnhancedBigtableStub implements AutoCloseable { public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) throws IOException { - ClientContext clientContext = ClientContext.create(settings); + settings = finalizeSettings(settings, Tags.getTagger(), Stats.getStatsRecorder()); - return new EnhancedBigtableStub( - settings, clientContext, Tags.getTagger(), Stats.getStatsRecorder()); + return new EnhancedBigtableStub(settings, ClientContext.create(settings)); + } + + @InternalApi("Visible for testing") + public static EnhancedBigtableStubSettings finalizeSettings(EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats) + throws IOException { + EnhancedBigtableStubSettings.Builder builder = settings.toBuilder(); + if (settings.isRefreshingChannel()) { + // Fix the credentials so that they can be shared + Credentials credentials = null; + if (settings.getCredentialsProvider() != null) { + credentials = settings.getCredentialsProvider().getCredentials(); + } + builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); + + // Inject the primer + InstantiatingGrpcChannelProvider transportProvider = (InstantiatingGrpcChannelProvider)settings.getTransportChannelProvider(); + + builder.setTransportChannelProvider( + transportProvider.toBuilder() + .setChannelPrimer(BigtableChannelPrimer.create(credentials, settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId(), settings.getPrimedTableIds())) + .build() + ); + } + + // Inject Opencensus instrumentation + builder.setTracerFactory( + new CompositeTracerFactory( + ImmutableList.of( + // Add OpenCensus Tracing + new OpencensusTracerFactory( + ImmutableMap.builder() + // Annotate traces with the same tags as metrics + .put( + RpcMeasureConstants.BIGTABLE_PROJECT_ID.getName(), + settings.getProjectId()) + .put( + RpcMeasureConstants.BIGTABLE_INSTANCE_ID.getName(), + settings.getInstanceId()) + .put( + RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID.getName(), + settings.getAppProfileId()) + // Also annotate traces with library versions + .put("gax", GaxGrpcProperties.getGaxGrpcVersion()) + .put("grpc", GaxGrpcProperties.getGrpcVersion()) + .put( + "gapic", + GaxProperties.getLibraryVersion( + EnhancedBigtableStubSettings.class)) + .build()), + // Add OpenCensus Metrics + MetricsTracerFactory.create( + tagger, + stats, + ImmutableMap.builder() + .put( + RpcMeasureConstants.BIGTABLE_PROJECT_ID, + TagValue.create(settings.getProjectId())) + .put( + RpcMeasureConstants.BIGTABLE_INSTANCE_ID, + TagValue.create(settings.getInstanceId())) + .put( + RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, + TagValue.create(settings.getAppProfileId())) + .build()), + // Add user configured tracer + settings.getTracerFactory()))); + + return builder.build(); } @InternalApi("Visible for testing") public EnhancedBigtableStub( EnhancedBigtableStubSettings settings, - ClientContext clientContext, - Tagger tagger, - StatsRecorder statsRecorder) { + ClientContext clientContext) { this.settings = settings; - - this.clientContext = - clientContext - .toBuilder() - .setTracerFactory( - new CompositeTracerFactory( - ImmutableList.of( - // Add OpenCensus Tracing - new OpencensusTracerFactory( - ImmutableMap.builder() - // Annotate traces with the same tags as metrics - .put( - RpcMeasureConstants.BIGTABLE_PROJECT_ID.getName(), - settings.getProjectId()) - .put( - RpcMeasureConstants.BIGTABLE_INSTANCE_ID.getName(), - settings.getInstanceId()) - .put( - RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID.getName(), - settings.getAppProfileId()) - // Also annotate traces with library versions - .put("gax", GaxGrpcProperties.getGaxGrpcVersion()) - .put("grpc", GaxGrpcProperties.getGrpcVersion()) - .put( - "gapic", - GaxProperties.getLibraryVersion( - EnhancedBigtableStubSettings.class)) - .build()), - // Add OpenCensus Metrics - MetricsTracerFactory.create( - tagger, - statsRecorder, - ImmutableMap.builder() - .put( - RpcMeasureConstants.BIGTABLE_PROJECT_ID, - TagValue.create(settings.getProjectId())) - .put( - RpcMeasureConstants.BIGTABLE_INSTANCE_ID, - TagValue.create(settings.getInstanceId())) - .put( - RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, - TagValue.create(settings.getAppProfileId())) - .build()), - // Add user configured tracer - clientContext.getTracerFactory()))) - .build(); + this.clientContext = clientContext; this.requestContext = RequestContext.create( settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId()); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 1906228a30..b7f8cc2d5f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -28,7 +28,6 @@ import com.google.api.gax.rpc.StubSettings; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; -import com.google.cloud.bigtable.data.v2.internal.RefreshChannel; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; @@ -150,6 +149,7 @@ public class EnhancedBigtableStubSettings extends StubSettings primedTableIds; private final ServerStreamingCallSettings readRowsSettings; private final UnaryCallSettings readRowSettings; @@ -188,6 +188,7 @@ private EnhancedBigtableStubSettings(Builder builder) { instanceId = builder.instanceId; appProfileId = builder.appProfileId; isRefreshingChannel = builder.isRefreshingChannel; + primedTableIds = builder.primedTableIds; // Per method settings. readRowsSettings = builder.readRowsSettings.build(); @@ -226,6 +227,9 @@ public boolean isRefreshingChannel() { return isRefreshingChannel; } + @BetaApi("Channel priming is not currently stable and might change in the future") + public List getPrimedTableIds() { return primedTableIds; } + /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { return BigtableStubSettings.defaultGrpcTransportProviderBuilder() @@ -483,6 +487,7 @@ public static class Builder extends StubSettings.Builder primedTableIds; private final ServerStreamingCallSettings.Builder readRowsSettings; private final UnaryCallSettings.Builder readRowSettings; @@ -505,6 +510,7 @@ public static class Builder extends StubSettings.Builder getPrimedTableIds() { return primedTableIds; } + /** Returns the builder for the settings used for calls to readRows. */ public ServerStreamingCallSettings.Builder readRowsSettings() { return readRowsSettings; @@ -760,17 +782,10 @@ public EnhancedBigtableStubSettings build() { Preconditions.checkState(projectId != null, "Project id must be set"); Preconditions.checkState(instanceId != null, "Instance id must be set"); - // Set ChannelPrimer on TransportChannelProvider so channels will gracefully refresh - // connections to Cloud Bigtable service if (isRefreshingChannel) { Preconditions.checkArgument( getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider, "refreshingChannel only works with InstantiatingGrpcChannelProviders"); - InstantiatingGrpcChannelProvider.Builder channelBuilder = - ((InstantiatingGrpcChannelProvider) getTransportChannelProvider()) - .toBuilder() - .setChannelPrimer(new RefreshChannel()); - setTransportChannelProvider(channelBuilder.build()); } return new EnhancedBigtableStubSettings(this); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java deleted file mode 100644 index c41fa4d2a5..0000000000 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.internal; - -import io.grpc.ConnectivityState; -import io.grpc.ManagedChannel; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -@RunWith(JUnit4.class) -public class RefreshChannelTest { - // RefreshChannel should establish connection to the server through managedChannel.getState(true) - @Test - public void testGetStateIsCalled() { - RefreshChannel refreshChannel = new RefreshChannel(); - ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class); - - Mockito.doReturn(ConnectivityState.READY).when(managedChannel).getState(true); - - refreshChannel.primeChannel(managedChannel); - Mockito.verify(managedChannel, Mockito.atLeastOnce()).getState(true); - } -} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ChannelPrimingTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ChannelPrimingTest.java new file mode 100644 index 0000000000..9481bb6d81 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ChannelPrimingTest.java @@ -0,0 +1,129 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.FakeServiceGrpc.FakeServiceImplBase; +import com.google.auth.Credentials; +import com.google.auth.oauth2.IdToken; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class ChannelPrimingTest { + @Rule + public final MockitoRule mockitoJUnit = MockitoJUnit.rule(); + + private Server server; + private FakeService fakeService; + private EnhancedBigtableStub stub; + private FakeCrendentialsProvider fakeCredsProvider; + + @Before + public void setup() throws IOException { + final int port; + try (ServerSocket ss = new ServerSocket(0)) { + port = ss.getLocalPort(); + } + + server = ServerBuilder.forPort(port) + .addService(new FakeService()) + .build(); + + fakeCredsProvider = new FakeCrendentialsProvider(); + + EnhancedBigtableStubSettings settings = EnhancedBigtableStubSettings.newBuilder() + .setProjectId("my-project") + .setInstanceId("my-instance") + .setAppProfileId("my-app-profile") + .setRefreshingChannel(true) + .setPrimedTableId("table1", "table2") + .setCredentialsProvider(fakeCredsProvider) + .setEndpoint("localhost:" + port) + .build(); + + stub = EnhancedBigtableStub.create(settings); + } + + @After + public void teardown() { + if (stub != null) { + stub.close(); + } + if (server != null) { + server.shutdown(); + } + } + + @Test + public void testMoo() { + + } + + static class FakeCrendentialsProvider implements CredentialsProvider { + private AtomicInteger generation = new AtomicInteger(); + + @Override + public Credentials getCredentials() throws IOException { + return new FakeCredentials("generation" + generation.getAndIncrement()); + } + } + + static class FakeCredentials extends Credentials { + private final String token; + + FakeCredentials(String token) { + this.token = token; + } + + @Override + public String getAuthenticationType() { + return "fake"; + } + + @Override + public Map> getRequestMetadata(URI uri) throws IOException { + return ImmutableMap.>of("key", ImmutableList.of(token)); + } + + @Override + public boolean hasRequestMetadata() { + return true; + } + + @Override + public boolean hasRequestMetadataOnly() { + return true; + } + + @Override + public void refresh() throws IOException { + } + } + + static class FakeService extends BigtableGrpc.BigtableImplBase { + List requests = new ArrayList<>(); + + + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index 9fd78c5cd5..369c846d57 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -120,14 +120,11 @@ public void setUp() throws Exception { .setInstanceId(INSTANCE_ID) .setAppProfileId(APP_PROFILE_ID) .build(); - EnhancedBigtableStubSettings stubSettings = settings.getStubSettings(); - - stub = - new EnhancedBigtableStub( - stubSettings, - ClientContext.create(stubSettings), - Tags.getTagger(), - localStats.getStatsRecorder()); + EnhancedBigtableStubSettings stubSettings = EnhancedBigtableStub.finalizeSettings( + settings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder()); + + + stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings)); } @After From 411445385668e6f79a3e19652ff05efb5b41fa9b Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Fri, 28 Aug 2020 17:41:31 -0400 Subject: [PATCH 2/8] finish --- .../data/v2/BigtableDataSettings.java | 38 +++- .../data/v2/stub/BigtableChannelPrimer.java | 108 ++++++---- .../data/v2/stub/EnhancedBigtableStub.java | 30 +-- .../v2/stub/EnhancedBigtableStubSettings.java | 23 +- .../v2/stub/BigtableChannelPrimerTest.java | 196 ++++++++++++++++++ .../data/v2/stub/ChannelPrimingTest.java | 129 ------------ .../v2/stub/metrics/MetricsTracerTest.java | 6 +- 7 files changed, 330 insertions(+), 200 deletions(-) create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java delete mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ChannelPrimingTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index 437ebc653b..8dd0fa6d97 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -26,6 +26,7 @@ import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import com.google.common.base.Strings; import io.grpc.ManagedChannelBuilder; +import java.util.List; import java.util.logging.Logger; import javax.annotation.Nonnull; @@ -185,11 +186,20 @@ public String getAppProfileId() { } /** Gets if channels will gracefully refresh connections to Cloud Bigtable service */ - @BetaApi("This API depends on experimental gRPC APIs") + @BetaApi("Channel priming is not currently stable and may change in the future") public boolean isRefreshingChannel() { return stubSettings.isRefreshingChannel(); } + /** + * Gets the table ids that will be used to send warmup requests when {@link + * #isRefreshingChannel()} is enabled. + */ + @BetaApi("Channel priming is not currently stable and may change in the future") + public List getPrimingTableIds() { + return stubSettings.getPrimedTableIds(); + } + /** Returns the underlying RPC settings. */ public EnhancedBigtableStubSettings getStubSettings() { return stubSettings; @@ -307,18 +317,40 @@ public CredentialsProvider getCredentialsProvider() { * connections, which causes the client to renegotiate the gRPC connection in the request path, * which causes periodic spikes in latency */ - @BetaApi("This API depends on experimental gRPC APIs") + @BetaApi("Channel priming is not currently stable and may change in the future") public Builder setRefreshingChannel(boolean isRefreshingChannel) { stubSettings.setRefreshingChannel(isRefreshingChannel); return this; } /** Gets if channels will gracefully refresh connections to Cloud Bigtable service */ - @BetaApi("This API depends on experimental gRPC APIs") + @BetaApi("Channel priming is not currently stable and may change in the future") public boolean isRefreshingChannel() { return stubSettings.isRefreshingChannel(); } + /** + * Configure the tables that can be used to prime a channel during a refresh. + * + *

These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a + * channel is refreshed, it will send a request to each table to warm up the serverside caches + * before admitting the new channel into the channel pool. + */ + @BetaApi("Channel priming is not currently stable and may change in the future") + public Builder setPrimingTableIds(String... tableIds) { + stubSettings.setPrimedTableIds(tableIds); + return this; + } + + /** + * Gets the table ids that will be used to send warmup requests when {@link + * #setRefreshingChannel(boolean)} is enabled. + */ + @BetaApi("Channel priming is not currently stable and may change in the future") + public List getPrimingTableIds() { + return stubSettings.getPrimedTableIds(); + } + /** * Returns the underlying settings for making RPC calls. The settings should be changed with * care. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index ab6b0c2c7b..5dd1106e8c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -2,6 +2,7 @@ import static com.google.cloud.bigtable.data.v2.models.Filters.FILTERS; +import com.google.api.client.util.Preconditions; import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.api.gax.core.FixedCredentialsProvider; @@ -9,17 +10,18 @@ import com.google.api.gax.grpc.ChannelPrimer; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; -import com.google.api.gax.rpc.FixedWatchdogProvider; import com.google.auth.Credentials; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import org.threeten.bp.Duration; @@ -27,42 +29,55 @@ /** * A channel warmer that ensures that a Bigtable channel is ready to be used before being added to * the active {@link com.google.api.gax.grpc.ChannelPool}. + * + *

This implementation is subject to change in the future, but currently it will prime the + * channel by sending a ReadRow request for a hardcoded, non-existent row key. */ -@BetaApi() +@BetaApi("Channel priming is not currently stable and might change in the future") class BigtableChannelPrimer implements ChannelPrimer { private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString()); - EnhancedBigtableStubSettings settingsTemplate; - List tableIds; - - static BigtableChannelPrimer create(Credentials credentials, String projectId, String instanceId, String appProfileId, List tableIds) { - EnhancedBigtableStubSettings.Builder builder = EnhancedBigtableStubSettings.newBuilder() - .setProjectId(projectId) - .setInstanceId(instanceId) - .setAppProfileId(appProfileId) - .setCredentialsProvider(FixedCredentialsProvider.create(credentials)) - .setExecutorProvider( - InstantiatingExecutorProvider.newBuilder() - .setExecutorThreadCount(1) - .build() - ) - // Disable watchdog creation - its unnecessary. - .setStreamWatchdogProvider(FixedWatchdogProvider.create(null)); + static ByteString PRIMING_ROW_KEY = ByteString.copyFromUtf8("nonexistent-priming-row"); + private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30); + + private final EnhancedBigtableStubSettings settingsTemplate; + private final List tableIds; + + static BigtableChannelPrimer create( + Credentials credentials, + String projectId, + String instanceId, + String appProfileId, + List tableIds) { + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(projectId) + .setInstanceId(instanceId) + .setAppProfileId(appProfileId) + .setCredentialsProvider(FixedCredentialsProvider.create(credentials)) + .setExecutorProvider( + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()); // Disable retries for priming request - Duration timeout = Duration.ofSeconds(1); - builder.readRowSettings().setRetrySettings( - builder.readRowSettings().getRetrySettings().toBuilder() - .setMaxAttempts(0) - .setJittered(false) - .setInitialRpcTimeout(timeout) - .setMaxRpcTimeout(timeout) - .setTotalTimeout(timeout) - .build() - ); + builder + .readRowSettings() + .setRetrySettings( + builder + .readRowSettings() + .getRetrySettings() + .toBuilder() + .setMaxAttempts(1) + .setJittered(false) + .setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT) + .setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT) + .setTotalTimeout(PRIME_REQUEST_TIMEOUT) + .build()); return new BigtableChannelPrimer(builder.build(), tableIds); } - private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate, List tableIds) { + + private BigtableChannelPrimer( + EnhancedBigtableStubSettings settingsTemplate, List tableIds) { + Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null"); this.settingsTemplate = settingsTemplate; this.tableIds = ImmutableList.copyOf(tableIds); } @@ -71,8 +86,10 @@ private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate, Lis public void primeChannel(ManagedChannel managedChannel) { try { primeChannelUnsafe(managedChannel); - } catch (IOException|RuntimeException e) { - LOG.warning(String.format("Unexpected error while trying to prime the channel: %s", e)); + } catch (IOException | RuntimeException e) { + LOG.warning( + String.format("Unexpected error while trying to prime a channel: %s", e.getMessage())); + e.printStackTrace(); } } @@ -84,7 +101,7 @@ private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOExceptio } } - void waitForChannelReady(ManagedChannel managedChannel) { + private void waitForChannelReady(ManagedChannel managedChannel) { for (int i = 0; i < 10; i++) { ConnectivityState connectivityState = managedChannel.getState(true); if (connectivityState == ConnectivityState.READY) { @@ -98,22 +115,23 @@ void waitForChannelReady(ManagedChannel managedChannel) { } } - void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { + private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { // Wrap the channel in a temporary stub - EnhancedBigtableStubSettings primingSettings = settingsTemplate.toBuilder() - .setTransportChannelProvider( - FixedTransportChannelProvider.create( - GrpcTransportChannel.create(managedChannel) - ) - ).build(); + EnhancedBigtableStubSettings primingSettings = + settingsTemplate + .toBuilder() + .setTransportChannelProvider( + FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel))) + .build(); try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) { Map> primeFutures = new HashMap<>(); // Prime all of the table ids in parallel for (String tableId : tableIds) { - ApiFuture f = stub.readRowCallable() - .futureCall(Query.create(tableId).filter(FILTERS.block())); + ApiFuture f = + stub.readRowCallable() + .futureCall(Query.create(tableId).rowKey(PRIMING_ROW_KEY).filter(FILTERS.block())); primeFutures.put(tableId, f); } @@ -123,8 +141,12 @@ void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { try { entry.getValue().get(); } catch (Throwable e) { - LOG.warning(String - .format("Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage())); + if (e instanceof ExecutionException) { + e = e.getCause(); + } + LOG.warning( + String.format( + "Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage())); } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index bcae63d274..7e177442f3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -128,10 +128,11 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) return new EnhancedBigtableStub(settings, ClientContext.create(settings)); } - @InternalApi("Visible for testing") - public static EnhancedBigtableStubSettings finalizeSettings(EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats) + public static EnhancedBigtableStubSettings finalizeSettings( + EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats) throws IOException { EnhancedBigtableStubSettings.Builder builder = settings.toBuilder(); + if (settings.isRefreshingChannel()) { // Fix the credentials so that they can be shared Credentials credentials = null; @@ -141,13 +142,20 @@ public static EnhancedBigtableStubSettings finalizeSettings(EnhancedBigtableStub builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); // Inject the primer - InstantiatingGrpcChannelProvider transportProvider = (InstantiatingGrpcChannelProvider)settings.getTransportChannelProvider(); + InstantiatingGrpcChannelProvider transportProvider = + (InstantiatingGrpcChannelProvider) settings.getTransportChannelProvider(); builder.setTransportChannelProvider( - transportProvider.toBuilder() - .setChannelPrimer(BigtableChannelPrimer.create(credentials, settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId(), settings.getPrimedTableIds())) - .build() - ); + transportProvider + .toBuilder() + .setChannelPrimer( + BigtableChannelPrimer.create( + credentials, + settings.getProjectId(), + settings.getInstanceId(), + settings.getAppProfileId(), + settings.getPrimedTableIds())) + .build()); } // Inject Opencensus instrumentation @@ -172,8 +180,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(EnhancedBigtableStub .put("grpc", GaxGrpcProperties.getGrpcVersion()) .put( "gapic", - GaxProperties.getLibraryVersion( - EnhancedBigtableStubSettings.class)) + GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class)) .build()), // Add OpenCensus Metrics MetricsTracerFactory.create( @@ -196,10 +203,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(EnhancedBigtableStub return builder.build(); } - @InternalApi("Visible for testing") - public EnhancedBigtableStub( - EnhancedBigtableStubSettings settings, - ClientContext clientContext) { + public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext clientContext) { this.settings = settings; this.clientContext = clientContext; this.requestContext = diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index b7f8cc2d5f..d843265d1e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -227,8 +227,11 @@ public boolean isRefreshingChannel() { return isRefreshingChannel; } + /** Gets the tables that will be primed during a channel refresh. */ @BetaApi("Channel priming is not currently stable and might change in the future") - public List getPrimedTableIds() { return primedTableIds; } + public List getPrimedTableIds() { + return primedTableIds; + } /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { @@ -618,7 +621,6 @@ private Builder(EnhancedBigtableStubSettings settings) { isRefreshingChannel = settings.isRefreshingChannel; primedTableIds = settings.primedTableIds; - // Per method settings. readRowsSettings = settings.readRowsSettings.toBuilder(); readRowSettings = settings.readRowSettings.toBuilder(); @@ -707,7 +709,11 @@ public String getAppProfileId() { } /** - * Sets if channels will gracefully refresh connections to Cloud Bigtable service + * Sets if channels will gracefully refresh connections to Cloud Bigtable service. + * + *

When enabled, this will wait for the connection to complete the SSL handshake. The effect + * can be enhanced by configuring table ids that can be used warm serverside caches using {@link + * #setPrimedTableIds(String...)}. * * @see com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder#setRefreshingChannel */ @@ -717,10 +723,7 @@ public Builder setRefreshingChannel(boolean isRefreshingChannel) { return this; } - /** - * Configures which table will be primed when a connection is created. - * - */ + /** Configures which tables will be primed when a connection is created. */ @BetaApi("Channel priming is not currently stable and might change in the future") public Builder setPrimedTableIds(String... tableIds) { this.primedTableIds = ImmutableList.copyOf(tableIds); @@ -733,9 +736,11 @@ public boolean isRefreshingChannel() { return isRefreshingChannel; } - /** Tables that will be primed during a channel refresh */ + /** Gets the tables that will be primed during a channel refresh. */ @BetaApi("Channel priming is not currently stable and might change in the future") - public List getPrimedTableIds() { return primedTableIds; } + public List getPrimedTableIds() { + return primedTableIds; + } /** Returns the builder for the settings used for calls to readRows. */ public ServerStreamingCallSettings.Builder readRowsSettings() { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java new file mode 100644 index 0000000000..66448ed474 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java @@ -0,0 +1,196 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.ApiFunction; +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.RowFilter; +import com.google.bigtable.v2.RowSet; +import com.google.common.collect.ImmutableList; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.logging.Handler; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BigtableChannelPrimerTest { + private static final String TOKEN_VALUE = "fake-token"; + + int port; + Server server; + FakeService fakeService; + MetadataInterceptor metadataInterceptor; + BigtableChannelPrimer primer; + ManagedChannel channel; + private LogHandler logHandler; + + @Before + public void setup() throws IOException { + try (ServerSocket ss = new ServerSocket(0)) { + port = ss.getLocalPort(); + } catch (IOException e) { + e.printStackTrace(); + } + + fakeService = new FakeService(); + metadataInterceptor = new MetadataInterceptor(); + server = + ServerBuilder.forPort(port).intercept(metadataInterceptor).addService(fakeService).build(); + server.start(); + + primer = + BigtableChannelPrimer.create( + OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), + "fake-project", + "fake-instance", + "fake-app-profile", + ImmutableList.of("table1", "table2")); + + channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build(); + + logHandler = new LogHandler(); + Logger.getLogger(BigtableChannelPrimer.class.toString()).addHandler(logHandler); + } + + @After + public void teardown() { + Logger.getLogger(BigtableChannelPrimer.class.toString()).removeHandler(logHandler); + channel.shutdown(); + server.shutdown(); + } + + @Test + public void testCredentials() { + primer.primeChannel(channel); + + for (Metadata metadata : metadataInterceptor.metadataList) { + assertThat(metadata.get(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER))) + .isEqualTo("Bearer " + TOKEN_VALUE); + } + channel.shutdown(); + } + + @Test + public void testRequests() { + final Queue requests = new ConcurrentLinkedQueue<>(); + + fakeService.readRowsCallback = + new ApiFunction() { + @Override + public ReadRowsResponse apply(ReadRowsRequest req) { + requests.add(req); + return ReadRowsResponse.getDefaultInstance(); + } + }; + primer.primeChannel(channel); + + assertThat(requests) + .containsExactly( + ReadRowsRequest.newBuilder() + .setTableName("projects/fake-project/instances/fake-instance/tables/table1") + .setAppProfileId("fake-app-profile") + .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) + .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) + .setRowsLimit(1) + .build(), + ReadRowsRequest.newBuilder() + .setTableName("projects/fake-project/instances/fake-instance/tables/table2") + .setAppProfileId("fake-app-profile") + .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) + .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) + .setRowsLimit(1) + .build()); + } + + @Test + public void testErrorsAreLogged() { + fakeService.readRowsCallback = + new ApiFunction() { + @Override + public ReadRowsResponse apply(ReadRowsRequest req) { + throw new StatusRuntimeException(Status.FAILED_PRECONDITION); + } + }; + primer.primeChannel(channel); + + assertThat(logHandler.logs).hasSize(2); + for (LogRecord log : logHandler.logs) { + assertThat(log.getMessage()).contains("FAILED_PRECONDITION"); + } + } + + private static class MetadataInterceptor implements ServerInterceptor { + ConcurrentLinkedQueue metadataList = new ConcurrentLinkedQueue<>(); + + @Override + public Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + metadataList.add(metadata); + + return serverCallHandler.startCall(serverCall, metadata); + } + } + + static class FakeService extends BigtableImplBase { + private ApiFunction readRowsCallback = + new ApiFunction() { + @Override + public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) { + return ReadRowsResponse.getDefaultInstance(); + } + }; + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + + try { + responseObserver.onNext(readRowsCallback.apply(request)); + responseObserver.onCompleted(); + } catch (RuntimeException e) { + responseObserver.onError(e); + } + } + } + + private static class LogHandler extends Handler { + private ConcurrentLinkedQueue logs = new ConcurrentLinkedQueue<>(); + + @Override + public void publish(LogRecord record) { + logs.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() throws SecurityException {} + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ChannelPrimingTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ChannelPrimingTest.java deleted file mode 100644 index 9481bb6d81..0000000000 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/ChannelPrimingTest.java +++ /dev/null @@ -1,129 +0,0 @@ -package com.google.cloud.bigtable.data.v2.stub; - -import com.google.api.gax.core.CredentialsProvider; -import com.google.api.gax.core.NoCredentialsProvider; -import com.google.api.gax.grpc.testing.FakeServiceGrpc.FakeServiceImplBase; -import com.google.auth.Credentials; -import com.google.auth.oauth2.IdToken; -import com.google.auth.oauth2.OAuth2Credentials; -import com.google.bigtable.v2.BigtableGrpc; -import com.google.bigtable.v2.ReadRowsRequest; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import io.grpc.Server; -import io.grpc.ServerBuilder; -import java.io.IOException; -import java.net.ServerSocket; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -@RunWith(JUnit4.class) -public class ChannelPrimingTest { - @Rule - public final MockitoRule mockitoJUnit = MockitoJUnit.rule(); - - private Server server; - private FakeService fakeService; - private EnhancedBigtableStub stub; - private FakeCrendentialsProvider fakeCredsProvider; - - @Before - public void setup() throws IOException { - final int port; - try (ServerSocket ss = new ServerSocket(0)) { - port = ss.getLocalPort(); - } - - server = ServerBuilder.forPort(port) - .addService(new FakeService()) - .build(); - - fakeCredsProvider = new FakeCrendentialsProvider(); - - EnhancedBigtableStubSettings settings = EnhancedBigtableStubSettings.newBuilder() - .setProjectId("my-project") - .setInstanceId("my-instance") - .setAppProfileId("my-app-profile") - .setRefreshingChannel(true) - .setPrimedTableId("table1", "table2") - .setCredentialsProvider(fakeCredsProvider) - .setEndpoint("localhost:" + port) - .build(); - - stub = EnhancedBigtableStub.create(settings); - } - - @After - public void teardown() { - if (stub != null) { - stub.close(); - } - if (server != null) { - server.shutdown(); - } - } - - @Test - public void testMoo() { - - } - - static class FakeCrendentialsProvider implements CredentialsProvider { - private AtomicInteger generation = new AtomicInteger(); - - @Override - public Credentials getCredentials() throws IOException { - return new FakeCredentials("generation" + generation.getAndIncrement()); - } - } - - static class FakeCredentials extends Credentials { - private final String token; - - FakeCredentials(String token) { - this.token = token; - } - - @Override - public String getAuthenticationType() { - return "fake"; - } - - @Override - public Map> getRequestMetadata(URI uri) throws IOException { - return ImmutableMap.>of("key", ImmutableList.of(token)); - } - - @Override - public boolean hasRequestMetadata() { - return true; - } - - @Override - public boolean hasRequestMetadataOnly() { - return true; - } - - @Override - public void refresh() throws IOException { - } - } - - static class FakeService extends BigtableGrpc.BigtableImplBase { - List requests = new ArrayList<>(); - - - } -} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index 369c846d57..49ebae81f2 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -120,9 +120,9 @@ public void setUp() throws Exception { .setInstanceId(INSTANCE_ID) .setAppProfileId(APP_PROFILE_ID) .build(); - EnhancedBigtableStubSettings stubSettings = EnhancedBigtableStub.finalizeSettings( - settings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder()); - + EnhancedBigtableStubSettings stubSettings = + EnhancedBigtableStub.finalizeSettings( + settings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder()); stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings)); } From c8ead38bd3c3ff93d1deb37a5458d2517bec1824 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Fri, 28 Aug 2020 17:42:47 -0400 Subject: [PATCH 3/8] remove debug --- .../cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 5dd1106e8c..2a37f99572 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -89,7 +89,6 @@ public void primeChannel(ManagedChannel managedChannel) { } catch (IOException | RuntimeException e) { LOG.warning( String.format("Unexpected error while trying to prime a channel: %s", e.getMessage())); - e.printStackTrace(); } } From fba802609f17d254433d17966cbd9ff407ea7657 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Fri, 28 Aug 2020 17:55:09 -0400 Subject: [PATCH 4/8] copyright --- .../data/v2/stub/BigtableChannelPrimer.java | 15 +++++++++++++++ .../data/v2/stub/BigtableChannelPrimerTest.java | 16 ++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 2a37f99572..6dd242adb6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.bigtable.data.v2.stub; import static com.google.cloud.bigtable.data.v2.models.Filters.FILTERS; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java index 66448ed474..9a935d7a71 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.bigtable.data.v2.stub; import static com.google.common.truth.Truth.assertThat; @@ -87,6 +102,7 @@ public void teardown() { public void testCredentials() { primer.primeChannel(channel); + assertThat(metadataInterceptor.metadataList).isEmpty(); for (Metadata metadata : metadataInterceptor.metadataList) { assertThat(metadata.get(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER))) .isEqualTo("Bearer " + TOKEN_VALUE); From 3ff458e0a7abff09a3a6f69b9a96db43a5b60f12 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 31 Aug 2020 12:41:53 -0400 Subject: [PATCH 5/8] remove debug code from test --- .../cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java index 9a935d7a71..9009bbc037 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java @@ -102,7 +102,6 @@ public void teardown() { public void testCredentials() { primer.primeChannel(channel); - assertThat(metadataInterceptor.metadataList).isEmpty(); for (Metadata metadata : metadataInterceptor.metadataList) { assertThat(metadata.get(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER))) .isEqualTo("Bearer " + TOKEN_VALUE); From 555b9ab71595361eee6df24a669317de6cb79fe9 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 31 Aug 2020 13:20:39 -0400 Subject: [PATCH 6/8] fix deps --- google-cloud-bigtable/pom.xml | 4 ++++ .../cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index 309e4d51e8..68dd4e950e 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -87,6 +87,10 @@ com.google.api.grpc proto-google-iam-v1 + + com.google.auth + google-auth-library-credentials + com.google.auth google-auth-library-oauth2-http diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 6dd242adb6..1a2274c1a0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -17,7 +17,6 @@ import static com.google.cloud.bigtable.data.v2.models.Filters.FILTERS; -import com.google.api.client.util.Preconditions; import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.api.gax.core.FixedCredentialsProvider; @@ -28,6 +27,7 @@ import com.google.auth.Credentials; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import io.grpc.ConnectivityState; From 91cb7a5eca4316173513d34dd44dc9fa028ae2c4 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Tue, 1 Sep 2020 11:49:36 -0400 Subject: [PATCH 7/8] address some feedback --- .../data/v2/stub/BigtableChannelPrimer.java | 2 +- .../v2/stub/EnhancedBigtableStubTest.java | 44 ++++++++++++++----- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 1a2274c1a0..15be8f7309 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -116,7 +116,7 @@ private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOExceptio } private void waitForChannelReady(ManagedChannel managedChannel) { - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 30; i++) { ConnectivityState connectivityState = managedChannel.getState(true); if (connectivityState == ConnectivityState.READY) { break; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index be2d9c2a0f..b823930fb6 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -18,14 +18,13 @@ import static com.google.common.truth.Truth.assertThat; import com.google.api.gax.core.NoCredentialsProvider; -import com.google.api.gax.grpc.testing.InProcessServer; -import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.admin.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; import com.google.cloud.bigtable.data.v2.models.Query; @@ -34,8 +33,11 @@ import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.StringValue; +import io.grpc.Server; +import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.net.ServerSocket; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -49,37 +51,40 @@ public class EnhancedBigtableStubTest { private static final String PROJECT_ID = "fake-project"; private static final String INSTANCE_ID = "fake-instance"; - private static final String FAKE_HOST_NAME = "fake-stub-host:123"; private static final String TABLE_NAME = NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table"); private static final String APP_PROFILE_ID = "app-profile-id"; - private InProcessServer server; + private Server server; private FakeDataService fakeDataService; + private EnhancedBigtableStubSettings defaultSettings; private EnhancedBigtableStub enhancedBigtableStub; @Before public void setUp() throws IOException, IllegalAccessException, InstantiationException { + int port; + try (ServerSocket ss = new ServerSocket(0)) { + port = ss.getLocalPort(); + } fakeDataService = new FakeDataService(); - server = new InProcessServer<>(fakeDataService, FAKE_HOST_NAME); + server = ServerBuilder.forPort(port).addService(fakeDataService).build(); server.start(); - EnhancedBigtableStubSettings enhancedBigtableStubSettings = - EnhancedBigtableStubSettings.newBuilder() + defaultSettings = + BigtableDataSettings.newBuilderForEmulator(port) .setProjectId(PROJECT_ID) .setInstanceId(INSTANCE_ID) .setAppProfileId(APP_PROFILE_ID) .setCredentialsProvider(NoCredentialsProvider.create()) - .setEndpoint(FAKE_HOST_NAME) - .setTransportChannelProvider(LocalChannelProvider.create(FAKE_HOST_NAME)) - .build(); + .build() + .getStubSettings(); - enhancedBigtableStub = EnhancedBigtableStub.create(enhancedBigtableStubSettings); + enhancedBigtableStub = EnhancedBigtableStub.create(defaultSettings); } @After public void tearDown() { - server.stop(); + server.shutdown(); } @Test @@ -117,6 +122,21 @@ public void testCreateReadRowsRawCallable() throws InterruptedException { assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest2); } + @Test + public void testChannelPrimerConfigured() throws IOException { + EnhancedBigtableStubSettings settings = + defaultSettings + .toBuilder() + .setRefreshingChannel(true) + .setPrimedTableIds("table1", "table2") + .build(); + + try (EnhancedBigtableStub ignored = EnhancedBigtableStub.create(settings)) { + // priming will issue a request per table on startup + assertThat(fakeDataService.requests).hasSize(2); + } + } + private static class FakeDataService extends BigtableGrpc.BigtableImplBase { final BlockingQueue requests = Queues.newLinkedBlockingDeque(); From 5e1a94dad7818040879247a98de7928f35ed60e4 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Tue, 1 Sep 2020 15:04:48 -0400 Subject: [PATCH 8/8] add additional error handling test + some comments --- .../data/v2/stub/EnhancedBigtableStub.java | 4 ++++ .../v2/stub/BigtableChannelPrimerTest.java | 23 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 7e177442f3..d729d6244d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -133,6 +133,10 @@ public static EnhancedBigtableStubSettings finalizeSettings( throws IOException { EnhancedBigtableStubSettings.Builder builder = settings.toBuilder(); + // TODO: this implementation is on the cusp of unwieldy, if we end up adding more features + // consider splitting it up by feature. + + // Inject channel priming if (settings.isRefreshingChannel()) { // Fix the credentials so that they can be shared Credentials credentials = null; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java index 9009bbc037..42d13a7ab1 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java @@ -50,6 +50,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.internal.stubbing.answers.ThrowsException; @RunWith(JUnit4.class) public class BigtableChannelPrimerTest { @@ -158,6 +160,27 @@ public ReadRowsResponse apply(ReadRowsRequest req) { } } + @Test + public void testErrorsAreLoggedForBasic() { + BigtableChannelPrimer basicPrimer = + BigtableChannelPrimer.create( + OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), + "fake-project", + "fake-instance", + "fake-app-profile", + ImmutableList.of()); + + ManagedChannel channel = + Mockito.mock( + ManagedChannel.class, new ThrowsException(new UnsupportedOperationException())); + primer.primeChannel(channel); + + assertThat(logHandler.logs).hasSize(1); + for (LogRecord log : logHandler.logs) { + assertThat(log.getMessage()).contains("Unexpected"); + } + } + private static class MetadataInterceptor implements ServerInterceptor { ConcurrentLinkedQueue metadataList = new ConcurrentLinkedQueue<>();