From 538767b4977d1bd14679ae555b7705088a7e5a16 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sun, 23 Dec 2018 08:44:01 -0800 Subject: [PATCH] Make LocalCluster.close async friendly (#2437) --- distributed/deploy/local.py | 55 +++++++++++++---------- distributed/deploy/tests/test_adaptive.py | 32 ++++++------- distributed/deploy/tests/test_local.py | 9 ++-- distributed/tests/test_client.py | 2 +- 4 files changed, 52 insertions(+), 46 deletions(-) diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index d3feedb578..4f6ebe12fd 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -4,7 +4,7 @@ from datetime import timedelta import logging import math -from time import sleep +import warnings import weakref import toolz @@ -13,7 +13,7 @@ from .cluster import Cluster from ..core import CommClosedError from ..utils import (sync, ignoring, All, silence_logging, LoopRunner, - log_errors, thread_state) + log_errors, thread_state, parse_timedelta) from ..nanny import Nanny from ..scheduler import Scheduler from ..worker import Worker, _ncores @@ -151,9 +151,12 @@ def __repr__(self): def __await__(self): return self._started.__await__() + @property + def asynchronous(self): + return self._asynchronous or getattr(thread_state, 'asynchronous', False) + def sync(self, func, *args, **kwargs): - asynchronous = kwargs.pop('asynchronous', None) - if asynchronous or self._asynchronous or getattr(thread_state, 'asynchronous', False): + if kwargs.pop('asynchronous', None) or self.asynchronous: callback_timeout = kwargs.pop('callback_timeout', None) future = func(*args, **kwargs) if callback_timeout is not None: @@ -196,6 +199,10 @@ def _start(self, ip=None, n_workers=0): @gen.coroutine def _start_worker(self, death_timeout=60, **kwargs): + if self.status and self.status.startswith('clos'): + warnings.warn("Tried to start a worker while status=='%s'" % self.status) + return + if self.processes: W = Nanny kwargs['quiet'] = True @@ -257,14 +264,22 @@ def stop_worker(self, w): self.sync(self._stop_worker, w) @gen.coroutine - def _close(self): + def _close(self, timeout='2s'): # Can be 'closing' as we're called by close() below if self.status == 'closed': return + self.status = 'closing' + + self.scheduler.clear_task_state() + + with ignoring(gen.TimeoutError): + yield gen.with_timeout( + timedelta(seconds=parse_timedelta(timeout)), + All([self._stop_worker(w) for w in self.workers]), + ) + del self.workers[:] try: - with ignoring(gen.TimeoutError, CommClosedError, OSError): - yield All([w._close() for w in self.workers]) with ignoring(gen.TimeoutError, CommClosedError, OSError): yield self.scheduler.close(fast=True) del self.workers[:] @@ -277,26 +292,18 @@ def close(self, timeout=20): return try: - self.scheduler.clear_task_state() - - for w in self.workers: - self.loop.add_callback(self._stop_worker, w) - for i in range(10): - if not self.workers: - break - else: - sleep(0.01) - del self.workers[:] - try: - self._loop_runner.run_sync(self._close, callback_timeout=timeout) - except RuntimeError: # IOLoop is closed - pass - self._loop_runner.stop() - finally: - self.status = 'closed' + result = self.sync(self._close, callback_timeout=timeout) + except RuntimeError: # IOLoop is closed + pass + with ignoring(AttributeError): silence_logging(self._old_logging_level) + if not self.asynchronous: + self._loop_runner.stop() + + return result + @gen.coroutine def scale_up(self, n, **kwargs): """ Bring the total count of workers up to ``n`` diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 3014defa74..c3a40b23c2 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -91,7 +91,7 @@ def test_adaptive_local_cluster_multi_workers(): asynchronous=True) try: cluster.scheduler.allowed_failures = 1000 - alc = Adaptive(cluster.scheduler, cluster, interval=100) + alc = cluster.adapt(interval=100) c = yield Client(cluster, asynchronous=True) futures = c.map(slowinc, range(100), delay=0.01) @@ -120,8 +120,8 @@ def test_adaptive_local_cluster_multi_workers(): yield c.gather(futures) finally: - yield c._close() - yield cluster._close() + yield c.close() + yield cluster.close() @gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 10, active_rpc_timeout=10) @@ -199,8 +199,8 @@ def test_min_max(): assert time() < start + 2 assert frequencies(pluck(1, adapt.log)) == {'up': 2, 'down': 1} finally: - yield c._close() - yield cluster._close() + yield c.close() + yield cluster.close() @gen_test() @@ -223,8 +223,8 @@ def test_avoid_churn(): assert frequencies(pluck(1, adapt.log)) == {'up': 1} finally: - yield client._close() - yield cluster._close() + yield client.close() + yield cluster.close() @gen_test(timeout=None) @@ -270,8 +270,8 @@ def test_adapt_quickly(): yield gen.sleep(0.1) assert len(cluster.scheduler.workers) == 1 finally: - yield client._close() - yield cluster._close() + yield client.close() + yield cluster.close() @gen_test(timeout=None) @@ -295,8 +295,8 @@ def test_adapt_down(): yield gen.sleep(0.1) assert time() < start + 1 finally: - yield client._close() - yield cluster._close() + yield client.close() + yield cluster.close() @pytest.mark.xfail(reason="we currently only judge occupancy, not ntasks") @@ -317,8 +317,8 @@ def test_no_more_workers_than_tasks(): assert len(cluster.scheduler.workers) <= 1 finally: - yield client._close() - yield cluster._close() + yield client.close() + yield cluster.close() def test_basic_no_loop(): @@ -358,8 +358,8 @@ def test_target_duration(): assert adaptive.log[1][1:] == ('up', {'n': 20}) finally: - yield client._close() - yield cluster._close() + yield client.close() + yield cluster.close() @gen_test(timeout=None) @@ -391,7 +391,7 @@ def key(ws): names = {ws.name for ws in cluster.scheduler.workers.values()} assert names == {'a-1', 'a-2'} or names == {'b-1', 'b-2'} finally: - yield cluster._close() + yield cluster.close() @gen_cluster(client=True, ncores=[]) diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index c8b6bbc766..ee586c5be4 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -50,7 +50,6 @@ def test_close_twice(): cluster.close() sleep(0.5) log = log.getvalue() - print(log) assert not log @@ -291,8 +290,8 @@ def test_scale_up_and_down(): assert len(cluster.workers) == 1 assert addr not in cluster.scheduler.ncores - yield c._close() - yield cluster._close() + yield c.close() + yield cluster.close() def test_silent_startup(): @@ -511,8 +510,8 @@ def scale_down(self, *args, **kwargs): yield gen.sleep(0.01) assert time() < start + 3 - yield c._close() - yield cluster._close() + yield c.close() + yield cluster.close() def test_local_tls_restart(loop): diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 35af16fd4d..f9aaf304c9 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -1871,7 +1871,7 @@ def test_repr_localcluster(): assert cluster.scheduler.address in text finally: yield client.close() - yield cluster._close() + yield cluster.close() @gen_cluster(client=True)