tasks waiting for a subtask with retry() can lose their task id (eventlet) #521

Closed
paulswartz opened this Issue Nov 3, 2011 · 6 comments

@paulswartz

When a task is waiting on a subtask and polls with .retry() until the subtask's result is ready, it can lose its task_id under the eventlet backend. The processes backend does not have this issue.

The use case is splitting an HTTP fetch, and some action on the result of that fetch, into separate tasks in a way that still gives me the result of the action.

Example code:

from celery.task import task
from celery.task.sets import TaskSet

@task(max_retries=5)
def noop(i, r=None):
    print 'noop %i %s' % (i, noop.request.id)
    if not r:
        r = noop2.delay()

    if not r.ready():
        noop.retry((i, r), countdown=1)

    print 'noop %i returning' % i
    return i

@task
def noop2(fire=False):
    if not fire:
        noop2.retry((True,), countdown=2)
    else:
        return True

def main():
    return TaskSet(
        tasks=[noop.subtask((i,)) for i in range(2)]).apply_async().join()

if __name__ == '__main__':
    print main()

Example log:

[2011-11-03 18:54:51,714: WARNING/MainProcess] celery@lucid64 has started.
[2011-11-03 18:54:56,532: WARNING/MainProcess] noop 0 b9e2cf25-7b23-4a15-b3e7-eba7123beb8b
[2011-11-03 18:54:56,534: WARNING/MainProcess] noop 1 20ddeef8-aa9c-47c4-ad58-deb572770f4d
[2011-11-03 18:54:57,016: WARNING/MainProcess] noop 0 20ddeef8-aa9c-47c4-ad58-deb572770f4d
[2011-11-03 18:54:57,023: WARNING/MainProcess] noop 1 20ddeef8-aa9c-47c4-ad58-deb572770f4d
[2011-11-03 18:54:58,026: WARNING/MainProcess] noop 0 20ddeef8-aa9c-47c4-ad58-deb572770f4d
[2011-11-03 18:54:58,029: WARNING/MainProcess] noop 0 returning
[2011-11-03 18:54:58,042: WARNING/MainProcess] noop 1 20ddeef8-aa9c-47c4-ad58-deb572770f4d
[2011-11-03 18:54:58,046: WARNING/MainProcess] noop 1 returning
@paulswartz

Also, I've tested this against 2.3.3 and master; both exhibit this issue.

@paulswartz

A workaround is to save the current request id and restore it after calling functions that can switch greenthreads:

@task(max_retries=5)
def noop(i, r=None):
    print 'noop %i %r' % (i, r)
    id_ = noop.request.id # save current ID
    if not r:
        r = noop2.delay()
    ready = r.ready()
    noop.request.id = id_ # restore when we're back from the network
    if not ready:
        noop.retry((i, r), countdown=1)
    print 'noop %i returning' % i
    return i
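
The save-and-restore pattern above could also be wrapped in a small helper. The following is only a sketch: preserved_attr and FakeRequest are hypothetical names made up for illustration, not part of Celery, and it is written in Python 3 syntax so it runs standalone.

```python
from contextlib import contextmanager


@contextmanager
def preserved_attr(obj, name):
    """Save obj.<name> on entry and put the saved value back on exit."""
    saved = getattr(obj, name)
    try:
        yield saved
    finally:
        # Restore even if the wrapped block raised.
        setattr(obj, name, saved)


class FakeRequest(object):
    """Hypothetical stand-in for a task's request object."""
    id = None


req = FakeRequest()
req.id = "original-id"

with preserved_attr(req, "id"):
    # Simulates another greenthread overwriting the id while we wait.
    req.id = "clobbered-by-another-greenthread"

restored_id = req.id
print(restored_id)
```

In the workaround, the call that may switch greenthreads (r.ready()) would presumably go inside a block such as "with preserved_attr(noop.request, 'id'):"; that usage is an untested assumption.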
@ask
Celery member

But task.request is a thread local, so are you saying another noop task subsequently runs in the same greenlet?

@paulswartz

My guess is that the noop2.delay() call doesn't get a new thread-local request, so it overwrites the existing one.
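
To make that kind of overwrite concrete, here is a simplified simulation. It is not Celery's or eventlet's code: it assumes request state is stored per OS thread while several cooperatively scheduled tasks share one thread, and it uses plain generators as stand-ins for greenthreads. The names task, run_round_robin, and the ids are hypothetical. Python 3 syntax.

```python
import threading

# Stand-in for per-thread request storage (assumption, not Celery's code).
request = threading.local()


def task(name, task_id, log):
    """Generator-based stand-in for a task running in a greenthread."""
    request.id = task_id            # set when the task starts
    yield                           # cooperative switch, e.g. waiting on I/O
    log.append((name, request.id))  # read the id back after resuming


def run_round_robin(tasks):
    """Tiny scheduler: advance each generator one step at a time, in order."""
    pending = list(tasks)
    while pending:
        for gen in list(pending):
            try:
                next(gen)
            except StopIteration:
                pending.remove(gen)


log = []
run_round_robin([task("noop 0", "id-A", log), task("noop 1", "id-B", log)])
print(log)
```

Both entries in log end up carrying "id-B": the second task overwrote the shared per-thread value while the first was suspended, which resembles the pattern in the example log where noop 0 starts reporting noop 1's id.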

@ask
Celery member
ask commented Feb 1, 2012

But neither task.delay nor task.retry modifies task.request. Only calling the task does.

@ask ask closed this in e2b052c May 16, 2012
@ask
Celery member

Please try the above commit. You have to install celery, kombu and billiard from git:

pip install https://github.com/ask/celery/zipball/master
pip install https://github.com/ask/kombu/zipball/master
pip install https://github.com/ask/billiard/zipball/master