Support Task Annotations within the graph #3783

Open
sjperkins opened this issue Jul 19, 2018 · 30 comments

Comments
@sjperkins
Copy link
Member

I think this issue has some relation to dask/distributed#2127, where @kkraus14 wants to run certain tasks on CPU/GPU workers. I've also wanted to run tasks on specific workers, or require resources to be exclusive for certain tasks.

Currently, these constraints must be specified as additional arguments to compute/persist etc. rather than at the point of graph construction -- embedding resource/worker dependencies in the graph is not currently possible.

To support this, how about adding a TaskAnnotation type? This can be a namedtuple, itself containing nested tuples representing key-value pairs. e.g.

annot = TaskAnnotation(an=(('resource', ('GPU', '1')), ('worker', 'alice')))

dask array graphs tend to have the following structure:

dsk = {
    (tsk_name, 0) : (fn, arg1, arg2, ..., argn),
    (tsk_name, 1) : (fn, arg1, arg2, ..., argn),
}

How about embedding annotations within value tuples?

dsk = {
    (tsk_name, 0) : (fn, arg1, arg2, ..., argn, annotation1),
    (tsk_name, 1) : (fn, arg1, arg2, ..., argn, annotation2),
}

If the scheduler discovers an annotation in the tuple, it could remove it from the argument list and attempt to satisfy the requested constraints. In the above example, annotations are placed at the end of the tuple, but the location could be arbitrary and multiple annotations are possible. Alternatively, it might be better to put them at the start.
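
For illustration, a minimal sketch of the stripping step, assuming a TaskAnnotation namedtuple along the lines proposed above (the names here are illustrative, not existing dask API):

from collections import namedtuple

TaskAnnotation = namedtuple("TaskAnnotation", ["annotations"])


def split_task_annotations(task):
    """Split a task tuple into (plain task, annotations)."""
    plain = tuple(a for a in task if not isinstance(a, TaskAnnotation))
    annots = tuple(a for a in task if isinstance(a, TaskAnnotation))
    return plain, annots


task = (sum, [1, 2, 3], TaskAnnotation({"resources": {"GPU": 1}}))
plain, annots = split_task_annotations(task)
# plain == (sum, [1, 2, 3]); annots carries the scheduling constraints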

I realise the above example is somewhat specific to dask arrays (I'm not too familiar with the dataframe and bag collections) so there may be issues I'm not seeing.

One problem I can immediately identify would be modifying existing graph construction functions to support the above annotations (atop/top support is probably the first place to look).

@jakirkham
Copy link
Member

What about decorators?

@sjperkins
Copy link
Member Author

sjperkins commented Jul 24, 2018

@jakirkham Decorators would work well. This is what I came up with off the top of my head:

from __future__ import print_function

from collections import namedtuple
from functools import wraps

from dask.core import flatten
import dask.array as da


TaskAnnotation = namedtuple("TaskAnnotation", ["annotations"])


def annotate(annotations):
    if isinstance(annotations, TaskAnnotation):
        annotations = (annotations,)
    elif not isinstance(annotations, (tuple, list)):
        annotations = (annotations,)

    for a in annotations:
        assert isinstance(a, TaskAnnotation), type(a)

    def inner(fn):
        # @wraps(fn)  # Fails to wrap a functools.partial
        def wrapper(*args, **kwargs):
            array = fn(*args, **kwargs)
            graph = array.__dask_graph__()

            # Append the annotations to each task tuple for this array's keys
            rewrite = {k: graph[k] + annotations for k
                       in flatten(array.__dask_keys__())}

            graph.update_with_key(rewrite, array.name)

            return array

        return wrapper

    return inner


an = TaskAnnotation({"resources": {"GPU": 1}})
ones = annotate(an)(da.ones)
print(ones((10, 10), chunks=(5, 5)))

@sjperkins
Copy link
Member Author

sjperkins commented Jul 24, 2018

Another style I like from a user perspective is context managers (tensorflow style):

with dask.annotations(an1, an2):
    A = da.ones(...)
    B = da.arange(...)

This would involve changing the Array constructor to defer to graph annotation contexts (which is probably more complex from a codebase POV).
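
For concreteness, a rough sketch of what a hypothetical dask.annotations context manager could look like, assuming graph construction code consults a module-level stack (none of this is existing dask API):

from contextlib import contextmanager

_annotation_stack = []


@contextmanager
def annotations(*annots):
    # Push the annotations for the duration of the block
    _annotation_stack.append(annots)
    try:
        yield
    finally:
        _annotation_stack.pop()


def current_annotations():
    """Flatten the active annotation contexts, outermost first."""
    return tuple(a for frame in _annotation_stack for a in frame)

Graph construction (e.g. the Array constructor) would then call current_annotations() and attach the result to each task it emits.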

@jakirkham
Copy link
Member

Have you looked at array_plugins?

@sjperkins
Copy link
Member Author

@jakirkham Ah that looks ideal.

I haven't had time to collect my thoughts in detail, but my primary interest is using Task Annotations to provide hints or directives to schedulers.

Examples include:

  • Embedding a size hint for a dask array chunk. {"size": 1048576}
  • Directives to instruct the scheduler where to run the task
    • Worker A. {"workers": ["A", "192.168.1.10"]}
    • I/O or Compute Worker Pools. {"worker_pool": ["io", "compute"]}
    • Scatter the key over the entire cluster. {"scatter": "all"} or {"scatter": ["A", "192.168.1.10", "compute"]}
  • Resource requirements. {"resources": {"GPU": 2}}

To reiterate, I think the power and usefulness of Annotations/Directives would lie in decoupling directives from the current functions which submit the graph to the scheduler (map/submit/persist etc.).

Additionally, this could introduce more flexibility into the current system of Scheduler Plugins. For example, the scheduler could request a valid worker list from the plugin for a task, given the associated Annotations/Directives.

This might be a fairly major change from the perspective of the distributed scheduler, but I think it would be a powerful + useful change.
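
For concreteness, a purely hypothetical plugin hook along these lines, sketching how annotations could drive worker selection (this is not the existing distributed SchedulerPlugin interface):

class AnnotationPlugin(object):
    def valid_workers(self, key, annotations, all_workers):
        """Return the subset of workers allowed to run ``key``."""
        wanted = annotations.get("workers")
        if wanted is None:
            return list(all_workers)
        return [w for w in all_workers if w in wanted]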

@jakirkham
Copy link
Member

Very cool. Thanks for sharing. Agree this would be very useful.

Guessing you have already read the resources docs.

@sjperkins
Copy link
Member Author

Guessing you have already read the resources docs.

Thanks, yes I have.

@mrocklin
Copy link
Member

Thanks for putting something together @sjperkins. It's useful to have this to drive conversation. I'm curious to hear what others have to say.

My initial thoughts are as follows:

  1. How does this affect micro-performance of scheduling and optimization?
  2. Are there other things like this that we would want to add on eventually? If so, how do we future-proof this and avoid glomming on features?
  3. Would it make sense, as @shoyer has suggested earlier, to have a proper Task class, which would probably eliminate the need for this class (and would be more future-proof) but would probably affect micro-performance?
  4. Are there situations that this will make more complex in the future? What is the maintenance burden of a feature like this? Who is likely to maintain complications around it going forward?

Anyway, just initial thoughts. Again, I look forward to seeing what others have to say.

@shoyer
Copy link
Member

shoyer commented Aug 13, 2018

I think an optional Task class (#2299) would be the way to go here.

Another use case that comes to mind is keeping track of the provenance of a task, to enable better error tracebacks that point back directly to the source of the problem.

@sjperkins
Copy link
Member Author

sjperkins commented Aug 13, 2018

How does this affect micro-performance of scheduling and optimization?

The current implementation, which allows arbitrary annotations within a task tuple, is sub-optimal (split_task_annotations likely slows things down), but this could probably be much improved by only allowing a single annotation, in the last tuple position by convention, and introducing an extra type test at that position.
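
A sketch of that convention, reusing the TaskAnnotation namedtuple from earlier in the thread -- one isinstance check per task instead of scanning the whole tuple:

def split_task_annotation(task):
    # By convention at most one annotation, always in the last position
    if task and isinstance(task[-1], TaskAnnotation):
        return task[:-1], task[-1]
    return task, None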

Would it make sense, as @shoyer has suggested earlier, to have a proper Task class, which would probably eliminate the need for this class (and would be more future-proof) but would probably affect micro-performance?

See example implementation based on #2299 below.

from __future__ import print_function


class Task(tuple):
    __slots__ = ()

    _index_names = ("function", "args", "kwargs", "annotations")

    def __new__(cls, fn, *args, **kwargs):
        annots = kwargs.pop("annotations", None)
        return tuple.__new__(Task, (fn, args, kwargs, annots))

    @property
    def function(self):
        return self[0]

    @property
    def args(self):
        return self[1]

    @property
    def kwargs(self):
        return self[2]

    @property
    def annotations(self):
        return self[3]

    def __repr__(self):
        details = ", ".join("%s=%s" % (n, repr(self[i]))
                            for i, n in enumerate(self._index_names)
                            if self[i])
        return 'Task({})'.format(details)


def f(x):
    return x + 1

print(Task(f, 4, bob='foo', annotations={"resource": "GPU"}))
# Task(function=<function f at 0x7f5e2054acf8>, args=(4,), kwargs={'bob': 'foo'}, annotations={'resource': 'GPU'})
print(Task(f, 4, "qux", annotations={"resource": "GPU"}))
# Task(function=<function f at 0x7f5e2054acf8>, args=(4, 'qux'), annotations={'resource': 'GPU'})

The following timings on python 2.7.12 suggest ~4.5X slower for task construction and ~3.5X slower for pickling.

# construct 1000 tuples
%timeit [(f, i, {"foo":"bar"}, {"resource":"GPU"}) for i in xrange(1000)]
10000 loops, best of 3: 168 µs per loop

# construct 1000 Tasks
%timeit [Task(f, i, foo="bar", annotations={"resource":"GPU"}) for i in xrange(1000)]
1000 loops, best of 3: 718 µs per loop

from cPickle import dumps

# Serialise 1000 tuples
%timeit dumps([(f, i, {"foo":"bar"}, {"resource":"GPU"}) for i in xrange(1000)])
100 loops, best of 3: 2.24 ms per loop

# Serialise 1000 Tasks
%timeit dumps([Task(f, i, foo="bar", annotations={"resource":"GPU"}) for i in xrange(1000)])
100 loops, best of 3: 8.03 ms per loop

Are there other things like this that we would want to add on eventually? If so, how do we future-proof this and avoid glomming on features?

I think the Task object provides the way forward. The Annotation specification (it's a dict) is deliberately open-ended, allowing arbitrary task metadata. In fact, perhaps metadata is a better name than annotation.

Task(f, 1, foo="bar", __task_metadata__={"resources": "GPU"})

If some requirement arose that was substantially different enough to not be expressible via metadata, an additional tuple position could be added. For example, @shoyer's concept of provenance/origination (which should actually go in metadata) could be expressed at an additional position.

What is the maintenance burden of a feature like this?

I think this will lessen future maintenance burden. Currently, most user scheduling logic is expressed via kwargs in update_graph:
https://github.com/dask/distributed/blob/b16ee25506fd20ec5daa7d77e338e2725e778562/distributed/scheduler.py#L1312-L1316

This means that this signature (and the function itself) changes with each new addition of scheduling functionality. I see there is a new actor kwarg for instance. Embedding this data in the graph will simplify the interface. While this will increase the graph size, it will correspondingly decrease the scheduling logic that must be pickled (per task in any case) during Client._graph_to_futures.
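
To illustrate the simplification, a hypothetical helper could recover the per-key scheduling kwargs from annotations carried in the graph itself, rather than having them passed to update_graph separately (the names and annotation keys here are illustrative):

from collections import namedtuple

TaskAnnotation = namedtuple("TaskAnnotation", ["annotations"])


def collect_scheduling_info(annotated_graph):
    """Rebuild per-key scheduling kwargs from annotations embedded in the graph."""
    restrictions, resources = {}, {}
    for key, task in annotated_graph.items():
        if not (isinstance(task, tuple) and task
                and isinstance(task[-1], TaskAnnotation)):
            continue
        meta = task[-1].annotations
        if "workers" in meta:
            restrictions[key] = meta["workers"]
        if "resources" in meta:
            resources[key] = meta["resources"]
    return restrictions, resources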

Are there situations that this will make more complex in the future?

Greater flexibility probably introduces complexity. Current complexity might involve deciding whether annotations/metadata supersede current logic expressed in submit/compute/persist. Ultimately I think that most of the functionality in Scheduler.update_graph should be expressed in separate plugins, but I'm not familiar enough with the code to decide whether this is realistic.

@jcrist
Copy link
Member

jcrist commented Aug 15, 2018

As an alternative to adding annotations in the graph, I previously toyed with the idea of expanding the dask collections interface to allow collections to provide extra keyword arguments to the scheduler on compute/persist. This would allow collections to track and enforce things like worker or resource constraints without modifying any of the scheduler or graph internals. I envisioned things like workers= or resources= then being added to methods accepting user-defined functions (e.g. map_blocks, map_partitions, etc...), and the collection would maintain these constraints internally, forwarding them to the scheduler on compute or persist (a rough sketch follows the pros/cons below).

Pros:

  • Simple, no modification to graph or scheduler internals
  • Generic, not just for constraint annotations

Cons:

  • Collections specific, doesn't work for people building graphs manually
  • Splits state up, may add extra book-keeping in the collections (not sure)
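
A rough sketch of the collection-side bookkeeping this would imply (the attribute and method names are hypothetical):

class ConstrainedCollection(object):
    """Collection that carries per-key scheduler constraints alongside its graph."""

    def __init__(self, dsk, keys):
        self.dask = dsk
        self._keys = keys
        self._constraints = {}  # e.g. {"resources": {key: {"GPU": 1}}}

    def with_constraints(self, **constraints):
        # Record constraints (workers=, resources=, ...) for this collection's keys
        for name, value in constraints.items():
            per_key = self._constraints.setdefault(name, {})
            per_key.update({k: value for k in self._keys})
        return self

    def compute(self, get, **kwargs):
        # Forward the accumulated constraints as scheduler keyword arguments
        kwargs.update(self._constraints)
        return get(self.dask, self._keys, **kwargs)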

@sjperkins
Copy link
Member Author

I'm putting together a script to test out the impact of the (tuple, annotated_tuple, task) strategies.

I'm not reproducing the 10X pickling slowdown mentioned by @shoyer in #2299.

The pickling sizes are somewhat larger with annotations (but compression reduces this by several factors).

Currently, the script produces the following results for 1000 tasks:

TASK CREATION TIMING
tuple 180 nanoseconds
annotated_tuple 462 nanoseconds
task 804 nanoseconds

PICKLING TASK TIMING
tuple 2.95 microseconds
annotated_tuple 5.85 microseconds
task 7.88 microseconds

PICKLING SIZES
tuple 54753 bytes
annotated_tuple 77828 bytes
task 77818 bytes

BZIP2 PICKLED SIZES
tuple 7807 bytes
annotated_tuple 9129 bytes
task 8909 bytes

I intend to expand it to measure the timing impact on the various schedulers, but will be working on this in an incremental fashion over the next while.

I also wanted to point out, in the context of #3514, that I don't think annotations are an all-or-nothing affair -- annotating every single task in the graph would seem like overkill to me.

@sjperkins
Copy link
Member Author

Updated the script.

  • Creates a dask dictionary of {key: task} values (hence increased creation time + pickled sizes)
  • Patches dask.local._execute_task with the 3 different strategies (current task tuple, annotated task tuple and new Task object). This doesn't use annotations in any form, merely strips them out.
  • Tests task execution timings with the 3 strategies.

TASK CREATION TIMING
tuple 333 nanoseconds
annotated_tuple 616 nanoseconds
task 862 nanoseconds

PICKLING TASK TIMING
tuple 3.45 microseconds
annotated_tuple 6.75 microseconds
task 8.51 microseconds

TASK EXECUTION TIMING
tuple 28.4 microseconds
annotated_tuple 28.7 microseconds
task 12.1 microseconds

PICKLING SIZES
tuple 84652 bytes
annotated_tuple 106728 bytes
task 107717 bytes

BZIP2 PICKLED SIZES
tuple 15055 bytes
annotated_tuple 18253 bytes
task 17919 bytes

The task execution timing is interesting because the new Task object does pretty well. It's twice as fast as standard and annotated tuples, probably because the task is already in canonical (function, args, kwargs, annots) form and some object creation is avoided. So there's some tradeoff here between Task construction and execution. There doesn't seem to be a noticeable difference between the standard and annotated tuple execution timings, which is great.

@shoyer
Copy link
Member

shoyer commented Aug 16, 2018

I would be interested to see how well a Cython extension type does. If there's ever a time to add an optional compile-time dependency, this would be it.

@sjperkins
Copy link
Member Author

The "twice as fast" Task timings objects were incorrect -- Task objects were ending up in the cache. Nevertheless, after correcting for this, Task objects are still slightly faster than tuples and annotated tuples.

I've pushed the test script up to #3869 so that the changes can be viewed/tracked.

TASK EXECUTION TIMING
tuple 29.4 microseconds
annotated_tuple 29.8 microseconds
task 25.3 microseconds

TASK CREATION TIMING
tuple 364 nanoseconds
annotated_tuple 677 nanoseconds
task 942 nanoseconds

PICKLING TASK TIMING
tuple 3.96 microseconds
annotated_tuple 7.73 microseconds
task 9.5 microseconds

PICKLING SIZES
tuple 84652 bytes
annotated_tuple 106728 bytes
task 107717 bytes

BZIP2 PICKLED SIZES
tuple 15055 bytes
annotated_tuple 18253 bytes
task 17919 bytes

@sjperkins
Copy link
Member Author

@crusaderky, this might be useful in the context of #3549. As I interpret the latest incantation, you're using dummy functions in tasks to mark tasks that shouldn't be compiled?

@sjperkins
Copy link
Member Author

I would be interested to see how well a Cython extension type does. If there's ever a time to add an optional compile-time dependency, this would be it.

@shoyer Where do you think Cython would improve things in the context of the above benchmarks?

  • Task Creation?
  • Task Pickling?

@sjperkins
Copy link
Member Author

Using a slotted Task object (not subclassed from tuple) reduces task execution time to ~0.73X that of a plain tuple, while the compressed pickle size grows to ~1.32X that of a plain tuple.

TASK EXECUTION TIMING
tuple 29.9 microseconds
annotated_tuple 30.5 microseconds
task 21.6 microseconds

TASK CREATION TIMING
tuple 348 nanoseconds
annotated_tuple 666 nanoseconds
task 899 nanoseconds

PICKLING TASK TIMING
tuple 3.95 microseconds
annotated_tuple 7.09 microseconds
task 7.36 microseconds

PICKLING SIZES
tuple 48550 bytes
annotated_tuple 57578 bytes
task 64568 bytes

BZIP2 PICKLED SIZES
tuple 9695 bytes
annotated_tuple 10621 bytes
task 12851 bytes
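
For reference, the slotted (non-tuple) variant could look roughly like this -- a sketch reconstructed from the description above rather than copied from the test script:

class Task(object):
    """Slotted task container; deliberately not a tuple subclass."""

    __slots__ = ("function", "args", "kwargs", "annotations")

    def __init__(self, function, args=(), kwargs=None, annotations=None):
        self.function = function
        self.args = args
        self.kwargs = kwargs if kwargs is not None else {}
        self.annotations = annotations

    def __reduce__(self):
        # __slots__ classes have no __dict__, so spell out pickling explicitly
        return (Task, (self.function, self.args, self.kwargs, self.annotations))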

@mrocklin
Copy link
Member

Note that from a scalability perspective we probably care more about creation and serialization costs than execution, which will be parallelized out. Execution cost does matter though in the single-machine case, especially when people try to scale down pretty hard.

@shoyer
Copy link
Member

shoyer commented Aug 17, 2018

Where do you think Cython would improve things in the context of the above benchmarks?

Task Creation?
Task Pickling?

Yes, for both of these.

Simply taking your Task class and making it a Cython extension type instead makes it about twice as fast for task creation, and a little faster for pickling:
https://gist.github.com/shoyer/7a29cc510db06310b370cd4ac537a361

@shoyer
Copy link
Member

shoyer commented Aug 17, 2018

To summarize my results (on Python 3.6) from the notebook linked above:

TASK CREATION TIMING
tuple 343 ns
Task 1.14 µs
CythonTask 635 ns
CythonTask2 548 ns

PICKLING TASK TIMING
tuple 500 ns
Task 1.55 µs
CythonTask 1.31 µs
CythonTask2 1.17 µs

(I don't know why my pickling timings are so far off)

CythonTask is simply a Cython extension type version of Task that uses a C struct internally instead of Python slots. CythonTask2 is a slightly refactored version that does less parsing in the class constructor, and which I think is probably more suitable for use in dask.

@crusaderky
Copy link
Collaborator

crusaderky commented Aug 21, 2018

Just my +1 here - this would be a first step in letting dask.array tell the scheduler the expected RAM usage of a task output, so that the dask scheduler can optimize more aggressively to minimize RAM. In my dask.array problems I frequently face chunks that are 100x the size of other chunks.

Another super useful usage would be to let the scheduler know that a task is limited by I/O or network and requires very little CPU, and thus should be run out of band.

Another benefit would be the ability to say "this is a CUDA task, so it should draw from a pool of X CUDA workers instead of your CPU workers". This is fundamental for hybrid dask.array problems where only a small part of the problem is solved by CUDA, while the rest runs on CPU.

Finally, it would let the dask scheduler know that a task will hold the GIL, thus allowing for hybrid multi-threaded / multi-process workflows.

As for the possible implementations:

  • as correctly pointed out, WIP: AST Optimization #3549 uses dummy functions as markers. It's ugly to look at, but does the job.
  • Task classes (Key and Task classes #2299) are probably more elegant, although they sound somewhat more invasive?
  • decorators in my opinion are a no-no, because if you apply a decorator on the fly it can't be pickled, and it will confuse anything that relies on detecting specific functions in the graph

@dhirschfeld
Copy link

My interest in this functionality is that I think it may enable #3344 - i.e. if you can annotate a task you can specify which higher-level "job" the task belongs to.

I think this functionality is similar to what @shoyer was referring to:

Another use case that comes to mind is keeping track of the provenance of a task, to enable better error tracebacks that point back directly to the source of the problem

On my dask cluster I run jobs which are composed of multiple underlying tasks. When dozens of jobs are running at any given time it's hard to know the state of any particular "job" which is the level I actually care about.

@shoyer
Copy link
Member

shoyer commented Aug 22, 2018 via email

@sjperkins
Copy link
Member Author

Thanks for the input @shoyer, @dhirschfeld and @crusaderky.

decorators in my opinion are a no-no, because if you apply a decorator on the fly it can't be pickled, and it will confuse anything that relies on detecting specific functions in the graph

I envisaged using decorators to place Annotation (and possibly now Task) objects within the graph via a rewrite mechanism, rather than embedding the decorators themselves.

One thing that should be considered is that adding Cython as a dependency could move dask away from being a pure-python source distribution. This might be avoided if we required users to install with the latest pip, thereby gaining the ability to specify Cython as a build requirement under the new PEP-518 system. Otherwise binary wheels would become necessary?

@jakirkham
Copy link
Member

We could always move any Cython portions into an optional external dependency. That way users can install it if they like and get the speed-up, or skip it and enjoy Dask's pure Python implementation.

Having not looked into this problem as deeply as others, I may be making a naive suggestion, but is Numba useful here? If so, this might be another way to solve the problem that could alleviate some of the distribution concerns. Again this could be made optional, but it would have the benefit of avoiding code duplication in Python and Cython forms.

@sjperkins
Copy link
Member Author

sjperkins commented Sep 6, 2018

We could always move any Cython portions into an optional external dependency. That way users can install it if they like and get the speed-up, or skip it and enjoy Dask's pure Python implementation.

Yes, I'm more keen on initial support for a pure python Task object which, while more expensive in terms of creation and serialisation than tuples, doesn't appreciably affect the execution performance of existing schedulers. I think it's more important to work on:

  1. defining a set of standard annotations.
  2. interfaces (array_plugins) for creating annotations and by implication, new Task objects.
  3. possible updates to the Scheduler plugin interface

rather than looking to chase performance first. In any case, the older tuple tasks will still need to be supported for backwards compatibility, while I expect the new Task object will not be widely used at first.

My plan is to look at 1. first, supplementing the arguments passed in to update_graph with similarly named annotations ('resource', 'restrictions'). Then there'd be feature parity with the existing resource/worker restriction interface.

I just need to block out some time for this...

Having not looked into this problem as deeply as others, I may be making a naive suggestion, but is Numba useful here? If so, this might be another way to solve the problem that could alleviate some of the distribution concerns. Again this could be made optional, but it would have the benefit of avoiding code duplication in Python and Cython forms.

If you mean in terms of creating a Task object, then I would say no. As I understand it, numba transforms Python functions into compiled native code, but not the inverse, which I think would be needed here.

@sjperkins
Copy link
Member Author

I hacked away at this in dask/distributed#2180. It's still fairly exploratory, but the basic idea seems to be working. See for example this test case.

One issue that I hadn't considered was nested tasks. Tasks like (inc, (inc, (inc, 2))) are regarded as complex and are serialised into a blob by Worker.dumps_task, as opposed to (inc, 1) whose function and args are serialised separately and stored as entries in a dict.

It seems there's a grey area here that relates to dask optimization (i.e. fuse_linear). Keys that may have existed in the graph and might be passed to worker kwargs in compute/persist could be removed by fuse_linear. Effectively the worker kwarg directive gets ignored, as the blob gets sent to a worker, deserialised and recursively processed by execute_task. As the code is currently structured, it looks like the same can only be assumed for Annotations.

Could the requirement for serialising nested tasks be explained? It would be useful to relax this in Worker.dumps_task to handle the outer level in the same way as a simple task. This would allow a single annotation to be specified for the entire nested task.

This also opens up the question of how Annotations should be merged or even culled during an optimisation process. It's not immediately clear to me how to support generality here -- perhaps some sort of user hook per annotation type that indicates whether fusing is possible. For example, if two tasks have {'worker': 'a'} and {'worker': 'b'} annotations respectively, then fusing should be disallowed.
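
One possible shape for such a hook, sketched with hypothetical names: a per-annotation combine function that either merges two values or signals that fusion must be blocked.

def merge_worker(a, b):
    # Conflicting worker pins block fusion
    return a if a == b else None


def merge_resources(a, b):
    # A fused task needs the maximum of each requested resource
    return {r: max(a.get(r, 0), b.get(r, 0)) for r in set(a) | set(b)}


MERGE_HOOKS = {"worker": merge_worker, "resources": merge_resources}


def merge_annotations(x, y):
    """Merge two annotation dicts, or return None if fusion is disallowed."""
    merged = dict(x)
    for name, value in y.items():
        if name not in merged:
            merged[name] = value
            continue
        hook = MERGE_HOOKS.get(name)
        if hook is None:
            return None  # unknown annotation: be conservative, block fusion
        combined = hook(merged[name], value)
        if combined is None:
            return None
        merged[name] = combined
    return merged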

@quasiben
Copy link
Member

ping @mrocklin (this issue had a fair amount of activity last fall and we should probably pick it back up)

@mrocklin
Copy link
Member

Agreed that it had activity. No objection to people picking it up. Just to make it clear though, despite being pinged, I personally am unlikely to work on this short term.
