Chord callback never fired with nested canvas objects #1662

Closed
naftulikay opened this Issue Nov 18, 2013 · 10 comments


@naftulikay
Contributor

System Info
Ubuntu 12.04, Celery 3.1.4, Redis 2.6.16

Celery Configuration

import djcelery
djcelery.setup_loader()

BROKER_URL = "amqp://guest:guest@localhost:5672/"

CELERY_DEFAULT_QUEUE = 'rtshiurim'
CELERY_RESULT_BACKEND = 'redis://'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_CHORD_PROPAGATES = False

I seem to have come across a chord callback deadlock across various result backends, including the Django database backend, the Redis backend, and the MongoDB backend. (I tried all three by commenting out the result backend, setting it to redis://, and setting it to mongodb://, respectively.)
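
Concretely, the only line I changed between runs was the result backend:

# Redis
CELERY_RESULT_BACKEND = 'redis://'
# MongoDB
CELERY_RESULT_BACKEND = 'mongodb://'
# Django database backend: comment the setting out entirely so the djcelery
# loader falls back to its database backend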

Reproducing the Issue

I'm using some pretty simple tasks to reproduce the issue:

from time import sleep

import celery

@celery.task
def generate():
    sleep(1.0)
    print "Generate done!"
    return 'result'

@celery.task
def lower(result):
    sleep(1.0)
    print "Lower done!"
    return result.lower()

@celery.task
def upper(result):
    sleep(1.0)
    print "Upper done!"
    return result.upper()

@celery.task
def upload(result):
    sleep(1.0)
    print "Upload done for: %s!" % (result,)
    return 'upload'

@celery.task
def callback(results):
    print "It's all done! %s" % (results,)

To reproduce the issue, generate a chord like this:

from tasks import generate, lower, upper, upload, callback
from celery import chain, chord, group

chord(
    header=chain(
        generate.s(),
        group(
            chain(lower.s(), upload.s()),
            chain(upper.s(), upload.s())
        )
    ),
    body=callback.s()
).delay()

When I run this, I see the following log output from my celery worker:

[2013-11-17 23:11:10,063: WARNING/Worker-8] Generate done!
[2013-11-17 23:11:11,103: WARNING/Worker-3] Lower done!
[2013-11-17 23:11:11,106: WARNING/Worker-8] Upper done!
[2013-11-17 23:11:12,128: WARNING/Worker-5] Upload done for: RESULT!
[2013-11-17 23:11:12,132: WARNING/Worker-8] Upload done for: result!

However, the callback task is never called.

Again, I've tested this with the Redis, MongoDB, and Django database backends and can't seem to get the callback to run.

I assume that what's happening behind the scenes is that (in Redis' case) the completion counter is never incremented or decremented properly as the group's tasks finish. I suspect this is caused by celery.result.GroupResult not fully crawling the tasks for their children, but I'm not sure.
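
To poke at that theory, I've been dispatching the inner group on its own and checking what the backend reports for it (just a debugging sketch):

from celery import chain, group
from tasks import lower, upper, upload

res = group(
    chain(lower.s('result'), upload.s()),
    chain(upper.s('result'), upload.s()),
).delay()

print(res.ready())            # does the backend consider every member finished?
print(res.completed_count())  # how many members it counts as successful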

I'm looking into this one, but I'm still getting familiar with the Celery codebase. Can anyone else reproduce this issue?

@ask
Member
ask commented Nov 18, 2013

Thanks! I don't think the header of a chord can be a chain; it's really expecting a group object here. I'm not sure how it could be extended to support this construct either, but maybe it should simply raise an error if the header is not a group.

I think the construct is a bit upside down, so to speak, as what you really want is to add a callback to the inner group, not to the chain:

    chain(
        generate.s(),
        chord(
            header=group(
                chain(lower.s(), upload.s()),
                chain(upper.s(), upload.s())),
            body=callback.s()
        ),
    ).delay()
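
(This way the chord header is a plain group, so the backend has a flat set of results to wait for before it applies the callback.)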
@naftulikay
Contributor

Awesome, yeah that makes a bit more sense, but what if the generate task fails?

I'm looking to run a pretty complex process with chains and groups and receive a single callback after it's all done, to collect results and errors. In my actual code, the callback does exactly that: it updates an object in my data store with a status of failed or complete.

Any ideas as to how I can make that happen?

In other words, with your example, if the generate task fails, nothing further happens, am I right?

@ask
Member
ask commented Nov 19, 2013

In the case of a task raising an error:

  • chain: stops after the first task that raises.
  • group: tasks in the group are not affected by other tasks in the group raising.
  • chord: propagates errors to the chord body (callback), so if a task in the header fails,
    the callback's state will be updated with the failure (a ChordError) and the callback will
    not be called (see the sketch after this list).
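
A rough sketch of that chord case, reusing the tasks from earlier in the thread (fails is a hypothetical task that simply raises, and this assumes the default CELERY_CHORD_PROPAGATES=True):

from celery import chord, group
from celery.exceptions import ChordError

from tasks import lower, callback
from tasks import fails  # hypothetical task that just raises an exception

result = chord(
    header=group(lower.s('OK'), fails.s()),
    body=callback.s(),
).delay()

try:
    result.get(propagate=True)
except ChordError as exc:
    # the callback was never executed; its result was marked failed with this error
    print('callback failed with: %r' % (exc,))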

If you want to perform an action when a task fails, you can add support for an error callback yourself:

from celery import maybe_signature

@app.task(bind=True)
def generate(self, ..., on_error=None):
    try:
        ...
    except expected_exceptions:
        if on_error:
            maybe_signature(on_error, app=self.app).delay()
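
You could then kick it off with the error callback attached as an ordinary argument, e.g. something like (using the handle_error_task from the link_error example below):

generate.s(on_error=handle_error_task.s()).delay()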

You can also use the link_error argument, but that will catch any exception. Usually you want to classify errors into expected and unexpected errors; the latter usually requires manual intervention and possibly an alert to admins that something out of the ordinary happened:

generate.s().set(link_error=handle_error_task.s()) | other_task.s()
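
For completeness, a minimal handle_error_task could look roughly like this (an errback is called with the id of the failed task):

@app.task(bind=True)
def handle_error_task(self, uuid):
    # look up the failed task's exception from the result backend
    result = self.app.AsyncResult(uuid)
    print('Task %s raised: %r' % (uuid, result.result))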
@naftulikay
Contributor

Awesome, yeah, that makes sense. I'm closing this now.

@naftulikay naftulikay closed this Nov 19, 2013
@hrbonz
Contributor
hrbonz commented Dec 3, 2013

I have the same problem and can't get my callback to be called when the processing takes a long time.
I started with this:

chord(rescore_result.s(article.pk) for article in articles)(rescore_callback.s())

After reading the discussion here, I modified my chord to be like this:

chord(
    header=group(rescore_result.s(article.pk) for article in articles),
    body=rescore_callback.s()).delay()

My callback is still not called at the end of my group. I use Redis as a result backend, and the articles queryset contains roughly 2,500 objects.

I have CELERY_IGNORE_RESULT = True set globally, but rescore_result is defined like this:

@task(ignore_result=False)
def rescore_result(pk):
    return _rescore(pk)

Any advice would help greatly, as I'm still fairly new to Celery.

@ask
Member
ask commented Dec 3, 2013

@hrbonz Maybe you should check the Redis logs; it may have removed keys due to memory pressure or similar.

@hrbonz
Contributor
hrbonz commented Dec 4, 2013

@ask From the logs, it doesn't seem that it's dropping any entries.
I'm currently trying to get more logs at debug level and getting rid of djcelery.

System

librabbitmq0 0.0.1.hg216-1
rabbitmq-server 3.2.1-1
redis-server 2:2.6.16-1~dotdeb.1

Python (2.7.3-4+deb7u1)

amqp==1.3.3
celery==3.1.3
kombu==3.0.4
librabbitmq==1.0.2
redis==2.8.0

Celery

I run Celery in dev through the Django manage command; I'm not sure that's the right way anymore, as I think djcelery is no longer the recommended approach...

$ python manage.py celery worker --loglevel=info --without-mingle -n hrbonz

I use rabbitmq as a broker and redis as a result backend.

Configuration

CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS=True
CELERY_RESULT_BACKEND = 'redis://localhost/15'
BROKER_URL = 'amqp://guest@localhost//'
CELERY_DEFAULT_QUEUE = 'hrbonz'
from kombu import Exchange, Queue
CELERY_QUEUES = (
    Queue('hrbonz', Exchange('hrbonz'), routing_key='hrbonz'),
)

Tasks

def _rescore(pk):
    """recalculate the total score for the given article pk

    returns a tuple with pk, old score and new score

    :param pk: an :class:`items.models.Article` id
    :type pk: int
    """
    try:
        article = items.models.Article.objects.get(pk=pk)
    except items.models.Article.DoesNotExist:
        return
    old_score = article.score
    pk = article.id
    article.rescore(async=False, doit=True)
    return (pk, old_score, article.score)

@task(name='items.tasks.rescore')
def rescore(pk):
    _rescore(pk)

@task(name='items.tasks.rescore_result', ignore_result=False)
def rescore_result(pk):
    return _rescore(pk)
@ask
Member
ask commented Dec 4, 2013

I actually haven't seen logs about this either, even though I know keys have been dropped; I thought maybe that had changed in some recent version.

Chords with the Redis backend could be optimized substantially (from O(N) to O(1), I believe) by keeping an index of the completed tasks.
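
Roughly the idea (just a sketch, not the actual backend code; the function name only mirrors the backend hook):

import redis

r = redis.StrictRedis()

def on_chord_part_return(group_id, total):
    # count completed header tasks with an atomic INCR instead of
    # re-reading every member's result (O(N)) on every completion
    key = 'chord-count-%s' % (group_id,)
    if r.incr(key) >= total:
        r.delete(key)
        return True   # all header tasks done; run the chord callback now
    return False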

@naftulikay
Contributor

@hrbonz See related issue #1671, created by me. (Chains do not work with nested canvas objects.)

@carolinux
carolinux commented Nov 23, 2016

I also couldn't get the chord to execute properly, no matter which backend I used: the callback was ignored and only executed if I stopped the workers and started them back up. So I cheated, I suppose, by implementing the logic manually.

I think chords are implemented internally the same way (with polling), at least for some backends. The code below uses a group to execute the tasks and, once the group has finished, executes the callback. Correct me if I'm wrong, but I believe this is essentially what a chord is.

import time

from celery import group

tasks = [...]  # a list of celery task signatures (these would be the actual chord header)
ret = group(tasks).apply_async()
while True:
    if ret.ready():
        # TODO: should also check for failures here!
        inputs_to_callback = [r.result for r in ret.results]
        # now execute the callback task! :D
        # .apply() runs it locally and returns a celery.result.EagerResult
        callback_result = callback_task.s(inputs_to_callback, kwarg1=1, kwarg2="foo").apply()
        break
    time.sleep(2)
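
(For what it's worth, this polling approach seems close to what the built-in celery.chord_unlock task does for backends without a native chord join: it periodically re-checks the group result until it is ready, then applies the callback.)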