New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support chords with empty headers #4443
Conversation
6588867
to
13578e3
Compare
13578e3
to
dd66d85
Compare
Quite the radical change we have here. I'd have to review this after coffee :) |
Yep, fair enough - the kernel of the changes is that the header is called in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather avoid changing public APIs as this can result in code breakage. So I need you to be 100% sure this won't affect anyone before merging this.
Furthermore, other result backends might be affected by this change so I'm quite surprised why this works with the DynamoDB backend for example.
If we can answer yes to both questions then LGTM.
def fallback_chord_unlock(self, group_id, body, result=None, | ||
countdown=1, **kwargs): | ||
kwargs['result'] = [r.as_tuple() for r in result] | ||
def fallback_chord_unlock(self, header_result, body, countdown=1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure this won't break anyone's code?
# Overrides this to avoid calling GroupResult.save | ||
# pylint: disable=method-hidden | ||
# Note that KeyValueStoreBackend.__init__ sets self.apply_chord | ||
# if the implements_incr attr is set. Redis backend doesn't set | ||
# this flag. | ||
options['task_id'] = group_id | ||
return header(*partial_args, **options or {}) | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure this applies for all cases? That code is there for a reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep - apply_chord()
is only called from chord.run()
, and we're unconditionally calling the header (with the replaced task ID) there now: https://github.com/celery/celery/pull/4443/files#diff-e27381cb5c64031dd28aa6855c78cfddR1277
Yeah, I wasn't entirely sure about what's considered a public. If there are third-party backends that out there that override If we can get away with it's a nice simplification - we were calling the header (and replacing the group ID) in three separate places, and having to pass along the arguments everywhere and so on. Now we just call the header in the one place the same way, and the backends use That said if you want to be strict about maintaining these signatures (and the behaviour of calling the header inside each backend's |
This fixes a real problem so I'm going to risk it. |
When a chord's header group contains no tasks, its body is never executed. The reason for this is pretty straightforward: execution of the body is triggered by completion of header tasks. If there are no header tasks to be completed, the body is never executed.
An empty
GroupResult
is considered completed and successful, so I think it's reasonable to expect a chord with an empty group header to complete.The fix is to run the body "manually" in
chord.run()
when there are no header tasks.In addition, I've pulled out the parts of the various
apply_chord()
s that actually call the header and changed its signature accordingly, instead calling the header inchord.run()
before callingapply_chord()
only if there are header tasks.Fixes #4333