33 changes: 29 additions & 4 deletions distributed/scheduler.py
@@ -2554,7 +2554,7 @@ def replicate(self, comm=None, keys=None, n=None, workers=None,
'key-count': len(keys),
'branching-factor': branching_factor})

def workers_to_close(self, memory_ratio=2, key=None):
def workers_to_close(self, memory_ratio=None, n=None, key=None):
"""
Find workers that we can close with low cost

@@ -2572,6 +2572,8 @@ def workers_to_close(self, memory_ratio=2, key=None):
Amount of extra space we want to have for our stored data.
Defaults to 2, meaning that we want to have twice as much memory as we
currently have data.
n: int
Number of workers to close
key: Callable(WorkerState)
An optional callable mapping a WorkerState object to a group
affiliation. Groups will be closed together. This is useful when
@@ -2587,10 +2589,25 @@ def workers_to_close(self, memory_ratio=2, key=None):
>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

Remove two workers

>>> scheduler.workers_to_close(n=2)

Keep enough workers to have twice as much memory as we need.

>>> scheduler.workers_to_close(memory_ratio=2)

Returns
-------
to_close: list of worker addresses that are OK to close

See Also
--------
Scheduler.retire_workers
"""
if n is None and memory_ratio is None:
memory_ratio = 2

with log_errors():
# XXX processing isn't used in the heuristics below
if all(ws.processing for ws in self.workers.values()):
@@ -2624,7 +2641,8 @@ def workers_to_close(self, memory_ratio=None, n=None, key=None):
while idle:
w = idle.pop()
limit -= limit_bytes[w]
if limit >= memory_ratio * total: # still plenty of space
if (n is not None and len(to_close) < n or  # still need to close more
memory_ratio is not None and limit >= memory_ratio * total):  # still plenty of space
to_close.append(w)
else:
break
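
For clarity, the combined stopping rule above can be read in isolation. A minimal sketch, where n_closed, limit, and total stand in for the loop's len(to_close), limit, and total locals:

def should_close_another(n, memory_ratio, n_closed, limit, total):
    # Close another worker when an explicit count n was requested
    # and we have not yet selected that many workers...
    if n is not None and n_closed < n:
        return True
    # ...or when memory_ratio was given and the capacity that would
    # remain is still at least memory_ratio times our stored data.
    if memory_ratio is not None and limit >= memory_ratio * total:
        return True
    return False
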
@@ -2637,7 +2655,7 @@ def workers_to_close(self, memory_ratio=None, n=None, key=None):

@gen.coroutine
def retire_workers(self, comm=None, workers=None, remove=True, close=False,
close_workers=False):
close_workers=False, **kwargs):
""" Gracefully retire workers from cluster

Parameters
@@ -2652,11 +2670,18 @@ def retire_workers(self, comm=None, workers=None, remove=True, close=False,
Whether or not to actually close the worker explicitly from here.
Otherwise we expect some external job scheduler to finish off the
worker.
**kwargs: dict
Extra options to pass to workers_to_close to determine which
workers we should drop

Returns
-------
Dictionary mapping each retired worker's ID/address to a dictionary
of information about that worker.

See Also
--------
Scheduler.workers_to_close
"""
if close:
logger.warning("The keyword close= has been deprecated. "
@@ -2666,7 +2691,7 @@ def retire_workers(self, comm=None, workers=None, remove=True, close=False,
if workers is None:
while True:
try:
workers = self.workers_to_close()
workers = self.workers_to_close(**kwargs)
if workers:
workers = yield self.retire_workers(workers=workers,
remove=remove,
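Together, the two changes let callers drive the selection heuristic directly from retire_workers. A minimal usage sketch, assuming a live Scheduler instance s inside a Tornado coroutine (mirroring the tests below):

from tornado import gen

@gen.coroutine
def shrink_cluster(s):
    # retire_workers forwards unknown keywords to workers_to_close,
    # so n=1 retires exactly one worker chosen by the heuristic.
    info = yield s.retire_workers(n=1)
    # memory_ratio reaches workers_to_close the same way: retire idle
    # workers while keeping twice as much memory as our data occupies.
    info = yield s.retire_workers(memory_ratio=2)
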
7 changes: 6 additions & 1 deletion distributed/tests/py3_test_utils_tst.py
@@ -1,4 +1,4 @@
from distributed.utils_test import gen_cluster
from distributed.utils_test import gen_cluster, gen_test
from distributed import Client
from tornado import gen  # needed for gen.moment below


@@ -8,3 +8,8 @@ async def test_gen_cluster_async(s, a, b):  # flake8: noqa
future = c.submit(lambda x: x + 1, 1)
result = await future
assert result == 2


@gen_test()
async def test_gen_test_async(): # flake8: noqa
await gen.moment
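
As an aside, gen_test also accepts a timeout in seconds (the default appears to be 10 in distributed.utils_test); a hedged sketch for slower tests, where the 30-second figure and test name are illustrative:

@gen_test(timeout=30)  # raise the decorator's default time limit
async def test_slow_async():  # flake8: noqa
    await gen.sleep(0.1)  # stand-in for a slow operation
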
15 changes: 15 additions & 0 deletions distributed/tests/test_scheduler.py
@@ -615,6 +615,21 @@ def test_retire_workers(c, s, a, b):
assert not workers


@gen_cluster(client=True)
def test_retire_workers_n(c, s, a, b):
yield s.retire_workers(n=1)
assert len(s.workers) == 1

yield s.retire_workers(n=0)
assert len(s.workers) == 1

yield s.retire_workers(n=1)
assert len(s.workers) == 0

yield s.retire_workers(n=0)
assert len(s.workers) == 0


@gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 4)
def test_workers_to_close(cl, s, *workers):
s.task_duration['a'] = 4