feat: add OpenTelemetry tracing to spanner calls #107

Merged · 7 commits · Jul 29, 2020
1 change: 1 addition & 0 deletions docs/index.rst
@@ -23,6 +23,7 @@ API Documentation

     api-reference
     advanced-session-pool-topics
+    opentelemetry-tracing

 Changelog
 ---------
36 changes: 36 additions & 0 deletions docs/opentelemetry-tracing.rst
@@ -0,0 +1,36 @@
Tracing with OpenTelemetry
==================================
This library uses `OpenTelemetry <https://opentelemetry.io/>`_ to automatically generate traces that provide insight into calls to Cloud Spanner.
For information on the benefits and utility of tracing, see the `Cloud Trace docs <https://cloud.google.com/trace/docs/overview>`_.

To take advantage of these traces, we first need to install OpenTelemetry:

.. code-block:: sh

    pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation

We also need to tell OpenTelemetry which exporter to use. For example, to export python-spanner traces to `Cloud Trace <https://cloud.google.com/trace>`_, add the following lines to your application:

.. code:: python

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.trace.sampling import ProbabilitySampler
    from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
    # BatchExportSpanProcessor exports spans to Cloud Trace
    # in a separate thread so it does not block the main thread
    from opentelemetry.sdk.trace.export import BatchExportSpanProcessor

    # Create and export one trace every 1000 requests
    sampler = ProbabilitySampler(1/1000)
    # Use the default tracer provider
    trace.set_tracer_provider(TracerProvider(sampler=sampler))
    trace.get_tracer_provider().add_span_processor(
        # Initialize the Cloud Trace exporter
        BatchExportSpanProcessor(CloudTraceSpanExporter())
    )

Generated Spanner traces should now be available on `Cloud Trace <https://console.cloud.google.com/traces>`_.

Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request.
For a list of libraries that can be instrumented, see the "OpenTelemetry Integrations" section of the `OpenTelemetry Python docs <https://opentelemetry-python.readthedocs.io/en/stable/>`_.
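
With the exporter configured, calls made through this library produce spans automatically. Below is a minimal sketch; the project, instance, and database IDs are placeholders, and the span name shown matches the scheme used by this change:

.. code:: python

    from google.cloud import spanner

    # Placeholder resource IDs; substitute your own.
    client = spanner.Client(project="my-project")
    database = client.instance("my-instance").database("my-database")

    # With the tracer provider configured as above, this query is wrapped
    # in a "CloudSpanner.ReadWriteTransaction" span and exported.
    with database.snapshot() as snapshot:
        for row in snapshot.execute_sql("SELECT 1"):
            print(row)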
65 changes: 65 additions & 0 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
@@ -0,0 +1,65 @@
# Copyright 2020 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Manages OpenTelemetry trace creation and handling"""

from contextlib import contextmanager

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.spanner_v1.gapic import spanner_client

try:
    from opentelemetry import trace
    from opentelemetry.trace.status import Status, StatusCanonicalCode
    from opentelemetry.instrumentation.utils import http_status_to_canonical_code

    HAS_OPENTELEMETRY_INSTALLED = True
except ImportError:
    HAS_OPENTELEMETRY_INSTALLED = False


@contextmanager
def trace_call(name, session, extra_attributes=None):
    """Start a client span named ``name`` for a call made on ``session``."""
    if not HAS_OPENTELEMETRY_INSTALLED or not session:
        # Empty context manager. Users will have to check if the generated value is None or a span
        yield None
        return

    tracer = trace.get_tracer(__name__)

    # Set base attributes that we know for every trace created
    attributes = {
        "db.type": "spanner",
        "db.url": spanner_client.SpannerClient.SERVICE_ADDRESS,
        "db.instance": session._database.name,
        "net.host.name": spanner_client.SpannerClient.SERVICE_ADDRESS,
    }

    if extra_attributes:
        attributes.update(extra_attributes)

    with tracer.start_as_current_span(
        name, kind=trace.SpanKind.CLIENT, attributes=attributes
    ) as span:
        try:
            yield span
        except GoogleAPICallError as error:
            if error.code is not None:
                span.set_status(Status(http_status_to_canonical_code(error.code)))
            elif error.grpc_status_code is not None:
                span.set_status(
                    # OpenTelemetry's StatusCanonicalCode maps 1-1 with gRPC status codes
                    Status(StatusCanonicalCode(error.grpc_status_code.value[0]))
                )
            raise
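
To make the pattern concrete before the call sites below, here is a minimal usage sketch of trace_call; the wrapper function and its arguments (api, session, metadata) are illustrative stand-ins, not part of this PR:

    from google.cloud.spanner_v1._opentelemetry_tracing import trace_call

    # Hypothetical wrapper mirroring the call sites in session.py below.
    def delete_session(api, session, metadata):
        # trace_call yields the active span, or None when OpenTelemetry is
        # not installed; a GoogleAPICallError raised inside the block sets
        # the span status and is re-raised unchanged.
        with trace_call("CloudSpanner.DeleteSession", session) as span:
            if span:
                span.set_attribute("session_name", session.name)
            api.delete_session(session.name, metadata=metadata)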
15 changes: 9 additions & 6 deletions google/cloud/spanner_v1/batch.py
@@ -22,6 +22,7 @@
 from google.cloud.spanner_v1._helpers import _SessionWrapper
 from google.cloud.spanner_v1._helpers import _make_list_value_pbs
 from google.cloud.spanner_v1._helpers import _metadata_with_prefix
+from google.cloud.spanner_v1._opentelemetry_tracing import trace_call

 # pylint: enable=ungrouped-imports

@@ -147,12 +148,14 @@ def commit(self):
         api = database.spanner_api
         metadata = _metadata_with_prefix(database.name)
         txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())
-        response = api.commit(
-            self._session.name,
-            mutations=self._mutations,
-            single_use_transaction=txn_options,
-            metadata=metadata,
-        )
+        trace_attributes = {"num_mutations": len(self._mutations)}
+        with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
+            response = api.commit(
+                self._session.name,
+                mutations=self._mutations,
+                single_use_transaction=txn_options,
+                metadata=metadata,
+            )
         self.committed = _pb_timestamp_to_datetime(response.commit_timestamp)
         return self.committed

25 changes: 18 additions & 7 deletions google/cloud/spanner_v1/session.py
@@ -26,6 +26,7 @@
 from google.cloud.spanner_v1.batch import Batch
 from google.cloud.spanner_v1.snapshot import Snapshot
 from google.cloud.spanner_v1.transaction import Transaction
+from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
 import random

 # pylint: enable=ungrouped-imports
@@ -114,7 +115,11 @@ def create(self):
         kw = {}
         if self._labels:
             kw = {"session": {"labels": self._labels}}
-        session_pb = api.create_session(self._database.name, metadata=metadata, **kw)
+
+        with trace_call("CloudSpanner.CreateSession", self, self._labels):

Review thread on the span name:

Reviewer: I wonder, do we have a uniform format for span names? I guess it is okay to use this format, because every language has a different format.

Author (Contributor): I was trying to follow Java's naming system when I wrote the doc, but I guess I missed the Operation part. I'm happy to leave it as is or change it to match one of the languages, but I'm not sure what the right naming scheme is personally.

Reviewer: Go is a bit special. Can you match it with Java? Thanks for the update.

+            session_pb = api.create_session(
+                self._database.name, metadata=metadata, **kw
+            )
         self._session_id = session_pb.name.split("/")[-1]

     def exists(self):
@@ -130,10 +135,16 @@ def exists(self):
             return False
         api = self._database.spanner_api
         metadata = _metadata_with_prefix(self._database.name)
-        try:
-            api.get_session(self.name, metadata=metadata)
-        except NotFound:
-            return False
+
+        with trace_call("CloudSpanner.GetSession", self) as span:
+            try:
+                api.get_session(self.name, metadata=metadata)
+                if span:
+                    span.set_attribute("session_found", True)
+            except NotFound:
+                if span:
+                    span.set_attribute("session_found", False)
+                return False

         return True

@@ -150,8 +161,8 @@ def delete(self):
             raise ValueError("Session ID not set by back-end")
         api = self._database.spanner_api
         metadata = _metadata_with_prefix(self._database.name)
-
-        api.delete_session(self.name, metadata=metadata)
+        with trace_call("CloudSpanner.DeleteSession", self):
+            api.delete_session(self.name, metadata=metadata)

     def ping(self):
         """Ping the session to keep it alive by executing "SELECT 1".
77 changes: 50 additions & 27 deletions google/cloud/spanner_v1/snapshot.py
@@ -30,17 +30,19 @@
 from google.cloud.spanner_v1._helpers import _SessionWrapper
 from google.cloud.spanner_v1.streamed import StreamedResultSet
 from google.cloud.spanner_v1.types import PartitionOptions
+from google.cloud.spanner_v1._opentelemetry_tracing import trace_call


-def _restart_on_unavailable(restart):
+def _restart_on_unavailable(restart, trace_name=None, session=None, attributes=None):
     """Restart iteration after :exc:`.ServiceUnavailable`.

     :type restart: callable
     :param restart: curried function returning iterator
+
+    :type trace_name: str
+    :param trace_name: (Optional) name of the span created for the restarted call.
+
+    :type session: :class:`~google.cloud.spanner_v1.session.Session`
+    :param session: (Optional) session used to create the trace span.
+
+    :type attributes: dict
+    :param attributes: (Optional) extra attributes to set on the span.
     """
     resume_token = b""
     item_buffer = []
-    iterator = restart()
+    with trace_call(trace_name, session, attributes):
+        iterator = restart()
     while True:
         try:
             for item in iterator:
@@ -50,7 +52,8 @@ def _restart_on_unavailable(restart):
                 break
         except ServiceUnavailable:
             del item_buffer[:]
-            iterator = restart(resume_token=resume_token)
+            with trace_call(trace_name, session, attributes):
+                iterator = restart(resume_token=resume_token)
             continue

         if len(item_buffer) == 0:
@@ -143,7 +146,10 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
             metadata=metadata,
         )

-        iterator = _restart_on_unavailable(restart)
+        trace_attributes = {"table_id": table, "columns": columns}
+        iterator = _restart_on_unavailable(
+            restart, "CloudSpanner.ReadOnlyTransaction", self._session, trace_attributes
+        )

         self._read_request_count += 1
@@ -243,7 +249,13 @@ def execute_sql(
             timeout=timeout,
         )

-        iterator = _restart_on_unavailable(restart)
+        trace_attributes = {"db.statement": sql}
+        iterator = _restart_on_unavailable(
+            restart,
+            "CloudSpanner.ReadWriteTransaction",
+            self._session,
+            trace_attributes,
+        )

         self._read_request_count += 1
         self._execute_sql_count += 1
@@ -309,16 +321,20 @@ def partition_read(
             partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
         )

-        response = api.partition_read(
-            session=self._session.name,
-            table=table,
-            columns=columns,
-            key_set=keyset._to_pb(),
-            transaction=transaction,
-            index=index,
-            partition_options=partition_options,
-            metadata=metadata,
-        )
+        trace_attributes = {"table_id": table, "columns": columns}
+        with trace_call(
+            "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes
+        ):
+            response = api.partition_read(
+                session=self._session.name,
+                table=table,
+                columns=columns,
+                key_set=keyset._to_pb(),
+                transaction=transaction,
+                index=index,
+                partition_options=partition_options,
+                metadata=metadata,
+            )

         return [partition.partition_token for partition in response.partitions]

@@ -385,15 +401,21 @@ def partition_query(
             partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
         )

-        response = api.partition_query(
-            session=self._session.name,
-            sql=sql,
-            transaction=transaction,
-            params=params_pb,
-            param_types=param_types,
-            partition_options=partition_options,
-            metadata=metadata,
-        )
+        trace_attributes = {"db.statement": sql}
+        with trace_call(
+            "CloudSpanner.PartitionReadWriteTransaction",
+            self._session,
+            trace_attributes,
+        ):
+            response = api.partition_query(
+                session=self._session.name,
+                sql=sql,
+                transaction=transaction,
+                params=params_pb,
+                param_types=param_types,
+                partition_options=partition_options,
+                metadata=metadata,
+            )

         return [partition.partition_token for partition in response.partitions]

@@ -515,8 +537,9 @@ def begin(self):
         api = database.spanner_api
         metadata = _metadata_with_prefix(database.name)
         txn_selector = self._make_txn_selector()
-        response = api.begin_transaction(
-            self._session.name, txn_selector.begin, metadata=metadata
-        )
+        with trace_call("CloudSpanner.BeginTransaction", self._session):
+            response = api.begin_transaction(
+                self._session.name, txn_selector.begin, metadata=metadata
+            )
         self._transaction_id = response.id
         return self._transaction_id
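
As a quick way to verify the spans this PR emits, here is a minimal sketch using OpenTelemetry's in-memory exporter (names from the 0.x opentelemetry-sdk this change targets; the `database` object is a stand-in for a configured Database):

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor
    from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

    # Keep finished spans in memory so they can be inspected directly.
    exporter = InMemorySpanExporter()
    trace.set_tracer_provider(TracerProvider())
    trace.get_tracer_provider().add_span_processor(SimpleExportSpanProcessor(exporter))

    # `database` is assumed to be a google.cloud.spanner_v1 Database fixture.
    with database.snapshot() as snapshot:
        list(snapshot.execute_sql("SELECT 1"))

    spans = exporter.get_finished_spans()
    assert spans[0].name == "CloudSpanner.ReadWriteTransaction"
    assert spans[0].attributes["db.type"] == "spanner"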