Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 193 additions & 8 deletions dask_jobqueue/cluster_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import logging
import math

from tornado import gen

from distributed.deploy.adaptive import Adaptive
from distributed.deploy import Cluster
from distributed.utils import log_errors
from distributed.utils import log_errors, ignoring, parse_bytes, \
PeriodicCallback

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -34,6 +38,12 @@ def scale_down(self, workers: List[str], n: int):
''' Callable mapping a WorkerState object to a group, see
Scheduler.workers_to_close
'''
<<<<<<< HEAD
4. worker_spec dict attribute if scale(cores=...) or scale(memory=...)
can be used by users.
worker_spec = {'cores': 4, 'memory': '16 GB'}
=======
>>>>>>> master

This will provide a general ``scale`` method as well as an IPython widget
for display.
Expand All @@ -44,6 +54,8 @@ def scale_down(self, workers: List[str], n: int):
- Connect to a local or remote Scheduler through RPC, and then
communicate with it.
- Ability to start a local or remote scheduler.
- Ability to work with different worker pools: in scale, adaptive,
worker_spec...
- Scheduler
- Provide some remote methods:
- retire_workers(n: int): close enough workers ot have only n
Expand All @@ -64,20 +76,72 @@ def scale_down(self, workers: List[str], n: int):
>>> cluster = MyCluster()
>>> cluster.scale(5) # scale manually
>>> cluster.adapt(minimum=1, maximum=100) # scale automatically
>>> cluster.scale(cores=100) # scale manually to cores nb
"""

def __init__(self, adaptive_options={}):
self._target_scale = 0
self._adaptive_options = adaptive_options
self._adaptive_options.setdefault('worker_key', self.worker_key)

def adapt(self, minimum_cores=None, maximum_cores=None,
minimum_memory=None, maximum_memory=None, **kwargs):
""" Turn on adaptivity
For keyword arguments see dask.distributed.Adaptive
Instead of minimum and maximum parameters which apply to the number of
worker, If Cluster object implements worker_spec attribute, one can use
the following parameters:
Parameters
----------
minimum_cores: int
Minimum number of cores for the cluster
maximum_cores: int
Maximum number of cores for the cluster
minimum_memory: str
Minimum amount of memory for the cluster
maximum_memory: str
Maximum amount of memory for the cluster
Examples
--------
>>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
>>> cluster.adapt(minimum_cores=24, maximum_cores=96)
>>> cluster.adapt(minimum_memory='60 GB', maximum_memory= '1 TB')
"""
with ignoring(AttributeError):
self._adaptive.stop()
if not hasattr(self, '_adaptive_options'):
self._adaptive_options = {}
if 'minimum' not in kwargs:
if minimum_cores is not None:
kwargs['minimum'] = self._get_nb_workers_from_cores(minimum_cores)
elif minimum_memory is not None:
kwargs['minimum'] = self._get_nb_workers_from_memory(minimum_memory)
if 'maximum' not in kwargs:
if maximum_cores is not None:
kwargs['maximum'] = self._get_nb_workers_from_cores(maximum_cores)
elif maximum_memory is not None:
kwargs['maximum'] = self._get_nb_workers_from_memory(maximum_memory)
self._adaptive_options.update(kwargs)
self._adaptive = Adaptive(self.scheduler, self, **self._adaptive_options)
return self._adaptive

@gen.coroutine
def _scale(self, n):
def _scale(self, n=None, cores=None, memory=None):
""" Asynchronously called scale method

This allows to do every operation with a coherent ocntext
"""
with log_errors():
if [n, cores, memory].count(None) != 2:
raise ValueError('One and only one of n, cores, memory kwargs'
' should be used, n={}, cores={}, memory={}'
' provided.'.format(n, cores, memory))
if n is None:
if cores is not None:
n = self._get_nb_workers_from_cores(cores)
elif memory is not None:
n = self._get_nb_workers_from_memory(memory)

# here we rely on a ClusterManager attribute to retrieve the
# active and pending workers
if n == self._target_scale:
Expand All @@ -99,29 +163,150 @@ def _scale(self, n):
self.scale_down(to_close, n)
self._target_scale = n

def scale(self, n):
""" Scale cluster to n workers

