Skip to content

Commit

Permalink
[Otel] Improve live test coverage
Browse files Browse the repository at this point in the history
Here, ServiceBus tests are added to verify span creation, links, and
attributes.

Signed-off-by: Paul Van Eck <paulvaneck@microsoft.com>
  • Loading branch information
pvaneck committed Feb 15, 2024
1 parent 02d8be6 commit 8875938
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 8 deletions.
5 changes: 3 additions & 2 deletions eng/tox/install_depend_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

MAXIMUM_VERSION_GENERIC_OVERRIDES = {}

# SPECIFIC OVERRIDES provide additional filtering of upper and lower bound by
# SPECIFIC OVERRIDES provide additional filtering of upper and lower bound by
# binding an override to the specific package being processed. As an example, when
# processing the latest or minimum deps for "azure-eventhub", the minimum version of "azure-core"
# will be overridden to 1.25.0.
Expand All @@ -59,6 +59,7 @@
"azure-eventhub-checkpointstoreblob": {"azure-core": "1.25.0", "azure-eventhub": "5.11.0"},
"azure-eventhub-checkpointstoretable": {"azure-core": "1.25.0", "azure-eventhub": "5.11.0"},
"azure-identity": {"msal": "1.23.0"},
"azure-core-tracing-opentelemetry": {"azure-core": "1.28.0"},
}

MAXIMUM_VERSION_SPECIFIC_OVERRIDES = {}
Expand Down Expand Up @@ -212,7 +213,7 @@ def process_bounded_versions(originating_pkg_name: str, pkg_name: str, versions:
v for v in versions if parse_version(v) <= parse_version(restrictions[pkg_name])
]

