Skip to content
This repository
Browse code

Merge branch 'master' of github.com:celery/celery

  • Loading branch information...
commit 015bda39e487efa6a871ad51973449b6556d8b9f 2 parents ef70164 + 173d6f3
Ask Solem Hoel ask authored

Showing 1 changed file with 15 additions and 7 deletions. Show diff stats Hide diff stats

  1. +15 7 celery/app/builtins.py
22 celery/app/builtins.py
@@ -229,7 +229,7 @@ def prepare_steps(self, args, tasks):
229 229 return tasks, results
230 230
231 231 def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
232   - task_id=None, **options):
  232 + task_id=None, link=None, link_error=None, **options):
233 233 if self.app.conf.CELERY_ALWAYS_EAGER:
234 234 return self.apply(args, kwargs, **options)
235 235 options.pop('publisher', None)
@@ -242,6 +242,13 @@ def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
242 242 if task_id:
243 243 tasks[-1].set(task_id=task_id)
244 244 result = tasks[-1].type.AsyncResult(task_id)
  245 + # make sure we can do a link() and link_error() on a chain object.
  246 + if link:
  247 + tasks[-1].set(link=link)
  248 + # and if any task in the chain fails, call the errbacks
  249 + if link_error:
  250 + for task in tasks:
  251 + task.set(link_error=link_error)
245 252 tasks[0].apply_async()
246 253 return result
247 254
@@ -307,16 +314,17 @@ def _prepare_member(self, task, body, group_id):
307 314 def apply_async(self, args=(), kwargs={}, task_id=None, **options):
308 315 if self.app.conf.CELERY_ALWAYS_EAGER:
309 316 return self.apply(args, kwargs, **options)
310   - group_id = options.pop('group_id', None)
311   - chord = options.pop('chord', None)
312 317 header = kwargs.pop('header')
313 318 body = kwargs.pop('body')
314 319 header, body = (list(maybe_subtask(header)),
315 320 maybe_subtask(body))
316   - if group_id:
317   - body.set(group_id=group_id)
318   - if chord:
319   - body.set(chord=chord)
  321 + # forward certain options to body
  322 + for opt_name in ['group_id', 'chord']:
  323 + opt_value = options.pop(opt_name, None)
  324 + if opt_value:
  325 + body.set(**{opt_name: opt_value})
  326 + map(body.link, options.pop('link', []))
  327 + map(body.link_error, options.pop('link_error', []))
320 328 callback_id = body.options.setdefault('task_id', task_id or uuid())
321 329 parent = super(Chord, self).apply_async((header, body, args),
322 330 kwargs, **options)

0 comments on commit 015bda3

Please sign in to comment.
Something went wrong with that request. Please try again.