Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support chords with empty headers #4443

Merged
merged 1 commit into from
Dec 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 8 additions & 17 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,23 +412,19 @@ def add_to_chord(self, chord_id, result):
def on_chord_part_return(self, request, state, result, **kwargs):
pass

def fallback_chord_unlock(self, group_id, body, result=None,
countdown=1, **kwargs):
kwargs['result'] = [r.as_tuple() for r in result]
def fallback_chord_unlock(self, header_result, body, countdown=1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this won't break anyone's code?

**kwargs):
kwargs['result'] = [r.as_tuple() for r in header_result]
self.app.tasks['celery.chord_unlock'].apply_async(
(group_id, body,), kwargs, countdown=countdown,
(header_result.id, body,), kwargs, countdown=countdown,
)

def ensure_chords_allowed(self):
pass

def apply_chord(self, header, partial_args, group_id, body,
options={}, **kwargs):
def apply_chord(self, header_result, body, **kwargs):
self.ensure_chords_allowed()
fixed_options = {k: v for k, v in items(options) if k != 'task_id'}
result = header(*partial_args, task_id=group_id, **fixed_options or {})
self.fallback_chord_unlock(group_id, body, **kwargs)
return result
self.fallback_chord_unlock(header_result, body, **kwargs)

def current_task_children(self, request=None):
request = request or getattr(get_current_task(), 'request', None)
Expand Down Expand Up @@ -683,14 +679,9 @@ def _restore_group(self, group_id):
meta['result'] = result_from_tuple(result, self.app)
return meta

def _apply_chord_incr(self, header, partial_args, group_id, body,
result=None, options={}, **kwargs):
def _apply_chord_incr(self, header_result, body, **kwargs):
self.ensure_chords_allowed()
self.save_group(group_id, self.app.GroupResult(group_id, result))

fixed_options = {k: v for k, v in items(options) if k != 'task_id'}

return header(*partial_args, task_id=group_id, **fixed_options or {})
header_result.save(backend=self)

def on_chord_part_return(self, request, state, result, **kwargs):
if not self.implements_incr:
Expand Down
7 changes: 4 additions & 3 deletions celery/backends/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ def set(self, key, value):
def delete(self, key):
return self.client.delete(key)

def _apply_chord_incr(self, header, partial_args, group_id, body, **opts):
self.client.set(self.get_key_for_chord(group_id), 0, time=self.expires)
def _apply_chord_incr(self, header_result, body, **kwargs):
chord_key = self.get_key_for_chord(header_result.id)
self.client.set(chord_key, 0, time=self.expires)
return super(CacheBackend, self)._apply_chord_incr(
header, partial_args, group_id, body, **opts)
header_result, body, **kwargs)

