From 72ac6302b7000422d6a3e7e63e2e28453fbbf8f5 Mon Sep 17 00:00:00 2001 From: Daniel Szoke Date: Tue, 16 Apr 2024 17:34:00 +0200 Subject: [PATCH] feat(celery): Send queue name to Sentry Send the queue name to Sentry for Celery tasks using the default exchange. The queue name is sent as span data with the key `messaging.destination.name` within a new span op named "queue.process". Also, add tests for the new behavior. Ref GH-2961 --- sentry_sdk/consts.py | 7 +++ sentry_sdk/integrations/celery/__init__.py | 26 ++++++++-- tests/integrations/celery/test_celery.py | 55 +++++++++++++++++++++- 3 files changed, 83 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 7217f61472..a5def07c71 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -264,6 +264,12 @@ class SPANDATA: Example: 418 """ + MESSAGING_DESTINATION_NAME = "messaging.destination.name" + """ + The destination name where the message is being consumed from, + e.g. the queue name or topic. + """ + SERVER_ADDRESS = "server.address" """ Name of the database host. @@ -366,6 +372,7 @@ class OP: LANGCHAIN_TOOL = "ai.tool.langchain" LANGCHAIN_AGENT = "ai.agent.langchain" LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain" + QUEUE_PROCESS = "queue.process" QUEUE_SUBMIT_ARQ = "queue.submit.arq" QUEUE_TASK_ARQ = "queue.task.arq" QUEUE_SUBMIT_CELERY = "queue.submit.celery" diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 62fdb1da6f..c5cfae58d9 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -4,7 +4,7 @@ import sentry_sdk from sentry_sdk import isolation_scope from sentry_sdk.api import continue_trace -from sentry_sdk.consts import OP +from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import Integration, DidNotEnable from sentry_sdk.integrations.celery.beat import ( _patch_beat_apply_entry, @@ -325,6 +325,18 @@ def _inner(*args, **kwargs): return _inner # type: ignore +def _set_messaging_destination_name(task, span): + # type: (Any, Span) -> None + """Set "messaging.destination.name" tag for span""" + with capture_internal_exceptions(): + delivery_info = task.request.delivery_info + routing_key = delivery_info.get("routing_key") + if delivery_info.get("exchange") == "" and routing_key is not None: + # Empty exchange indicates the default exchange, meaning the tasks + # are sent to the queue with the same name as the routing key. + span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key) + + def _wrap_task_call(task, f): # type: (Any, F) -> F @@ -332,13 +344,19 @@ def _wrap_task_call(task, f): # see it. Also celery's reported stacktrace is untrustworthy. # functools.wraps is important here because celery-once looks at this - # method's name. + # method's name. @ensure_integration_enabled internally calls functools.wraps, + # but if we ever remove the @ensure_integration_enabled decorator, we need + # to add @functools.wraps(f) here. # https://github.com/getsentry/sentry-python/issues/421 - @wraps(f) + @ensure_integration_enabled(CeleryIntegration, f) def _inner(*args, **kwargs): # type: (*Any, **Any) -> Any try: - return f(*args, **kwargs) + with sentry_sdk.start_span( + op=OP.QUEUE_PROCESS, description=task.name + ) as span: + _set_messaging_destination_name(task, span) + return f(*args, **kwargs) except Exception: exc_info = sys.exc_info() with capture_internal_exceptions(): diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 708294cf7e..e115f381d9 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -209,7 +209,17 @@ def dummy_task(x, y): else: assert execution_event["contexts"]["trace"]["status"] == "ok" - assert execution_event["spans"] == [] + assert len(execution_event["spans"]) == 1 + assert ( + execution_event["spans"][0].items() + >= { + "trace_id": str(transaction.trace_id), + "same_process_as_parent": True, + "op": "queue.process", + "description": "dummy_task", + "data": ApproxDict(), + }.items() + ) assert submission_event["spans"] == [ { "data": ApproxDict(), @@ -606,3 +616,46 @@ def example_task(): pytest.fail("Calling `apply_async` without arguments raised a TypeError") assert result.get() == "success" + + +@pytest.mark.parametrize("routing_key", ("celery", "custom")) +@mock.patch("celery.app.task.Task.request") +def test_messaging_destination_name_default_exchange( + mock_request, routing_key, init_celery, capture_events +): + celery_app = init_celery(enable_tracing=True) + events = capture_events() + mock_request.delivery_info = {"routing_key": routing_key, "exchange": ""} + + @celery_app.task() + def task(): ... + + task.apply_async() + + (event,) = events + (span,) = event["spans"] + assert span["data"]["messaging.destination.name"] == routing_key + + +@mock.patch("celery.app.task.Task.request") +def test_messaging_destination_name_nondefault_exchange( + mock_request, init_celery, capture_events +): + """ + Currently, we only capture the routing key as the messaging.destination.name when + we are using the default exchange (""). This is because the default exchange ensures + that the routing key is the queue name. Other exchanges may not guarantee this + behavior. + """ + celery_app = init_celery(enable_tracing=True) + events = capture_events() + mock_request.delivery_info = {"routing_key": "celery", "exchange": "custom"} + + @celery_app.task() + def task(): ... + + task.apply_async() + + (event,) = events + (span,) = event["spans"] + assert "messaging.destination.name" not in span["data"]