This repository has been archived by the owner on Nov 3, 2020. It is now read-only.

Hanging in waiter.acquire(True, timeout) #96

Closed
lionfish0 opened this issue Jun 26, 2017 · 4 comments

Comments

@lionfish0
Contributor

lionfish0 commented Jun 26, 2017

My Python script hangs (inside search.fit). When I interrupt it, I find it's blocked on line 299, gotit = waiter.acquire(True, timeout). I'm not 100% sure what's happened, but I can run the same code happily from the remote Jupyter notebook (over the workers), so I wonder if it's a Python/pickle compatibility issue. I've just upgraded the local machine from 14.04 to 16.04, while the remote workers are all still on 14.04 (due to #38). The Python versions differ slightly, and I wonder if that's causing the problem?

On workers: Python 3.5.2 (ubuntu 14.04)
On local: Python 3.6.1 (ubuntu 16.04)

I don't know if that's the problem; I'll investigate more (I'd also like to know how to get more information out of the workers).
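As an aside (not from the original thread): when a script hangs like this, the standard library's faulthandler module can show where every thread is blocked, without attaching a debugger. A minimal sketch:

```python
import faulthandler
import signal

# Register a handler so that `kill -USR1 <pid>` from another shell dumps
# every thread's current stack to stderr (POSIX only).
faulthandler.register(signal.SIGUSR1)

# For a one-off check, dump the current stacks directly to a file
# (faulthandler writes to a real file descriptor, not a StringIO):
with open("stacks.txt", "w") as f:
    faulthandler.dump_traceback(file=f)
```

Sending SIGUSR1 to the hung process should show the blocked waiter.acquire frame directly, rather than having to interrupt the script.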

@lionfish0
Contributor Author

I realised the workers produced logs in /var/log/dask-worker.log (I don't know if this is because I added distributed.worker: debug to .dask/config.yaml).

Anyway, it looks like this is the problem:

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f95d1e4f588> exception was never retrieved: Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1024, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/core.py", line 239, in handle_comm
    + str(msg))
TypeError: Bad message type.  Expected dict, got
  [{'key': 'cv-parameters-ffeaf7708bc5a807e04134ba8785cabf', 'op': 'release-task'}, {'key': 'array-ae6a72ba4372b6e6a66362a12d606e12', 'op': 'release-task'}]
distributed.utils - ERROR - too many values to unpack (expected 2)
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/utils.py", line 381, in log_errors
    yield
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py", line 98, in deserialize_numpy_ndarray
    is_custom, dt = header['dtype']
ValueError: too many values to unpack (expected 2)
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/core.py", line 121, in loads
    value = _deserialize(head, fs)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 161, in deserialize
    return f(header, frames)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py", line 98, in deserialize_numpy_ndarray
    is_custom, dt = header['dtype']
ValueError: too many values to unpack (expected 2)
distributed.worker - ERROR - too many values to unpack (expected 2)
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/worker.py", line 1040, in compute_stream
    msgs = yield comm.read()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1024, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/comm/tcp.py", line 169, in read
    msg = from_frames(frames, deserialize=self.deserialize)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/comm/utils.py", line 30, in from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/core.py", line 121, in loads
    value = _deserialize(head, fs)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 161, in deserialize
    return f(header, frames)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py", line 98, in deserialize_numpy_ndarray
    is_custom, dt = header['dtype']
ValueError: too many values to unpack (expected 2)
distributed.core - ERROR - too many values to unpack (expected 2)
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/core.py", line 260, in handle_comm
    result = yield result
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/worker.py", line 1040, in compute_stream
    msgs = yield comm.read()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1024, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/comm/tcp.py", line 169, in read
    msg = from_frames(frames, deserialize=self.deserialize)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/comm/utils.py", line 30, in from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/core.py", line 121, in loads
    value = _deserialize(head, fs)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 161, in deserialize
    return f(header, frames)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py", line 98, in deserialize_numpy_ndarray
    is_custom, dt = header['dtype']
ValueError: too many values to unpack (expected 2)

Looking at the local version, ~/anaconda3/lib/python3.6/site-packages/distributed/protocol/numpy.py:

def deserialize_numpy_ndarray(header, frames):
    with log_errors():
        if len(frames) > 1:
            frames = merge_frames(header, frames)

        if header.get('pickle'):
            return pickle.loads(frames[0])

        dt = header['dtype']
        if isinstance(dt, tuple):
            dt = list(dt)

        x = np.ndarray(header['shape'], dtype=dt, buffer=frames[0],
                       strides=header['strides'])

        x = stride_tricks.as_strided(x, strides=header['strides'])

        return x

Compared to the version on the worker /opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py:

def deserialize_numpy_ndarray(header, frames):
    with log_errors():
        if len(frames) > 1:
            frames = merge_frames(header, frames)

        if header.get('pickle'):
            return pickle.loads(frames[0])

        is_custom, dt = header['dtype']
        if is_custom:
            dt = pickle.loads(dt)
        else:
            dt = np.dtype(dt)

        x = np.ndarray(header['shape'], dtype=dt, buffer=frames[0],
                       strides=header['strides'])

        return x
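The difference between the two functions is the whole story: the older 1.16.x client writes header['dtype'] as a plain dtype description, while the newer 1.17.x worker expects an (is_custom, dtype) pair, so tuple-unpacking the string raises exactly the ValueError in the logs. A minimal sketch with hypothetical headers (not distributed's actual wire format):

```python
# distributed 1.16.x (the local client here) writes the dtype as a plain
# description, matching `dt = header['dtype']` above.
header_v116 = {'dtype': '<f8'}

# distributed 1.17.x writes an (is_custom, dtype) pair, matching
# `is_custom, dt = header['dtype']` on the worker.
header_v117 = {'dtype': (False, '<f8')}

# The 1.17.1 worker's unpacking works on its own format...
is_custom, dt = header_v117['dtype']
assert is_custom is False and dt == '<f8'

# ...but applied to a header from the 1.16.3 client it tries to unpack
# the 3-character string '<f8' into two names:
try:
    is_custom, dt = header_v116['dtype']
except ValueError as e:
    print(e)  # too many values to unpack (expected 2)
```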

My local version of distributed is

Python 3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:09:58) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import distributed
>>> distributed.__version__
'1.16.3'

while on the worker it is:

Python 3.5.2 |Continuum Analytics, Inc.| (default, Jul  2 2016, 17:53:06) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import distributed
>>> distributed.__version__
'1.17.1'

I ran conda update distributed on my local machine. Updating:
distributed: 1.16.3-py36_0 --> 1.17.1-py36_0

And it now works!
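The root cause was version skew between client and workers, so it's worth auditing versions before debugging anything deeper. One way is to run the same snippet on every machine and compare the output; the sketch below uses only the standard library (distributed's Client.get_versions(check=True) can also report, and raise on, mismatched versions across the cluster):

```python
import importlib.metadata
import platform

def package_version(name):
    """Return the installed version of *name*, or None if it is absent."""
    try:
        return importlib.metadata.version(name)
    except importlib.metadata.PackageNotFoundError:
        return None

# Run this on the client and on every worker, then compare the lines.
print(platform.python_version(), package_version('distributed'))
```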

@mrocklin
Member

mrocklin commented Jun 28, 2017 via email

@jmsking

jmsking commented May 31, 2019

(quotes lionfish0's comment above in full)
Hi @lionfish0, I can't find the worker logs in ~/var/log. Can you share your .dask/config.yaml?

@mrocklin
Member

This project is ancient and probably not supported any longer. I recommend looking through https://docs.dask.org/en/latest/setup/cloud.html

In particular, many AWS users seem to like Dask on EMR.
