From 35f669598722cf1be23b4ee7a0ccce8146936e14 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 20 Apr 2026 21:52:25 -0400 Subject: [PATCH 1/4] fix: notify listener for unstarted vRPCs. Fix newRealCall --- .../v2/internal/session/SessionPoolImpl.java | 2 +- .../data/v2/internal/session/VRpcImpl.java | 4 +- .../data/v2/stub/SessionDeadlineTest.java | 221 ++++++++++++++++++ 3 files changed, 223 insertions(+), 4 deletions(-) create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java index 3ad4a6ebd5..59c2802b5b 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java @@ -534,7 +534,6 @@ private void tryDrainPendingRpcs() { if (!handle.isPresent()) { break; } - handle.get().onVRpcStarted(); PendingVRpc rpc = pendingRpcs.removeFirst(); rpc.drainTo(handle.get()); } @@ -578,6 +577,7 @@ private VRpc newRealC return new ForwardingVRpc(handle.getSession().newCall(desc)) { @Override public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { + handle.onVRpcStarted(); final Stopwatch stopwatch = Stopwatch.createStarted(); super.start( req, diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java index b2d16c7a09..d5f3b6a0f3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java @@ -124,9 +124,7 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { } if (!status.isOk()) { - if (!state.compareAndSet(State.STARTED, State.CLOSED)) { - return; - } + state.set(State.CLOSED); // TODO: loop through the session executor if (retryable) { listener.onClose(VRpcResult.createUncommitedError(status)); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java new file mode 100644 index 0000000000..82b1322ba9 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java @@ -0,0 +1,221 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.ClientConfiguration; +import com.google.bigtable.v2.GetClientConfigurationRequest; +import com.google.bigtable.v2.OpenSessionResponse; +import com.google.bigtable.v2.SessionRequest; +import com.google.bigtable.v2.SessionResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.TableId; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class SessionDeadlineTest { + + private io.grpc.Server server; + private EnhancedBigtableStubSettings defaultSettings; + private FakeDataService fakeDataService; + + @Before + public void setUp() throws IOException { + fakeDataService = new FakeDataService(); + server = + io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder.forPort(0) + .addService(fakeDataService) + .intercept(new ResponseHeaderInterceptor()) + .build() + .start(); + + defaultSettings = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance") + .setAppProfileId("fake-app-profile") + .setCredentialsProvider(NoCredentialsProvider.create()) + .build() + .getStubSettings(); + } + + @After + public void tearDown() throws InterruptedException { + if (fakeDataService != null) { + fakeDataService.shutdown(); + } + if (server != null) { + server.shutdownNow(); + server.awaitTermination(); + } + } + + @Test(timeout = 1000) + public void testShortDeadlineCancellation() throws Exception { + EnhancedBigtableStubSettings settings = + defaultSettings.toBuilder().setSessionsEnabled(true).build(); + + try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) { + Query request = Query.create(TableId.of("fake-table")).rowKey("row-key"); + + try (io.grpc.Context.CancellableContext ctx = + io.grpc.Context.current() + .withDeadlineAfter( + 5, + TimeUnit.MILLISECONDS, + settings.getBackgroundExecutorProvider().getExecutor())) { + + ctx.run( + () -> { + ApiFuture future = stub.readRowCallable().futureCall(request); + try { + future.get(); + fail("Should throw exception"); + } catch (ExecutionException e) { + assertThat(e).hasMessageThat().contains("DEADLINE_EXCEEDED"); + } catch (InterruptedException e) { + fail("Should not throw interrupted exception"); + } + }); + } + } + } + + @Test(timeout = 10000) + public void testMissedHeartbeat() throws Exception { + EnhancedBigtableStubSettings settings = + defaultSettings.toBuilder().setSessionsEnabled(true).build(); + + try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) { + Query request = Query.create(TableId.of("fake-table")).rowKey("row-key"); + + try (io.grpc.Context.CancellableContext ctx = + io.grpc.Context.current() + .withDeadlineAfter( + 1, TimeUnit.SECONDS, settings.getBackgroundExecutorProvider().getExecutor())) { + ctx.run( + () -> { + ApiFuture future = stub.readRowCallable().futureCall(request); + try { + future.get(); + fail("Should throw exception"); + } catch (ExecutionException e) { + assertThat(e).hasMessageThat().contains("missed heartbeat"); + } catch (InterruptedException e) { + fail("Should not throw interrupted exception"); + } + }); + } + } + } + + private static class FakeDataService extends BigtableGrpc.BigtableImplBase { + private final java.util.concurrent.ScheduledExecutorService serverExecutor = + java.util.concurrent.Executors.newScheduledThreadPool(4); + + public void shutdown() { + serverExecutor.shutdownNow(); + } + + @Override + public void getClientConfiguration( + GetClientConfigurationRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + ClientConfiguration.newBuilder() + .setSessionConfiguration( + com.google.bigtable.v2.SessionClientConfiguration.newBuilder() + .setSessionLoad(1) + .build()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public StreamObserver openTable( + StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(SessionRequest sessionRequest) { + if (sessionRequest.hasOpenSession()) { + responseObserver.onNext( + SessionResponse.newBuilder() + .setOpenSession(OpenSessionResponse.getDefaultInstance()) + .build()); + } else if (sessionRequest.hasVirtualRpc()) { + // Server hangs + } + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } + } + + private static class ResponseHeaderInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + return serverCallHandler.startCall( + new io.grpc.ForwardingServerCall.SimpleForwardingServerCall(serverCall) { + @Override + public void sendHeaders(Metadata headers) { + Metadata.Key peerInfoKey = + Metadata.Key.of("bigtable-peer-info", Metadata.ASCII_STRING_MARSHALLER); + String encoded = + java.util.Base64.getUrlEncoder() + .encodeToString( + com.google.bigtable.v2.PeerInfo.newBuilder() + .setApplicationFrontendRegion("us-east1") + .build() + .toByteArray()); + headers.put(peerInfoKey, encoded); + super.sendHeaders(headers); + } + + @Override + public void close(io.grpc.Status status, Metadata trailers) { + super.close(status, trailers); + } + }, + metadata); + } + } +} From c38f0bfc97f1b139fd1c760289865886352ecabf Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 21 Apr 2026 12:15:41 -0400 Subject: [PATCH 2/4] convert state transition to precondition check Change-Id: I81320830192df0cd16f405487c25b5dcb28985fe --- .../data/v2/internal/session/SessionImpl.java | 2 +- .../data/v2/internal/session/VRpcImpl.java | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java index f79142171f..a2ef4f6198 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java @@ -324,7 +324,7 @@ VRpc newCall(VRpcDescriptor descriptor) { long rpcId = nextRpcId; nextRpcId = Math.incrementExact(nextRpcId); - return new VRpcImpl<>(this, descriptor, rpcId, stream.getPeerInfo()); + return new VRpcImpl<>(this, descriptor, rpcId, stream.getPeerInfo(), debugTagTracer); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java index d5f3b6a0f3..38ecc75db9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java @@ -20,6 +20,7 @@ import com.google.bigtable.v2.VirtualRpcRequest; import com.google.bigtable.v2.VirtualRpcRequest.Metadata; import com.google.bigtable.v2.VirtualRpcResponse; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DebugTagTracer; import com.google.cloud.bigtable.data.v2.internal.middleware.VRpc; import com.google.protobuf.Message; import com.google.protobuf.MessageLite; @@ -72,16 +73,20 @@ private enum State { private AtomicReference state; + private final DebugTagTracer debugTagTracer; + public VRpcImpl( VRpcSessionApi session, VRpcDescriptor desc, long rpcId, - PeerInfo peerInfo) { + PeerInfo peerInfo, + DebugTagTracer debugTagTracer) { this.session = session; this.desc = desc; this.rpcId = rpcId; this.state = new AtomicReference<>(State.NEW); this.peerInfo = peerInfo; + this.debugTagTracer = debugTagTracer; } @Override @@ -96,6 +101,8 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { retryable = false; } else if (ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.MICROSECONDS) < TimeUnit.MILLISECONDS.toMicros(1)) { + // transitioning to the close state is handled below + state.set(State.STARTED); // Don't send RPCs that don't have any hope of succeeding status = Status.DEADLINE_EXCEEDED.withDescription("Remaining deadline is too short to send RPC"); @@ -124,7 +131,11 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { } if (!status.isOk()) { - state.set(State.CLOSED); + debugTagTracer.checkPrecondition( + state.compareAndSet(State.STARTED, State.CLOSED), + "vrpc_incorrect_start_state", + "VRpc has incorrect state. Expected to be started but was %s", + state); // TODO: loop through the session executor if (retryable) { listener.onClose(VRpcResult.createUncommitedError(status)); From c6b4518fb9d3b0602321d31786fce673293bac8c Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 22 Apr 2026 12:58:42 -0400 Subject: [PATCH 3/4] remove onVRpcStarted Change-Id: I1064ff6be95987c994360119c832e9e55abe60e5 --- .../bigtable/data/v2/internal/session/SessionList.java | 7 +------ .../bigtable/data/v2/internal/session/SessionPoolImpl.java | 1 - .../cloud/bigtable/data/v2/stub/SessionDeadlineTest.java | 3 ++- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java index c5efeebffd..061e7e197c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java @@ -111,7 +111,7 @@ public Optional checkoutSession(AfeHandle afeHandle) { handle -> { poolStats.readyCount--; poolStats.inUseCount++; - + inUseSessions.add(handle); if (handle.afe.get().sessions.isEmpty()) { afesWithReadySessions.remove(afeHandle); } @@ -194,11 +194,6 @@ void onSessionStarted() { poolStats.readyCount++; } - void onVRpcStarted() { - // Pool stats and AFE list are updated in SessionList#checkoutSession - inUseSessions.add(this); - } - /** * The session is returned to the pool after use. This undoes what SessionList#checkoutSession */ diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java index 59c2802b5b..35884cb743 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java @@ -577,7 +577,6 @@ private VRpc newRealC return new ForwardingVRpc(handle.getSession().newCall(desc)) { @Override public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { - handle.onVRpcStarted(); final Stopwatch stopwatch = Stopwatch.createStarted(); super.start( req, diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java index 82b1322ba9..fd7c4fc38f 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java @@ -34,6 +34,7 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.concurrent.ExecutionException; @@ -52,7 +53,7 @@ public class SessionDeadlineTest { public void setUp() throws IOException { fakeDataService = new FakeDataService(); server = - io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder.forPort(0) + NettyServerBuilder.forPort(0) .addService(fakeDataService) .intercept(new ResponseHeaderInterceptor()) .build() From a3ffddddb18654009e1bda3aeb283db3adfd09a8 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 22 Apr 2026 13:07:40 -0400 Subject: [PATCH 4/4] remove full import path from test Change-Id: I65308166955609dfc08ee0fec13987b469683dd0 --- .../data/v2/stub/SessionDeadlineTest.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java index fd7c4fc38f..7df4fddd5e 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java @@ -24,20 +24,27 @@ import com.google.bigtable.v2.ClientConfiguration; import com.google.bigtable.v2.GetClientConfigurationRequest; import com.google.bigtable.v2.OpenSessionResponse; +import com.google.bigtable.v2.PeerInfo; import com.google.bigtable.v2.SessionRequest; import com.google.bigtable.v2.SessionResponse; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.TableId; +import io.grpc.Context; +import io.grpc.ForwardingServerCall; import io.grpc.Metadata; +import io.grpc.Server; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.Base64; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; @@ -45,7 +52,7 @@ public class SessionDeadlineTest { - private io.grpc.Server server; + private Server server; private EnhancedBigtableStubSettings defaultSettings; private FakeDataService fakeDataService; @@ -119,8 +126,8 @@ public void testMissedHeartbeat() throws Exception { try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) { Query request = Query.create(TableId.of("fake-table")).rowKey("row-key"); - try (io.grpc.Context.CancellableContext ctx = - io.grpc.Context.current() + try (Context.CancellableContext ctx = + Context.current() .withDeadlineAfter( 1, TimeUnit.SECONDS, settings.getBackgroundExecutorProvider().getExecutor())) { ctx.run( @@ -140,8 +147,7 @@ public void testMissedHeartbeat() throws Exception { } private static class FakeDataService extends BigtableGrpc.BigtableImplBase { - private final java.util.concurrent.ScheduledExecutorService serverExecutor = - java.util.concurrent.Executors.newScheduledThreadPool(4); + private final ScheduledExecutorService serverExecutor = Executors.newScheduledThreadPool(4); public void shutdown() { serverExecutor.shutdownNow(); @@ -195,15 +201,15 @@ public ServerCall.Listener interceptCall( Metadata metadata, ServerCallHandler serverCallHandler) { return serverCallHandler.startCall( - new io.grpc.ForwardingServerCall.SimpleForwardingServerCall(serverCall) { + new ForwardingServerCall.SimpleForwardingServerCall(serverCall) { @Override public void sendHeaders(Metadata headers) { Metadata.Key peerInfoKey = Metadata.Key.of("bigtable-peer-info", Metadata.ASCII_STRING_MARSHALLER); String encoded = - java.util.Base64.getUrlEncoder() + Base64.getUrlEncoder() .encodeToString( - com.google.bigtable.v2.PeerInfo.newBuilder() + PeerInfo.newBuilder() .setApplicationFrontendRegion("us-east1") .build() .toByteArray());