From b21d48ef65e220a950915fe3a9e5b48d7892854c Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 30 Oct 2018 11:06:50 +0100 Subject: [PATCH 1/7] Take into account grouped workers in scale --- dask_jobqueue/cluster_manager.py | 14 ++++++++++++-- dask_jobqueue/core.py | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index 15e82bac..3369a49b 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -30,6 +30,10 @@ def scale_down(self, workers: List[str], n: int): ''' Close the workers with the given addresses or remove pending workers to match n running workers. ''' + 3. Optionally worker_key: Callable(WorkerState): + ''' Callable mapping a WorkerState object to a group, see + Scheduler.workers_to_close + ''' This will provide a general ``scale`` method as well as an IPython widget for display. @@ -62,8 +66,10 @@ def scale_down(self, workers: List[str], n: int): >>> cluster.adapt(minimum=1, maximum=100) # scale automatically """ - def __init__(self): + def __init__(self, adaptive_options={}): self._target_scale = 0 + self._adaptive_options = adaptive_options + self._adaptive_options.setdefault('worker_key', self.worker_key) @gen.coroutine def _scale(self, n): @@ -83,7 +89,8 @@ def _scale(self, n): # This may not be useful to call scheduler methods in this case # Scheduler interface here may need to be modified to_close = self.scheduler.workers_to_close( - n=len(self.scheduler.workers) - n) + n=len(self.scheduler.workers) - n, minimum=n, + key=self.worker_key) logger.debug("Closing workers: %s", to_close) # Should be an RPC call here yield self.scheduler.retire_workers(workers=to_close) @@ -112,3 +119,6 @@ def scale(self, n): # TODO we should not rely on scheduler loop here, self should have its # own loop self.scheduler.loop.add_callback(self._scale, n) + + def worker_key(self, wname): + return wname \ No newline at end of file diff --git 
a/dask_jobqueue/core.py b/dask_jobqueue/core.py index b14be675..85bc9515 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -452,7 +452,7 @@ def scale_down(self, workers, n=None): # We only need to kill some pending jobs, to_kill = int(n_to_close / self.worker_processes) jobs = list(self.pending_jobs.keys())[-to_kill:] - logger.debug("%d jobs to stop, stoppubg jobs %s", to_kill, jobs) + logger.debug("%d jobs to stop, stopping jobs %s", to_kill, jobs) self.stop_jobs(jobs) else: worker_states = [] From 409d7f53309f52781d8bad2448721efb9af44720 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 30 Oct 2018 11:13:39 +0100 Subject: [PATCH 2/7] implementing worker_key --- dask_jobqueue/cluster_manager.py | 7 +++++-- dask_jobqueue/core.py | 4 +++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index 3369a49b..ec76dc6e 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -120,5 +120,8 @@ def scale(self, n): # own loop self.scheduler.loop.add_callback(self._scale, n) - def worker_key(self, wname): - return wname \ No newline at end of file + def worker_key(self, ws): + ''' Callable mapping a WorkerState object to a group, see + Scheduler.workers_to_close + ''' + return ws \ No newline at end of file diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 85bc9515..9f6f95c4 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -156,7 +156,6 @@ class JobQueueCluster(ClusterManager): submit_command = None cancel_command = None scheduler_name = '' - _adaptive_options = {'worker_key': lambda ws: _job_id_from_worker_name(ws.name)} job_id_regexp = r'(?P\d+)' def __init__(self, @@ -505,3 +504,6 @@ def _job_id_from_submit_output(self, out): raise ValueError(msg) return job_id + + def worker_key(self, ws): + return _job_id_from_worker_name(ws.name) \ No newline at end of file From 6fe63fee384f4a9534277907a0a5a29169073456 Mon Sep 
17 00:00:00 2001 From: Guillaume EB Date: Tue, 30 Oct 2018 11:17:54 +0100 Subject: [PATCH 3/7] expand variable name --- dask_jobqueue/cluster_manager.py | 4 ++-- dask_jobqueue/core.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index ec76dc6e..0e2dd353 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -120,8 +120,8 @@ def scale(self, n): # own loop self.scheduler.loop.add_callback(self._scale, n) - def worker_key(self, ws): + def worker_key(self, worker_state): ''' Callable mapping a WorkerState object to a group, see Scheduler.workers_to_close ''' - return ws \ No newline at end of file + return worker_state \ No newline at end of file diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 9f6f95c4..dac96e74 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -505,5 +505,5 @@ def _job_id_from_submit_output(self, out): return job_id - def worker_key(self, ws): - return _job_id_from_worker_name(ws.name) \ No newline at end of file + def worker_key(self, worker_state): + return _job_id_from_worker_name(worker_state.name) \ No newline at end of file From 9d4d203b1416236ecfd7a7f2832b370418c033be Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 30 Oct 2018 11:27:52 +0100 Subject: [PATCH 4/7] flake --- dask_jobqueue/cluster_manager.py | 2 +- dask_jobqueue/core.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index 0e2dd353..8931306c 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -124,4 +124,4 @@ def worker_key(self, worker_state): ''' Callable mapping a WorkerState object to a group, see Scheduler.workers_to_close ''' - return worker_state \ No newline at end of file + return worker_state diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index dac96e74..305e26f4 100644 --- 
a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -506,4 +506,4 @@ def _job_id_from_submit_output(self, out): return job_id def worker_key(self, worker_state): - return _job_id_from_worker_name(worker_state.name) \ No newline at end of file + return _job_id_from_worker_name(worker_state.name) From b52fa2795aa8c1ae75a5176deb887b22460e1c34 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 30 Oct 2018 12:08:48 +0100 Subject: [PATCH 5/7] scale with ncores and memory --- dask_jobqueue/cluster_manager.py | 201 +++++++++++++++++++++++++++++-- dask_jobqueue/core.py | 18 ++- dask_jobqueue/tests/test_pbs.py | 57 +++++++++ 3 files changed, 264 insertions(+), 12 deletions(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index 8931306c..fe76d206 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -1,8 +1,13 @@ import logging +import math from tornado import gen +from weakref import ref + +from distributed.deploy.adaptive import Adaptive from distributed.deploy import Cluster -from distributed.utils import log_errors +from distributed.utils import log_errors, ignoring, parse_bytes, \ + PeriodicCallback logger = logging.getLogger(__name__) @@ -34,6 +39,9 @@ def scale_down(self, workers: List[str], n: int): ''' Callable mapping a WorkerState object to a group, see Scheduler.workers_to_close ''' + 4. worker_info dict attribute if scale(cores=...) or scale(memory=...) + can be used by users. + worker_info = {'cores': 4, 'memory': '16 GB'} This will provide a general ``scale`` method as well as an IPython widget for display. @@ -44,6 +52,8 @@ def scale_down(self, workers: List[str], n: int): - Connect to a local or remote Scheduler through RPC, and then communicate with it. - Ability to start a local or remote scheduler. + - Ability to work with different worker pools: in scale, adaptive, + worker_spec... 
- Scheduler - Provide some remote methods: - retire_workers(n: int): close enough workers ot have only n @@ -64,6 +74,7 @@ def scale_down(self, workers: List[str], n: int): >>> cluster = MyCluster() >>> cluster.scale(5) # scale manually >>> cluster.adapt(minimum=1, maximum=100) # scale automatically + >>> cluster.scale(cores=100) # scale manually to cores nb """ def __init__(self, adaptive_options={}): @@ -71,13 +82,64 @@ def __init__(self, adaptive_options={}): self._adaptive_options = adaptive_options self._adaptive_options.setdefault('worker_key', self.worker_key) + def adapt(self, minimum_cores=None, maximum_cores=None, + minimum_memory=None, maximum_memory=None, **kwargs): + """ Turn on adaptivity + For keyword arguments see dask.distributed.Adaptive + Instead of minimum and maximum parameters which apply to the number of + worker, If Cluster object implements worker_info attribute, one can use + the following parameters: + Parameters + ---------- + minimum_cores: int + Minimum number of cores for the cluster + maximum_cores: int + Maximum number of cores for the cluster + minimum_memory: str + Minimum amount of memory for the cluster + maximum_memory: str + Maximum amount of memory for the cluster + Examples + -------- + >>> cluster.adapt(minimum=0, maximum=10, interval='500ms') + >>> cluster.adapt(minimum_cores=24, maximum_cores=96) + >>> cluster.adapt(minimum_memory='60 GB', maximum_memory= '1 TB') + """ + with ignoring(AttributeError): + self._adaptive.stop() + if not hasattr(self, '_adaptive_options'): + self._adaptive_options = {} + if 'minimum' not in kwargs: + if minimum_cores is not None: + kwargs['minimum'] = self._get_nb_workers_from_cores(minimum_cores) + elif minimum_memory is not None: + kwargs['minimum'] = self._get_nb_workers_from_memory(minimum_memory) + if 'maximum' not in kwargs: + if maximum_cores is not None: + kwargs['maximum'] = self._get_nb_workers_from_cores(maximum_cores) + elif maximum_memory is not None: + kwargs['maximum'] = 
self._get_nb_workers_from_memory(maximum_memory) + self._adaptive_options.update(kwargs) + self._adaptive = Adaptive(self.scheduler, self, **self._adaptive_options) + return self._adaptive + @gen.coroutine - def _scale(self, n): + def _scale(self, n=None, cores=None, memory=None): """ Asynchronously called scale method This allows to do every operation with a coherent ocntext """ with log_errors(): + if [n, cores, memory].count(None) != 2: + raise ValueError('One and only one of n, cores, memory kwargs' + ' should be used, n={}, cores={}, memory={}' + ' provided.'.format(n, cores, memory)) + if n is None: + if cores is not None: + n = self._get_nb_workers_from_cores(cores) + elif memory is not None: + n = self._get_nb_workers_from_memory(memory) + # here we rely on a ClusterManager attribute to retrieve the # active and pending workers if n == self._target_scale: @@ -99,29 +161,152 @@ def _scale(self, n): self.scale_down(to_close, n) self._target_scale = n - def scale(self, n): - """ Scale cluster to n workers - + def scale(self, n=None, cores=None, memory=None): + """ Scale cluster to n workers or to the given number of cores or + memory + number of cores and memory are converted into number of workers using + worker_info attribute. 
Parameters ---------- n: int Target number of workers - + cores: int + Target number of cores + memory: str + Target amount of available memory Example ------- >>> cluster.scale(10) # scale cluster to ten workers - + >>> cluster.scale(cores=100) # scale cluster to 100 cores + >>> cluster.scale(memory='1 TB') # scale cluster to 1 TB memory See Also -------- Cluster.scale_up Cluster.scale_down + Cluster.worker_info """ # TODO we should not rely on scheduler loop here, self should have its # own loop - self.scheduler.loop.add_callback(self._scale, n) + self.scheduler.loop.add_callback(self._scale, n, cores, memory) + + def _widget(self): + """ Create IPython widget for display within a notebook """ + try: + return self._cached_widget + except AttributeError: + pass + + from ipywidgets import Layout, VBox, HBox, IntText, Button, HTML, Accordion, Text + + layout = Layout(width='150px') + + if 'bokeh' in self.scheduler.services: + link = self.dashboard_link + link = '

<p><b>Dashboard: </b><a href="%s" target="_blank">%s</a></p>

\n' % (link, link) + else: + link = '' + + title = '

<h2>%s</h2>

' % type(self).__name__ + title = HTML(title) + dashboard = HTML(link) + + status = HTML(self._widget_status(), layout=Layout(min_width='150px')) + + request = IntText(0, description='Workers', layout=layout) + scale = Button(description='Scale', layout=layout) + request_cores = IntText(0, description='Cores', layout=layout) + scale_cores = Button(description='Scale', layout=layout) + request_memory = Text('O GB', description='Memory', layout=layout) + scale_memory = Button(description='Scale', layout=layout) + + minimum = IntText(0, description='Minimum', layout=layout) + maximum = IntText(0, description='Maximum', layout=layout) + adapt = Button(description='Adapt', layout=layout) + minimum_cores = IntText(0, description='Min cores', layout=layout) + maximum_cores = IntText(0, description='Max cores', layout=layout) + adapt_cores = Button(description='Adapt', layout=layout) + minimum_mem = Text('0 GB', description='Min memory', layout=layout) + maximum_mem = Text('0 GB', description='Max memory', layout=layout) + adapt_mem = Button(description='Adapt', layout=layout) + + scale_hbox = [HBox([request, scale])] + adapt_hbox = [HBox([minimum, maximum, adapt])] + if hasattr(self, 'worker_info'): + scale_hbox.append(HBox([request_cores, scale_cores])) + scale_hbox.append(HBox([request_memory, scale_memory])) + adapt_hbox.append(HBox([minimum_cores, maximum_cores, adapt_cores])) + adapt_hbox.append(HBox([minimum_mem, maximum_mem, adapt_mem])) + + accordion = Accordion([VBox(scale_hbox), + VBox(adapt_hbox)], + layout=Layout(min_width='500px')) + accordion.selected_index = None + accordion.set_title(0, 'Manual Scaling') + accordion.set_title(1, 'Adaptive Scaling') + + box = VBox([title, + HBox([status, + accordion]), + dashboard]) + + self._cached_widget = box + + def adapt_cb(b): + self.adapt(minimum=minimum.value, maximum=maximum.value) + + def adapt_cores_cb(b): + self.adapt(minimum_cores=minimum_cores.value, maximum_cores=maximum_cores.value) + + def adapt_mem_cb(b): 
+ self.adapt(minimum_memory=minimum_mem.value, maximum_memory=maximum_mem.value) + + adapt.on_click(adapt_cb) + adapt_cores.on_click(adapt_cores_cb) + adapt_mem.on_click(adapt_mem_cb) + + def scale_cb(request, kwarg): + def request_cb(b): + with log_errors(): + arg = request.value + with ignoring(AttributeError): + self._adaptive.stop() + local_kwargs = dict() + local_kwargs[kwarg] = arg + self.scale(**local_kwargs) + + return request_cb + + scale.on_click(scale_cb(request, 'n')) + scale_cores.on_click(scale_cb(request_cores, 'cores')) + scale_memory.on_click(scale_cb(request_memory, 'memory')) + + scheduler_ref = ref(self.scheduler) + + def update(): + status.value = self._widget_status() + + pc = PeriodicCallback(update, 500, io_loop=self.scheduler.loop) + self.scheduler.periodic_callbacks['cluster-repr'] = pc + pc.start() + + return box def worker_key(self, worker_state): ''' Callable mapping a WorkerState object to a group, see Scheduler.workers_to_close ''' return worker_state + + def _get_nb_workers_from_cores(self, cores): + return math.ceil(cores / self.worker_spec['cores']) + + def _get_nb_workers_from_memory(self, memory): + return math.ceil(parse_bytes(memory) / parse_bytes(self.worker_spec['memory'])) + + @property + def worker_spec(self): + ''' single worker process info needed for scaling on cores or memory ''' + raise NotImplementedError('{} class does not provide worker_spec ' + 'attribute, needed for scaling with ' + 'cores or memory kwargs.' 
+ .format(self.__class__.__name__)) diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 305e26f4..3d52242e 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -247,12 +247,11 @@ def __init__(self, # dask-worker command line build dask_worker_command = '%(python)s -m distributed.cli.dask_worker' % dict(python=python) command_args = [dask_worker_command, self.scheduler.address] - command_args += ['--nthreads', self.worker_threads] + command_args += ['--nthreads', self.worker_process_threads] if processes is not None and processes > 1: command_args += ['--nprocs', processes] - mem = format_bytes(self.worker_memory / self.worker_processes) - command_args += ['--memory-limit', mem.replace(' ', '')] + command_args += ['--memory-limit', self.worker_process_memory] command_args += ['--name', '%s--${JOB_ID}--' % name] if death_timeout is not None: @@ -298,9 +297,20 @@ def finished_jobs(self): return self._scheduler_plugin.finished_jobs @property - def worker_threads(self): + def worker_process_threads(self): return int(self.worker_cores / self.worker_processes) + @property + def worker_process_memory(self): + mem = format_bytes(self.worker_memory / self.worker_processes) + mem = mem.replace(' ', '') + return mem + + @property + def worker_spec(self): + ''' single worker process info needed for scaling on cores or memory ''' + return {'cores': self.worker_process_threads, 'memory': self.worker_process_memory} + def job_script(self): """ Construct a job submission script """ pieces = {'job_header': self.job_header, diff --git a/dask_jobqueue/tests/test_pbs.py b/dask_jobqueue/tests/test_pbs.py index 5ffb1c1d..5cb59b48 100644 --- a/dask_jobqueue/tests/test_pbs.py +++ b/dask_jobqueue/tests/test_pbs.py @@ -118,6 +118,38 @@ def test_basic(loop): assert not cluster.running_jobs +@pytest.mark.env("pbs") # noqa: F811 +def test_scale_cores_memory(loop): + with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp', + 
job_extra=['-V'], loop=loop) as cluster: + with Client(cluster) as client: + + cluster.scale(cores=2) + + start = time() + while not(cluster.pending_jobs or cluster.running_jobs): + sleep(0.100) + assert time() < start + QUEUE_WAIT + + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + assert cluster.running_jobs + + workers = list(client.scheduler_info()['workers'].values()) + w = workers[0] + assert w['memory_limit'] == 2e9 + assert w['ncores'] == 2 + + cluster.scale(memory='0GB') + + start = time() + while cluster.running_jobs: + sleep(0.100) + assert time() < start + QUEUE_WAIT + + assert not cluster.running_jobs + + @pytest.mark.env("pbs") # noqa: F811 def test_basic_scale_edge_cases(loop): with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp', @@ -187,6 +219,31 @@ def test_adaptive_grouped(loop): assert time() < start + QUEUE_WAIT +@pytest.mark.env("pbs") # noqa: F811 +def test_adaptive_cores_mem(loop): + with PBSCluster(walltime='00:02:00', processes=1, cores=2, memory='2GB', local_directory='/tmp', + job_extra=['-V'], loop=loop) as cluster: + cluster.adapt(minimum_cores=0, maximum_memory='4GB') + with Client(cluster) as client: + future = client.submit(lambda x: x + 1, 10) + assert future.result(QUEUE_WAIT) == 11 + + start = time() + processes = cluster.worker_processes + while len(client.scheduler_info()['workers']) != processes: + sleep(0.1) + assert time() < start + QUEUE_WAIT + + del future + + start = time() + while cluster.pending_jobs or cluster.running_jobs: + sleep(0.100) + assert time() < start + QUEUE_WAIT + + assert cluster.finished_jobs + + def test_config(loop): # noqa: F811 with dask.config.set({'jobqueue.pbs.walltime': '00:02:00', 'jobqueue.pbs.local-directory': '/foo'}): From 455d26e399f9f2e387414120583e5f767258f7a4 Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 30 Oct 2018 12:12:20 +0100 Subject: [PATCH 6/7] fix problem with worker_spec attribute name --- 
dask_jobqueue/cluster_manager.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index fe76d206..3fc38934 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -39,9 +39,9 @@ def scale_down(self, workers: List[str], n: int): ''' Callable mapping a WorkerState object to a group, see Scheduler.workers_to_close ''' - 4. worker_info dict attribute if scale(cores=...) or scale(memory=...) + 4. worker_spec dict attribute if scale(cores=...) or scale(memory=...) can be used by users. - worker_info = {'cores': 4, 'memory': '16 GB'} + worker_spec = {'cores': 4, 'memory': '16 GB'} This will provide a general ``scale`` method as well as an IPython widget for display. @@ -87,7 +87,7 @@ def adapt(self, minimum_cores=None, maximum_cores=None, """ Turn on adaptivity For keyword arguments see dask.distributed.Adaptive Instead of minimum and maximum parameters which apply to the number of - worker, If Cluster object implements worker_info attribute, one can use + worker, If Cluster object implements worker_spec attribute, one can use the following parameters: Parameters ---------- @@ -165,7 +165,7 @@ def scale(self, n=None, cores=None, memory=None): """ Scale cluster to n workers or to the given number of cores or memory number of cores and memory are converted into number of workers using - worker_info attribute. + worker_spec attribute. 
Parameters ---------- n: int @@ -183,7 +183,7 @@ def scale(self, n=None, cores=None, memory=None): -------- Cluster.scale_up Cluster.scale_down - Cluster.worker_info + Cluster.worker_spec """ # TODO we should not rely on scheduler loop here, self should have its # own loop @@ -231,7 +231,7 @@ def _widget(self): scale_hbox = [HBox([request, scale])] adapt_hbox = [HBox([minimum, maximum, adapt])] - if hasattr(self, 'worker_info'): + if hasattr(self, 'worker_spec'): scale_hbox.append(HBox([request_cores, scale_cores])) scale_hbox.append(HBox([request_memory, scale_memory])) adapt_hbox.append(HBox([minimum_cores, maximum_cores, adapt_cores])) From d83900999c6368a34134dc37e2f054cb5300e33a Mon Sep 17 00:00:00 2001 From: Guillaume EB Date: Tue, 30 Oct 2018 12:37:58 +0100 Subject: [PATCH 7/7] flake and others --- dask_jobqueue/cluster_manager.py | 3 --- dask_jobqueue/core.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/dask_jobqueue/cluster_manager.py b/dask_jobqueue/cluster_manager.py index 3fc38934..e597c77e 100644 --- a/dask_jobqueue/cluster_manager.py +++ b/dask_jobqueue/cluster_manager.py @@ -2,7 +2,6 @@ import math from tornado import gen -from weakref import ref from distributed.deploy.adaptive import Adaptive from distributed.deploy import Cluster @@ -280,8 +279,6 @@ def request_cb(b): scale_cores.on_click(scale_cb(request_cores, 'cores')) scale_memory.on_click(scale_cb(request_memory, 'memory')) - scheduler_ref = ref(self.scheduler) - def update(): status.value = self._widget_status() diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 3d52242e..fa557147 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -270,7 +270,7 @@ def __init__(self, def __repr__(self): running_workers = self._count_active_workers() - running_cores = running_workers * self.worker_threads + running_cores = running_workers * self.worker_process_threads total_jobs = len(self.pending_jobs) + len(self.running_jobs) total_workers = total_jobs * 
self.worker_processes running_memory = running_workers * self.worker_memory / self.worker_processes