def scale(self, n=None, cores=None, memory=None):
""" Scale cluster to n workers or to the given number of cores or
memory
number of cores and memory are converted into number of workers using
worker_spec attribute.
Parameters
----------
n: int
Target number of workers

cores: int
Target number of cores
memory: str
Target amount of available memory
Example
-------
>>> cluster.scale(10) # scale cluster to ten workers

>>> cluster.scale(cores=100) # scale cluster to 100 cores
>>> cluster.scale(memory='1 TB') # scale cluster to 1 TB memory
See Also
--------
Cluster.scale_up
Cluster.scale_down
Cluster.worker_spec
"""
# TODO we should not rely on scheduler loop here, self should have its
# own loop
self.scheduler.loop.add_callback(self._scale, n)
self.scheduler.loop.add_callback(self._scale, n, cores, memory)

def _widget(self):
""" Create IPython widget for display within a notebook """
try:
return self._cached_widget
except AttributeError:
pass

from ipywidgets import Layout, VBox, HBox, IntText, Button, HTML, Accordion, Text

layout = Layout(width='150px')

if 'bokeh' in self.scheduler.services:
link = self.dashboard_link
link = '<p><b>Dashboard: </b><a href="%s" target="_blank">%s</a></p>\n' % (link, link)
else:
link = ''

title = '<h2>%s</h2>' % type(self).__name__
title = HTML(title)
dashboard = HTML(link)

status = HTML(self._widget_status(), layout=Layout(min_width='150px'))

request = IntText(0, description='Workers', layout=layout)
scale = Button(description='Scale', layout=layout)
request_cores = IntText(0, description='Cores', layout=layout)
scale_cores = Button(description='Scale', layout=layout)
request_memory = Text('O GB', description='Memory', layout=layout)
scale_memory = Button(description='Scale', layout=layout)

minimum = IntText(0, description='Minimum', layout=layout)
maximum = IntText(0, description='Maximum', layout=layout)
adapt = Button(description='Adapt', layout=layout)
minimum_cores = IntText(0, description='Min cores', layout=layout)
maximum_cores = IntText(0, description='Max cores', layout=layout)
adapt_cores = Button(description='Adapt', layout=layout)
minimum_mem = Text('0 GB', description='Min memory', layout=layout)
maximum_mem = Text('0 GB', description='Max memory', layout=layout)
adapt_mem = Button(description='Adapt', layout=layout)

scale_hbox = [HBox([request, scale])]
adapt_hbox = [HBox([minimum, maximum, adapt])]
if hasattr(self, 'worker_spec'):
scale_hbox.append(HBox([request_cores, scale_cores]))
scale_hbox.append(HBox([request_memory, scale_memory]))
adapt_hbox.append(HBox([minimum_cores, maximum_cores, adapt_cores]))
adapt_hbox.append(HBox([minimum_mem, maximum_mem, adapt_mem]))

accordion = Accordion([VBox(scale_hbox),
VBox(adapt_hbox)],
layout=Layout(min_width='500px'))
accordion.selected_index = None
accordion.set_title(0, 'Manual Scaling')
accordion.set_title(1, 'Adaptive Scaling')

box = VBox([title,
HBox([status,
accordion]),
dashboard])

self._cached_widget = box

def adapt_cb(b):
self.adapt(minimum=minimum.value, maximum=maximum.value)

def adapt_cores_cb(b):
self.adapt(minimum_cores=minimum_cores.value, maximum_cores=maximum_cores.value)

def adapt_mem_cb(b):
self.adapt(minimum_memory=minimum_mem.value, maximum_memory=maximum_mem.value)

adapt.on_click(adapt_cb)
adapt_cores.on_click(adapt_cores_cb)
adapt_mem.on_click(adapt_mem_cb)

def scale_cb(request, kwarg):
def request_cb(b):
with log_errors():
arg = request.value
with ignoring(AttributeError):
self._adaptive.stop()
local_kwargs = dict()
local_kwargs[kwarg] = arg
self.scale(**local_kwargs)

return request_cb

scale.on_click(scale_cb(request, 'n'))
scale_cores.on_click(scale_cb(request_cores, 'cores'))
scale_memory.on_click(scale_cb(request_memory, 'memory'))

def update():
status.value = self._widget_status()

pc = PeriodicCallback(update, 500, io_loop=self.scheduler.loop)
self.scheduler.periodic_callbacks['cluster-repr'] = pc
pc.start()

return box

def worker_key(self, worker_state):
''' Callable mapping a WorkerState object to a group, see
Scheduler.workers_to_close
'''
return worker_state

def _get_nb_workers_from_cores(self, cores):
return math.ceil(cores / self.worker_spec['cores'])

