Skip to content

Commit d37fb80

Browse files
feat: add cloud.region, request_tag and transaction_tag in span attributes (#1449)
This change enhances observability by introducing these new features: 1. **Cloud Region Attribute**: The `cloud.region` attribute is now added to all OpenTelemetry spans generated by the Spanner client. This provides better geographical context for traces, aiding in performance analysis and debugging across different regions. 2. **Transaction Tag**: The `transaction_tag` set on a `Transaction` object is now correctly propagated and included in the `Commit` request. This allows for better end-to-end traceability of transactions. 3. **Request Tag**: This introduces support for `request_tag` on individual Spanner operations like `read`, `execute_sql`, and `execute_update`. When a `request_tag` is provided in the `request_options`, it is now added as a `spanner.request_tag` attribute to the corresponding OpenTelemetry span. This allows for more granular tracing and debugging of specific requests within a transaction or a snapshot. --------- Co-authored-by: surbhigarg92 <surbhigarg.92@gmail.com>
1 parent 8b6f154 commit d37fb80

16 files changed

+1115
-338
lines changed

google/cloud/spanner_v1/_helpers.py

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import time
2121
import base64
2222
import threading
23+
import logging
2324

2425
from google.protobuf.struct_pb2 import ListValue
2526
from google.protobuf.struct_pb2 import Value
@@ -29,16 +30,27 @@
2930
from google.api_core import datetime_helpers
3031
from google.api_core.exceptions import Aborted
3132
from google.cloud._helpers import _date_from_iso8601_date
32-
from google.cloud.spanner_v1 import TypeCode
33-
from google.cloud.spanner_v1 import ExecuteSqlRequest
34-
from google.cloud.spanner_v1 import JsonObject, Interval
35-
from google.cloud.spanner_v1 import TransactionOptions
33+
from google.cloud.spanner_v1.types import ExecuteSqlRequest
34+
from google.cloud.spanner_v1.types import TransactionOptions
35+
from google.cloud.spanner_v1.data_types import JsonObject, Interval
3636
from google.cloud.spanner_v1.request_id_header import with_request_id
37+
from google.cloud.spanner_v1.types import TypeCode
38+
3739
from google.rpc.error_details_pb2 import RetryInfo
3840

3941
try:
4042
from opentelemetry.propagate import inject
4143
from opentelemetry.propagators.textmap import Setter
44+
from opentelemetry.semconv.resource import ResourceAttributes
45+
from opentelemetry.resourcedetector import gcp_resource_detector
46+
from opentelemetry.resourcedetector.gcp_resource_detector import (
47+
GoogleCloudResourceDetector,
48+
)
49+
50+
# Overwrite the requests timeout for the detector.
51+
# This is necessary as the client will wait the full timeout if the
52+
# code is not run in a GCP environment, with the location endpoints available.
53+
gcp_resource_detector._TIMEOUT_SEC = 0.2
4254

4355
HAS_OPENTELEMETRY_INSTALLED = True
4456
except ImportError:
@@ -55,6 +67,12 @@
5567
+ "numeric has a whole component with precision {}"
5668
)
5769

70+
GOOGLE_CLOUD_REGION_GLOBAL = "global"
71+
72+
log = logging.getLogger(__name__)
73+
74+
_cloud_region: str = None
75+
5876

5977
if HAS_OPENTELEMETRY_INSTALLED:
6078

@@ -79,6 +97,33 @@ def set(self, carrier: List[Tuple[str, str]], key: str, value: str) -> None:
7997
carrier.append((key, value))
8098

8199

