Skip to content

Commit a0da993

Browse files
authored
feat(storage): Enhance Otel Span Attributes with BucketId and Location details for every Bucket/Blob operation
# feat(storage): Enhance Otel Span Attributes with BucketId and Location details for every Bucket/Blob operation as part of ACO (App-centric Observability) This PR implements **App-centric Observability (ACO)** tracing compatibility for the GCS Python SDK (`google-cloud-storage`). All OpenTelemetry trace spans produced by bucket and blob operations now seamlessly incorporate mandatory destination resource annotations (`gcp.resource.destination.id` and `gcp.resource.destination.location`). --- ## Core Architecture & Design ### 1. Centralized, DRY Telemetry Helper (`_helpers.py`) - All OpenTelemetry span context generation, attribute injection, and exception trapping are centralized in a module-level context manager `create_trace_span_helper` in [`_helpers.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/google/cloud/storage/_helpers.py). - **Zero modifications to the core tracing module**: [`_opentelemetry_tracing.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/google/cloud/storage/_opentelemetry_tracing.py) remains completely pristine and identical to `main`. - Seamlessly wrapped all critical read/write operations across `blob.py`, `bucket.py`, and `client.py` (e.g., `download_as_bytes`, `upload_from_string`, `get_bucket`, `lookup_bucket`, etc.). ### 2. Bounded LRU Metadata Cache (`_lru_cache.py`, `_bucket_metadata_cache.py`) - **LRU Capacity Bounding**: Implemented `LRUCache` utilizing an `OrderedDict` to support O(1) operations and strict capacity bounding to eliminate memory leaks in long-running applications. - **Concurrent Singleflight Warming**: Implemented `BucketMetadataCache` to store bucket locations and project numbers. On cache misses, it spawns background threads (`_fetch_background`) using singleflight tracking (`_inflight_fetches`) to prevent server stampedes / thundering herds. - **Fallback Annotations on 403**: On GCS `403 Forbidden` permissions errors, the cache permanently registers fallback annotations (`projects/_/buckets/{name}`) to completely avoid retry storms on subsequent API calls. ### 3. Resilient 404 Existence Eviction (`_http.py`, `_helpers.py`, `bucket.py`) - **Smart Out-of-band 404 Verification**: When a `404 NotFound` error occurs during media transfers or REST calls, a background thread is spawned (with concurrency protection via `_inflight_checks`) to check if the bucket was deleted out-of-band (`bucket.exists()`). If `exists()` returns `False`, the bucket is cleanly evicted from the cache. - **Instant Synchronous Eviction**: Direct `Bucket.delete()` calls synchronously and instantly evict the bucket name from the cache, ensuring real-time consistency. --- ## Extensive Testing Suite ### 1. 100% Sleep-Free System Tests (`test_aco_observability.py`) Added a comprehensive system test suite [`test_aco_observability.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/tests/system/test_aco_observability.py) executing against a live GCS backend: - **Sequential Priming**: Verifies cache miss return times, background priming, and subsequent span enrichment. - **403 Fallback**: Verifies minimal fallback registration on Forbidden responses. - **Cache Stampede Protection**: Simulates 15 concurrent threads on a cache miss and asserts only 1 GCS call is fired. - **Smart 404 Eviction**: Deletes a bucket out-of-band and verifies async cache clean-up on 404. - **Synchronous Delete Eviction**: Asserts immediate cache eviction on SDK deletion. - **LRU Capacity Bounding**: Populates the cache beyond its limits and verifies proper LRU eviction. - **Deterministic Synchronization**: Uses **`threading.Event` (zero static sleeps)** for thread coordination, guaranteeing thundering-fast execution and completely eliminating timing flakiness. ### 2. Robust Unit Tests - Added [`test__lru_cache.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/tests/unit/test__lru_cache.py) (LRU correctness, bounding, eviction). - Added [`test__bucket_metadata_cache.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py) (concurrency, location resolution, 403 fallback, singleflight). - Added `test_delete_hit_evicts_from_cache` inside [`test_bucket.py`](file:///usr/local/google/home/chandrasiri/storage_related/org-google-cloud-python/packages/google-cloud-storage/tests/unit/test_bucket.py). --- ## Validation Results All checks, unit tests, and live GCS system tests pass flawlessly: - **Unit Tests**: 835 passed in 17.82s - **System Tests**: 8 passed in 26.94s - **Format & Linter**: 100% clean (`black` / `flake8`)
1 parent 4d64ebc commit a0da993

