Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate baggage headers in Celery integration introduced in SDK 2.0 #2993

Merged
merged 16 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 66 additions & 79 deletions sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,77 @@ def event_processor(event, hint):
return event_processor


def _wrap_apply_async(f):
# type: (F) -> F
def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
# type: (dict[str, Any], Optional[Span], bool) -> dict[str, Any]
"""
Apply_async is always called to put a task in the queue. This is called by the
celery client (for example the Django project or the Celery Beat process)
Updates the headers of the Celery task with the tracing information
and eventually Sentry Crons monitoring information for beat tasks.
"""
updated_headers = original_headers.copy()
with capture_internal_exceptions():
headers = {}
if span is not None:
headers = dict(
Scope.get_current_scope().iter_trace_propagation_headers(span=span)
)
if monitor_beat_tasks:
headers.update(
{
"sentry-monitor-start-timestamp-s": "%.9f"
% _now_seconds_since_epoch(),
}
)

if headers:
existing_baggage = updated_headers.get(BAGGAGE_HEADER_NAME)
sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)

combined_baggage = sentry_baggage or existing_baggage
if sentry_baggage and existing_baggage:
combined_baggage = "{},{}".format(
existing_baggage,
sentry_baggage,
)

updated_headers.update(headers)
if combined_baggage:
updated_headers[BAGGAGE_HEADER_NAME] = combined_baggage

# https://github.com/celery/celery/issues/4875
#
# Need to setdefault the inner headers too since other
# tracing tools (dd-trace-py) also employ this exact
# workaround and we don't want to break them.
updated_headers.setdefault("headers", {}).update(headers)
if combined_baggage:
updated_headers["headers"][BAGGAGE_HEADER_NAME] = combined_baggage

# Add the Sentry options potentially added in `sentry_apply_entry`
# to the headers (done when auto-instrumenting Celery Beat tasks)
for key, value in updated_headers.items():
if key.startswith("sentry-"):
updated_headers["headers"][key] = value

return updated_headers


def _wrap_apply_async(f):
# type: (F) -> F
@wraps(f)
@ensure_integration_enabled(CeleryIntegration, f)
def apply_async(*args, **kwargs):
# type: (*Any, **Any) -> Any
# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
kwarg_headers = kwargs.get("headers") or {}
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
propagate_traces = kwarg_headers.pop(
"sentry-propagate-traces", integration.propagate_traces
)

if not propagate_traces:
return f(*args, **kwargs)

task = args[0]

# Do not create a span when the task is a Celery Beat task
Expand All @@ -177,82 +237,9 @@ def apply_async(*args, **kwargs):
) # type: Union[Span, NoOpMgr]

with span_mgr as span:
incoming_headers = kwargs.get("headers") or {}
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)

# If Sentry Crons monitoring for Celery Beat tasks is enabled
# add start timestamp of task,
if integration is not None and integration.monitor_beat_tasks:
incoming_headers.update(
{
"sentry-monitor-start-timestamp-s": "%.9f"
% _now_seconds_since_epoch(),
}
)

# Propagate Sentry trace information into the Celery task if desired
default_propagate_traces = (
integration.propagate_traces if integration is not None else True
)
propagate_traces = incoming_headers.pop(
"sentry-propagate-traces", default_propagate_traces
kwargs["headers"] = _update_celery_task_headers(
kwarg_headers, span, integration.monitor_beat_tasks
)

if propagate_traces:
with capture_internal_exceptions():
sentry_trace_headers = dict(
Scope.get_current_scope().iter_trace_propagation_headers(
span=span
)
)
# Set Sentry trace data in the headers of the Celery task
if sentry_trace_headers:
# Make sure we don't overwrite existing baggage
incoming_baggage = incoming_headers.get(BAGGAGE_HEADER_NAME)
sentry_baggage = sentry_trace_headers.get(BAGGAGE_HEADER_NAME)

combined_baggage = sentry_baggage or incoming_baggage
if sentry_baggage and incoming_baggage:
combined_baggage = "{},{}".format(
incoming_baggage,
sentry_baggage,
)

# Set Sentry trace data to the headers of the Celery task
incoming_headers.update(sentry_trace_headers)

if combined_baggage:
incoming_headers[BAGGAGE_HEADER_NAME] = combined_baggage

# Set sentry trace data also to the inner headers of the Celery task
# https://github.com/celery/celery/issues/4875
#
# Need to setdefault the inner headers too since other
# tracing tools (dd-trace-py) also employ this exact
# workaround and we don't want to break them.
incoming_headers.setdefault("headers", {}).update(
sentry_trace_headers
)
if combined_baggage:
incoming_headers["headers"][
BAGGAGE_HEADER_NAME
] = combined_baggage

# Add the Sentry options potentially added in `sentry_sdk.integrations.beat.sentry_apply_entry`
# to the inner headers (done when auto-instrumenting Celery Beat tasks)
# https://github.com/celery/celery/issues/4875
#
# Need to setdefault the inner headers too since other
# tracing tools (dd-trace-py) also employ this exact
# workaround and we don't want to break them.
incoming_headers.setdefault("headers", {})
for key, value in incoming_headers.items():
if key.startswith("sentry-"):
incoming_headers["headers"][key] = value

# Run the task (with updated headers in kwargs)
kwargs["headers"] = incoming_headers

return f(*args, **kwargs)

return apply_async # type: ignore
Expand Down
8 changes: 7 additions & 1 deletion tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,13 @@ def dummy_task(self, x, y):
# in the monkey patched version of `apply_async`
# in `sentry_sdk/integrations/celery.py::_wrap_apply_async()`
result = dummy_task.apply_async(args=(1, 0), headers=sentry_crons_setup)
assert result.get() == sentry_crons_setup

expected_headers = sentry_crons_setup.copy()
# Newly added headers
expected_headers["sentry-trace"] = mock.ANY
expected_headers["baggage"] = mock.ANY

assert result.get() == expected_headers


def test_baggage_propagation(init_celery):
Expand Down
162 changes: 162 additions & 0 deletions tests/integrations/celery/test_update_celery_task_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import pytest

from unittest import mock

from sentry_sdk.integrations.celery import _update_celery_task_headers
import sentry_sdk


BAGGAGE_VALUE = (
"sentry-trace_id=771a43a4192642f0b136d5159a501700,"
"sentry-public_key=49d0f7386ad645858ae85020e393bef3,"
"sentry-sample_rate=0.1337,"
"custom=value"
)

SENTRY_TRACE_VALUE = "771a43a4192642f0b136d5159a501700-1234567890abcdef-1"


@pytest.mark.parametrize("monitor_beat_tasks", [True, False, None, "", "bla", 1, 0])
def test_monitor_beat_tasks(monitor_beat_tasks):
headers = {}
span = None

updated_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)

