
Feature: Dynamic tasks #817

Closed
wants to merge 2 commits into from

Conversation

@steeve
Contributor

steeve commented Jun 21, 2012

So I submit to you dynamic tasks, and dynamic chords. This is a really powerful concept, and I think it changes the game.

Subtasks returned by dynamic tasks are executed right after the first task executes, as if they were in a chain.
You can also return chains and chords, and they will be properly inserted.

This allows you to design a pipeline that can be completely dynamic, while benefiting from Celery's powerful idioms (subtasks, chains, chords...).

Our whole backend at @veezio is powered by these. They allow us to have extensively dynamic pipelines.
We have pipes inside chords, chords inside chords, tasks put before pipes, etc.

To be honest, I can't think of something you can't do with this.

How to use:

import celery.contrib.dynamic

@celery.dynamic_task
def test(number):
    if number > 5:
        return more.s()
    return less.s()


@celery.dynamic_task
def more():
    print "it's more !"

@celery.dynamic_task
def less():
    print "it's less !"

But you can do cool shit too!

@celery.dynamic_task
def one(n):
    if n:
        return three.si(n + 4) | four.s() # if you don't use an immutable task, the result of one (a subtask) will be sent to three.
    return n
pipe = one.s(1) | two.s()

pipe.apply_async() # will run: one(1) | three(1+4) | four | two

You can also use them in chords! And have chords in chords!:

from celery.contrib.dynamic import dynamic_chord as chord

@celery.dynamic_task
def resolve_url(url):
    # do some stuff, and parallelize all that
    return chord(map(do_stuff.si, data), on_finished.s())

resolve_urls = chord(map(resolve_url.si, urls), url_resolved.s())

In that case url_resolved will be called with the results from the on_finished() calls, which look like:

[ [..., ], [..., ] ]

@ask
Contributor

ask commented Jun 23, 2012

Interesting! Does it work with other serializers than pickle?

I moved it to the 2.7 milestone; we need to get 2.6 out the door first.

@steeve
Contributor Author

steeve commented Jun 23, 2012

It does. We've used it with pickle, json and msgpack.
Wise to put it in 2.7 indeed.
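
(For reference, a minimal sketch of selecting a non-pickle serializer; these are standard Celery setting names of the era, shown purely as illustration:)

CELERY_TASK_SERIALIZER = 'json'      # or 'msgpack', 'pickle', ...
CELERY_RESULT_SERIALIZER = 'json'    # serializer used for stored results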

@steeve
Contributor Author

steeve commented Aug 14, 2012

Major update to the dynamic design. Waiting for your feedback @ask :)

@steeve
Contributor Author

steeve commented Aug 20, 2012

Should I merge this?

@ask
Contributor

ask commented Aug 22, 2012

I'm not sure; I'm reluctant to add new stuff into contrib. If something really is useful, why can't we support it in core?
Is there any way the regular task can be extended to support this?

Why is the calling of the tasks delayed until after the task has returned?
I don't see the benefit over:

 return task.s(2)

instead of:

 return task.delay(2)

@steeve
Contributor Author

steeve commented Aug 22, 2012

Yes, the returned subtask is placed as the new callback for the current task, and the old callback is attached as a callback to the returned subtask.

The rationale for returning subtasks is that you can now do this:

@app.dynamic_task
def one():
    return two.si() | three.s()

@app.dynamic_task
def two():
    return four.si()

@app.dynamic_task
def three():
    return 3

@app.dynamic_task
def four():
    return 4

one.apply_async()

And have one | two | four | three(4) => 3 executed in this order.
That said, I'm all for supporting in core.
This pretty much allows for a dynamic canvas: instead of the whole thing being laid out beforehand, it can evolve, branch, merge, etc. completely dynamically.

Admittedly though, having it in the return value is just syntactic sugar.
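
To make that mechanism concrete, here is a minimal sketch of the re-linking idea (not the actual patch; maybe_replace is a hypothetical helper called with the task's return value):

from celery import subtask
from celery.canvas import Signature

def maybe_replace(current_task, retval):
    # If the task returned a signature, splice it into the chain:
    # the old callback becomes the callback of the returned subtask.
    if isinstance(retval, Signature):
        callbacks = current_task.request.callbacks or []
        if callbacks:
            retval |= subtask(callbacks[0])
        retval.apply_async()  # dispatch the spliced subtask/chain
        return None           # the original return value is superseded
    return retval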

@steeve
Contributor Author

steeve commented Aug 22, 2012

For instance, here is one of our uses:

@app.dynamic_task
def _get_freebase_object_by_guid(guid):
    query = DEFAULT_MQL_QUERY.copy()
    query["id"] = guid
    return _mql_read.si(query) | _on_get_freebase_object_by_guid.s()

@app.dynamic_task
def _on_get_freebase_object_by_guid(mql_result):
    if mql_result.get("result"):
        return _add_text(mql_result["result"])

And all of this executes as part of a big pipeline.

@steeve
Contributor Author

steeve commented Aug 28, 2012

Also, by the way, regular tasks could be extended to support this, but we might not want to change their behaviour. Or perhaps do it via a configuration entry?

In any case, yes, it's possible to change the task decorator to support this.

@steeve
Contributor Author

steeve commented Aug 28, 2012

But I'd rather have a @app.dynamic_task decorator, so that people who use it are aware of what it does.

@steeve
Contributor Author

steeve commented Aug 28, 2012

Also, this enables stuff like:
chord([a.s() | b.s()], c.s())

Even if a and b are dynamic.

And chords inside chords with chains, etc.

@ask
Contributor

ask commented Aug 31, 2012

Hmm, but "dynamic task" is maybe not descriptive enough; at least I don't get what it does from the name, and regular tasks are "dynamic" after all.

It could be something like @task(dynamic=True), except with a less generic term than dynamic.

@thekashifmalik

Dynamic tasks made me think of dynamically defining tasks during execution. Maybe a more suitable name would be 'Support for dynamic chaining' instead of 'Support for dynamic tasks'?

@dalbani

dalbani commented Dec 13, 2012

I've been using dynamic tasks for a while now and they work fine so far.
For my needs, I really couldn't manage without this feature.

However, I'm wondering if it could be the cause of an issue I have.
In my RabbitMQ instance, thousands (!) of queues are created, and that seems to bog down the beam.smp process, whose RES memory usage rapidly grows to several GB.

@steeve
Contributor Author

steeve commented Dec 13, 2012

Thanks for the feedback!

Regarding your queue leak issue, I bet you are using the AMQP result backend and not consuming the results? This is not related to dynamic tasks:
the AMQP backend creates a queue for each task result.

@steeve
Contributor Author

steeve commented Dec 13, 2012

We have also been using them in production at @veezio for quite some time; they work fine. By the way, if anyone has a better name than dynamic, I'm all for it :)

@dalbani

dalbani commented Dec 13, 2012

Indeed Steeve, I do use the AMQP result backend.
So how could I fix that? If I'm not mistaken, I had to use a result backend, otherwise it wouldn't work.
Should I configure it through CELERY_TASK_RESULT_EXPIRES?

@steeve
Contributor Author

steeve commented Dec 13, 2012

Indeed, yes, but I think you can set CELERY_IGNORE_RESULT so that it won't store them.
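
(A hedged sketch of the two settings discussed here, using the 3.x-era names:)

from datetime import timedelta

# Either stop storing results entirely...
CELERY_IGNORE_RESULT = True

# ...or keep the AMQP result backend but let result queues expire sooner.
CELERY_TASK_RESULT_EXPIRES = timedelta(hours=1)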

@ldng

ldng commented Dec 13, 2012

First, I'm new to Celery and am evaluating it for inclusion in our project, and I'm not sure I fully understand this pull request. Let's say I have a queue of three tasks:
t_1
t_2
t_3

But sometimes I would have:
t_1
t_A
t_2
t_3
where t_3 must wait on t_A if present in the queue.
Would your task chaining system allow me to dynamically make t_3 depend on t_A if present?
Or is it already doable with current Celery?
If it solves that problem and is not already doable, it would be a great feature to include!

@steeve
Contributor Author

steeve commented Dec 14, 2012

Dynamic tasks would allow you to do this:

@app.dynamic_task
def t_1():
    if something:
        return t_A.si(some, args)
    return some_result

@app.dynamic_task
def t_A(some, args):
    return some_other_result

chain = t_1.si() | t_2.s() | t_3.s()

Then t_A would get executed before (and give its result to) t_2 but only if something :)

In the end the pipe would behave as if it were t_1.si() | t_A.si(some, args) | t_2.s() | t_3.s(), where some and args are of course t_1 locals.

@dalbani

dalbani commented Dec 14, 2012

Steeve, I've tried setting CELERY_IGNORE_RESULT = True, but then the whole dynamic behavior seemed to stop working.
Instead, I've used CELERY_TASK_RESULT_EXPIRES = timedelta(minutes=2). However, even with a modest dynamic tasks setup, I apparently still get up to one or two thousand queues.
And I think there is a risk in lowering CELERY_TASK_RESULT_EXPIRES further, as that could break long-running tasks, couldn't it?

@dalbani

dalbani commented Dec 14, 2012

I've also thought of using another result backend, like Memcache.
However, I get errors like:

[2012-12-14 19:02:46,190: CRITICAL/MainProcess] Task celery.checkpoint[e94f0eac-e8b4-47b7-aaca-66338093585c] INTERNAL ERROR: TypeError("object of type 'NoneType' has no len()",)
Traceback (most recent call last):
  File "/home/ubuntu/venv/local/lib/python2.7/site-packages/celery/task/trace.py", line 266, in trace_task
    on_chord_part_return(task)
  File "/home/ubuntu/venv/local/lib/python2.7/site-packages/celery/backends/base.py", line 475, in on_chord_part_return
    if val >= len(deps):
TypeError: object of type 'NoneType' has no len()

@Rigdon

Rigdon commented Apr 4, 2013

I just wanted to throw in and say this sounds like an excellent idea. I'm currently building a processing framework and right now we're pre-defining everything up front. This capability would give us much greater flexibility.

@steeve
Contributor Author

steeve commented Apr 6, 2013

Perhaps we could provide this feature as a recipe?

But I agree with @ask that it may need a better name.

@baconalot mentioned this pull request Apr 8, 2013
@ask
Contributor

ask commented Apr 11, 2013

I don't doubt that this is useful, I just don't like introducing different @task decorators as I think that will be confusing for the user. I also don't want to add major features to celery.contrib, as that has proven to be a bad idea in the past. If it's generically useful it should be in core, and if it's a special use case it should probably be in a separate library. It's probably possible to support this in the existing @task decorator.

@ask
Contributor

ask commented Apr 11, 2013

And yeah, a recipe is also possible. But then the recipe should be simple enough that the user doesn't have to copy and paste a lot of code. If there's too much code, parts of it can be in core (but integrated rather than hidden in contrib), or there can be a separate library.

@JonPeel

JonPeel commented Apr 18, 2013

Any pointers on how one would go about inspecting dynamically created tasks? I've just been toying with this addition to try and break down one large, beastly and very specific task into lots of different generic subtasks. That much works really (!) well, but I feel like I've lost touch with that top-level task and can't easily determine when it's 'done'.

@dbrgn
Contributor

dbrgn commented May 24, 2013

After a lot of playing around with these new tasks, I think I've mostly understood them.

One thing left open for me: I want to pass data up through multiple levels of dynamic tasks. Currently I'm doing this:

testtasks.py

import os
import time
import random
from celery import Celery, group
from celery.utils.log import get_task_logger
import worker.dynamic_task
from worker.dynamic_task import dynamic_chord

celery = Celery('hello',
    backend=os.environ['BROKER_URL'],
    broker=os.environ['BROKER_URL'])
logger = get_task_logger(__name__)


@celery.dynamic_task
def pass_on(*args):
    return args

@celery.dynamic_task
def one(name):
    logger.warning('[1] ' + name)
    time.sleep(1)
    amount = random.randint(2, 3)
    subtasks = [two.si(name, '{} von {}'.format(i + 1, amount))
            for i in range(amount)]
    chord = dynamic_chord(subtasks, pass_on.s())
    return chord

@celery.dynamic_task
def two(name, label):
    logger.warning(' [2] {} | {}'.format(name, label))
    time.sleep(1)
    amount = random.randint(2, 3)
    subtasks = group([thr.si(name, '{} von {}'.format(i + 1, amount))
            for i in range(amount)])
    return subtasks

@celery.dynamic_task
def thr(name, label):
    logger.warning('  [3] {} | {}'.format(name, label))
    time.sleep(1)
    return 'URL'

@celery.dynamic_task
def notify(arg):
    logger.warning('[NOTIFY] done!')
    logger.warning('Arg: {}'.format(arg))

test.py

from worker.dynamic_task import dynamic_chord
from testtasks import *

maintask = dynamic_chord(map(one.si, ['A', 'B', 'C']), notify.s())
maintask.apply_async()

This prints the following:

[2013-05-24 20:58:26,519: WARNING/MainProcess] testtasks.notify[b75be245-799c-422b-97a6-daaf93848fb8]: [NOTIFY] done!
[2013-05-24 20:58:26,519: WARNING/MainProcess] testtasks.notify[b75be245-799c-422b-97a6-daaf93848fb8]: Arg: [[['URL', 'URL', 'URL'], ['URL', 'URL'], ['URL', 'URL']], [['URL', 'URL'], ['URL', 'URL'], ['URL', 'URL', 'URL']], [['URL', 'URL', 'URL'], ['URL', 'URL'], ['URL', 'URL']]]

Which is good, because I'm passing up data from the lowest level to the top level, without having a blocking task that waits for all subtasks to finish.

The point is that I need a "dummy callback" that passes on all data from one level to the next. Wouldn't this snippet also make sense in dynamic_task.py, analogous to dynamic_chord?

def dynamic_group(tasks):
    return group([(task | subtask("celery.checkpoint")) for task in tasks])

This way I don't need the dummy callback and everything still works. (But maybe I've overlooked some bad implications or an easier alternative.)


As a side note, I can't get this to work together with Django / djcelery. This traceback shows up when using dynamic_chord: https://gist.github.com/dbrgn/5646021

@dbrgn
Contributor

dbrgn commented Jul 30, 2013

Any news on design decisions about this feature? In the past weeks I've seen a lot of use cases where this feature would be perfectly suited and much needed.

@gotpredictions

It would come in handy for us too, especially the dynamic chord. Currently we need one of the tasks to hold up the process, polling for status.

See my post in celery-users. I am hoping this is the solution for it: https://groups.google.com/forum/#!topic/celery-users/eNGbLlAwhi0

@chris-hailstorm

Is this being considered for a Celery release? This is very useful.

@ask
Contributor

ask commented Aug 28, 2013

As I wrote earlier in the thread, if this is useful then it should be possible to implement this in the existing 'task' decorator. Adding things to celery.contrib has been a mistake in the past and it will be confusing to have two decorators with wildly different semantics.

@ask
Contributor

ask commented Aug 28, 2013

If merging this as-is, I imagine having to ask users 'do you use tasks or dynamic tasks?' when responding to bug reports, and that is something I want to avoid :)

The dynamic task basically links the return value to the currently executing task, so this should be possible to do with existing tasks, but more explicitly, e.g.:

@app.task(bind=True)
def one(self, i):
    if i > 3:
        return self.link(two.s(i))
    return i + 1

Not sure I like the name link, so hope someone can figure out a better name.

@chris-hailstorm

self.replace or self.subst?

self.transmogrify? :)


@dbrgn
Contributor

dbrgn commented Aug 29, 2013

I think both replace and subst or substitute are better than link.

@Catincan

A new pledge is available on this issue: https://www.catincan.com/bounty/https-github-com-celery-celery-pull-817.

@mehcode

mehcode commented Sep 17, 2013

What's wrong with just auto-detecting if a Signature is returned from a task and replacing it?

@app.task
def one(i):
    if i > 3:
        return two.s(i)
    return i + 1

Also, something like this should just work (chains in chords in chords in chains, etc.):

chord(map(...), chain(chord(...), chord(...., chain(...))))

It doesn't at the moment with this patch.


On another note: when I replayed this PR on top of 3.0, I had to change the following:

retval |= current_task.request.callbacks[0] 

To:

retval |= Signature(current_task.request.callbacks[0])

@ask
Contributor

ask commented Oct 7, 2013

@mehcode: We cannot convert any sig in a return value as that would not be backwards compatible.

Regarding the Signature change, you are maybe using the json serializer?
All subtasks must be converted back into Signature objects when they are received, to be compatible with non-pickle serializers.
The idiom is:

from celery import subtask
retval = subtask(current_task.request.callbacks[0])

@ask
Contributor

ask commented Nov 22, 2013

The c32-dynamic branch implements a first draft of Task.replace().

The way it replaces callbacks is no good, as I imagine it's only supposed to replace the chain node, not all of the callbacks, but this may be improved when the new 'chain' message field is added (also for 3.2).

Dynamic chord I'm not sure about; what does it solve?

Btw, that catincan site seems pretty useless, as it's more work to fill in the forms than it is to simply implement the feature :)

@dbrgn
Contributor

dbrgn commented Nov 22, 2013

Dynamic chord I'm not sure about; what does it solve?

If a task can be replaced by a chord using the Task.replace() method, then we don't need explicit dynamic chords. Otherwise, it's definitely a feature that should be implemented in order to create dynamic workflows.

Example: I have a task that parses URLs of different types. For each URL, according to the type, a series of tasks should be executed. But each URL can be processed in parallel. So the main task should return a group of chords. For example:

- maintask (processes 3 URLs)
-- process_url_1 (type A)
   1. download url
   2. process results
   3. finish processing and write resulting data to database
-- process_url_2 (type A)
   1. download url
   2. process results
   3. finish processing and write resulting data to database
-- process_url_3 (type B)
   1. download url
   2. do something else than with type A URLs
   3. send an email if a condition is matched

This can be implemented if the maintask can replace itself with a group of process_url tasks. Each of those process_url tasks would then process the URL and - depending on the type of the URL - replace itself with chord_a(url) or chord_b(url).
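
For illustration, a hypothetical sketch of that pattern on top of the Task.replace() draft (detect_type, download, process, process_b, store and send_email are invented names; whether one returns or raises the replacement depends on the draft's final semantics):

from celery import chain, group

@app.task(bind=True)
def maintask(self, urls):
    # Replace this task with one processing pipeline per URL, run in parallel.
    return self.replace(group(process_url.s(url) for url in urls))

@app.task(bind=True)
def process_url(self, url):
    # Depending on the URL type, replace this task with the matching chain.
    if detect_type(url) == 'A':  # detect_type: hypothetical helper
        return self.replace(chain(download.s(url), process.s(), store.s()))
    return self.replace(chain(download.s(url), process_b.s(), send_email.s()))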

@mehcode

mehcode commented Nov 25, 2013

@ask This task.replace looks like it can be used to fulfill our use case for dynamic (though I would say conditional, so .replace fits) tasks.

And yes, I was using the JSON serializer.

@ryanhiebert
Contributor

The .replace feature by @ask looks great for my use-case of dynamic tasks. What can I do to help it get released? Does it need some testing?

@AlJohri

AlJohri commented Mar 18, 2014

+1 what can be done to help get the .replace feature released?

@ask
Contributor

ask commented Mar 28, 2014

Documentation and tests would help, as well as further defining the semantics of this feature and how it works with the canvas (e.g. chord).

@dbrgn
Contributor

dbrgn commented Oct 15, 2014

The add_to_chord and replace_in_chord methods have been added recently, thanks @ask.

I just tested the new feature. This is the base script I used:

# test.py
from celery import Celery, chord
from celery.utils.log import get_task_logger

app = Celery('test', backend='redis://localhost:6379/10', broker='redis://localhost:6379/11')
app.conf.CELERY_ALWAYS_EAGER = False

logger = get_task_logger(__name__)

@app.task(bind=True)
def get_one(self):
    return 1

@app.task
def get_two():
    return 2

@app.task
def sum_all(data):
    logger.error(data)
    return sum(data)

if __name__ == '__main__':
    x = chord(get_one.s() for i in range(3))
    body = sum_all.s()
    result = x(body)

    print(result.get())

Standard output is 3 (sum of [1, 1, 1]).

Replacement works great:

@app.task(bind=True)
def get_one(self):
    self.replace_in_chord(get_two.s())
    return 1

Output:

[2014-10-15 11:15:32,632: INFO/MainProcess] Received task: test.get_one[2632c0f1-365e-4f20-9db6-8f0cc0769ff0]
[2014-10-15 11:15:32,634: INFO/MainProcess] Received task: test.get_one[288e2b60-b32b-4710-ac40-66364eb33a2d]
[2014-10-15 11:15:32,642: INFO/MainProcess] Received task: test.get_one[f0b5150a-c4a5-461c-886a-d61d10a7ade3]
[2014-10-15 11:15:32,670: INFO/MainProcess] Received task: test.get_two[2632c0f1-365e-4f20-9db6-8f0cc0769ff0]
[2014-10-15 11:15:32,670: INFO/Worker-3] Task test.get_one[2632c0f1-365e-4f20-9db6-8f0cc0769ff0] ignored
[2014-10-15 11:15:32,678: INFO/Worker-4] Task test.get_one[288e2b60-b32b-4710-ac40-66364eb33a2d] ignored
[2014-10-15 11:15:32,678: INFO/MainProcess] Received task: test.get_two[288e2b60-b32b-4710-ac40-66364eb33a2d]
[2014-10-15 11:15:32,682: INFO/Worker-4] Task test.get_two[2632c0f1-365e-4f20-9db6-8f0cc0769ff0] succeeded in 0.00283580459654s: 2
[2014-10-15 11:15:32,685: INFO/Worker-2] Task test.get_one[f0b5150a-c4a5-461c-886a-d61d10a7ade3] ignored
[2014-10-15 11:15:32,686: INFO/MainProcess] Received task: test.get_two[f0b5150a-c4a5-461c-886a-d61d10a7ade3]
[2014-10-15 11:15:32,687: INFO/Worker-2] Task test.get_two[288e2b60-b32b-4710-ac40-66364eb33a2d] succeeded in 0.00108842598274s: 2
[2014-10-15 11:15:32,689: INFO/Worker-4] Task test.get_two[f0b5150a-c4a5-461c-886a-d61d10a7ade3] succeeded in 0.00150422612205s: 2
[2014-10-15 11:15:33,194: INFO/MainProcess] Received task: test.sum_all[74be920d-edb1-4175-be6e-bba6146b38cf]
[2014-10-15 11:15:33,195: ERROR/Worker-2] test.sum_all[74be920d-edb1-4175-be6e-bba6146b38cf]: [2, 2, 2]
[2014-10-15 11:15:33,197: INFO/Worker-2] Task test.sum_all[74be920d-edb1-4175-be6e-bba6146b38cf] succeeded in 0.00115499692038s: 6

The result is 6, which is correct (2+2+2). When I use add_to_chord instead of replace_in_chord though, it doesn't seem to work as intended:

@app.task(bind=True)
def get_one(self):
    self.add_to_chord(get_two.s())
    return 1

Output:

[2014-10-15 11:16:35,170: INFO/MainProcess] Received task: test.get_one[20665696-3e12-4f98-902d-755aa78b392c]
[2014-10-15 11:16:35,172: INFO/MainProcess] Received task: test.get_one[9c78c35f-1678-429d-a05e-bf18c4c6e5f7]
[2014-10-15 11:16:35,175: INFO/Worker-3] Task test.get_one[20665696-3e12-4f98-902d-755aa78b392c] succeeded in 0.00263688992709s: 1
[2014-10-15 11:16:35,177: INFO/MainProcess] Received task: test.get_one[caea3644-7696-421a-ae54-510eaba63029]
[2014-10-15 11:16:35,178: INFO/MainProcess] Received task: test.get_two[821f43da-5742-467a-bb39-b01c1687dc9f]
[2014-10-15 11:16:35,180: INFO/Worker-3] Task test.get_two[821f43da-5742-467a-bb39-b01c1687dc9f] succeeded in 0.000982443802059s: 2
[2014-10-15 11:16:35,182: INFO/MainProcess] Received task: test.get_two[3a484990-d400-4efa-98ce-c1c07a5d8d07]
[2014-10-15 11:16:35,182: INFO/Worker-2] Task test.get_one[caea3644-7696-421a-ae54-510eaba63029] succeeded in 0.00319112883881s: 1
[2014-10-15 11:16:35,184: INFO/Worker-4] Task test.get_two[3a484990-d400-4efa-98ce-c1c07a5d8d07] succeeded in 0.00067730108276s: 2
[2014-10-15 11:16:35,185: WARNING/Worker-4] Chord counter incremented too many times for u'd54aa294-46af-43e6-981a-e52012f81620'
[2014-10-15 11:16:35,200: INFO/Worker-1] Task test.get_one[9c78c35f-1678-429d-a05e-bf18c4c6e5f7] succeeded in 0.0277538709342s: 1
[2014-10-15 11:16:35,201: INFO/MainProcess] Received task: test.get_two[71fb937f-fc4e-4c6d-9e5c-d12827322e0a]
[2014-10-15 11:16:35,201: WARNING/Worker-1] Chord counter incremented too many times for u'd54aa294-46af-43e6-981a-e52012f81620'
[2014-10-15 11:16:35,203: INFO/Worker-3] Task test.get_two[71fb937f-fc4e-4c6d-9e5c-d12827322e0a] succeeded in 0.000652418937534s: 2
[2014-10-15 11:16:35,204: WARNING/Worker-3] Chord counter incremented too many times for u'd54aa294-46af-43e6-981a-e52012f81620'
[2014-10-15 11:16:36,191: INFO/MainProcess] Received task: test.sum_all[93c4c9a5-17c2-465f-abcd-0e265b66d771]
[2014-10-15 11:16:36,193: ERROR/Worker-4] test.sum_all[93c4c9a5-17c2-465f-abcd-0e265b66d771]: [1, 1, 1]
[2014-10-15 11:16:36,194: INFO/Worker-4] Task test.sum_all[93c4c9a5-17c2-465f-abcd-0e265b66d771] succeeded in 0.00115421414375s: 3

The result is 3, not 9 (1+2+1+2+1+2).

Also, another time I started the same script, the following exceptions occurred in the celery worker log:

[2014-10-15 11:16:27,236: ERROR/Worker-3] Chord callback u'bfaf94da-23b3-4f2f-b163-016cbac50f43' raised: ValueError(u'bfaf94da-23b3-4f2f-b163-016cbac50f43',)
Traceback (most recent call last):
  File "/home/danilo/Projects/celery/celery/backends/base.py", line 543, in on_chord_part_return
    raise ValueError(gid)
ValueError: bfaf94da-23b3-4f2f-b163-016cbac50f43
[2014-10-15 11:16:27,244: INFO/Worker-3] Task test.get_two[72762358-f0b4-4ae0-bc05-4b084d942e08] succeeded in 0.000811628066003s: 2
[2014-10-15 11:16:27,245: ERROR/Worker-3] Chord callback u'bfaf94da-23b3-4f2f-b163-016cbac50f43' raised: ValueError(u'bfaf94da-23b3-4f2f-b163-016cbac50f43',)
Traceback (most recent call last):
  File "/home/danilo/Projects/celery/celery/backends/base.py", line 543, in on_chord_part_return
    raise ValueError(gid)
ValueError: bfaf94da-23b3-4f2f-b163-016cbac50f43

And the python test.py output showed this exception:

Traceback (most recent call last):
  File "test.py", line 29, in <module>
    print(result.get())
  File "/home/danilo/Projects/celery/celery/result.py", line 177, in get
    raise self.backend.exception_to_python(meta['result'])
celery.backends.base.ChordError: GroupResult bfaf94da-23b3-4f2f-b163-016cbac50f43 no longer exists

@dbrgn
Contributor

dbrgn commented Oct 15, 2014

Also, does this only support "simple" task signatures? Replacing a task with a chord doesn't currently seem to work.

@app.task(bind=True)
def get_one(self):
    x = chord(get_two.s() for i in range(3))
    body = sum_all.s()
    sig = x(body)
    self.replace_in_chord(sig)

@ask
Contributor

ask commented Oct 15, 2014

On Oct 15, 2014, at 10:21 AM, Danilo Bargen notifications@github.com wrote:

app = Celery('test', backend='redis://localhost:6379/10', broker='redis://localhost:6379/11')

You’re not using redis:///10?new_join=1 for the result backend here; the test returns 9 for me.

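(That is, something like the following backend URL; the new_join=1 flag switches on the improved Redis chord join:)

from celery import Celery

app = Celery('test',
             backend='redis://localhost:6379/10?new_join=1',  # new-join chord support
             broker='redis://localhost:6379/11')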

@dbrgn
Contributor

dbrgn commented Oct 15, 2014

You’re not using redis:///10?new_join=1 for the result backend here,

Aah, sorry for forgetting about that! I'll test it again tomorrow.

@dbrgn
Contributor

dbrgn commented Oct 16, 2014

Great, seems to work! ✨

@ask closed this in 07ecd08 Oct 20, 2014
@flavianh

flavianh commented Dec 7, 2014

Do you have working examples of this type of behavior? I cannot reproduce it...

This is my current code:

from celery import Celery

app = Celery('hello', broker='redis://localhost')

@app.task(bind=True)
def hello(self):
    print 'hello world'
    raise self.replace(goodbye.s())

@app.task
def goodbye():
    return 'good bye!'

but I get the error AttributeError: 'hello' object has no attribute 'replace'

I tried both versions 3.1.17 and the master from today (for which I had to git clone and install your amqp and kombu projects).

@dbrgn
Contributor

dbrgn commented Dec 9, 2014

@traxair as discussed above, did you enable the Redis result backend with ?new_join=1?
