Skip to content

refactor(bigtable): unify session/vRPC concurrency on an executor model#13604

Closed
igorbernstein2 wants to merge 0 commit into
googleapis:mainfrom
igorbernstein2:threading-refactor-final
Closed

refactor(bigtable): unify session/vRPC concurrency on an executor model#13604
igorbernstein2 wants to merge 0 commit into
googleapis:mainfrom
igorbernstein2:threading-refactor-final

Conversation

@igorbernstein2

Copy link
Copy Markdown
Contributor

Summary

Reworks the threading model of the new session-based transport
(com.google.cloud.bigtable.data.v2.internal). The old model serialized
session callbacks, retry/op state, and pool topology all on a single
synchronized(SessionPoolImpl.this) monitor, dispatched user callbacks
on the gRPC user-thread pool, and used the JDK
ScheduledThreadPoolExecutor for heartbeats and per-vRPC deadline
monitoring. This produced three concrete problems:

  1. Lock contention — the pool monitor was held across session state
    mutations, retry decisions, and PendingVRpc bookkeeping.
  2. Scheduler heap churn — heartbeats fire every ~100 ms per session
    and deadline-monitor futures are cancelled within ~1 ms but stay in
    the DelayQueue until natural expiry, inflating O(log n) inserts.
  3. Silent hangs on dispatch-chain exceptions — uncaught exceptions
    inside the SyncContext could orphan the caller's Future with no
    log or timeout.

New model

Executor Scope Owns
sessionSyncContext per SessionImpl all session state
OpExecutor = SerializingExecutor(userCallbackExecutor) per vRPC all retry/op state
Netty I/O (DirectExecutor) per gRPC channel stream callback delivery
HashedWheelTimer (in-process) per SessionPoolImpl heartbeats, deadlines, watchdog
ScheduledExecutorService pool-wide retry delays, AFE-list pruning

Every state mutation in a session crosses sessionSyncContext; every
state mutation in the retry/op layer crosses the per-op OpExecutor.
Netty I/O threads and the user thread are producers only — they submit
and return. The pool monitor now covers only pool topology
(pendingRpcs, poolState, session list).

The userCallbackExecutor is the one configured on the gax
TransportChannelProvider, so transport and user-callback dispatch
share a single user-controlled pool instead of each Client/Shim
spinning up its own unbounded CachedThreadPool.

Key behavior changes worth flagging to reviewers

  • User callbacks now run on the user-callback executor, not the
    gRPC user-thread pool. Callers that implicitly relied on the prior
    thread identity will see a different Thread.currentThread().
  • SessionImpl.forceClose is now fully async (queues onto
    sessionSyncContext and returns). If the SyncContext is wedged the
    abort path catches the uncaught exception, logs, and tears the
    session down on the next drain — delivering terminal onClose to
    every attached vRPC instead of orphaning their futures.
  • Heartbeat is armed only while a vRPC is in flight, instead of
    ticking on every idle session.
  • Client.close drains SessionPools before tearing down the
    user-callback executor
    , and serializes against concurrent open*
    so racing opens cannot create orphan pools.

Not in scope

  • Per-AFE lock sharding (the pool monitor's scope has narrowed to pool
    topology but is still pool-wide; per-AFE sharding is a queued
    follow-up).
  • Server-streaming retry (requestNext is still
    UnsupportedOperationException — unary-only surface today).

@igorbernstein2 igorbernstein2 requested review from a team as code owners June 30, 2026 20:30

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the threading and scheduling model of the Bigtable client by introducing a custom HashedWheelTimer and a per-operation serializing executor (OpExecutor), replacing the previous usage of ScheduledExecutorService and SynchronizationContext to improve thread safety and reduce lock contention. Feedback on these changes highlights a critical deadlock risk in OpExecutor.drain(), where an abrupt termination caused by an uncaught exception in the handler can leave drainScheduled set to true, permanently wedging the executor.