assert headers == {} # left unchanged

if monitor_beat_tasks:
assert updated_headers == {
"headers": {"sentry-monitor-start-timestamp-s": mock.ANY},
"sentry-monitor-start-timestamp-s": mock.ANY,
}
else:
assert updated_headers == headers


@pytest.mark.parametrize("monitor_beat_tasks", [True, False, None, "", "bla", 1, 0])
def test_monitor_beat_tasks_with_headers(monitor_beat_tasks):
headers = {
"blub": "foo",
"sentry-something": "bar",
}
span = None

updated_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)

if monitor_beat_tasks:
assert updated_headers == {
"blub": "foo",
"sentry-something": "bar",
"headers": {
"sentry-monitor-start-timestamp-s": mock.ANY,
"sentry-something": "bar",
},
"sentry-monitor-start-timestamp-s": mock.ANY,
}
else:
assert updated_headers == headers


def test_span_with_transaction(sentry_init):
sentry_init(enable_tracing=True)
headers = {}

with sentry_sdk.start_transaction(name="test_transaction") as transaction:
with sentry_sdk.start_span(op="test_span") as span:
updated_headers = _update_celery_task_headers(headers, span, False)

assert updated_headers["sentry-trace"] == span.to_traceparent()
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
assert updated_headers["baggage"] == transaction.get_baggage().serialize()
assert (
updated_headers["headers"]["baggage"]
== transaction.get_baggage().serialize()
)


def test_span_with_no_transaction(sentry_init):
sentry_init(enable_tracing=True)
headers = {}

with sentry_sdk.start_span(op="test_span") as span:
updated_headers = _update_celery_task_headers(headers, span, False)

assert updated_headers["sentry-trace"] == span.to_traceparent()
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
assert "baggage" not in updated_headers.keys()
assert "baggage" not in updated_headers["headers"].keys()


def test_custom_span(sentry_init):
sentry_init(enable_tracing=True)
span = sentry_sdk.tracing.Span()
headers = {}

with sentry_sdk.start_transaction(name="test_transaction"):
updated_headers = _update_celery_task_headers(headers, span, False)

assert updated_headers["sentry-trace"] == span.to_traceparent()
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
assert "baggage" not in updated_headers.keys()
assert "baggage" not in updated_headers["headers"].keys()


def test_span_with_transaction_custom_headers(sentry_init):
sentry_init(enable_tracing=True)
headers = {
"baggage": BAGGAGE_VALUE,
"sentry-trace": SENTRY_TRACE_VALUE,
}

with sentry_sdk.start_transaction(name="test_transaction") as transaction:
with sentry_sdk.start_span(op="test_span") as span:
updated_headers = _update_celery_task_headers(headers, span, False)

assert updated_headers["sentry-trace"] == span.to_traceparent()
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
# This is probably the cause for https://github.com/getsentry/sentry-python/issues/2916
# If incoming baggage includes sentry data, we should not concatenate a new baggage value to it
# but just keep the incoming sentry baggage values and concatenate new third-party items to the baggage
# I have some code somewhere where I have implemented this.
assert (
updated_headers["baggage"]
== headers["baggage"] + "," + transaction.get_baggage().serialize()
)
assert (
updated_headers["headers"]["baggage"]
== headers["baggage"] + "," + transaction.get_baggage().serialize()
)


def test_span_with_no_transaction_custom_headers(sentry_init):
sentry_init(enable_tracing=True)
headers = {
"baggage": BAGGAGE_VALUE,
"sentry-trace": SENTRY_TRACE_VALUE,
}

with sentry_sdk.start_span(op="test_span") as span:
updated_headers = _update_celery_task_headers(headers, span, False)

assert updated_headers["sentry-trace"] == span.to_traceparent()
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
assert updated_headers["baggage"] == headers["baggage"]
assert updated_headers["headers"]["baggage"] == headers["baggage"]


def test_custom_span_custom_headers(sentry_init):
sentry_init(enable_tracing=True)
span = sentry_sdk.tracing.Span()
headers = {
"baggage": BAGGAGE_VALUE,
"sentry-trace": SENTRY_TRACE_VALUE,
}

with sentry_sdk.start_transaction(name="test_transaction"):
updated_headers = _update_celery_task_headers(headers, span, False)

assert updated_headers["sentry-trace"] == span.to_traceparent()
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
assert updated_headers["baggage"] == headers["baggage"]
assert updated_headers["headers"]["baggage"] == headers["baggage"]
Loading