IO layer refactor (#810)
* Start comm layer

* zmq draft

* Add our own ZMQ async socket

* Wire our ZMQ async socket on the ZMQ comm layer

* Add and fix tests

* Improve comm layer and tests

* More streamlining of the comm layer

* Get distributed.tests.test_core to pass

* Get test_batched to pass

* Make tests in test_scheduler that don't use a Client or a Nanny pass

* All test_scheduler tests pass except nanny tests

* Fix some of test_client

* test_client passes entirely

* Fix test in test_scheduler

* Let test_nanny tests pass

* Fix failure in test_scheduler

* Fix more tests

* Get test_channels to pass

* Let more tests pass

* Remove distributed.sync

* Get more tests to pass

* Let test_worker pass

* Get test_worker_failure to pass

* Fix test_hdfs

* Pass all diagnostics tests

* Get deploy tests to work (except SSHCluster)

* Rename test file to make py.test happy

* Fix circular import

* Have Bokeh tests pass

* Pass all cli and deploy tests

* Add CLI utils tests

* Fix some failures in test_widgets

* Fix issues with ipv6 as well as with Py2

* Add a couple comm tests

* Hopefully fix more tests on Travis

* Fix previous merge

* Distinguish between listening address and contact address

* Fix test_batched

* Try to fix ZMQ issues on Windows

* Refactor getting the service addr of a worker

* Remove / rename some APIs

* Remove coerce_to_rpc

* Remove Nanny environment feature

* More small cleanups

* Improve test suite speed a bit

* Fix test failure

* Restrict listen IP of Scheduler services to the Scheduler's listen IP

* Remove `ip` argument from Worker and Nanny.
Workers now bind to the right address for contacting the scheduler.

* Make some calling code in scheduler and worker simpler

* Add tests for scheduler listening ports

* Try to debug logging test failures on CI

* Fix logging test failure

* Make comm closed errors more informative about the underlying reason

* Use distinct port number in test_cores to minimize the risk of getting errors when binding sockets

* Use longer timeouts in connection tests

* Fix some ZMQ issues

* Fix deserialization issue with ZMQ

* Fix compression at the right place

* Fix crash on Windows

* Fix some warnings when building the docs

* Fix examples and text

* Fix rpc() docstring

* Fix formatting of protocol message example

* Add a bit about addresses and listeners

* Address review comments

* Fix scheduler services to listen on all interfaces
(and actually run tests)

* Move ensure_bytes to TCP.write()

* Address review comments: make default scheme configurable, and add docstrings

* Fix regression in Bokeh TapTool

* Hide zmq behind a config option or environment variable

* Try to fix sporadic failure in test_tcp_many_listeners

* Fix tests on Python 2.7
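
Taken together, these commits replace direct use of Tornado IOStreams (`read(stream)`, `write(stream, msg)`, `StreamClosedError`) with a pluggable comm layer: `connect()` now takes an address string and yields a `Comm` object whose `read()`/`write()`/`close()` are coroutines, and a vanished peer surfaces as `CommClosedError`. A minimal sketch of the new calling convention, inferred from the diffs below (the address and message are illustrative):

```python
from tornado import gen

from distributed.core import connect, CommClosedError


@gen.coroutine
def poll(addr='tcp://127.0.0.1:8786'):
    # connect() yields a Comm; its read()/write() coroutines replace
    # the old module-level read(stream)/write(stream, msg) helpers.
    comm = yield connect(addr)
    try:
        yield comm.write({'op': 'identity', 'reply': True})
        msg = yield comm.read()
    except CommClosedError:
        msg = None            # the peer hung up
    finally:
        yield comm.close()
    raise gen.Return(msg)
```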
pitrou authored and mrocklin committed Jan 30, 2017
1 parent 312dd6b commit 8176da1
Showing 87 changed files with 3,974 additions and 2,033 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -9,7 +9,7 @@ matrix:
   - python: "2.7"
     env: PACKAGES=blosc
   - python: "3.4"
-    env: COVERAGE=true
+    env: COVERAGE=true DASK_EXPERIMENTAL_ZMQ=1
   - python: "3.5"
   - python: "2.7"
     env: HDFS=true
2 changes: 1 addition & 1 deletion distributed/__init__.py
@@ -1,7 +1,7 @@
 from __future__ import print_function, division, absolute_import

 from .config import config
-from .core import connect, read, write, rpc
+from .core import connect, rpc
 from .deploy import LocalCluster
 from .diagnostics import progress
 from .client import (Client, Executor, CompatibleExecutor, wait, as_completed,
52 changes: 31 additions & 21 deletions distributed/batched.py
@@ -7,10 +7,9 @@

 from tornado import gen, locks
 from tornado.queues import Queue
-from tornado.iostream import StreamClosedError
 from tornado.ioloop import PeriodicCallback, IOLoop

-from .core import read, write, close
+from .core import CommClosedError
 from .utils import ignoring, log_errors


@@ -39,22 +38,25 @@ class BatchedSend(object):
     ['Hello,', 'world!']
     """
+    # XXX why doesn't BatchedSend follow either the IOStream or Comm API?
+
     def __init__(self, interval, loop=None):
+        # XXX is the loop arg useful?
         self.loop = loop or IOLoop.current()
         self.interval = interval / 1000.

         self.waker = locks.Event()
         self.stopped = locks.Event()
         self.please_stop = False
         self.buffer = []
-        self.stream = None
+        self.comm = None
         self.message_count = 0
         self.batch_count = 0
         self.byte_count = 0
         self.next_deadline = None

-    def start(self, stream):
-        self.stream = stream
+    def start(self, comm):
+        self.comm = comm
         self.loop.add_callback(self._background_send)

     def __str__(self):
@@ -80,10 +82,10 @@ def _background_send(self):
             self.batch_count += 1
             self.next_deadline = self.loop.time() + self.interval
             try:
-                nbytes = yield write(self.stream, payload)
+                nbytes = yield self.comm.write(payload)
                 self.byte_count += nbytes
-            except StreamClosedError:
-                logger.info("Batched Stream Closed")
+            except CommClosedError:
+                logger.info("Batched Comm Closed")
                 break
             except Exception:
                 logger.exception("Error in batched write")
@@ -96,8 +98,8 @@ def send(self, msg):
         This completes quickly and synchronously
         """
-        if self.stream is not None and self.stream._closed:
-            raise StreamClosedError()
+        if self.comm is not None and self.comm.closed():
+            raise CommClosedError

         self.message_count += 1
         self.buffer.append(msg)
@@ -106,18 +108,26 @@ def send(self, msg):
         self.waker.set()

     @gen.coroutine
-    def close(self, ignore_closed=False):
-        """ Flush existing messages and then close stream """
-        if self.stream is None:
+    def close(self):
+        """ Flush existing messages and then close comm """
+        if self.comm is None:
             return
         self.please_stop = True
         self.waker.set()
         yield self.stopped.wait()
-        try:
-            if self.buffer:
-                self.buffer, payload = [], self.buffer
-                yield write(self.stream, payload)
-        except StreamClosedError:
-            if not ignore_closed:
-                raise
-        yield close(self.stream)
+        if not self.comm.closed():
+            try:
+                if self.buffer:
+                    self.buffer, payload = [], self.buffer
+                    yield self.comm.write(payload)
+            except CommClosedError:
+                pass
+            yield self.comm.close()
+
+    def abort(self):
+        if self.comm is None:
+            return
+        self.waker.set()
+        if not self.comm.closed():
+            self.comm.abort()
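As a usage note, here is a hedged sketch of driving the refactored `BatchedSend` over a comm, assuming the address of a running peer (the interval and messages are illustrative):

```python
from tornado import gen

from distributed.batched import BatchedSend
from distributed.core import connect


@gen.coroutine
def batched_example(addr):
    comm = yield connect(addr)
    batched = BatchedSend(interval=10)   # flush at most every 10 ms
    batched.start(comm)

    # send() is cheap and synchronous; messages pile up in the buffer
    # until the background coroutine wakes and writes them as one batch.
    batched.send({'op': 'update', 'value': 1})
    batched.send({'op': 'update', 'value': 2})

    # close() flushes anything still buffered, then closes the comm;
    # abort() instead tears the comm down without flushing.
    yield batched.close()
```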
30 changes: 15 additions & 15 deletions distributed/bokeh/background/server_lifecycle.py
@@ -9,11 +9,10 @@

 from tornado import gen
 from tornado.httpclient import AsyncHTTPClient, HTTPError
-from tornado.iostream import StreamClosedError
 from tornado.ioloop import IOLoop

 from distributed.compatibility import ConnectionRefusedError
-from distributed.core import read, connect, write
+from distributed.core import connect, CommClosedError, coerce_to_address
 from distributed.metrics import time
 from distributed.protocol.pickle import dumps
 from distributed.diagnostics.eventstream import eventstream
@@ -74,11 +73,11 @@ def workers():
 @gen.coroutine
 def progress():
     with log_errors():
-        stream = yield progress_stream('%(host)s:%(tcp-port)d' % options, 0.050)
+        comm = yield progress_stream('%(host)s:%(tcp-port)d' % options, 0.050)
         while True:
             try:
-                msg = yield read(stream)
-            except StreamClosedError:
+                msg = yield comm.read()
+            except CommClosedError:
                 break
             else:
                 messages['progress'] = msg
@@ -88,14 +87,15 @@ def progress():
 def processing():
     with log_errors():
         from distributed.diagnostics.scheduler import processing
-        stream = yield connect(ip=options['host'], port=options['tcp-port'])
-        yield write(stream, {'op': 'feed',
-                             'function': dumps(processing),
-                             'interval': 0.200})
+        addr = coerce_to_address((options['host'], options['tcp-port']))
+        comm = yield connect(addr)
+        yield comm.write({'op': 'feed',
+                          'function': dumps(processing),
+                          'interval': 0.200})
         while True:
             try:
-                msg = yield read(stream)
-            except StreamClosedError:
+                msg = yield comm.read()
+            except CommClosedError:
                 break
             else:
                 messages['processing'] = msg
@@ -105,9 +105,9 @@ def processing():
 def task_events(interval, deque, times, index, rectangles, workers, last_seen):
     i = 0
     try:
-        stream = yield eventstream('%(host)s:%(tcp-port)d' % options, 0.100)
+        comm = yield eventstream('%(host)s:%(tcp-port)d' % options, 0.100)
         while True:
-            msgs = yield read(stream)
+            msgs = yield comm.read()
             if not msgs:
                 continue
@@ -120,8 +120,8 @@ def task_events(interval, deque, times, index, rectangles, workers, last_seen):
             index.append(i)
             i += 1

-    except StreamClosedError:
-        pass  # don't log StreamClosedErrors
+    except CommClosedError:
+        pass  # don't log CommClosedErrors
     except Exception as e:
         logger.exception(e)
     finally:
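
All of these dashboard coroutines now share one comm-layer pattern: open a comm (optionally asking the scheduler to `feed` it periodic results), then loop on `read()` until `CommClosedError` signals that the peer is gone. A condensed sketch of the pattern, with a hypothetical `handle` callback standing in for the `messages[...]` updates above:

```python
from tornado import gen

from distributed.core import connect, coerce_to_address, CommClosedError
from distributed.protocol.pickle import dumps


@gen.coroutine
def feed_loop(host, port, function, handle, interval=0.2):
    # coerce_to_address() turns a (host, port) pair into an address
    # string acceptable to the new comm layer.
    comm = yield connect(coerce_to_address((host, port)))
    yield comm.write({'op': 'feed',
                      'function': dumps(function),
                      'interval': interval})
    while True:
        try:
            msg = yield comm.read()
        except CommClosedError:
            break             # scheduler went away; exit quietly
        else:
            handle(msg)
```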
2 changes: 1 addition & 1 deletion distributed/bokeh/components.py
@@ -458,7 +458,7 @@ def update(self, messages):
             pass

         data['processing'] = [sorted(d[w]['processing']) for w in workers]
-        data['processes'] = [len(d[w]['ports']) for w in workers]
+        data['processes'] = [len(d[w]['addresses']) for w in workers]
         self.source.data.update(data)

12 changes: 9 additions & 3 deletions distributed/bokeh/core.py
@@ -7,14 +7,20 @@


 class BokehServer(object):
-    def listen(self, port):
+    def listen(self, addr):
         if self.server:
             return
+        if isinstance(addr, tuple):
+            ip, port = addr
+        else:
+            port = addr
+            ip = None
         for i in range(5):
             try:
-                self.server = Server(self.apps, io_loop=self.loop, port=port,
+                self.server = Server(self.apps, io_loop=self.loop,
+                                     port=port, address=ip, host=['*'],
                                      check_unused_sessions_milliseconds=500,
-                                     host=['*'])
+                                     )
                 if bokeh.__version__ <= '0.12.3':
                     self.server.start(start_loop=False)
                 else:
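
With this change `listen()` accepts either a bare port or an `(ip, port)` tuple, so callers can restrict the Bokeh server to one interface; per the `for i in range(5)` loop, the bind is retried up to five times. Both call styles, on a hypothetical `server` instance:

```python
server.listen(8787)                  # bind the default interface
server.listen(('127.0.0.1', 8787))  # bind one explicit IP only
```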
11 changes: 5 additions & 6 deletions distributed/bokeh/scheduler.py
@@ -103,12 +103,11 @@ def update(self):
         with log_errors():
             o = self.scheduler.occupancy
             workers = list(self.scheduler.workers)
-            wi = self.scheduler.worker_info
-            try:
-                bokeh_addresses = [worker.split(':')[0] + ':' + str(wi[worker]['services']['bokeh'])
-                                   for worker in workers]
-            except Exception as e:
-                bokeh_addresses = [''] * len(workers)
+            bokeh_addresses = []
+            for worker in workers:
+                addr = self.scheduler.get_worker_service_addr(worker, 'bokeh')
+                bokeh_addresses.append('%s:%d' % addr if addr is not None else '')

             y = list(range(len(workers)))
             occupancy = [o[w] for w in workers]
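
`get_worker_service_addr()` centralizes the lookup that this code previously did by digging through `worker_info[...]['services']`. Judging from the formatting above, it returns a `(host, port)` tuple, or `None` when that worker runs no such service; the values in this sketch are purely illustrative:

```python
addr = scheduler.get_worker_service_addr('tcp://127.0.0.1:45003', 'bokeh')
# -> e.g. ('127.0.0.1', 46293), or None if the worker has no bokeh service
label = '%s:%d' % addr if addr is not None else ''
```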
10 changes: 7 additions & 3 deletions distributed/bokeh/tests/test_scheduler_bokeh.py
@@ -12,7 +12,7 @@
 from distributed.client import _wait
 from distributed.metrics import time
 from distributed.utils_test import gen_cluster, inc, dec, slowinc
-from distributed.bokeh.worker import Counters
+from distributed.bokeh.worker import Counters, BokehWorker
 from distributed.bokeh.scheduler import (BokehScheduler, StateTable,
     SystemMonitor, Occupancy, StealingTimeSeries, StealingEvents)

@@ -34,13 +34,17 @@ def test_simple(c, s, a, b):
     assert 'bokeh' in response.body.decode().lower()


-@gen_cluster(client=True)
+@gen_cluster(client=True, worker_kwargs=dict(services={'bokeh': BokehWorker}))
 def test_basic(c, s, a, b):
     for component in [SystemMonitor, StateTable, Occupancy, StealingTimeSeries]:
         ss = component(s)

         ss.update()
-        assert len(first(ss.source.data.values()))
+        data = ss.source.data
+        assert len(first(data.values()))
+        if component is Occupancy:
+            assert all(addr.startswith('127.0.0.1:')
+                       for addr in data['bokeh_address'])


 @gen_cluster(client=True)
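
The new `worker_kwargs` matters because `Occupancy` now reports real service addresses, so the test workers must actually run a bokeh service. Outside of `gen_cluster`, the equivalent wiring would look roughly like this (the scheduler address is a placeholder):

```python
from distributed import Worker
from distributed.bokeh.worker import BokehWorker

# services maps a service name to a server class; the worker starts the
# service and advertises its address to the scheduler.
w = Worker('tcp://127.0.0.1:8786', services={'bokeh': BokehWorker})
```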
4 changes: 2 additions & 2 deletions distributed/bokeh/worker.py
@@ -54,7 +54,7 @@ def update(self):
              'Ready': [len(w.ready)],
              'Waiting': [len(w.waiting_for_data)],
              'Connections': [len(w.in_flight_workers)],
-             'Serving': [len(w._listen_streams)]}
+             'Serving': [len(w._comms)]}
         self.source.data.update(d)


@@ -168,7 +168,7 @@ def __init__(self, worker, **kwargs):
     def update(self):
         with log_errors():
             self.source.stream({'x': [time() * 1000],
-                                'out': [len(self.worker._listen_streams)],
+                                'out': [len(self.worker._comms)],
                                 'in': [len(self.worker.in_flight_workers)]},
                                10000)

2 changes: 1 addition & 1 deletion distributed/bokeh/worker_monitor.py
@@ -8,7 +8,7 @@

 from ..utils import ignoring, log_errors

-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)

 with ignoring(ImportError):
     from bokeh.models import (
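
Using `logging.getLogger(__name__)` keys this logger by its dotted module path (`distributed.bokeh.worker_monitor`) rather than a filesystem path, so it slots into the standard `distributed.*` logger hierarchy and responds to the package's logging configuration.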
35 changes: 17 additions & 18 deletions distributed/channels.py
@@ -6,9 +6,8 @@
 from time import sleep
 import threading

-from tornado.iostream import StreamClosedError
-
 from .client import Future
+from .core import CommClosedError
 from .utils import tokey, log_errors

 logger = logging.getLogger(__name__)
@@ -48,15 +47,15 @@ def subscribe(self, channel=None, client=None, maxlen=None):
         self.stopped[channel] = False
         self.clients[channel].add(client)

-        stream = self.scheduler.streams[client]
+        comm = self.scheduler.comms[client]
         for key in self.deques[channel]:
-            stream.send({'op': 'channel-append',
-                         'key': key,
-                         'channel': channel})
+            comm.send({'op': 'channel-append',
+                       'key': key,
+                       'channel': channel})

         if self.stopped[channel]:
-            stream.send({'op': 'channel-stop',
-                         'channel': channel})
+            comm.send({'op': 'channel-stop',
+                       'channel': channel})

     def unsubscribe(self, channel=None, client=None):
         logger.info("Remove client from channel, %s, %s", client, channel)
@@ -88,20 +87,20 @@ def stop(self, channel=None, client=None):
         logger.info("Stop channel %s", channel)
         for client in list(self.clients[channel]):
             try:
-                stream = self.scheduler.streams[client]
-                stream.send({'op': 'channel-stop',
-                             'channel': channel})
-            except (KeyError, StreamClosedError):
+                comm = self.scheduler.comms[client]
+                comm.send({'op': 'channel-stop',
+                           'channel': channel})
+            except (KeyError, CommClosedError):
                 self.unsubscribe(channel, client)

     def report(self, channel, key):
         for client in list(self.clients[channel]):
             try:
-                stream = self.scheduler.streams[client]
-                stream.send({'op': 'channel-append',
-                             'key': key,
-                             'channel': channel})
-            except (KeyError, StreamClosedError):
+                comm = self.scheduler.comms[client]
+                comm.send({'op': 'channel-append',
+                           'key': key,
+                           'channel': channel})
+            except (KeyError, CommClosedError):
                 self.unsubscribe(channel, client)
@@ -227,7 +226,7 @@ def flush(self):
             sleep(0.01)

     def __del__(self):
-        if not self.client.scheduler_stream.stream:
+        if not self.client.scheduler_comm.comm:
             self.client._send_to_scheduler({'op': 'channel-unsubscribe',
                                             'channel': self.name,
                                             'client': self.client.id})
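
Note that `scheduler.comms[client]` (formerly `scheduler.streams[client]`) appears to hold a `BatchedSend` per connected client, which is why the code above calls the synchronous `send()` instead of yielding `comm.write()`: a vanished client surfaces as a `KeyError` or a `CommClosedError` on the next send, at which point the channel simply drops the subscription.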
[Diff truncated: the remaining changed files of the 87 total are not shown.]