igorbernstein2 added a commit to igorbernstein2/google-cloud-java that referenced this pull request Jun 30, 2026
handler.uncaught is the last-resort recovery for a task throw; it cancels
the chain, which drives Done.onStart, which calls listener.onClose. If
the user's terminal listener also throws, the throw escapes the handler
and exits drain() before reaching the `r == null` branch that clears
drainScheduled. The flag stays set and every subsequent execute() queues
without rescheduling — the OpExecutor is wedged.

Per-op lifecycle today has no work after terminal close, so the wedge
leaks nothing observable. Fixing it for symmetry with scheduleDrainLocked
(which already resets drainScheduled on a backing.execute throw) and to
keep the contract robust if a future handler can throw on a non-terminal
op.

Caught by gemini-code-assist on PR googleapis#13604.
@mutianf

mutianf commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the threading and scheduling model of the Bigtable client by introducing a custom HashedWheelTimer to replace ScheduledExecutorService for low-resolution scheduling, and a per-operation serializing executor (OpExecutor) to replace heavy synchronization in SessionImpl with a SynchronizationContext. Feedback on these changes highlights a critical concurrency bug in OpExecutor.runInline that violates serialization guarantees, a potential race condition in HashedWheelTimer.stop() that could cause stop hooks to execute twice, and a cumulative timeout issue in Client.close() when sequentially waiting for multiple session pools to drain.

@igorbernstein2

Copy link
Copy Markdown
Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant refactoring of the Bigtable client's internal concurrency and session management. Key changes include the introduction of a custom 'BigtableTimer' (a hashed-wheel timer) to replace reliance on 'ScheduledExecutorService' for short-lived tasks, the implementation of an 'OpExecutor' to provide per-operation serialization, and a migration of session state management to a 'SynchronizationContext'. These changes improve thread safety, prevent potential memory leaks, and optimize resource usage by reducing the number of threads and timer tasks. I have identified a potential correctness issue in 'VRpcImpl.java' where the state is not transitioned to 'CLOSED' upon successful response, which could lead to memory leaks in the 'VOperationImpl' cancellation listener registration.

Comment on lines +160 to +163
ctx.getExecutor()
.execute(
() -> {
RespT resp;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Correctness / Memory Leak Risk

In handleResponse, the state of VRpcImpl is checked but never transitioned to CLOSED (unlike handleError and handleSessionClose which use compareAndSet(State.STARTED, State.CLOSED)).

This leads to two issues:

  1. Inconsistent isDone() state: isDone() will return false even after the RPC has successfully completed.
  2. Potential Memory Leak in VOperationImpl: If VRpcImpl is used directly (or via a delegating middleware), VOperationImpl.start checks !chain.isDone() to decide whether to register the cancellationListener on grpcContext. Since isDone() returns false on success, the listener will be registered after the call has already completed and CleanupListener.onClose has run. This permanently leaks the listener (and the entire call chain) on the grpcContext.

Suggestion:
Transition the state to CLOSED inside the executor task in handleResponse using compareAndSet to ensure symmetry and correctness.

    ctx.getExecutor()
        .execute(
            () -> {
              if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
                return;
              }
              RespT resp;

@igorbernstein2 igorbernstein2 force-pushed the threading-refactor-final branch from 4ed8f07 to b56f9b8 Compare July 1, 2026 19:48
igorbernstein2 added a commit to igorbernstein2/google-cloud-java that referenced this pull request Jul 1, 2026
handler.uncaught is the last-resort recovery for a task throw; it cancels
the chain, which drives Done.onStart, which calls listener.onClose. If
the user's terminal listener also throws, the throw escapes the handler
and exits drain() before reaching the `r == null` branch that clears
drainScheduled. The flag stays set and every subsequent execute() queues
without rescheduling — the OpExecutor is wedged.

Per-op lifecycle today has no work after terminal close, so the wedge
leaks nothing observable. Fixing it for symmetry with scheduleDrainLocked
(which already resets drainScheduled on a backing.execute throw) and to
keep the contract robust if a future handler can throw on a non-terminal
op.

Caught by gemini-code-assist on PR googleapis#13604.
@igorbernstein2

Copy link
Copy Markdown
Contributor Author

Superseded by #13633 after a rebase mishap on this branch. Closing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants