Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 52 additions & 36 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,41 +806,44 @@ def _release_key(self, key):
def _handle_report(self):
""" Listen to scheduler """
with log_errors():
while True:
try:
msgs = yield self.scheduler_comm.comm.read()
except CommClosedError:
if self.status == 'running':
logger.warning("Client report stream closed to scheduler")
logger.info("Reconnecting...")
self.status = 'connecting'
yield self._reconnect()
continue
else:
break
if not isinstance(msgs, list):
msgs = [msgs]
try:
while True:
try:
msgs = yield self.scheduler_comm.comm.read()
except CommClosedError:
if self.status == 'running':
logger.warning("Client report stream closed to scheduler")
logger.info("Reconnecting...")
self.status = 'connecting'
yield self._reconnect()
continue
else:
break
if not isinstance(msgs, list):
msgs = [msgs]

breakout = False
for msg in msgs:
logger.debug("Client receives message %s", msg)
breakout = False
for msg in msgs:
logger.debug("Client receives message %s", msg)

if 'status' in msg and 'error' in msg['status']:
six.reraise(*clean_exception(**msg))
if 'status' in msg and 'error' in msg['status']:
six.reraise(*clean_exception(**msg))

op = msg.pop('op')
op = msg.pop('op')

if op == 'close' or op == 'stream-closed':
breakout = True
break
if op == 'close' or op == 'stream-closed':
breakout = True
break

try:
handler = self._handlers[op]
handler(**msg)
except Exception as e:
logger.exception(e)
if breakout:
break
try:
handler = self._handlers[op]
handler(**msg)
except Exception as e:
logger.exception(e)
if breakout:
break
except CancelledError:
pass

def _handle_key_in_memory(self, key=None, type=None, workers=None):
state = self.futures.get(key)
Expand Down Expand Up @@ -897,6 +900,8 @@ def _handle_error(self, exception=None):
@gen.coroutine
def _close(self, fast=False):
""" Send close signal and wait until scheduler completes """
self.status = 'closing'

with log_errors():
for pc in self._periodic_callbacks:
pc.stop()
Expand All @@ -922,14 +927,24 @@ def _close(self, fast=False):
self.status = 'closed'
if _get_global_client() is self:
_set_global_client(None)
coroutines = set(self.coroutines)
for f in self.coroutines:
# cancel() works on asyncio futures (Tornado 5)
# but is a no-op on Tornado futures
f.cancel()
if f.cancelled():
coroutines.remove(f)
del self.coroutines[:]
if not fast:
with ignoring(TimeoutError):
yield [gen.with_timeout(timedelta(seconds=2), f)
for f in self.coroutines]
yield gen.with_timeout(timedelta(seconds=2),
list(coroutines))
with ignoring(AttributeError):
self.scheduler.close_rpc()
self.scheduler = None

self.status = 'closed'

_shutdown = _close

def close(self, timeout=10):
Expand All @@ -944,16 +959,17 @@ def close(self, timeout=10):
--------
Client.restart
"""
# XXX handling of self.status here is not thread-safe
if self.status == 'closed':
return
self.status = 'closing'

if self.asynchronous:
future = self._close()
if timeout:
future = gen.with_timeout(timedelta(seconds=timeout), future)
return future
# XXX handling of self.status here is not thread-safe
if self.status == 'closed':
return

self.status = 'closing'
if self._start_arg is None:
with ignoring(AttributeError):
self.cluster.close()
Expand Down
1 change: 1 addition & 0 deletions distributed/comm/inproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def abort(self):
if not self.closed():
# Putting EOF is cheap enough that we do it on abort() too
self._write_loop.add_callback(self._write_q.put_nowait, _EOF)
self._read_q.put_nowait(_EOF)
self._write_q = self._read_q = None
self._closed = True
self._finalizer.detach()
Expand Down
41 changes: 20 additions & 21 deletions distributed/comm/tests/test_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ def client_communicate(key, delay=0):
msg = yield comm.read()
assert msg == {'op': 'pong', 'data': key}
l.append(key)
with pytest.raises(CommClosedError):
yield comm.read()
yield comm.close()

client_communicate = partial(run_client, client_communicate)
Expand Down Expand Up @@ -649,29 +651,26 @@ def test_inproc_comm_closed_implicit():

@gen.coroutine
def check_comm_closed_explicit(addr, listen_args=None, connect_args=None):
@gen.coroutine
def handle_comm(comm):
# Wait
try:
yield comm.read()
except CommClosedError:
pass

listener = listen(addr, handle_comm, connection_args=listen_args)
listener.start()
contact_addr = listener.contact_address

comm = yield connect(contact_addr, connection_args=connect_args)
comm.close()
a, b = yield get_comm_pair(addr, listen_args=listen_args, connect_args=connect_args)
a_read = a.read()
b_read = b.read()
yield a.close()
# In-flight reads should abort with CommClosedError
with pytest.raises(CommClosedError):
yield comm.write({})

comm = yield connect(contact_addr, connection_args=connect_args)
comm.close()
yield a_read
with pytest.raises(CommClosedError):
yield comm.read()

yield gen.moment
yield b_read
# New reads as well
with pytest.raises(CommClosedError):
yield a.read()
with pytest.raises(CommClosedError):
yield b.read()
# And writes
with pytest.raises(CommClosedError):
yield a.write({})
with pytest.raises(CommClosedError):
yield b.write({})
yield b.close()


@gen_test()
Expand Down