14 files changed

Lines changed: 1584 additions & 86 deletions

File tree

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""In-memory LRU cache for bucket metadata supporting App-centric Observability (ACO)."""
16+
17+
import logging
18+
import threading
19+
20+
from google.api_core import exceptions as api_exceptions
21+
from google.cloud.exceptions import NotFound
22+
from google.cloud.storage._lru_cache import LRUCache
23+
24+
logger = logging.getLogger(__name__)
25+
26+
27+
class BucketMetadataCache:
28+
"""Thread-safe LRU cache for storing GCS bucket metadata (project number and location).
29+
30+
Supports Singleflight asynchronous background fetching to prevent stampedes on cache misses.
31+
"""
32+
33+
def __init__(self, client, max_size=10000):
34+
self._client = client
35+
self._cache = LRUCache(max_size)
36+
self._lock = threading.Lock()
37+
self._inflight_fetches = set()
38+
self._inflight_checks = set()
39+
40+
def get(self, bucket_name):
41+
"""Thread-safely retrieve cached metadata without queueing fetch."""
42+
with self._lock:
43+
return self._cache.get(bucket_name)
44+
45+
def get_or_queue_fetch(self, bucket_name):
46+
"""Retrieve bucket metadata or queue a background fetch on cache miss.
47+
48+
Returns None immediately on cache miss so caller does not block.
49+
"""
50+
with self._lock:
51+
if bucket_name in self._cache:
52+
return self._cache.get(bucket_name)
53+
elif bucket_name in self._inflight_fetches:
54+
# This handles a thundering herd where 'n' threads
55+
# simultaneously experience a cache miss while 1 is already
56+
# fetching metadata. The remaining n - 1 threads should
57+
# bypass starting duplicate fetches.
58+
return None
59+
else:
60+
# fire a background thread and get bucket metadata.
61+
self._inflight_fetches.add(bucket_name)
62+
threading.Thread(
63+
target=self._fetch_background, args=(bucket_name,), daemon=True
64+
).start()
65+
return None
66+
67+
def check_and_evict(self, bucket_name):
68+
"""Asynchronously verify if a bucket exists on 404 and evict if deleted."""
69+
with self._lock:
70+
if bucket_name not in self._cache:
71+
return
72+
if bucket_name in self._inflight_checks:
73+
return
74+
self._inflight_checks.add(bucket_name)
75+
threading.Thread(
76+
target=self._verify_existence_background,
77+
args=(bucket_name,),
78+
daemon=True,
79+
).start()
80+
81+
def _verify_existence_background(self, bucket_name):
82+
try:
83+
bucket = self._client.bucket(bucket_name)
84+
if not bucket.exists():
85+
self.evict(bucket_name)
86+
except Exception as e:
87+
logger.debug(
88+
f"Background verification for bucket existence failed for {bucket_name}: {e}"
89+
)
90+
finally:
91+
with self._lock:
92+
self._inflight_checks.discard(bucket_name)
93+
94+
def _fetch_background(self, bucket_name):
95+
"""Asynchronously fetch bucket metadata and update the cache."""
96+
try:
97+
bucket = self._client.get_bucket(bucket_name, timeout=10.0)
98+
self.update_from_bucket(bucket)
99+
except (NotFound, api_exceptions.NotFound):
100+
self.evict(bucket_name)
101+
except api_exceptions.Forbidden:
102+
# On 403 (Forbidden), cache fallback values permanently to avoid retry storms
103+
self.update_cache(
104+
bucket_name, f"projects/_/buckets/{bucket_name}", "global"
105+
)
106+
except Exception as e:
107+
logger.debug(
108+
f"Background fetch for bucket metadata failed for {bucket_name}: {e}"
109+
)
110+
finally:
111+
with self._lock:
112+
self._inflight_fetches.discard(bucket_name)
113+
114+
def update_from_bucket(self, bucket):
115+
"""Update cache from a Bucket instance."""
116+
if not bucket or not bucket.name:
117+
return
118+
119+
project_number = getattr(bucket, "project_number", None)
120+
location = getattr(bucket, "location", None) or "global"
121+
location = location.lower()
122+
location_type = getattr(bucket, "location_type", None) or "region"
123+
location_type = location_type.lower()
124+
125+
if location_type in ("multi-region", "dual-region"):
126+
location = "global"
127+
128+
if project_number:
129+
destination_id = f"projects/{project_number}/buckets/{bucket.name}"
130+
else:
131+
destination_id = f"projects/_/buckets/{bucket.name}"
132+
133+
self.update_cache(bucket.name, destination_id, location)
134+
135+
def update_cache(self, bucket_name, destination_id, location):
136+
"""Thread-safely update or insert a cache entry with bounded size."""
137+
with self._lock:
138+
self._cache.put(bucket_name, (destination_id, location))
139+
140+
def evict(self, bucket_name):
141+
"""Remove a bucket from the cache (e.g., on 404)."""
142+
with self._lock:
143+
self._cache.delete(bucket_name)
144+
145+
def clear(self):
146+
"""Clear all cached metadata."""
147+
with self._lock:
148+
self._cache.clear()
149+
self._inflight_fetches.clear()
150+
self._inflight_checks.clear()

packages/google-cloud-storage/google/cloud/storage/_helpers.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,30 @@
1919

2020
import base64
2121
import datetime
22+
import logging
2223
import os
2324
import secrets
2425
import sys
26+
from contextlib import contextmanager
2527
from hashlib import md5
2628
from urllib.parse import urlsplit, urlunsplit
2729
from uuid import uuid4
2830

31+
from google.api_core import exceptions as api_exceptions
32+
from google.cloud.exceptions import NotFound
33+
2934
from google.auth import environment_vars
3035

3136
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
3237
from google.cloud.storage.retry import (
3338
DEFAULT_RETRY,
3439
DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
3540
)
41+
from google.cloud.storage._opentelemetry_tracing import (
42+
create_trace_span as _base_create_trace_span,
43+
)
44+
45+
_logger = logging.getLogger(__name__)
3646

3747
STORAGE_EMULATOR_ENV_VAR = "STORAGE_EMULATOR_HOST" # Despite name, includes scheme.
3848
"""Environment variable defining host for Storage emulator."""
@@ -137,6 +147,62 @@ def _validate_name(name):
137147
return name
138148

139149

150+
@contextmanager
151+
def create_trace_span_helper(client, bucket_name, name, attributes=None, **kwargs):
152+
span_attrs = dict(attributes) if attributes else {}
153+
154+
if (
155+
bucket_name
156+
and isinstance(bucket_name, str)
157+
and client
158+
and hasattr(client, "_bucket_metadata_cache")
159+
and client._bucket_metadata_cache
160+
):
161+
try:
162+
if name in (
163+
"Storage.Client.getBucket",
164+
"Storage.Client.lookupBucket",
165+
"Storage.Bucket.reload",
166+
"Storage.Bucket.exists",
167+
):
168+
cached = client._bucket_metadata_cache.get(bucket_name)
169+
else:
170+
cached = client._bucket_metadata_cache.get_or_queue_fetch(bucket_name)
171+
172+
if cached and isinstance(cached, tuple) and len(cached) == 2:
173+
dest_id, loc = cached
174+
span_attrs.update(
175+
{
176+
"gcp.resource.destination.id": dest_id,
177+
"gcp.resource.destination.location": loc,
178+
}
179+
)
180+
except Exception as e:
181+
_logger.debug(f"Failed cache lookup in create_trace_span_helper: {e}")
182+
183+
if "client" not in kwargs and client:
184+
kwargs["client"] = client
185+
186+
with _base_create_trace_span(name, attributes=span_attrs, **kwargs) as span:
187+
try:
188+
yield span
189+
except (NotFound, api_exceptions.NotFound):
190+
if (
191+
bucket_name
192+
and isinstance(bucket_name, str)
193+
and client
194+
and hasattr(client, "_bucket_metadata_cache")
195+
and client._bucket_metadata_cache
196+
):
197+
try:
198+
client._bucket_metadata_cache.check_and_evict(bucket_name)
199+
except Exception as e:
200+
_logger.debug(
201+
f"Failed cache eviction on 404 in create_trace_span_helper: {e}"
202+
)
203+
raise
204+
205+
140206
class _PropertyMixin(object):
141207
"""Abstract mixin for cloud storage classes with associated properties.
142208
@@ -185,6 +251,42 @@ def _require_client(self, client):
185251
client = self.client
186252
return client
187253

254+
@contextmanager
255+
def _create_trace_span(self, name, attributes=None, **kwargs):
256+
from google.cloud.storage.blob import Blob
257+
from google.cloud.storage.bucket import Bucket
258+
259+
if isinstance(self, Bucket):
260+
client = self.client
261+
bucket_name = self.name
262+
elif isinstance(self, Blob):
263+
bucket = getattr(self, "bucket", None)
264+
client = (
265+
getattr(bucket, "client", None)
266+
if bucket and hasattr(bucket, "client")
267+
else None
268+
)
269+
bucket_name = getattr(bucket, "name", None) if bucket else None
270+
else:
271+
client = None
272+
bucket_name = None
273+
274+
if callable(bucket_name):
275+
try:
276+
bucket_name = bucket_name()
277+
except Exception as e:
278+
_logger.debug(
279+
f"Failed callable bucket_name resolution in _create_trace_span: {e}"
280+
)
281+
282+
client_override = kwargs.pop("client", None)
283+
active_client = client_override or client
284+
285+
with create_trace_span_helper(
286+
active_client, bucket_name, name, attributes=attributes, **kwargs
287+
) as span:
288+
yield span
289+
188290
def _encryption_headers(self):
189291
"""Return any encryption headers needed to fetch the object.
190292

packages/google-cloud-storage/google/cloud/storage/_http.py

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,20 @@
1515
"""Create / interact with Google Cloud Storage connections."""
1616

1717
import functools
18+
import logging
19+
import re
1820

21+
from google.api_core import exceptions as api_exceptions
1922
from google.cloud import _http
23+
from google.cloud.exceptions import NotFound
2024
from google.cloud.storage import __version__, _helpers
21-
from google.cloud.storage._opentelemetry_tracing import create_trace_span
25+
from google.cloud.storage._opentelemetry_tracing import (
26+
create_trace_span,
27+
enable_otel_traces,
28+
HAS_OPENTELEMETRY,
29+
)
30+
31+
logger = logging.getLogger(__name__)
2232

2333

2434
class Connection(_http.JSONConnection):
@@ -71,11 +81,30 @@ def api_request(self, *args, **kwargs):
7181
span_attributes = {
7282
"gccl-invocation-id": invocation_id,
7383
}
84+
client = self._client
85+
if (
86+
HAS_OPENTELEMETRY
87+
and enable_otel_traces
88+
and hasattr(client, "_bucket_metadata_cache")
89+
and client._bucket_metadata_cache
90+
):
91+
path = kwargs.get("path") or ""
92+
match = re.search(r"/b/([^/?#]+)", path)
93+
if match:
94+
try:
95+
cached = client._bucket_metadata_cache.get(match.group(1))
96+
if cached and isinstance(cached, tuple) and len(cached) == 2:
97+
dest_id, loc = cached
98+
span_attributes["gcp.resource.destination.id"] = dest_id
99+
span_attributes["gcp.resource.destination.location"] = loc
100+
except Exception as e:
101+
logger.debug(f"Failed cache.get_or_queue_fetch in api_request: {e}")
102+
74103
call = functools.partial(super(Connection, self).api_request, *args, **kwargs)
75104
with create_trace_span(
76105
name="Storage.Connection.api_request",
77106
attributes=span_attributes,
78-
client=self._client,
107+
client=client,
79108
api_request=kwargs,
80109
retry=retry,
81110
):
@@ -87,4 +116,24 @@ def api_request(self, *args, **kwargs):
87116
pass
88117
if retry:
89118
call = retry(call)
90-
return call()
119+
try:
120+
return call()
121+
except (NotFound, api_exceptions.NotFound):
122+
if (
123+
HAS_OPENTELEMETRY
124+
and enable_otel_traces
125+
and hasattr(client, "_bucket_metadata_cache")
126+
and client._bucket_metadata_cache
127+
):
128+
path = kwargs.get("path") or ""
129+
match = re.search(r"/b/([^/?#]+)", path)
130+
if match:
131+
try:
132+
client._bucket_metadata_cache.check_and_evict(
133+
match.group(1)
134+
)
135+
except Exception as e:
136+
logger.debug(
137+
f"Failed cache.check_and_evict on 404 in api_request: {e}"
138+
)
139+
raise

0 commit comments

Comments
 (0)