
celery_worker pytest fixture timeouts since celery 5.0.3 #6521

Closed
anlambert opened this issue Dec 4, 2020 · 16 comments · Fixed by #6524
@anlambert

Since the 5.0.3 release of celery, the celery_worker pytest fixture times out when performing its ping check.

The issue can be reproduced using this simple test file:

pytest_plugins = ["celery.contrib.pytest"]

def test_create_task(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    assert mul.delay(4, 4).get(timeout=10) == 16

Below is the pytest output:

$ pytest -sv test_celery_worker.py 
============================================================================================== test session starts ===============================================================================================
platform linux -- Python 3.7.3, pytest-6.1.2, py-1.9.0, pluggy-0.13.1 -- /home/anlambert/.virtualenvs/swh/bin/python3
cachedir: .pytest_cache
hypothesis profile 'default' -> database=DirectoryBasedExampleDatabase('/home/anlambert/tmp/.hypothesis/examples')
rootdir: /home/anlambert/tmp
plugins: postgresql-2.5.2, asyncio-0.14.0, mock-3.3.1, cov-2.10.1, django-4.1.0, requests-mock-1.8.0, hypothesis-5.41.3, forked-1.3.0, swh.core-0.9.2.dev4+g6f9779f, flask-1.1.0, xdist-2.1.0, dash-1.17.0, swh.journal-0.5.2.dev1+g12b31a2
collected 1 item                                                                                                                                                                                                 

test_celery_worker.py::test_create_task ERROR

===================================================================================================== ERRORS =====================================================================================================
_______________________________________________________________________________________ ERROR at setup of test_create_task _______________________________________________________________________________________

request = <SubRequest 'celery_worker' for <Function test_create_task>>, celery_app = <Celery celery.tests at 0x7f99b4b91d30>, celery_includes = (), celery_worker_pool = 'solo', celery_worker_parameters = {}

    @pytest.fixture()
    def celery_worker(request,
                      celery_app,
                      celery_includes,
                      celery_worker_pool,
                      celery_worker_parameters):
        # type: (Any, Celery, Sequence[str], str, Any) -> WorkController
        """Fixture: Start worker in a thread, stop it when the test returns."""
        if not NO_WORKER:
            for module in celery_includes:
                celery_app.loader.import_task_module(module)
            with worker.start_worker(celery_app,
                                     pool=celery_worker_pool,
>                                    **celery_worker_parameters) as w:

../dev/celery/celery/contrib/pytest.py:196: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib/python3.7/contextlib.py:112: in __enter__
    return next(self.gen)
../dev/celery/celery/contrib/testing/worker.py:82: in start_worker
    assert ping.delay().get(timeout=ping_task_timeout) == 'pong'
../dev/celery/celery/result.py:230: in get
    on_message=on_message,
../dev/celery/celery/backends/base.py:655: in wait_for_pending
    no_ack=no_ack,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <celery.backends.cache.CacheBackend object at 0x7f99b411fb00>, task_id = '98b047a2-2027-453c-a317-eb31f44a2547', timeout = 10.0, interval = 0.5, no_ack = True, on_interval = <promise@0x7f99b4a2adf0>

    def wait_for(self, task_id,
                 timeout=None, interval=0.5, no_ack=True, on_interval=None):
        """Wait for task and return its result.
    
        If the task raises an exception, this exception
        will be re-raised by :func:`wait_for`.
    
        Raises:
            celery.exceptions.TimeoutError:
                If `timeout` is not :const:`None`, and the operation
                takes longer than `timeout` seconds.
        """
        self._ensure_not_eager()
    
        time_elapsed = 0.0
    
        while 1:
            meta = self.get_task_meta(task_id)
            if meta['status'] in states.READY_STATES:
                return meta
            if on_interval:
                on_interval()
            # avoid hammering the CPU checking status.
            time.sleep(interval)
            time_elapsed += interval
            if timeout and time_elapsed >= timeout:
>               raise TimeoutError('The operation timed out.')
E               celery.exceptions.TimeoutError: The operation timed out.

../dev/celery/celery/backends/base.py:687: TimeoutError
============================================================================================ short test summary info =============================================================================================
ERROR test_celery_worker.py::test_create_task - celery.exceptions.TimeoutError: The operation timed out.
=============================================================================================== 1 error in 10.41s ================================================================================================

After a quick git bisect session, I managed to identify the commit that introduced the issue: e203168

@flying-sheep

flying-sheep commented Dec 4, 2020

hahaha I just created a reproducible example to submit this! You actually don’t even need any test body:

pytest_plugins = ['celery.contrib.pytest']

def test_worker_initializes(celery_worker):
    assert True

@thedrow
Member

thedrow commented Dec 6, 2020

Have you tried using celery_worker before 5.0.3?

@flying-sheep

flying-sheep commented Dec 6, 2020

Yes, that’s why the title says “since celery 5.0.3”. Bug was introduced in e203168, which went into 5.0.3.

Since pytest just hangs after the timeout, I used the timeout command to be able to find this:

$ cat >test_worker.py <<EOF
pytest_plugins = ['celery.contrib.pytest']
def test_worker_initializes(celery_worker):
    assert True
EOF
$ git bisect start
$ git bisect good v5.0.2
$ git bisect bad v5.0.3
$ git bisect run timeout 10s pytest -q test_worker.py
...
e2031688284484d5b5a57ba29cd9cae2d9a81e39 is the first bad commit
...
$ git bisect reset
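As an aside on why this recipe terminates: GNU timeout exits with status 124 when the wrapped command overruns its limit, and git bisect run treats any exit code from 1 to 127 (except 125, which means "skip") as marking the commit bad, so a hanging pytest run is automatically classified as bad. A quick sanity check of the exit-status behaviour (assumes GNU coreutils' timeout is on PATH):

```python
import subprocess

# GNU `timeout` exits with status 124 when the command it wraps is
# killed for running too long; `git bisect run` reads exit codes
# 1-127 (except 125) as "bad", so a hang counts as a bad commit.
proc = subprocess.run(['timeout', '1s', 'sleep', '5'])
print(proc.returncode)
```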

@thedrow
Member

thedrow commented Dec 6, 2020

@matusvalo Any chance you can handle this?

@thedrow thedrow added this to the 5.0.4 milestone Dec 6, 2020
@anlambert
Author

FYI, it seems the issue is caused by this specific change: e203168#diff-acf0d4c4ddf0742d0699b98f51eec8823d1f22e7b76d0b4eb6f42ab27100188cL1193-R1214

Hope that helps!

@matusvalo
Member

I will check that ASAP.

@matusvalo
Member

I did some basic investigation and it seems the problem is connected to celery.backends.cache.CacheBackend. The following example, which uses Redis as the result backend instead, works without problems:

import pytest
pytest_plugins = ['celery.contrib.pytest']

@pytest.fixture(scope='session')
def celery_config():
    return {
        'worker_hijack_root_logger': False,
        'worker_log_color': False,
        'accept_content': {'json'},
        'enable_utc': True,
        'timezone': 'UTC',
        'broker_url': 'amqp://',
        'result_backend': 'redis://',
        'broker_heartbeat': 0,
    }

def test_worker_initializes(celery_worker):
    assert True

And that's the reason why this issue was not caught by Celery's own test suite.

@matusvalo
Member

OK, I understand the root cause of the issue. The problem is in the cache+memory:// result backend. This backend is supposed to share a piece of memory and pass results through it; see:

class DummyClient:

    def __init__(self, *args, **kwargs):
        self.cache = LRUCache(limit=5000)

    def get(self, key, *args, **kwargs):
        return self.cache.get(key)

    def get_multi(self, keys):
        cache = self.cache
        return {k: cache[k] for k in keys if k in cache}

    def set(self, key, value, *args, **kwargs):
        self.cache[key] = value

    def delete(self, key, *args, **kwargs):
        self.cache.pop(key, None)

    def incr(self, key, delta=1):
        return self.cache.incr(key, delta)

    def touch(self, key, expire):
        pass


backends = {
    'memcache': get_best_memcache,
    'memcached': get_best_memcache,
    'pylibmc': get_best_memcache,
    'memory': lambda: (DummyClient, ensure_bytes),
}

But the problem is that this piece of memory is not global: it was shared only because the result backend instance itself used to be global. This was changed in e203168#diff-acf0d4c4ddf0742d0699b98f51eec8823d1f22e7b76d0b4eb6f42ab27100188cL1193-R1214, which broke this result backend. For clarity, see the line

'memory': lambda: (DummyClient, ensure_bytes),

where it can be seen that each backend creates its own instance of the DummyClient class. The fix is easy: just create a global instance of DummyClient and use it as the client. The fix should look like this:

class DummyClient:

    def __init__(self, *args, **kwargs):
        self.cache = LRUCache(limit=5000)

    def get(self, key, *args, **kwargs):
        return self.cache.get(key)

    def get_multi(self, keys):
        cache = self.cache
        return {k: cache[k] for k in keys if k in cache}

    def set(self, key, value, *args, **kwargs):
        self.cache[key] = value

    def delete(self, key, *args, **kwargs):
        self.cache.pop(key, None)

    def incr(self, key, delta=1):
        return self.cache.incr(key, delta)

    def touch(self, key, expire):
        pass

    def __call__(self, *args, **kwargs):
        return self


DUMMY_CLIENT = DummyClient()


backends = {
    'memcache': get_best_memcache,
    'memcached': get_best_memcache,
    'pylibmc': get_best_memcache,
    'memory': lambda: (DUMMY_CLIENT, ensure_bytes),
}

I have tested the fix and it seems to be working. I will create the PR later on.
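The behaviour difference can be sketched in a few lines. This is a deliberately simplified stand-in (a plain dict instead of kombu's LRUCache, and only get/set) purely to illustrate why handing out the class breaks sharing while handing out one global instance restores it:

```python
class DummyClient:
    """Minimal stand-in for celery's in-memory cache client.

    A plain dict replaces kombu's LRUCache, for illustration only.
    """

    def __init__(self):
        self.cache = {}

    def get(self, key):
        return self.cache.get(key)

    def set(self, key, value):
        self.cache[key] = value

    def __call__(self, *args, **kwargs):
        # Lets the shared instance be "called" like a class, as in the fix.
        return self


# Before the fix: the factory hands out the *class*, so every backend
# instantiates its own client, and a stored result is never seen by
# the side waiting for it.
factory = DummyClient
producer_side = factory()
consumer_side = factory()
producer_side.set('task-id', 'pong')
assert consumer_side.get('task-id') is None  # result never arrives

# After the fix: one module-level instance is handed out, "calling" it
# returns the same object, so state is shared.
DUMMY_CLIENT = DummyClient()
producer_side = DUMMY_CLIENT()
consumer_side = DUMMY_CLIENT()
producer_side.set('task-id', 'pong')
assert consumer_side.get('task-id') == 'pong'  # result is visible
```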

swhmirror pushed a commit to SoftwareHeritage/swh-scheduler that referenced this issue Dec 7, 2020
This adapts the celery requirements to the last known version where our builds are fine. Currently, 5.0.3 got released and this ends up making all the swh modules relying on tasks time out. A bug upstream is opened [1].

In the meantime, this workaround fixes [2] and most probably the remaining swh builds.

[1] celery/celery#6521

[2] https://jenkins.softwareheritage.org/job/DSCH/job/tests/1132/console
@matusvalo
Member

@flying-sheep @anlambert See the PR with the fix: #6524. Could you recheck it?

@thedrow
Member

thedrow commented Dec 7, 2020

I may release 5.0.4 with just this fix.

@flying-sheep

PR works flawlessly, thank you!

@yuvalmarciano

I may release 5.0.4 with just this fix.

Would really appreciate it!

@thedrow
Member

thedrow commented Dec 8, 2020

I'm on it.

@purujitgoyal

I'm on celery 5.1.2 and I'm facing the same issue in a slightly different manner.

When I run this,

pytest_plugins = ["celery.contrib.pytest"]

def test_worker_initializes(celery_app, celery_worker):
    assert True

it works perfectly fine. However, if I add celery_worker_parameters, it starts to throw the TimeoutError:

import pytest
pytest_plugins = ["celery.contrib.pytest"]

@pytest.fixture(scope='session')
def celery_worker_parameters():
    return {
        'queues':  ('default'),
         # 'perform_ping_check': False

    }

def test_worker_initializes(celery_worker):
    assert True

A quick workaround that I found was setting perform_ping_check to False in celery_worker_parameters. I'm not sure whether that's the correct use for it or not.
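One thing worth double-checking in the snippet above (a guess on my part, not confirmed in this thread): ('default') is not a one-element tuple in Python, it is just the string 'default', because parentheses without a trailing comma do not create a tuple. If the worker then doesn't consume the queue the ping task is routed to, the ping check would time out:

```python
# Parentheses alone do not create a tuple; the trailing comma does.
queues = ('default')
assert isinstance(queues, str)      # just the string 'default'

queues = ('default',)
assert isinstance(queues, tuple)    # a real one-element tuple
assert queues[0] == 'default'
```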

@thedrow
Member

thedrow commented Aug 8, 2021


This is probably something else. Are your broker and backend available before you run the test suite?
I'd recommend filing a new bug report with a reproducible test case.

@TFiroozian


Were you able to resolve this issue? I'm also facing the same issue with celery 5.2.7.
