Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IO layer refactor #810

Merged
merged 82 commits into from Jan 30, 2017

Conversation

@pitrou
Copy link
Member

commented Jan 12, 2017

This refactor creates a distinct distributed.comm namespace which hosts an abstracted communications layer, mediated using URIs and several top-level functions (mainly connect() and listen(), plus some address-handling functions). The default transport is tcp which handles both IPv4 and IPv6. zmq is provided as a proof-of-concept but no effort has been done (yet?) to prove that it works with distributed at large: it is only tested using the internal communications API.

A number of small issues are also addressed on the side. One notable issue is the fact that the scheduler and workers would always listen on all addresses, even if asked to identify as 127.0.0.1.


class TCP(Comm):

def __init__(self, stream, peer_addr, deserialize=True):

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 12, 2017

Member

Should comms think about deserialization?

This comment has been minimized.

Copy link
@pitrou

pitrou Jan 13, 2017

Author Member

I got the impression that some comms were deserialize-always and others deserialize-never, though I've also kept the deserialize argument on read() for now. Which approach do you think is better?

If you're talking about the more general approach of pickling and unpickling in read() and write(), yes, I think it's desirable: imagine we write a new transport usable between threads of the same process, in this case we won't want to pickle anything, simple pass objects around in a queue.

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 13, 2017

Member

Ah, yes, good point.

"""
# XXX set context default options instead?
sock.set(zmq.RECONNECT_IVL, -1) # disable reconnections
sock.set(zmq.IPV6, True)

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 12, 2017

Member

I'm generally curious about this choice. What was the motivation?

This comment has been minimized.

Copy link
@pitrou

pitrou Jan 13, 2017

Author Member

I'm hoping this way ZeroMQ might actually inform us of disconnects. Though at this point I'm considering the ZeroMQ transport a mere experiment, as ZeroMQ's characteristics doesn't make it very friendly to work with.

This comment has been minimized.

Copy link
@minrk

minrk Jan 13, 2017

Contributor

You won't be informed of disconnects by disabling reconnects. Sockets will simply stop triggering POLL events. I wouldn't recommend using this setting. If you want to be notified of such events, the monitor system must be used.


frames = yield self.sock.recv_multipart(copy=False)
msg = from_frames([f.buffer for f in frames], deserialize=self.deserialize)
raise gen.Return(msg)

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 12, 2017

Member

Well that was easy :)

resp = {'zmq-url': make_zmq_url(self.ip, cli_port)}
yield self.sock.send_multipart([envelope] + to_frames(resp))

address = 'zmq://<unknown>' # XXX

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 12, 2017

Member

Is the address the same as envelope above?

This comment has been minimized.

Copy link
@pitrou

pitrou Jan 13, 2017

Author Member

No, envelope is some opaque endpoint id attributed by ZeroMQ.

This comment has been minimized.

Copy link
@minrk

minrk Jan 13, 2017

Contributor

zmq calls the routing id 'IDENTITY', and is a uid for routing messages to peers. It can be set manually with `socket.IDENTITY = b'whoiam', but you have to be careful to avoid collisions if you do this. The default is either a monotonic counter or random id.


@property
def _socket_class(self):
return _AsyncSocket

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 12, 2017

Member

Is this file re-implementing what you found to be inefficient in pyzmq?

This comment has been minimized.

Copy link
@pitrou

pitrou Jan 13, 2017

Author Member

Yes, and also avoids zeromq/pyzmq#963

This comment has been minimized.

Copy link
@minrk

minrk Jan 13, 2017

Contributor

I hope I can look forward to a PR to pyzmq when this is done!

"""
if isinstance(addr, list):
addr = tuple(addr)
# XXX how many address-parsing routines do we have?

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 12, 2017

Member

Yeah, sorry about this. There is some history here that is no longer relevant. I also wouldn't mind reducing the kinds of inputs this takes.

@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 12, 2017

The approach here seems reasonable to me. Thank you for pushing early; I was curious.

deserialize = self.deserialize

frames = yield self.sock.recv_multipart(copy=False)
msg = from_frames([f.buffer for f in frames], deserialize=self.deserialize)

This comment has been minimized.

Copy link
@minrk

minrk Jan 13, 2017

Contributor

deserialize=deserialize

This comment has been minimized.

Copy link
@pitrou

pitrou Jan 13, 2017

Author Member

Woo, thank you.

@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 27, 2017

I would like to issue a micro release before we merge this. I will try to accomplish that today.

