
Rework BatchedSend logic #661

Merged: 10 commits, Nov 16, 2016
3 changes: 2 additions & 1 deletion distributed/batched.py
@@ -10,7 +10,7 @@
from tornado.iostream import StreamClosedError
from tornado.ioloop import PeriodicCallback, IOLoop

from .core import read, write
from .core import read, write, flush
from .utils import ignoring, log_errors


@@ -112,6 +112,7 @@ def close(self, ignore_closed=False):
if self.buffer:
self.buffer, payload = [], self.buffer
yield write(self.stream, payload)
yield flush(self.stream)
except StreamClosedError:
if not ignore_closed:
raise
37 changes: 31 additions & 6 deletions distributed/core.py
@@ -186,7 +186,7 @@ def handle_stream(self, stream, address):
break
finally:
try:
stream.close()
yield close(stream)
except Exception as e:
logger.warn("Failed while closing writer", exc_info=True)
logger.info("Close connection from %s:%d to %s", address[0], address[1],
@@ -229,11 +229,36 @@ def write(stream, msg):
stream.write(b''.join(lengths))

for frame in frames:
# Can't wait for the write() Future as it may be lost
# ("If write is called again before that Future has resolved,
# the previous future will be orphaned and will never resolve")
stream.write(frame)

yield gen.moment
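
To illustrate the Tornado caveat quoted in the comment above, here is a minimal sketch (the helper name is hypothetical, and a connected tornado.iostream.IOStream is assumed) of the pattern used here: queue each frame without awaiting its write() Future, then yield one IOLoop iteration.

from tornado import gen

@gen.coroutine
def queue_frames(stream, frames):
    for frame in frames:
        # Do not yield on this Future: calling write() again before it
        # resolves would orphan it and it would never resolve.
        stream.write(frame)
    # Hand control back to the IOLoop for one iteration so the queued
    # writes can make progress; draining is left to flush() below.
    yield gen.moment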


@gen.coroutine
def flush(stream):
"""Flush the stream's output buffer.
This is recommended before closing the stream.
"""
if stream.writing():
yield stream.write(b'')
Member: How do we know that this will complete?

Member Author: The API expects that write() isn't called before flush() completes.

Member: Are we confident that this expectation is fulfilled?

Member Author: I've updated the docstring to better inform the reader. But perhaps we should only expose close() so that we don't make any further mistakes. What do you think?

Member: I'm mostly concerned about us as users. Only very advanced dask/distributed users should use read/write/close directly.

However, given that we've had problems reported, it's possible that we aren't handling everything well internally.

I have no problem with read/write/close as an API generally.
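
To make that contract concrete, here is a minimal caller-side sketch (assuming the write/flush/close coroutines added in this file and an already-connected Tornado stream; the function name is hypothetical). The key point is that nothing else writes to the stream once flush() has been started.

from tornado import gen
from distributed.core import write, flush, close

@gen.coroutine
def send_final_message(stream, msg):
    yield write(stream, msg)   # enqueue the last frames
    yield flush(stream)        # wait for the output buffer to drain;
                               # no further write() calls until this resolves
    yield close(stream)        # only then tear the stream down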



@gen.coroutine
def close(stream):
"""Close a stream safely.
"""
if not stream.closed():
try:
flush(stream)
Member: Should we yield on this?

Member Author: Yes, you're right, my bad.

except StreamClosedError:
pass
finally:
stream.close()
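
Following up on the review thread above about the missing yield on flush(), here is a sketch of what the helper looks like once that is addressed. It is illustrative only; the committed version may differ, and flush() is the coroutine added earlier in this diff.

from tornado import gen
from tornado.iostream import StreamClosedError
from distributed.core import flush  # the helper added earlier in this diff

@gen.coroutine
def close(stream):
    """Close a stream safely, draining buffered output first."""
    if not stream.closed():
        try:
            # Yield here so we actually wait for the output buffer to
            # drain before tearing the stream down.
            yield flush(stream)
        except StreamClosedError:
            pass
        finally:
            stream.close()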


def pingpong(stream):
return b'pong'

@@ -295,7 +320,7 @@ def send_recv(stream=None, arg=None, ip=None, port=None, addr=None, reply=True,
else:
response = None
if kwargs.get('close'):
stream.close()
close(stream)
raise gen.Return(response)


@@ -398,7 +423,7 @@ def close_streams(self):
for stream in self.streams:
if stream and not stream.closed():
try:
stream.close()
close(stream)
except (OSError, IOError, StreamClosedError):
pass
self.streams.clear()
@@ -551,15 +576,15 @@ def collect(self):
self.open, self.active)
for streams in list(self.available.values()):
for stream in streams:
stream.close()
close(stream)

def close(self):
for streams in list(self.available.values()):
for stream in streams:
stream.close()
close(stream)
for streams in list(self.occupied.values()):
for stream in streams:
stream.close()
close(stream)


def coerce_to_address(o, out=str):
10 changes: 4 additions & 6 deletions distributed/scheduler.py
@@ -29,7 +29,7 @@

from .batched import BatchedSend
from .config import config
from .core import (rpc, connect, read, write, MAX_BUFFER_SIZE,
from .core import (rpc, connect, read, write, close, MAX_BUFFER_SIZE,
Server, send_recv, coerce_to_address, error_message)
from .utils import (All, ignoring, clear_queue, get_ip, ignore_exceptions,
ensure_ip, get_fileno_limit, log_errors, key_split, mean,
@@ -410,7 +410,7 @@ def finished(self):
def close_streams(self):
""" Close all active IOStreams """
for stream in self.streams.values():
stream.stream.close()
close(stream.stream)
self.rpc.close()

@gen.coroutine
@@ -711,8 +711,7 @@ def remove_worker(self, stream=None, address=None, safe=False):
return 'already-removed'
with ignoring(AttributeError):
stream = self.worker_streams[address].stream
if not stream.closed():
stream.close()
close(stream)

host, port = address.split(':')

@@ -1141,8 +1140,7 @@ def worker_stream(self, worker):
import pdb; pdb.set_trace()
raise
finally:
if not stream.closed():
stream.close()
close(stream)
self.remove_worker(address=worker)

def correct_time_delay(self, worker, msg):
4 changes: 2 additions & 2 deletions distributed/tests/test_nanny.py
@@ -12,7 +12,7 @@
from tornado import gen

from distributed import Nanny, rpc, Scheduler
from distributed.core import connect, read, write
from distributed.core import connect, read, write, close
from distributed.protocol.pickle import dumps, loads
from distributed.utils import ignoring
from distributed.utils_test import gen_cluster
@@ -120,7 +120,7 @@ def test_monitor_resources(s):
assert isinstance(msg, dict)
assert {'cpu_percent', 'memory_percent'}.issubset(msg)

stream.close()
close(stream)
yield n._close()
s.stop()

12 changes: 6 additions & 6 deletions distributed/tests/test_scheduler.py
@@ -20,7 +20,7 @@
import pytest

from distributed import Nanny, Worker
from distributed.core import connect, read, write, rpc
from distributed.core import connect, read, write, close, rpc
from distributed.scheduler import (validate_state, decide_worker,
Scheduler)
from distributed.client import _wait
@@ -402,7 +402,7 @@ def test_scheduler(s, a, b):
break

write(stream, {'op': 'close'})
stream.close()
close(stream)


@gen_cluster()
@@ -467,7 +467,7 @@ def test_server(s, a, b):
assert msg == {'op': 'stream-closed'}
with pytest.raises(StreamClosedError):
yield readone(stream)
stream.close()
close(stream)


@gen_cluster()
@@ -547,7 +547,7 @@ def func(scheduler):
expected = s.processing, s.stacks
assert cloudpickle.loads(response) == expected

stream.close()
close(stream)


@gen_cluster()
@@ -573,7 +573,7 @@ def teardown(scheduler, state):
response = yield read(stream)
assert response == 'OK'

stream.close()
close(stream)
start = time()
while not hasattr(s, 'flag'):
yield gen.sleep(0.01)
@@ -599,7 +599,7 @@ def func(scheduler):
response = yield read(stream)
assert response == True

stream.close()
close(stream)


@gen_test(timeout=None)
2 changes: 1 addition & 1 deletion distributed/utils.py
@@ -63,7 +63,7 @@ def get_ip(host='8.8.8.8', port=80):
def ignoring(*exceptions):
try:
yield
except exceptions:
except exceptions as e:
pass


4 changes: 2 additions & 2 deletions distributed/utils_test.py
@@ -20,7 +20,7 @@
from tornado.ioloop import IOLoop, TimeoutError
from tornado.iostream import StreamClosedError

from .core import connect, read, write, rpc
from .core import connect, read, write, close, rpc
from .utils import ignoring, log_errors, sync
import pytest

@@ -303,7 +303,7 @@ def disconnect(ip, port):
yield write(stream, {'op': 'terminate', 'close': True})
response = yield read(stream)
finally:
stream.close()
yield close(stream)


import pytest