Code Review
This pull request introduces the EndpointLifecycleManager to manage the lifecycle of location-aware routing endpoints, including background probing, traffic tracking, and idle eviction. It updates the routing logic in KeyRangeCache and KeyAwareChannel to be state-aware, ensuring that only READY endpoints are used and that TRANSIENT_FAILURE states are reported. The feedback suggests expanding the session extraction logic to include partition requests and flags a potential concurrency bottleneck in the single-threaded executor used for background endpoint creation.
    Executors.newScheduledThreadPool(
        1,
        r -> {
          Thread t = new Thread(r, "spanner-endpoint-lifecycle");
          t.setDaemon(true);
          return t;
        });
The ScheduledExecutorService is created with a single thread. While most operations are asynchronous gRPC calls, endpointCache.get(address) (called in createAndStartProbing) can be a blocking operation depending on the implementation of the ChannelEndpointCache. If many endpoints are being created simultaneously, this single thread could become a bottleneck. Consider using a small pool of threads or ensuring that endpointCache.get is non-blocking.
...nner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java
…int lifecycle management
Location-aware routing previously treated IDLE and CONNECTING channels as healthy, which could send traffic to stale replicas after cache updates. This change tightens endpoint readiness to READY-only, adds state-aware skipped_tablets reporting (TRANSIENT_FAILURE only), and introduces a background lifecycle manager that probes endpoints with GetSession to keep channels warm and evicts idle endpoints after 30 minutes without real traffic.
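The idle-eviction rule can be sketched as a pure check over a per-endpoint timestamp. This is a minimal sketch under stated assumptions, not the PR's implementation. IdleTracker, recordRealTraffic, and shouldEvict are hypothetical names. Only the 30-minute threshold and the idea of tracking real (non-probe) traffic come from the change description. Passing the current Instant in explicitly keeps the rule testable without sleeping.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical idle-eviction check. Only the 30-minute threshold and the idea of a
 * last-real-traffic timestamp come from the PR; the names are illustrative.
 */
final class IdleTracker {
  static final Duration IDLE_EVICTION = Duration.ofMinutes(30);

  // Updated only by real application traffic. Background probes must not touch it,
  // otherwise probing alone would keep every endpoint from ever becoming idle.
  private final AtomicReference<Instant> lastRealTrafficAt;

  IdleTracker(Instant createdAt) {
    this.lastRealTrafficAt = new AtomicReference<>(createdAt);
  }

  /** Called from the request path whenever a real RPC is routed to this endpoint. */
  void recordRealTraffic(Instant now) {
    lastRealTrafficAt.set(now);
  }

  /** True once at least IDLE_EVICTION has passed since the last real RPC. */
  boolean shouldEvict(Instant now) {
    Duration idle = Duration.between(lastRealTrafficAt.get(), now);
    return idle.compareTo(IDLE_EVICTION) >= 0;
  }
}
```

Because probes never call recordRealTraffic, a warm but unused endpoint still becomes eligible for eviction.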
Commits updated from 0f6018c to bcdc852, then from bcdc852 to 26c365b.
    private final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();

    /** Active addresses reported by each ChannelFinder (keyed by finder identity). */
    private final Map<Object, Set<String>> activeAddressesPerFinder = new ConcurrentHashMap<>();
This seems to be a 'grow only' map. We add new ChannelFinder keys to it, but never remove any. In theory, that means that if we open many different databases, this would create a (small) memory leak.
Also, why is the key type Object?
Good catch on both points. Fixed:
- Key is now String (the database resource path), which is naturally stable — one entry per database regardless of ChannelFinder recreation.
- KeyAwareChannel now uses a ReferenceQueue to detect when a SoftReference is cleared, and calls lifecycleManager.unregisterFinder(databaseId) to remove the entry and evict any endpoints that are no longer referenced by remaining finders.
- No strong reference to ChannelFinder is held by the lifecycle manager.
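The cleanup above follows the standard java.lang.ref pattern. Each SoftReference is registered with a ReferenceQueue. The queue is then polled, and the key of every reference the GC has cleared is unregistered. This is a minimal sketch, assuming a String database ID as the key. SoftFinderRegistry and KeyedRef are hypothetical names, not the PR's code, and the onCleared callback stands in for lifecycleManager.unregisterFinder.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical registry: one soft-referenced value per database, cleaned up after GC. */
final class SoftFinderRegistry<T> {
  /** A SoftReference that remembers which database it was created for. */
  static final class KeyedRef<V> extends SoftReference<V> {
    final String databaseId;

    KeyedRef(String databaseId, V value, ReferenceQueue<? super V> queue) {
      super(value, queue);
      this.databaseId = databaseId;
    }
  }

  private final Map<String, KeyedRef<T>> refs = new ConcurrentHashMap<>();
  private final ReferenceQueue<T> queue = new ReferenceQueue<>();
  private final Consumer<String> onCleared; // e.g. databaseId -> unregisterFinder(databaseId)

  SoftFinderRegistry(Consumer<String> onCleared) {
    this.onCleared = onCleared;
  }

  KeyedRef<T> put(String databaseId, T value) {
    KeyedRef<T> ref = new KeyedRef<>(databaseId, value, queue);
    refs.put(databaseId, ref);
    return ref;
  }

  T get(String databaseId) {
    KeyedRef<T> ref = refs.get(databaseId);
    return ref == null ? null : ref.get();
  }

  /** Processes references the GC has cleared; call periodically or on lookups. */
  void drainClearedReferences() {
    Reference<? extends T> cleared;
    while ((cleared = queue.poll()) != null) {
      KeyedRef<?> ref = (KeyedRef<?>) cleared;
      // Remove only if the map still holds this exact reference: a newer value for the
      // same database may already have replaced it and must stay registered.
      if (refs.remove(ref.databaseId, ref)) {
        onCleared.accept(ref.databaseId);
      }
    }
  }
}
```

The identity check in refs.remove(key, ref) matters when a database's finder is recreated. Without it, a stale reference from the old finder could unregister the new one.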
     * @param finderKey identity of the ChannelFinder reporting its active addresses
     * @param activeAddresses server addresses currently referenced by tablets in this finder
     */
    void updateActiveAddresses(Object finderKey, Set<String> activeAddresses) {
I think that this method has a potential race condition, as one EndpointLifecycleManager is shared across multiple DatabaseClients (assuming that the DatabaseClient instances were created from the same Spanner instance).
This method reads from the endpoints map to determine which endpoints to evict. At the same time, another thread could be adding an endpoint to this map in ensureEndpointExists(..) that has not yet been registered as active. That endpoint would then be evicted here immediately.
Fixed. ensureEndpointExists is now private and only called from inside updateActiveAddresses under activeAddressLock. The method atomically: creates endpoint state → registers the active address set → computes stale endpoints → evicts. This guarantees a newly created endpoint is always in the active set before any stale-eviction check can see it.
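The fix comes down to doing creation, registration, and the stale-endpoint check in one critical section. The following is a simplified model under stated assumptions. It tracks addresses only, with no channels and no default-endpoint exemption. ActiveAddressTracker is a hypothetical class. The names updateActiveAddresses, ensureEndpointExists, activeAddressLock, and activeAddressesPerFinder are borrowed from this thread.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified model of the fix: address bookkeeping only, no channels. */
final class ActiveAddressTracker {
  private final Object activeAddressLock = new Object();
  private final Set<String> endpoints = ConcurrentHashMap.newKeySet();
  private final Map<String, Set<String>> activeAddressesPerFinder = new ConcurrentHashMap<>();

  /** Create, register, and evict in one critical section, so a new endpoint is never stale. */
  void updateActiveAddresses(String finderKey, Set<String> activeAddresses) {
    synchronized (activeAddressLock) {
      // 1. Create state for every newly referenced address.
      for (String address : activeAddresses) {
        ensureEndpointExists(address);
      }
      // 2. Register this finder's addresses before any stale check runs.
      activeAddressesPerFinder.put(finderKey, new HashSet<>(activeAddresses));
      // 3. Union of addresses still referenced by any finder.
      Set<String> stillActive = new HashSet<>();
      for (Set<String> addresses : activeAddressesPerFinder.values()) {
        stillActive.addAll(addresses);
      }
      // 4. Evict everything that no finder references any more.
      endpoints.removeIf(address -> !stillActive.contains(address));
    }
  }

  /** Private, so it can only run while activeAddressLock is held. */
  private void ensureEndpointExists(String address) {
    endpoints.add(address);
  }

  Set<String> endpoints() {
    return new HashSet<>(endpoints);
  }
}
```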
Also fixed a subtle bug discovered during testing: the scheduler task was previously submitted from inside ConcurrentHashMap.computeIfAbsent, which meant the scheduler thread could run before computeIfAbsent published the entry (the node isn't visible until the mapping function returns). Background creation tasks are now scheduled after computeIfAbsent returns.
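The following sketches the publish-then-schedule ordering, assuming a plain Executor in place of the real scheduler. While a computeIfAbsent mapping function is still running, a concurrent get() for that key can return null. For that reason, the background task is submitted only after computeIfAbsent has returned. PublishThenSchedule and its two test-hook fields are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: publish the map entry first, then hand work to another thread. */
final class PublishThenSchedule {
  private final Map<String, Object> endpoints = new ConcurrentHashMap<>();
  private final Executor scheduler;
  final Set<String> visibleToBackground = ConcurrentHashMap.newKeySet(); // test hook
  final AtomicInteger scheduledTasks = new AtomicInteger(); // test hook

  PublishThenSchedule(Executor scheduler) {
    this.scheduler = scheduler;
  }

  void ensureEndpointExists(String address) {
    AtomicBoolean created = new AtomicBoolean(false);
    endpoints.computeIfAbsent(
        address,
        a -> {
          // Only record that the entry is new. Submitting the task from here could let it
          // start before the new mapping is visible to other threads.
          created.set(true);
          return new Object(); // stands in for the real EndpointState
        });
    // computeIfAbsent has returned, so the entry is now published.
    if (created.get()) {
      scheduledTasks.incrementAndGet();
      scheduler.execute(() -> createAndStartProbing(address));
    }
  }

  private void createAndStartProbing(String address) {
    if (!endpoints.containsKey(address)) {
      return; // evicted before the task ran
    }
    visibleToBackground.add(address);
    // ... create the channel and start probing here ...
  }
}
```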
    }

    /** Creates an endpoint and starts probing. Runs on the scheduler thread. */
    private void createAndStartProbing(String address) {
I (or rather Gemini) think there is a potential concurrency issue in this method:
Imagine this scenario:
- ensureEndpointExists is called, adds the address to the endpoints map, and schedules createAndStartProbing.
- The first attempt fails, so it schedules a retry in 60 seconds. During those 60 seconds, a cache update happens, the address is no longer referenced, and updateActiveAddresses calls evictEndpoint(address). This removes the address from the endpoints map.
- Now the 60-second timer expires, and the retry task runs createAndStartProbing:
  - Line 227: endpointCache.get(address) is called, and this time it succeeds. A gRPC channel is created and put into the endpointCache.
  - Line 229: it calls startProbing(address).
  - In startProbing:

        243: EndpointState state = endpoints.get(address);
        244: if (state == null || isShutdown.get()) {
        245:   return; // Triggers here because evictEndpoint removed it!
        246: }

    It sees that state is null and returns.
The Bug: The channel was created in endpointCache (Line 227), but because it returned early on line 245, it was never added back to the endpoints map of EndpointLifecycleManager. Since it is not in the endpoints map, it will never be checked for idle eviction and never be probed. It is now leaked forever in the endpointCache.
Fixed. createAndStartProbing now re-checks endpoints.containsKey(address) after endpointCache.get(address) completes.
If the endpoint was evicted during channel creation, it immediately calls endpointCache.evict(address) to clean up the leaked channel. The same guard is applied before scheduling retries on failure.
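The re-check can be sketched as follows. SimpleEndpointCache is a hypothetical two-method stand-in for ChannelEndpointCache, exposing only get and evict, the two calls this thread mentions. Returning a boolean from createAndStartProbing is an assumption made for testability.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-method stand-in for ChannelEndpointCache. */
interface SimpleEndpointCache {
  /** Returns the channel for the address, creating (and possibly connecting) it first. */
  Object get(String address);

  /** Shuts down and forgets the channel for the address, if any. */
  void evict(String address);
}

final class CreateThenRecheck {
  final Map<String, Object> endpoints = new ConcurrentHashMap<>();
  private final SimpleEndpointCache endpointCache;

  CreateThenRecheck(SimpleEndpointCache endpointCache) {
    this.endpointCache = endpointCache;
  }

  /** Returns true if probing may start, false if the endpoint was evicted meanwhile. */
  boolean createAndStartProbing(String address) {
    endpointCache.get(address); // potentially slow; eviction can happen while this runs
    if (!endpoints.containsKey(address)) {
      // Evicted during creation: release the channel that was just built
      // instead of leaving it orphaned in the cache.
      endpointCache.evict(address);
      return false;
    }
    // ... startProbing(address) would go here ...
    return true;
  }
}
```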
    ManagedChannel channel = endpoint.getChannel();
    state.lastProbeAt = clock.instant();

    try {
This try block should probably catch all possible errors (or you could even wrap all the code in this method in a try block). If a periodic scheduled task fails with an uncaught exception, all of its subsequent executions are suppressed. This means that a single unexpected error in this method will stop all probes of that endpoint.
Fixed. The entire probe() method body is now wrapped in try-catch(Exception) so an uncaught exception won't cause ScheduledExecutorService to cancel all future runs of that task.
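The blanket catch matters because of how periodic tasks handle exceptions. The Javadoc of ScheduledExecutorService.scheduleAtFixedRate states that if any execution of the task encounters an exception, subsequent executions are suppressed. Below is a minimal sketch of the wrapper. The names SafeProbe, catchingAll, and scheduleProbe are hypothetical, and java.util.logging is assumed as the logger.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: keeps a periodic probe alive across individual failures. */
final class SafeProbe {
  private static final Logger logger = Logger.getLogger(SafeProbe.class.getName());

  /** Wraps the whole probe body so one failure cannot cancel every later run. */
  static Runnable catchingAll(String address, Runnable probe) {
    return () -> {
      try {
        probe.run();
      } catch (Exception e) {
        // Log and swallow: the next scheduled run gets a fresh chance.
        // Errors (e.g. OutOfMemoryError) are deliberately not caught.
        logger.log(Level.FINE, "Probe failed for endpoint " + address, e);
      }
    };
  }

  static ScheduledFuture<?> scheduleProbe(
      ScheduledExecutorService scheduler, String address, Runnable probe, long periodMillis) {
    return scheduler.scheduleAtFixedRate(
        catchingAll(address, probe), 0, periodMillis, TimeUnit.MILLISECONDS);
  }
}
```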
    this.clock = clock;
    this.defaultEndpointAddress = endpointCache.defaultChannel().getAddress();
    this.scheduler =
        Executors.newScheduledThreadPool(
I think that you should consider whether having a single executor with a single thread for all background tasks is the best choice here. Any operation that turns out to be slow (e.g. a slow network lookup) could block all other operations for a long time.
Increased to 2 threads. endpointCache.get() can involve a network-level channel setup, and a slow creation should not block probes and eviction checks for other endpoints.
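Below is a sketch of a two-thread scheduler with named daemon threads. The base thread name and the daemon flag come from the original snippet. The numeric name suffix, the THREADS constant, and the LifecycleScheduler class are assumptions. Two threads reduce head-of-line blocking but do not remove it: if both threads are busy with slow channel creations, probes still wait.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory for the lifecycle manager's background scheduler. */
final class LifecycleScheduler {
  static final int THREADS = 2; // thread count taken from the reply above

  static ScheduledExecutorService create() {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory =
        r -> {
          // Numbered names make it clear in thread dumps which worker is stuck.
          Thread t = new Thread(r, "spanner-endpoint-lifecycle-" + counter.incrementAndGet());
          t.setDaemon(true); // background probing must never keep the JVM alive
          return t;
        };
    return Executors.newScheduledThreadPool(THREADS, factory);
  }
}
```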
        "Evicting idle endpoint {0}: no real traffic for {1}",
        new Object[] {address, idleEvictionDuration});

    stopProbing(address);
I think this call can also cause a race condition (or, more generally, the lack of synchronization in this class can). You could end up with a periodic task that probes an endpoint that has already been evicted. If that endpoint is added again later, there will then be two tasks probing it. Imagine this sequence:
- Thread A is in startProbing. It gets the state object (line 243) but has not yet executed line 253 to schedule the task.
- Thread B runs evictEndpoint, which calls stopProbing.
- Thread B reads state.probeFuture. Since Thread A hasn't set it yet, it is null. So Thread B skips the cancellation block.
- Thread B returns to evictEndpoint and calls endpoints.remove(address). The endpoint is now gone from the map.
- Thread A resumes and executes line 253. It schedules the periodic task and saves the Future in state.probeFuture.
There is now a periodic task scheduled in the scheduler for an endpoint that has been evicted.
Fixed. Both startProbing and stopProbing now synchronize on the EndpointState instance when accessing probeFuture. startProbing also re-checks endpoints.containsKey(address) inside the synchronized block to detect concurrent eviction. shutdown() similarly synchronizes on each state before cancelling.
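Below is a sketch of the synchronization described above. All names are hypothetical except startProbing, evictEndpoint, EndpointState, and probeFuture. As a design choice, this version removes the map entry before cancelling. It also re-checks by identity (endpoints.get(address) != state) inside the lock. A concurrent startProbing therefore either sees the removal or has its future cancelled, and a state object that a newer one has replaced is also rejected.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: probeFuture is only read or written under the state's monitor. */
final class ProbeRegistry {
  static final class EndpointState {
    ScheduledFuture<?> probeFuture; // guarded by this EndpointState instance
  }

  final Map<String, EndpointState> endpoints = new ConcurrentHashMap<>();
  private final ScheduledExecutorService scheduler;

  ProbeRegistry(ScheduledExecutorService scheduler) {
    this.scheduler = scheduler;
  }

  void startProbing(String address, Runnable probe, long periodMillis) {
    EndpointState state = endpoints.get(address);
    if (state == null) {
      return;
    }
    synchronized (state) {
      // Re-check inside the lock: the state may have been evicted or replaced meanwhile,
      // and a second call must not schedule a duplicate task.
      if (endpoints.get(address) != state || state.probeFuture != null) {
        return;
      }
      state.probeFuture =
          scheduler.scheduleAtFixedRate(probe, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
  }

  void evictEndpoint(String address) {
    EndpointState state = endpoints.remove(address); // remove first, then cancel
    if (state == null) {
      return;
    }
    synchronized (state) {
      if (state.probeFuture != null) {
        state.probeFuture.cancel(false);
        state.probeFuture = null;
      }
    }
  }
}
```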
olavloite left a comment:
The feature LGTM.
But we need to get rid of all the Thread.sleep(..) calls in the tests. They are a potential source of flaky tests, and they increase test/build time. They also tend to 'spread' through the code: if one test is 'allowed' to use Thread.sleep(..), other tests assume that they can do so too.
    -void updateActiveAddresses(Object finderKey, Set<String> activeAddresses) {
    -  if (isShutdown.get()) {
    +void updateActiveAddresses(String finderKey, Set<String> activeAddresses) {
    +  if (isShutdown.get() || finderKey == null || finderKey.isEmpty()) {
nit: (for another time, just ignore now) prefer the use of Strings.isNullOrEmpty(..)
    // Wait for background creation on the scheduler thread.
    Thread.sleep(1000);
Sorry, I never got to the tests during the initial review. It is highly preferable to find some other way than Thread.sleep(..) to test this kind of thing, especially in unit tests. Maven does not run unit tests in parallel, and if many tests do this kind of thing, then the entire test suite gets slower and slower over time.
Is it possible to use a latch (or a set of latches) to guarantee that the scheduler has executed? Or do some active polling?
Especially as I see that many other tests further down are also using Thread.sleep(..).
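One way to follow the latch suggestion is to let the component report completion through a hook that the test counts down on. BackgroundCreator, onCreated, and LatchExample are hypothetical. If the real class offers no such hook, active polling (the reviewer's second option) is the alternative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical component: creates endpoints on a background thread and reports completion. */
final class BackgroundCreator {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Consumer<String> onCreated; // test hook; production code could pass a no-op

  BackgroundCreator(Consumer<String> onCreated) {
    this.onCreated = onCreated;
  }

  void requestCreation(String address) {
    executor.execute(
        () -> {
          // ... create the channel for the address here ...
          onCreated.accept(address);
        });
  }

  void shutdown() {
    executor.shutdownNow();
  }
}

final class LatchExample {
  /** Waits for the background work instead of sleeping for a fixed time. */
  static boolean createAndAwait(String address) throws InterruptedException {
    CountDownLatch created = new CountDownLatch(1);
    BackgroundCreator creator = new BackgroundCreator(a -> created.countDown());
    try {
      creator.requestCreation(address);
      // Returns as soon as the task finishes; the timeout only bounds a failing test.
      return created.await(10, TimeUnit.SECONDS);
    } finally {
      creator.shutdown();
    }
  }
}
```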
I removed the Thread.sleep(...) calls from EndpointLifecycleManagerTest and replaced them with bounded waiting on observable conditions instead of fixed delays. That keeps the tests deterministic, avoids unnecessary wall-clock delay, and removes the flakiness risk from scheduler timing.
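Bounded waiting on an observable condition can be sketched as a small polling helper. TestWaits.waitUntil is a hypothetical name; libraries such as Awaitility provide the same pattern. The short sleep inside the loop is only a poll interval. The test proceeds as soon as the condition holds and fails with a TimeoutException if it never does.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical test utility for waiting on observable state instead of fixed delays. */
final class TestWaits {
  private TestWaits() {}

  /** Polls until the condition holds, or throws once the timeout has elapsed. */
  static void waitUntil(BooleanSupplier condition, Duration timeout)
      throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!condition.getAsBoolean()) {
      // Overflow-safe comparison of nanoTime values.
      if (System.nanoTime() - deadline >= 0) {
        throw new TimeoutException("Condition not met within " + timeout);
      }
      Thread.sleep(5); // short poll interval, not a fixed wait for the result
    }
  }
}
```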
🤖 I have created a release *beep* *boop*

---

<details><summary>1.83.0</summary>

## [1.83.0](v1.82.0...v1.83.0) (2026-04-07)

### Features

* [aiplatform] [Memorystore for Redis Cluster] Add support for ([0bd7666](0bd7666))
* [aiplatform] Add container_spec to Reasoning Engine public protos ([0bd7666](0bd7666))
* [aiplatform] Add container_spec to Reasoning Engine public protos ([0bd7666](0bd7666))
* [aiplatform] Add container_spec to Reasoning Engine public protos ([3ba3854](3ba3854))
* [aiplatform] Add container_spec to Reasoning Engine public protos ([3ba3854](3ba3854))
* [aiplatform] add evaluation metrics and autorater configuration to ([0bd7666](0bd7666))
* [backupdr] Adding new workload specific fields for AlloyDB ([6344cb0](6344cb0))
* [ces] update public libraries for CES v1 ([6344cb0](6344cb0))
* [ces] update public libraries for CES v1beta ([0bd7666](0bd7666))
* [ces] update public libraries for CES v1beta ([0bd7666](0bd7666))
* [chat] Addition of Section and SectionItem APIs ([0bd7666](0bd7666))
* [chat] Support app authentication with admin-consent scopes for ([0bd7666](0bd7666))
* [databasecenter] A new value `SUB_RESOURCE_TYPE_READ_POOL` is ([6344cb0](6344cb0))
* [dataflow] Add Pausing/Yaml capabilities to public protos ([3ba3854](3ba3854))
* [dataflow] add sha256 field to Package proto ([0bd7666](0bd7666))
* [dataflow] add sha256 field to Package proto ([3ba3854](3ba3854))
* [dataform] add folders and teamFolders related changes to v1 ([6344cb0](6344cb0))
* [datalineage] add configmanagement v1 module ([#12355](#12355)) ([2def625](2def625))
* [datamanager] add INVALID_MERCHANT_ID to the ErrorReason enum for ([6344cb0](6344cb0))
* [dialogflow-cx] updated v3 dialogflow client libraries with ([6344cb0](6344cb0))
* [dialogflow] updated v2 dialogflow client libraries ([6344cb0](6344cb0))
* [dialogflow] updated v2beta1 dialogflow client libraries ([6344cb0](6344cb0))
* [dlp] added support for detecting key-value pairs in client ([e5e22ed](e5e22ed))
* [document-ai] Added a fields for image and table annotation output ([0bd7666](0bd7666))
* [geocode] new module for geocode ([#12343](#12343)) ([474efb1](474efb1))
* [netapp] Add ONTAP passthrough APIs ([6344cb0](6344cb0))
* [network-security] Publish proto definitions for AuthzPolicy, ([6344cb0](6344cb0))
* [redis-cluster] [Memorystore for Redis Cluster] Add support for ([0bd7666](0bd7666))
* [redis-cluster] [Memorystore for Redis Cluster] Add support for ([3ba3854](3ba3854))
* [redis-cluster] [Memorystore for Redis Cluster] Add support for ([3ba3854](3ba3854))
* [securesourcemanager] Add CustomHostConfig to configure custom ([6344cb0](6344cb0))
* [storage] populate the `persisted_data_checksums` field with ([e5e22ed](e5e22ed))
* [texttospeech] Support safety settings for Gemini voices and ([0bd7666](0bd7666))
* [texttospeech] Support safety settings for Gemini voices and ([0bd7666](0bd7666))
* [texttospeech] Support safety settings for Gemini voices and ([0bd7666](0bd7666))
* [texttospeech] Support safety settings for Gemini voices and ([0bd7666](0bd7666))
* [translate] A new field `mime_type` is added to message ([e5e22ed](e5e22ed))
* [valkey] [Memorystore for Valkey] Add support for Flexible CA ([0bd7666](0bd7666))
* [valkey] [Memorystore for Valkey] Add support for Flexible CA ([0bd7666](0bd7666))
* [valkey] [Memorystore for Valkey] Add support for Flexible CA ([3ba3854](3ba3854))
* Add getProjectId getter for ComputeEngineCredentials ([#1833](#1833)) ([0a7895a](0a7895a))
* **bigguery:** add url.domain to span tracing ([#12208](#12208)) ([6f79c2d](6f79c2d))
* **bigquery observability:** add version attribute to span tracing ([#12132](#12132)) ([95c3eb8](95c3eb8))
* **bigquery:** add gcp.resource.destination.id for span tracing ([#12134](#12134)) ([5f31ded](5f31ded))
* **bigquery:** add opentelemetry W3C Trace Context to headers ([#12203](#12203)) ([965761a](965761a))
* **bigquery:** add resend attribute to span tracing + integration tests ([#12313](#12313)) ([167722d](167722d))
* **bigquery:** add url.full attribute to span tracing ([#12176](#12176)) ([7fdf9ff](7fdf9ff))
* **bigquery:** add url.template to span tracing ([#12181](#12181)) ([30f8afb](30f8afb))
* **bigquery:** added error attributes to span tracing ([#12115](#12115)) ([863d23b](863d23b))
* Extract resource name from unary requests for tracing ([#4159](#4159)) ([23b16b7](23b16b7))
* **gapic-generator-java:** Extract resource name heuristicly ([#12207](#12207)) ([f46480a](f46480a))
* **gax:** Actionable Errors Logging API Tracer ([#12202](#12202)) ([8d23279](8d23279))
* **gax:** Add error attributes to golden signal metrics. ([#12564](#12564)) ([063dfe5](063dfe5))
* **gax:** add utility for logging actionable errors ([#4144](#4144)) ([54fb8a5](54fb8a5))
* **gax:** Implement trace context extraction and injection with integration test ([#12625](#12625)) ([6675310](6675310))
* **observability:** Implement gcp.client.service attribute ([#12315](#12315)) ([e99812f](e99812f))
* **observability:** implement url.domain attribute ([#12316](#12316)) ([0a865bf](0a865bf))
* **sdk-platform-java:** Add CompositeTracer and CompositeTracerFactory. ([#12321](#12321)) ([4b5e8af](4b5e8af))
* Switch Eef metrics to using built in open telemetry ([#4385](#4385)) ([759bb22](759bb22))

### Bug Fixes

* Add error attributes to logging ([#12685](#12685)) ([a9198ee](a9198ee))
* **bq jdbc:** allow & ignore unknown parameters ([#12352](#12352)) ([2332ff1](2332ff1))
* **bq jdbc:** ensure getMoreResults() always moves the cursor ([#12353](#12353)) ([ac1f0f4](ac1f0f4))
* **ci:** consolidate duplicate yaml keys in github actions workflows ([#12306](#12306)) ([f644a19](f644a19))
* Clean up attributes for traces and metrics ([#12677](#12677)) ([914f97f](914f97f))
* fix getLong on NUMERIC ([#2420](#2420)) ([75ec5c2](75ec5c2))
* **gax:** Implement lazy resource name evaluation in ApiTracerContext ([#12618](#12618)) ([5e47749](5e47749))
* Handle null server address ([#12184](#12184)) ([435dd8c](435dd8c))
* **hermetic-build:** do not add release please comments on vertexai poms ([#12559](#12559)) ([5e161a7](5e161a7))
* **o11y:** create noop tracer when artifact ID is not set ([#12307](#12307)) ([630d83d](630d83d))
* **o11y:** do not record error.type in successful runs ([#12620](#12620)) ([28eebf0](28eebf0))
* **o11y:** remove `gpc.client.language` attribute ([#12621](#12621)) ([40d2e43](40d2e43))
* **oauth2:** mask sensitive tokens in HTTP logs ([#1900](#1900)) ([3e4ccb7](3e4ccb7))
* **release:** add Version.java as extra files in release-please ([#12617](#12617)) ([f5101d9](f5101d9))
* **spanner:** enforce READY-only location aware routing and add endpoint lifecycle management ([ecb86fd](ecb86fd))
* **spanner:** enforce READY-only location aware routing and add endpoint lifecycle management ([#12678](#12678)) ([ca9edb9](ca9edb9))
* **spanner:** improve grpc-gcp affinity cleanup and location-aware retries ([a157c2f](a157c2f))
* **spanner:** improve grpc-gcp affinity cleanup and location-aware retries ([#12682](#12682)) ([aca0428](aca0428))
* use dynamic tracer name instead of hardcoded gax-java ([#12190](#12190)) ([dea24db](dea24db))

### Dependencies

* bump jackson version to 2.18.3 ([#12351](#12351)) ([50304c1](50304c1))
* update dependencies.txt for grpc-gcp to 1.9.2 ([#4164](#4164)) ([f336fdc](f336fdc))
* update dependency com.google.apis:google-api-services-storage to v1-rev20260204-2.0.0 ([#1750](#1750)) ([340ecbe](340ecbe))
* update dependency com.google.apis:google-api-services-storage to v1-rev20260204-2.0.0 ([#3519](#3519)) ([1531107](1531107))
* update dependency com.google.cloud:google-cloud-storage to v2.64.1 ([#1752](#1752)) ([8fb6935](8fb6935))
* update dependency com.google.cloud:sdk-platform-java-config to v3.58.0 ([#1751](#1751)) ([9cc3e22](9cc3e22))
* update dependency com.google.cloud:sdk-platform-java-config to v3.58.0 ([#3523](#3523)) ([26d772a](26d772a))
* update dependency node to v24 ([#3509](#3509)) ([f308477](f308477))
* update gcr.io/cloud-devrel-public-resources/storage-testbench docker tag to v0.62.0 ([#3526](#3526)) ([ca29d5e](ca29d5e))
* update googleapis/sdk-platform-java action to v2.68.0 ([#3522](#3522)) ([abae1ac](abae1ac))

### Reverts

* ci: only run default list of graalvm tests if too many modules are touched ([#12292](#12292)) ([92bcdf4](92bcdf4))

### Documentation

* [dataplex] Change Dataplex library from `ALPHA` to `GA` ([6344cb0](6344cb0))
* [run] An existing repeated string field custom_audiences is marked ([015d9a1](015d9a1))
* **hermetic-build:** improve usability of development guide ([#12362](#12362)) ([5944127](5944127))

</details>

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).

Co-authored-by: chingor13 <chingor@google.com>
Summary
- isHealthy() now requires READY state instead of "not SHUTDOWN and not TRANSIENT_FAILURE". IDLE/CONNECTING endpoints silently fall back to the default host without emitting skipped_tablets.
- skipped_tablets: only TRANSIENT_FAILURE endpoints are reported in skipped_tablets so the server can refresh the client cache. Other non-ready states (IDLE, CONNECTING, absent) are skipped silently.
- Adds getIfPresent() to ChannelEndpointCache so the foreground request path never triggers blocking endpoint creation.
- affinityEndpoint() uses getIfPresent() plus a READY check, falling back to the default host instead of forcing traffic to a stale replica.
- GrpcChannelEndpoint now uses createDecoratedChannelBuilder().build() instead of getTransportChannel(), bypassing the GAX ChannelPool wrapper, which does not support getState(). The grpc-gcp dynamic channel pool is preserved because it is applied via the channelConfigurator.
- Background probing calls getState(true) to warm up IDLE channels without sending application RPCs. READY channels are skipped (no-op probe). The lifecycle manager tracks a per-endpoint lastRealTrafficAt for idle eviction after 30 minutes, and requestEndpointRecreation() is called from the routing path when an evicted endpoint is needed again.
- Endpoints in TRANSIENT_FAILURE for 3 consecutive probe cycles are evicted and their channel pools shut down. They are recreated on demand when the routing cache references them again.
- ChannelFinder collects all active server addresses across cached tablets and reports them to the lifecycle manager. Endpoints no longer referenced by any tablet (e.g. after a tablet migrates from server1:15000 to server2:15000) are evicted immediately instead of lingering until idle eviction.
- shouldSkip() detects shutdown channels from evicted endpoints, clears the cached reference, and looks the endpoint up in the cache again so routing picks up recreated endpoints.
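The routing rules in the summary can be condensed into a single decision function. This is a sketch, not the PR's code. ChannelState is a local enum that mirrors the constant names of io.grpc.ConnectivityState, so the example needs no gRPC dependency. RoutingDecision is hypothetical, and a null state models an endpoint that is absent from the cache.

```java
/** Mirrors the constant names of io.grpc.ConnectivityState (local copy, no gRPC needed). */
enum ChannelState { IDLE, CONNECTING, READY, TRANSIENT_FAILURE, SHUTDOWN }

/** Hypothetical routing decision following the READY-only rules in the summary. */
final class RoutingDecision {
  final String target; // address the RPC is sent to
  final boolean reportSkipped; // whether the tablet goes into skipped_tablets

  private RoutingDecision(String target, boolean reportSkipped) {
    this.target = target;
    this.reportSkipped = reportSkipped;
  }

  /**
   * @param replicaState state of the replica's cached channel, or null if no endpoint is cached
   */
  static RoutingDecision choose(String replica, ChannelState replicaState, String defaultHost) {
    if (replicaState == ChannelState.READY) {
      return new RoutingDecision(replica, false);
    }
    // Not READY: never force traffic to the replica; fall back to the default host.
    // Only TRANSIENT_FAILURE is reported, so the server can refresh the client's cache;
    // IDLE, CONNECTING, SHUTDOWN, and absent endpoints are skipped silently.
    boolean report = replicaState == ChannelState.TRANSIENT_FAILURE;
    return new RoutingDecision(defaultHost, report);
  }
}
```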