diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py
index 8931306c..5428a3f6 100644
--- a/dask_jobqueue/cluster_manager.py
+++ b/dask_jobqueue/cluster_manager.py
@@ -1,8 +1,12 @@
import logging
+import math
from tornado import gen
+
+from distributed.deploy.adaptive import Adaptive
from distributed.deploy import Cluster
-from distributed.utils import log_errors
+from distributed.utils import log_errors, ignoring, parse_bytes, \
+ PeriodicCallback
logger = logging.getLogger(__name__)
@@ -34,6 +38,12 @@ def scale_down(self, workers: List[str], n: int):
''' Callable mapping a WorkerState object to a group, see
Scheduler.workers_to_close
'''
+ 4. worker_spec dict attribute if scale(cores=...) or scale(memory=...)
+ can be used by users.
+ worker_spec = {'cores': 4, 'memory': '16 GB'}
This will provide a general ``scale`` method as well as an IPython widget
for display.
@@ -44,6 +54,8 @@ def scale_down(self, workers: List[str], n: int):
- Connect to a local or remote Scheduler through RPC, and then
communicate with it.
- Ability to start a local or remote scheduler.
+ - Ability to work with different worker pools: in scale, adaptive,
+ worker_spec...
- Scheduler
- Provide some remote methods:
- retire_workers(n: int): close enough workers ot have only n
@@ -64,6 +76,7 @@ def scale_down(self, workers: List[str], n: int):
>>> cluster = MyCluster()
>>> cluster.scale(5) # scale manually
>>> cluster.adapt(minimum=1, maximum=100) # scale automatically
+ >>> cluster.scale(cores=100) # scale manually to a total number of cores
"""
def __init__(self, adaptive_options={}):
@@ -71,13 +84,64 @@ def __init__(self, adaptive_options={}):
self._adaptive_options = adaptive_options
self._adaptive_options.setdefault('worker_key', self.worker_key)
+ def adapt(self, minimum_cores=None, maximum_cores=None,
+ minimum_memory=None, maximum_memory=None, **kwargs):
+ """ Turn on adaptivity
+ For keyword arguments see dask.distributed.Adaptive
+ The minimum and maximum parameters apply to the number of workers. If
+ the Cluster object implements the worker_spec attribute, the bounds can
+ instead be given with the following parameters, which are converted
+ into a number of workers. An explicit minimum (or maximum) keyword
+ takes precedence over them, and a cores bound takes precedence over a
+ memory bound.
+ Parameters
+ ----------
+ minimum_cores: int
+ Minimum number of cores for the cluster
+ maximum_cores: int
+ Maximum number of cores for the cluster
+ minimum_memory: str
+ Minimum amount of memory for the cluster
+ maximum_memory: str
+ Maximum amount of memory for the cluster
+ **kwargs:
+ Other keyword arguments, merged into the stored adaptive options and
+ passed to Adaptive
+ Returns
+ -------
+ The Adaptive instance created by this call
+ Examples
+ --------
+ >>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
+ >>> cluster.adapt(minimum_cores=24, maximum_cores=96)
+ >>> cluster.adapt(minimum_memory='60 GB', maximum_memory='1 TB')
+ """
+ # Stop the current Adaptive instance; the AttributeError raised when
+ # there is none yet is ignored
+ with ignoring(AttributeError):
+ self._adaptive.stop()
+ if not hasattr(self, '_adaptive_options'):
+ self._adaptive_options = {}
+ # Cores/memory bounds are only used when the matching worker-count
+ # bound was not given explicitly
+ if 'minimum' not in kwargs:
+ if minimum_cores is not None:
+ kwargs['minimum'] = self._get_nb_workers_from_cores(minimum_cores)
+ elif minimum_memory is not None:
+ kwargs['minimum'] = self._get_nb_workers_from_memory(minimum_memory)
+ if 'maximum' not in kwargs:
+ if maximum_cores is not None:
+ kwargs['maximum'] = self._get_nb_workers_from_cores(maximum_cores)
+ elif maximum_memory is not None:
+ kwargs['maximum'] = self._get_nb_workers_from_memory(maximum_memory)
+ # Options accumulate across calls: new kwargs override stored ones
+ self._adaptive_options.update(kwargs)
+ self._adaptive = Adaptive(self.scheduler, self, **self._adaptive_options)
+ return self._adaptive
+
@gen.coroutine
- def _scale(self, n):
+ def _scale(self, n=None, cores=None, memory=None):
""" Asynchronously called scale method
This allows to do every operation with a coherent ocntext
"""
with log_errors():
+ if [n, cores, memory].count(None) != 2:
+ raise ValueError('One and only one of n, cores, memory kwargs'
+ ' should be used, n={}, cores={}, memory={}'
+ ' provided.'.format(n, cores, memory))
+ if n is None:
+ if cores is not None:
+ n = self._get_nb_workers_from_cores(cores)
+ elif memory is not None:
+ n = self._get_nb_workers_from_memory(memory)
+
# here we rely on a ClusterManager attribute to retrieve the
# active and pending workers
if n == self._target_scale:
@@ -99,29 +163,150 @@ def _scale(self, n):
self.scale_down(to_close, n)
self._target_scale = n
- def scale(self, n):
- """ Scale cluster to n workers
-
+ def scale(self, n=None, cores=None, memory=None):
+ """ Scale cluster to n workers, or to a given number of cores or amount
+ of memory.
+ The number of cores or the amount of memory is converted into a number
+ of workers using the worker_spec attribute.
Parameters
----------
n: int
Target number of workers
-
+ cores: int
+ Target number of cores
+ memory: str
+ Target amount of available memory
Example
-------
>>> cluster.scale(10) # scale cluster to ten workers
-
+ >>> cluster.scale(cores=100) # scale cluster to 100 cores
+ >>> cluster.scale(memory='1 TB') # scale cluster to 1 TB memory
See Also
--------
Cluster.scale_up
Cluster.scale_down
+ Cluster.worker_spec
"""
# TODO we should not rely on scheduler loop here, self should have its
# own loop
- self.scheduler.loop.add_callback(self._scale, n)
+ self.scheduler.loop.add_callback(self._scale, n, cores, memory)
+
+    def _widget(self):
+        """ Create IPython widget for display within a notebook
+
+        The widget is built once and cached in ``_cached_widget``. It shows
+        the cluster status together with manual and adaptive scaling controls
+        expressed in workers and, when ``worker_spec`` is available, the same
+        controls expressed in cores and in memory.
+        """
+        try:
+            return self._cached_widget
+        except AttributeError:
+            pass
+
+        from ipywidgets import Layout, VBox, HBox, IntText, Button, HTML, Accordion, Text
+
+        layout = Layout(width='150px')
+
+        if 'bokeh' in self.scheduler.services:
+            link = self.dashboard_link
+            # NOTE(review): the markup of the link and title HTML strings was
+            # lost in this copy of the patch and has been reconstructed;
+            # confirm against the original change.
+            link = '<p><b>Dashboard: </b><a href="%s" target="_blank">%s</a></p>\n' % (link, link)
+        else:
+            link = ''
+
+        title = '<h2>%s</h2>' % type(self).__name__
+        title = HTML(title)
+        dashboard = HTML(link)
+
+        status = HTML(self._widget_status(), layout=Layout(min_width='150px'))
+
+        request = IntText(0, description='Workers', layout=layout)
+        scale = Button(description='Scale', layout=layout)
+        request_cores = IntText(0, description='Cores', layout=layout)
+        scale_cores = Button(description='Scale', layout=layout)
+        # Default must be '0 GB' (digit zero): a letter 'O' is not a valid
+        # size and would make the memory Scale button fail as-is
+        request_memory = Text('0 GB', description='Memory', layout=layout)
+        scale_memory = Button(description='Scale', layout=layout)
+
+        minimum = IntText(0, description='Minimum', layout=layout)
+        maximum = IntText(0, description='Maximum', layout=layout)
+        adapt = Button(description='Adapt', layout=layout)
+        minimum_cores = IntText(0, description='Min cores', layout=layout)
+        maximum_cores = IntText(0, description='Max cores', layout=layout)
+        adapt_cores = Button(description='Adapt', layout=layout)
+        minimum_mem = Text('0 GB', description='Min memory', layout=layout)
+        maximum_mem = Text('0 GB', description='Max memory', layout=layout)
+        adapt_mem = Button(description='Adapt', layout=layout)
+
+        # hasattr() only swallows AttributeError on Python 3, whereas the
+        # default worker_spec property raises NotImplementedError: probe it
+        # explicitly so clusters without worker_spec still get a widget.
+        try:
+            self.worker_spec
+        except (AttributeError, NotImplementedError):
+            has_worker_spec = False
+        else:
+            has_worker_spec = True
+
+        scale_hbox = [HBox([request, scale])]
+        adapt_hbox = [HBox([minimum, maximum, adapt])]
+        if has_worker_spec:
+            scale_hbox.append(HBox([request_cores, scale_cores]))
+            scale_hbox.append(HBox([request_memory, scale_memory]))
+            adapt_hbox.append(HBox([minimum_cores, maximum_cores, adapt_cores]))
+            adapt_hbox.append(HBox([minimum_mem, maximum_mem, adapt_mem]))
+
+        accordion = Accordion([VBox(scale_hbox),
+                               VBox(adapt_hbox)],
+                              layout=Layout(min_width='500px'))
+        accordion.selected_index = None
+        accordion.set_title(0, 'Manual Scaling')
+        accordion.set_title(1, 'Adaptive Scaling')
+
+        box = VBox([title,
+                    HBox([status,
+                          accordion]),
+                    dashboard])
+
+        self._cached_widget = box
+
+        def adapt_cb(b):
+            self.adapt(minimum=minimum.value, maximum=maximum.value)
+
+        def adapt_cores_cb(b):
+            self.adapt(minimum_cores=minimum_cores.value, maximum_cores=maximum_cores.value)
+
+        def adapt_mem_cb(b):
+            self.adapt(minimum_memory=minimum_mem.value, maximum_memory=maximum_mem.value)
+
+        adapt.on_click(adapt_cb)
+        adapt_cores.on_click(adapt_cores_cb)
+        adapt_mem.on_click(adapt_mem_cb)
+
+        def scale_cb(request, kwarg):
+            # Build a click handler forwarding the field value to scale()
+            # under the given keyword ('n', 'cores' or 'memory')
+            def request_cb(b):
+                with log_errors():
+                    arg = request.value
+                    # Manual scaling turns adaptivity off, if it was on
+                    with ignoring(AttributeError):
+                        self._adaptive.stop()
+                    local_kwargs = dict()
+                    local_kwargs[kwarg] = arg
+                    self.scale(**local_kwargs)
+
+            return request_cb
+
+        scale.on_click(scale_cb(request, 'n'))
+        scale_cores.on_click(scale_cb(request_cores, 'cores'))
+        scale_memory.on_click(scale_cb(request_memory, 'memory'))
+
+        def update():
+            status.value = self._widget_status()
+
+        # Refresh the status text every 500 ms on the scheduler loop
+        pc = PeriodicCallback(update, 500, io_loop=self.scheduler.loop)
+        self.scheduler.periodic_callbacks['cluster-repr'] = pc
+        pc.start()
+
+        return box
def worker_key(self, worker_state):
''' Callable mapping a WorkerState object to a group, see
Scheduler.workers_to_close
'''
return worker_state
+
+ def _get_nb_workers_from_cores(self, cores):
+ """ Number of workers, rounded up, needed to provide ``cores`` cores
+ given the worker_spec['cores'] cores of a single worker """
+ return math.ceil(cores / self.worker_spec['cores'])
+
+ def _get_nb_workers_from_memory(self, memory):
+ """ Number of workers, rounded up, needed to provide ``memory`` given
+ the worker_spec['memory'] of a single worker; both values are passed
+ through parse_bytes before being divided """
+ return math.ceil(parse_bytes(memory) / parse_bytes(self.worker_spec['memory']))
+
+ @property
+ def worker_spec(self):
+ ''' single worker process info needed for scaling on cores or memory
+ Classes that support scaling with the cores or memory kwargs are
+ expected to override this with a dict providing 'cores' and 'memory'
+ entries, which are the keys read when converting cores or memory into
+ a number of workers.
+ Raises
+ ------
+ NotImplementedError
+ Always, in this default implementation
+ '''
+ raise NotImplementedError('{} class does not provide worker_spec '
+ 'attribute, needed for scaling with '
+ 'cores or memory kwargs.'
+ .format(self.__class__.__name__))
diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py
index 305e26f4..fa557147 100644
--- a/dask_jobqueue/core.py
+++ b/dask_jobqueue/core.py
@@ -247,12 +247,11 @@ def __init__(self,
# dask-worker command line build
dask_worker_command = '%(python)s -m distributed.cli.dask_worker' % dict(python=python)
command_args = [dask_worker_command, self.scheduler.address]
- command_args += ['--nthreads', self.worker_threads]
+ command_args += ['--nthreads', self.worker_process_threads]
if processes is not None and processes > 1:
command_args += ['--nprocs', processes]
- mem = format_bytes(self.worker_memory / self.worker_processes)
- command_args += ['--memory-limit', mem.replace(' ', '')]
+ command_args += ['--memory-limit', self.worker_process_memory]
command_args += ['--name', '%s--${JOB_ID}--' % name]
if death_timeout is not None:
@@ -271,7 +270,7 @@ def __init__(self,
def __repr__(self):
running_workers = self._count_active_workers()
- running_cores = running_workers * self.worker_threads
+ running_cores = running_workers * self.worker_process_threads
total_jobs = len(self.pending_jobs) + len(self.running_jobs)
total_workers = total_jobs * self.worker_processes
running_memory = running_workers * self.worker_memory / self.worker_processes
@@ -298,9 +297,20 @@ def finished_jobs(self):
return self._scheduler_plugin.finished_jobs
@property
- def worker_threads(self):
+ def worker_process_threads(self):
return int(self.worker_cores / self.worker_processes)
+ @property
+ def worker_process_memory(self):
+ ''' memory of a single worker process: worker_memory divided by
+ worker_processes, rendered by format_bytes with the spaces removed '''
+ mem = format_bytes(self.worker_memory / self.worker_processes)
+ # drop the spaces from the formatted string
+ mem = mem.replace(' ', '')
+ return mem
+
+ @property
+ def worker_spec(self):
+ ''' single worker process info needed for scaling on cores or memory
+ Returns a dict holding the thread count of one worker process under
+ 'cores' and its memory string under 'memory'.
+ '''
+ return {'cores': self.worker_process_threads, 'memory': self.worker_process_memory}
+
def job_script(self):
""" Construct a job submission script """
pieces = {'job_header': self.job_header,
diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py
index a725b9d9..cb9e4e6e 100644
--- a/dask_jobqueue/tests/test_pbs.py
+++ b/dask_jobqueue/tests/test_pbs.py
@@ -118,6 +118,38 @@ def test_basic(loop):
assert not cluster.running_jobs
+@pytest.mark.env("pbs") # noqa: F811
+def test_scale_cores_memory(loop):
+ """ Scale up with the cores kwarg, check the worker that starts, then
+ scale back down with the memory kwarg """
+ with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp',
+ job_extra=['-V'], loop=loop) as cluster:
+ with Client(cluster) as client:
+
+ # 2 cores requested with 2 cores per worker process
+ cluster.scale(cores=2)
+
+ # wait until a job has been submitted or is running
+ start = time()
+ while not(cluster.pending_jobs or cluster.running_jobs):
+ sleep(0.100)
+ assert time() < start + QUEUE_WAIT
+
+ future = client.submit(lambda x: x + 1, 10)
+ assert future.result(QUEUE_WAIT) == 11
+ assert cluster.running_jobs
+
+ # the worker must match the resources given to PBSCluster above
+ workers = list(client.scheduler_info()['workers'].values())
+ w = workers[0]
+ assert w['memory_limit'] == 2e9
+ assert w['ncores'] == 2
+
+ # a zero memory target means no worker should be left
+ cluster.scale(memory='0GB')
+
+ start = time()
+ while cluster.running_jobs:
+ sleep(0.100)
+ assert time() < start + QUEUE_WAIT
+
+ assert not cluster.running_jobs
+
+
@pytest.mark.env("pbs") # noqa: F811
def test_basic_scale_edge_cases(loop):
with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp',
@@ -187,6 +219,31 @@ def test_adaptive_grouped(loop):
assert time() < start + QUEUE_WAIT
+@pytest.mark.env("pbs") # noqa: F811
+def test_adaptive_cores_mem(loop):
+ """ Adaptive scaling bounded by a minimum in cores and a maximum in
+ memory: a job starts for submitted work and ends once it is released """
+ with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp',
+ job_extra=['-V'], loop=loop) as cluster:
+ # lower bound given in cores, upper bound given in memory
+ # (presumably 2 workers at 2GB per worker - confirm)
+ cluster.adapt(minimum_cores=0, maximum_memory='4GB')
+ with Client(cluster) as client:
+ future = client.submit(lambda x: x + 1, 10)
+ assert future.result(QUEUE_WAIT) == 11
+
+ # wait for the scheduler to see the worker processes of one job
+ start = time()
+ processes = cluster.worker_processes
+ while len(client.scheduler_info()['workers']) != processes:
+ sleep(0.1)
+ assert time() < start + QUEUE_WAIT
+
+ # drop the only reference to the result, then wait until no job
+ # is pending or running
+ del future
+
+ start = time()
+ while cluster.pending_jobs or cluster.running_jobs:
+ sleep(0.100)
+ assert time() < start + QUEUE_WAIT
+
+ assert cluster.finished_jobs
+
+
@pytest.mark.env("pbs") # noqa: F811
def test_scale_grouped(loop):
with PBSCluster(walltime='00:02:00', processes=2, cores=2, memory='2GB', local_directory='/tmp',