Adaptive.needs_cpu does not depend on number of tasks remaining #2329

Closed
delgadom opened this issue Oct 31, 2018 · 10 comments

@delgadom
Contributor

Issue description

We're using distributed (with KubeCluster) with client.map to schedule a lot of long-running tasks (right now we're running a Fortran-based hydrological model).

We noticed that clusters don't scale down until all tasks have completed, even once the number of remaining tasks falls below the number of workers.

I isolated the problem to Adaptive.needs_cpu(). The current method does not check whether there are any pending tasks on the scheduler:

    def needs_cpu(self):
        """
        Check if the cluster is CPU constrained (too many tasks per core)
        Notes
        -----
        Returns ``True`` if the occupancy per core is some factor larger
        than ``startup_cost``.
        """
        total_occupancy = self.scheduler.total_occupancy
        total_cores = sum([ws.ncores for ws in self.scheduler.workers.values()])

        if total_occupancy / (total_cores + 1e-9) > self.startup_cost * 2:
            logger.info("CPU limit exceeded [%d occupancy / %d cores]",
                        total_occupancy, total_cores)
            return True
        else:
            return False

This results in adapt.recommendations() returning the error message "Trying to scale up and down simultaneously" whenever there are fewer pending tasks than there are workers, as long as the average task time suggests that more cores are needed (regardless of how many tasks remain).

Proposed solution

I implemented a quick fix by finding the total number of pending tasks and only recommending a "scale up" if the number of tasks exceeds the number of existing workers, in addition to the current criterion:

    def needs_cpu(self):
        """
        Check if the cluster is CPU constrained (too many tasks per core)
        Notes
        -----
        Returns ``True`` if the occupancy per core is some factor larger
        than ``startup_cost`` and more tasks are processing than there are
        workers.
        """
        total_occupancy = self.scheduler.total_occupancy
        total_cores = sum([ws.ncores for ws in self.scheduler.workers.values()])

        if total_occupancy / (total_cores + 1e-9) > self.startup_cost * 2:
            logger.info("CPU limit exceeded [%d occupancy / %d cores]",
                        total_occupancy, total_cores)

            tasks_processing = sum((len(w.processing) for w in self.scheduler.workers.values()))
            num_workers = len(self.scheduler.workers)

            if tasks_processing > num_workers:
                logger.info("pending tasks exceed number of workers [%d tasks / %d workers]",
                            tasks_processing, num_workers)
                return True

        return False

Pros

  • Exhibits the desired behavior (we're using this fix now by subclassing KubeCluster)

Cons

  • May be a limited use case
  • Increases the overhead of needs_cpu. I tested this on limited cases with between 800 and 100,000 tasks: the current implementation usually takes ~30-40 µs, and the proposed implementation roughly doubles that. There may be faster ways of doing this, and since the added cost could be a critical problem with this implementation, help with estimating the remaining tasks more quickly would be appreciated! (A rough measurement sketch follows this list.)
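
For anyone who wants to reproduce the timings, something along these lines should work. This is only a sketch, assuming adaptive is the Adaptive object from a LocalCluster like the one in the testable example below, with tasks already submitted:

    import timeit

    # Rough benchmark sketch: average the per-call cost of needs_cpu over
    # many calls.  `adaptive` is assumed to be an Adaptive instance attached
    # to a running cluster with tasks in flight.
    n = 10000
    per_call = timeit.timeit(adaptive.needs_cpu, number=n) / n
    print("needs_cpu: %.1f µs per call" % (per_call * 1e6))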

Testable example

Requires some interactivity, but reliably reproduces the problem:

In [1]: import dask.distributed as dd

In [2]: cluster = dd.LocalCluster()

In [3]: adaptive = cluster.adapt(minimum=0, maximum=10)

In [5]: adaptive
Out[5]: <distributed.deploy.adaptive.Adaptive at 0x1153b3668>

In [6]: def wait_a_while(i):
   ...:     import time
   ...:     import random
   ...:     s = (random.random()) ** 6 * 60
   ...:     time.sleep(s)
   ...:
   ...:     return s

In [8]: client = dd.Client(cluster)

In [9]: f = client.map(wait_a_while, range(10))

In [10]: # wait for most futures to finish

In [17]: f
Out[17]:
[<Future: status: finished, type: float, key: wait_a_while-fdc644303e9be2c85edd9201261409af>,
 <Future: status: finished, type: float, key: wait_a_while-97098da3920c7582be062b54ee78efe1>,
 <Future: status: finished, type: float, key: wait_a_while-630e0e1fb8a0f8ede1140368de97ffce>,
 <Future: status: pending, key: wait_a_while-09f09368b6e9555668ab3f82efad91dd>,
 <Future: status: finished, type: float, key: wait_a_while-65d1c81d072269ab477d806d017302e2>,
 <Future: status: finished, type: float, key: wait_a_while-ca96a3b8db585962fc8638066458a815>,
 <Future: status: finished, type: float, key: wait_a_while-0a13c1a4f503a08e1edaf79dba3c94c5>,
 <Future: status: finished, type: float, key: wait_a_while-549f788086c75f350390b4a6131ae6cb>,
 <Future: status: pending, key: wait_a_while-17133623fc213adcb83f3b45e53839c9>,
 <Future: status: pending, key: wait_a_while-41e284f91b0a2bb1c3a33394e51c97fc>]

In [18]: cluster._adaptive.recommendations()
Out[18]: {'status': 'error', 'msg': 'Trying to scale up and down simultaneously'}
@mrocklin
Member

Thanks for the excellent issue @delgadom. Your analysis seems spot-on to me.

Rather than iterate through tasks, it might be enough to either:

  1. Sum the number of processing tasks on each worker (look at self.scheduler.workers.values() and ws.processing, or something like that)
  2. Check to see if we have any idle workers right now. Perhaps if we have any idle workers then we just choose not to scale up. (A rough sketch of both checks follows this list.)
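
A rough sketch of both checks, written as methods on an Adaptive-like object with access to self.scheduler as in the snippets above (names here are illustrative, not existing API):

    def tasks_processing(self):
        # Option 1: total number of tasks currently processing across all workers
        return sum(len(ws.processing) for ws in self.scheduler.workers.values())

    def has_idle_workers(self):
        # Option 2: a worker with an empty `processing` dict counts as idle
        return any(not ws.processing for ws in self.scheduler.workers.values())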

@mrocklin
Member

I have a slight preference for the second choice, I think.

@delgadom
Contributor Author

delgadom commented Nov 1, 2018

Thanks @mrocklin. The first option is actually the one I implemented; I first tried iterating through all tasks, and that took about 10-100x as long!

The second option definitely sounds reasonable; I didn't try it because I was nervous about restructuring the recommendations() error reporting, but I'm game with your blessing.

The part where I think the change should be made is the start of recommendations:

    def recommendations(self, comm=None):
        should_scale_up = self.should_scale_up()
        workers = set(self.workers_to_close(key=self.worker_key,
                                            minimum=self.minimum))
        if should_scale_up and workers:
            logger.info("Attempting to scale up and scale down simultaneously.")
            self.close_counts.clear()
            return {'status': 'error',
                    'msg': 'Trying to scale up and down simultaneously'}
    ...

This in turn calls should_scale_up and workers_to_close, and if both should_scale_up is True and workers_to_close has len > 0, an error is reported (which I see when the cluster is attempting to scale down).

Adaptive.workers_to_close is essentially a wrapper around scheduler.workers_to_close (it also enforces the adaptive maximum and minimum), but it will not recommend closing a worker that has tasks processing on it.
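
(A paraphrase of my reading of that wrapper, not the actual source:)

    def workers_to_close(self, **kwargs):
        # Sketch of my understanding: delegate to the scheduler, which only
        # proposes workers with nothing processing, then trim the list so we
        # never drop below the adaptive minimum.
        proposed = self.scheduler.workers_to_close(**kwargs)
        surplus = len(self.scheduler.workers) - self.minimum
        return proposed[:max(surplus, 0)]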

Could we postpone the should_scale_up logic in the event that there are workers that should be closed? Something like:

    def recommendations(self, comm=None):
        workers_to_close = set(self.workers_to_close(key=self.worker_key,
                                            minimum=self.minimum))
        if workers_to_close:
            d = {}
            to_close = []
            for w, c in self.close_counts.items():
                if w in workers_to_close:
                    if c >= self.wait_count:
                        to_close.append(w)
                    else:
                        d[w] = c

            for w in workers_to_close:
                d[w] = d.get(w, 0) + 1

            self.close_counts = d

            if to_close:
                return {'status': 'down', 'workers': to_close}

        elif self.should_scale_up():
            self.close_counts.clear()
            return toolz.merge({'status': 'up'}, self.get_scale_up_kwargs())

        else:
            self.close_counts.clear()
            return None

I may be missing an edge case here, and if so, I'm happy to modify needs_cpu, but the way I'm reading this I think workers_to_close should be a sufficient sign that workers should be closed.

If this looks good to you I'll look into how this is tested.

@delgadom
Contributor Author

delgadom commented Nov 1, 2018

Oh this will work on the first iteration, but needs_cpu still needs to be modified. Something like:

    def needs_cpu(self):
        """
        Check if the cluster is CPU constrained (too many tasks per core)

        Notes
        -----
        Returns ``False`` if any workers are idle. Otherwise, returns ``True``
        if the occupancy per core is some factor larger than ``startup_cost``.
        """
        if not all(ws.processing for ws in self.scheduler.workers.values()):
            return False

        total_occupancy = self.scheduler.total_occupancy
        total_cores = sum([ws.ncores for ws in self.scheduler.workers.values()])
        ...

@delgadom
Contributor Author

delgadom commented Nov 1, 2018

Actually, this may make more sense in should_scale_up:

    def should_scale_up(self):
        """
        Determine whether additional workers should be added to the cluster

        Returns
        -------
        scale_up : bool

        Notes
        ----
        Additional workers are added whenever

        1. There are fewer workers than our minimum
        2. There are unrunnable tasks and no workers
        3. There are no idle workers, and
            a. The cluster is CPU constrained, or
            b. The cluster is RAM constrained

        See Also
        --------
        needs_cpu
        needs_memory
        """
        with log_errors():
            if len(self.scheduler.workers) < self.minimum:
                return True

            if self.maximum is not None and len(self.scheduler.workers) >= self.maximum:
                return False

            if self.scheduler.unrunnable and not self.scheduler.workers:
                return True

            if not all(ws.processing for ws in self.scheduler.workers.values()):
                return False

            needs_cpu = self.needs_cpu()
            needs_memory = self.needs_memory()

            if needs_cpu or needs_memory:
                return True

            return False

@delgadom
Contributor Author

delgadom commented Nov 1, 2018

Hmm, actually it seems tough to get around the need to check the number of tasks. I should have thought of this originally: once the idle workers have been scaled down, should_scale_up reverts to its previous behavior, sees a need for CPU or memory, and scales the cluster back up. To avoid leaving this messy fingerprint in too many places, while still keeping an eye on the worse performance of checking the tasks on each worker, I think it makes sense to add your option 1 to should_scale_up as another case:

    def should_scale_up(self):
        """
        Determine whether additional workers should be added to the cluster

        Returns
        -------
        scale_up : bool

        Notes
        ----
        Additional workers are added whenever

        1. There are fewer workers than our minimum
        2. There are unrunnable tasks and no workers
        3. There are no idle workers and the number of pending tasks exceeds
            the number of workers, and
            a. The cluster is CPU constrained, or
            b. The cluster is RAM constrained

        See Also
        --------
        needs_cpu
        needs_memory
        """
        with log_errors():
            if len(self.scheduler.workers) < self.minimum:
                return True

            if self.maximum is not None and len(self.scheduler.workers) >= self.maximum:
                return False

            if self.scheduler.unrunnable and not self.scheduler.workers:
                return True

            if not all(ws.processing for ws in self.scheduler.workers.values()):
                return False

            tasks_processing = sum((len(w.processing) for w in self.scheduler.workers.values()))
            num_workers = len(self.scheduler.workers)

            if tasks_processing <= num_workers:
                return False

            needs_cpu = self.needs_cpu()
            needs_memory = self.needs_memory()

            if needs_cpu or needs_memory:
                return True

            return False

@jsignell
Member

Is this issue resolved now that the scheduler has absorbed the logic in Scheduler.adaptive_target?

@mrocklin
Member

I personally don't know. If someone wants to look, though, I would recommend starting here:

    def adaptive_target(self, comm=None, target_duration="5s"):
        """ Desired number of workers based on the current workload

        This looks at the current running tasks and memory use, and returns a
        number of desired workers. This is often used by adaptive scheduling.

        Parameters
        ----------
        target_duration: str
            A desired duration of time for computations to take. This affects
            how rapidly the scheduler will ask to scale.

        See Also
        --------
        distributed.deploy.Adaptive
        """
        target_duration = parse_timedelta(target_duration)

        # CPU
        cpu = math.ceil(
            self.total_occupancy / target_duration
        )  # TODO: threads per worker

        # Avoid a few long tasks from asking for many cores
        tasks_processing = 0
        for ws in self.workers.values():
            tasks_processing += len(ws.processing)

            if tasks_processing > cpu:
                break
        else:
            cpu = min(tasks_processing, cpu)

        if self.unrunnable and not self.workers:
            cpu = max(1, cpu)

        # Memory
        limit_bytes = {addr: ws.memory_limit for addr, ws in self.workers.items()}
        worker_bytes = [ws.nbytes for ws in self.workers.values()]
        limit = sum(limit_bytes.values())
        total = sum(worker_bytes)
        if total > 0.6 * limit:
            memory = 2 * len(self.workers)
        else:
            memory = 0

        target = max(memory, cpu)

        if target >= len(self.workers):
            return target
        else:  # Scale down?
            to_close = self.workers_to_close()
            return len(self.workers) - len(to_close)
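
For intuition, plugging in some made-up numbers shows how the tasks_processing cap lets the target drop below the current worker count, which is exactly what the original report needed:

    import math

    # Made-up numbers for illustration only
    target_duration = 5.0      # seconds, parsed from "5s"
    total_occupancy = 150.0    # seconds of estimated work outstanding
    tasks_processing = 3       # e.g. three stragglers left, as in the example above
    n_workers = 10

    cpu = math.ceil(total_occupancy / target_duration)  # 30
    cpu = min(tasks_processing, cpu)                     # capped at 3
    target = max(0, cpu)                                 # memory term assumed 0 here
    print(target >= n_workers)                           # False -> scale down is considered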

@guillaumeeb
Member

This one should be fixed by #2330!

@jrbourbeau
Member

Indeed, thank you for following up here @guillaumeeb!