@mrocklin mrocklin assigned mrocklin and unassigned mrocklin Jan 27, 2017

}


DEFAULT_SCHEME = 'tcp'

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 27, 2017

Member

We might want to pull this value from config. This would make it easier to switch to ZeroMQ without changing source.

pass
else:
# XXX this registers the ZMQ event loop, event if ZMQ is unused...
from . import zmq

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 27, 2017

Member

Given recent problems we probably don't want to import this by default.

raise CommClosedError(str(exc))


class TCP(Comm):

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 27, 2017

Member

This should probably have a docstring

An address this listener can be contacted on. This can be
different from `listen_address` if the latter is some wildcard
address such as 'tcp://0.0.0.0:123'.
"""

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 27, 2017

Member

It might be good to add these classes and their methods as API documentation within the docs. They provide an informative explanation of what one needs to do to implement another transport mechanism.

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 27, 2017

Member

Ah, I see that you've done this already.

@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 27, 2017

It would be nice to remove this line from distributed/protocol/core.py:dumps()

out_frames = [ensure_bytes(f) for f in out_frames]

This will probably require us to add an ensure_bytes call within the tcp write method. This avoids an unnecessary memory copy for transports that support memoryviews.

@@ -75,7 +75,7 @@ def dumps(msg):
header['keys'].append(key)
out_frames.extend(frames)

out_frames = [ensure_bytes(f) for f in out_frames]
out_frames = [f for f in out_frames]

This comment has been minimized.

Copy link
@mrocklin

mrocklin Jan 27, 2017

Member

Can this line be removed?

@pitrou pitrou force-pushed the pitrou:io_refactor branch from 8d44656 to c0cb233 Jan 27, 2017

@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 27, 2017

In distributed/bokeh/scheduler.py we add a TapTool to a plot so that, whenever a user clicks on a rectangle corresponding to a worker, it redirects them to that worker's bokeh page. The code to do this is currently written as follows:

            bokeh_addresses = [worker.split(':')[0] + ':' + str(wi[worker]['services']['bokeh'])
                               for worker in workers]

This solution no longer works. We find ourselves somewhere less-than-useful.

@pitrou

This comment has been minimized.

Copy link
Member Author

commented Jan 27, 2017

This solution no longer works. We find ourselves somewhere less-than-useful.

Oh, sorry about that. I'll fix it.

@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 27, 2017

I've tried out the ZeroMQ solution on a larger problem on a cluster. It is not robust and the cluster often deadlocks. I suspect that we would run into errors on the test_stress.py tests.

@pitrou

This comment has been minimized.

Copy link
Member Author

commented Jan 27, 2017

The TapTool regression should be fixed now (I also added a test). Could you check?

@pitrou

This comment has been minimized.

Copy link
Member Author

commented Jan 27, 2017

@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 27, 2017

My guess is that ZeroMQ won't pass tests. I don't think we necessarily need it to pass tests for us to merge this.

I think that Travis has the ability to add new elements to the test matrix that don't need to pass to get a green light.

@pitrou

This comment has been minimized.

Copy link
Member Author

commented Jan 27, 2017

The option doesn't switch to default transport to ZMQ, it just enables the ZMQ transport and the ZMQ event loop. The tests have been passing before with that, so they shouldn't fail :-)

@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 27, 2017

Ah, I see

@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 27, 2017

Now that version 1.15.2 is released I would like to see this merged quickly. Merging soon has a few benefits:

  1. It frees @pitrou from having to constantly merge in updates from master
  2. It gets more exposure to possible regressions
  3. It lets us start working on other comms like Queues or our own TCP stream (if we can verify that that is valuable)
@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 27, 2017

@pitrou how do you feel about this PR? What remains to be done?

@pitrou

This comment has been minimized.

Copy link
Member Author

commented Jan 27, 2017

Nothing important really. There may be bits and pieces worth further cleaning up, but that can be tackled in further PRs.

@mrocklin mrocklin merged commit 8176da1 into dask:master Jan 30, 2017

2 checks passed

continuous-integration/appveyor/pr AppVeyor build succeeded
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 30, 2017

Well, this is in. Lets see what happens :)

@mrocklin

This comment has been minimized.

Copy link
Member

commented Jan 30, 2017

Thank you @pitrou for taking this on.

@minrk

This comment has been minimized.

Copy link
Contributor

commented Jan 30, 2017

I will investigate if there are any zmq tuneups I can provide, if I get a chance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants
You can’t perform that action at this time.