From 14c0aa498372969e94eb69f1d9541743260f09fa Mon Sep 17 00:00:00 2001 From: Paul Van Eck Date: Tue, 25 Apr 2023 12:02:15 -0700 Subject: [PATCH] Refactor tracing utils Signed-off-by: Paul Van Eck --- .../azure/servicebus/_common/constants.py | 38 --- .../azure/servicebus/_common/message.py | 8 +- .../azure/servicebus/_common/tracing.py | 314 ++++++++++++++++++ .../azure/servicebus/_common/utils.py | 245 +------------- .../azure/servicebus/_servicebus_receiver.py | 10 +- .../azure/servicebus/_servicebus_sender.py | 48 ++- .../_transport/_pyamqp_transport.py | 8 +- .../servicebus/_transport/_uamqp_transport.py | 8 +- .../aio/_servicebus_receiver_async.py | 12 +- .../aio/_servicebus_sender_async.py | 47 ++- .../aio/_transport/_pyamqp_transport_async.py | 8 +- .../aio/_transport/_uamqp_transport_async.py | 5 +- 12 files changed, 376 insertions(+), 375 deletions(-) create mode 100644 sdk/servicebus/azure-servicebus/azure/servicebus/_common/tracing.py diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py index 6e8059ac38d8f..204895a17475e 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py @@ -137,44 +137,6 @@ "ServiceBusDlqSupplementaryAuthorization" ) -# Distributed Tracing Constants - -TRACE_COMPONENT_PROPERTY = "component" -TRACE_COMPONENT = "servicebus" - -TRACE_NAMESPACE_ATTRIBUTE = "az.namespace" -TRACE_NAMESPACE = "Microsoft.ServiceBus" - -SPAN_NAME_RECEIVE = "ServiceBus.receive" -SPAN_NAME_RECEIVE_DEFERRED = "ServiceBus.receive_deferred" -SPAN_NAME_PEEK = "ServiceBus.peek" -SPAN_NAME_SEND = "ServiceBus.send" -SPAN_NAME_SCHEDULE = "ServiceBus.schedule" -SPAN_NAME_MESSAGE = "ServiceBus.message" - -SPAN_ENQUEUED_TIME_PROPERTY = "enqueuedTime" - -TRACE_ENQUEUED_TIME_PROPERTY = b"x-opt-enqueued-time" -TRACE_PARENT_PROPERTY = b"traceparent" -TRACE_STATE_PROPERTY = b"tracestate" -TRACE_PROPERTY_ENCODING = "ascii" - -TRACE_MESSAGING_SYSTEM_ATTRIBUTE = "messaging.system" -TRACE_MESSAGING_SYSTEM = "servicebus" - -TRACE_NET_PEER_NAME_ATTRIBUTE = "net.peer.name" -TRACE_MESSAGING_DESTINATION_ATTRIBUTE = "messaging.destination.name" -TRACE_MESSAGING_SOURCE_ATTRIBUTE = "messaging.source.name" -TRACE_MESSAGING_OPERATION_ATTRIBUTE = "messaging.operation" -TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE = "messaging.batch.message_count" - -DIAGNOSTIC_ID_PROPERTY = b"Diagnostic-Id" - -class TraceOperationTypes(str, Enum, metaclass=CaseInsensitiveEnumMeta): - PUBLISH = "publish" - RECEIVE = "receive" - SETTLE = "settle" - MAX_MESSAGE_LENGTH_BYTES = 1024 * 1024 # Backcompat with uAMQP MESSAGE_PROPERTY_MAX_LENGTH = 128 # .NET TimeSpan.MaxValue: 10675199.02:48:05.4775807 diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 347e3bd088b36..68fc61d169632 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -34,7 +34,7 @@ MAX_ABSOLUTE_EXPIRY_TIME, MAX_DURATION_VALUE, MAX_MESSAGE_LENGTH_BYTES, - MESSAGE_STATE_NAME, + MESSAGE_STATE_NAME ) from ..amqp import ( AmqpAnnotatedMessage, @@ -46,9 +46,9 @@ from .utils import ( utc_from_timestamp, utc_now, - trace_message, transform_outbound_messages, ) +from .tracing import trace_message if TYPE_CHECKING: try: @@ -60,7 +60,6 @@ except ImportError: pass from .._pyamqp.performatives import TransferFrame - from azure.core.tracing import AbstractSpan from ..aio._servicebus_receiver_async import ( ServiceBusReceiver as AsyncServiceBusReceiver, ) @@ -659,6 +658,7 @@ def __init__( **kwargs: Any ) -> None: self._amqp_transport = kwargs.pop("amqp_transport", PyamqpTransport) + self._tracing_attributes: Dict[str, Union[str, int]] = kwargs.pop("tracing_attributes", {}) self._max_size_in_bytes = max_size_in_bytes or MAX_MESSAGE_LENGTH_BYTES self._message = self._amqp_transport.build_batch_message([]) @@ -667,8 +667,6 @@ def __init__( self._messages: List[ServiceBusMessage] = [] self._uamqp_message: Optional[LegacyBatchMessage] = None - self._tracing_attributes: Dict[str, Union[str, int]] = {} - def __repr__(self) -> str: batch_repr = "max_size_in_bytes={}, message_count={}".format( self.max_size_in_bytes, self._count diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/tracing.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/tracing.py new file mode 100644 index 0000000000000..e235b04aafddb --- /dev/null +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/tracing.py @@ -0,0 +1,314 @@ +# ------------------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ------------------------------------------------------------------------- +from __future__ import annotations +from contextlib import contextmanager +from enum import Enum +import logging +from typing import ( + Dict, + Iterable, + Iterator, + List, + Optional, + Type, + TYPE_CHECKING, + Union, + cast, +) + +from azure.core import CaseInsensitiveEnumMeta +from azure.core.settings import settings +from azure.core.tracing import SpanKind, Link + +if TYPE_CHECKING: + try: + # pylint:disable=unused-import + from uamqp import Message as uamqp_Message + except ImportError: + uamqp_Message = None + from azure.core.tracing import AbstractSpan + + from .._pyamqp.message import Message as pyamqp_Message + from .message import ( + ServiceBusReceivedMessage, + ServiceBusMessage, + ServiceBusMessageBatch + ) + from .._base_handler import BaseHandler + from ..aio._base_handler_async import BaseHandler as BaseHandlerAsync + from .._servicebus_receiver import ServiceBusReceiver + from ..aio._servicebus_receiver_async import ServiceBusReceiver as ServiceBusReceiverAsync + from .._servicebus_sender import ServiceBusSender + from ..aio._servicebus_sender_async import ServiceBusSender as ServiceBusSenderAsync + from .._transport._base import AmqpTransport + from ..aio._transport._base_async import AmqpTransportAsync + + ReceiveMessageTypes = Union[ + ServiceBusReceivedMessage, + pyamqp_Message, + uamqp_Message + ] + +TRACE_DIAGNOSTIC_ID_PROPERTY = b"Diagnostic-Id" +TRACE_ENQUEUED_TIME_PROPERTY = b"x-opt-enqueued-time" +TRACE_PARENT_PROPERTY = b"traceparent" +TRACE_STATE_PROPERTY = b"tracestate" +TRACE_PROPERTY_ENCODING = "ascii" + +SPAN_ENQUEUED_TIME_PROPERTY = "enqueuedTime" + +SPAN_NAME_RECEIVE = "ServiceBus.receive" +SPAN_NAME_RECEIVE_DEFERRED = "ServiceBus.receive_deferred" +SPAN_NAME_PEEK = "ServiceBus.peek" +SPAN_NAME_SEND = "ServiceBus.send" +SPAN_NAME_SCHEDULE = "ServiceBus.schedule" +SPAN_NAME_MESSAGE = "ServiceBus.message" + + +_LOGGER = logging.getLogger(__name__) + +class TraceAttributes: + TRACE_NAMESPACE_ATTRIBUTE = "az.namespace" + TRACE_NAMESPACE = "Microsoft.ServiceBus" + + TRACE_MESSAGING_SYSTEM_ATTRIBUTE = "messaging.system" + TRACE_MESSAGING_SYSTEM = "servicebus" + + TRACE_NET_PEER_NAME_ATTRIBUTE = "net.peer.name" + TRACE_MESSAGING_DESTINATION_ATTRIBUTE = "messaging.destination.name" + TRACE_MESSAGING_SOURCE_ATTRIBUTE = "messaging.source.name" + TRACE_MESSAGING_OPERATION_ATTRIBUTE = "messaging.operation" + TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE = "messaging.batch.message_count" + + LEGACY_TRACE_COMPONENT_ATTRIBUTE = "component" + LEGACY_TRACE_MESSAGE_BUS_DESTINATION_ATTRIBUTE = "message_bus.destination" + LEGACY_TRACE_PEER_ADDRESS_ATTRIBUTE = "peer.address" + + +class TraceOperationTypes(str, Enum, metaclass=CaseInsensitiveEnumMeta): + PUBLISH = "publish" + RECEIVE = "receive" + SETTLE = "settle" + + +def is_tracing_enabled(): + span_impl_type = settings.tracing_implementation() + return span_impl_type is not None + + +@contextmanager +def send_trace_context_manager( + sender: Union[ServiceBusSender, ServiceBusSenderAsync], + span_name: str = SPAN_NAME_SEND, + links: Optional[List[Link]] = None +) -> Iterator[None]: + """Tracing for sending messages.""" + span_impl_type: Type[AbstractSpan] = settings.tracing_implementation() + + if span_impl_type is not None: + links = links or [] + with span_impl_type(name=span_name, kind=SpanKind.CLIENT, links=links) as span: + add_span_attributes(span, TraceOperationTypes.PUBLISH, sender, message_count=len(links)) + yield + else: + yield + + +@contextmanager +def receive_trace_context_manager( + receiver: Union[ServiceBusReceiver, ServiceBusReceiverAsync], + span_name: str = SPAN_NAME_RECEIVE, + links: Optional[List[Link]] = None, + start_time: Optional[int] = None +) -> Iterator[None]: + """Tracing for receiving messages.""" + span_impl_type: Type[AbstractSpan] = settings.tracing_implementation() + if span_impl_type is not None: + links = links or [] + with span_impl_type(name=span_name, kind=SpanKind.CLIENT, links=links, start_time=start_time) as span: + add_span_attributes(span, TraceOperationTypes.RECEIVE, receiver, message_count=len(links)) + yield + else: + yield + + +@contextmanager +def settle_trace_context_manager( + receiver: Union[ServiceBusReceiver, ServiceBusReceiverAsync], + operation: str, + links: Optional[List[Link]] = None +): + """Tracing for settling messages.""" + span_impl_type = settings.tracing_implementation() + if span_impl_type is not None: + links = links or [] + with span_impl_type(name=f"ServiceBus.{operation}", kind=SpanKind.CLIENT, links=links) as span: + add_span_attributes(span, TraceOperationTypes.SETTLE, receiver) + yield + else: + yield + + +def trace_message( + message: Union[uamqp_Message, pyamqp_Message], + amqp_transport: Union[AmqpTransport, AmqpTransportAsync], + additional_attributes: Optional[Dict[str, Union[str, int]]] = None, + parent_span: Optional[AbstractSpan] = None +) -> Union["uamqp_Message", "pyamqp_Message"]: + """Adds tracing information to the message and returns the updated message. + + Will open and close a message span, and add tracing context to the app properties of the message. + """ + try: + span_impl_type: Type[AbstractSpan] = settings.tracing_implementation() + if span_impl_type is not None: + current_span = parent_span or span_impl_type( + span_impl_type.get_current_span() + ) + + with current_span.span(name=SPAN_NAME_MESSAGE, kind=SpanKind.PRODUCER) as message_span: + headers = message_span.to_header() + + if "traceparent" in headers: + message = amqp_transport.update_message_app_properties( + message, + TRACE_DIAGNOSTIC_ID_PROPERTY, + headers["traceparent"] + ) + message = amqp_transport.update_message_app_properties( + message, + TRACE_PARENT_PROPERTY, + headers["traceparent"] + ) + + if "tracestate" in headers: + message = amqp_transport.update_message_app_properties( + message, + TRACE_STATE_PROPERTY, + headers["tracestate"] + ) + + message_span.add_attribute( + TraceAttributes.TRACE_NAMESPACE_ATTRIBUTE, TraceAttributes.TRACE_NAMESPACE + ) + message_span.add_attribute( + TraceAttributes.TRACE_MESSAGING_SYSTEM_ATTRIBUTE, TraceAttributes.TRACE_MESSAGING_SYSTEM + ) + + if additional_attributes: + for key, value in additional_attributes.items(): + if value is not None: + message_span.add_attribute(key, value) + + except Exception as exp: # pylint:disable=broad-except + _LOGGER.warning("trace_message had an exception %r", exp) + + return message + + +def get_receive_links(messages: Union[ReceiveMessageTypes, Iterable[ReceiveMessageTypes]]) -> List[Link]: + if not is_tracing_enabled(): + return [] + + trace_messages = ( + messages if isinstance(messages, Iterable) # pylint:disable=isinstance-second-argument-not-valid-type + else (messages,) + ) + + links = [] + try: + for message in trace_messages: + if message.application_properties: + headers = {} + + traceparent = message.application_properties.get(TRACE_PARENT_PROPERTY, b"") + if hasattr(traceparent, "decode"): + traceparent = traceparent.decode(TRACE_PROPERTY_ENCODING) + if traceparent: + headers["traceparent"] = cast(str, traceparent) + + tracestate = message.application_properties.get(TRACE_STATE_PROPERTY, b"") + if hasattr(tracestate, "decode"): + tracestate = tracestate.decode(TRACE_PROPERTY_ENCODING) + if tracestate: + headers["tracestate"] = cast(str, tracestate) + + enqueued_time = message.raw_amqp_message.annotations.get(TRACE_ENQUEUED_TIME_PROPERTY) + attributes = {SPAN_ENQUEUED_TIME_PROPERTY: enqueued_time} if enqueued_time else None + + if headers: + links.append(Link(headers, attributes=attributes)) + except AttributeError: + pass + return links + + +def get_span_links_from_batch(batch: ServiceBusMessageBatch) -> List[Link]: + """Create span links from a batch of messages.""" + links = [] + for message in batch._messages: # pylint: disable=protected-access + link = get_span_links_from_message(message._message) # pylint: disable=protected-access + if link: + links.append(link) + return links + + +def get_span_links_from_message(message: Union[uamqp_Message, pyamqp_Message, ServiceBusMessage]) -> Optional[Link]: + """Create a span link from a message. + + This will extract the traceparent and tracestate from the message application properties and create span links + based on these values. + """ + headers = {} + try: + if message.application_properties: + traceparent = message.application_properties.get(TRACE_PARENT_PROPERTY, b"") + if hasattr(traceparent, "decode"): + traceparent = traceparent.decode(TRACE_PROPERTY_ENCODING) + if traceparent: + headers["traceparent"] = cast(str, traceparent) + + tracestate = message.application_properties.get(TRACE_STATE_PROPERTY, b"") + if hasattr(tracestate, "decode"): + tracestate = tracestate.decode(TRACE_PROPERTY_ENCODING) + if tracestate: + headers["tracestate"] = cast(str, tracestate) + except AttributeError : + return None + return Link(headers) if headers else None + + +def add_span_attributes( + span: AbstractSpan, + operation_type: TraceOperationTypes, + handler: Union[BaseHandler, BaseHandlerAsync], + message_count: int = 0 +) -> None: + """Add attributes to span based on the operation type.""" + + span.add_attribute(TraceAttributes.TRACE_NAMESPACE_ATTRIBUTE, TraceAttributes.TRACE_NAMESPACE) + span.add_attribute(TraceAttributes.TRACE_MESSAGING_SYSTEM_ATTRIBUTE, TraceAttributes.TRACE_MESSAGING_SYSTEM) + span.add_attribute(TraceAttributes.TRACE_MESSAGING_OPERATION_ATTRIBUTE, operation_type) + + if message_count > 1: + span.add_attribute(TraceAttributes.TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE, message_count) + + if operation_type == TraceOperationTypes.PUBLISH: + # Maintain legacy attributes for backwards compatibility. + span.add_attribute(TraceAttributes.LEGACY_TRACE_COMPONENT_ATTRIBUTE, TraceAttributes.TRACE_MESSAGING_SYSTEM) + span.add_attribute(TraceAttributes.LEGACY_TRACE_MESSAGE_BUS_DESTINATION_ATTRIBUTE, handler._entity_name) # pylint: disable=protected-access + span.add_attribute(TraceAttributes.LEGACY_TRACE_PEER_ADDRESS_ATTRIBUTE, handler.fully_qualified_namespace) + + elif operation_type == TraceOperationTypes.RECEIVE: + # Maintain legacy attributes for backwards compatibility. + span.add_attribute(TraceAttributes.LEGACY_TRACE_PEER_ADDRESS_ATTRIBUTE, handler.fully_qualified_namespace) + span.add_attribute(TraceAttributes.LEGACY_TRACE_COMPONENT_ATTRIBUTE, TraceAttributes.TRACE_MESSAGING_SYSTEM) + + span.add_attribute(TraceAttributes.TRACE_MESSAGING_SOURCE_ATTRIBUTE, handler._entity_name) # pylint: disable=protected-access + + elif operation_type == TraceOperationTypes.SETTLE: + span.add_attribute(TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE, handler.fully_qualified_namespace) + span.add_attribute(TraceAttributes.TRACE_MESSAGING_SOURCE_ATTRIBUTE, handler._entity_name) # pylint: disable=protected-access diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 5f3094c80f534..7753ea849e4ca 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -3,7 +3,6 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. # ------------------------------------------------------------------------- -from __future__ import annotations import sys import datetime import logging @@ -12,8 +11,6 @@ from typing import ( Any, Dict, - Iterable, - Iterator, List, Mapping, Optional, @@ -24,7 +21,6 @@ cast, Callable ) -from contextlib import contextmanager from datetime import timezone try: @@ -32,9 +28,6 @@ except ImportError: from urllib.parse import urlparse -from azure.core.settings import settings -from azure.core.tracing import SpanKind, Link - from .._version import VERSION from .constants import ( JWT_TOKEN_SCOPE, @@ -43,27 +36,6 @@ DEAD_LETTER_QUEUE_SUFFIX, TRANSFER_DEAD_LETTER_QUEUE_SUFFIX, USER_AGENT_PREFIX, - SPAN_NAME_SEND, - SPAN_NAME_MESSAGE, - DIAGNOSTIC_ID_PROPERTY, - TRACE_PARENT_PROPERTY, - TRACE_STATE_PROPERTY, - TRACE_NAMESPACE, - TRACE_NAMESPACE_ATTRIBUTE, - TRACE_COMPONENT_PROPERTY, - TRACE_COMPONENT, - TRACE_PROPERTY_ENCODING, - TRACE_ENQUEUED_TIME_PROPERTY, - TRACE_MESSAGING_SYSTEM_ATTRIBUTE, - TRACE_MESSAGING_SYSTEM, - TRACE_NET_PEER_NAME_ATTRIBUTE, - TRACE_MESSAGING_OPERATION_ATTRIBUTE, - TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE, - TRACE_MESSAGING_DESTINATION_ATTRIBUTE, - TRACE_MESSAGING_SOURCE_ATTRIBUTE, - SPAN_ENQUEUED_TIME_PROPERTY, - SPAN_NAME_RECEIVE, - TraceOperationTypes ) from ..amqp import AmqpAnnotatedMessage @@ -71,28 +43,16 @@ try: # pylint:disable=unused-import from uamqp import ( - Message as uamqp_Message, types as uamqp_types ) from uamqp.authentication import JWTTokenAuth as uamqp_JWTTokenAuth except ImportError: pass - from .._pyamqp.message import Message as pyamqp_Message from .._pyamqp.authentication import JWTTokenAuth as pyamqp_JWTTokenAuth - from .message import ( - ServiceBusReceivedMessage, - ServiceBusMessage, - ServiceBusMessageBatch - ) - from azure.core.tracing import AbstractSpan + from .message import ServiceBusReceivedMessage, ServiceBusMessage from azure.core.credentials import AzureSasCredential - from .._base_handler import BaseHandler - from ..aio._base_handler_async import BaseHandler as BaseHandlerAsync - from .._servicebus_receiver import ServiceBusReceiver - from ..aio._servicebus_receiver_async import ServiceBusReceiver as ServiceBusReceiverAsync from .._servicebus_session import BaseSession from .._transport._base import AmqpTransport - from ..aio._transport._base_async import AmqpTransportAsync MessagesType = Union[ Mapping[str, Any], @@ -291,209 +251,6 @@ def strip_protocol_from_uri(uri: str) -> str: return uri -def is_tracing_enabled(): - span_impl_type = settings.tracing_implementation() - return span_impl_type is not None - - -@contextmanager -def send_trace_context_manager(span_name=SPAN_NAME_SEND, links=None): - span_impl_type: Type[AbstractSpan] = settings.tracing_implementation() - - if span_impl_type is not None: - with span_impl_type(name=span_name, kind=SpanKind.CLIENT, links=links) as child: - yield child - else: - yield None - - -@contextmanager -def receive_trace_context_manager( - receiver: Union[ServiceBusReceiver, ServiceBusReceiverAsync], - span_name: str = SPAN_NAME_RECEIVE, - links: Optional[List[Link]] = None, - start_time: Optional[int] = None -) -> Iterator[None]: - """Tracing""" - span_impl_type: Type[AbstractSpan] = settings.tracing_implementation() - if span_impl_type is None: - yield - else: - receive_span = span_impl_type(name=span_name, kind=SpanKind.CLIENT, links=links, start_time=start_time) - add_span_attributes( # pylint: disable=protected-access - receiver, - receive_span, - TraceOperationTypes.RECEIVE, - message_count=len(links) if links else 0 - ) - - with receive_span: - yield - - -@contextmanager -def settle_trace_context_manager( - receiver: Union[ServiceBusReceiver, ServiceBusReceiverAsync], - operation: str, - links: Optional[List[Link]] = None -): - span_impl_type = settings.tracing_implementation() - if span_impl_type is None: - yield - else: - settle_span = span_impl_type(name=f"ServiceBus.{operation}", kind=SpanKind.CLIENT, links=links) - add_span_attributes(receiver, settle_span, TraceOperationTypes.SETTLE) # pylint: disable=protected-access - - with settle_span: - yield - - -def trace_message( - message: Union[uamqp_Message, pyamqp_Message], - amqp_transport: Union[AmqpTransport, AmqpTransportAsync], - additional_attributes: Optional[Dict[str, Union[str, int]]] = None, - parent_span: Optional[AbstractSpan] = None -) -> Union["uamqp_Message", "pyamqp_Message"]: - """Add tracing information to this message. - - Will open and close a "ServiceBus.message" span, and add tracing context to the app properties of the message. - """ - try: - span_impl_type: Type[AbstractSpan] = settings.tracing_implementation() - if span_impl_type is not None: - current_span = parent_span or span_impl_type( - span_impl_type.get_current_span() - ) - - with current_span.span(name=SPAN_NAME_MESSAGE, kind=SpanKind.PRODUCER) as message_span: - headers = message_span.to_header() - - if "traceparent" in headers: - message = amqp_transport.update_message_app_properties( - message, - DIAGNOSTIC_ID_PROPERTY, - headers["traceparent"] - ) - message = amqp_transport.update_message_app_properties( - message, - TRACE_PARENT_PROPERTY, - headers["traceparent"] - ) - - if "tracestate" in headers: - message = amqp_transport.update_message_app_properties( - message, - TRACE_STATE_PROPERTY, - headers["tracestate"] - ) - - message_span.add_attribute(TRACE_NAMESPACE_ATTRIBUTE, TRACE_NAMESPACE) - message_span.add_attribute(TRACE_MESSAGING_SYSTEM_ATTRIBUTE, TRACE_MESSAGING_SYSTEM) - - if additional_attributes: - for key, value in additional_attributes.items(): - if value is not None: - message_span.add_attribute(key, value) - - except Exception as exp: # pylint:disable=broad-except - _log.warning("trace_message had an exception %r", exp) - - return message - - -def get_receive_links(messages): - if not is_tracing_enabled(): - return [] - - trace_messages = ( - messages if isinstance(messages, Iterable) # pylint:disable=isinstance-second-argument-not-valid-type - else (messages,) - ) - - links = [] - try: - for message in trace_messages: - if message.application_properties: - headers = {} - - traceparent = message.application_properties.get(TRACE_PARENT_PROPERTY, b"") - if hasattr(traceparent, "decode"): - traceparent = traceparent.decode(TRACE_PROPERTY_ENCODING) - if traceparent: - headers["traceparent"] = cast(str, traceparent) - - tracestate = message.application_properties.get(TRACE_STATE_PROPERTY, b"") - if hasattr(tracestate, "decode"): - tracestate = tracestate.decode(TRACE_PROPERTY_ENCODING) - if tracestate: - headers["tracestate"] = cast(str, tracestate) - - enqueued_time = message.raw_amqp_message.annotations.get(TRACE_ENQUEUED_TIME_PROPERTY) - attributes = {SPAN_ENQUEUED_TIME_PROPERTY: enqueued_time} if enqueued_time else None - - if headers: - links.append(Link(headers, attributes=attributes)) - except AttributeError: - pass - return links - - -def get_span_links_from_batch(batch: ServiceBusMessageBatch) -> List[Link]: - """Create span links from a batch of messages.""" - links = [] - for message in batch._messages: # pylint: disable=protected-access - links.extend(get_span_links_from_message(message._message)) # pylint: disable=protected-access - return links - - -def get_span_links_from_message(message: Union[uamqp_Message, pyamqp_Message, ServiceBusMessage]) -> List[Link]: - """Create a span link from a message. - - This will extract the traceparent and tracestate from the message application properties and create span links - based on these values. - """ - headers = {} - try: - if message.application_properties: - traceparent = message.application_properties.get(TRACE_PARENT_PROPERTY, b"") - if hasattr(traceparent, "decode"): - traceparent = traceparent.decode(TRACE_PROPERTY_ENCODING) - if traceparent: - headers["traceparent"] = cast(str, traceparent) - - tracestate = message.application_properties.get(TRACE_STATE_PROPERTY, b"") - if hasattr(tracestate, "decode"): - tracestate = tracestate.decode(TRACE_PROPERTY_ENCODING) - if tracestate: - headers["tracestate"] = cast(str, tracestate) - except AttributeError : - return [] - return [Link(headers)] if headers else [] - - -def add_span_attributes( - handler: Union[BaseHandler, BaseHandlerAsync], - span: AbstractSpan, - operation_type: TraceOperationTypes, - message_count: int = 0 -) -> None: - """Add attributes to span based on the operation type.""" - - span.add_attribute(TRACE_NAMESPACE_ATTRIBUTE, TRACE_NAMESPACE) - span.add_attribute(TRACE_NET_PEER_NAME_ATTRIBUTE, handler.fully_qualified_namespace) - span.add_attribute(TRACE_MESSAGING_SYSTEM_ATTRIBUTE, TRACE_MESSAGING_SYSTEM) - span.add_attribute(TRACE_MESSAGING_OPERATION_ATTRIBUTE, operation_type) - - if message_count > 1: - span.add_attribute(TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE, message_count) - - if operation_type == TraceOperationTypes.PUBLISH: - span.add_attribute(TRACE_COMPONENT_PROPERTY, TRACE_COMPONENT) - span.add_attribute(TRACE_MESSAGING_DESTINATION_ATTRIBUTE, handler._entity_name) # pylint: disable=protected-access - else: - span.add_attribute(TRACE_MESSAGING_SOURCE_ATTRIBUTE, handler._entity_name) # pylint: disable=protected-access - - def parse_sas_credential(credential: "AzureSasCredential") -> Tuple: sas = credential.signature parsed_sas = sas.split('&') diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index 9d05ebaded037..fd49b577659d7 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -16,12 +16,14 @@ from .exceptions import ServiceBusError from ._base_handler import BaseHandler from ._common.message import ServiceBusReceivedMessage -from ._common.utils import ( - create_authentication, +from ._common.utils import create_authentication +from ._common.tracing import ( get_receive_links, receive_trace_context_manager, settle_trace_context_manager, - get_span_links_from_message + get_span_links_from_message, + SPAN_NAME_RECEIVE_DEFERRED, + SPAN_NAME_PEEK, ) from ._common.constants import ( CONSUMER_IDENTIFIER, @@ -36,8 +38,6 @@ MGMT_REQUEST_RECEIVER_SETTLE_MODE, MGMT_REQUEST_FROM_SEQUENCE_NUMBER, MGMT_REQUEST_MAX_MESSAGE_COUNT, - SPAN_NAME_RECEIVE_DEFERRED, - SPAN_NAME_PEEK, MESSAGE_COMPLETE, MESSAGE_ABANDON, MESSAGE_DEFER, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 0c35a633a1257..eb024115d7723 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -16,15 +16,15 @@ ServiceBusMessageBatch, ) from .amqp import AmqpAnnotatedMessage -from ._common.utils import ( - create_authentication, - transform_outbound_messages, +from ._common.utils import create_authentication, transform_outbound_messages +from ._common.tracing import ( send_trace_context_manager, trace_message, is_tracing_enabled, get_span_links_from_batch, get_span_links_from_message, - add_span_attributes, + SPAN_NAME_SCHEDULE, + TraceAttributes, ) from ._common.constants import ( REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, @@ -35,11 +35,7 @@ MGMT_REQUEST_MESSAGES, MGMT_REQUEST_MESSAGE_ID, MGMT_REQUEST_PARTITION_KEY, - SPAN_NAME_SCHEDULE, MAX_MESSAGE_LENGTH_BYTES, - TRACE_NET_PEER_NAME_ATTRIBUTE, - TRACE_MESSAGING_DESTINATION_ATTRIBUTE, - TraceOperationTypes, ) if TYPE_CHECKING: @@ -318,8 +314,8 @@ def schedule_messages( raise ValueError("The timeout must be greater than 0.") tracing_attributes = { - TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, - TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, } if isinstance(obj_messages, ServiceBusMessage): request_body, trace_links = self._build_schedule_request( @@ -338,9 +334,7 @@ def schedule_messages( *obj_messages ) - with send_trace_context_manager(span_name=SPAN_NAME_SCHEDULE, links=trace_links) as send_span: - if send_span: - add_span_attributes(self, send_span, TraceOperationTypes.PUBLISH, message_count=len(trace_links)) + with send_trace_context_manager(self, span_name=SPAN_NAME_SCHEDULE, links=trace_links): return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, @@ -468,8 +462,8 @@ def send_messages( obj_message._message, amqp_transport=self._amqp_transport, additional_attributes={ - TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, - TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, } ) @@ -478,11 +472,11 @@ def send_messages( if isinstance(obj_message, ServiceBusMessageBatch): trace_links = get_span_links_from_batch(obj_message) else: - trace_links = get_span_links_from_message(obj_message._message) # pylint: disable=protected-access + link = get_span_links_from_message(obj_message._message) # pylint: disable=protected-access + if link: + trace_links.append(link) - with send_trace_context_manager(links=trace_links) as send_span: - if send_span: - add_span_attributes(self, send_span, TraceOperationTypes.PUBLISH, message_count=len(trace_links)) + with send_trace_context_manager(self, links=trace_links): self._do_retryable_operation( self._send, message=obj_message, @@ -522,17 +516,15 @@ def create_message_batch( f"acceptable max batch size is: {self._max_message_size_on_link} bytes." ) - batch = ServiceBusMessageBatch( - max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), amqp_transport=self._amqp_transport + return ServiceBusMessageBatch( + max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), + amqp_transport=self._amqp_transport, + tracing_attributes = { + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + } ) - # Embed tracing data into the batch so they can be added to message spans. - batch._tracing_attributes = { # pylint: disable=protected-access - TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, - TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, - } - return batch - @property def client_identifier(self) -> str: """ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py index b7be99aff9571..3d2b4e443f85f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py @@ -34,12 +34,8 @@ from .._pyamqp._connection import Connection, _CLOSING_STATES from ._base import AmqpTransport -from .._common.utils import ( - utc_from_timestamp, - utc_now, - get_receive_links, - receive_trace_context_manager -) +from .._common.utils import utc_from_timestamp, utc_now +from .._common.tracing import get_receive_links, receive_trace_context_manager from .._common.constants import ( PYAMQP_LIBRARY, DATETIMEOFFSET_EPOCH, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py index b0c5a808488f2..04bdb9f9e52ce 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py @@ -54,12 +54,8 @@ ) from ._base import AmqpTransport from ..amqp._constants import AmqpMessageBodyType - from .._common.utils import ( - utc_from_timestamp, - utc_now, - get_receive_links, - receive_trace_context_manager, - ) + from .._common.utils import utc_from_timestamp, utc_now + from .._common.tracing import get_receive_links, receive_trace_context_manager from .._common.constants import ( UAMQP_LIBRARY, DATETIMEOFFSET_EPOCH, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 6e5831478c74e..22630b48700b3 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -41,16 +41,16 @@ MGMT_REQUEST_DEAD_LETTER_REASON, MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION, MGMT_RESPONSE_MESSAGE_EXPIRATION, - SPAN_NAME_RECEIVE_DEFERRED, - SPAN_NAME_PEEK, ) from .._common import mgmt_handlers -from .._common.utils import ( +from .._common.utils import utc_from_timestamp +from .._common.tracing import ( receive_trace_context_manager, - utc_from_timestamp, - get_receive_links, settle_trace_context_manager, - get_span_links_from_message + get_receive_links, + get_span_links_from_message, + SPAN_NAME_RECEIVE_DEFERRED, + SPAN_NAME_PEEK, ) from ._async_utils import create_authentication diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 89ad5e3d7a104..beac7e7e1f37a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -21,21 +21,18 @@ REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, MGMT_REQUEST_SEQUENCE_NUMBERS, - SPAN_NAME_SCHEDULE, MAX_MESSAGE_LENGTH_BYTES, - TRACE_NET_PEER_NAME_ATTRIBUTE, - TRACE_MESSAGING_DESTINATION_ATTRIBUTE, - TraceOperationTypes, ) from .._common import mgmt_handlers -from .._common.utils import ( - transform_outbound_messages, +from .._common.utils import transform_outbound_messages +from .._common.tracing import ( send_trace_context_manager, trace_message, is_tracing_enabled, get_span_links_from_batch, get_span_links_from_message, - add_span_attributes, + SPAN_NAME_SCHEDULE, + TraceAttributes, ) from ._async_utils import create_authentication @@ -268,8 +265,8 @@ async def schedule_messages( raise ValueError("The timeout must be greater than 0.") tracing_attributes = { - TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, - TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, } if isinstance(obj_messages, ServiceBusMessage): request_body, trace_links = self._build_schedule_request( @@ -287,9 +284,7 @@ async def schedule_messages( tracing_attributes, *obj_messages ) - with send_trace_context_manager(span_name=SPAN_NAME_SCHEDULE, links=trace_links) as send_span: - if send_span: - add_span_attributes(self, send_span, TraceOperationTypes.PUBLISH, message_count=len(trace_links)) + with send_trace_context_manager(self, span_name=SPAN_NAME_SCHEDULE, links=trace_links): return await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, @@ -417,8 +412,8 @@ async def send_messages( obj_message._message, amqp_transport=self._amqp_transport, additional_attributes={ - TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, - TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, } ) @@ -427,11 +422,11 @@ async def send_messages( if isinstance(obj_message, ServiceBusMessageBatch): trace_links = get_span_links_from_batch(obj_message) else: - trace_links = get_span_links_from_message(obj_message._message) # pylint: disable=protected-access + link = get_span_links_from_message(obj_message._message) # pylint: disable=protected-access + if link: + trace_links.append(link) - with send_trace_context_manager(links=trace_links) as send_span: - if send_span: - add_span_attributes(self, send_span, TraceOperationTypes.PUBLISH, message_count=len(trace_links)) + with send_trace_context_manager(self, links=trace_links): await self._do_retryable_operation( self._send, message=obj_message, @@ -470,17 +465,15 @@ async def create_message_batch( "acceptable max batch size is: {self._max_message_size_on_link} bytes." ) - batch = ServiceBusMessageBatch( - max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), amqp_transport=self._amqp_transport + return ServiceBusMessageBatch( + max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), + amqp_transport=self._amqp_transport, + tracing_attributes = { + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + } ) - # Embed tracing data into the batch so they can be added to message spans. - batch._tracing_attributes = { # pylint: disable=protected-access - TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, - TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, - } - return batch - @property def client_identifier(self) -> str: """ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py index f38bb2b9af3ed..eec305cb1667f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py @@ -20,12 +20,8 @@ ) from ._base_async import AmqpTransportAsync -from ..._common.utils import ( - utc_from_timestamp, - utc_now, - get_receive_links, - receive_trace_context_manager -) +from ..._common.utils import utc_from_timestamp, utc_now +from ..._common.tracing import get_receive_links, receive_trace_context_manager from ..._common.constants import ( DATETIMEOFFSET_EPOCH, SESSION_LOCKED_UNTIL, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_uamqp_transport_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_uamqp_transport_async.py index 819eeef32863c..b11babed9f857 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_uamqp_transport_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_uamqp_transport_async.py @@ -18,10 +18,7 @@ from ..._transport._uamqp_transport import UamqpTransport from ._base_async import AmqpTransportAsync from .._async_utils import get_running_loop - from ..._common.utils import ( - get_receive_links, - receive_trace_context_manager - ) + from ..._common.tracing import get_receive_links, receive_trace_context_manager from ..._common.constants import ServiceBusReceiveMode if TYPE_CHECKING: