
"Stream is closed" error when creating dask client in Python 2 #25

Closed
SergeySmith opened this issue Oct 18, 2018 · 7 comments
@SergeySmith

I'm trying to test a dask-yarn installation on my Hadoop cluster. I started with a simple example:

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='environment.tar.gz', worker_vcores=1, 
                      worker_memory='4GB', 
                      n_workers=4)

The YARN application starts successfully: I get 1 scheduler and 4 workers. However, I see the following error in the scheduler's container log:

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:  tcp://zzz:34495
distributed.scheduler - INFO -       bokeh at:                    :45834
distributed.scheduler - INFO - Register tcp://zzz:38718
distributed.scheduler - INFO - Starting worker compute stream, tcp://zzz:38718
distributed.core - INFO - Starting established connection
Future exception was never retrieved
future: <Future finished exception=KeyError('op',)>
Traceback (most recent call last):
  File "/data/hadoop/yarn/local/usercache/user/appcache/application_1537451980488_0012/container_e19_1537451980488_0012_01_000002/environment/lib/python3.6/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/data/hadoop/yarn/local/usercache/user/appcache/application_1537451980488_0012/container_e19_1537451980488_0012_01_000002/environment/lib/python3.6/site-packages/distributed/core.py", line 313, in handle_comm
    op = msg.pop('op')
KeyError: 'op'

(zzz is just the scheduler's host IP). The output log in the Python shell:

18/10/18 12:08:18 INFO skein.Daemon: Submitting application...
18/10/18 12:08:18 INFO impl.YarnClientImpl: Submitted application application_1537451980488_0012
18/10/18 12:08:36 INFO skein.Daemon: Starting process disconnected, shutting down
18/10/18 12:08:36 INFO skein.Daemon: Daemon shut down

Once I get to creating the dask client:

client = Client(cluster)

I receive the stream-closed error:

/home/user/anaconda2/lib/python2.7/site-packages/distributed/comm/tcp.pyc in convert_stream_closed_error(obj, exc)
    124         raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
    125     else:
--> 126         raise CommClosedError("in %s: %s" % (obj, exc))
    127 
    128 

CommClosedError: in <closed TCP>: Stream is closed

Interestingly, <Future finished exception=KeyError('op',)> appears in the scheduler log every time I try to create the client.

The environment:

  • Hadoop 2.8.1
  • Anaconda environment (pip freeze):
asn1crypto==0.24.0
bokeh==0.13.0
certifi==2018.10.15
cffi==1.11.5
chardet==3.0.4
Click==7.0
cloudpickle==0.6.1
conda-pack==0.2.0
cryptography==2.3.1
cryptography-vectors==2.3.1
cytoolz==0.9.0.1
dask==0.19.4
dask-yarn==0.3.1
distributed==1.23.3
future==0.16.0
grpcio==1.14.1
heapdict==1.0.0
idna==2.7
Jinja2==2.10
locket==0.2.0
MarkupSafe==1.0
mkl-fft==1.0.6
mkl-random==1.0.1
msgpack==0.5.6
numpy==1.15.0
packaging==18.0
pandas==0.23.4
partd==0.3.9
patsy==0.5.0
protobuf==3.6.1
psutil==5.4.7
pyarrow==0.11.0
pycparser==2.19
pyOpenSSL==18.0.0
pyparsing==2.2.2
PySocks==1.6.8
python-dateutil==2.7.3
pytz==2018.5
PyYAML==3.13
requests==2.19.1
scikit-learn==0.19.1
scipy==1.1.0
six==1.11.0
skein==0.2.0
sortedcontainers==2.0.5
statsmodels==0.9.0
tblib==1.3.2
toolz==0.9.0
tornado==5.1.1
tqdm==4.26.0
tsfresh==0.11.0
urllib3==1.23
zict==0.1.3
@mrocklin
Member

My guess is that this is due to mismatched versions between the scheduler and client. Can you verify that your client process has the same version of distributed that is in the environment that you're giving to the YarnCluster?

@SergeySmith
Author

I just checked the distributed versions on the client and environment sides, and they appear to be the same:

pip freeze | grep distributed
distributed==1.23.3

As a matter of fact, I made the Anaconda environment tar.gz file a few days ago, so it's fresh. I also checked the tornado and skein versions; they are the same as well.

@SergeySmith
Author

SergeySmith commented Oct 19, 2018

Update. I tried the same code under Python 3 and it works great. The scenario I initially tried was as follows: I installed Anaconda2 (Python 2), created the environment for YARN from this Anaconda, called the client from Anaconda's Python 2, and got the error described above. Repeating the same steps with Anaconda3 doesn't produce any errors, so the problem is with Python 2.
As indicated in the scheduler log, dask on YARN uses Python 3 (environment/lib/python3.6) even though the environment was created and packed into the tar.gz file under Python 2.

@SergeySmith SergeySmith changed the title "Stream is closed" error when creating dask client "Stream is closed" error when creating dask client in Python 2 Oct 19, 2018
@jcrist
Member

jcrist commented Oct 19, 2018

Ah, that's it then. This has nothing to do with dask-yarn; it's general across all of Dask: you can't use Python 3 for the scheduler when you're using Python 2 for the client. Versions of all packages (including Python itself) should be the same across the scheduler, client, and workers.

As it's indicated in the scheduler log the dask on yarn uses Python 3 (environment/lib/python3.6) even though the environment was created and packed into tar.gz file under Python 2.

An environment created with conda and conda-pack is completely self-contained; I don't see how an environment that packaged Python 2 could be running Python 3. I suspect the environment you packaged is not the same as the one you're running locally. Is it possible that you created or packaged the environment incorrectly?
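A quick way to check for the mismatch described above is to compare the interpreter and distributed versions that the client sees against the ones inside the packed archive. This is only a sketch: it assumes the archive is named environment.tar.gz as in the original example, and that /tmp/env-check is a scratch directory.

```shell
# Client side: print the Python and distributed versions the client will use.
python -c "import sys, distributed; print(sys.version.split()[0], distributed.__version__)"

# Packed environment: conda-pack archives contain bin/ and lib/ at the top
# level, so extracting gives us a runnable interpreter to query directly.
mkdir -p /tmp/env-check
tar -xzf environment.tar.gz -C /tmp/env-check
/tmp/env-check/bin/python -c "import sys, distributed; print(sys.version.split()[0], distributed.__version__)"
```

If the two lines printed differ in either field, the scheduler and client are running mismatched stacks, which matches the symptom in this issue.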

@SergeySmith
Author

SergeySmith commented Oct 19, 2018

Well, I think the environment was packed correctly, in some sense. I just discovered that Anaconda2 creates environments with Python 3 as the default version, even though Python 2 would seem the more logical default coming from Anaconda2.
In the end, the mystery is solved and the issue can be closed.
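For anyone hitting the same surprise: the Python version can be pinned explicitly when creating the environment, so the packed archive can't silently pick up a different interpreter than the client uses. A sketch, assuming conda and conda-pack are installed; the environment name dask-py2 is arbitrary:

```shell
# Pin the interpreter explicitly instead of relying on the base
# installation's default Python.
conda create -y -n dask-py2 python=2.7 dask distributed dask-yarn

# Pack the environment for YARN; the archive carries the pinned interpreter.
conda activate dask-py2
conda pack -o environment.tar.gz
```

Running the same pinned version on the client then keeps the scheduler, workers, and client consistent, as noted above.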

@jcrist
Member

jcrist commented Oct 19, 2018

Glad to hear you figured things out.

@jcrist jcrist closed this as completed Oct 19, 2018
@rodfloripa

Is it possible to install dask-yarn on Python 2? SergeySmith said he did it.
