Make LocalCluster.close async friendly (#2437)
mrocklin committed Dec 23, 2018
1 parent 0f06178 commit 538767b
Showing 4 changed files with 52 additions and 46 deletions.
55 changes: 31 additions & 24 deletions distributed/deploy/local.py
@@ -4,7 +4,7 @@
 from datetime import timedelta
 import logging
 import math
-from time import sleep
+import warnings
 import weakref
 import toolz

@@ -13,7 +13,7 @@
 from .cluster import Cluster
 from ..core import CommClosedError
 from ..utils import (sync, ignoring, All, silence_logging, LoopRunner,
-                     log_errors, thread_state)
+                     log_errors, thread_state, parse_timedelta)
 from ..nanny import Nanny
 from ..scheduler import Scheduler
 from ..worker import Worker, _ncores
@@ -151,9 +151,12 @@ def __repr__(self):
     def __await__(self):
         return self._started.__await__()

+    @property
+    def asynchronous(self):
+        return self._asynchronous or getattr(thread_state, 'asynchronous', False)
+
     def sync(self, func, *args, **kwargs):
-        asynchronous = kwargs.pop('asynchronous', None)
-        if asynchronous or self._asynchronous or getattr(thread_state, 'asynchronous', False):
+        if kwargs.pop('asynchronous', None) or self.asynchronous:
             callback_timeout = kwargs.pop('callback_timeout', None)
             future = func(*args, **kwargs)
             if callback_timeout is not None:
@@ -196,6 +199,10 @@ def _start(self, ip=None, n_workers=0):

     @gen.coroutine
     def _start_worker(self, death_timeout=60, **kwargs):
+        if self.status and self.status.startswith('clos'):
+            warnings.warn("Tried to start a worker while status=='%s'" % self.status)
+            return
+
         if self.processes:
             W = Nanny
             kwargs['quiet'] = True
@@ -257,14 +264,22 @@ def stop_worker(self, w):
         self.sync(self._stop_worker, w)

     @gen.coroutine
-    def _close(self):
+    def _close(self, timeout='2s'):
         # Can be 'closing' as we're called by close() below
         if self.status == 'closed':
             return
+        self.status = 'closing'
+
+        self.scheduler.clear_task_state()
+
+        with ignoring(gen.TimeoutError):
+            yield gen.with_timeout(
+                timedelta(seconds=parse_timedelta(timeout)),
+                All([self._stop_worker(w) for w in self.workers]),
+            )
+        del self.workers[:]

         try:
-            with ignoring(gen.TimeoutError, CommClosedError, OSError):
-                yield All([w._close() for w in self.workers])
             with ignoring(gen.TimeoutError, CommClosedError, OSError):
                 yield self.scheduler.close(fast=True)
             del self.workers[:]
@@ -277,26 +292,18 @@ def close(self, timeout=20):
             return

         try:
-            self.scheduler.clear_task_state()
-
-            for w in self.workers:
-                self.loop.add_callback(self._stop_worker, w)
-            for i in range(10):
-                if not self.workers:
-                    break
-                else:
-                    sleep(0.01)
-            del self.workers[:]
-            try:
-                self._loop_runner.run_sync(self._close, callback_timeout=timeout)
-            except RuntimeError:  # IOLoop is closed
-                pass
-            self._loop_runner.stop()
-        finally:
-            self.status = 'closed'
+            result = self.sync(self._close, callback_timeout=timeout)
+        except RuntimeError:  # IOLoop is closed
+            pass

         with ignoring(AttributeError):
             silence_logging(self._old_logging_level)

+        if not self.asynchronous:
+            self._loop_runner.stop()
+
+        return result

     @gen.coroutine
     def scale_up(self, n, **kwargs):
         """ Bring the total count of workers up to ``n``
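
For context, a minimal usage sketch of what this change enables (illustrative only, not part of the commit; keyword arguments such as processes=False and scheduler_port=0 mirror the test suite). Created synchronously, close() now runs _close() through sync() and then stops the loop runner; created with asynchronous=True, close() returns a future for the caller to yield, so the event loop is never blocked.

from tornado import gen
from tornado.ioloop import IOLoop
from distributed import Client, LocalCluster

# Synchronous use: close() blocks until workers and scheduler are shut down,
# then stops the cluster's loop runner.
cluster = LocalCluster(2, processes=False, scheduler_port=0)
client = Client(cluster)
client.close()
cluster.close()

# Asynchronous use: inside a coroutine, close() dispatches through sync(),
# sees asynchronous=True, and hands back a future to yield.
@gen.coroutine
def main():
    cluster = yield LocalCluster(2, processes=False, scheduler_port=0,
                                 asynchronous=True)
    client = yield Client(cluster, asynchronous=True)
    try:
        future = client.submit(lambda x: x + 1, 10)
        result = yield client.gather(future)
        assert result == 11
    finally:
        yield client.close()
        yield cluster.close()

IOLoop.current().run_sync(main)
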
32 changes: 16 additions & 16 deletions distributed/deploy/tests/test_adaptive.py
@@ -91,7 +91,7 @@ def test_adaptive_local_cluster_multi_workers():
                                  asynchronous=True)
     try:
         cluster.scheduler.allowed_failures = 1000
-        alc = Adaptive(cluster.scheduler, cluster, interval=100)
+        alc = cluster.adapt(interval=100)
         c = yield Client(cluster, asynchronous=True)

         futures = c.map(slowinc, range(100), delay=0.01)
@@ -120,8 +120,8 @@ def test_adaptive_local_cluster_multi_workers():
         yield c.gather(futures)

     finally:
-        yield c._close()
-        yield cluster._close()
+        yield c.close()
+        yield cluster.close()


 @gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 10, active_rpc_timeout=10)
@@ -199,8 +199,8 @@ def test_min_max():
             assert time() < start + 2
         assert frequencies(pluck(1, adapt.log)) == {'up': 2, 'down': 1}
     finally:
-        yield c._close()
-        yield cluster._close()
+        yield c.close()
+        yield cluster.close()


 @gen_test()
@@ -223,8 +223,8 @@ def test_avoid_churn():

         assert frequencies(pluck(1, adapt.log)) == {'up': 1}
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()


 @gen_test(timeout=None)
@@ -270,8 +270,8 @@ def test_adapt_quickly():
             yield gen.sleep(0.1)
         assert len(cluster.scheduler.workers) == 1
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()


 @gen_test(timeout=None)
@@ -295,8 +295,8 @@ def test_adapt_down():
             yield gen.sleep(0.1)
             assert time() < start + 1
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()


 @pytest.mark.xfail(reason="we currently only judge occupancy, not ntasks")
@@ -317,8 +317,8 @@ def test_no_more_workers_than_tasks():

         assert len(cluster.scheduler.workers) <= 1
     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()


 def test_basic_no_loop():
@@ -358,8 +358,8 @@ def test_target_duration():
         assert adaptive.log[1][1:] == ('up', {'n': 20})

     finally:
-        yield client._close()
-        yield cluster._close()
+        yield client.close()
+        yield cluster.close()


 @gen_test(timeout=None)
@@ -391,7 +391,7 @@ def key(ws):
         names = {ws.name for ws in cluster.scheduler.workers.values()}
         assert names == {'a-1', 'a-2'} or names == {'b-1', 'b-2'}
     finally:
-        yield cluster._close()
+        yield cluster.close()


 @gen_cluster(client=True, ncores=[])
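
The pattern these tests converge on — build the cluster with asynchronous=True, enable adaptivity through cluster.adapt() rather than constructing Adaptive directly, and tear down with the now-yieldable public close() — looks roughly like the following hypothetical test (slowinc and gen_test come from distributed.utils_test; the LocalCluster keyword arguments are illustrative):

from distributed import Client, LocalCluster
from distributed.utils_test import gen_test, slowinc


@gen_test(timeout=None)
def test_adaptive_sketch():
    cluster = yield LocalCluster(0, scheduler_port=0, processes=False,
                                 silence_logs=False, diagnostics_port=None,
                                 asynchronous=True)
    client = yield Client(cluster, asynchronous=True)
    try:
        adapt = cluster.adapt(interval=100)  # replaces Adaptive(cluster.scheduler, cluster, ...)
        futures = client.map(slowinc, range(20), delay=0.01)
        results = yield client.gather(futures)
        assert results == list(range(1, 21))
    finally:
        yield client.close()
        yield cluster.close()  # close() is now safe to yield inside a coroutine
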
9 changes: 4 additions & 5 deletions distributed/deploy/tests/test_local.py
@@ -50,7 +50,6 @@ def test_close_twice():
         cluster.close()
         sleep(0.5)
     log = log.getvalue()
-    print(log)
     assert not log


@@ -291,8 +290,8 @@ def test_scale_up_and_down():
     assert len(cluster.workers) == 1
     assert addr not in cluster.scheduler.ncores

-    yield c._close()
-    yield cluster._close()
+    yield c.close()
+    yield cluster.close()


 def test_silent_startup():
@@ -511,8 +510,8 @@ def scale_down(self, *args, **kwargs):
         yield gen.sleep(0.01)
     assert time() < start + 3

-    yield c._close()
-    yield cluster._close()
+    yield c.close()
+    yield cluster.close()


 def test_local_tls_restart(loop):
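
A matching sketch of the scale_up/scale_down flow exercised above, again tearing down through the public close() (hypothetical test; as in test_scale_up_and_down, scale_down is given worker addresses, and the LocalCluster keyword arguments are illustrative):

from distributed import Client, LocalCluster
from distributed.utils_test import gen_test


@gen_test()
def test_scale_sketch():
    cluster = yield LocalCluster(0, scheduler_port=0, processes=False,
                                 silence_logs=False, diagnostics_port=None,
                                 asynchronous=True)
    c = yield Client(cluster, asynchronous=True)
    try:
        yield cluster.scale_up(2)             # bring the worker count up to two
        assert len(cluster.workers) == 2

        addr = cluster.workers[0].address
        yield cluster.scale_down([addr])      # retire one worker by address
        assert len(cluster.workers) == 1
        assert addr not in cluster.scheduler.ncores
    finally:
        yield c.close()
        yield cluster.close()
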
2 changes: 1 addition & 1 deletion distributed/tests/test_client.py
@@ -1871,7 +1871,7 @@ def test_repr_localcluster():
         assert cluster.scheduler.address in text
     finally:
         yield client.close()
-        yield cluster._close()
+        yield cluster.close()


 @gen_cluster(client=True)
