Feature: Dynamic tasks #817

Closed
wants to merge 2 commits
@steeve
Celery member

So I submit to you dynamic tasks, and dynamic chords. This is a really powerful concept, and I think it changes the game.

Subtasks returned by dynamic tasks are executed right after the first task executes, as if they were in a chain.
You can also return chains and chords, and they will be properly inserted.

This allows you to design a pipeline that can be completely dynamic, while benefiting from Celery's powerful idioms (subtasks, chains, chords...).

Our whole backend at @veezio is powered by these. They allow us to have extensively dynamic pipelines.
We have pipes inside chords, chords inside chords, tasks put before pipes etc....

To be honest, I can't think of something you can't do with this.

How to use:

import celery.contrib.dynamic

@celery.dynamic_task
def test(number):
    if number > 5:
        return more.s()
    return less.s()


@celery.dynamic_task
def more():
    print "it's more !"

@celery.dynamic_task
def less():
    print "it's less !"

But you can do cool shit too!

@celery.dynamic_task
def one(n):
    if n:
        return three.si(n + 4) | four.s() # if you don't use an immutable task, the result of one (a subtask) will be sent to three.
    return n
pipe = one.s(1) | two.s()

pipe.apply_async() # will run: one(1) | three(1+4) | four | two
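For context on the .s() vs .si() distinction used above (standard Celery canvas behaviour, not part of this patch): a partial signature made with .s() receives the parent's result as an extra leading argument, while an immutable signature made with .si() ignores it. A minimal sketch, using a hypothetical add task:

@celery.task
def add(x, y):
    return x + y

# Partial: the second add receives the first one's result,
# so this runs add(1, 2) then add(3, 4) => 7.
(add.s(1, 2) | add.s(4)).apply_async()

# Immutable: the first result is discarded,
# so this runs add(1, 2) then add(4, 5) => 9.
(add.s(1, 2) | add.si(4, 5)).apply_async()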

You can also use them in chords! And have chords in chords!:

from celery.contrib.dynamic import dynamic_chord as chord

@celery.dynamic_task
def resolve_url(url):
    # do some stuff, and parallelize all that
    return chord(map(do_stuff.si, data), on_finished.s())

resolve_urls = chord(map(resolve_url.si, urls), url_resolved.s())

In that case url_resolved will be called with the results from on_finished(), which is:

[ [..., ], [..., ] ]
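To make that nesting concrete, here is a hypothetical sketch (the task bodies and payloads are invented for illustration):

@celery.dynamic_task
def do_stuff(item):
    return 'processed-%s' % item

@celery.dynamic_task
def on_finished(results):
    # callback of one inner chord: the do_stuff results for a single
    # url, e.g. ['processed-a1', 'processed-a2']
    return results

@celery.dynamic_task
def url_resolved(results):
    # callback of the outer chord: one entry per url, each entry being
    # an on_finished() return value, e.g.
    # [['processed-a1', 'processed-a2'], ['processed-b1', 'processed-b2']]
    return results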
@ask
Celery member

Interesting! Does it work with other serializers than pickle?

I moved it to the 2.7 milestone, need to get 2.6 out the door first

@steeve
Celery member

It does; I've used it with pickle, json and msgpack.
Wise to put it in 2.7 indeed.

@steeve
Celery member

Major update to the dynamic design. Waiting for your feedback @ask :)

@steeve
Celery member

Should I merge this ??

@ask
Celery member

I'm not sure, I'm reluctant to add new stuff into contrib, if something really is useful why can't we support it in core?
Is there any way the regular task can be extended to support this?

Why is the calling of the tasks delayed until after the task has returned?
I don't see the benefit of:

 return task.s(2)

over:

 return task.delay(2)
@steeve
Celery member

Yes, the returned subtask is set as the new callback for the current task, and the old callback is attached as a callback to the returned subtask.

The rationale for returning subtasks is that you can now do this:

@app.dynamic_task
def one():
    return two.si() | three.s()

@app.dynamic_task
def two():
    return four.si()

@app.dynamic_task
def three():
    return 3

@app.dynamic_task
def four():
    return 4

one.apply_async()

And have one | two | four | three(4) => 3 executed in this order (two's returned four.si() inherits two's callback, three, so four runs before three).
That said, I'm all for supporting it in core.
This pretty much allows a dynamic canvas where, instead of the whole thing being laid out up front, it can evolve, branch, merge etc. completely dynamically.

Admittedly though, having it in the return value is just syntactic sugar.

@steeve
Celery member

For instance, here is one of our uses:

@app.dynamic_task
def _get_freebase_object_by_guid(guid):
    query = DEFAULT_MQL_QUERY.copy()
    query["id"] = guid
    return _mql_read.si(query) | _on_get_freebase_object_by_guid.s()

@app.dynamic_task
def _on_get_freebase_object_by_guid(mql_result):
    if mql_result.get("result"):
        return _add_text(mql_result["result"])

And all of this executes as part of a big pipeline.

@steeve
Celery member

Also, by the way, regular tasks could be extended to support this, but we might not want to change their behaviour?
Or perhaps via a configuration entry?

In any case, yes, it's possible to change the task decorator to support this.

@steeve
Celery member

But I'd rather have an @app.dynamic_task decorator, so that people who use it are aware of what it does.

@steeve
Celery member

Also, this enables stuff like:
chord([a.s() | b.s()], c.s())

Even if a and b are dynamic.

And chords inside chords with chains etc...

@ask
Celery member

Hmm, but dynamic task is maybe not descriptive enough; at least I don't get what it does from the name, since regular tasks are "dynamic" after all.

It could be something like @task(dynamic=True), except with a less generic term than dynamic.

@kalail

Dynamic tasks made me think of dynamically defining tasks during execution. Maybe a more suitable name would be 'Support for dynamic chaining' instead of 'Support for dynamic tasks'?

@dalbani

I've been using dynamic tasks for a while now and they work fine so far.
For my needs, I really couldn't manage without this feature.

However, I'm wondering if it could be the cause of an issue I have.
In my RabbitMQ instance, thousands (!) of queues are created, and that seems to bog down the beam.smp process, whose RES memory usage rapidly grows to several GB.

@steeve
Celery member

Thanks for the feedback!

Regarding your queue-leaking issue, I bet you are using the AMQP result backend and not consuming the results? This is not related to dynamic tasks:
the AMQP backend creates a queue for each task result.

@steeve
Celery member

We have also been using them in production at @veezio for quite some time; works fine. By the way, if anyone has a better name than dynamic, I'm all for it :)

@dalbani

Indeed Steeve, I do use the AMQP result backend.
So how could I fix that? If I'm not mistaken, I had to use a result backend, otherwise it wouldn't work.
Should I configure it through CELERY_TASK_RESULT_EXPIRES?

@steeve
Celery member

Indeed yes, but I think you can set CELERY_IGNORE_RESULT so that results won't be stored.
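Sketched as old-style Celery 3.x settings (treat the exact names as an assumption of that era's configuration API), the two options discussed in this exchange would look like:

# celeryconfig.py
from datetime import timedelta

# Option 1: don't store results at all (per-task: @celery.task(ignore_result=True)).
# (Note dalbani reports below that this broke the dynamic behaviour,
# which passes results along the chain.)
CELERY_IGNORE_RESULT = True

# Option 2: keep the AMQP backend, but let per-result queues expire quickly.
CELERY_TASK_RESULT_EXPIRES = timedelta(minutes=2)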

@ldng

First, I'm new to Celery and evaluating it for inclusion in our project. And I'm not sure I fully understand this pull request. Let's say I have a queue of three tasks:
t_1
t_2
t_3

But sometimes I would have:
t_1
t_A
t_2
t_3
where t_3 must wait on t_A if present in the queue.
Would your task chaining system allow me to dynamically make t_3 depend on t_A if present?
Or is it already doable with current Celery?
If it solves that problem and is not already doable, it would be a great feature to include!

@steeve
Celery member

Dynamic tasks would allow you to do this:

chain = t_1.si() | t_2.s() | t_3.s()

@app.dynamic_task
def t_1():
    if something:
        return t_A.si(some, args)
    return some_result

@app.dynamic_task
def t_A(some, args):
    return some_other_result

Then t_A would get executed before (and give its result to) t_2 but only if something :)

In the end the pipe would run as if it were: t_1.si() | t_A.si(some, args) | t_2.s() | t_3.s(), where of course some and args are t_1 locals.

@dalbani

Steeve, I've tried setting CELERY_IGNORE_RESULT = True, but then the whole dynamic behavior seemed to stop working.
Instead, I've used CELERY_TASK_RESULT_EXPIRES = timedelta(minutes=2). However, even with a modest dynamic-task setup, I still apparently get up to one or two thousand queues.
And I think there is a risk in lowering CELERY_TASK_RESULT_EXPIRES further, as that could break long-running tasks, couldn't it?

@dalbani

I've also thought of using another result backend, like Memcache.
However I get errors like:

[2012-12-14 19:02:46,190: CRITICAL/MainProcess] Task celery.checkpoint[e94f0eac-e8b4-47b7-aaca-66338093585c] INTERNAL ERROR: TypeError("object of type 'NoneType' has no len()",)
Traceback (most recent call last):
  File "/home/ubuntu/venv/local/lib/python2.7/site-packages/celery/task/trace.py", line 266, in trace_task
    on_chord_part_return(task)
  File "/home/ubuntu/venv/local/lib/python2.7/site-packages/celery/backends/base.py", line 475, in on_chord_part_return
    if val >= len(deps):
TypeError: object of type 'NoneType' has no len()
@Rigdon

I just wanted to throw in and say this sounds like an excellent idea. I'm currently building a processing framework and right now we're pre-defining everything up front. This capability would give us much greater flexibility.

@steeve
Celery member

Perhaps we could provide this feature as a recipe?

But I agree with @ask that it maybe needs a better name.

@baconalot referenced this pull request
Closed

Simple Chain hangs #1286

@ask
Celery member

I don't doubt that this is useful, I just don't like introducing different @task decorators as I think that will be confusing for the user. I also don't want to add major features to celery.contrib, as that has proven to be a bad idea in the past. If it's generically useful it should be in core, and if it's a special use case it should probably be in a separate library. It's probably possible to support this in the existing @task decorator.

@ask
Celery member

And yeah, a recipe is also possible. But then the recipe should be simple enough that the user doesn't have to copy and paste a lot of code. If there's too much code, parts of it can be in core (but integrated rather than hidden in contrib), or there can be a separate library.

@JonPeel

Any pointers on how one would go about inspecting dynamically created tasks? I've just been toying with this addition to try to break down one large, beastly and very specific task into lots of different generic subtasks. That much works really (!) well, but I feel like I've lost touch with the top-level task and can't easily determine when it's 'done'.

@steeve
Celery member

Well you can't really know when it's done. If you want to get its result, you will have to put a task at the end of the pipe, because, after all, it's a pipe!

@chrisw1229

Regarding knowing when a collection of dynamic tasks is done, I think the feature idea I just started playing with might help: assigning a root_id to a top-level task that is automatically propagated to all descendant tasks. That way you can query your result backend for any tasks that match a particular root_id. Relevant issue here: #1309

@dbrgn

After discussion in IRC with ionelmc, this seems to be exactly what I need to solve http://stackoverflow.com/questions/16737127/celery-callback-after-task-hierarchy/16738431. Would be great if this could somehow be included.

@dbrgn

As for the naming, the method reminds me of the C execl/execv/... functions. The parent task is basically "replaced" with the child subtask. Maybe something along the lines of replaceable, replacing, substituting etc. would be suitable?

@dbrgn

After a lot of playing around with these new tasks, I think I mostly understood them.

One thing is left open for me: I want to pass data up through multiple levels of dynamic tasks. Currently I'm doing this:

testtasks.py

import os
import time
import random
from celery import Celery, group
from celery.utils.log import get_task_logger
import worker.dynamic_task
from worker.dynamic_task import dynamic_chord

celery = Celery('hello',
    backend=os.environ['BROKER_URL'],
    broker=os.environ['BROKER_URL'])
logger = get_task_logger(__name__)


@celery.dynamic_task
def pass_on(*args):
    return args

@celery.dynamic_task
def one(name):
    logger.warning('[1] ' + name)
    time.sleep(1)
    amount = random.randint(2, 3)
    subtasks = [two.si(name, '{} von {}'.format(i + 1, amount))
            for i in range(amount)]
    chord = dynamic_chord(subtasks, pass_on.s())
    return chord

@celery.dynamic_task
def two(name, label):
    logger.warning(' [2] {} | {}'.format(name, label))
    time.sleep(1)
    amount = random.randint(2, 3)
    subtasks = group([thr.si(name, '{} von {}'.format(i + 1, amount))
            for i in range(amount)])
    return subtasks

@celery.dynamic_task
def thr(name, label):
    logger.warning('  [3] {} | {}'.format(name, label))
    time.sleep(1)
    return 'URL'

@celery.dynamic_task
def notify(arg):
    logger.warning('[NOTIFY] done!')
    logger.warning('Arg: {}'.format(arg))

test.py

from worker.dynamic_task import dynamic_chord
from testtasks import *

maintask = dynamic_chord(map(one.si, ['A', 'B', 'C']), notify.s())
maintask.apply_async()

This prints the following:

[2013-05-24 20:58:26,519: WARNING/MainProcess] testtasks.notify[b75be245-799c-422b-97a6-daaf93848fb8]: [NOTIFY] done!
[2013-05-24 20:58:26,519: WARNING/MainProcess] testtasks.notify[b75be245-799c-422b-97a6-daaf93848fb8]: Arg: [[['URL', 'URL', 'URL'], ['URL', 'URL'], ['URL', 'URL']], [['URL', 'URL'], ['URL', 'URL'], ['URL', 'URL', 'URL']], [['URL', 'URL', 'URL'], ['URL', 'URL'], ['URL', 'URL']]]

Which is good, because I'm passing up data from the lowest level to the top level, without having a blocking task that waits for all subtasks to finish.

The point is that I need a "dummy callback" that passes on all data from one level to the next. Wouldn't this snippet also make sense in dynamic_task.py, analogous to dynamic_chord?

def dynamic_group(tasks):
    return group([(task | subtask("celery.checkpoint")) for task in tasks])

This way I don't need the dummy callback and everything still works. (But maybe I've overlooked some bad implications or an easier alternative.)
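A hypothetical rewrite of one() from the script above using that proposed helper, so the pass_on dummy callback disappears (an untested sketch of the proposal, not something this patch provides):

@celery.dynamic_task
def one(name):
    logger.warning('[1] ' + name)
    amount = random.randint(2, 3)
    subtasks = [two.si(name, '{} von {}'.format(i + 1, amount))
            for i in range(amount)]
    return dynamic_group(subtasks)  # the checkpoints stand in for pass_on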


As a side note, I can't get this to work together with Django / djcelery. This traceback shows up when using dynamic_chord: https://gist.github.com/dbrgn/5646021

@dbrgn

Any news on design decisions about this feature? In the past weeks I've seen a lot of use cases where this feature would be perfectly suited and much needed.

@gotpredictions

It would come in handy for us too, especially the dynamic chord. Currently we need to have one of the tasks hold up the process to keep checking for status.

See my post in celery-users. I am hoping this is the solution for it: https://groups.google.com/forum/#!topic/celery-users/eNGbLlAwhi0

@chris-hailstorm

Is this being considered for a Celery release? This is very useful.

@ask
Celery member

As I wrote earlier in the thread, if this is useful then it should be possible to implement this in the existing 'task' decorator. Adding things to celery.contrib has been a mistake in the past and it will be confusing to have two decorators with wildly different semantics.

@ask
Celery member

If merging this as-is I imagine having to ask users 'do you use tasks or dynamic tasks?' when responding to bug reports and that is something I want to avoid :)

The dynamic task basically links the return value to the currently executing task, so this should be possible to do with existing tasks, but more explicitly, e.g.:

@app.task(bind=True)
def one(self, i):
    if i > 3:
        return self.link(two.s(i))
    return i + 1

Not sure I like the name link, so hope someone can figure out a better name.

@dbrgn

I think both replace and subst or substitute are better than link.

@mehcode

What's wrong with just auto-detecting if a Signature is returned from a task and replacing it?

@app.task
def one(i):
    if i > 3:
        return two.s(i)
    return i + 1

Also, something like this should just work (chains in chords in chords in chains etc.):

chord(map(...), chain(chord(...), chord(...., chain(...))))

It doesn't at the moment with this patch.


On another note: when I replayed this PR on top of 3.0, I had to change the following:

retval |= current_task.request.callbacks[0] 

To:

retval |= Signature(current_task.request.callbacks[0])
@ask
Celery member

@mehcode: We cannot convert any sig in a return value as that would not be backwards compatible.

Regarding the Signature change, you are maybe using the json serializer?
All subtasks must be converted back into subtasks when they are received, to be compatible with non-pickle serializers;
the idiom is:

from celery import subtask
retval = subtask(current_task.request.callbacks[0])
@ask
Celery member

The c32-dynamic branch implements a first draft of Task.replace().

The way it replaces callbacks is no good as I imagine it's only supposed to replace the chain node, not all of the callbacks, but this may be improved when the new 'chain' message field is added (also for 3.2).

Dynamic chord I'm not sure about; what does it solve?

Btw, that cantincan site seems pretty useless, as it's more work to fill in the forms than it is to simply implement a feature :)

@dbrgn

Dynamic chord I'm not sure about; what does it solve?

If a task can be replaced by a chord using the Task.replace() method, then we don't need explicit dynamic chords. Otherwise, it's definitely a feature that should be implemented in order to create dynamic workflows.

Example: I have a task that parses URLs of different types. For each URL, according to the type, a series of tasks should be executed. But each URL can be processed in parallel. So the main task should return a group of chords. For example:

- maintask (processes 3 URLs)
-- process_url_1 (type A)
   1. download url
   2. process results
   3. finish processing and write resulting data to database
-- process_url_2 (type A)
   1. download url
   2. process results
   3. finish processing and write resulting data to database
-- process_url_3 (type B)
   1. download url
   2. do something else than with type A URLs
   3. send an email if a condition is matched

This can be implemented if the maintask can replace itself with a group of process_url tasks. Each of those process_url tasks would then process the URL and - depending on the type of the URL - replace itself with chord_a(url) or chord_b(url).
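A sketch of that layout in terms of the draft Task.replace() (all task names and the url_type() helper are hypothetical, and it assumes replace() accepts groups and chains, as discussed later in the thread):

from celery import chain, group

@app.task(bind=True)
def maintask(self, urls):
    # replace this task with one pipeline per URL, run in parallel
    raise self.replace(group(process_url.s(url) for url in urls))

@app.task(bind=True)
def process_url(self, url):
    if url_type(url) == 'A':
        steps = chain(download.s(url), process_results.s(), write_to_db.s())
    else:
        steps = chain(download.s(url), process_other.s(), maybe_send_email.s())
    raise self.replace(steps)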

@mehcode

@ask This task.replace looks like it can be used to fulfill our use case for dynamic (though I would say conditional, so .replace fits) tasks.

And yes, I was using the JSON serializer.

@ryanhiebert

The .replace feature by @ask looks great for my use-case of dynamic tasks. What can I do to help it get released? Does it need some testing?

@AlJohri

+1 what can be done to help get the .replace feature released?

@ask
Celery member

Documentation and tests would help, as well as further defining the semantics of this feature and how it works with the canvas (e.g. chord)

@dbrgn

The add_to_chord and replace_in_chord methods have been added recently, thanks @ask.

I just tested the new feature. This is the base script I used:

# test.py
from celery import Celery, chord
from celery.utils.log import get_task_logger

app = Celery('test', backend='redis://localhost:6379/10', broker='redis://localhost:6379/11')
app.conf.CELERY_ALWAYS_EAGER = False

logger = get_task_logger(__name__)

@app.task(bind=True)
def get_one(self):
    return 1

@app.task
def get_two():
    return 2

@app.task
def sum_all(data):
    logger.error(data)
    return sum(data)

if __name__ == '__main__':
    x = chord(get_one.s() for i in range(3))
    body = sum_all.s()
    result = x(body)

    print(result.get())

Standard output is 3 (sum of [1, 1, 1]).

Replacement works great:

@app.task(bind=True)
def get_one(self):
    self.replace_in_chord(get_two.s())
    return 1

Output:

[2014-10-15 11:15:32,632: INFO/MainProcess] Received task: test.get_one[2632c0f1-365e-4f20-9db6-8f0cc0769ff0]
[2014-10-15 11:15:32,634: INFO/MainProcess] Received task: test.get_one[288e2b60-b32b-4710-ac40-66364eb33a2d]
[2014-10-15 11:15:32,642: INFO/MainProcess] Received task: test.get_one[f0b5150a-c4a5-461c-886a-d61d10a7ade3]
[2014-10-15 11:15:32,670: INFO/MainProcess] Received task: test.get_two[2632c0f1-365e-4f20-9db6-8f0cc0769ff0]
[2014-10-15 11:15:32,670: INFO/Worker-3] Task test.get_one[2632c0f1-365e-4f20-9db6-8f0cc0769ff0] ignored
[2014-10-15 11:15:32,678: INFO/Worker-4] Task test.get_one[288e2b60-b32b-4710-ac40-66364eb33a2d] ignored
[2014-10-15 11:15:32,678: INFO/MainProcess] Received task: test.get_two[288e2b60-b32b-4710-ac40-66364eb33a2d]
[2014-10-15 11:15:32,682: INFO/Worker-4] Task test.get_two[2632c0f1-365e-4f20-9db6-8f0cc0769ff0] succeeded in 0.00283580459654s: 2
[2014-10-15 11:15:32,685: INFO/Worker-2] Task test.get_one[f0b5150a-c4a5-461c-886a-d61d10a7ade3] ignored
[2014-10-15 11:15:32,686: INFO/MainProcess] Received task: test.get_two[f0b5150a-c4a5-461c-886a-d61d10a7ade3]
[2014-10-15 11:15:32,687: INFO/Worker-2] Task test.get_two[288e2b60-b32b-4710-ac40-66364eb33a2d] succeeded in 0.00108842598274s: 2
[2014-10-15 11:15:32,689: INFO/Worker-4] Task test.get_two[f0b5150a-c4a5-461c-886a-d61d10a7ade3] succeeded in 0.00150422612205s: 2
[2014-10-15 11:15:33,194: INFO/MainProcess] Received task: test.sum_all[74be920d-edb1-4175-be6e-bba6146b38cf]
[2014-10-15 11:15:33,195: ERROR/Worker-2] test.sum_all[74be920d-edb1-4175-be6e-bba6146b38cf]: [2, 2, 2]
[2014-10-15 11:15:33,197: INFO/Worker-2] Task test.sum_all[74be920d-edb1-4175-be6e-bba6146b38cf] succeeded in 0.00115499692038s: 6

The result is 6, which is correct (2+2+2). When I use add_to_chord instead of replace_in_chord though, it doesn't seem to work as intended:

@app.task(bind=True)
def get_one(self):
    self.add_to_chord(get_two.s())
    return 1

Output:

[2014-10-15 11:16:35,170: INFO/MainProcess] Received task: test.get_one[20665696-3e12-4f98-902d-755aa78b392c]
[2014-10-15 11:16:35,172: INFO/MainProcess] Received task: test.get_one[9c78c35f-1678-429d-a05e-bf18c4c6e5f7]
[2014-10-15 11:16:35,175: INFO/Worker-3] Task test.get_one[20665696-3e12-4f98-902d-755aa78b392c] succeeded in 0.00263688992709s: 1
[2014-10-15 11:16:35,177: INFO/MainProcess] Received task: test.get_one[caea3644-7696-421a-ae54-510eaba63029]
[2014-10-15 11:16:35,178: INFO/MainProcess] Received task: test.get_two[821f43da-5742-467a-bb39-b01c1687dc9f]
[2014-10-15 11:16:35,180: INFO/Worker-3] Task test.get_two[821f43da-5742-467a-bb39-b01c1687dc9f] succeeded in 0.000982443802059s: 2
[2014-10-15 11:16:35,182: INFO/MainProcess] Received task: test.get_two[3a484990-d400-4efa-98ce-c1c07a5d8d07]
[2014-10-15 11:16:35,182: INFO/Worker-2] Task test.get_one[caea3644-7696-421a-ae54-510eaba63029] succeeded in 0.00319112883881s: 1
[2014-10-15 11:16:35,184: INFO/Worker-4] Task test.get_two[3a484990-d400-4efa-98ce-c1c07a5d8d07] succeeded in 0.00067730108276s: 2
[2014-10-15 11:16:35,185: WARNING/Worker-4] Chord counter incremented too many times for u'd54aa294-46af-43e6-981a-e52012f81620'
[2014-10-15 11:16:35,200: INFO/Worker-1] Task test.get_one[9c78c35f-1678-429d-a05e-bf18c4c6e5f7] succeeded in 0.0277538709342s: 1
[2014-10-15 11:16:35,201: INFO/MainProcess] Received task: test.get_two[71fb937f-fc4e-4c6d-9e5c-d12827322e0a]
[2014-10-15 11:16:35,201: WARNING/Worker-1] Chord counter incremented too many times for u'd54aa294-46af-43e6-981a-e52012f81620'
[2014-10-15 11:16:35,203: INFO/Worker-3] Task test.get_two[71fb937f-fc4e-4c6d-9e5c-d12827322e0a] succeeded in 0.000652418937534s: 2
[2014-10-15 11:16:35,204: WARNING/Worker-3] Chord counter incremented too many times for u'd54aa294-46af-43e6-981a-e52012f81620'
[2014-10-15 11:16:36,191: INFO/MainProcess] Received task: test.sum_all[93c4c9a5-17c2-465f-abcd-0e265b66d771]
[2014-10-15 11:16:36,193: ERROR/Worker-4] test.sum_all[93c4c9a5-17c2-465f-abcd-0e265b66d771]: [1, 1, 1]
[2014-10-15 11:16:36,194: INFO/Worker-4] Task test.sum_all[93c4c9a5-17c2-465f-abcd-0e265b66d771] succeeded in 0.00115421414375s: 3

The result is 3, not 9 (1+2+1+2+1+2).

Also, another time I started the same script, the following exceptions occurred in the celery worker log:

[2014-10-15 11:16:27,236: ERROR/Worker-3] Chord callback u'bfaf94da-23b3-4f2f-b163-016cbac50f43' raised: ValueError(u'bfaf94da-23b3-4f2f-b163-016cbac50f43',)
Traceback (most recent call last):
  File "/home/danilo/Projects/celery/celery/backends/base.py", line 543, in on_chord_part_return
    raise ValueError(gid)
ValueError: bfaf94da-23b3-4f2f-b163-016cbac50f43
[2014-10-15 11:16:27,244: INFO/Worker-3] Task test.get_two[72762358-f0b4-4ae0-bc05-4b084d942e08] succeeded in 0.000811628066003s: 2
[2014-10-15 11:16:27,245: ERROR/Worker-3] Chord callback u'bfaf94da-23b3-4f2f-b163-016cbac50f43' raised: ValueError(u'bfaf94da-23b3-4f2f-b163-016cbac50f43',)
Traceback (most recent call last):
  File "/home/danilo/Projects/celery/celery/backends/base.py", line 543, in on_chord_part_return
    raise ValueError(gid)
ValueError: bfaf94da-23b3-4f2f-b163-016cbac50f43

And the python test.py output showed this exception:

Traceback (most recent call last):
  File "test.py", line 29, in <module>
    print(result.get())
  File "/home/danilo/Projects/celery/celery/result.py", line 177, in get
    raise self.backend.exception_to_python(meta['result'])
celery.backends.base.ChordError: GroupResult bfaf94da-23b3-4f2f-b163-016cbac50f43 no longer exists
@dbrgn

Also, does this only support "simple" task signatures? Replacing a task with a chord doesn't currently seem to work.

@app.task(bind=True)
def get_one(self):
    x = chord(get_two.s() for i in range(3))
    body = sum_all.s()
    sig = x(body)
    self.replace_in_chord(sig)
@ask
Celery member

You’re not using redis:///10?new_join=1 for the result backend here.

@dbrgn

Aah, sorry for forgetting about that! I'll test it again tomorrow.

@dbrgn

Great, seems to work! :sparkles:

@ask added a commit that closed this pull request
Task.replace changed, removes Task.replace_in_chord.
The two methods had almost the same functionality, but the old Task.replace
would force the new task to inherit the callbacks/errbacks of the existing
task.

If you replace a node in a tree, then you would not expect the new node to
inherit the children of the old node, so this seems like unexpected
behavior.

So self.replace(sig) now works for any task, in addition sig can now
be a group.

Groups are automatically converted to a chord, where the callback
will "accumulate" the results of the group tasks.

A new builtin task (`celery.accumulate`) was added for this purpose.

Closes #817
@ask closed this in 07ecd08
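A minimal sketch of the semantics described in that commit message (fanout and handle are hypothetical task names):

from celery import group

@app.task(bind=True)
def fanout(self, items):
    # Replacing with a group converts it into a chord whose callback is
    # the new builtin celery.accumulate, so whoever waits on fanout's
    # result receives the list of handle() results.
    raise self.replace(group(handle.s(item) for item in items))

@app.task
def handle(item):
    return item * 2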
@traxair

Would you have working examples of this type of behavior? I cannot reproduce it...

This is my current code:

from celery import Celery

app = Celery('hello', broker='redis://localhost')

@app.task(bind=True)
def hello(self):
    print 'hello world'
    raise self.replace(goodbye.s())

@app.task
def goodbye():
    return 'good bye!'

but I get the error AttributeError: 'hello' object has no attribute 'replace'

I tried both versions 3.1.17 and the master from today (for which I had to git clone and install your amqp and kombu projects).

@dbrgn

@traxair as discussed above, did you configure the Redis result backend with ?new_join=1 enabled?

@ask added a commit that referenced this pull request
Fixes typo in docstring for Issue #817 (7566d2d)
Commits on Aug 15, 2012
  1. @steeve
  2. @steeve updating dynamic doc

Showing with 82 additions and 0 deletions.
  1. +82 −0 celery/contrib/dynamic.py

celery/contrib/dynamic.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+"""
+=============
+Dynamic Tasks
+=============
+
+Subtasks returned by dynamic tasks are executed right after the first
+task executes, as if they were in a chain.
+You can also return chains and chords, and they will be properly inserted.
+You can have chords in chords etc.
+
+This allows you to design a pipeline that can be completely dynamic, while
+benefiting from Celery's powerful idioms (subtasks, chains, chords...).
+
+
+Usage example
+-------------
+
+.. code-block:: python
+
+    import celery.contrib.dynamic
+    from celery.contrib.dynamic import dynamic_chord as chord
+
+    @app.dynamic_task
+    def one(i):
+        if i > 3:
+            return two.s(i)
+        return i + 1
+
+    @app.dynamic_task(ignore_result=True)
+    def two(i):
+        return i + 2
+
+    def test():
+        one.delay(2)  # will run: one.s(2) => 3
+        one.delay(5)  # will run: one.s(5) | two.s() => 7
+
+"""
+from __future__ import absolute_import
+
+from celery import current_task, Celery, subtask, chord
+from celery.app.builtins import shared_task
+from celery.canvas import Signature
+from functools import wraps
+
+
+def dynamic_task(self, *args, **kwargs):
+    def do_wrap(fn):
+        @wraps(fn)
+        def _fn_wrapper(*args, **kwargs):
+            retval = fn(*args, **kwargs)
+            if isinstance(retval, Signature):
+                # Splice the returned signature into the current chain:
+                # make it immutable, hand it the current task's callback,
+                # and transfer chord/group membership so the new subtask
+                # reports back in place of the current task.
+                retval.immutable = True
+                if current_task.request.callbacks:
+                    retval |= current_task.request.callbacks[0]
+                if current_task.request.chord:
+                    retval.set(chord=current_task.request.chord)
+                    current_task.request.chord = None
+                if current_task.request.group:
+                    retval.set(group_id=current_task.request.group)
+                    current_task.request.group = None
+                current_task.request.callbacks = [retval]
+            return retval
+        return self.task(*args, **kwargs)(_fn_wrapper)
+    if len(args) == 1 and callable(args[0]):
+        # Bare @app.dynamic_task usage (no decorator arguments).
+        fn = args[0]
+        args = args[1:]
+        return do_wrap(fn)
+    return do_wrap
+setattr(Celery, "dynamic_task", dynamic_task)
+
+
+@shared_task
+def add_checkpoint_task(app):
+    # Pass-through task giving every chord header entry a stable
+    # terminal node, so dynamic expansion inside the header still
+    # resolves to exactly one countable result per entry.
+    @app.task(name='celery.checkpoint')
+    def checkpoint(result):
+        return result
+    return checkpoint
+
+
+def dynamic_chord(tasks, callback):
+    return chord([(task | subtask("celery.checkpoint")) for task in tasks],
+                 callback)