
dask distributed, fail to start worker #2446

Closed · redsum opened this issue Jan 1, 2019 · 8 comments
@redsum commented Jan 1, 2019

I get a timeout error when trying to start a LocalCluster with 4 processes or more:

        from dask.distributed import Client, LocalCluster

        cluster = LocalCluster(processes=True, n_workers=4, death_timeout=None)  # , silence_logs=logging.DEBUG
        client = Client(cluster)

I'm using dask 1.25.1 with Python 2.7 running on macOS.

This also happens during tests.
I modified the dask distributed test test_procs, found in distributed/deploy/tests/test_local.py, like this:

def test_procs():
    with LocalCluster(4, scheduler_port=0, processes=True, threads_per_worker=3,
                      diagnostics_port=None, silence_logs=False) as c:
        assert len(c.workers) == 4  # matches the 4 workers requested above
        assert all(isinstance(w, Nanny) for w in c.workers)
        with Client(c.scheduler.address) as e:
            assert all(v == 3 for v in e.ncores().values())

            c.start_worker()
            assert all(isinstance(w, Nanny) for w in c.workers)
        repr(c)

I set n_workers to 4 instead of 2, and I get this error:

test_local.py:68: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../local.py:141: in __init__
    self.start(ip=ip, n_workers=n_workers)
../local.py:174: in start
    self.sync(self._start, **kwargs)
../local.py:167: in sync
    return sync(self.loop, func, *args, **kwargs)
../../utils.py:277: in sync
    six.reraise(*error[0])
../../utils.py:262: in f
    result[0] = yield future
../../../../../anaconda/lib/python2.7/site-packages/tornado/gen.py:1133: in run
    value = future.result()
../../../../../anaconda/lib/python2.7/site-packages/tornado/concurrent.py:269: in result
    raise_exc_info(self._exc_info)
../../../../../anaconda/lib/python2.7/site-packages/tornado/gen.py:1141: in run
    yielded = self.gen.throw(*exc_info)
../local.py:194: in _start
    yield [self._start_worker(**self.worker_kwargs) for i in range(n_workers)]
../../../../../anaconda/lib/python2.7/site-packages/tornado/gen.py:1133: in run
    value = future.result()
../../../../../anaconda/lib/python2.7/site-packages/tornado/concurrent.py:269: in result
    raise_exc_info(self._exc_info)
../../../../../anaconda/lib/python2.7/site-packages/tornado/gen.py:883: in callback
    result_list.append(f.result())
../../../../../anaconda/lib/python2.7/site-packages/tornado/concurrent.py:269: in result
    raise_exc_info(self._exc_info)
../../../../../anaconda/lib/python2.7/site-packages/tornado/gen.py:1147: in run
    yielded = self.gen.send(value)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = LocalCluster('tcp://127.0.0.1:55398', workers=3, ncores=9)
death_timeout = 60, kwargs = {'ncores': 3, 'quiet': True, 'services': {}}
W = <class 'distributed.nanny.Nanny'>, w = <Nanny: None, threads: 3>

    @gen.coroutine
    def _start_worker(self, death_timeout=60, **kwargs):
        if self.status and self.status.startswith('clos'):
            warnings.warn("Tried to start a worker while status=='%s'" % self.status)
            return
    
        if self.processes:
            W = Nanny
            kwargs['quiet'] = True
        else:
            W = Worker
    
        w = W(self.scheduler.address, loop=self.loop,
              death_timeout=death_timeout,
              silence_logs=self.silence_logs, **kwargs)
        yield w._start()
    
        self.workers.append(w)
    
        while w.status != 'closed' and w.worker_address not in self.scheduler.workers:
            yield gen.sleep(0.01)
    
        if w.status == 'closed' and self.scheduler.status == 'running':
            self.workers.remove(w)
>           raise gen.TimeoutError("Worker failed to start")
E           TimeoutError: Worker failed to start

../local.py:224: TimeoutError
@mrocklin (Member) commented Jan 4, 2019

I've run into issues like this when using fork and creating and destroying many workers. I personally haven't bothered to track it down because most people today seem to be using forkserver in Python 3. I agree with you that it would be good to solve, though. If you have any time to devote to tracking this down and resolving it, that would be welcome.
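
For anyone on Python 3, a rough sketch of switching the start method, assuming your distributed release honours the distributed.worker.multiprocessing-method configuration key (this is not an option on Python 2.7):

    # Rough sketch (Python 3 only): ask the Nanny to start worker processes
    # with forkserver instead of fork. Assumes your distributed release reads
    # the "distributed.worker.multiprocessing-method" configuration key.
    import dask
    from dask.distributed import Client, LocalCluster

    dask.config.set({"distributed.worker.multiprocessing-method": "forkserver"})

    if __name__ == "__main__":            # needed once fork is no longer used
        cluster = LocalCluster(processes=True, n_workers=4)
        client = Client(cluster)
        print(client)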

@redsum (Author) commented Jan 6, 2019

There is a library named billiard, a fork of the Python 2.7 multiprocessing package. It includes the option to use forkserver and spawn, and it is used by Celery.

Would you consider using it to solve this issue?
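
A hypothetical sketch of what that might look like, assuming billiard mirrors the Python 3 multiprocessing API (get_context with "forkserver"/"spawn") on Python 2.7 as it advertises; untested:

    # Hypothetical sketch, not dask code: billiard is assumed to expose the
    # Python 3 multiprocessing API (including get_context) on Python 2.7,
    # so start-method selection would look roughly like this.
    import billiard

    def work(x):
        return x * x

    if __name__ == "__main__":
        ctx = billiard.get_context("forkserver")   # or "spawn"
        p = ctx.Process(target=work, args=(3,))
        p.start()
        p.join()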

@redsum (Author) commented Jan 10, 2019

I ran the following code in a loop.

test1.py

    import contextlib2
    from dask.distributed import Client, LocalCluster

    with contextlib2.ExitStack() as es:
        cluster = LocalCluster(processes=True, n_workers=4, death_timeout=None)  # , silence_logs=logging.DEBUG
        client = Client(cluster)
        es.callback(client.close)
        es.callback(cluster.close)


like this

    while [ 1==1 ]; do  python test1.py ; echo "--------------done----------"; done

After fewer than 10 iterations, the code hangs in an endless loop. It is stuck in site-packages/distributed/utils.py:sync, inside this block:

            while not e.is_set():
                e.wait(10)

All the child processes were created, but not all of them completed the worker initialisation.

Calling ps -ef | grep -i 20732 shows:


    lc       20732 23216 13 09:24 pts/8    00:00:06 python test1.py
    lc       20737 20732 10 09:24 pts/8    00:00:04 dask-worker [tcp://127.0.0.1:41411]
    lc       20741 20732  0 09:24 pts/8    00:00:00 python test1.py
    lc       20746 20732 10 09:24 pts/8    00:00:04 dask-worker [tcp://127.0.0.1:35800]
    lc       20753 20732 10 09:24 pts/8    00:00:04 dask-worker [tcp://127.0.0.1:46858]

Process 20741 is a dask-worker that did not complete the initialisation and has not changed its process name yet.

Calling strace -p 20741 shows:

    strace: Process 20741 attached
    futex(0xa4ed90, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 0, NULL, ffffffff

Process 20741 is the worker that is stuck in a deadlock, due to a lock that was forked from the parent in a held state.

@mrocklin (Member) commented:

> Would you consider using it to solve this issue?

I think it would be reasonable to allow people to bring their own multiprocessing library. We might not want to depend on billiard explicitly, though (I don't know enough about the library to judge it well).

Thank you for the analysis above. If you are able to track down the source of the issue and provide a fix that would be very welcome.

@redsum (Author) commented Jan 11, 2019

Here is the stack trace of the process when it is deadlocked. It is trying to acquire the logging module's lock:

Traceback (most recent call first):
  Waiting for the GIL
  File "/usr/lib/python2.7/threading.py", line 174, in acquire
    rc = self.__block.acquire(blocking)
  File "/usr/lib/python2.7/logging/__init__.py", line 212, in _acquireLock
    _lock.acquire()
  File "/usr/lib/python2.7/logging/__init__.py", line 1041, in getLogger
    _acquireLock()
  File "/usr/lib/python2.7/logging/__init__.py", line 1574, in getLogger
    return Logger.manager.getLogger(name)
  File "/usr/local/lib/python2.7/dist-packages/distributed/process.py", line 156, in reset_logger_locks
    for handler in logging.getLogger(name).handlers:
  File "/usr/local/lib/python2.7/dist-packages/distributed/process.py", line 165, in _run
    cls.reset_logger_locks()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
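
A standalone sketch (not dask code) of the underlying failure mode: if any thread holds the logging module lock at the moment the parent forks, the child inherits the lock in its held state with no thread left to release it, so the child's first logging.getLogger() call blocks forever:

    # Standalone sketch of the fork-while-logging-lock-is-held deadlock.
    # Unix only; uses the private logging._acquireLock()/_releaseLock()
    # helpers (present in Python 2.7, which this issue uses) to simulate
    # another thread being inside a logging call at fork time.
    import logging
    import os
    import signal
    import threading
    import time

    def hold_logging_lock():
        logging._acquireLock()       # another thread is "inside" logging...
        time.sleep(5)
        logging._releaseLock()       # ...and releases it only in the parent

    t = threading.Thread(target=hold_logging_lock)
    t.start()
    time.sleep(0.5)                  # make sure the lock is held before forking

    pid = os.fork()
    if pid == 0:
        # Child: the lock was copied in its locked state, but the thread that
        # holds it does not exist here, so this call never returns.
        logging.getLogger("child").info("never reached")
        os._exit(0)

    time.sleep(2)                    # parent: give the child a chance to finish
    if os.waitpid(pid, os.WNOHANG)[0] == 0:
        print("child is deadlocked on the inherited logging lock")
        os.kill(pid, signal.SIGKILL)
        os.waitpid(pid, 0)
    t.join()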

@mrocklin (Member) commented Jan 13, 2019 via email

@redsum (Author) commented Jan 13, 2019

This particular lock may be fixed, only for the next lock to fail the same way; any other library used by dask can cause this kind of issue.
So I think the best option is to use forkserver or spawn, which could be done using billiard.

@redsum (Author) commented Feb 21, 2019

A workaround was to start with one worker and scale up one worker at a time.
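
A rough sketch of that workaround, assuming a LocalCluster/Client new enough to have scale() and wait_for_workers() (older releases used start_worker() instead):

    # Sketch of the workaround: fork only one worker up front, then grow the
    # cluster one worker at a time so each fork happens from a quieter parent.
    # Assumes cluster.scale() and client.wait_for_workers() are available
    # (newer distributed releases); older versions used cluster.start_worker().
    from dask.distributed import Client, LocalCluster

    if __name__ == "__main__":
        cluster = LocalCluster(processes=True, n_workers=1)
        client = Client(cluster)

        target = 4
        for n in range(2, target + 1):
            cluster.scale(n)              # request one more worker
            client.wait_for_workers(n)    # block until it has registered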
