diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java index f79142171f..a2ef4f6198 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java @@ -324,7 +324,7 @@ VRpc newCall(VRpcDescriptor descriptor) { long rpcId = nextRpcId; nextRpcId = Math.incrementExact(nextRpcId); - return new VRpcImpl<>(this, descriptor, rpcId, stream.getPeerInfo()); + return new VRpcImpl<>(this, descriptor, rpcId, stream.getPeerInfo(), debugTagTracer); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java index c5efeebffd..061e7e197c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java @@ -111,7 +111,7 @@ public Optional checkoutSession(AfeHandle afeHandle) { handle -> { poolStats.readyCount--; poolStats.inUseCount++; - + inUseSessions.add(handle); if (handle.afe.get().sessions.isEmpty()) { afesWithReadySessions.remove(afeHandle); } @@ -194,11 +194,6 @@ void onSessionStarted() { poolStats.readyCount++; } - void onVRpcStarted() { - // Pool stats and AFE list are updated in SessionList#checkoutSession - inUseSessions.add(this); - } - /** * The session is returned to the pool after use. This undoes what SessionList#checkoutSession */ diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java index 3ad4a6ebd5..35884cb743 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java @@ -534,7 +534,6 @@ private void tryDrainPendingRpcs() { if (!handle.isPresent()) { break; } - handle.get().onVRpcStarted(); PendingVRpc rpc = pendingRpcs.removeFirst(); rpc.drainTo(handle.get()); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java index b2d16c7a09..38ecc75db9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java @@ -20,6 +20,7 @@ import com.google.bigtable.v2.VirtualRpcRequest; import com.google.bigtable.v2.VirtualRpcRequest.Metadata; import com.google.bigtable.v2.VirtualRpcResponse; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DebugTagTracer; import com.google.cloud.bigtable.data.v2.internal.middleware.VRpc; import com.google.protobuf.Message; import com.google.protobuf.MessageLite; @@ -72,16 +73,20 @@ private enum State { private AtomicReference state; + private final DebugTagTracer debugTagTracer; + public VRpcImpl( VRpcSessionApi session, VRpcDescriptor desc, long rpcId, - PeerInfo peerInfo) { + PeerInfo peerInfo, + DebugTagTracer debugTagTracer) { this.session = session; this.desc = desc; this.rpcId = rpcId; this.state = new AtomicReference<>(State.NEW); this.peerInfo = peerInfo; + this.debugTagTracer = debugTagTracer; } @Override @@ -96,6 +101,8 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { retryable = false; } else if (ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.MICROSECONDS) < TimeUnit.MILLISECONDS.toMicros(1)) { + // transitioning to the close state is handled below + state.set(State.STARTED); // Don't send RPCs that don't have any hope of succeeding status = Status.DEADLINE_EXCEEDED.withDescription("Remaining deadline is too short to send RPC"); @@ -124,9 +131,11 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { } if (!status.isOk()) { - if (!state.compareAndSet(State.STARTED, State.CLOSED)) { - return; - } + debugTagTracer.checkPrecondition( + state.compareAndSet(State.STARTED, State.CLOSED), + "vrpc_incorrect_start_state", + "VRpc has incorrect state. Expected to be started but was %s", + state); // TODO: loop through the session executor if (retryable) { listener.onClose(VRpcResult.createUncommitedError(status)); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java new file mode 100644 index 0000000000..7df4fddd5e --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/SessionDeadlineTest.java @@ -0,0 +1,228 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.ClientConfiguration; +import com.google.bigtable.v2.GetClientConfigurationRequest; +import com.google.bigtable.v2.OpenSessionResponse; +import com.google.bigtable.v2.PeerInfo; +import com.google.bigtable.v2.SessionRequest; +import com.google.bigtable.v2.SessionResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.TableId; +import io.grpc.Context; +import io.grpc.ForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.Base64; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class SessionDeadlineTest { + + private Server server; + private EnhancedBigtableStubSettings defaultSettings; + private FakeDataService fakeDataService; + + @Before + public void setUp() throws IOException { + fakeDataService = new FakeDataService(); + server = + NettyServerBuilder.forPort(0) + .addService(fakeDataService) + .intercept(new ResponseHeaderInterceptor()) + .build() + .start(); + + defaultSettings = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance") + .setAppProfileId("fake-app-profile") + .setCredentialsProvider(NoCredentialsProvider.create()) + .build() + .getStubSettings(); + } + + @After + public void tearDown() throws InterruptedException { + if (fakeDataService != null) { + fakeDataService.shutdown(); + } + if (server != null) { + server.shutdownNow(); + server.awaitTermination(); + } + } + + @Test(timeout = 1000) + public void testShortDeadlineCancellation() throws Exception { + EnhancedBigtableStubSettings settings = + defaultSettings.toBuilder().setSessionsEnabled(true).build(); + + try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) { + Query request = Query.create(TableId.of("fake-table")).rowKey("row-key"); + + try (io.grpc.Context.CancellableContext ctx = + io.grpc.Context.current() + .withDeadlineAfter( + 5, + TimeUnit.MILLISECONDS, + settings.getBackgroundExecutorProvider().getExecutor())) { + + ctx.run( + () -> { + ApiFuture future = stub.readRowCallable().futureCall(request); + try { + future.get(); + fail("Should throw exception"); + } catch (ExecutionException e) { + assertThat(e).hasMessageThat().contains("DEADLINE_EXCEEDED"); + } catch (InterruptedException e) { + fail("Should not throw interrupted exception"); + } + }); + } + } + } + + @Test(timeout = 10000) + public void testMissedHeartbeat() throws Exception { + EnhancedBigtableStubSettings settings = + defaultSettings.toBuilder().setSessionsEnabled(true).build(); + + try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) { + Query request = Query.create(TableId.of("fake-table")).rowKey("row-key"); + + try (Context.CancellableContext ctx = + Context.current() + .withDeadlineAfter( + 1, TimeUnit.SECONDS, settings.getBackgroundExecutorProvider().getExecutor())) { + ctx.run( + () -> { + ApiFuture future = stub.readRowCallable().futureCall(request); + try { + future.get(); + fail("Should throw exception"); + } catch (ExecutionException e) { + assertThat(e).hasMessageThat().contains("missed heartbeat"); + } catch (InterruptedException e) { + fail("Should not throw interrupted exception"); + } + }); + } + } + } + + private static class FakeDataService extends BigtableGrpc.BigtableImplBase { + private final ScheduledExecutorService serverExecutor = Executors.newScheduledThreadPool(4); + + public void shutdown() { + serverExecutor.shutdownNow(); + } + + @Override + public void getClientConfiguration( + GetClientConfigurationRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + ClientConfiguration.newBuilder() + .setSessionConfiguration( + com.google.bigtable.v2.SessionClientConfiguration.newBuilder() + .setSessionLoad(1) + .build()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public StreamObserver openTable( + StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(SessionRequest sessionRequest) { + if (sessionRequest.hasOpenSession()) { + responseObserver.onNext( + SessionResponse.newBuilder() + .setOpenSession(OpenSessionResponse.getDefaultInstance()) + .build()); + } else if (sessionRequest.hasVirtualRpc()) { + // Server hangs + } + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } + } + + private static class ResponseHeaderInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + return serverCallHandler.startCall( + new ForwardingServerCall.SimpleForwardingServerCall(serverCall) { + @Override + public void sendHeaders(Metadata headers) { + Metadata.Key peerInfoKey = + Metadata.Key.of("bigtable-peer-info", Metadata.ASCII_STRING_MARSHALLER); + String encoded = + Base64.getUrlEncoder() + .encodeToString( + PeerInfo.newBuilder() + .setApplicationFrontendRegion("us-east1") + .build() + .toByteArray()); + headers.put(peerInfoKey, encoded); + super.sendHeaders(headers); + } + + @Override + public void close(io.grpc.Status status, Metadata trailers) { + super.close(status, trailers); + } + }, + metadata); + } + } +}