Skip to content

Commit

Permalink
[ServiceBus] Update tracing
Browse files Browse the repository at this point in the history
- "Send" span now contains links to message spans.
- Receive span is now kind CLIENT instead of CONSUMER.
- Added span creation logic for settlement methods.
- Attribute names were updated to align with distributed
  tracing conventions.
- Some span named renamed to align with other SDKs.
- Receive spans now have more accurate start times.

Signed-off-by: Paul Van Eck <paulvaneck@microsoft.com>
  • Loading branch information
pvaneck committed Apr 21, 2023
1 parent fbe7a0e commit 547f912
Show file tree
Hide file tree
Showing 9 changed files with 400 additions and 179 deletions.
11 changes: 0 additions & 11 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@
TOKEN_TYPE_SASTOKEN,
MGMT_REQUEST_OP_TYPE_ENTITY_MGMT,
ASSOCIATEDLINKPROPERTYNAME,
TRACE_NAMESPACE_PROPERTY,
TRACE_COMPONENT_PROPERTY,
TRACE_COMPONENT,
TRACE_PEER_ADDRESS_PROPERTY,
TRACE_BUS_DESTINATION_PROPERTY,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -543,12 +538,6 @@ def _mgmt_request_response_with_retry(
**kwargs
)

def _add_span_request_attributes(self, span):
span.add_attribute(TRACE_COMPONENT_PROPERTY, TRACE_COMPONENT)
span.add_attribute(TRACE_NAMESPACE_PROPERTY, TRACE_NAMESPACE_PROPERTY)
span.add_attribute(TRACE_BUS_DESTINATION_PROPERTY, self._entity_path)
span.add_attribute(TRACE_PEER_ADDRESS_PROPERTY, self.fully_qualified_namespace)

def _open(self): # pylint: disable=no-self-use
raise ValueError("Subclass should override the method.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,38 @@
TRACE_COMPONENT_PROPERTY = "component"
TRACE_COMPONENT = "servicebus"

TRACE_NAMESPACE_PROPERTY = "az.namespace"
TRACE_NAMESPACE = "ServiceBus"
TRACE_NAMESPACE_ATTRIBUTE = "az.namespace"
TRACE_NAMESPACE = "Microsoft.ServiceBus"

SPAN_NAME_RECEIVE = TRACE_NAMESPACE + ".receive"
SPAN_NAME_RECEIVE_DEFERRED = TRACE_NAMESPACE + ".receive_deferred"
SPAN_NAME_PEEK = TRACE_NAMESPACE + ".peek"
SPAN_NAME_SEND = TRACE_NAMESPACE + ".send"
SPAN_NAME_SCHEDULE = TRACE_NAMESPACE + ".schedule"
SPAN_NAME_MESSAGE = TRACE_NAMESPACE + ".message"

TRACE_BUS_DESTINATION_PROPERTY = "message_bus.destination"
TRACE_PEER_ADDRESS_PROPERTY = "peer.address"
SPAN_NAME_RECEIVE = "ServiceBus.receive"
SPAN_NAME_RECEIVE_DEFERRED = "ServiceBus.receive_deferred"
SPAN_NAME_PEEK = "ServiceBus.peek"
SPAN_NAME_SEND = "ServiceBus.send"
SPAN_NAME_SCHEDULE = "ServiceBus.schedule"
SPAN_NAME_MESSAGE = "ServiceBus.message"

SPAN_ENQUEUED_TIME_PROPERTY = "enqueuedTime"

TRACE_ENQUEUED_TIME_PROPERTY = b"x-opt-enqueued-time"
TRACE_PARENT_PROPERTY = b"Diagnostic-Id"
TRACE_PARENT_PROPERTY = b"traceparent"
TRACE_STATE_PROPERTY = b"tracestate"
TRACE_PROPERTY_ENCODING = "ascii"

TRACE_MESSAGING_SYSTEM_ATTRIBUTE = "messaging.system"
TRACE_MESSAGING_SYSTEM = "servicebus"

TRACE_NET_PEER_NAME_ATTRIBUTE = "net.peer.name"
TRACE_MESSAGING_DESTINATION_ATTRIBUTE = "messaging.destination.name"
TRACE_MESSAGING_SOURCE_ATTRIBUTE = "messaging.source.name"
TRACE_MESSAGING_OPERATION_ATTRIBUTE = "messaging.operation"
TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE = "messaging.batch.message_count"

DIAGNOSTIC_ID_PROPERTY = b"Diagnostic-Id"

class TraceOperationTypes(str, Enum, metaclass=CaseInsensitiveEnumMeta):
PUBLISH = "publish"
RECEIVE = "receive"
SETTLE = "settle"

MAX_MESSAGE_LENGTH_BYTES = 1024 * 1024 # Backcompat with uAMQP
MESSAGE_PROPERTY_MAX_LENGTH = 128
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
MAX_ABSOLUTE_EXPIRY_TIME,
MAX_DURATION_VALUE,
MAX_MESSAGE_LENGTH_BYTES,
MESSAGE_STATE_NAME
MESSAGE_STATE_NAME,
)
from ..amqp import (
AmqpAnnotatedMessage,
Expand Down Expand Up @@ -667,6 +667,8 @@ def __init__(
self._messages: List[ServiceBusMessage] = []
self._uamqp_message: Optional[LegacyBatchMessage] = None

self._tracing_attributes: Dict[str, Union[str, int]] = {}

def __repr__(self) -> str:
batch_repr = "max_size_in_bytes={}, message_count={}".format(
self.max_size_in_bytes, self._count
Expand All @@ -676,15 +678,11 @@ def __repr__(self) -> str:
def __len__(self) -> int:
return self._count

def _from_list(self, messages: Iterable[ServiceBusMessage], parent_span: Optional["AbstractSpan"] = None) -> None:
def _from_list(self, messages: Iterable[ServiceBusMessage]) -> None:
for message in messages:
self._add(message, parent_span)
self._add(message)

def _add(
self,
add_message: Union[ServiceBusMessage, Mapping[str, Any], AmqpAnnotatedMessage],
parent_span: Optional["AbstractSpan"] = None
) -> None:
def _add(self, add_message: Union[ServiceBusMessage, Mapping[str, Any], AmqpAnnotatedMessage]) -> None:
"""Actual add implementation. The shim exists to hide the internal parameters such as parent_span."""
outgoing_sb_message = transform_outbound_messages(
add_message, ServiceBusMessage, self._amqp_transport.to_outgoing_amqp_message
Expand All @@ -694,8 +692,8 @@ def _add(
outgoing_sb_message._message = trace_message(
outgoing_sb_message._message,
amqp_transport=self._amqp_transport,
parent_span=parent_span
) # parent_span is e.g. if built as part of a send operation.
additional_attributes=self._tracing_attributes
)
message_size = self._amqp_transport.get_message_encoded_size(
outgoing_sb_message._message # pylint: disable=protected-access
)
Expand Down
Loading

0 comments on commit 547f912

Please sign in to comment.