RuntimeError: Set changed size during iteration with celery prefork #1774

Closed
18 tasks done
claudinoac opened this issue Jul 27, 2023 · 5 comments

@claudinoac
Contributor

claudinoac commented Jul 27, 2023

In celery and kombu v5.3.1 we are constantly being hit by the error below.
There are around 4 workers distributed across 3 different servers, all listening to the same queues in prefork mode with multiple processes (4 to 16 each).
Most of the workers are daemonized using the celeryd init script; some are spawned manually through the CLI for debug monitoring.
All of them are connected to the same Redis broker.
Some workers randomly throw the error below and stop working, and we've been forced to restart them every few hours.
This was already happening before v5.3; based on celery/celery#7162 we expected that update to fix the issue, but unfortunately it didn't.

[2023-07-27 01:28:05,206: CRITICAL/MainProcess] Unrecoverable error: RuntimeError('Set changed size during iteration')
Traceback (most recent call last):
File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/worker/worker.py", line 202, in start
self.blueprint.start(self)
File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
return self.obj.start()
File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 336, in start
blueprint.start(self)
File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 726, in start
c.loop(*c.loop_args())
File "/opt/Conversus/venv/lib/python3.8/site-packages/celery/worker/loops.py", line 97, in asynloop
next(loop)
File "/opt/app/venv/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 310, in create_loop
for tick_callback in on_tick:
RuntimeError: Set changed size during iteration

  • I have verified that the issue exists against the main branch of Celery.
  • This has already been asked in the discussions forum first (similar, but different issue)
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the main branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the main branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation (3.10).
  • I have tried reproducing the issue on more than one message broker and/or
    result backend (SQS).
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system. (Debian, Amazon Linux)
  • I have tried reproducing the issue on more than one workers pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies. (from v4.3.x to v5.3.1)

Related Issues and Possible Duplicates

Related Issues

Possible Duplicates

Environment & Settings

Celery version: 5.3.1

celery report Output:

software -> celery:5.3.1 (emerald-rush) kombu:5.3.1 py:3.8.6
            billiard:4.1.0 redis:3.5.3
platform -> system:Linux arch:64bit, ELF
            kernel version:4.14.209-160.339.amzn2.x86_64 imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:redis results:redis://:**@**:6379/0

task_queues: 
    (<unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>,
 <unbound Queue queue_1 -> <unbound Exchange queue_1(direct)> -> queue_1.#>,
 <unbound Queue queue_2 -> <unbound Exchange queue_2(direct)> -> queue_2.#>,
 <unbound Queue queue_3 -> <unbound Exchange queue_3(direct)> -> queue_3.#>,
 <unbound Queue queue_4 -> <unbound Exchange queue_4(direct)> -> queue_4.#>,
 <unbound Queue queue_5 -> <unbound Exchange queue_5(direct)> -> queue_5>,
 <unbound Queue queue_6 -> <unbound Exchange queue_6(direct)> -> queue_6.#>,
 <unbound Queue queue_7 -> <unbound Exchange queue_7(direct)> -> queue_7.#>)
task_routes: { ... undisclosed ... } 
beat_schedule: { ... undisclosed ... } 
timezone: 'UTC'
result_backend: 'redis://:********@****:6379/0'
broker_transport_options: {
 'visibility_timeout': 604800}
broker_url: 'redis://:********@****:6379/0'
redis_retry_on_timeout: True
redis_socket_keepalive: True
redis_backend_health_check_interval: 5
redis_password: '********'
broker_connection_max_retries: None
broker_pool_limit: None
accept_content: ['json']
task_always_eager: False
task_ignore_result: True
task_soft_time_limit: 36000
worker_pool_restarts: True
worker_prefetch_multiplier: 1
worker_max_memory_per_child: 500000
worker_proc_alive_timeout: 60
worker_send_task_events: True
task_send_sent_event: True

Steps to Reproduce

Required Dependencies

  • Minimal Python Version: N/A or Unknown
  • Minimal Celery Version: N/A or Unknown
  • Minimal Kombu Version: N/A or Unknown
  • Minimal Broker Version: N/A or Unknown
  • Minimal Result Backend Version: N/A or Unknown
  • Minimal OS and/or Kernel Version: N/A or Unknown
  • Minimal Broker Client Version: N/A or Unknown
  • Minimal Result Backend Client Version: N/A or Unknown

Python Packages

pip freeze Output:

```
absl-py==1.4.0 amqp==5.1.1 asgiref==3.4.1 astunparse==1.6.3 backcall==0.2.0 backports.zoneinfo==0.2.1 beautifulsoup4==4.9.3 billiard==4.1.0 blis==0.7.9 botocore==1.30.0 cachetools==4.2.2 catalogue==2.0.6 celery==5.3.1 certifi==2021.10.8 charset-normalizer==2.0.7 clarifai==2.6.2 click==8.1.3 click-didyoumean==0.3.0 click-plugins==1.1.1 click-repl==0.2.0 colorama==0.4.3 colour==0.1.5 confection==0.0.3 configparser==3.8.1 corextopic==1.1 cx-Oracle==8.2.1 cycler==0.11.0 cymem==2.0.5 decorator==5.0.9 dill==0.3.4 distro==1.6.0 Django==3.2.19 docutils==0.15.2 elasticsearch==7.16.2 et-xmlfile==1.1.0 flashtext==2.7 flatbuffers==23.3.3 flower==1.2.0 ftfy==6.0.3 future==0.18.2 gast==0.3.3 grpcio==1.32.0 h5py==2.10.0 hickle==4.0.4 httplib2==0.19.1 humanize==3.13.1 idna==3.3 ijson==3.1.4 inflection==0.5.1 jedi==0.18.0 Jinja2==3.0.1 jmespath==0.10.0 joblib==1.0.1 jsonschema==2.6.0 kiwisolver==1.3.2 kombu==5.3.1 langcodes==3.3.0 libclang==16.0.0 lxml==4.6.3 Markdown==3.3.4 MarkupSafe==2.0.1 mongoengine==0.23.1 mysqlclient==2.0.3 networkx==2.6.3 numpy==1.22.2 opt-einsum==3.3.0 packaging==21.0 pandas==1.3.2 parso==0.8.2 pathy==0.6.0 pendulum==2.1.2 pexpect==4.8.0 Pillow==9.4.0 preshed==3.0.5 prompt-toolkit==3.0.20 protobuf==3.17.3 psycopg2-binary==2.9.1 ptyprocess==0.7.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 pycurl==7.44.1 pydantic==1.8.2 Pygments==2.10.0 pylibmc==1.6.3 pymemcache==3.5.0 pymongo==3.12.0 pyparsing==2.4.7 python-dateutil==2.8.2 python-Levenshtein==0.12.2 pytz==2022.5 pytzdata==2020.1 PyYAML==5.4.1 redis==3.5.3 regex==2022.10.31 requests==2.26.0 rsa==4.7.2 scipy==1.7.1 setuptools-rust==0.12.1 simplejson==3.17.5 six==1.15.0 smart-open==5.2.1 soupsieve==2.2.1 sqlparse==0.4.1 srsly==2.4.5 termcolor==1.1.0 thinc==8.1.5 threadpoolctl==2.2.0 ThreeScalePY==2.6.0 tokenizers==0.12.1 toml==0.10.2 tornado==6.1 tqdm==4.62.2 traitlets==5.1.0 treelib==1.6.1 typer==0.4.2 typing_extensions==4.1.1 tzdata==2023.3 uritemplate==3.0.1 urllib3==1.26.7 vine==5.0.0 wasabi==0.10.1 wcwidth==0.2.5 websocket-client==0.48.0 wrapt==1.12.1 zope.event==4.6 zope.interface==5.5.2
```

Other Dependencies

Minimally Reproducible Test Case

It's not easily reproducible, as it only happens after the worker has been running and processing a bunch of tasks for a few hours.

Expected Behavior

The worker keeps running non-stop.

Actual Behavior

At some point the worker throws the error and gracefully stops all its processes, since the error is treated as unrecoverable.

@jaroslawporada

Hi,
I have the same issue and I can reproduce it with the following preconditions:

  • set a timeout on the Redis server: 127.0.0.1:6379> CONFIG SET timeout 60 (see the sketch at the end of this comment),
  • have, for example, one worker with 2 prefork child processes,
  • run tasks with a long sleep on both child processes and make sure the worker prefetches all available tasks.
    [Screenshot from 2023-09-14 15-18-35]
    The following timeline occurs during this execution:
  • two tasks are started,
  • after the Redis timeout (two tasks are running and nothing is happening on that connection), events = poll(poll_timeout) starts returning <bound method Transport.on_readable of <kombu.transport.redis.Transport object at 0x7f6e8222aad0>> (21,) at very short intervals;
    if chan.qos.can_consume(): is False because the prefetch is full, CPU usage in this process is very high and the memory allocation rate is also high due to self._ready = set(),

  • after a task is completed, this error occurs with the new kombu:
Traceback (most recent call last):
 File "/usr/local/lib/python3.11/site-packages/celery/worker/worker.py", line 202, in start
   self.blueprint.start(self)
 File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
   step.start(parent)
 File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
   return self.obj.start()
          ^^^^^^^^^^^^^^^^
 File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start
   blueprint.start(self)
 File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
   step.start(parent)
 File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 742, in start
   c.loop(*c.loop_args())
 File "/usr/local/lib/python3.11/site-packages/celery/worker/loops.py", line 97, in asynloop
   next(loop)
 File "/usr/local/lib/python3.11/site-packages/kombu/asynchronous/hub.py", line 310, in create_loop
   for tick_callback in on_tick:
   RuntimeError: Set changed size during iteration
With the old kombu I had this error:
    Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 628, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.11/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/usr/local/lib/python3.11/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1326, in on_readable
    self.cycle.on_readable(fileno)
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 562, in on_readable
    chan.handlers[type]()
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 955, in _brpop_read
    dest__item = self.client.parse_response(self.client.connection,
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/client.py", line 1286, in parse_response
    response = connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 882, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 349, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 359, in _read_response
    raw = self._buffer.readline()
          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 262, in readline
    self._read_from_socket()
  File "/usr/local/lib/python3.11/site-packages/redis/connection.py", line 215, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.

Library versions for the old/new errors:
[Screenshot from 2023-09-14 15-07-24]
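
For anyone who wants to reproduce just the server-side timeout precondition, here is a minimal sketch using a raw redis-py connection. A raw Connection is used on purpose, because the pooled redis.Redis client reconnects transparently and would hide the error; the host, port, and exact exception text are assumptions that may vary with your setup and redis-py version.

```python
# Minimal illustration of the precondition above: with `timeout` set, Redis
# closes client connections that sit idle while long tasks run.
import time

import redis
from redis.connection import Connection

admin = redis.Redis(host="127.0.0.1", port=6379)
admin.config_set("timeout", 5)          # close idle client connections after 5s

conn = Connection(host="127.0.0.1", port=6379)
conn.connect()
conn.send_command("PING")
print(conn.read_response())             # healthy connection: PONG

time.sleep(10)                          # stay idle past the server-side timeout

try:
    conn.send_command("PING")           # the server has already closed this socket
    conn.read_response()
except redis.exceptions.ConnectionError as exc:
    print("connection dropped:", exc)   # e.g. "Connection closed by server."

admin.config_set("timeout", 0)          # restore the default (no idle timeout)
```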

@auvipy auvipy added this to the 5.4 milestone Nov 23, 2023
@auvipy
Member

auvipy commented Nov 23, 2023

Would be happy to get a PR which fixes the issue.

@claudinoac
Contributor Author

@auvipy I'll give it a try :)

@claudinoac
Contributor Author

claudinoac commented Nov 25, 2023

At the exact point where we observe

File "/usr/local/lib/python3.11/site-packages/kombu/asynchronous/hub.py", line 310, in create_loop
   for tick_callback in on_tick:
   RuntimeError: Set changed size during iteration

I've inspected the content of on_tick and here's what I found:
on_tick changed from

{
    <function Transport.register_with_event_loop.<locals>.on_poll_start at 0x7f01bbb71700>,
    <function AsynPool._create_write_handlers.<locals>.on_poll_start at 0x7f020d9ea430>
}

to

 {<function AsynPool._create_write_handlers.<locals>.on_poll_start at 0x7f020d9ea430>}

When I made a shallow copy of on_tick right before iterating, the worker kept running, but at a given point it simply stopped consuming the queues. Transport.register_with_event_loop.<locals>.on_poll_start maps back to the kombu/transport/redis.py module.

I also tried forcing Redis to time out while running CPU-heavy operations locally, but the worker returned the expected ConnectionError and restarted itself.
This specific issue keeps happening only in production.
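
For illustration, here is a stand-alone sketch of the failure mode and of the shallow-copy experiment described above. The callback names only mirror what shows up in on_tick; this is not kombu's actual Hub.create_loop code, just a minimal reproduction of the set mutation.

```python
# Stand-alone sketch: a callback that removes itself from the set while a
# hub-style loop is iterating reproduces "Set changed size during iteration".
on_tick = set()

def transport_on_poll_start():
    # Simulates the redis transport callback disappearing from on_tick
    # mid-iteration (the behaviour observed above).
    on_tick.discard(transport_on_poll_start)

def pool_on_poll_start():
    pass  # stand-in for AsynPool._create_write_handlers.<locals>.on_poll_start

on_tick.update({transport_on_poll_start, pool_on_poll_start})

try:
    for tick_callback in on_tick:       # what hub.py does today
        tick_callback()
except RuntimeError as exc:
    print(exc)                          # Set changed size during iteration

# The shallow-copy workaround avoids the RuntimeError, but the removed
# callback is simply gone on the next pass, which matches the "stops
# consuming the queues" behaviour described above.
on_tick.add(transport_on_poll_start)
for tick_callback in set(on_tick):
    tick_callback()
```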

@claudinoac
Contributor Author

claudinoac commented Dec 5, 2023

Hey everyone, quick update: after going over this for a while, I've finally been able to reproduce the issue.

Steps:

  • Create a CPU-bound task like the following one (imports added for clarity, assuming timezone is django.utils.timezone and app is the project's Celery instance):
from datetime import timedelta
from django.utils import timezone

@app.task
def test_stress():
    curr_time = timezone.now()
    # busy-loop for ~60 seconds to keep the worker process CPU-bound
    while curr_time + timedelta(seconds=60) > timezone.now():
        10 * 10
  • Set the Redis server timeout to a value lower than the task's expected execution time (in this example the task takes around 60 seconds, so I used 20 seconds)
  • Schedule the task many times, enough to fill all the worker's threads/processes and its prefetch (see the sketch after this list)
  • The first task to finish will throw the error, making the worker start the teardown process: it waits for the other tasks to finish and re-enqueues the prefetched messages.
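
A minimal driver for the scheduling step might look like the sketch below; proj.tasks is a placeholder for whatever module defines test_stress, and the broker's Redis server is assumed to have been configured with CONFIG SET timeout 20 beforehand.

```python
# Hypothetical reproduction driver; `proj.tasks` is a placeholder for the
# module that defines the test_stress task above.
from proj.tasks import test_stress

# Enqueue far more tasks than the pool can run at once so the worker's
# prefetch buffer stays full while the long CPU-bound tasks execute.
for _ in range(32):
    test_stress.delay()
```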

Now that I can reliably reproduce it, I'll dig into this.