100+
def _get_cloud_region() -> str:
101+
"""Get the location of the resource, caching the result.
102+
103+
Returns:
104+
str: The location of the resource. If OpenTelemetry is not installed, returns a global region.
105+
"""
106+
global _cloud_region
107+
if _cloud_region is not None:
108+
return _cloud_region
109+
110+
try:
111+
detector = GoogleCloudResourceDetector()
112+
resources = detector.detect()
113+
if ResourceAttributes.CLOUD_REGION in resources.attributes:
114+
_cloud_region = resources.attributes[ResourceAttributes.CLOUD_REGION]
115+
else:
116+
_cloud_region = GOOGLE_CLOUD_REGION_GLOBAL
117+
except Exception as e:
118+
log.warning(
119+
"Failed to detect GCP resource location for Spanner metrics, defaulting to 'global'. Error: %s",
120+
e,
121+
)
122+
_cloud_region = GOOGLE_CLOUD_REGION_GLOBAL
123+
124+
return _cloud_region
125+
126+
82127
def _try_to_coerce_bytes(bytestring):
83128
"""Try to coerce a byte string into the right thing based on Python
84129
version and whether or not it is base64 encoded.

google/cloud/spanner_v1/_opentelemetry_tracing.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from google.cloud.spanner_v1 import SpannerClient
2222
from google.cloud.spanner_v1 import gapic_version
2323
from google.cloud.spanner_v1._helpers import (
24+
_get_cloud_region,
2425
_metadata_with_span_context,
2526
)
2627

@@ -75,6 +76,7 @@ def trace_call(
7576
enable_end_to_end_tracing = False
7677

7778
db_name = ""
79+
cloud_region = None
7880
if session and getattr(session, "_database", None):
7981
db_name = session._database.name
8082

@@ -88,6 +90,7 @@ def trace_call(
8890
)
8991
db_name = observability_options.get("db_name", db_name)
9092

93+
cloud_region = _get_cloud_region()
9194
tracer = get_tracer(tracer_provider)
9295

9396
# Set base attributes that we know for every trace created
@@ -97,6 +100,7 @@ def trace_call(
97100
"db.instance": db_name,
98101
"net.host.name": SpannerClient.DEFAULT_ENDPOINT,
99102
OTEL_SCOPE_NAME: TRACER_NAME,
103+
"cloud.region": cloud_region,
100104
OTEL_SCOPE_VERSION: TRACER_VERSION,
101105
# Standard GCP attributes for OTel, attributes are used for internal purpose and are subjected to change
102106
"gcp.client.service": "spanner",
@@ -107,6 +111,11 @@ def trace_call(
107111
if extra_attributes:
108112
attributes.update(extra_attributes)
109113

114+
if "request_options" in attributes:
115+
request_options = attributes.pop("request_options")
116+
if request_options and request_options.request_tag:
117+
attributes["request.tag"] = request_options.request_tag
118+
110119
if extended_tracing_globally_disabled:
111120
enable_extended_tracing = False
112121

google/cloud/spanner_v1/database.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,8 +1025,14 @@ def run_in_transaction(self, func, *args, **kw):
10251025
reraises any non-ABORT exceptions raised by ``func``.
10261026
"""
10271027
observability_options = getattr(self, "observability_options", None)
1028+
transaction_tag = kw.get("transaction_tag")
1029+
extra_attributes = {}
1030+
if transaction_tag:
1031+
extra_attributes["transaction.tag"] = transaction_tag
1032+
10281033
with trace_call(
10291034
"CloudSpanner.Database.run_in_transaction",
1035+
extra_attributes=extra_attributes,
10301036
observability_options=observability_options,
10311037
), MetricsCapture():
10321038
# Sanity check: Is there a transaction already running?

google/cloud/spanner_v1/metrics/spanner_metrics_tracer_factory.py

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,9 @@
1818
from .metrics_tracer_factory import MetricsTracerFactory
1919
import os
2020
import logging
21-
from .constants import (
22-
SPANNER_SERVICE_NAME,
23-
GOOGLE_CLOUD_REGION_KEY,
24-
GOOGLE_CLOUD_REGION_GLOBAL,
25-
)
21+
from .constants import SPANNER_SERVICE_NAME
2622

2723
try:
28-
from opentelemetry.resourcedetector import gcp_resource_detector
29-
30-
# Overwrite the requests timeout for the detector.
31-
# This is necessary as the client will wait the full timeout if the
32-
# code is not run in a GCP environment, with the location endpoints available.
33-
gcp_resource_detector._TIMEOUT_SEC = 0.2
34-
3524
import mmh3
3625