# upper bound package-specific
# upper bound package-specific
if (
originating_pkg_name in MAXIMUM_VERSION_SPECIFIC_OVERRIDES
and pkg_name in MAXIMUM_VERSION_SPECIFIC_OVERRIDES[originating_pkg_name]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ opentelemetry-sdk<2.0.0,>=1.12.0
opentelemetry-instrumentation-requests>=0.32b0
requests
azure-storage-blob
-e ../../servicebus/azure-servicebus
69 changes: 66 additions & 3 deletions sdk/core/azure-core-tracing-opentelemetry/test-resources.bicep
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
@minLength(6)
@maxLength(21)
@description('The base resource name.')
param baseName string = resourceGroup().name

Expand All @@ -7,8 +9,10 @@ param location string = resourceGroup().location
@description('The client OID to grant access to test resources.')
param testApplicationOid string

var sbVersion = '2017-04-01'

resource storageAccount 'Microsoft.Storage/storageAccounts@2022-09-01' = {
name: '${baseName}storage'
name: '${baseName}sa'
location: location
kind: 'StorageV2'
sku: {
Expand All @@ -19,10 +23,69 @@ resource storageAccount 'Microsoft.Storage/storageAccounts@2022-09-01' = {
}
}

resource serviceBusNamespace 'Microsoft.ServiceBus/namespaces@2017-04-01' = {
name: '${baseName}sbnamespace'
location: location
sku: {
name: 'Standard'
}
properties: {}
}

resource serviceBusQueue 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = {
parent: serviceBusNamespace
name: '${baseName}sbqueue'
properties: {
lockDuration: 'PT5M'
maxSizeInMegabytes: 4096
requiresDuplicateDetection: false
requiresSession: false
defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S'
deadLetteringOnMessageExpiration: false
duplicateDetectionHistoryTimeWindow: 'PT10M'
maxDeliveryCount: 10
autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S'
enablePartitioning: false
enableExpress: false
}
}

resource serviceBusTopic 'Microsoft.ServiceBus/namespaces/topics@2017-04-01' = {
parent: serviceBusNamespace
name: '${baseName}sbtopic'
properties: {
autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S'
defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S'
duplicateDetectionHistoryTimeWindow: 'PT10M'
enableBatchedOperations: true
enableExpress: false
enablePartitioning: false
maxSizeInMegabytes: 4096
requiresDuplicateDetection: false
status: 'Active'
supportOrdering: true
}
}

resource serviceBusSubscription 'Microsoft.ServiceBus/namespaces/topics/subscriptions@2017-04-01' = {
parent: serviceBusTopic
name: '${baseName}sbtopic'
properties: {}
}


var name = storageAccount.name
var key = storageAccount.listKeys().keys[0].value
var connectionString = 'DefaultEndpointsProtocol=https;AccountName=${name};AccountKey=${key}'
var storageConnectionString = 'DefaultEndpointsProtocol=https;AccountName=${name};AccountKey=${key}'
var serviceBusConnectionString = listkeys(authRuleResourceId, sbVersion).primaryConnectionString


var authRuleResourceId = resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', serviceBusNamespace.name, 'RootManageSharedAccessKey')

output AZURE_STORAGE_ACCOUNT_NAME string = name
output AZURE_STORAGE_ACCOUNT_KEY string = key
output AZURE_STORAGE_CONNECTION_STRING string = connectionString
output AZURE_STORAGE_CONNECTION_STRING string = storageConnectionString
output AZURE_SERVICEBUS_CONNECTION_STRING string = serviceBusConnectionString
output AZURE_SERVICEBUS_QUEUE_NAME string = serviceBusQueue.name
output AZURE_SERVICEBUS_TOPIC_NAME string = serviceBusTopic.name
output AZURE_SERVICEBUS_SUBSCRIPTION_NAME string = serviceBusSubscription.name
10 changes: 7 additions & 3 deletions sdk/core/azure-core-tracing-opentelemetry/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ def exporter():
@pytest.fixture(scope="session")
def config():
return {
"storage_account_name": os.environ["AZURE_STORAGE_ACCOUNT_NAME"],
"storage_account_key": os.environ["AZURE_STORAGE_ACCOUNT_KEY"],
"storage_connection_string": os.environ["AZURE_STORAGE_CONNECTION_STRING"],
"storage_account_name": os.environ.get("AZURE_STORAGE_ACCOUNT_NAME"),
"storage_account_key": os.environ.get("AZURE_STORAGE_ACCOUNT_KEY"),
"storage_connection_string": os.environ.get("AZURE_STORAGE_CONNECTION_STRING"),
"servicebus_connection_string": os.environ.get("AZURE_SERVICEBUS_CONNECTION_STRING"),
"servicebus_queue_name": os.environ.get("AZURE_SERVICEBUS_QUEUE_NAME"),
"servicebus_topic_name": os.environ.get("AZURE_SERVICEBUS_TOPIC_NAME"),
"servicebus_subscription_name": os.environ.get("AZURE_SERVICEBUS_SUBSCRIPTION_NAME"),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import pytest

from azure.servicebus import ServiceBusClient, ServiceBusMessage
from opentelemetry.trace import SpanKind


class TestServiceBusTracing:
def _verify_message(self, *, span, dest, server_address):
assert span.name == "ServiceBus.message"
assert span.kind == SpanKind.PRODUCER
assert span.attributes["az.namespace"] == "Microsoft.ServiceBus"
assert span.attributes["messaging.system"] == "servicebus"
assert span.attributes["messaging.destination.name"] == dest
assert span.attributes["server.address"] == server_address

def _verify_send(self, *, span, dest, server_address, message_count):
assert span.name == "ServiceBus.send"
assert span.kind == SpanKind.CLIENT
assert span.attributes["az.namespace"] == "Microsoft.ServiceBus"
assert span.attributes["messaging.system"] == "servicebus"
assert span.attributes["messaging.destination.name"] == dest
assert span.attributes["messaging.operation"] == "publish"
assert span.attributes["server.address"] == server_address
if message_count > 1:
assert span.attributes["messaging.batch.message_count"] == message_count

def _verify_receive(self, *, span, dest, server_address, message_count):
assert span.name == "ServiceBus.receive"
assert span.kind == SpanKind.CLIENT
assert span.attributes["az.namespace"] == "Microsoft.ServiceBus"
assert span.attributes["messaging.system"] == "servicebus"
assert span.attributes["messaging.destination.name"] == dest
assert span.attributes["messaging.operation"] == "receive"
assert span.attributes["server.address"] == server_address
for link in span.links:
assert "enqueuedTime" in link.attributes
if message_count > 1:
assert span.attributes["messaging.batch.message_count"] == message_count

def _verify_complete(self, *, span, dest, server_address):
assert span.name == "ServiceBus.complete"
assert span.kind == SpanKind.CLIENT
assert span.attributes["az.namespace"] == "Microsoft.ServiceBus"
assert span.attributes["messaging.system"] == "servicebus"
assert span.attributes["messaging.operation"] == "settle"
assert span.attributes["server.address"] == server_address
assert span.attributes["messaging.destination.name"] == dest

@pytest.mark.live_test_only
def test_servicebus_client_tracing_queue(self, config, exporter, tracer):
connection_string = config["servicebus_connection_string"]
queue_name = config["servicebus_queue_name"]
client = ServiceBusClient.from_connection_string(connection_string)

with tracer.start_as_current_span(name="root"):
with client.get_queue_sender(queue_name) as sender:

# Sending a single message
sender.send_messages(ServiceBusMessage("Test foo message"))

# Sending a batch of messages
message_batch = sender.create_message_batch()
message_batch.add_message(ServiceBusMessage("First batch foo message"))
message_batch.add_message(ServiceBusMessage("Second batch foo message"))
sender.send_messages(message_batch)

send_spans = exporter.get_finished_spans()
server_address = sender.fully_qualified_namespace

# We expect 5 spans to have finished: 2 send spans, and 3 message spans.
assert len(send_spans) == 5

# Verify the spans from the first send.
self._verify_message(span=send_spans[0], dest=queue_name, server_address=server_address)
self._verify_send(span=send_spans[1], dest=queue_name, server_address=server_address, message_count=1)

# Verify span links from single send.
link = send_spans[1].links[0]
assert link.context.span_id == send_spans[0].context.span_id
assert link.context.trace_id == send_spans[0].context.trace_id

# Verify the spans from the second send.
self._verify_message(span=send_spans[2], dest=queue_name, server_address=server_address)
self._verify_message(span=send_spans[3], dest=queue_name, server_address=server_address)
self._verify_send(span=send_spans[4], dest=queue_name, server_address=server_address, message_count=2)

# Verify span links from batch send.
assert len(send_spans[4].links) == 2
link = send_spans[4].links[0]
assert link.context.span_id == send_spans[2].context.span_id
assert link.context.trace_id == send_spans[2].context.trace_id

link = send_spans[4].links[1]
assert link.context.span_id == send_spans[3].context.span_id
assert link.context.trace_id == send_spans[3].context.trace_id

exporter.clear()

# Receive all the sent spans.
receiver = client.get_queue_receiver(queue_name=queue_name)
with receiver:
received_msgs = receiver.receive_messages(max_message_count=3, max_wait_time=10)
for msg in received_msgs:
assert "foo" in str(msg)
receiver.complete_message(msg)

receive_spans = exporter.get_finished_spans()

# We expect 4 spans to have finished: 1 receive span, and 3 settlement spans.
assert len(receive_spans) == 4
self._verify_receive(span=receive_spans[0], dest=queue_name, server_address=server_address, message_count=3)

# Verify span links from receive.
assert len(receive_spans[0].links) == 3
assert receive_spans[0].links[0].context.span_id == send_spans[0].context.span_id
assert receive_spans[0].links[1].context.span_id == send_spans[2].context.span_id
assert receive_spans[0].links[2].context.span_id == send_spans[3].context.span_id

# Verify settlement spans.
self._verify_complete(span=receive_spans[1], dest=queue_name, server_address=server_address)
self._verify_complete(span=receive_spans[2], dest=queue_name, server_address=server_address)
self._verify_complete(span=receive_spans[3], dest=queue_name, server_address=server_address)

@pytest.mark.live_test_only
def test_servicebus_client_tracing_topic(self, config, exporter, tracer):
connection_string = config["servicebus_connection_string"]
topic_name = config["servicebus_topic_name"]
subscription_name = config["servicebus_subscription_name"]
client = ServiceBusClient.from_connection_string(connection_string)

with tracer.start_as_current_span(name="root"):
with client.get_topic_sender(topic_name) as sender:

# Sending a single message
sender.send_messages(ServiceBusMessage("Test foo message"))

# Sending a batch of messages
message_batch = sender.create_message_batch()
message_batch.add_message(ServiceBusMessage("First batch foo message"))
message_batch.add_message(ServiceBusMessage("Second batch foo message"))
sender.send_messages(message_batch)

send_spans = exporter.get_finished_spans()
server_address = sender.fully_qualified_namespace

# We expect 5 spans to have finished: 2 send spans, and 3 message spans.
assert len(send_spans) == 5

# Verify the spans from the first send.
self._verify_message(span=send_spans[0], dest=topic_name, server_address=server_address)
self._verify_send(span=send_spans[1], dest=topic_name, server_address=server_address, message_count=1)

# Verify span links from single send.
link = send_spans[1].links[0]
assert link.context.span_id == send_spans[0].context.span_id
assert link.context.trace_id == send_spans[0].context.trace_id

# Verify the spans from the second send.
self._verify_message(span=send_spans[2], dest=topic_name, server_address=server_address)
self._verify_message(span=send_spans[3], dest=topic_name, server_address=server_address)
self._verify_send(span=send_spans[4], dest=topic_name, server_address=server_address, message_count=2)

# Verify span links from batch send.
assert len(send_spans[4].links) == 2
link = send_spans[4].links[0]
assert link.context.span_id == send_spans[2].context.span_id
assert link.context.trace_id == send_spans[2].context.trace_id

link = send_spans[4].links[1]
assert link.context.span_id == send_spans[3].context.span_id
assert link.context.trace_id == send_spans[3].context.trace_id

exporter.clear()

# Receive all the sent spans.
receiver = client.get_subscription_receiver(topic_name, subscription_name)
with receiver:
received_msgs = receiver.receive_messages(max_message_count=3, max_wait_time=10)
for msg in received_msgs:
assert "foo" in str(msg)
receiver.complete_message(msg)

receive_spans = exporter.get_finished_spans()

# We expect 4 spans to have finished: 1 receive span, and 3 settlement spans.
assert len(receive_spans) == 4
self._verify_receive(span=receive_spans[0], dest=topic_name, server_address=server_address, message_count=3)

assert len(receive_spans[0].links) == 3
assert receive_spans[0].links[0].context.span_id == send_spans[0].context.span_id
assert receive_spans[0].links[1].context.span_id == send_spans[2].context.span_id
assert receive_spans[0].links[2].context.span_id == send_spans[3].context.span_id

# Verify settlement spans.
self._verify_complete(span=receive_spans[1], dest=topic_name, server_address=server_address)
self._verify_complete(span=receive_spans[2], dest=topic_name, server_address=server_address)
self._verify_complete(span=receive_spans[3], dest=topic_name, server_address=server_address)
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,18 @@ def test_blob_service_client_tracing(self, config, exporter, tracer):

http_span: ReadableSpan = spans[0]
assert http_span.kind == SpanKind.CLIENT
assert http_span.parent
assert http_span.parent.span_id == spans[1].context.span_id

assert http_span.attributes
assert http_span.attributes["http.request.method"] == "GET"
assert http_span.attributes["url.full"]
assert http_span.attributes["server.address"]
assert http_span.attributes["http.response.status_code"] == 200
assert http_span.attributes["az.client_request_id"]
assert http_span.attributes["az.service_request_id"]

method_span: ReadableSpan = spans[1]
assert method_span.kind == SpanKind.INTERNAL
assert method_span.parent
assert method_span.parent.span_id == spans[2].context.span_id

0 comments on commit 8875938

Please sign in to comment.