def _get_nb_workers_from_memory(self, memory):
return math.ceil(parse_bytes(memory) / parse_bytes(self.worker_spec['memory']))

@property
def worker_spec(self):
''' single worker process info needed for scaling on cores or memory '''
raise NotImplementedError('{} class does not provide worker_spec '
'attribute, needed for scaling with '
'cores or memory kwargs.'
.format(self.__class__.__name__))
20 changes: 15 additions & 5 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,11 @@ def __init__(self,
# dask-worker command line build
dask_worker_command = '%(python)s -m distributed.cli.dask_worker' % dict(python=python)
command_args = [dask_worker_command, self.scheduler.address]
command_args += ['--nthreads', self.worker_threads]
command_args += ['--nthreads', self.worker_process_threads]
if processes is not None and processes > 1:
command_args += ['--nprocs', processes]

mem = format_bytes(self.worker_memory / self.worker_processes)
command_args += ['--memory-limit', mem.replace(' ', '')]
command_args += ['--memory-limit', self.worker_process_memory]
command_args += ['--name', '%s--${JOB_ID}--' % name]

if death_timeout is not None:
Expand All @@ -271,7 +270,7 @@ def __init__(self,

def __repr__(self):
running_workers = self._count_active_workers()
running_cores = running_workers * self.worker_threads
running_cores = running_workers * self.worker_process_threads
total_jobs = len(self.pending_jobs) + len(self.running_jobs)
total_workers = total_jobs * self.worker_processes
running_memory = running_workers * self.worker_memory / self.worker_processes
Expand All @@ -298,9 +297,20 @@ def finished_jobs(self):
return self._scheduler_plugin.finished_jobs

@property
def worker_threads(self):
def worker_process_threads(self):
return int(self.worker_cores / self.worker_processes)

@property
def worker_process_memory(self):
mem = format_bytes(self.worker_memory / self.worker_processes)
mem = mem.replace(' ', '')
return mem

@property
def worker_spec(self):
''' single worker process info needed for scaling on cores or memory '''
return {'cores': self.worker_process_threads, 'memory': self.worker_process_memory}

def job_script(self):
""" Construct a job submission script """
pieces = {'job_header': self.job_header,
Expand Down
57 changes: 57 additions & 0 deletions dask_jobqueue/tests/test_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,38 @@ def test_basic(loop):
assert not cluster.running_jobs


@pytest.mark.env("pbs") # noqa: F811
def test_scale_cores_memory(loop):
with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp',
job_extra=['-V'], loop=loop) as cluster:
with Client(cluster) as client:

cluster.scale(cores=2)

start = time()
while not(cluster.pending_jobs or cluster.running_jobs):
sleep(0.100)
assert time() < start + QUEUE_WAIT

future = client.submit(lambda x: x + 1, 10)
assert future.result(QUEUE_WAIT) == 11
assert cluster.running_jobs

workers = list(client.scheduler_info()['workers'].values())
w = workers[0]
assert w['memory_limit'] == 2e9
assert w['ncores'] == 2

cluster.scale(memory='0GB')

start = time()
while cluster.running_jobs:
sleep(0.100)
assert time() < start + QUEUE_WAIT

assert not cluster.running_jobs


@pytest.mark.env("pbs") # noqa: F811
def test_basic_scale_edge_cases(loop):
with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp',
Expand Down Expand Up @@ -187,6 +219,31 @@ def test_adaptive_grouped(loop):
assert time() < start + QUEUE_WAIT


@pytest.mark.env("pbs") # noqa: F811
def test_adaptive_cores_mem(loop):
with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp',
job_extra=['-V'], loop=loop) as cluster:
cluster.adapt(minimum_cores=0, maximum_memory='4GB')
with Client(cluster) as client:
future = client.submit(lambda x: x + 1, 10)
assert future.result(QUEUE_WAIT) == 11

start = time()
processes = cluster.worker_processes
while len(client.scheduler_info()['workers']) != processes:
sleep(0.1)
assert time() < start + QUEUE_WAIT

del future

start = time()
while cluster.pending_jobs or cluster.running_jobs:
sleep(0.100)
assert time() < start + QUEUE_WAIT

assert cluster.finished_jobs


@pytest.mark.env("pbs") # noqa: F811
def test_scale_grouped(loop):
with PBSCluster(walltime='00:02:00', processes=2, cores=2, memory='2GB', local_directory='/tmp',
Expand Down