3726
logging.getLogger("opentelemetry.resourcedetector.gcp_resource_detector").setLevel(
@@ -44,6 +33,7 @@
4433

4534
from .metrics_tracer import MetricsTracer
4635
from google.cloud.spanner_v1 import __version__
36+
from google.cloud.spanner_v1._helpers import _get_cloud_region
4737
from uuid import uuid4
4838

4939
log = logging.getLogger(__name__)
@@ -86,7 +76,7 @@ def __new__(
8676
cls._metrics_tracer_factory.set_client_hash(
8777
cls._generate_client_hash(client_uid)
8878
)
89-
cls._metrics_tracer_factory.set_location(cls._get_location())
79+
cls._metrics_tracer_factory.set_location(_get_cloud_region())
9080
cls._metrics_tracer_factory.gfe_enabled = gfe_enabled
9181

9282
if cls._metrics_tracer_factory.enabled != enabled:
@@ -153,28 +143,3 @@ def _generate_client_hash(client_uid: str) -> str:
153143

154144
# Return as 6 digit zero padded hex string
155145
return f"{sig_figs:06x}"
156-
157-
@staticmethod
158-
def _get_location() -> str:
159-
"""Get the location of the resource.
160-
161-
In case of any error during detection, this method will log a warning
162-
and default to the "global" location.
163-
164-
Returns:
165-
str: The location of the resource. If OpenTelemetry is not installed, returns a global region.
166-
"""
167-
if not HAS_OPENTELEMETRY_INSTALLED:
168-
return GOOGLE_CLOUD_REGION_GLOBAL
169-
try:
170-
detector = gcp_resource_detector.GoogleCloudResourceDetector()
171-
resources = detector.detect()
172-
173-
if GOOGLE_CLOUD_REGION_KEY in resources.attributes:
174-
return resources.attributes[GOOGLE_CLOUD_REGION_KEY]
175-
except Exception as e:
176-
log.warning(
177-
"Failed to detect GCP resource location for Spanner metrics, defaulting to 'global'. Error: %s",
178-
e,
179-
)
180-
return GOOGLE_CLOUD_REGION_GLOBAL

google/cloud/spanner_v1/session.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,9 +532,14 @@ def run_in_transaction(self, func, *args, **kw):
532532
database = self._database
533533
log_commit_stats = database.log_commit_stats
534534

535+
extra_attributes = {}
536+
if transaction_tag:
537+
extra_attributes["transaction.tag"] = transaction_tag
538+
535539
with trace_call(
536540
"CloudSpanner.Session.run_in_transaction",
537541
self,
542+
extra_attributes=extra_attributes,
538543
observability_options=getattr(database, "observability_options", None),
539544
) as span, MetricsCapture():
540545
attempts: int = 0

google/cloud/spanner_v1/snapshot.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,11 @@ def read(
409409
method=streaming_read_method,
410410
request=read_request,
411411
metadata=metadata,
412-
trace_attributes={"table_id": table, "columns": columns},
412+
trace_attributes={
413+
"table_id": table,
414+
"columns": columns,
415+
"request_options": request_options,
416+
},
413417
column_info=column_info,
414418
lazy_decode=lazy_decode,
415419
)
@@ -601,7 +605,7 @@ def execute_sql(
601605
method=execute_streaming_sql_method,
602606
request=execute_sql_request,
603607
metadata=metadata,
604-
trace_attributes={"db.statement": sql},
608+
trace_attributes={"db.statement": sql, "request_options": request_options},
605609
column_info=column_info,
606610
lazy_decode=lazy_decode,
607611
)

google/cloud/spanner_v1/transaction.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,10 @@ def execute_update(
479479
request_options = RequestOptions(request_options)
480480
request_options.transaction_tag = self.transaction_tag
481481

482-
trace_attributes = {"db.statement": dml}
482+
trace_attributes = {
483+
"db.statement": dml,
484+
"request_options": request_options,
485+
}
483486

484487
# If this request begins the transaction, we need to lock
485488
# the transaction until the transaction ID is updated.
@@ -629,7 +632,8 @@ def batch_update(
629632

630633
trace_attributes = {
631634
# Get just the queries from the DML statement batch
632-
"db.statement": ";".join([statement.sql for statement in parsed])
635+
"db.statement": ";".join([statement.sql for statement in parsed]),
636+
"request_options": request_options,
633637
}
634638

635639
# If this request begins the transaction, we need to lock

tests/system/test_session_api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from google.cloud.spanner_admin_database_v1 import DatabaseDialect
3131
from google.cloud._helpers import UTC
3232

33+
from google.cloud.spanner_v1._helpers import _get_cloud_region
3334
from google.cloud.spanner_v1._helpers import AtomicCounter
3435
from google.cloud.spanner_v1.data_types import JsonObject
3536
from google.cloud.spanner_v1.database_sessions_manager import TransactionType
@@ -356,6 +357,7 @@ def _make_attributes(db_instance, **kwargs):
356357
"db.url": "spanner.googleapis.com",
357358
"net.host.name": "spanner.googleapis.com",
358359
"db.instance": db_instance,
360+
"cloud.region": _get_cloud_region(),
359361
"gcp.client.service": "spanner",
360362
"gcp.client.version": ot_helpers.LIB_VERSION,
361363
"gcp.client.repo": "googleapis/python-spanner",

tests/unit/test__helpers.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616
import unittest
1717
import mock
1818

19-
from google.cloud.spanner_v1 import TransactionOptions
19+
from opentelemetry.sdk.resources import Resource
20+
from opentelemetry.semconv.resource import ResourceAttributes
21+
22+
23+
from google.cloud.spanner_v1 import TransactionOptions, _helpers
2024

2125

2226
class Test_merge_query_options(unittest.TestCase):
@@ -89,6 +93,48 @@ def test_base_object_merge_dict(self):
8993
self.assertEqual(result, expected)
9094

9195

96+
class Test_get_cloud_region(unittest.TestCase):
97+
def setUp(self):
98+
_helpers._cloud_region = None
99+
100+
def _callFUT(self, *args, **kw):
101+
from google.cloud.spanner_v1._helpers import _get_cloud_region
102+
103+
return _get_cloud_region(*args, **kw)
104+
105+
@mock.patch("google.cloud.spanner_v1._helpers.GoogleCloudResourceDetector.detect")
106+
def test_get_location_with_region(self, mock_detect):
107+
"""Test that _get_cloud_region returns the region when detected."""
108+
mock_resource = Resource.create(
109+
{ResourceAttributes.CLOUD_REGION: "us-central1"}
110+
)
111+
mock_detect.return_value = mock_resource
112+
113+
location = self._callFUT()
114+
self.assertEqual(location, "us-central1")
115+
116+
@mock.patch("google.cloud.spanner_v1._helpers.GoogleCloudResourceDetector.detect")
117+
def test_get_location_without_region(self, mock_detect):
118+
"""Test that _get_cloud_region returns 'global' when no region is detected."""
119+
mock_resource = Resource.create({}) # No region attribute
120+
mock_detect.return_value = mock_resource
121+
122+
location = self._callFUT()
123+
self.assertEqual(location, "global")
124+
125+
@mock.patch("google.cloud.spanner_v1._helpers.GoogleCloudResourceDetector.detect")
126+
def test_get_location_with_exception(self, mock_detect):
127+
"""Test that _get_cloud_region returns 'global' and logs a warning on exception."""
128+
mock_detect.side_effect = Exception("detector failed")
129+
130+
with self.assertLogs(
131+
"google.cloud.spanner_v1._helpers", level="WARNING"
132+
) as log:
133+
location = self._callFUT()
134+
self.assertEqual(location, "global")
135+
self.assertIn("Failed to detect GCP resource location", log.output[0])
136+
137+
92138
class Test_make_value_pb(unittest.TestCase):
93139
def _callFUT(self, *args, **kw):
94140
from google.cloud.spanner_v1._helpers import _make_value_pb

0 commit comments

Comments
 (0)