def incr(self, key):
return self.client.incr(key)
Expand Down
6 changes: 2 additions & 4 deletions celery/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,13 @@ def _unpack_chord_result(self, tup, decode,
raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
return retval

def apply_chord(self, header, partial_args, group_id, body,
result=None, options={}, **kwargs):
def apply_chord(self, header_result, body, **kwargs):
# Overrides this to avoid calling GroupResult.save
# pylint: disable=method-hidden
# Note that KeyValueStoreBackend.__init__ sets self.apply_chord
# if the implements_incr attr is set. Redis backend doesn't set
# this flag.
options['task_id'] = group_id
return header(*partial_args, **options or {})
pass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this applies for all cases? That code is there for a reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep - apply_chord() is only called from chord.run(), and we're unconditionally calling the header (with the replaced task ID) there now: https://github.com/celery/celery/pull/4443/files#diff-e27381cb5c64031dd28aa6855c78cfddR1277


def on_chord_part_return(self, request, state, result,
propagate=None, **kwargs):
Expand Down
27 changes: 19 additions & 8 deletions celery/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -1268,21 +1268,32 @@ def run(self, header, body, partial_args, app=None, interval=None,
options.pop('task_id', None)
body.options.update(options)

results = header.freeze(
group_id=group_id, chord=body, root_id=root_id).results
bodyres = body.freeze(task_id, root_id=root_id)

# Chains should not be passed to the header tasks. See #3771
options.pop('chain', None)
# Neither should chords, for deeply nested chords to work
options.pop('chord', None)
options.pop('task_id', None)

header.freeze(group_id=group_id, chord=body, root_id=root_id)
header_result = header(*partial_args, task_id=group_id, **options)

if len(header_result) > 0:
app.backend.apply_chord(
header_result,
body,
interval=interval,
countdown=countdown,
max_retries=max_retries,
)
# The execution of a chord body is normally triggered by its header's
# tasks completing. If the header is empty this will never happen, so
# we execute the body manually here.
else:
body.delay([])

parent = app.backend.apply_chord(
header, partial_args, group_id, body,
interval=interval, countdown=countdown,
options=options, max_retries=max_retries,
result=results)
bodyres.parent = parent
bodyres.parent = header_result
return bodyres

def clone(self, *args, **kwargs):
Expand Down
14 changes: 14 additions & 0 deletions t/integration/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ def test_single_task_header(self, manager):
res2 = c2()
assert res2.get(timeout=TIMEOUT) == [16]

def test_empty_header_chord(self, manager):
try:
manager.app.backend.ensure_chords_allowed()
except NotImplementedError as e:
raise pytest.skip(e.args[0])

c1 = chord([], body=add_to_all.s(9))
res1 = c1()
assert res1.get(timeout=TIMEOUT) == []

c2 = group([]) | add_to_all.s(9)
res2 = c2()
assert res2.get(timeout=TIMEOUT) == []

@flaky
def test_nested_chord(self, manager):
try:
Expand Down
16 changes: 10 additions & 6 deletions t/unit/backends/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ def test_on_chord_part_return(self):

def test_apply_chord(self, unlock='celery.chord_unlock'):
self.app.tasks[unlock] = Mock()
self.b.apply_chord(
group(app=self.app), (), 'dakj221', None,
result=[self.app.AsyncResult(x) for x in [1, 2, 3]],
header_result = self.app.GroupResult(
uuid(),
[self.app.AsyncResult(x) for x in range(3)],
)
self.b.apply_chord(header_result, None)
assert self.app.tasks[unlock].apply_async.call_count


Expand Down Expand Up @@ -527,12 +528,15 @@ def test_restore_group_from_pickle(self):
def test_chord_apply_fallback(self):
self.b.implements_incr = False
self.b.fallback_chord_unlock = Mock()
header_result = self.app.GroupResult(
'group_id',
[self.app.AsyncResult(x) for x in range(3)],
)
self.b.apply_chord(
group(app=self.app), (), 'group_id', 'body',
result='result', foo=1,
header_result, 'body', foo=1,
)
self.b.fallback_chord_unlock.assert_called_with(
'group_id', 'body', result='result', foo=1,
header_result, 'body', foo=1,
)

def test_get_missing_meta(self):
Expand Down
19 changes: 13 additions & 6 deletions t/unit/backends/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from case import Mock, mock, patch, skip
from kombu.utils.encoding import ensure_bytes, str_to_bytes

from celery import group, signature, states, uuid
from celery import signature, states, uuid
from celery.backends.cache import CacheBackend, DummyClient, backends
from celery.exceptions import ImproperlyConfigured
from celery.five import bytes_if_py2, items, string, text_t
Expand Down Expand Up @@ -65,8 +65,12 @@ def test_mark_as_failure(self):

def test_apply_chord(self):
tb = CacheBackend(backend='memory://', app=self.app)
gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
result = self.app.GroupResult(
uuid(),
[self.app.AsyncResult(uuid()) for _ in range(3)],
)
tb.apply_chord(result, None)
assert self.app.GroupResult.restore(result.id, backend=tb) == result

@patch('celery.result.GroupResult.restore')
def test_on_chord_part_return(self, restore):
Expand All @@ -81,9 +85,12 @@ def test_on_chord_part_return(self, restore):
self.app.tasks['foobarbaz'] = task
task.request.chord = signature(task)

gid, res = uuid(), [self.app.AsyncResult(uuid()) for _ in range(3)]
task.request.group = gid
tb.apply_chord(group(app=self.app), (), gid, {}, result=res)
result = self.app.GroupResult(
uuid(),
[self.app.AsyncResult(uuid()) for _ in range(3)],
)
task.request.group = result.id
tb.apply_chord(result, None)

deps.join_native.assert_not_called()
tb.on_chord_part_return(task.request, 'SUCCESS', 10)
Expand Down
14 changes: 7 additions & 7 deletions t/unit/backends/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,14 @@ def test_expire(self):
self.b.expire('foo', 300)
self.b.client.expire.assert_called_with('foo', 300)

def test_apply_chord(self):
header = Mock(name='header')
header.results = [Mock(name='t1'), Mock(name='t2')]
self.b.apply_chord(
header, (1, 2), 'gid', None,
options={'max_retries': 10},
def test_apply_chord(self, unlock='celery.chord_unlock'):
self.app.tasks[unlock] = Mock()
header_result = self.app.GroupResult(
uuid(),
[self.app.AsyncResult(x) for x in range(3)],
)
header.assert_called_with(1, 2, max_retries=10, task_id='gid')
self.b.apply_chord(header_result, None)
assert self.app.tasks[unlock].apply_async.call_count == 0

def test_unpack_chord_result(self):
self.b.exception_to_python = Mock(name='etp')
Expand Down
2 changes: 1 addition & 1 deletion t/unit/backends/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_ensure_chords_allowed(self):

def test_apply_chord(self):
with pytest.raises(NotImplementedError):
self.b.apply_chord([], (), 'gid', Mock(name='body'))
self.b.apply_chord(self.app.GroupResult(), None)

@pytest.mark.celery(result_backend='rpc')
def test_chord_raises_error(self):
Expand Down