Skip to content

Commit

Permalink
feat(celery): Set "messaging.system" on span
Browse files Browse the repository at this point in the history
Set the "messaging.system" data on the "queue.process" span in the Celery integration. The messaging.system span data attribute should be set to the Celery broker being used, e.g. "amqp" for RabbitMQ, "redis" for Redis, and "sqs" for Amazon SQS. Also, add tests for this feature.

ref #2951
  • Loading branch information
szokeasaurusrex committed May 15, 2024
1 parent 68332d8 commit a02eb9c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
5 changes: 5 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ class SPANDATA:
Number of retries/attempts to process a message.
"""

MESSAGING_SYSTEM = "messaging.system"
"""
The messaging system's name, e.g. `kafka`, `aws_sqs`
"""

SERVER_ADDRESS = "server.address"
"""
Name of the database host.
Expand Down
5 changes: 5 additions & 0 deletions sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,11 @@ def _inner(*args, **kwargs):
span.set_data(
SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
)
with capture_internal_exceptions():
span.set_data(
SPANDATA.MESSAGING_SYSTEM,
task.app.connection().transport.driver_type,
)

return f(*args, **kwargs)
except Exception:
Expand Down
20 changes: 19 additions & 1 deletion tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def init_celery(sentry_init, request):
def inner(propagate_traces=True, backend="always_eager", **kwargs):
sentry_init(
integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
**kwargs
**kwargs,
)
celery = Celery(__name__)

Expand Down Expand Up @@ -704,3 +704,21 @@ def task(): ...
(event,) = events
(span,) = event["spans"]
assert span["data"]["messaging.message.retry.count"] == 3


@pytest.mark.parametrize("system", ("redis", "amqp"))
def test_messaging_system(system, init_celery, capture_events):
celery = init_celery(enable_tracing=True)
events = capture_events()

# Does not need to be a real URL, since we use always eager
celery.conf.broker_url = f"{system}://example.com" # noqa: E231

@celery.task()
def task(): ...

task.apply_async()

(event,) = events
(span,) = event["spans"]
assert span["data"]["messaging.system"] == system

0 comments on commit a02eb9c

Please sign in to comment.