From d8897a2e63ac2042e97e26e58342417c68c238f3 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Mon, 14 Nov 2016 12:18:47 +0100
Subject: [PATCH 1/9] Rework BatchedSend logic

Does away with the timeout and looking up a private attribute on IOStream.
Refs PR #653.
---
 distributed/batched.py            | 100 ++++++++++++++----------------
 distributed/tests/test_batched.py |  52 ++++++++--------
 2 files changed, 72 insertions(+), 80 deletions(-)

diff --git a/distributed/batched.py b/distributed/batched.py
index 2aeabf6005..04bb8bc51f 100644
--- a/distributed/batched.py
+++ b/distributed/batched.py
@@ -1,16 +1,17 @@
 from __future__ import print_function, division, absolute_import
 
 from datetime import timedelta
+from functools import partial
 import logging
 from timeit import default_timer
 
-from tornado import gen
+from tornado import gen, locks
 from tornado.queues import Queue
 from tornado.iostream import StreamClosedError
 from tornado.ioloop import PeriodicCallback, IOLoop
 
 from .core import read, write
-from .utils import log_errors
+from .utils import ignoring, log_errors
 
 
 logger = logging.getLogger(__name__)
@@ -38,18 +39,19 @@ class BatchedSend(object):
     def __init__(self, interval, loop=None):
         self.loop = loop or IOLoop.current()
         self.interval = interval / 1000.
-        self.last_transmission = 0
+
+        self.waker = locks.Event()
+        self.stopped = locks.Event()
+        self.please_stop = False
         self.buffer = []
         self.stream = None
-        self.last_payload = []
-        self.last_send = gen.sleep(0)
         self.message_count = 0
         self.batch_count = 0
+        self.next_deadline = None
 
     def start(self, stream):
         self.stream = stream
-        if self.buffer:
-            self.send_next()
+        self.loop.add_callback(self._background_send)
 
     def __str__(self):
         return '<BatchedSend: %d in buffer>' % len(self.buffer)
@@ -57,68 +59,56 @@ def __str__(self):
     __repr__ = __str__
 
     @gen.coroutine
-    def send_next(self, wait=True):
-        try:
-            now = default_timer()
-            if wait:
-                wait_time = min(self.last_transmission + self.interval - now,
-                                self.interval)
-                yield gen.sleep(wait_time)
-            yield self.last_send
-            self.buffer, payload = [], self.buffer
-            self.last_payload = payload
-            self.last_transmission = now
+    def _background_send(self):
+        while not self.please_stop:
+            with ignoring(gen.TimeoutError):
+                yield self.waker.wait(self.next_deadline)
+                self.waker.clear()
+            if not self.buffer:
+                # Nothing to send
+                self.next_deadline = None
+                continue
+            if (self.next_deadline is not None and
+                    self.loop.time() < self.next_deadline):
+                # Send interval not expired yet
+                continue
+            payload, self.buffer = self.buffer, []
             self.batch_count += 1
-            self.last_send = write(self.stream, payload)
-        except Exception as e:
-            logger.exception(e)
-            raise
+            try:
+                yield write(self.stream, payload)
+            except Exception:
+                logger.exception("Error in batched write")
+                break
+            self.next_deadline = self.loop.time() + self.interval
 
-    @gen.coroutine
-    def _write(self, payload):
-        yield gen.sleep(0)
-        yield write(self.stream, payload)
+        self.stopped.set()
 
     def send(self, msg):
-        """ Send a message to the other side
+        """ Schedule a message for sending to the other side
 
         This completes quickly and synchronously
        """
-        try:
-            self.message_count += 1
-            if self.stream is None:  # not yet started
-                self.buffer.append(msg)
-                return
-
-            if self.stream._closed:
-                raise StreamClosedError()
+        if self.stream is not None and self.stream._closed:
+            raise StreamClosedError()
 
-            if self.buffer:
-                self.buffer.append(msg)
-                return
-
-            # If we're new and early,
-            now = default_timer()
-            if (now < self.last_transmission + self.interval
-                    or not self.last_send._done):
-
self.buffer.append(msg) - self.loop.add_callback(self.send_next) - return - - self.buffer.append(msg) - self.loop.add_callback(self.send_next, wait=False) - except StreamClosedError: - raise - except Exception as e: - logger.exception(e) + self.message_count += 1 + self.buffer.append(msg) + self.waker.set() @gen.coroutine def close(self, ignore_closed=False): """ Flush existing messages and then close stream """ + if self.stream is None: + return + self.please_stop = True + self.waker.set() + yield self.stopped.wait() try: - if self.stream._write_buffer: - yield self.last_send if self.buffer: + if self.next_deadline is not None: + delay = self.next_deadline - self.loop.time() + if delay > 0: + yield gen.sleep(delay) self.buffer, payload = [], self.buffer yield write(self.stream, payload) except StreamClosedError: diff --git a/distributed/tests/test_batched.py b/distributed/tests/test_batched.py index e7059ffb53..ba49e0fbfc 100644 --- a/distributed/tests/test_batched.py +++ b/distributed/tests/test_batched.py @@ -37,7 +37,7 @@ def handle_stream(self, stream, address): self.count += 1 yield write(stream, msg) except StreamClosedError as e: - pass + return def listen(self, port=0): while True: @@ -113,7 +113,6 @@ def test_BatchedSend(): assert str(len(b.buffer)) in str(b) assert str(len(b.buffer)) in repr(b) b.start(stream) - yield b.last_send yield gen.sleep(0.020) @@ -135,41 +134,30 @@ def test_send_before_start(): stream = yield client.connect('127.0.0.1', e.port) b = BatchedSend(interval=10) - yield b.last_send b.send('hello') - b.send('hello') + b.send('world') b.start(stream) - result = yield read(stream); assert result == ['hello', 'hello'] + result = yield read(stream); assert result == ['hello', 'world'] @gen_test() -def test_send_after_stream_start_before_stream_finish(): +def test_send_after_stream_start(): with echo_server() as e: client = TCPClient() stream = yield client.connect('127.0.0.1', e.port) b = BatchedSend(interval=10) - yield b.last_send b.start(stream) b.send('hello') - result = yield read(stream); assert result == ['hello'] - - -@gen_test() -def test_send_after_stream_finish(): - with echo_server() as e: - client = TCPClient() - stream = yield client.connect('127.0.0.1', e.port) - - b = BatchedSend(interval=10) - b.start(stream) - yield b.last_send + b.send('world') + result = yield read(stream) + if len(result) < 2: + result += yield read(stream) + assert result == ['hello', 'world'] - b.send('hello') - result = yield read(stream); assert result == ['hello'] @gen_test() def test_send_before_close(): @@ -179,7 +167,6 @@ def test_send_before_close(): b = BatchedSend(interval=10) b.start(stream) - yield b.last_send cnt = int(e.count) b.send('hello') @@ -203,7 +190,6 @@ def test_close_closed(): b = BatchedSend(interval=10) b.start(stream) - yield b.last_send b.send(123) stream.close() # external closing @@ -211,6 +197,24 @@ def test_close_closed(): yield b.close(ignore_closed=True) +@gen_test() +def test_close_not_started(): + b = BatchedSend(interval=10) + yield b.close() + + +@gen_test() +def test_close_twice(): + with echo_server() as e: + client = TCPClient() + stream = yield client.connect('127.0.0.1', e.port) + + b = BatchedSend(interval=10) + b.start(stream) + yield b.close() + yield b.close() + + @slow @gen_test(timeout=50) def test_stress(): @@ -253,14 +257,12 @@ def test_sending_traffic_jam(): b = BatchedSend(interval=0.01) b.start(stream) - yield b.last_send n = 50 msg = {'x': to_serialize(data)} for i in range(n): b.send(assoc(msg, 'i', i)) - 
print(len(b.buffer)) yield gen.sleep(0.001) results = [] From 1f931c0bf834753bf4a46f116900c954b8d7a729 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Nov 2016 14:48:16 +0100 Subject: [PATCH 2/9] Address review comments --- distributed/batched.py | 13 +++++++------ distributed/core.py | 15 +++------------ 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index 04bb8bc51f..f26667f40f 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -24,6 +24,9 @@ class BatchedSend(object): more than one message every interval milliseconds. We send lists of messages. + Batching several messages at once helps performance when sending + a myriad of tiny messages. + Example ------- >>> stream = yield connect(ip, port) @@ -74,12 +77,12 @@ def _background_send(self): continue payload, self.buffer = self.buffer, [] self.batch_count += 1 + self.next_deadline = self.loop.time() + self.interval try: yield write(self.stream, payload) except Exception: logger.exception("Error in batched write") break - self.next_deadline = self.loop.time() + self.interval self.stopped.set() @@ -93,7 +96,9 @@ def send(self, msg): self.message_count += 1 self.buffer.append(msg) - self.waker.set() + # Avoid spurious wakeups if possible + if self.next_deadline is None: + self.waker.set() @gen.coroutine def close(self, ignore_closed=False): @@ -105,10 +110,6 @@ def close(self, ignore_closed=False): yield self.stopped.wait() try: if self.buffer: - if self.next_deadline is not None: - delay = self.next_deadline - self.loop.time() - if delay > 0: - yield gen.sleep(delay) self.buffer, payload = [], self.buffer yield write(self.stream, payload) except StreamClosedError: diff --git a/distributed/core.py b/distributed/core.py index a5ca0ab2d9..46575566ea 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -231,23 +231,14 @@ def write(stream, msg): logger.exception(e) raise - futures = [] - lengths = ([struct.pack('Q', len(frames))] + [struct.pack('Q', len(frame)) for frame in frames]) - futures.append(stream.write(b''.join(lengths))) + stream.write(b''.join(lengths)) for frame in frames[:-1]: - futures.append(stream.write(frame)) - - futures.append(stream.write(frames[-1])) + stream.write(frame) - while stream._write_buffer: - try: - yield gen.with_timeout(timedelta(seconds=0.01), futures[-1]) - break - except gen.TimeoutError: - pass + yield stream.write(frames[-1]) def pingpong(stream): From e746b3222e32c9606e9673a0b2c59e34eb514d17 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Nov 2016 16:17:44 +0100 Subject: [PATCH 3/9] Remove BatchedStream --- distributed/batched.py | 87 ----------------------------- distributed/core.py | 60 +++++++++----------- distributed/tests/test_batched.py | 51 +---------------- distributed/tests/test_scheduler.py | 42 +++++++------- distributed/tests/test_worker.py | 9 ++- distributed/utils_test.py | 36 +++++++++++- 6 files changed, 84 insertions(+), 201 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index f26667f40f..6190c0ad36 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -116,90 +116,3 @@ def close(self, ignore_closed=False): if not ignore_closed: raise self.stream.close() - - -class BatchedStream(object): - """ Mostly obsolete, see BatchedSend """ - def __init__(self, stream, interval): - self.stream = stream - self.interval = interval / 1000. 
- self.last_transmission = default_timer() - self.send_q = Queue() - self.recv_q = Queue() - self._background_send_coroutine = self._background_send() - self._background_recv_coroutine = self._background_recv() - self._broken = None - - self.pc = PeriodicCallback(lambda: None, 100) - self.pc.start() - - @gen.coroutine - def _background_send(self): - with log_errors(): - while True: - msg = yield self.send_q.get() - if msg == 'close': - break - msgs = [msg] - now = default_timer() - wait_time = self.last_transmission + self.interval - now - if wait_time > 0: - yield gen.sleep(wait_time) - while not self.send_q.empty(): - msgs.append(self.send_q.get_nowait()) - - try: - yield write(self.stream, msgs) - except StreamClosedError: - self.recv_q.put_nowait('close') - self._broken = True - break - - if len(msgs) > 1: - logger.debug("Batched messages: %d", len(msgs)) - for _ in msgs: - self.send_q.task_done() - - @gen.coroutine - def _background_recv(self): - with log_errors(): - while True: - try: - msgs = yield read(self.stream) - except StreamClosedError: - self.recv_q.put_nowait('close') - self.send_q.put_nowait('close') - self._broken = True - break - assert isinstance(msgs, list) - if len(msgs) > 1: - logger.debug("Batched messages: %d", len(msgs)) - for msg in msgs: - self.recv_q.put_nowait(msg) - - @gen.coroutine - def flush(self): - yield self.send_q.join() - - @gen.coroutine - def send(self, msg): - if self._broken: - raise StreamClosedError('Batch Stream is Closed') - else: - self.send_q.put_nowait(msg) - - @gen.coroutine - def recv(self): - result = yield self.recv_q.get() - if result == 'close': - raise StreamClosedError('Batched Stream is Closed') - else: - raise gen.Return(result) - - @gen.coroutine - def close(self): - yield self.flush() - raise gen.Return(self.stream.close()) - - def closed(self): - return self.stream.closed() diff --git a/distributed/core.py b/distributed/core.py index 46575566ea..1d4bb7e414 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -196,49 +196,42 @@ def handle_stream(self, stream, address): @gen.coroutine def read(stream, deserialize=True): """ Read a message from a stream """ - if isinstance(stream, BatchedStream): - msg = yield stream.recv() - raise gen.Return(msg) - else: - n_frames = yield stream.read_bytes(8) - n_frames = struct.unpack('Q', n_frames)[0] + n_frames = yield stream.read_bytes(8) + n_frames = struct.unpack('Q', n_frames)[0] - lengths = yield stream.read_bytes(8 * n_frames) - lengths = struct.unpack('Q' * n_frames, lengths) + lengths = yield stream.read_bytes(8 * n_frames) + lengths = struct.unpack('Q' * n_frames, lengths) - frames = [] - for length in lengths: - if length: - frame = yield stream.read_bytes(length) - else: - frame = b'' - frames.append(frame) + frames = [] + for length in lengths: + if length: + frame = yield stream.read_bytes(length) + else: + frame = b'' + frames.append(frame) - msg = protocol.loads(frames, deserialize=deserialize) - raise gen.Return(msg) + msg = protocol.loads(frames, deserialize=deserialize) + raise gen.Return(msg) @gen.coroutine def write(stream, msg): """ Write a message to a stream """ - if isinstance(stream, BatchedStream): - stream.send(msg) - else: - try: - frames = protocol.dumps(msg) - except Exception as e: - logger.info("Unserializable Message: %s", msg) - logger.exception(e) - raise + try: + frames = protocol.dumps(msg) + except Exception as e: + logger.info("Unserializable Message: %s", msg) + logger.exception(e) + raise - lengths = ([struct.pack('Q', len(frames))] + - 
[struct.pack('Q', len(frame)) for frame in frames]) - stream.write(b''.join(lengths)) + lengths = ([struct.pack('Q', len(frames))] + + [struct.pack('Q', len(frame)) for frame in frames]) + stream.write(b''.join(lengths)) - for frame in frames[:-1]: - stream.write(frame) + for frame in frames[:-1]: + stream.write(frame) - yield stream.write(frames[-1]) + yield stream.write(frames[-1]) def pingpong(stream): @@ -649,6 +642,3 @@ def clean_exception(exception, traceback, **kwargs): if isinstance(traceback, str): traceback = None return type(exception), exception, traceback - - -from .batched import BatchedStream diff --git a/distributed/tests/test_batched.py b/distributed/tests/test_batched.py index ba49e0fbfc..2178b8fc8e 100644 --- a/distributed/tests/test_batched.py +++ b/distributed/tests/test_batched.py @@ -14,17 +14,7 @@ from distributed.core import read, write from distributed.utils import sync, All from distributed.utils_test import gen_test, slow -from distributed.batched import BatchedStream, BatchedSend - - -class MyServer(TCPServer): - @gen.coroutine - def handle_stream(self, stream, address): - batched = BatchedStream(stream, interval=10) - while True: - msg = yield batched.recv() - batched.send(msg) - batched.send(msg) +from distributed.batched import BatchedSend class EchoServer(TCPServer): @@ -64,45 +54,6 @@ def echo_server(): server.stop() -@gen_test(timeout=10) -def test_BatchedStream(): - port = 3434 - server = MyServer() - server.listen(port) - - client = TCPClient() - stream = yield client.connect('127.0.0.1', port) - b = BatchedStream(stream, interval=20) - - b.send('hello') - b.send('world') - - result = yield b.recv(); assert result == 'hello' - result = yield b.recv(); assert result == 'hello' - result = yield b.recv(); assert result == 'world' - result = yield b.recv(); assert result == 'world' - - b.close() - -@gen_test(timeout=10) -def test_BatchedStream_raises(): - port = 3435 - server = MyServer() - server.listen(port) - - client = TCPClient() - stream = yield client.connect('127.0.0.1', port) - b = BatchedStream(stream, interval=20) - - stream.close() - - with pytest.raises(StreamClosedError): - yield b.recv() - - with pytest.raises(StreamClosedError): - yield b.send('123') - - @gen_test() def test_BatchedSend(): with echo_server() as e: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index a4a6dd7b1e..51e6b707bb 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -20,7 +20,6 @@ import pytest from distributed import Nanny, Worker -from distributed.batched import BatchedStream from distributed.core import connect, read, write, rpc from distributed.scheduler import (validate_state, decide_worker, Scheduler) @@ -28,7 +27,7 @@ from distributed.protocol.pickle import dumps from distributed.worker import dumps_function, dumps_task from distributed.utils_test import (inc, ignoring, dec, gen_cluster, gen_test, - loop) + loop, readone) from distributed.utils import All from dask.compatibility import apply @@ -345,8 +344,7 @@ def div(x, y): def test_scheduler(s, a, b): stream = yield connect(s.ip, s.port) yield write(stream, {'op': 'register-client', 'client': 'ident'}) - stream = BatchedStream(stream, 10) - msg = yield read(stream) + msg = yield readone(stream) assert msg['op'] == 'stream-start' # Test update graph @@ -360,7 +358,7 @@ def test_scheduler(s, a, b): 'keys': ['x', 'z'], 'client': 'ident'}) while True: - msg = yield read(stream) + msg = yield readone(stream) if msg['op'] == 
'key-in-memory' and msg['key'] == 'z': break @@ -376,7 +374,7 @@ def test_scheduler(s, a, b): 'client': 'ident'}) while True: - msg = yield read(stream) + msg = yield readone(stream) if msg['op'] == 'task-erred' and msg['key'] == 'b': break @@ -385,7 +383,7 @@ def test_scheduler(s, a, b): s.ensure_occupied() while True: - msg = yield read(stream) + msg = yield readone(stream) if msg['op'] == 'key-in-memory' and msg['key'] == 'z': break @@ -399,7 +397,7 @@ def test_scheduler(s, a, b): 'keys': ['zz'], 'client': 'ident'}) while True: - msg = yield read(stream) + msg = yield readone(stream) if msg['op'] == 'key-in-memory' and msg['key'] == 'zz': break @@ -452,23 +450,23 @@ def test_multi_queues(s, a, b): def test_server(s, a, b): stream = yield connect('127.0.0.1', s.port) yield write(stream, {'op': 'register-client', 'client': 'ident'}) - stream = BatchedStream(stream, 0) - stream.send({'op': 'update-graph', - 'tasks': {'x': dumps_task((inc, 1)), - 'y': dumps_task((inc, 'x'))}, - 'dependencies': {'x': [], 'y': ['x']}, - 'keys': ['y'], - 'client': 'ident'}) + yield write(stream, {'op': 'update-graph', + 'tasks': {'x': dumps_task((inc, 1)), + 'y': dumps_task((inc, 'x'))}, + 'dependencies': {'x': [], 'y': ['x']}, + 'keys': ['y'], + 'client': 'ident'}) while True: - msg = yield read(stream) + msg = yield readone(stream) if msg['op'] == 'key-in-memory' and msg['key'] == 'y': break - stream.send({'op': 'close-stream'}) - msg = yield read(stream) + yield write(stream, {'op': 'close-stream'}) + msg = yield readone(stream) assert msg == {'op': 'stream-closed'} - assert stream.closed() + with pytest.raises(StreamClosedError): + yield readone(stream) stream.close() @@ -701,8 +699,6 @@ def test_filtered_communication(s, a, b): yield write(f, {'op': 'register-client', 'client': 'f'}) yield read(c) yield read(f) - c = BatchedStream(c, 0) - f = BatchedStream(f, 0) assert set(s.streams) == {'c', 'f'} @@ -720,10 +716,10 @@ def test_filtered_communication(s, a, b): 'client': 'f', 'keys': ['z']}) - msg = yield read(c) + msg, = yield read(c) assert msg['op'] == 'key-in-memory' assert msg['key'] == 'y' - msg = yield read(f) + msg, = yield read(f) assert msg['op'] == 'key-in-memory' assert msg['key'] == 'z' diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 97a15475c6..4ab609e034 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -15,7 +15,6 @@ from tornado import gen from tornado.ioloop import TimeoutError -from distributed.batched import BatchedStream from distributed.core import rpc, connect, read, write from distributed.client import _wait from distributed.scheduler import Scheduler @@ -25,7 +24,7 @@ from distributed.worker import Worker, error_message, logger, TOTAL_MEMORY from distributed.utils import ignoring from distributed.utils_test import (loop, inc, gen_cluster, - slow, slowinc, throws, current_loop, gen_test) + slow, slowinc, throws, current_loop, gen_test, readone) @@ -459,16 +458,16 @@ def test_gather(s, a, b): @gen_cluster() def test_compute_stream(s, a, b): stream = yield connect(a.ip, a.port) + yield write(stream, {'op': 'compute-stream'}) msgs = [{'op': 'compute-task', 'function': dumps(inc), 'args': dumps((i,)), 'key': 'x-%d' % i} for i in range(10)] - bstream = BatchedStream(stream, 0) for msg in msgs[:5]: yield write(stream, msg) for i in range(5): - msg = yield read(bstream) + msg = yield readone(stream) assert msg['status'] == 'OK' assert msg['key'][0] == 'x' @@ -476,7 +475,7 @@ def test_compute_stream(s, a, b): yield 
write(stream, msg) for i in range(5): - msg = yield read(bstream) + msg = yield readone(stream) assert msg['status'] == 'OK' assert msg['key'][0] == 'x' diff --git a/distributed/utils_test.py b/distributed/utils_test.py index e0f8667285..552226b208 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -14,7 +14,7 @@ import uuid from toolz import merge -from tornado import gen +from tornado import gen, queues from tornado.ioloop import IOLoop, TimeoutError from tornado.iostream import StreamClosedError @@ -145,6 +145,40 @@ def slowadd(x, y, delay=0.02): return x + y +_readone_queues = {} + +@gen.coroutine +def readone(stream): + """ + Read one message at a time from a stream that reads lists of + messages. + """ + try: + q = _readone_queues[stream] + except KeyError: + q = _readone_queues[stream] = queues.Queue() + + @gen.coroutine + def background_read(): + while True: + try: + messages = yield read(stream) + except StreamClosedError: + break + for msg in messages: + q.put_nowait(msg) + q.put_nowait(None) + del _readone_queues[stream] + + background_read() + + msg = yield q.get() + if msg is None: + raise StreamClosedError + else: + raise gen.Return(msg) + + def run_scheduler(q, scheduler_port=0, **kwargs): from distributed import Scheduler from tornado.ioloop import IOLoop, PeriodicCallback From 6b6c492dfefe66f785f0fbeb4faf1864f96cc7fd Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Nov 2016 17:14:49 +0100 Subject: [PATCH 4/9] Remove fragile wait on IOStream.write() --- distributed/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 1d4bb7e414..ea272ccb29 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -228,10 +228,10 @@ def write(stream, msg): [struct.pack('Q', len(frame)) for frame in frames]) stream.write(b''.join(lengths)) - for frame in frames[:-1]: + for frame in frames: stream.write(frame) - yield stream.write(frames[-1]) + yield gen.moment def pingpong(stream): From ac1fbb1227b7807d397da908a9650b67bf98dc2e Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Nov 2016 18:26:31 +0100 Subject: [PATCH 5/9] Flush streams before closing to avoid losing messages. 
@mrocklin --- distributed/batched.py | 3 ++- distributed/core.py | 37 ++++++++++++++++++++++++----- distributed/scheduler.py | 10 ++++---- distributed/tests/test_nanny.py | 4 ++-- distributed/tests/test_scheduler.py | 12 +++++----- distributed/utils.py | 2 +- distributed/utils_test.py | 4 ++-- 7 files changed, 48 insertions(+), 24 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index 6190c0ad36..a79e57d064 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -10,7 +10,7 @@ from tornado.iostream import StreamClosedError from tornado.ioloop import PeriodicCallback, IOLoop -from .core import read, write +from .core import read, write, flush from .utils import ignoring, log_errors @@ -112,6 +112,7 @@ def close(self, ignore_closed=False): if self.buffer: self.buffer, payload = [], self.buffer yield write(self.stream, payload) + yield flush(self.stream) except StreamClosedError: if not ignore_closed: raise diff --git a/distributed/core.py b/distributed/core.py index ea272ccb29..141c9a6db9 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -186,7 +186,7 @@ def handle_stream(self, stream, address): break finally: try: - stream.close() + yield close(stream) except Exception as e: logger.warn("Failed while closing writer", exc_info=True) logger.info("Close connection from %s:%d to %s", address[0], address[1], @@ -229,11 +229,36 @@ def write(stream, msg): stream.write(b''.join(lengths)) for frame in frames: + # Can't wait for the write() Future as it may be lost + # ("If write is called again before that Future has resolved, + # the previous future will be orphaned and will never resolve") stream.write(frame) yield gen.moment +@gen.coroutine +def flush(stream): + """Flush the stream's output buffer. + This is recommended before closing the stream. + """ + if stream.writing(): + yield stream.write(b'') + + +@gen.coroutine +def close(stream): + """Close a stream safely. 
+ """ + if not stream.closed(): + try: + flush(stream) + except StreamClosedError: + pass + finally: + stream.close() + + def pingpong(stream): return b'pong' @@ -295,7 +320,7 @@ def send_recv(stream=None, arg=None, ip=None, port=None, addr=None, reply=True, else: response = None if kwargs.get('close'): - stream.close() + close(stream) raise gen.Return(response) @@ -398,7 +423,7 @@ def close_streams(self): for stream in self.streams: if stream and not stream.closed(): try: - stream.close() + close(stream) except (OSError, IOError, StreamClosedError): pass self.streams.clear() @@ -551,15 +576,15 @@ def collect(self): self.open, self.active) for streams in list(self.available.values()): for stream in streams: - stream.close() + close(stream) def close(self): for streams in list(self.available.values()): for stream in streams: - stream.close() + close(stream) for streams in list(self.occupied.values()): for stream in streams: - stream.close() + close(stream) def coerce_to_address(o, out=str): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c9bc5495a8..2efa6fe85e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -29,7 +29,7 @@ from .batched import BatchedSend from .config import config -from .core import (rpc, connect, read, write, MAX_BUFFER_SIZE, +from .core import (rpc, connect, read, write, close, MAX_BUFFER_SIZE, Server, send_recv, coerce_to_address, error_message) from .utils import (All, ignoring, clear_queue, get_ip, ignore_exceptions, ensure_ip, get_fileno_limit, log_errors, key_split, mean, @@ -410,7 +410,7 @@ def finished(self): def close_streams(self): """ Close all active IOStreams """ for stream in self.streams.values(): - stream.stream.close() + close(stream.stream) self.rpc.close() @gen.coroutine @@ -711,8 +711,7 @@ def remove_worker(self, stream=None, address=None, safe=False): return 'already-removed' with ignoring(AttributeError): stream = self.worker_streams[address].stream - if not stream.closed(): - stream.close() + close(stream) host, port = address.split(':') @@ -1141,8 +1140,7 @@ def worker_stream(self, worker): import pdb; pdb.set_trace() raise finally: - if not stream.closed(): - stream.close() + close(stream) self.remove_worker(address=worker) def correct_time_delay(self, worker, msg): diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 20da78c9d4..bc48266bd6 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -12,7 +12,7 @@ from tornado import gen from distributed import Nanny, rpc, Scheduler -from distributed.core import connect, read, write +from distributed.core import connect, read, write, close from distributed.protocol.pickle import dumps, loads from distributed.utils import ignoring from distributed.utils_test import gen_cluster @@ -120,7 +120,7 @@ def test_monitor_resources(s): assert isinstance(msg, dict) assert {'cpu_percent', 'memory_percent'}.issubset(msg) - stream.close() + close(stream) yield n._close() s.stop() diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 51e6b707bb..f486005a3f 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -20,7 +20,7 @@ import pytest from distributed import Nanny, Worker -from distributed.core import connect, read, write, rpc +from distributed.core import connect, read, write, close, rpc from distributed.scheduler import (validate_state, decide_worker, Scheduler) from distributed.client import _wait @@ -402,7 +402,7 @@ def 
test_scheduler(s, a, b): break write(stream, {'op': 'close'}) - stream.close() + close(stream) @gen_cluster() @@ -467,7 +467,7 @@ def test_server(s, a, b): assert msg == {'op': 'stream-closed'} with pytest.raises(StreamClosedError): yield readone(stream) - stream.close() + close(stream) @gen_cluster() @@ -547,7 +547,7 @@ def func(scheduler): expected = s.processing, s.stacks assert cloudpickle.loads(response) == expected - stream.close() + close(stream) @gen_cluster() @@ -573,7 +573,7 @@ def teardown(scheduler, state): response = yield read(stream) assert response == 'OK' - stream.close() + close(stream) start = time() while not hasattr(s, 'flag'): yield gen.sleep(0.01) @@ -599,7 +599,7 @@ def func(scheduler): response = yield read(stream) assert response == True - stream.close() + close(stream) @gen_test(timeout=None) diff --git a/distributed/utils.py b/distributed/utils.py index 08b1c0d208..0942f1d020 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -63,7 +63,7 @@ def get_ip(host='8.8.8.8', port=80): def ignoring(*exceptions): try: yield - except exceptions: + except exceptions as e: pass diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 7333f23ec9..78b722b142 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -20,7 +20,7 @@ from tornado.ioloop import IOLoop, TimeoutError from tornado.iostream import StreamClosedError -from .core import connect, read, write, rpc +from .core import connect, read, write, close, rpc from .utils import ignoring, log_errors, sync import pytest @@ -303,7 +303,7 @@ def disconnect(ip, port): yield write(stream, {'op': 'terminate', 'close': True}) response = yield read(stream) finally: - stream.close() + yield close(stream) import pytest From 05b7c9261459de9f9b6086bce4150862ca4563fa Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Nov 2016 18:51:25 +0100 Subject: [PATCH 6/9] Yield on the flush() call --- distributed/core.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 141c9a6db9..ff072d7d56 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -240,7 +240,8 @@ def write(stream, msg): @gen.coroutine def flush(stream): """Flush the stream's output buffer. - This is recommended before closing the stream. + This should be the last operation before closing, otherwise this + coroutine may never return. 
""" if stream.writing(): yield stream.write(b'') @@ -252,7 +253,7 @@ def close(stream): """ if not stream.closed(): try: - flush(stream) + yield flush(stream) except StreamClosedError: pass finally: From eb6afe8a4a0126c4e67ec3fb4634803d97a42167 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Nov 2016 19:35:53 +0100 Subject: [PATCH 7/9] Avoid exposing close() --- distributed/batched.py | 5 ++--- distributed/core.py | 18 ++++++------------ 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index a79e57d064..a6a74a6ac2 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -10,7 +10,7 @@ from tornado.iostream import StreamClosedError from tornado.ioloop import PeriodicCallback, IOLoop -from .core import read, write, flush +from .core import read, write, close from .utils import ignoring, log_errors @@ -112,8 +112,7 @@ def close(self, ignore_closed=False): if self.buffer: self.buffer, payload = [], self.buffer yield write(self.stream, payload) - yield flush(self.stream) except StreamClosedError: if not ignore_closed: raise - self.stream.close() + yield close(self.stream) diff --git a/distributed/core.py b/distributed/core.py index ff072d7d56..c48c9aa898 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -237,23 +237,17 @@ def write(stream, msg): yield gen.moment -@gen.coroutine -def flush(stream): - """Flush the stream's output buffer. - This should be the last operation before closing, otherwise this - coroutine may never return. - """ - if stream.writing(): - yield stream.write(b'') - - @gen.coroutine def close(stream): - """Close a stream safely. + """Close a stream after flushing it. + No concurrent write() should be issued during execution of this + coroutine. """ if not stream.closed(): try: - yield flush(stream) + # Flush the stream's write buffer by waiting for a last write. + if stream.writing(): + yield stream.write(b'') except StreamClosedError: pass finally: From b0e92f014ec6211a9b9109dbab2afd3104f2c04b Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 15 Nov 2016 10:01:41 +0100 Subject: [PATCH 8/9] Fix name clash with close() --- distributed/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index c48c9aa898..12e7c47d13 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -155,7 +155,7 @@ def handle_stream(self, stream, address): raise TypeError("Bad message type. 
Expected dict, got\n " + str(msg)) op = msg.pop('op') - close = msg.pop('close', False) + close_desired = msg.pop('close', False) reply = msg.pop('reply', True) if op == 'close': if reply: @@ -182,13 +182,13 @@ def handle_stream(self, stream, address): except StreamClosedError: logger.info("Lost connection: %s" % str(address)) break - if close: + if close_desired: break finally: try: yield close(stream) except Exception as e: - logger.warn("Failed while closing writer", exc_info=True) + logger.warn("Failed while closing writer", exc_info=True) logger.info("Close connection from %s:%d to %s", address[0], address[1], type(self).__name__) From a22650d7cb922619b2401c32fb71d33a613c9970 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 15 Nov 2016 13:28:19 +0100 Subject: [PATCH 9/9] Add a large traffic jam test --- distributed/tests/test_batched.py | 33 ++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/distributed/tests/test_batched.py b/distributed/tests/test_batched.py index 2178b8fc8e..f46fcbb9d7 100644 --- a/distributed/tests/test_batched.py +++ b/distributed/tests/test_batched.py @@ -197,11 +197,12 @@ def recv(): stream.close() -@gen_test() -def test_sending_traffic_jam(): +@gen.coroutine +def _run_traffic_jam(nsends, nbytes): + # This test eats `nsends * nbytes` bytes in RAM np = pytest.importorskip('numpy') from distributed.protocol import to_serialize - data = bytes(np.random.randint(0, 255, size=(300000,)).astype('u1').data) + data = bytes(np.random.randint(0, 255, size=(nbytes,)).astype('u1').data) with echo_server() as e: client = TCPClient() stream = yield client.connect('127.0.0.1', e.port) @@ -209,27 +210,37 @@ def test_sending_traffic_jam(): b = BatchedSend(interval=0.01) b.start(stream) - n = 50 - msg = {'x': to_serialize(data)} - for i in range(n): + for i in range(nsends): b.send(assoc(msg, 'i', i)) - yield gen.sleep(0.001) + if np.random.random() > 0.5: + yield gen.sleep(0.001) results = [] count = 0 - while len(results) < n: + while len(results) < nsends: # If this times out then I think it's a backpressure issue # Somehow we're able to flood the socket so that the receiving end # loses some of our messages L = yield gen.with_timeout(timedelta(seconds=5), read(stream)) count += 1 - results.extend(L) + results.extend(r['i'] for r in L) assert count == b.batch_count == e.count - assert b.message_count == n + assert b.message_count == nsends - assert [r['i'] for r in results] == list(range(50)) + assert results == list(range(nsends)) stream.close() # external closing yield b.close(ignore_closed=True) + + +@gen_test() +def test_sending_traffic_jam(): + yield _run_traffic_jam(50, 300000) + + +@slow +@gen_test() +def test_large_traffic_jam(): + yield _run_traffic_jam(500, 1500000)
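
For quick reference, here is a minimal usage sketch of the reworked BatchedSend, written in the same Tornado coroutine style as the patches above and using only the API they introduce (connect/read from distributed.core, BatchedSend.start/send/close). The coroutine name, the address, and the dict-shaped messages are illustrative assumptions only; against the EchoServer used in the tests, the peer's reply would be the batched list itself.

    from tornado import gen

    from distributed.batched import BatchedSend
    from distributed.core import connect, read


    @gen.coroutine
    def example(ip='127.0.0.1', port=8789):
        # Open a plain IOStream and hand it to a BatchedSend that coalesces
        # outgoing messages over 10 ms windows.
        stream = yield connect(ip, port)
        b = BatchedSend(interval=10)
        b.start(stream)                  # launches the background send coroutine

        # send() is cheap and synchronous: it appends to b.buffer and, if
        # needed, wakes the background coroutine, which writes a whole batch.
        b.send({'op': 'ping', 'i': 0})
        b.send({'op': 'ping', 'i': 1})

        # The receiving end always gets a *list* of messages per read().
        reply = yield read(stream)

        # Flush anything still buffered, then close the underlying stream.
        yield b.close()
        raise gen.Return(reply)

Note that send() only raises StreamClosedError when the stream is already known to be closed; other write failures surface in the background coroutine, which is why close() waits on the stopped event before flushing the remaining buffer.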