chord on_error callback does not work as documented #3709

Open
NickStefan opened this issue Dec 20, 2016 · 22 comments

Comments

@NickStefan

commented Dec 20, 2016

Checklist

Steps to reproduce

file1.py

from celery import chord, group
from app.graph.tasks import on_chord_error, add, xsum

c = (group(add.s(i, i) for i in range(1)) | xsum.s().on_error(on_chord_error.s())).delay()

tasks.py

from celery import shared_task

@shared_task
def add(x, y):
    raise Exception("bob")

@shared_task
def xsum(numbers):
    return sum(numbers)

@shared_task
def on_chord_error(request, exc, traceback):
    print('calling this random error method!!!!')
    print('Task {0!r} raised error: {1!r}'.format(request.id, exc))

Expected behavior

That the on_chord_error callback would be invoked and would print out "calling this random error method"

Actual behavior

Whether or not I define an on_error callback (i.e. c = (group(add.s(i, i) for i in range(1)) | xsum.s().on_error(on_chord_error.s())).delay() versus c = (group(add.s(i, i) for i in range(1)) | xsum.s()).delay()), I get the same output:

output

django      | [2016-12-20 23:32:14,688: INFO/MainProcess] Received task: app.graph.tasks.add[c5eeb13a-21f6-4f15-ac22-eab0784d2806]
django      | [2016-12-20 23:32:16,068: ERROR/PoolWorker-2] Task app.graph.tasks.add[c5eeb13a-21f6-4f15-ac22-eab0784d2806] raised unexpected: Exception('bob',)
django      | Traceback (most recent call last):
django      |   File "/usr/local/lib/python3.5/site-packages/celery/app/trace.py", line 367, in trace_task
django      |     R = retval = fun(*args, **kwargs)
django      |   File "/usr/local/lib/python3.5/site-packages/celery/app/trace.py", line 622, in __protected_call__
django      |     return self.run(*args, **kwargs)
django      |   File "/usr/src/app/graph/tasks.py", line 44, in add
django      |     raise Exception("bob")
django      | Exception: bob

It never calls the on_error handler.

normal circumstances work:

If I rewrite add from

@shared_task
def add(x, y):
    raise Exception("bob")

to this:

@shared_task
def add(x, y):
    return x + y

then everything works. So I believe one of three things is going on:

  • the documentation regarding group / chord error handling is incorrect
  • chord / group error handling does not work
  • I'm misinterpreting how error handling is supposed to work in Celery (I have scoured the internet for good examples, but I cannot find any of people propagating errors from a group of subprocesses. I would think this would be a very common thing!)

What is the celery equivalent of something like this from Node.js land?

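var async = require('async');
async.auto({
    // each task receives a callback and must call it when done
    thing1: function(callback) { callback(null); },
    thing2: function(callback) { callback(null); }
}, function(err, results) {
    if (err) {
        // do stuff!
    }
    // do normal stuff post batch of stuff
});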
@NickStefan

Author

commented Dec 21, 2016

I am able to trigger the on_chord_error callback if I change add
from this

@shared_task(bind=True)
def add(self, x, y):
    raise Exception('bob')

to this

@shared_task(bind=True)
def add(self, x, y):
    self.update_state(state="FAILURE")

However, it doesn't call the on_error callback with the correct arguments, AND it still calls the xsum method.

I still think this issue either points to missing documentation or a bug

@NickStefan

Author

commented Dec 21, 2016

I found the issue! This is definitely related to #3574

@NickStefan

Author

commented Dec 21, 2016

#3597 also

@NickStefan

Author

commented Dec 21, 2016

the code from here #3323 seems to be the cause

@NickStefan

Author

commented Dec 21, 2016

The callback and the error callback don't work with one task (the error callback is never called):

        callback = xsum.s().on_error(do_on_error.s())
        tasks = [ add.s(4, 4) ]
        c = chord(tasks)(callback)

The callback and the error callback do work with two tasks:

        callback = xsum.s().on_error(do_on_error.s())
        tasks = [ add.s(4, 4), add.s(4, 5) ]
        c = chord(tasks)(callback)
@br1anjtuck

commented Jan 30, 2017

Any new progress on this issue? I ran into the same issue that @NickStefan experienced, and unless I am misinterpreting the documentation, it does seem like the callback should be called for proper chord error handling

@NickStefan

Author

commented Jan 30, 2017

@br1anjtuck we just made some helpers that always add a blank task when using chords (to ensure there are always at least 2 tasks)

@shared_task(bind=True)
def celery_bug_fix(self, *args, **kwargs):
    '''
    celery chords only correctly handle errors with at least 2 tasks, 
    so we always append a celery_bug_fix task
    https://github.com/celery/celery/issues/3709
    '''
    pass
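
A minimal sketch of what such a helper could look like, assuming the workaround is simply to append one no-op entry to the chord header. `with_filler` and `drop_filler` are hypothetical names, not Celery APIs; they operate on plain lists so the logic can be checked without a broker. Because `celery_bug_fix` returns `None`, that value also ends up in the list handed to the chord callback, so a callback like `xsum` would probably need to filter it out first.

```python
def with_filler(tasks, filler):
    """Return a copy of the chord header with the no-op filler appended.

    `tasks` is a list of signatures (e.g. [add.s(4, 4)]) and `filler` is
    the no-op signature (e.g. celery_bug_fix.s()). Both are treated as
    opaque objects here.
    """
    return list(tasks) + [filler]


def drop_filler(results):
    """Remove the None values contributed by the no-op filler task.

    Assumption: real header tasks never return None. If they can, a
    dedicated sentinel value would be safer.
    """
    return [r for r in results if r is not None]


# Plain values stand in for Celery signatures and task results.
print(with_filler(["add(4, 4)"], "celery_bug_fix()"))
print(sum(drop_filler([8, None])))
```

With real signatures this would be used as `chord(with_filler(tasks, celery_bug_fix.s()))(callback)`, with the callback applying `drop_filler` to its input before summing.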


@br1anjtuck

commented Jan 31, 2017

Ok. So with the added helper task above, are you getting the user-defined callback (your "do_on_error" task) to run, or are you only getting the chord callback error (i.e. something like below)?

Chord callback for '599a6689-3737-4e92-b774-9c4b56885fea' raised: ChordError(u"Dependency ec44e5c6-1396-44d9-acf7-86bdffaf0a10 raised RuntimeError('BLAH',)",)

Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/celery/backends/redis.py", line 281, in on_chord_part_return
    callback.delay([unpack(tup, decode) for tup in resl])
  File "/usr/lib/python2.7/site-packages/celery/backends/redis.py", line 234, in _unpack_chord_result
    raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
ChordError: Dependency ec44e5c6-1396-44d9-acf7-86bdffaf0a10 raised RuntimeError('BLAH',)",)

I am only getting the chord callback error above, and I am not getting my own error callback "on_chord_error", when I define the SAME tasks that you have in the first post of this thread and run a group of two tasks chained to the "xsum" callback (exactly as in your two-task example above):

@shared_task
def on_chord_error(request, exc, traceback):
    print('\nTask {0!r} raised error: {1!r}\n'.format(request.id, exc))

I am supposed to be getting this "on_chord_error" callback, right? Or should it only be the chord callback error?

@NickStefan

Author

commented Jan 31, 2017

Yes, I am getting the user-defined callback when I always make sure to add a task (to ensure there are always at least two):

        callback = xsum.s().on_error(do_on_error.s())
        tasks = [ add.s(4, 4), celery_bug_fix.s() ]
        c = chord(tasks)(callback)
@br1anjtuck

commented Jan 31, 2017

Hmm..interesting. I don't know why, but with the same code, I am only getting the Chord callback and am not getting the user-defined callback. The only time I get the user-defined callback error is when I raise an exception in the "xsum" task rather than in the "add" task.

@br1anjtuck

commented Jan 31, 2017

Are you doing the below instead of raising an exception in your task?

@shared_task(bind=True)
def add(self, x, y):
    self.update_state(state="FAILURE")
@NickStefan

Author

commented Feb 1, 2017

Just doing regular exceptions. It was definitely tricky to get working, but yes, an exception in the add task is triggering the error callback.

@xealot

commented May 12, 2017

Yeah, I'm experiencing this as well. Seems to also be reported in #3317

@rholloway

commented May 30, 2017

@br1anjtuck did you ever figure out a solution to your problem? I seem to have the same situation as you with 4.0.2. My custom callback is called if the exception occurs in the chord callback (xsum), but if it occurs in an individual header task (add) it's never called.

I've tried sticking on_error onto each task in the header, onto the chord callback, onto the entire chord, everywhere. No luck.

@chrisbrantley

commented Jun 20, 2017

I'm having the same problem. The on_error is simply never called. It doesn't seem to matter how many tasks I have in the group. Very frustrating.

JivanAmara added a commit to JivanAmara/eventkit-cloud that referenced this issue Jul 15, 2017
Replace TaskFactory.parse_tasks() chord with a chain & hack FinalizeRunTask to work around
problem with chord error handling.
See celery/celery#3709.
jsvrcek added a commit to EventKit/eventkit-cloud that referenced this issue Jul 17, 2017
Finalize hook (#213)
* Update finalize_export_provider_task to perform run_finished tasks via new task
  factory create_run_finished_task_chain instead of in-function.
Created prepare_export_zip task to contain zip code.
Created demo qgis_task to demo what hooked tasks will look like.

* Update TaskFactory.parse_tasks to collect provider-group tasks into a celery group, then submit the celery group instead of submitting each individually.

* Add tasks.FinalizeRunHookTaskRecord model. Add tasks.export_tasks.FinalizeRunHookTask as base to update record on start & finish.

* Finish up export_tasks.FinalizeRunHookTask with tests.

* Add test for FinalizeRunHookTask.record_task_state().

* Add timeout & failure checks for integration tests.

* Bump verbosity level to 2 for integration tests.

* Add check for invalid FinalizeRunHookTask new_zip_filepaths.

* Fix TestJob.setUp() urls.

* Restore zip_file_task kw params.

* Update cancel_export_provider_task to set status of the export run containing the provider task.

* Update task factory tests with immutable signatures.

* Add tests for new tasks.

* Fix export provider zip issue (Redmine #23218).

* Up default timeout for integration test cases from 45s -> 90s.

* Rename export_tasks.qgis_task -> example_finalize_run_hook_task.

* Renamed model ExportTaskResult -> FileProducingTaskResult.

* Added (not-yet-primary) FileProducingTaskResult.id and ExportTask.new_result, and populated both.

* Change over FileProducingTaskResult pk, rename ExportTask.new_result -> result.

* Update code to use new FileProducingTaskResult <-> ExportTask relationship.

* Update FinalizeRunHookTask to save files produced to FileProducingTaskResult..

* Fix migration conflicts

* Minor updates & fixes from rebase to master.

* Fix migration issue #25018.

* Updated FinalizeRunTask so problems in celery canvas don't cause endless chord unlock tasks.

* Update migration 0016 to account for ExportTask instances with no result.

* Update migration 0016 to use existing id value instead of creating new one.

* Remove last pieces of Run updating from provider tasks.
Fixes endless chord unlock for tasks with ExportTask.on_success() exceptions.

* Replace TaskFactory.parse_tasks() chord with a chain & hack FinalizeRunTask to work around
  problem with chord error handling.
See celery/celery#3709.

* Ensure finalize_run_task gets run, even in the event of an error in the run.
@vitenbergd

commented Oct 3, 2017

Not sure if this will be useful, but here's what I found out:

test.py

import time
from celery import Celery
from celery.result import AsyncResult, GroupResult
from celery import group, chord

app = Celery('task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def ok():
    time.sleep(3)
    return True

@app.task
def err():
    raise ValueError

@app.task
def callback(res):
    print("Callback invoked: {0}".format(res))

@app.task
def errback0(request, exc, traceback):
    print("Chord error callback #0!")
    print('Task {0!r} raised error: {1!r}'.format(request.id, exc))

@app.task
def errback1():
    print("Chord error callback #1!")

@app.task
def errback2(*args, **kwargs):
    print("Chord error callback #2!")
    print("args:", *args)
    print("kwargs:", kwargs)  # print the dict itself; print(**kwargs) raises TypeError for unknown keys

@app.task
def errback3(arg):
    print("Chord error callback #3!")
    print("arg:", arg)

Now let's run some tasks.

1 - no err.s() tasks, so callback.s() should be invoked

from test import *
cb = callback.s().on_error(errback0.s())
tasks = [ok.s() for i in range(3)]
res = chord(tasks)(cb)

Result:
1 - works as expected, callback task invoked, not so interesting

2 - let's add a failing task and keep the same callbacks; errback0 is the on_error callback implementation from the documentation

tasks.append(err.s())
res = chord(tasks)(cb)

Result:
2 - no callbacks were invoked. So, same issue as mentioned above.

3 - let's try another on_error callback, errback1, which takes no arguments and whose signature is immutable

cb = callback.s().on_error(errback1.si())
res = chord(tasks)(cb)

Result:

[2017-10-03 17:11:17,894: INFO/MainProcess] Received task: test.ok[d10d1c72-02e1-4618-a851-a267f7def2d2]  
[2017-10-03 17:11:17,895: INFO/MainProcess] Received task: test.ok[8cfa0ca4-2c41-450d-8509-47b1a9ff03c7]  
[2017-10-03 17:11:17,896: INFO/MainProcess] Received task: test.ok[0e5ec088-9a3d-49cb-a464-657783d5195c]  
[2017-10-03 17:11:17,897: INFO/MainProcess] Received task: test.err[2016f02d-e07c-487f-98bf-ad551f68cc49]  
[2017-10-03 17:11:17,899: ERROR/ForkPoolWorker-2] Task test.err[2016f02d-e07c-487f-98bf-ad551f68cc49] raised unexpected: ValueError()
Traceback (most recent call last):
  File "/home/dev/.virtualenvs/celery/lib/python3.6/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/dev/.virtualenvs/celery/lib/python3.6/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/dev/projects/python/celerytest/test.py", line 15, in err
    raise ValueError
ValueError
[2017-10-03 17:11:20,899: INFO/ForkPoolWorker-3] Task test.ok[0e5ec088-9a3d-49cb-a464-657783d5195c] succeeded in 3.0017932470000233s: True
[2017-10-03 17:11:20,900: INFO/ForkPoolWorker-4] Task test.ok[8cfa0ca4-2c41-450d-8509-47b1a9ff03c7] succeeded in 3.003801435999776s: True
[2017-10-03 17:11:20,900: ERROR/ForkPoolWorker-1] Chord callback for '70329930-11fe-4fc6-885f-9037e8e839c4' raised: ChordError("Dependency 2016f02d-e07c-487f-98bf-ad551f68cc49 raised ValueError('',)",)
Traceback (most recent call last):
  File "/home/dev/.virtualenvs/celery/lib/python3.6/site-packages/celery/backends/redis.py", line 290, in on_chord_part_return
    callback.delay([unpack(tup, decode) for tup in resl])
  File "/home/dev/.virtualenvs/celery/lib/python3.6/site-packages/celery/backends/redis.py", line 290, in <listcomp>
    callback.delay([unpack(tup, decode) for tup in resl])
  File "/home/dev/.virtualenvs/celery/lib/python3.6/site-packages/celery/backends/redis.py", line 243, in _unpack_chord_result
    raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
celery.exceptions.ChordError: Dependency 2016f02d-e07c-487f-98bf-ad551f68cc49 raised ValueError('',)
[2017-10-03 17:11:20,911: INFO/MainProcess] Received task: test.errback1[16f1dedf-4678-4def-98a9-2008a37a4223]  
[2017-10-03 17:11:20,911: INFO/ForkPoolWorker-1] Task test.ok[d10d1c72-02e1-4618-a851-a267f7def2d2] succeeded in 3.015284213997802s: True
[2017-10-03 17:11:20,912: WARNING/ForkPoolWorker-3] Chord error callback #1!

3 - errback1 was invoked, so i suppose we can do the same trick with errback0

cb = callback.s().on_error(errback0.si())
res = chord(tasks)(cb)

but it's not working. With an immutable signature nothing is passed to the error callback, yet errback0 declares required arguments in its definition, so I suppose that is the reason for the failure here. I also think the documentation is somewhat misleading in this case.

4 - This looks like an arguments issue(?), so let's try another one, errback2, and
inspect the arguments passed to the error callback:

cb = callback.s().on_error(errback2.s())
res = chord(tasks)(cb)

Result:
4 - Worked for me (some part of log is omitted):

[2017-10-03 17:12:09,478: WARNING/ForkPoolWorker-3] Chord error callback #2!
[2017-10-03 17:12:09,478: WARNING/ForkPoolWorker-3] args:
[2017-10-03 17:12:09,478: WARNING/ForkPoolWorker-3] 4a484b7c-d2e1-43c2-a4b0-8758a8caa896
[2017-10-03 17:12:09,478: WARNING/ForkPoolWorker-3] kwargs:
[2017-10-03 17:12:09,478: INFO/ForkPoolWorker-3] Task test.errback2[92683879-9164-402f-a691-0c08854dc1c7] succeeded in 0.0006017090017849114s: None

We can see here that only 1 argument is passed, a string which is the id of the chord's callback, so errback3 will work in absolutely the same way.

I've tried 4.1/4.0.2 versions of celery.
So, you can:

  • use on_error callback with immutable signature (if you don't care about the passed id of chord), but sometimes it's useful to know chord's id
  • use on_error callback with *args, **kwargs arguments
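
A sketch of an error callback body that tolerates the call shapes reported in this thread: a single string id when a header task fails, `(request, exc, traceback)` when the chord callback itself fails, and nothing at all for an immutable signature. `normalize_errback_args` is a hypothetical helper, not part of Celery; it is written as a plain function so it can be run without a broker, and in a real project it would be called from a task declared with `*args, **kwargs`.

```python
def normalize_errback_args(*args, **kwargs):
    """Map whatever an on_error callback received to a uniform dict.

    Keyword arguments are accepted and ignored so the call never fails
    on an unexpected signature.
    """
    info = {"task_id": None, "exc": None, "traceback": None}
    if not args:
        # Immutable signature (.si()): nothing is passed at all.
        return info
    first = args[0]
    if isinstance(first, str):
        # Header failure: only the chord callback's id arrives.
        info["task_id"] = first
    else:
        # Callback failure: a request-like object with an `id` attribute.
        info["task_id"] = getattr(first, "id", None)
    if len(args) > 1:
        info["exc"] = args[1]
    if len(args) > 2:
        info["traceback"] = args[2]
    return info


class FakeRequest:
    """Stand-in for a Celery request context; only `id` is used."""
    id = "callback-task-id"


print(normalize_errback_args("4a484b7c-d2e1-43c2-a4b0-8758a8caa896"))
print(normalize_errback_args(FakeRequest(), ValueError("boom"), "tb text"))
```

Inside a task this would be a one-liner such as `info = normalize_errback_args(*args, **kwargs)`, after which the handler can rely on `info["task_id"]` regardless of which failure path triggered it.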
@listingmirror

commented Oct 5, 2017

@vitenbergd thanks a bunch, that was spot on!

Switching to args/kwargs style arguments makes the chord error callback work. I am on Python 3.6.2, for what it is worth.

So the question is, does the code need to be fixed, or do we just need to update the docs?

@vitenbergd

commented Oct 10, 2017

@listingmirror I think the docs should be updated, because there are two cases in which the .on_error() callback is invoked:

example1 chord - group dependency fail
chord([ok_task.s(), fail_task.s()])(my_cool_callback.si().on_error(not_so_cool_callback.s()))
example2 chord - callback fail
chord([ok_task.s(), fail_task.s()])(my_fail_callback.si().on_error(not_so_cool_callback.s()))

  • ChordError - when one of the group dependencies fails (example1), the only argument passed to
    not_so_cool_callback is a string with my_cool_callback's id, because my_cool_callback was never called and so no task context (the request mentioned in the docs) can be passed, I suppose:
        @app.task
        def on_chord_error(request, exc, traceback):
            print('Task {0!r} raised error: {1!r}'.format(request.id, exc))
    Also, in that case not_so_cool_callback is executed as a separate task.
  • Any my_fail_callback failure (example2, some uncaught exception raised from my_fail_callback) will cause not_so_cool_callback to be invoked, but it seems to act like an on_failure task handler in the context of my_fail_callback, so it's not a separate task anymore, and request, exception and traceback are passed to not_so_cool_callback as in the docs example.

UPD.
I've found out some more on the topic, and my description above was incomplete, so
let's look again at the different cases in which the on_error callback is called:
errcb_args - on_error callback implemented with (*args) or (*args, **kwargs) parameters
errcb_no_args - on_error callback implemented without parameters in its definition
CE - ChordError, a failure of one of the dependencies
CF - callback failure

  1. CE + chord(tasks)(callback.s().on_error(errcb_args.s())) - errcb_args.s() is invoked as a separate task with 1 argument passed (the callback's id). So it works, but not the way the docs describe.
  2. CE + chord(tasks)(callback.s().on_error(errcb_no_args.s())) - No error (even though 1 argument is passed to a function with no parameters defined; it seems the call is not wrapped in a protected call function) and also no invocation, nothing...
  3. CE + chord(tasks)(callback.s().on_error(errcb_args.si())) (immutable on_error callback signature) - errcb_args.si() is invoked as a separate task, no args passed. Works as expected.
  4. CE + chord(tasks)(callback.s().on_error(errcb_no_args.si())) (immutable on_error callback signature) - works like №3, nothing special.
  5. CF + chord(tasks)(callback.s().on_error(errcb_args.s())) - errcb_args.s() is invoked, but NOT as a separate task (I really don't know why); the arguments passed are as described in the docs (request, exc, tb). It looks like a different implementation. As I described above, the error callback is called inside the callback task's context.
  6. CF + chord(tasks)(callback.s().on_error(errcb_no_args.s())) - The same arguments are passed as in case №5, the error callback's definition is checked, and an exception is raised from the protected call function because no args were expected. There is no separate task and the error callback is not called, but at least there is an exception (compare case №2).
  7. CF + chord(tasks)(callback.s().on_error(errcb_args.si())) (immutable on_error callback signature) - errcb_args.si() behaves similarly to №6, but this time no args are passed, so it works almost as I expected, but where is the separate task?
  8. CF + chord(tasks)(callback.s().on_error(errcb_no_args.si())) (immutable on_error callback signature) - here's the best part: no args are passed, but a separate task is invoked.
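
The eight observations can be condensed into a small lookup table, which makes it easier to see which errback styles fire for both failure kinds. This is only a restatement of the cases listed above, not something derived from Celery's source; `Outcome`, `OBSERVED` and `works_for_both_failures` are names made up for this sketch.

```python
from typing import NamedTuple, Tuple


class Outcome(NamedTuple):
    invoked: bool          # did the error callback run at all?
    separate_task: bool    # was it run as its own task?
    args: Tuple[str, ...]  # what was passed (or attempted to be passed)


# Key: (failure kind, errback accepts *args, signature is immutable)
# "CE" = a header task failed (ChordError), "CF" = the chord callback failed.
OBSERVED = {
    ("CE", True, False): Outcome(True, True, ("callback_id",)),
    ("CE", False, False): Outcome(False, False, ("callback_id",)),
    ("CE", True, True): Outcome(True, True, ()),
    ("CE", False, True): Outcome(True, True, ()),
    ("CF", True, False): Outcome(True, False, ("request", "exc", "traceback")),
    ("CF", False, False): Outcome(False, False, ("request", "exc", "traceback")),
    ("CF", True, True): Outcome(True, False, ()),
    ("CF", False, True): Outcome(True, True, ()),
}


def works_for_both_failures(accepts_args, immutable):
    """True if the errback was reported as invoked for both CE and CF."""
    return all(
        OBSERVED[(kind, accepts_args, immutable)].invoked
        for kind in ("CE", "CF")
    )


for accepts_args in (True, False):
    for immutable in (False, True):
        print(accepts_args, immutable,
              works_for_both_failures(accepts_args, immutable))
```

Read this way, the only combination reported as never firing is an errback without parameters attached with a mutable `.s()` signature.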
@auvipy

Member

commented Dec 19, 2017

Can anyone verify the status on the master branch?

@auvipy auvipy added this to the v4.2 milestone Dec 19, 2017

chrismeyersfsu added a commit to chrismeyersfsu/awx that referenced this issue Jan 4, 2018
handle_work_error signature to work
* celery error callback signature isn't well defined. Thus, our error
callback signature is made to handle just about any call signature and
depend on only 1 attribute, id, existing.

See celery/celery#3709

@auvipy auvipy modified the milestones: v4.2, v5.0.0 Jan 13, 2018

@xirdneh

Member

commented Jul 23, 2018

Are there any current efforts on this?

@auvipy auvipy modified the milestones: v5.0.0, v4.3 Jul 23, 2018

@xirdneh

Member

commented Jul 24, 2018

I've been doing some testing on this.
The error callback is getting called on error with different types of chords.
Still, the documentation is wrong, because the error task gets called with only a result id.
The result id corresponds to the ChordError result. The documentation states that a request, exc and traceback will get passed to the error callback.
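
If the error callback only receives an id, the failure details may still be recoverable from the result backend. A minimal sketch, assuming the id passed in is the one under which the backend stored the `ChordError`, as the testing above suggests. `failure_details` is a hypothetical helper; `get_result` stands for something like `app.AsyncResult`, injected as a parameter so the sketch runs without a broker or backend.

```python
def failure_details(task_id, get_result):
    """Look up state, exception and traceback for a failed task id.

    `get_result` is any callable returning an object with `state`,
    `result` and `traceback` attributes, which Celery's AsyncResult has.
    """
    res = get_result(task_id)
    return {
        "task_id": task_id,
        "state": res.state,
        "exc": res.result,  # for a failed task, the stored exception
        "traceback": res.traceback,
    }


class FakeResult:
    """Stand-in for AsyncResult so the example runs on its own."""

    def __init__(self, task_id):
        self.id = task_id
        self.state = "FAILURE"
        self.result = RuntimeError("dependency failed")
        self.traceback = "Traceback (most recent call last): ..."


details = failure_details("some-callback-id", FakeResult)
print(details["state"], repr(details["exc"]))
```

In an errback declared with `*args`, the call would be along the lines of `failure_details(args[0], app.AsyncResult)`, which only makes sense when a result backend is configured.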

@fjsj

commented Aug 14, 2019

@auvipy Debugging some of this, behavior of CE + chord(tasks)(callback.s().on_error(errcb_args.s())) [1] is definitely wrong as per docs. Relevant code seems to be here:

).apply_async((callback.id,))
(this is current master code, so it's still wrong)

[1] #3709 (comment)

@auvipy auvipy modified the milestones: 4.7, 4.5 Aug 15, 2019

@auvipy auvipy self-assigned this Aug 15, 2019
