Tasks are not allowed to start subprocesses #1709

Closed
aromanovich opened this Issue Nov 29, 2013 · 56 comments

@aromanovich

Starting with Celery 3.1.0 the processes pool (celery.concurrency.prefork, former celery.concurrency.processes) uses daemon processes to perform tasks.

Daemon processes are not allowed to create child processes and, as a result, tasks that use multiprocessing package are not working:

[2013-11-29 14:27:48,297: ERROR/MainProcess] Task app.add[e5d184c0-471f-4fc4-804c-f760178d4847] raised exception: AssertionError('daemonic processes are not allowed to have children',)
Traceback (most recent call last):
  File "/Users/aromanovich/Envs/celery3.1/lib/python2.7/site-packages/celery/app/trace.py", line 218, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/Users/aromanovich/Envs/celery3.1/lib/python2.7/site-packages/celery/app/trace.py", line 398, in __protected_call__
    return self.run(*args, **kwargs)
  File "/Users/aromanovich/Projects/celery/app.py", line 10, in add
    manager = multiprocessing.Manager()
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 99, in Manager
    m.start()
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 524, in start
    self._process.start()
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 124, in start
    'daemonic processes are not allowed to have children'
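
The same failure can be reproduced without Celery. The following is a minimal sketch, assuming a POSIX system where the "fork" start method is available; the helper names are made up for illustration. A daemon process tries to start a child of its own and reports back what happened.

```python
import multiprocessing

# Assumption: a POSIX system where the "fork" start method is available.
ctx = multiprocessing.get_context("fork")


def _noop():
    """Trivial target for the grandchild process."""


def _try_to_start_child(conn):
    """Runs inside a daemon process and reports what start() does."""
    try:
        child = ctx.Process(target=_noop)
        child.start()
        child.join()
        conn.send("started")
    except AssertionError as exc:
        conn.send(str(exc))
    finally:
        conn.close()


def daemon_start_result():
    """Start a daemon process and return the message it reports back."""
    receiver, sender = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=_try_to_start_child, args=(sender,), daemon=True)
    proc.start()
    sender.close()  # the parent only reads; the child holds its own copy
    message = receiver.recv()
    proc.join()
    return message


if __name__ == "__main__":
    print(daemon_start_result())
```

When Python runs without -O, the reported message should be the same assertion text that appears in the traceback above.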
Owner
ask commented Dec 2, 2013

This has not changed between 3.0 and 3.1, so I'm not sure why you would get this error now and not before.

This is how the error can be reproduced.

app.py:

import multiprocessing
from celery import Celery

app = Celery(__name__, broker='amqp://192.168.33.40')
@app.task
def f():
    manager = multiprocessing.Manager()

sendtask.py:

import app

app.f.delay()

I run worker using the following command: celery worker -A app.app -l debug.

With Celery 3.0.24 task succeeds:

[2013-12-02 20:43:56,454: INFO/MainProcess] Task app.f[bcaab028-dbec-43a8-9259-ff7c35ff13d0] succeeded in 0.0169339179993s: None

With Celery 3.1.5 it does not:

[2013-12-02 20:48:38,946: ERROR/MainProcess] Task app.f[c9f1cdd3-ae38-493e-b7c7-b9636ed473d0] raised exception: AssertionError('daemonic processes are not allowed to have children',)

My understanding of the issue is the following: celery.concurrency.prefork.TaskPool uses celery.concurrency.asynpool.AsynPool; AsynPool inherits from billiard.pool.Pool which spawns daemon worker processes and AsynPool does not override this behaviour. But you're right, this scheme does not seem to be changed between 3.0 and 3.1, so I'm confused too :)

And it seems that I'm not alone with that problem: http://stackoverflow.com/questions/20149421/threads-in-celery-3-1-5
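
The claim that pool workers are daemon processes can be checked against the standard library pool that billiard derives from. This is a hedged sketch using multiprocessing rather than billiard itself, assuming the "fork" start method; pool_worker_daemon_flags is a hypothetical helper name.

```python
import multiprocessing

# Assumption: a POSIX system where the "fork" start method is available.
ctx = multiprocessing.get_context("fork")


def pool_worker_daemon_flags(size=2):
    """Return the daemon flag of every worker in a freshly created pool."""
    pool = ctx.Pool(size)
    try:
        # active_children() lists the live child processes of this process,
        # which here are the pool workers.
        return [child.daemon for child in multiprocessing.active_children()]
    finally:
        pool.terminate()
        pool.join()


if __name__ == "__main__":
    print(pool_worker_daemon_flags())
```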

Owner
ask commented Dec 2, 2013

One difference is that the worker process is now a subclass of 'Process', where before it used the function argument: Process(target=), maybe there is a difference in default values for these approaches.

Owner
ask commented Dec 2, 2013

multiprocessing and old versions of billiard set daemon=True:
https://github.com/celery/billiard/blob/2.7/billiard/pool.py#L904

And it's the same in the latest version:
https://github.com/celery/billiard/blob/3.3/billiard/pool.py#L1039

ilyastam commented Dec 2, 2013

I think that the task process being a daemon is a serious limitation for task implementation.
I wrote a task which uses multiprocessing to speed up CPU-bound operations. Everything works fine when I start a worker in a terminal as follows:

celery worker --app=tasks -Q wb -l info --concurrency=1

But when I use celeryd script to start a worker, I get this exception:
AssertionError: daemonic processes are not allowed to have children

I figured out what caused the change in the behaviour.
Tasks are run using daemon processes both in 3.0 and 3.1, but until celery/billiard@4c32d2e and celery/billiard@c676b94 multiprocessing module wasn't aware of that and hence was allowing creating subprocesses.

To my understanding, there was a bug prior to version 3.1 (tasks were allowed to create subprocesses, which could result in orphaned state) and now this bug has been fixed.

ilyastam commented Dec 3, 2013

The decision to not allow python daemon processes to fork seems rather arbitrary to me. While I recognize the good faith of it, I feel like I should be able to have full control over this behavior if I choose to.

Being bound to one process per task seems to be a serious limitation to me. Thoughts?

Owner
ask commented Dec 3, 2013

I wonder why that limitation is there in the first place, a warning I can understand but outright disallowing it seems silly when you are perfectly able to fork processes using other means.

ilyastam commented Dec 3, 2013

@ask, would it be possible to initialize the celery worker process with the daemon flag set to False? Or make this configurable?

Owner
ask commented Dec 3, 2013

@ilyastam seems we were commenting at the same time

I agree that it seems like an arbitrary limitation, but I wish I knew the rationale behind adding it in the first place.

This is a well known pitfall in posix systems, but it's still allowed. You may clean up child processes in a signal handler, though that does not protect you against SIGKILL.
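
A minimal sketch of the signal-handler cleanup mentioned above, assuming a POSIX system with the "fork" start method. The helper names (spawn_tracked, reap_children, install_cleanup_handler) are hypothetical, and, as noted, none of this helps when the parent receives SIGKILL.

```python
import multiprocessing
import signal
import time

# Assumption: a POSIX system where the "fork" start method is available.
ctx = multiprocessing.get_context("fork")
_children = []


def _run_with_default_sigterm(target, args):
    # A forked child inherits the parent's handler; restore the default so
    # that terminate() from the parent actually stops the child.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    target(*args)


def spawn_tracked(target, *args):
    """Start a child process and remember it for later cleanup."""
    proc = ctx.Process(target=_run_with_default_sigterm, args=(target, args))
    proc.start()
    _children.append(proc)
    return proc


def reap_children():
    """Terminate and join every tracked child that is still running."""
    for proc in _children:
        if proc.is_alive():
            proc.terminate()
    for proc in _children:
        proc.join(timeout=5)
    del _children[:]


def _on_sigterm(signum, frame):
    reap_children()
    raise SystemExit(0)


def install_cleanup_handler():
    """Clean up tracked children when this process receives SIGTERM."""
    signal.signal(signal.SIGTERM, _on_sigterm)


if __name__ == "__main__":
    install_cleanup_handler()
    spawn_tracked(time.sleep, 60)
    reap_children()
```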

I think we should remove the limitation from billiard, even though that would diverge from the multiprocessing behavior. You can still create child processes using the subprocess module or using the low level fork call, so power users should be able to create child billiard.Process instances.

Owner
ask commented Dec 3, 2013

@ilyastam Should be able to remove the raise statement, don't have to make the processes "non-daemon"

That is, daemon processes will be allowed to create child processes even if they will not be able to reap them,
which is how posix works anyway.

Owner
ask commented Dec 3, 2013

Btw, note that this is not a raise, it's an assert statement, which will be removed if python is started with the PYTHONOPTIMIZE envvar or the -O argument.
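
The effect of optimization on an assert statement can be seen without starting any process. This is a small sketch that compiles the same source at two optimization levels with the built-in compile(); SOURCE and run_with_optimize are illustrative names, and the snippet only imitates the shape of the check rather than quoting the multiprocessing source.

```python
# Illustrative stand-in for a check written as an assert statement.
SOURCE = (
    "assert False, 'daemonic processes are not allowed to have children'\n"
    "outcome = 'start() would proceed'\n"
)


def run_with_optimize(level):
    """Execute SOURCE compiled at the given optimization level."""
    namespace = {}
    code = compile(SOURCE, "<sketch>", "exec", optimize=level)
    try:
        exec(code, namespace)
    except AssertionError as exc:
        return "AssertionError: %s" % exc
    return namespace["outcome"]


if __name__ == "__main__":
    print(run_with_optimize(0))  # asserts kept, like plain `python`
    print(run_with_optimize(1))  # asserts stripped, like `python -O`
```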

Owner
ask commented Dec 3, 2013

billiard 3.3.0.11 is on PyPI including this change

ilyastam commented Dec 3, 2013

@ask thank you. Any idea what version of celery will see this improvement?

The multiprocessing documentation explicitly states that daemon processes are not allowed to create subprocesses and explains why. To me, this assert statement looks more like it was put here as a shortcut for raise (people often do that).

This limitation is documented and I don't think that it is a good idea for Celery to silently monkey-patch multiprocessing and take it away. It could lead to really unexpected and harmful consequences.

I can think of the following example (it may seem a bit contrived, though):

@app.task
def f():
    p = multiprocessing.Pool(3)
    p.map_async(time.sleep, [1000, 1000, 1000])

Being run as a plain Python function, this code works correctly. But being run as a Celery task (using Celery version 3.0.*), it leaves three subprocesses that will hang forever; when the Celery worker quits, these subprocesses will become orphaned.

Owner
ask commented Dec 3, 2013

It doesn't explain why, it just states the unix behavior that you would expect when starting a child-child process. Even though it's an infamous limitation in unix it doesn't stop people from doing it. This is no different from
starting a subprocess.Popen process, or even calling fork() to start a new process. So why should it be illegal?

The way to do your example:

import time

from billiard import Pool
from multiprocessing.util import Finalize

_finalizers = []

@app.task
def f():
    p = Pool(3)
    # terminate the pool when it is garbage collected or at interpreter exit
    _finalizers.append(Finalize(p, p.terminate))
    try:
        p.map_async(time.sleep, [1000, 1000, 1000])
        p.close()
        p.join()
    finally:
        p.terminate()

To kill (-9) this you would have to also kill -9 the child processes, but that is something you will have
to consider for all unix processes.

Not that I advocate creating a Pool for every task, but I don't see why users, who know what they're
doing, shouldn't be allowed to start processes from a task.

Also, we don't monkey patch anything this is a change in billiard only.

Also, we don't monkey patch anything this is a change in billiard only.

By "monkey patching" I mean this assignment, which replaces multiprocessing._current_process with an instance of billiard.process.Process: https://github.com/celery/billiard/blob/master/billiard/process.py#L53.

I agree that there is nothing wrong with starting child-child processes if they are handled right (like in your example). My point is that multiprocessing is not written that way and we should not ignore its implementation limitations.

Owner
ask commented Dec 9, 2013

@aromanovich It cannot be written any other way, it's not a limitation of multiprocessing it's a limitation of unix.

It sets _current_process so that the logging module's processName format variable works, and the billiard process object has the same API as the multiprocessing process object so it's safe to set the current process.
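
For readers unfamiliar with the processName format variable: the standard logging module fills it from multiprocessing.current_process().name when multiprocessing has been imported. A minimal sketch with the standard library only follows; the logger name and helper are made up for illustration.

```python
import io
import logging
import multiprocessing


def format_with_process_name(message):
    """Log one message and return it formatted with %(processName)s."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(processName)s: %(message)s"))
    logger = logging.getLogger("sketch.process_name")  # hypothetical name
    logger.propagate = False
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    try:
        logger.info(message)
    finally:
        logger.removeHandler(handler)
    return stream.getvalue().strip()


if __name__ == "__main__":
    print(format_with_process_name("hello"))
```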

Owner
ask commented Dec 9, 2013

And btw, you would have to use billiard for the limitation to be lifted, using multiprocessing will still raise the exception.

@ghost Unknown referenced this issue in celery/billiard Jul 23, 2014
Closed

_ack() takes exactly 4 arguments (5 given) #99

@ghost
ghost commented Jul 23, 2014

Could also fix this issue using this approach:
http://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
Which would allow users to continue using the multiprocessing module, avoiding this issue:
celery/billiard#99

I get this error when calling a @parallel fabric task from within a celery task.

@celery.task
def dostuff():
    execute(fabfile.push_settings, sid=site['sid'])

@parallel
@roles(environment)
def push_settings(sid):
    pass  # do stuff
xiaods commented Jan 26, 2015

@frodopwns use ENV
export PYTHONOPTIMIZE=1
to remove this assert. You then need to handle everything yourself.

@xiaods I think I solved that issue with something like this:

import Crypto.Random
from celery.signals import worker_process_init

@worker_process_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    Crypto.Random.atfork()

Problem

I have a task which calculates some data and loads a scikit-learn classifier to make predictions based on that data. When I run the task by itself, everything is OK, but when I run it using Celery, I get an error when the task attempts to load the pickled classifier:

[2015-07-17 21:23:51,299: ERROR/MainProcess] Task app.f[329d0da4-2e0e-4e1f-8148-d64f47750b1f] raised unexpected: AttributeError("'Worker' object has no attribute '_config'",)
Traceback (most recent call last):
  File "/home/username/anaconda3/lib/python3.4/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/username/anaconda3/lib/python3.4/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/username/working/playground/celery/app.py", line 11, in f
    clf = pickle.load(open('clf.pickle', 'rb'))
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/ensemble/__init__.py", line 6, in <module>
    from .base import BaseEnsemble
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/ensemble/base.py", line 13, in <module>
    from ..externals.joblib import cpu_count
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/externals/joblib/__init__.py", line 112, in <module>
    from .parallel import Parallel
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/externals/joblib/parallel.py", line 23, in <module>
    from ._multiprocessing_helpers import mp
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/externals/joblib/_multiprocessing_helpers.py", line 25, in <module>
    _sem = mp.Semaphore()
  File "/home/username/anaconda3/lib/python3.4/multiprocessing/context.py", line 81, in Semaphore
    return Semaphore(value, ctx=self.get_context())
  File "/home/username/anaconda3/lib/python3.4/multiprocessing/synchronize.py", line 127, in __init__
    SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
  File "/home/username/anaconda3/lib/python3.4/multiprocessing/synchronize.py", line 59, in __init__
    kind, value, maxvalue, self._make_name(),
  File "/home/username/anaconda3/lib/python3.4/multiprocessing/synchronize.py", line 117, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
AttributeError: 'Worker' object has no attribute '_config'

To reproduce

Create an empty classifier and save it as a pickle:

import pickle
from sklearn.ensemble import GradientBoostingClassifier
clf = GradientBoostingClassifier()
pickle.dump(clf, open('clf.pickle', 'wb'))

Create a simple app (app.py):

import pickle
import sklearn
from celery import Celery

app = Celery(__name__, broker='amqp://localhost//')

@app.task
def f():
    print('hello')
    clf = pickle.load(open('clf.pickle', 'rb'))
    print(clf)

Start the celery worker:

celery -A app worker --loglevel=debug

Run the app:

python -c "from app import f; f.delay()"

Error message:

...
AttributeError: 'Worker' object has no attribute '_config'

Solution

I think there should be an option to "monkeypatch" Celery to allow tasks to start sub-processes, especially if such a "feature" existed in the past. Right now, people are simply moving away to other frameworks when they encounter this problem: http://stackoverflow.com/questions/27904162/using-multiprocessing-pool-from-celery-task-raises-exception. Here is another example of this error: http://stackoverflow.com/questions/22674950/python-multiprocessing-job-to-celery-task-but-attributeerror.

This issue should be re-opened...

@thedrow thedrow reopened this Jul 18, 2015
martinth commented Aug 3, 2015

I just ran into the same problem. I'm using nltk inside one of my workers which in turn imports scikit-learn which leads to the same error @ostrokach showed.

It seems that I'm able to work around this with the following code:

from celery.signals import worker_process_init

@worker_process_init.connect
def fix_multiprocessing(**kwargs):
    from multiprocessing import current_process
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}

This obviously is a very crude hack and I don't know what would happen if I would really use multiprocessing (heck I don't even know what semprefix is) but it is enough to make scikit-learn work again.
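
On the question of what semprefix is: the traceback above shows multiprocessing reading current_process()._config['semprefix'] in _make_name, that is, as the prefix for the names it gives to semaphores. The sketch below inspects that dictionary in an ordinary process. Note that _config is a private CPython attribute, so this is an observation about current internals and not a supported API; the helper names are hypothetical.

```python
import multiprocessing


def process_config_keys():
    """Return the keys of the current process's private _config dict."""
    # _config is private to CPython's multiprocessing; read it defensively.
    config = getattr(multiprocessing.current_process(), "_config", {})
    return sorted(config)


def semaphore_name_prefix():
    """Return the semprefix entry, or None if it is missing."""
    config = getattr(multiprocessing.current_process(), "_config", {})
    return config.get("semprefix")


if __name__ == "__main__":
    print(process_config_keys())
    print(semaphore_name_prefix())
```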

I'm leaving this here for other people who stumble over the same problem until this issue is fixed.

Contributor
thedrow commented Aug 3, 2015

Could this be some kind of incompatibility with billiard on Python 3? Or is it reproducible on Python 2 as well?

gilinson commented Aug 4, 2015

Is the original issue of celery processes not being able to create subprocesses still an issue? Looking back through the comments, it was fixed with celery/billiard@e6bb0f7 for version 3.3. However a later commit (celery/billiard@c7eedbd) reintroduced a very similar assert statement into the start method for 3.4. No issue for me since I am on 3.3, but just wanted to point it out in case it creates problems in the future.

@martinth Thanks, this hack works for me too!

@xiaods Thank you! Your solution works for me! Thanks!

@gilinson it's still an issue and export PYTHONOPTIMIZE=1 still "kinda fixes" it.
Just ran into the same problem, trying to run ansible playbook in Celery task

@martinth Thanks for the hack! I'm running into the same issue with:

  • Python 3.4.3
  • celery==3.1.18
  • scikit-learn==0.17

@martinth's hack doesn't work for me, ran into this trying to use multiprocessing to speed up some computation. Used the threading module instead and it seems to relieve this error for me while still breaking up my processing.

razzius commented Apr 9, 2016

Using the thread-based multiprocessing.dummy worked in celery for me:

from multiprocessing.dummy import Pool
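
A short usage sketch of that thread-backed pool. multiprocessing.dummy.Pool offers the same map API as multiprocessing.Pool but runs workers as threads, so no child processes are created; the function names are illustrative. Because the workers are threads, CPU-bound pure-Python code is still limited by the GIL.

```python
from multiprocessing.dummy import Pool  # threads behind the Pool API


def square(value):
    return value * value


def squares_in_threads(values, workers=4):
    """Map square over values using a pool of worker threads."""
    with Pool(workers) as pool:
        return pool.map(square, values)


if __name__ == "__main__":
    print(squares_in_threads([1, 2, 3]))
```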

@benrudolph benrudolph referenced this issue in dimagi/commcare-hq Apr 10, 2016
Merged

Captain #11190

fyb3r commented Apr 29, 2016

This error is still occurring in python 2.7.5 as well. I'm not sure if it is intended to ever address it, but this makes using saltstack's salt-ssh un-usable with celery.

Owner
ask commented Jun 23, 2016

Closing this, as we don't have the resources to complete this task.

@ask ask closed this Jun 23, 2016
orzel commented Jun 26, 2016

Possible "solution"

I had such a task that was trying to create threads and this would fail. I managed to get it working by forking to a bash script that itself forks to a python interpreter that runs the exact same code (and hence could create threads, which was critical for my use case).

orzel commented Jun 26, 2016 edited

I don't understand why the ticket is closed. If you don't have the resources for that, you might make a comment about this, but that is no reason to close the ticket. You're just hiding the bug by doing so.

Which is especially bad for a ticket labelled "critical" both for priority and severity.

@orzel +1.
Priority: Critical
Severity: Critical
Closing this, as we don't have the resources to complete this task.

That is a joke. If you don't have resources now - then don't fix it now. Fix it when you will have resources. Closing the ticket will not remove the problem

Contributor
thedrow commented Jun 26, 2016

@orzel @Templarrr I labeled this ticket as Critical so @ask is not to blame here.
You might be unhappy about this, but protesting won't help.
We need to groom our backlog based on what's actionable and what isn't and currently this isn't.
It's a tough call to make but someone has to make it.
If this issue is in your way, try to fix it. I promise that if the fix is correct and has the appropriate tests I'll merge it.

orzel commented Jun 28, 2016

@thedrow
You misunderstood. Twice.
Our concern is not that you don't have the resources (this is perfectly understandable, and, sadly, a very common case in free software). Our concern is that the ticket is closed because of that; this is not how tickets work.
We are not 'unhappy', we are shocked.

I also completely disagree with closing this.

I think we can all agree that this is indeed a bug. And while it's indeed sad that there are not enough resources, closing a definite bug won't help this. You can't possibly know if maybe tomorrow someone comes along and thinks "let's fix some bugs in Celery", just to look through the open issues and think "Well, there isn't any interesting work to do here...let's work on OtherProject instead".
Additionally, closing this issue will make it harder to find. I don't know how you use Github, but when I discover a potential problem I first search the issue tracker for open issues. Commonly there is a lot of discussion and more often than not there is also a workaround (like in this case) I can use for now. Only if I get really desperate do I start looking through the closed issues.

This is not "backlog grooming" it's number tweaking. If I look at things to use I do look at the open issue count but I also always look at the star count (which is pretty high for celery). I understand that is desirable to have a low bug count for public appeal but also for your own sake. Honestly, I understand that seeing "250 open issues" is not a nice number and sounds overwhelming.

If you don't have the manpower to work on this in the next month (or even year) this is fine. Just don't close. A close should only happen if the issue is either done or it's absolutely clear that it will never be done. Both are not the case here.

Just remove the "Critical" flags and add a "Postponed" flag for anything that can't be handled now but should be handled when the resources are available.

Owner
ask commented Jun 28, 2016

I'm not sure we can actually fix the issue. We cannot change how unix works but we could submit a patch upstream to lift the restriction?

Maybe there are platform specific solutions in Linux, but that would have to be researched. It's been open for 2 years without anyone having the incentive to fix it, so it's unlikely to be fixed in the near future.

I closed 200+ issues and marked well over 30k emails as read, so some of them are bound to be contentious, and we may have to reopen these. I completely expect that, but then it would be nice if we could also contribute to a solution, e.g. by helping to document the defect if that is the only known option.

We are swamped with work, trying to operate a huge project with no resources. We are unable to triage issues or to find out what issues have been fixed already.

Well..okay. But could the fact that "You can't use multiprocessing if you write code for a celery worker" at least be documented? I mean...there will always be people who don't read it but at least you can point towards it and say "See, it's documented. We can't change it. Deal with it."

Owner
ask commented Jun 28, 2016

My todo list is gigantic, you can edit the documentation directly on github now so it's really easy to contribute changes like these :(

I don't do this to hide issues, I'm doing this to whip people into action precisely because I want to see it improved.

@ns-codereview ns-codereview pushed a commit to couchbase/perfrunner that referenced this issue Jun 29, 2016
@pavel-paulau @pavel-paulau pavel-paulau + pavel-paulau Upgrade Celery to version 3.1.23
Celery 3.0.x is not compatible with Python 2.7.11. We need to fix
this issue before moving to Ubuntu 16.04.

Reference:
celery/kombu#545
celery/celery#1709

Change-Id: I4ee2b14d668fcadb565aed006fd9abc1ab5d52f8
Reviewed-on: http://review.couchbase.org/65216
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Pavel Paulau <pavel.paulau@gmail.com>
dffc742

@ask Can we apply multiprocessing inside task using celery in django ?
Is there any alternative to do so ?

@abhisheksachan you should read all this issue before posting such a question

@abhisheksachan I haven't tried this in a couple of years, but I had gotten it working using https://pypi.python.org/pypi/billiard because it allows daemonization of subprocesses.

Owner
ask commented Aug 4, 2016 edited

Yeah, you have to replace imports from 'multiprocessing' with 'billiard', for example:

from multiprocessing import Process

->

from billiard import Process

There's no way for us to disable the multiprocessing limitation, but we argue there shouldn't be a limitation anyway so our multiprocessing fork allows it.

For anyone who, like me, invested in developing a queueing system BEFORE finding out about that limitation and needs a different workaround until they can migrate to a more usable rabbitMQ python wrapper, I managed to work around the issue by calling an external subprocess that can fork itself cleanly. That forked process is now outside the celery sandbox and things work as they should.

In the OP example, replace:

app = Celery(__name__, broker='amqp://192.168.33.40') 
@app.task
def f():
    manager = multiprocessing.Manager()

with:

app = Celery(__name__, broker='amqp://192.168.33.40')
@app.task
def f():
    process = subprocess.Popen(["program"]) # or the newer post 3.5 run version
    process.wait()
    # analyze exit code

and the "program" will look like (under POSIX unix/linux platform)

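import multiprocessing
import os

def main():
    manager = multiprocessing.Manager()

# this is equivalent to "(cmd )&" under bash
pid = os.fork()
if pid == 0:
    cpid = os.fork()
    if cpid == 0:
        main()
    else:
        os._exit(0)  # intermediate child exits at once, detaching the grandchild
else:
    os.waitpid(pid, 0)  # reap the intermediate child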

Bear in mind that CPU management escapes the scope of celery, which kinda goes against the idea of using celery, but given you were going to use multiprocessing, you probably want to handle CPU usage outside of celery anyway.
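
The calling side of this workaround, including the "analyze exit code" step, can be sketched with subprocess.run. This is a hedged illustration and not part of the original workaround: the external "program" is replaced by an inline script run through the current interpreter, the script stays in the foreground (no double fork) so its result can be read, and the "fork" start method on a POSIX system is assumed.

```python
import subprocess
import sys

# Stand-in for the external "program": a fresh interpreter that is free to
# use multiprocessing because it is not a daemonic pool worker.
CHILD_SCRIPT = """
import multiprocessing

if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(2) as pool:
        print(sum(pool.map(abs, [-1, -2, -3])))
"""


def run_external():
    """Run the helper program and return (exit code, captured stdout)."""
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_SCRIPT],
        capture_output=True,
        text=True,
        timeout=60,
    )
    return completed.returncode, completed.stdout.strip()


if __name__ == "__main__":
    code, output = run_external()
    if code != 0:
        raise SystemExit("helper failed with exit code %d" % code)
    print(output)
```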

At the very least that limitation should be documented. I looked around in the docs and couldn't find it.

Contributor
thedrow commented Oct 5, 2016

Again, feel free to submit a pull request with the documentation changes.

miraculixx commented Nov 19, 2016 edited

following up on @martinth's comment, on Python 3.5.2, Celery 4.0.0 and billiard 3.5.0 his solution did not work, as multiprocessing checks on the process being daemonized and stops it from starting a child.

I was able to lift the restriction by resetting the worker's daemon flag. I'm pretty sure that's a bad idea, but it allows starting multiprocessing.Pools from within a celery task.

@worker_process_init.connect
def fix_multiprocessing(**kwargs):
    # don't be a daemon, so we can create new subprocesses
    from multiprocessing import current_process
    current_process().daemon = False
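
The effect of resetting the flag can be observed with plain multiprocessing, outside Celery. This is a sketch assuming a POSIX system with the "fork" start method; it relies on the daemon attribute being writable from inside the already running child, which is how current CPython behaves but is not a documented guarantee. The helper names are hypothetical.

```python
import multiprocessing

# Assumption: a POSIX system where the "fork" start method is available.
ctx = multiprocessing.get_context("fork")


def _noop():
    """Trivial target for the grandchild process."""


def _reset_flag_and_start_child(conn):
    """Runs inside a daemon process: clear the flag, then start a child."""
    try:
        multiprocessing.current_process().daemon = False
        child = ctx.Process(target=_noop)
        child.start()
        child.join()
        conn.send("started, exit code %d" % child.exitcode)
    except AssertionError as exc:
        conn.send("refused: %s" % exc)
    finally:
        conn.close()


def start_child_from_daemon():
    """Start a daemon process and return the message it reports back."""
    receiver, sender = ctx.Pipe(duplex=False)
    proc = ctx.Process(
        target=_reset_flag_and_start_child, args=(sender,), daemon=True
    )
    proc.start()
    sender.close()  # the parent only reads; the child holds its own copy
    message = receiver.recv()
    proc.join()
    return message


if __name__ == "__main__":
    print(start_child_from_daemon())
```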

That said, IMHO Celery should add a documented option to configure whether it starts workers as daemons. Note I'm using celery in a k8 pod so celery is started as a foreground process using celery worker, and I really don't need or want daemonized workers.

Contributor
thedrow commented Nov 20, 2016

@miraculixx The problem with this suggestion is that we'll have more fail modes to handle and more issues to address. We'd rather avoid those.

@pyup-bot pyup-bot referenced this issue in impactlab/oeem-energy-datastore Nov 23, 2016
Open

Initial Update #62

Though using multiprocessing fails in combination with the prefork pool, it seems to work when using the solo pool. So I guess a workaround would be to spawn multiple celery workers with the solo pool, instead of one with multiple children in prefork pool. Does this sound legit? Of course that way some options such as max-mem-per-child won't work.

rapliandras commented Mar 16, 2017 edited

I think this is basically an application design issue. Running into "daemonic processes are not allowed to have children" is especially painful because, at that point, you have to redesign the whole application. But it's an OS level limitation, you can't circumvent it without serious side effects. Daemonic processes also can't have children in C. This is not Python-specific stuff. There used to be a debate about thread vs process performance, and the conclusion was that neither is significantly better or worse than the other.

I suggest two options (and talking in general, not about celery here)

  • Use subprocess.Popen to spawn an independent process, which can have children and use UNIX sockets for inter-process communication
  • Is it really necessary for the threads to be spawned by a forked process and not your main process?
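
The first option can be sketched as follows: an independent interpreter is started with subprocess.Popen and the two sides talk over a UNIX socket pair. This is a minimal illustration assuming a POSIX system; the child script and the roundtrip helper are hypothetical, and a real worker would do more than upper-case its input.

```python
import socket
import subprocess
import sys

# The independent process: adopts the inherited socket, answers one request.
CHILD_SCRIPT = """
import socket
import sys

sock = socket.socket(fileno=int(sys.argv[1]))
data = sock.recv(1024)
sock.sendall(data.upper())
sock.close()
"""


def roundtrip(payload):
    """Send payload to a freshly started helper process and return its reply."""
    parent_sock, child_sock = socket.socketpair()
    try:
        proc = subprocess.Popen(
            [sys.executable, "-c", CHILD_SCRIPT, str(child_sock.fileno())],
            pass_fds=[child_sock.fileno()],  # keep this fd open in the child
        )
        child_sock.close()  # the parent keeps only its own end
        parent_sock.sendall(payload)
        reply = parent_sock.recv(1024)
        proc.wait(timeout=30)
        return reply
    finally:
        parent_sock.close()


if __name__ == "__main__":
    print(roundtrip(b"ping"))
```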

For what it's worth, my use case back then was that I wanted to launch a long running subprocess that would often crash quickly because of non-trivial (and non security sensitive) input issues. So the idea was to at least make sure the process launched successfully.

It turned out in the long term to be a poor design for various reasons so the new architecture naturally reverted to the "natural" use of async celery workers. So I agree with the idea of questioning if forking is really necessary; the task is the fork.

For what it's worth, my use case was to launch scikit-learn processes which use multiprocessing (through joblib). I have since developed a celery backend for joblib, which means scikit-learn launches parallel processes using celery and my above hack is no longer required. This is in a POC stage, not ready for prime-time just yet.
