[celery] use Celery signals instead of patch #530
Conversation
@@ -0,0 +1,64 @@
"""
The documentation is updated here: #531
ddtrace/contrib/celery/app.py
Outdated
pin = pin or Pin(service=WORKER_SERVICE, app=APP, app_type=AppTypes.worker)
pin.onto(app)

signals.task_prerun.connect(trace_prerun)
The signal is a shared module-level object, so once at least one app is instrumented our handlers are called globally; connecting more apps does not register them again. In general we have only one Celery app active per process, even though we support multiple apps running at the same time.
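A minimal stand-in (not Celery's actual dispatcher, just an illustration of the point above): receivers are stored on the module-level signal object itself, not on any particular app, so connecting a handler once makes it fire for every sender.

```python
# Minimal signal dispatcher sketch; Celery's real implementation lives in
# celery.utils.dispatch, this is only an illustration of "global" receivers.
class Signal:
    def __init__(self):
        self.receivers = []

    def connect(self, receiver):
        self.receivers.append(receiver)

    def send(self, sender, **kwargs):
        for receiver in self.receivers:
            receiver(sender=sender, **kwargs)

task_prerun = Signal()  # module-level: shared by every "app"

calls = []
task_prerun.connect(lambda sender, **kw: calls.append(sender))

# Two different apps emitting the same signal both hit the single handler.
task_prerun.send(sender="app1.task")
task_prerun.send(sender="app2.task")
print(calls)  # ['app1.task', 'app2.task']
```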
ddtrace/contrib/celery/signals.py
Outdated
# propagate the `Span` in the current task Context
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=c.WORKER_SERVICE, resource=task.name)
propagate_span(task, task_id, span)
This part is required to move the span from one signal to another. You can check the weak dictionary implementation for more details. There is also a test that checks for a memory leak. It would be great to have your opinion on that.
Sidenote: we can't stash objects inside the signal arguments. Any change to the dictionary passed there will alter Celery's behavior.
To my understanding (which could be faulty, I'm not well-versed in weak reference usage) it seems as though the only reference to the in-transit span is through the weakref dictionary. If this is the case then aren't the spans subject to being garbage collected at any time? Could this lead to spans being garbage collected before they finish?
Am I missing something?
I think this makes sense.
If I'm following things correctly, the tracer object has a strong reference to the span so it won't get garbage collected until the tracer finishes the trace and drops its reference to the span.
In theory having the link to the task as a normal link should be fine, but in the case where there's a memory leak for tasks (e.g. finished tasks being kept around in memory) we'd end up amplifying that problem.
This approach should buy us a bit of safety for fairly minimal risk and complexity.
Correct, we have a strong reference in the tracer (the Context, to be accurate). My idea is to avoid increasing a memory leak from our side, even though we could still generate a leak from another part of the instrumentation. This change propagates tracing data from one signal to the other while minimizing the risk of a leak in case something goes wrong.
I'm keeping this implementation for now.
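The weak-dictionary pattern under discussion can be sketched roughly as follows. `propagate_span` mirrors the PR's naming; `retrieve_span` and the `Span` stand-in are hypothetical names for illustration, not the actual ddtrace implementation.

```python
import weakref

class Span:
    """Stand-in for ddtrace's Span; the real class lists '__weakref__' in __slots__."""
    def __init__(self, name):
        self.name = name
        self.finished = False

    def finish(self):
        self.finished = True

# task_id -> Span: only weak references, so this dict never extends a span's lifetime
_span_store = weakref.WeakValueDictionary()

def propagate_span(task_id, span):
    _span_store[task_id] = span

def retrieve_span(task_id):
    return _span_store.get(task_id)

# What a prerun handler would do (the strong reference lives in the tracer's Context):
span = Span('celery.run')
propagate_span('task-123', span)

# What a postrun handler would do:
found = retrieve_span('task-123')
if found is not None:
    found.finish()

print(found.finished)  # True
```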
)

# Enable instrumentation everywhere
patch_app(task.app)
I'm not sure this behavior is correct. If developers want to instrument only 2 or 3 tasks, it's probably better to make this a no-op too; otherwise they will see everything instrumented. It's a matter of tradeoffs here.
To allow developers to instrument a select set of tasks, could we provide a configuration setting like patched_tasks=frozenset([task1, task2, ...]) and then check in the trace methods whether the task is to be instrumented?
Yeah, this one feels pretty rough.
My initial reaction is to say that this should be a no-op, as that's normally safer than a more expansive execution than the original semantics.
That said, after chewing on it a bit, I think this (patch everything) is the right call. The reason being: for us, too much data > lost data. If somebody were to apply an update without noticing the deprecation and we made this a no-op, they'd irretrievably lose data. On the other hand, if their account gets spammed with more than they wanted, it would be obvious where the problem is and they'd still have access to the traces they cared about (among the noise).
OK, I will keep it that way. My reasoning for not providing a patched_tasks config is that I'd like to understand first whether it's really a use case. Also, we don't have a configuration system in place right now, so it's hard to provide this setting for Celery. I may work on it in another PR.
ddtrace/contrib/celery/util.py
Outdated
return body.get('id')


def require_pin(decorated):
I think this is not needed anymore. Probably we should simply remove it.
I don't see it anywhere else in the PR. So yeah it looks safe to drop.
It was only used for testing purposes for old-style tasks, so it should be safe.
Removing it then.
@@ -35,6 +35,7 @@ class Span(object):
    '_context',
    '_finished',
    '_parent',
    '__weakref__',
It's required to store weak references to these instances. In a sense it's a big change to the API, but it doesn't change anything about how the Span works. If we have other ideas, we may change how the Span is propagated.
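For context on why the slot is needed: a class that defines __slots__ has no per-instance __dict__, and unless '__weakref__' is listed among the slots its instances cannot be the target of a weak reference at all. A quick illustration:

```python
import weakref

class NoWeakref:
    __slots__ = ('name',)                 # no __dict__ and no __weakref__ slot

class WithWeakref:
    __slots__ = ('name', '__weakref__')   # the shape this PR gives to Span

obj = WithWeakref()
ref = weakref.ref(obj)                    # fine: the __weakref__ slot exists
assert ref() is obj

try:
    weakref.ref(NoWeakref())
except TypeError:
    # slotted classes without '__weakref__' can't be weakly referenced
    print("TypeError raised as expected")
```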
eq_(t, 42)

traces = self.tracer.writer.pop_traces()
eq_(0, len(traces))
Same here.
def test_fn_task_parameters_bind(self):
    # it should execute a traced task that has parameters
    eq_(1, len(traces[0]))
Because before, the run() method was instrumented via monkey-patch, so calling apply() generated 2 spans: one for apply() and one for run(). From the developer's perspective, I think it's better to have only one span, because apply() (a sync call) always calls the body of the function.
t.run()
spans = self.tracer.writer.pop()
self.assertEqual(len(spans), 2)
res = t.apply()
Test changed to use apply() instead of run() because of the considerations above.
@@ -1,439 +0,0 @@
import celery
This is removed entirely. All these cases are tested under test_integration.py.
weak_dict.get(task_id).finish()
self.tracer.writer.pop()
self.tracer.writer.pop_traces()
gc.collect()
Because we use weak references, the element is eligible for garbage collection at any time after the Span is finished. It's not collected while we still hold references to the Span. This is needed because the signal mechanism may not be called properly by Celery (unexpected behaviors / unhandled executions), so even an unfinished Span can be garbage collected once the Tracer releases its reference to it.
We may need to check whether we should connect to other signals to be sure Spans are always finished.
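The collection behavior described above can be verified directly: once the last strong reference to a span is dropped, the weak-dictionary entry disappears on its own (this sketch uses a stand-in Span class, not ddtrace's).

```python
import gc
import weakref

class Span:
    """Stand-in: any plain class instance can be weakly referenced."""

store = weakref.WeakValueDictionary()

span = Span()
store['task-1'] = span
assert store.get('task-1') is span   # reachable while a strong ref exists

del span                             # drop the only strong reference
gc.collect()                         # make collection deterministic for the check

print(store.get('task-1'))  # None: the entry vanished with the span
```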
ddtrace/contrib/celery/signals.py
Outdated
    return
else:
    span.finish()
    remove_span(task, task_id)
Nit: remove_span would make me think that we remove the span itself. Maybe go with something like detach? (in which case propagate becomes attach?)
I agree, this verb is much more accurate; changing it.
Overall it looks good to me.
My main meta question: do we have certainty that pre and post signals are always caught?
Is there any setup in which one could be missing, which would then break a whole trace?
From what I've seen in the Celery code,
I really like the signal patching. So much cleaner!
Mostly just nits, small improvements and me answering my own questions.
ddtrace/contrib/celery/app.py
Outdated
"""
pin = Pin.get_from(app)
if pin is not None:
    delattr(app, _DD_PIN_NAME)
[Not in the scope of this PR] Should we consider adding a Pin.remove_from(app) method? It doesn't seem ideal that we have to import an implementation detail from the pin module to delete a pin.
Yeah, that was the plan. Because of the nature of the changes introduced in 0.13.0, I didn't want to touch anything so critical in our core. Consider that we would need to change the Pin internal API a bit, so it's not a quick change with a small impact.
Scheduling some work for the future.
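A sketch of what the suggested Pin.remove_from() could look like. This is hypothetical: the classmethod does not exist in ddtrace at this point, and the Pin class and attribute name below are simplified stand-ins for illustration.

```python
# Hypothetical extension of a Pin-like API; the attribute name and the
# remove_from classmethod are assumptions based on the review discussion.
_DD_PIN_NAME = '_datadog_pin'

class Pin:
    def onto(self, obj):
        setattr(obj, _DD_PIN_NAME, self)

    @classmethod
    def get_from(cls, obj):
        return getattr(obj, _DD_PIN_NAME, None)

    @classmethod
    def remove_from(cls, obj):
        # hide the implementation detail (the attribute name) behind the API
        if getattr(obj, _DD_PIN_NAME, None) is not None:
            delattr(obj, _DD_PIN_NAME)

class App:
    """Stand-in for a Celery application object."""

app = App()
Pin().onto(app)
Pin.remove_from(app)
print(Pin.get_from(app))  # None: the pin is gone, no _DD_PIN_NAME import needed
```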
ddtrace/contrib/celery/util.py
Outdated
from .constants import CTX_KEY

# Service info
These are duplicates of the values defined in constants.py:
dd-trace-py/ddtrace/contrib/celery/util.py
Lines 11 to 14 in b7f5598
# Service info
APP = 'celery'
PRODUCER_SERVICE = os.environ.get('DATADOG_SERVICE_NAME') or 'celery-producer'
WORKER_SERVICE = os.environ.get('DATADOG_SERVICE_NAME') or 'celery-worker'
dd-trace-py/ddtrace/contrib/celery/constants.py
Lines 16 to 19 in b7f5598
# Service info
APP = 'celery'
PRODUCER_SERVICE = getenv('DATADOG_SERVICE_NAME') or 'celery-producer'
WORKER_SERVICE = getenv('DATADOG_SERVICE_NAME') or 'celery-worker'
Nice catch; it's probably a bad copy I made while trying to make the PR readable. Removing them and keeping only the definitions in constants.py.
)


def patch_app(app, pin=None):
patch_app does not appear to be idempotent. If it were called repeatedly, would we register the same signal receiver multiple times on the same signal? It looks like in the implementation of Signal.connect we would add another identical receiver: https://github.com/celery/celery/blob/b24425ea6320c2b95fe1873f1f00966c3b38952b/celery/utils/dispatch/signal.py#L221 but I'm not 100% sure about this.
We could do the same as our other integrations and set a patched attribute on the app, then check for it when patching. Like we do here:
dd-trace-py/ddtrace/contrib/redis/patch.py
Lines 18 to 20 in 2eb60a9
if getattr(redis, '_datadog_patch', False):
    return
setattr(redis, '_datadog_patch', True)
Edit: I do see that there's an integration test covering idempotent patching, but I still think it'd be good practice to keep the patching attribute on the task, to be explicit and not depend on Celery's behavior.
Correct, worth changing, especially because it relies on something internal that may change in the future, leading to unexpected behavior.
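The suggested guard can be sketched as follows, mirroring the redis integration pattern quoted above. The FakeSignal class and the handler are illustrative stand-ins, not the actual integration code.

```python
def trace_prerun(**kwargs):
    """Placeholder receiver; the real one starts a worker span."""

class FakeSignal:
    """Stand-in signal that records how many receivers are connected."""
    def __init__(self):
        self.receivers = []

    def connect(self, receiver):
        self.receivers.append(receiver)

task_prerun = FakeSignal()

def patch_app(app):
    # bail out early if this app was already instrumented, so repeated
    # calls never register the same receiver twice
    if getattr(app, '_datadog_patch', False):
        return app
    setattr(app, '_datadog_patch', True)
    task_prerun.connect(trace_prerun)
    return app

class App:
    """Stand-in for a Celery application object."""

app = App()
patch_app(app)
patch_app(app)  # second call is a no-op thanks to the guard
print(len(task_prerun.receivers))  # 1: the receiver was registered only once
```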
# changes in Celery
task = kwargs.get('sender')
task_id = kwargs.get('task_id')
if task is None or task_id is None:
Do we want to log something in these cases where we shortcut out without action? This could be useful for users trying to get our tracing working with an unsupported Celery version.
Yeah let's log something in debug mode.
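A sketch of the guard with debug logging. The logger setup, the message text, and the return value are illustrative assumptions; a real handler would go on to start the worker span instead of returning the task_id.

```python
import logging

log = logging.getLogger(__name__)

def trace_prerun(**kwargs):
    # retrieve the task and the task ID from the signal arguments; their
    # presence depends on the Celery version emitting the signal
    task = kwargs.get('sender')
    task_id = kwargs.get('task_id')
    if task is None or task_id is None:
        # debug-level so normal runs stay quiet, but users can enable it
        # when diagnosing an unsupported Celery version
        log.debug('unable to extract the Task and the task_id; this Celery version may be unsupported')
        return None
    # ... a real handler would start the worker span here ...
    return task_id  # placeholder so the happy path is observable

result = trace_prerun(sender=None, task_id=None)
print(result)  # None: the handler shortcuts out without action
```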
tests/contrib/celery/test_app.py
Outdated
def test_unpatch_app(self):
    # When unpatch_app is called on a patched app we unpatch the `task()` method
    # When celery.App is patched it must not include a `Pin` instance
typo nit:
# When celery.App is patched it must not include a `Pin` instance
"patched" -> "unpatched"
Good catch!
ddtrace/contrib/celery/util.py
Outdated
@@ -0,0 +1,115 @@
# stdlib
nit: should this file be utils.py?
Changing it to utils.py. It's not exported from our __all__ list, so it should not be considered a breaking change.
eq_(t, 42)

traces = self.tracer.writer.pop_traces()
eq_(0, len(traces))
We should be sure to document this so that users are aware that .run() invocations will not generate spans.
Looks good.
Provided feedback where requested. Otherwise it looks like other reviewers already commented on anything I'd look to change.
* [core] revert argv patch
* [core] patch celery via post-import hooks
* [core] set patched to true for celery patching
* [core] remove patched module count side effect
* [celery] add tests verifying the import hook patching
👍
Overview
Closes #495
The previous approach used a monkey-patch mechanism to instrument the base Celery() app, including any registered tasks. Unfortunately, the patch mechanism altered the way Celery registers old-style Task classes (for abstract classes), complicating the signature matching for the task decorator. That approach introduced many problems with synchronous Celery calls (#495).
This PR removes the old monkey-patch mechanism in favor of Celery signals. The new approach uses the internal tracing system to detect when tasks are executed via the Celery API.
Caveats
Signals are not emitted when a plain function is called. This means that if we have a task defined as a plain function, calling my_task() directly will not generate any traces. This may be considered a breaking change in the integration behavior, but it cannot be avoided without reintroducing the monkey-patch (which would cause other problems with old-style tasks and signature matching).
Backward Compatibility
This PR introduces the following changes:
* When a task is executed as a plain function (calling my_task() directly or via the .run() method), no traces are generated. Synchronous calls done via .apply() work as expected. Using the Celery API (.delay(), .apply_async([]), .apply()) could be considered the more common pattern, even though this change could stop tracing on some systems.
* The public methods (patch_task() and unpatch_task()) are preserved, so there are no breaking changes in the API. The first adds instrumentation to all tasks, while the second is a no-op.
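The caveat from the section above can be illustrated without Celery itself. Everything below is a hypothetical stand-in (the dispatcher-free "apply" helper, the handler, the task name): calling the function directly bypasses the signal machinery entirely, so no handlers fire and no spans are created, while an .apply()-style call goes through the signals.

```python
spans = []

def task_prerun_handler(sender, **kwargs):
    # a real handler would start a tracer Span here
    spans.append(sender)

def my_task():
    return 42

def apply(task):
    """Stand-in for Celery's synchronous .apply(): emits the prerun signal around the call."""
    task_prerun_handler(sender=task.__name__)
    return task()

my_task()                # plain call: no signal emitted, no span recorded
assert spans == []

result = apply(my_task)  # API-style call: the prerun handler fires
print(spans)  # ['my_task']
```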