rpc backend doesn't seem to report message status correctly. #4084

Open
mke21 opened this Issue Jun 13, 2017 · 8 comments


mke21 commented Jun 13, 2017

This is version 4.0.2.
If I use RabbitMQ as the broker, the rpc result backend, and a custom queue, the message's status never seems to change: it stays 'PENDING' even when the logging in the worker reports successful execution, until I do a get() of some sort, at which point the status changes to 'SUCCESS'. When I change the backend to amqp the system works as expected, reporting SUCCESS before get() is called. The redis backend doesn't show this problem either, so it seems to be rpc-specific.

Note that when I don't set a custom queue and use the default instead, everything works as expected, just like with the other backends!

I've got this in my tasks.py:

# tasks.py
from celery import Celery

app = Celery(
    'tasks',
    broker='amqp://',
    backend='rpc://',  # here I use the rpc backend
)

@app.task
def test_task(s):
    return s

I start this with `celery -A tasks worker --loglevel=info -Q myqueue`.

On the other side I do:

>>> import tasks
>>> result = tasks.test_task.apply_async(queue='myqueue', args=['blaat']) # set custom queue

>>> result.ready() # worker already executed, so this should give True
False
>>> result.status # the previous ready() returned False when it should have been True
'PENDING'
>>> print(result.get())
blaat
>>> result.ready()
True
>>> result.status
'SUCCESS'

If I start the worker without the -Q option, `celery -A tasks worker --loglevel=info`, I get:

>>> import tasks
>>> result = tasks.test_task.apply_async(args=['blaat']) # no custom queue

>>> result.ready() # worker already executed, so this should give True
True
>>> result.status # the previous ready() worked as expected
'SUCCESS'
>>> print(result.get()) # SUCCESS was also reported
blaat
>>> result.ready()
True
>>> result.status
'SUCCESS'
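The transcripts above are consistent with a delivery model where the worker's result message sits in a client-side reply queue and the locally cached state only changes once that queue is drained, which get() does. The following is a self-contained toy sketch of that symptom only; no Celery is involved, and `ToyRpcResult` and everything in it is hypothetical, not Celery's actual implementation:

```python
# Toy model (no Celery required) of the reported symptom: the worker's result
# message has already been delivered to a client-side reply queue, but the
# locally cached state is only updated when the queue is drained -- and in
# this model only get() drains it, just as the transcripts suggest.
from collections import deque

class ToyRpcResult:
    def __init__(self):
        self._replies = deque()   # messages the worker already published
        self._state = 'PENDING'

    def deliver(self, state):     # simulates the worker finishing the task
        self._replies.append(state)

    def _drain(self):
        while self._replies:
            self._state = self._replies.popleft()

    @property
    def state(self):              # does NOT drain, so it can stay PENDING
        return self._state

    def ready(self):
        return self._state in ('SUCCESS', 'FAILURE', 'REVOKED')

    def get(self):                # draining is what updates the state
        self._drain()
        return self._state

r = ToyRpcResult()
r.deliver('SUCCESS')
print(r.ready())   # False: the worker is done, but nothing drained the queue
print(r.get())     # SUCCESS
print(r.ready())   # True
```

Under this model, polling `state` or `ready()` can never observe the finished task because neither touches the reply queue.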

jpescalona commented Jul 26, 2017

I've been able to reproduce this issue with:

  • Celery 4.0.2
  • result_backend = 'rpc://'

but it only fails after I execute a group task before the individual tasks. Moreover, GroupResult.ready() does not return True even when all subtasks are ready; calling GroupResult.get() is what marks the GroupResult as ready.

In [1]: from myapp.tasks import invalidate_cache
In [2]: r = invalidate_cache.apply_async(queue='management')

In [3]: r.state
Out[3]: u'STARTED'

In [4]: r.state
Out[4]: u'STARTED'

In [5]: r.state
Out[5]: u'STARTED'

In [6]: r.ready()
Out[6]: True

In [7]: r.ready()
Out[7]: True

In [8]: r.state
Out[8]: u'SUCCESS'

In [9]: from myapp.tasks import invalidate_cache

In [10]: from celery import group

In [11]: g = group([invalidate_cache.s().set(queue='management')]).apply_async()

In [12]: g.ready()
Out[12]: False

In [13]: g.ready()
Out[13]: False

In [14]: g.ready()
Out[14]: False

In [15]: g.ready()
Out[15]: False

In [16]: g.ready()
Out[16]: False

In [17]: g.get()
Out[17]: [None]

In [18]: g.ready()
Out[18]: True

In [19]: r = invalidate_cache.apply_async(queue='management')

In [20]: r.state
Out[20]: u'PENDING'

In [21]: r.state
Out[21]: u'PENDING'

In [22]: r.state
Out[22]: u'PENDING'

In [23]: r.get()

In [24]: r.state
Out[24]: u'SUCCESS'

No matter which queue you define or which celery command you use to start the worker, it starts failing once a group task has been executed.

jpescalona commented Jul 27, 2017

After digging for a while, I've discovered that this problem reproduces when using rpc as the result backend instead of amqp. Maybe the rpc result backend is broken?
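Since the amqp and redis backends are both reported above to update task state correctly, a possible interim workaround is to point the result backend elsewhere until this is resolved. A hedged config sketch (the Redis URL is a hypothetical local default, not something from this thread):

```python
# Sketch of an interim workaround, assuming a local Redis instance is
# available: the redis result backend was reported above to update task
# state correctly, unlike rpc://.
from celery import Celery

app = Celery(
    'tasks',
    broker='amqp://',
    backend='redis://localhost:6379/0',  # hypothetical local Redis default
)
```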

Contributor

thedrow commented Jul 27, 2017

This sounds like a bug. Thanks for the report.
Did any of you try doing the same with master?

jpescalona commented Jul 27, 2017

I've tried 4.0.2, 4.1.0, and master, and for all of them the rpc backend does not update the task status correctly.

Contributor

donkopotamus commented Sep 6, 2017

I have also observed something similar to this issue: despite a worker being configured to send a STARTED message (via task_send_started), and despite having confirmed (by inserting a breakpoint) that the message is being sent, the rpc backend does not appear to process it, with AsyncResult.state moving only from PENDING straight to SUCCESS.

Currently I have traced the issue to this piece of code in celery.backends.async:BaseResultConsumer:

    def on_state_change(self, meta, message):
        if self.on_message:
            self.on_message(meta)
        if meta['status'] in states.READY_STATES:    ### <===== HERE
            task_id = meta['task_id']
            try:
                result = self._get_pending_result(task_id)
            except KeyError:
                # send to buffer in case we received this result
                # before it was added to _pending_results.
                self._pending_messages.put(task_id, meta)
            else:
                result._maybe_set_cache(meta)
                buckets = self.buckets
                try:
                    # remove bucket for this result, since it's fulfilled
                    bucket = buckets.pop(result)
                except KeyError:
                    pass
                else:
                    # send to waiter via bucket
                    bucket.append(result)
        sleep(0)

I have confirmed that the STARTED message does reach this on_state_change handler, but since STARTED is not in states.READY_STATES it is never acted on in any way. Other custom states will likewise not be processed.
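The effect of that guard can be modelled without Celery at all. In this self-contained sketch (`ToyConsumer` and all names are hypothetical, not Celery internals), any meta whose status is not a ready state is silently dropped, exactly as described above:

```python
# Self-contained model (no Celery needed) of the READY_STATES guard above:
# only metas whose status is a ready state ever reach the local cache, so a
# STARTED update is silently dropped.
READY_STATES = frozenset({'SUCCESS', 'FAILURE', 'REVOKED'})

class ToyConsumer:
    def __init__(self):
        self.cache = {}  # task_id -> last meta that passed the guard

    def on_state_change(self, meta):
        if meta['status'] in READY_STATES:   # the guard flagged above
            self.cache[meta['task_id']] = meta
        # non-ready metas (STARTED, RETRY, custom states) fall through
        # here and are never stored anywhere

consumer = ToyConsumer()
consumer.on_state_change({'task_id': 'abc', 'status': 'STARTED'})
print('abc' in consumer.cache)   # False: the STARTED update was dropped
consumer.on_state_change({'task_id': 'abc', 'status': 'SUCCESS'})
print('abc' in consumer.cache)   # True
```

This matches the observation that AsyncResult.state jumps straight from PENDING to SUCCESS with nothing in between.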

Member

auvipy commented Dec 19, 2017

could you also check latest master and report?

@auvipy auvipy added this to the v4.2 milestone Dec 19, 2017

@auvipy auvipy modified the milestones: v4.2, v5.0.0 Jan 13, 2018

@auvipy auvipy modified the milestones: v5.0.0, v4.3 May 27, 2018

Member

xirdneh commented Jun 5, 2018

Somebody reported this as an issue on IRC.
I can confirm that the state and ready() values are not correct until you call result.get() when using rpc as the result backend.
This is using the latest master.


Member

xirdneh commented Jun 5, 2018

Another update while I was testing this: apparently it now works for single tasks, but the same problem arises when using groups.

>>> res = group(add.s(3,4), add.s(5,7)).delay()
>>> for r in res.results:
...     print(r.state)
PENDING
PENDING

This might have to be a new issue...
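The group symptom can be modelled the same way as the single-task one. In this self-contained sketch (no Celery; `ToySubResult`/`ToyGroupResult` are hypothetical), a toy group only reports ready once every sub-result's reply has been drained, which only get() does:

```python
# Toy model (no Celery) of the group symptom above: each sub-result keeps its
# delivered reply in a local queue, and the group only reports ready once
# every sub-result has been drained -- which in this model only get() does.
from collections import deque

class ToySubResult:
    def __init__(self, value):
        # the worker has already finished; its reply is waiting to be drained
        self._replies = deque([('SUCCESS', value)])
        self.state = 'PENDING'
        self._value = None

    def get(self):
        while self._replies:
            self.state, self._value = self._replies.popleft()
        return self._value

class ToyGroupResult:
    def __init__(self, subresults):
        self.results = subresults

    def ready(self):  # never drains, so it stays False until get()
        return all(r.state == 'SUCCESS' for r in self.results)

    def get(self):
        return [r.get() for r in self.results]

g = ToyGroupResult([ToySubResult(7), ToySubResult(12)])
print(g.ready())   # False, although both workers are done
print(g.get())     # [7, 12]
print(g.ready())   # True
```

This mirrors the transcript above, where every sub-result printed PENDING until the group was consumed.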
