Skip to content

Commit

Permalink
feat(celery): Send queue name to Sentry
Browse files Browse the repository at this point in the history
Send the queue name to Sentry for Celery tasks using the default exchange. The queue name is sent as span data with the key `messaging.destination.name` within a new span op named "queue.process".

Also, add tests for the new behavior.

Ref GH-2961
  • Loading branch information
szokeasaurusrex committed May 15, 2024
1 parent cc11c0f commit 72ac630
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 5 deletions.
7 changes: 7 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ class SPANDATA:
Example: 418
"""

MESSAGING_DESTINATION_NAME = "messaging.destination.name"
"""
The destination name where the message is being consumed from,
e.g. the queue name or topic.
"""

SERVER_ADDRESS = "server.address"
"""
Name of the database host.
Expand Down Expand Up @@ -366,6 +372,7 @@ class OP:
LANGCHAIN_TOOL = "ai.tool.langchain"
LANGCHAIN_AGENT = "ai.agent.langchain"
LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain"
QUEUE_PROCESS = "queue.process"
QUEUE_SUBMIT_ARQ = "queue.submit.arq"
QUEUE_TASK_ARQ = "queue.task.arq"
QUEUE_SUBMIT_CELERY = "queue.submit.celery"
Expand Down
26 changes: 22 additions & 4 deletions sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sentry_sdk
from sentry_sdk import isolation_scope
from sentry_sdk.api import continue_trace
from sentry_sdk.consts import OP
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations.celery.beat import (
_patch_beat_apply_entry,
Expand Down Expand Up @@ -325,20 +325,38 @@ def _inner(*args, **kwargs):
return _inner # type: ignore


def _set_messaging_destination_name(task, span):
# type: (Any, Span) -> None
"""Set "messaging.destination.name" tag for span"""
with capture_internal_exceptions():
delivery_info = task.request.delivery_info
routing_key = delivery_info.get("routing_key")
if delivery_info.get("exchange") == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning the tasks
# are sent to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)


def _wrap_task_call(task, f):
# type: (Any, F) -> F

# Need to wrap task call because the exception is caught before we get to
# see it. Also celery's reported stacktrace is untrustworthy.

# functools.wraps is important here because celery-once looks at this
# method's name.
# method's name. @ensure_integration_enabled internally calls functools.wraps,
# but if we ever remove the @ensure_integration_enabled decorator, we need
# to add @functools.wraps(f) here.
# https://github.com/getsentry/sentry-python/issues/421
@wraps(f)
@ensure_integration_enabled(CeleryIntegration, f)
def _inner(*args, **kwargs):
# type: (*Any, **Any) -> Any
try:
return f(*args, **kwargs)
with sentry_sdk.start_span(
op=OP.QUEUE_PROCESS, description=task.name
) as span:
_set_messaging_destination_name(task, span)
return f(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
Expand Down
55 changes: 54 additions & 1 deletion tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,17 @@ def dummy_task(x, y):
else:
assert execution_event["contexts"]["trace"]["status"] == "ok"

assert execution_event["spans"] == []
assert len(execution_event["spans"]) == 1
assert (
execution_event["spans"][0].items()
>= {
"trace_id": str(transaction.trace_id),
"same_process_as_parent": True,
"op": "queue.process",
"description": "dummy_task",
"data": ApproxDict(),
}.items()
)
assert submission_event["spans"] == [
{
"data": ApproxDict(),
Expand Down Expand Up @@ -606,3 +616,46 @@ def example_task():
pytest.fail("Calling `apply_async` without arguments raised a TypeError")

assert result.get() == "success"


@pytest.mark.parametrize("routing_key", ("celery", "custom"))
@mock.patch("celery.app.task.Task.request")
def test_messaging_destination_name_default_exchange(
mock_request, routing_key, init_celery, capture_events
):
celery_app = init_celery(enable_tracing=True)
events = capture_events()
mock_request.delivery_info = {"routing_key": routing_key, "exchange": ""}

@celery_app.task()
def task(): ...

task.apply_async()

(event,) = events
(span,) = event["spans"]
assert span["data"]["messaging.destination.name"] == routing_key


@mock.patch("celery.app.task.Task.request")
def test_messaging_destination_name_nondefault_exchange(
mock_request, init_celery, capture_events
):
"""
Currently, we only capture the routing key as the messaging.destination.name when
we are using the default exchange (""). This is because the default exchange ensures
that the routing key is the queue name. Other exchanges may not guarantee this
behavior.
"""
celery_app = init_celery(enable_tracing=True)
events = capture_events()
mock_request.delivery_info = {"routing_key": "celery", "exchange": "custom"}

@celery_app.task()
def task(): ...

task.apply_async()

(event,) = events
(span,) = event["spans"]
assert "messaging.destination.name" not in span["data"]

0 comments on commit 72ac630

Please sign in to comment.