
Adaptive: recommend close workers when any are idle #2330

Merged
10 commits merged into dask:master on Apr 24, 2019

Conversation

@delgadom (Contributor) commented Nov 1, 2018

Adding this as a placeholder fix for #2329. No tests or doc changes added yet.

@delgadom delgadom changed the title Recommend close workers adaptive Adaptive: close workers when any workers are idle Nov 1, 2018
@delgadom delgadom changed the title Adaptive: close workers when any workers are idle Adaptive: recommend close workers when any are idle Nov 1, 2018
@delgadom (Contributor, Author) commented Nov 1, 2018

hmm. all these broken tests are a bit daunting for a newcomer. any recommended starting points?

@mrocklin (Member) commented Nov 1, 2018

I've created a trivial PR to see if they're due to something unrelated: #2331

num_workers = len(self.scheduler.workers)

if tasks_processing <= num_workers:
    return False
Review comment (Member):

I think I can convince myself that the tasks_processing < num_workers check is redundant given the all(ws.processing ...) check.

Review comment (Contributor, Author):

It's definitely sufficient to prompt the correct behavior when the cluster has idle workers, but once those have been spun down, the current logic doesn't distinguish between a resource-constrained cluster with waiting tasks and one with ntasks == nworkers. This was confirmed by my experiments, where the cluster successfully scaled down to (ntasks) and then shot back up to adaptive.maximum a few heartbeats later, and continued oscillating between these two until all tasks completed.

I don't see a way to do this with all(ws.processing ...) but if there is then the performance benefit would be significant.
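A toy sketch (not code from the PR, worker names and counts made up) of the distinction being made here: an "every worker is busy" style check cannot tell a backlogged cluster apart from one where tasks and workers are exactly matched, whereas counting the processing tasks can.

# Two cluster states, represented as {worker: number_of_processing_tasks}.
state_backlog = {"w1": 3, "w2": 2, "w3": 4}    # 9 tasks on 3 workers: keep scaling up
state_matched = {"w1": 1, "w2": 1, "w3": 1}    # 3 tasks on 3 workers: do not scale up

def every_worker_busy(state):
    # Analogue of the all(ws.processing ...) check: True for BOTH states above,
    # so on its own it cannot prevent the oscillation described in this thread.
    return all(n > 0 for n in state.values())

def has_backlog(state):
    # Analogue of the tasks_processing > num_workers check: only True when
    # there are more running tasks than workers to run them.
    return sum(state.values()) > len(state)

assert every_worker_busy(state_backlog) and every_worker_busy(state_matched)
assert has_backlog(state_backlog) and not has_backlog(state_matched)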

@delgadom (Contributor, Author) commented Nov 1, 2018

I think the errors are unrelated (I tried pytest and pytest distributed/deploy before making changes and faced a wall of errors). But I'll try digging in at some point - just couldn't figure it out right now

@mrocklin (Member) commented Nov 1, 2018

Sorry for the delay in responding here. I've been a bit saturated. I wonder if @guillaumeeb has any interest in engaging here.

@delgadom (Contributor, Author) commented Nov 1, 2018

no worries! I appreciate all the feedback @mrocklin. hope my pushback is constructive :) Dask is becoming increasingly central to everything I do - it's a really great project.

@guillaumeeb (Member):

Yes, I do have some interest in this. I've been following the discussion, and @delgadom's solution seems good.

I will try to look more closely today, but I fear we are missing some information in the Scheduler to do this more elegantly.

It would be nice to reproduce this in a small test, or in a script that is easier to run.

@guillaumeeb (Member) left a review comment:

Given my still limited knowledge of the Scheduler's internals, and what I know of Adaptive, I think the proposed solution is good for the time being.

It seems to me that scheduler.total_occupancy doesn't contain enough information for estimating the need for CPUs. A more efficient solution could be to improve the Scheduler, but I'm not familiar enough with this class yet. Is there a way to get the list of tasks? Or a counterpart to scheduler.total_occupancy, like scheduler.total_processing_tasks? Would that be feasible? If I understand correctly, @delgadom already tried iterating over all the tasks and this was not performant enough?

We should still try to add a test, even if we don't run it for every PR (I believe we can easily exclude some tests from CI?). It should be doable by slightly modifying the test proposed in the issue, just by being more deterministic about the task durations: launch, say, 7 tasks running for 20 seconds and 3 tasks running for 30 seconds, then wait 21 seconds and check the recommendations and the cluster state.

num_workers = len(self.scheduler.workers)

if tasks_processing <= num_workers:
    return False
Review comment (Member):

I think that all of the tasks_processing counting and checks would be better placed in needs_cpu(). The wrong part of the code seems to be there: it only takes scheduler.total_occupancy into account, which seems to me to be the sum of the durations of the processing tasks. It should also take into account the number of tasks remaining, as pointed out by @delgadom.

Review comment (Contributor, Author):

OK. If we move it into needs_cpu, this resolves the problem. That's how I implemented it as a first cut, and I've been running this on a subclassed version of Adaptive on our cluster and it works beautifully. I spun up 10k 15-minute jobs with an adaptive cluster of between 0 and 1,000 nodes, and the cluster scaled up and then scaled back down as workers completed.

If we go with an implementation where recommendations() suggests scaling down based on workers_to_close's recommendation only, I suppose this could involve a race condition where we scale down idle workers before the scheduler has time to transfer tasks to them. So maybe only placing this logic in needs_cpu is the right way to go?

I'll modify the PR to only include a change in needs_cpu. I'll try to come up with a less interactive example/test soon, but won't get to implementing real tests until next week.
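For reference, a minimal sketch of what such a check inside needs_cpu could look like, built only from the attributes quoted in this diff (self.scheduler.workers and the per-worker .processing mapping); this is an illustration, not necessarily the exact code that ends up in the PR.

import logging

logger = logging.getLogger(__name__)


def needs_cpu(self):
    # Sketch of an Adaptive.needs_cpu variant: in addition to whatever
    # occupancy-based logic already exists, ask whether there are more
    # tasks in flight than workers to run them.
    num_workers = len(self.scheduler.workers)
    tasks_processing = sum(
        len(ws.processing) for ws in self.scheduler.workers.values()
    )

    if tasks_processing > num_workers:
        logger.info(
            "pending tasks exceed number of workers "
            "[%d tasks / %d workers]",
            tasks_processing, num_workers,
        )
        return True

    return False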

Review comment (Member):

My comment was just about moving the specific check of tasks running vs. available cores.

But if this part is needed and sufficient, as it seems to be, then I'm very happy to go with only it!

Thanks @delgadom for working on this.

@delgadom (Contributor, Author) commented Nov 4, 2018

Just made an update that improves performance significantly for large numbers of workers by looping through workers and returning as soon as the cumulative number of tasks exceeds the number of workers. Running a job with 100 workers and 10,000 tasks I got a 10-25% performance hit with the latest update vs a 65-150% performance hit for the previous commit.
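A minimal sketch of that early-exit idea (function and variable names are assumed here, not necessarily those in the commit): the loop stops as soon as the running total exceeds the worker count, so a heavily backlogged cluster only has to inspect a handful of workers.

def tasks_exceed_workers(scheduler):
    # Early-exit variant of the task count: return as soon as the cumulative
    # number of processing tasks exceeds the number of workers.
    num_workers = len(scheduler.workers)
    tasks_processing = 0
    for ws in scheduler.workers.values():
        tasks_processing += len(ws.processing)
        if tasks_processing > num_workers:
            return True   # no need to count the remaining workers' tasks
    return False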

The following tests the current version on master (needs_cpu_0) vs the previous implementation (needs_cpu_1) vs the latest commit (needs_cpu_2):

[profile plot comparing needs_cpu_0, needs_cpu_1, and needs_cpu_2]

I'm swamped through Tuesday, but I should be able to spend some time implementing tests and a good working example later next week. In the meantime, we're using this fix pretty heavily and will continue to tweak and test it, but it works really well so far.

@guillaumeeb (Member) left a review comment:

Some details I did not see before.

else:
    return False

num_workers = len(self.scheduler.workers)
Review comment (Member):

Shouldn't you use total_cores for the number of tasks comparison? Do you have one thread per worker in your setup?

Review comment (Contributor, Author):

hmm - I'm not sure I follow, but I am on a dask_kubernetes setup with 1:1 threads to workers, so maybe my use case isn't universal. I think I see, though... in many cases the number of dask processes/threads which can take on tasks is greater than the number of workers? Is total_cores the number of workers * (threads per worker or procs per worker)?

Review comment (Member):

I am on a dask_kubernetes setup with 1:1 threads to workers, so maybe my use case isn't universal
in many cases the number of dask processes/threads which can take on tasks is greater than the number of workers

Yes!

I think in most cases having several threads per worker is better, except for GIL-bound tasks. Dask-kubernetes proposes 2 threads by default, which is low, but it depends on the VMs you use:

https://github.com/dask/dask-kubernetes/blob/master/dask_kubernetes/kubernetes.yaml#L24-L26

With dask-jobqueue, I often use 4, 8 or even 24 cores per worker.

Is total_cores the number of workers * (threads per worker)?

Yes!

Review comment (Contributor, Author):

Ok thanks! Just so I understand before modifying the code again... should I take this to mean that dask uses the term "core" to refer specifically to the number of tasks that can be handled simultaneously by a worker, regardless of the hardware? Or are you doing something clever with the relationship between physical cores and CPU needs that I'm not following? Thanks again for the helping hand in this :)

Review comment (Member):

should I take this to mean that dask uses the term "core" to refer specifically to the number of tasks that can be handled simultaneously by a worker

Yes, see https://github.com/dask/distributed/blob/master/distributed/cli/dask_worker.py#L62-L63, https://github.com/dask/distributed/blob/master/distributed/cli/dask_worker.py#L217.

I like it better when it is called nthreads, like in dask-worker options.

Review comment (Contributor, Author):

OK, this makes a lot of sense. So I should count the number of available threads or processes, not the number of workers.

Review comment (Member):

A Worker = 1 process = several threads, on the scheduler side. So just using the already computed total_cores should be fine.
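A sketch of that adjustment, assuming the per-worker thread count is exposed as ws.ncores (the attribute name is an assumption here; the CLI option is called nthreads, as noted above): compare the task count against the total thread count rather than the number of worker processes.

def tasks_exceed_cores(scheduler):
    # Each worker process can run one task per thread, so the relevant
    # capacity is the total thread count, not the number of workers.
    total_cores = sum(ws.ncores for ws in scheduler.workers.values())
    tasks_processing = 0
    for ws in scheduler.workers.values():
        tasks_processing += len(ws.processing)
        if tasks_processing > total_cores:
            return True   # early exit, as in the previous sketch
    return False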

if tasks_processing > num_workers:
    logger.info(
        "pending tasks exceed number of workers "
        "[%d tasks / %d workers]",
Review comment (Member):

I think this message, or probably the one above, should be changed.

@mrocklin (Member):

Checking in. What is the status of this?

@guillaumeeb (Member):

The current PR only compares running tasks to the number of worker processes, not the real number of available cores. So we're waiting for an update from @delgadom, who is probably busy elsewhere, like many of us 🙂!

@guillaumeeb (Member):

@delgadom, do you need help going forward here? If you're too busy with other things, maybe I can try to advance this PR?

@delgadom (Contributor, Author) commented Dec 7, 2018

Sorry for the delay, both. Last couple months did indeed get very busy. Will try to spend some time wrapping this up this weekend.

@delgadom (Contributor, Author) commented Dec 7, 2018

@guillaumeeb I just made (I think) the change that you suggested. I'm not totally sure how to test this, though, and I do think it needs a pretty thorough set of tests. Do you have a test that you think would be a good model to start with? I'll probably need help getting this across the finish line on the testing front.

@delgadom (Contributor, Author) commented Dec 8, 2018

Hmmm not sure what happened in this test

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
c = <Client: scheduler='tcp://127.0.0.1:35398' processes=2 cores=3>
s = <Scheduler: "tcp://127.0.0.1:35398" processes: 2 cores: 3>
a = <Worker: tcp://127.0.0.1:40977, running, stored: 10, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: tcp://127.0.0.1:45859, running, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
    @pytest.mark.skipif(sys.version_info[0] < 3, reason="intermittent failure")
    @gen_cluster(client=True)
    def test_dont_hold_on_to_large_messages(c, s, a, b):
        np = pytest.importorskip('numpy')
        da = pytest.importorskip('dask.array')
        x = np.random.random(1000000)
        xr = weakref.ref(x)
    
        d = da.from_array(x, chunks=(100000,))
        d = d.persist()
        del x
    
        start = time()
        while xr() is not None:
            if time() > start + 5:
                # Help diagnosing
                from types import FrameType
                x = xr()
                if x is not None:
                    del x
                    rc = sys.getrefcount(xr())
                    refs = gc.get_referrers(xr())
                    print("refs to x:", rc, refs, gc.isenabled())
                    frames = [r for r in refs if isinstance(r, FrameType)]
                    for i, f in enumerate(frames):
                        print("frames #%d:" % i,
                              f.f_code.co_name, f.f_code.co_filename, sorted(f.f_locals))
>               pytest.fail("array should have been destroyed")
E               Failed: array should have been destroyed
distributed/tests/test_batched.py:270: Failed

@guillaumeeb (Member):

For the test, it looks like there's already something just waiting for you!

@pytest.mark.xfail(reason="we currently only judge occupancy, not ntasks")
@gen_test(timeout=30)
def test_no_more_workers_than_tasks():
    loop = IOLoop.current()
    cluster = yield LocalCluster(0, scheduler_port=0, silence_logs=False,
                                 processes=False, diagnostics_port=None,
                                 loop=loop, asynchronous=True)
    yield cluster._start()
    try:
        adapt = Adaptive(cluster.scheduler, cluster, minimum=0, maximum=4,
                         interval='10 ms')
        client = yield Client(cluster, asynchronous=True, loop=loop)
        cluster.scheduler.task_duration['slowinc'] = 1000
        yield client.submit(slowinc, 1, delay=0.100)
        assert len(cluster.scheduler.workers) <= 1
    finally:
        yield client._close()
        yield cluster._close()

Not sure it will work out of the box, but it is a good start. It points to everything you need: slowinc, how to specify a default function duration directly in the scheduler, how to launch a LocalCluster...

You can also add a more complex one, something like the following (a rough sketch is given after the list):

  1. Start a Cluster.
  2. Make it adaptive, with a low interval, a low starting cost, and 10 processes max.
  3. Make slowinc default to something relatively long, maybe 0.5 seconds, in the scheduler.
  4. Submit 5 slowinc tasks of 0.5 seconds and 5 slowinc tasks lasting 1 second.
  5. Wait for 0.1 seconds, and check that 10 processes are started in the LocalCluster.
  6. Wait for 0.5 seconds, and check that there are only 5 processes left.
  7. Loop until all tasks end, waiting 0.1 seconds each time and always checking that there are only 5 processes running in the LocalCluster.
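A rough sketch of such a test, modeled on the existing test quoted above. The test name, import paths, timings, adaptive keyword arguments, and how deterministically the cluster scales down are all assumptions that would need tuning against the real implementation.

from tornado import gen
from tornado.ioloop import IOLoop

from distributed import Client, wait
from distributed.deploy import Adaptive, LocalCluster
from distributed.utils_test import gen_test, slowinc


@gen_test(timeout=60)
def test_scale_down_to_number_of_tasks():
    loop = IOLoop.current()
    cluster = yield LocalCluster(0, scheduler_port=0, silence_logs=False,
                                 processes=False, diagnostics_port=None,
                                 loop=loop, asynchronous=True)
    yield cluster._start()
    try:
        adapt = Adaptive(cluster.scheduler, cluster, minimum=0, maximum=10,
                         interval='10 ms')
        client = yield Client(cluster, asynchronous=True, loop=loop)

        # Give the scheduler a duration estimate for slowinc so that
        # adaptive scaling has something to work with.
        cluster.scheduler.task_duration['slowinc'] = 0.5

        short = client.map(slowinc, range(5), delay=0.5)
        longer = client.map(slowinc, range(5, 10), delay=1.0)

        yield gen.sleep(0.1)
        assert len(cluster.scheduler.workers) <= 10

        # Once the short tasks finish, only five tasks remain, so the
        # cluster should not hold on to more than five workers.
        yield wait(short)
        yield gen.sleep(0.5)
        assert len(cluster.scheduler.workers) <= 5

        yield wait(longer)
    finally:
        yield client._close()
        yield cluster._close()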

@guillaumeeb (Member):

Hmmm not sure what happened in this test

No trace of Adaptive here, so probably not linked to your PR.

@mrocklin (Member) commented Dec 9, 2018

That failure is indeed unrelated. Maybe (?) fixed here: #2404

@guillaumeeb (Member):

@delgadom still up for this one?

@SimonBoothroyd:

The changes proposed in this PR seem to fix an issue I was seeing when using the dask-jobqueue library, whereby the queue clusters in adaptive mode were continuously requesting new workers even if they did not have any tasks for them to perform.

It would be great to see this fix merged in! Are there any remaining issues holding this back?

@martindurant (Member):

Looks like this PR will pass after linting - will recommend merging. @guillaumeeb , were you thinking of contributing a test along the lines you described?

@mrocklin (Member):

@guillaumeeb , were you thinking of contributing a test along the lines you described?

The test that @guillaumeeb is pointing to already exists in the code. You just need to remove the xfail.

@guillaumeeb (Member):

Thanks @martindurant, sorry, did not find the time to take over this one...

The simple test already there is probably enough indeed.

@martindurant (Member):

OK, good to go - thank you.

@martindurant (Member):

Will merge end of day, if no further comments.

@martindurant martindurant merged commit 0c8918b into dask:master Apr 24, 2019
@delgadom (Contributor, Author) commented Apr 25, 2019

Just saw this last flurry of activity. Thank you so much for pushing this across the finish line @martindurant and @guillaumeeb

muammar added a commit to muammar/distributed that referenced this pull request May 8, 2019
* upstream/master:
  Fix deserialization of bytes chunks larger than 64MB (dask#2637)
  bump version to 1.27.1
  Updated logging module doc links from docs.python.org/2 to docs.python.org/3. (dask#2635)
  Adaptive: recommend close workers when any are idle (dask#2330)