
Adaptive: recommend close workers when any are idle #2330

Merged · 10 commits · Apr 24, 2019
distributed/deploy/adaptive.py (23 changes: 19 additions & 4 deletions)
@@ -123,17 +123,32 @@ def needs_cpu(self):
         Notes
         -----
         Returns ``True`` if the occupancy per core is some factor larger
-        than ``startup_cost``.
+        than ``startup_cost`` and the number of tasks exceeds the number of
+        workers
         """
         total_occupancy = self.scheduler.total_occupancy
         total_cores = sum([ws.ncores for ws in self.scheduler.workers.values()])
 
         if total_occupancy / (total_cores + 1e-9) > self.startup_cost * 2:
             logger.info("CPU limit exceeded [%d occupancy / %d cores]",
                         total_occupancy, total_cores)
-            return True
-        else:
-            return False
+
+            num_workers = len(self.scheduler.workers)
Member

Shouldn't you use total_cores for the number of tasks comparison? Do you have one thread per worker in your setup?

Contributor Author

hmm - I'm not sure I follow, but I am on a dask_kubernetes setup with 1:1 threads to workers, so maybe my use case isn't universal. I think I see, though... in many cases the number of dask processes/threads which can take on tasks is greater than the number of workers? Is total_cores the number of workers * (threads per worker or procs per worker)?

Member

> I am on a dask_kubernetes setup with 1:1 threads to workers, so maybe my use case isn't universal

> in many cases the number of dask processes/threads which can take on tasks is greater than the number of workers

Yes!

I think in most cases having several threads per worker is better, except for GIL-bound tasks. Dask-kubernetes proposes 2 threads by default, which is low, but it depends on the VMs you use:

https://github.com/dask/dask-kubernetes/blob/master/dask_kubernetes/kubernetes.yaml#L24-L26

With dask-jobqueue, I often use 4, 8 or even 24 cores per worker.

> Is total_cores the number of workers * (threads per worker)?

Yes!
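
As a minimal sketch of that relationship (a hypothetical local cluster; the worker and thread counts are made up):

    # 3 worker processes x 4 threads each -> 12 task slots ("cores") in total
    from distributed import LocalCluster

    cluster = LocalCluster(n_workers=3, threads_per_worker=4)
    total_cores = sum(ws.ncores for ws in cluster.scheduler.workers.values())
    assert total_cores == 12                      # workers * threads per worker
    assert len(cluster.scheduler.workers) == 3    # worker processes only
    cluster.close()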

Contributor Author

Ok thanks! Just so I understand before modifying the code again... should I take this to mean that dask uses the term "core" to refer specifically to the number of tasks that can be handled simultaneously by a worker, regardless of the hardware? Or are you doing something clever with the relationship between physical cores and CPU needs that I'm not following? Thanks again for the helping hand in this :)

Member

> should I take this to mean that dask uses the term "core" to refer specifically to the number of tasks that can be handled simultaneously by a worker

Yes, see https://github.com/dask/distributed/blob/master/distributed/cli/dask_worker.py#L62-L63, https://github.com/dask/distributed/blob/master/distributed/cli/dask_worker.py#L217.

I like it better when it is called nthreads, like in dask-worker options.
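
For instance, a sketch of reading those per-worker thread counts on the scheduler side (using the same ncores attribute the diff above uses; ncores was the name in this era of distributed):

    # each WorkerState's ncores is that worker's thread count (its task
    # slots), not a hardware core count
    for addr, ws in self.scheduler.workers.items():
        logger.info("worker %s offers %d task slots", addr, ws.ncores)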

Contributor Author

OK, this makes a lot of sense. So I should count the number of available threads or processes, not the number of workers.

Member

On the scheduler side, a Worker = 1 process = several threads. So just using the already-computed total_cores should be fine.
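
A sketch of what that suggestion would look like (the hunk as written still compares against num_workers):

    # reviewer's suggestion, sketched: compare pending tasks against
    # task slots (total_cores) rather than worker processes
    tasks_processing = sum(len(ws.processing)
                           for ws in self.scheduler.workers.values())
    if tasks_processing > total_cores:
        return True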


+
+            tasks_processing = 0
+
+            for w in self.scheduler.workers.values():
+                tasks_processing += len(w.processing)
+
+                if tasks_processing > num_workers:
+                    logger.info(
+                        "pending tasks exceed number of workers "
+                        "[%d tasks / %d workers]",
Member

I think this message, or probably the one above, should be changed.

+                        tasks_processing, num_workers)
+
+                    return True
+
+        return False

     def needs_memory(self):
         """
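
Putting the merged heuristic together, a standalone sketch with made-up numbers (not the Dask API; startup_cost defaulting to 1 second is an assumption here):

    # standalone sketch of the needs_cpu heuristic above
    def needs_cpu_sketch(total_occupancy, total_cores, tasks_processing,
                         num_workers, startup_cost=1.0):
        # CPU-bound: queued seconds of work per core far exceeds startup cost
        if total_occupancy / (total_cores + 1e-9) > startup_cost * 2:
            # and there are more pending tasks than workers to run them
            if tasks_processing > num_workers:
                return True
        return False

    # 30s of work on 3 single-thread workers, 5 pending tasks -> scale up
    assert needs_cpu_sketch(30.0, 3, 5, 3) is True
    # the same work spread over 32 cores is cheap per core -> don't scale up
    assert needs_cpu_sketch(30.0, 32, 5, 3) is False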