Removing overhead from the client (and server) by using abstract graphs #3872

Open
spirali opened this issue Jun 9, 2020 · 6 comments

@spirali
Contributor

spirali commented Jun 9, 2020

Problem

The goal of this idea is to make Dask dataframe operations more scalable by removing bottlenecks in the centralized components (client and server). This text is split into two parts: the first part is about removing the problem in the client, the second one focuses on the server.

Let's start with the client. The main problem that prevents scalability of dataframe operations on the client is that the complexity of the current implementation of pandas operations depends on the number of partitions used. Therefore, even an operation like "column + 1" becomes more and more expensive in the client as the cluster grows (because the number of partitions grows with it). The situation gets even worse for operations like "shuffle", where the growth is superlinear.
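
As a small illustration (an example snippet, not part of any proposed change; exact task counts may differ between Dask versions), the materialized graph of an elementwise operation already grows linearly with the number of partitions:

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"x": range(10_000)})
for npartitions in (10, 100, 1000):
    ddf = dd.from_pandas(pdf, npartitions=npartitions)
    graph = (ddf.x + 1).__dask_graph__()
    # roughly one task per partition for from_pandas plus one per partition for the add
    print(npartitions, len(graph))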

If all Dataframe graphs could be created with more abstract operations, without referring to individual tasks, we could remove a lot of the burden that materializing such tasks puts on the client. The number of partitions should be just a numeric parameter for the client; it should not affect how much work the client has to do to construct the dataframe graph.

Phase 1

The goal of this phase is to make the client's code independent of the number of partitions. Hence operations like "column + 1" or "shuffle" should have constant complexity in the client.

The approach is quite simple: we move the expansion of the graph to the server. Instead of constructing individual tasks in the client and sending a big graph to the server, we just send a declarative description of how to build the task graph.
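
As a sketch of what such a declarative description could look like (hypothetical names, not an existing Dask/Distributed API; the real design is an open question), the client would ship only a small object and the server would expand it:

from dataclasses import dataclass
from typing import Callable, Tuple

@dataclass
class AbstractMap:
    name: str                # output collection name
    npartitions: int         # just a number, never a list of keys
    func: Callable           # function applied to each partition
    inputs: Tuple[str, ...]  # names of the input collections

    def expand(self):
        """Server-side expansion into one concrete task per partition."""
        return {
            (self.name, i): (self.func,) + tuple((inp, i) for inp in self.inputs)
            for i in range(self.npartitions)
        }

# client-side cost is O(1) in the number of partitions
add_one = AbstractMap("add-one", npartitions=1000,
                      func=lambda part: part + 1, inputs=("df",))
# only the server would call add_one.expand() and materialize the 1000 tasks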

It has the following benefits:

  • We avoid materialization and serialization of large tasks in the client (and one deserialization in the server).
  • Tasks constructed in the server can be put directly into the scheduler even when the whole task graph is not yet fully constructed. So the computation will start as soon as possible.
  • In the case of Rsds, constructing the task graph in Rust will probably be much faster than in Python.

The big question is how to actually describe the task graph. There are many options, e.g.:

  • Python code -- This would be easy but has many disadvantages: it is not language independent, we lose some performance gains, and it prevents Phase 2.
  • A fixed set of graph operations supported by the server (e.g. one for a map operation, one for shuffle) - This would be ideal if we could find a small fixed set of task graph expanders and just support them. It is probably doable for Dataframe; however, the flexibility of this solution is bad for generic user code.
  • Presburger arithmetic (PA) - PA is a fragment of arithmetic without multiplication of variables. I strongly believe that all real-world dependencies between tasks can be efficiently described by PA. PA is decidable and many operations can be implemented quite efficiently for real-world formulas. I have implemented a PA solver via automatic structures (https://github.com/spirali/pas) and the first experiments with task-like graphs seem promising. (A few example dependency relations are sketched after this list.)
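
To give a feeling for the PA option (hypothetical relations for illustration only; the concrete encoding is an open question), here are dependency relations dep(i, j) -- "output partition i depends on input partition j" -- that all stay inside PA because they use only comparisons, addition and multiplication by constants:

n_in = 100   # example partition counts
n_out = 100

# blockwise ("column + 1"): output i depends only on input i
blockwise = lambda i, j: i == j

# all-to-all shuffle: every output partition depends on every input partition
shuffle = lambda i, j: 0 <= j < n_in

# stencil / overlap: output i depends on inputs i - 1, i and i + 1
stencil = lambda i, j: j in (i - 1, i, i + 1)

# rechunk by a constant factor of 2: output i depends on inputs 2*i and 2*i + 1
rechunk = lambda i, j: j == 2 * i or j == 2 * i + 1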

This phase needs the following modifications:

  • HighLevelGraph, which contains concrete tasks, has to be replaced by an "abstract graph" that contains only a description of the tasks. Although HighLevelGraph appears nearly everywhere, this is not as bad as it sounds, because the graph is constructed in only a relatively small number of places.
  • The server needs to know how to expand the graph.
  • The server needs to know how to compose arguments for a function. It does not have to understand or construct the arguments themselves; it just needs to compose the wrapping list.
    This induces a small change in the protocol (and therefore also in the worker). Right now task arguments are serialized as a pickled list of arguments; this has to be changed to a msgpack list with pickled elements (a sketch of the encoding follows this list).
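
A sketch of the argument-encoding change (illustrative only, not the actual distributed protocol code): each argument is pickled separately and the outer list is msgpack-framed, so the server can splice task references into the list without unpickling anything:

import pickle
import msgpack

args = [42, "some-key", {"option": True}]

# current scheme (roughly): one opaque pickled blob for the whole argument list
current = pickle.dumps(args)

# proposed scheme: a msgpack list whose elements are individually pickled
proposed = msgpack.packb([pickle.dumps(a) for a in args], use_bin_type=True)

# the server can compose/replace individual elements (e.g. insert the key of a
# dependency) by touching only the outer msgpack list
decoded = [pickle.loads(x) for x in msgpack.unpackb(proposed, raw=False)]
assert decoded == args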

If you know of any examples of dataframe graphs that could not be constructed from such abstract operations and that need to work with specific individual tasks, please let us know.

Phase 2

When Phase 1 is finished and the right description is chosen, we can also avoid the expansion of the task graph in the scheduler. This change is much more complex, as the scheduler has to be able to perform some nontrivial operations on the task graph while it remains in abstract form. I believe that this can be done for some reasonable set of graph expanders. I also have some weak evidence and ideas for how to do it in PA while maintaining reasonable complexity for practical cases.

The main negative is that it would need a big change in the scheduler and also in the workers (but it would be transparent to the client if Phase 1 is finished).

Our roadmap

  • Introduce "CompactGraph" as an abstract replacement for "HighLevelGraph", in the first prototype with a fixed set of graph expanders suitable for pandas-like operations.
  • Implement the expanders in the server (probably in rsds, as we are more familiar with that code).
  • Evaluate the results. If a simple declarative description is not enough, try the PA solution.
  • If it works and eliminating the client overhead is not enough, move on to Phase 2.

This is a very generic overview of our plan; we would be very glad for any comments, whether it makes sense to you, and/or whether there is already some work like this going on.

Relevant issue: #3783

@quasiben
Member

quasiben commented Jun 9, 2020

This may be of interest to @madsbk and @jacobtomlinson

@madsbk
Contributor

madsbk commented Jun 9, 2020

Sounds interesting, @spirali.

The idea of letting the server expand/generate tasks based on a client specification is definitely something that can reduce the scheduling overhead significantly. I have been working on something similar: #3765 and #3811.

Using a declarative description of operations is also a great idea. Dask has Blockwise, which already supports much of your Phase 1, but the full graph is still materialized before it is sent to the scheduler.

@spirali
Contributor Author

spirali commented Jun 10, 2020

@madsbk Thank you for the links to your ideas. I will look at them.

I am aware of Blockwise, and it is one of the reasons why I think that moving away from materializing HighLevelGraph should be less work than it seems at first sight.

@mrocklin
Member

Some thoughts:

In general, yes I agree that moving around compact representations between the client and scheduler sounds like a good idea. As you say it can help us reduce overhead short term, and in the future it can provide more space for optimizations for people who want to build other schedulers.

The subgraphs within HighLevelGraphs are currently arbitrarily complex, so they're probably not a good target for this, but yes I agree that things like Blockwise are great examples of operations that could be replicated in the scheduler. I'm not comfortable sending Python code to be executed in the scheduler, but probably we can find a small number of patterns to handle the most costly operations, and then fall back to ordinary task graphs for the rest. My inclination would be to hard-code a set of graph patterns that the scheduler knows about in Python code, and then have the Client refer to that set of graph patterns.

Briefly, I want to make it clear that we're never going to be able to cover all graph types, and so we're always going to need to support fully arbitrary custom graphs.

Two challenges:

  1. How do we handle task graph optimization? Currently we do an optimization pass with high level graphs, and then we do a second pass with low-level graph optimizations. My guess is that in order to achieve a reasonable user experience we're going to need to replicate most of the value of the low-level graph optimizations in new high-level graph optimizations.

    For example, consider a random-access computation like (df + 1).loc["2020-01-01"]. Currently we generate the task graph for df + 1 as a Blockwise graph, we materialize it, and then we call loc, which brings us down to just a few partitions. When we optimize, the low-level optimizations cull the unnecessary tasks from the graph (a small array example of this culling is sketched after this list).

    So if we want to make a scheduler change like this, then we may also have to improve the downstream libraries' optimizations in order to achieve the same user experience. This is possible, but probably requires work outside of the scheduler codebase.

    The same thing happens with dask arrays and slicing, but is a bit more complex.

  2. Graph ordering. Currently ordering is done on the client side. We will have to move this to the scheduler side. This is probably a good idea anyway if we're thinking about accelerating the scheduler. People here will probably need to become familiar with the dask.order module, though.
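
A minimal example of the culling I mean (using dask array for brevity; exact task counts depend on the Dask version):

import dask
import dask.array as da

x = da.ones((1000,), chunks=(10,))        # 100 chunks
y = (x + 1)[:10]                          # only the first chunk is actually needed

print(len(dict(y.__dask_graph__())))      # unoptimized: on the order of 200 tasks
(y_opt,) = dask.optimize(y)               # low-level optimization culls and fuses
print(len(dict(y_opt.__dask_graph__())))  # only a handful of tasks remain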

@spirali
Contributor Author

spirali commented Jun 10, 2020

I would just like to say that my goal is not to get rid of HighLevelGraph (and others) entirely. I agree that there should always be an API for arbitrary graphs. My goal is to introduce a new API for large regular graphs. As a proof of concept, I want to replace usages of HighLevelGraph as an internal representation in DataFrame operations, because these graphs are regular and the end user does not work with individual tasks. Also, this approach is compatible with the original one, as abstract graphs can always be materialized and connected to ordinary tasks.

Sorry that I did not emphasize this in the original post.

@mrocklin
I will look at your examples more closely to give a precise answer. My quick answer now: in the case of the PA solver, reachability analysis is straightforward and can be done directly on the abstract representation. So unnecessary tasks will not only be removed from the graph, they will also never be materialized at all.

@spirali
Contributor Author

spirali commented Jun 16, 2020

Task arrays

We have implemented a prototype for declarative task graphs.

Our implementation is based on "task arrays": a group of tasks that is defined, but not materialized, in the client.

The basic usage is the following:

# "index" is an imported object from a module with task arrays
TaskArray(100, do_something, [index * 123])  

which is roughly equivalent to:

[do_something(index * 123) for index in range(100)]

Task arrays can be combined (an example of a "map" operation):

array1 = TaskArray(100, load_data, [index])
array2 = TaskArray(100, add, [array1[index], 1])

which is equivalent to:

array1 = [load_data(index) for index in range(100)]
array2 = [add(array1[index], 1) for index in range(100)]

It is not yet fully integrated into Distributed; we have only implemented _taskarray_to_futures (as an analogue of _graph_to_futures).

So you can do the following:

fs = client._taskarray_to_futures(array2)
result = client.gather(fs)

[Note: The current implementation materializes futures for the "last layer" of task arrays; all other task arrays are not materialized. In this example, array1 is also submitted as it is a dependency of array2, but array1 will always be a small object in the client, no matter how many tasks it contains.

Later, we want to create a future that wraps the whole task array so we can avoid materializing even the last layer.]

Benchmark

We have used the following simple shuffle implementation:

import distributed

from distributed.taskarrays import TaskArray, index

import random
import itertools

def create_data(index):
    size_of_input = 1000
    random.seed(index)
    return [random.randint(0, 1_000_000) for i in range(size_of_input)]


def make_partitions(data, n_partitions):
    result = [[] for _ in range(n_partitions)]
    for item in data:
        result[item % n_partitions].append(item)
    return result

def get_item(data, i):
    return data[i]


def join(data):
    return list(itertools.chain.from_iterable(data))


def main():
    client = distributed.Client("localhost:8786")

    in_parts = 1000
    out_parts = 1000

    # Eager equivalent: input = [create_data(index) for index in range(in_parts)]
    input = TaskArray(in_parts, create_data, [index])

    # Eager equivalent: parts = [make_partitions(input[index], out_parts) for index in range(in_parts)]
    parts = TaskArray(in_parts, make_partitions, [input[index], out_parts])

    # Eager equivalent: items = [get_item(parts[index // out_parts], index % out_parts) for index in range(in_parts * out_parts)]
    items = TaskArray(in_parts * out_parts, get_item, [parts[index // out_parts], index % out_parts])

    # Eager equivalent: joins = [join(items[index::out_parts]) for index in range(out_parts)]
    joins = TaskArray(out_parts, join, [items[index::out_parts]])
    fs = client._taskarray_to_futures(joins)
    print(client.gather(fs))


if __name__ == "__main__":
    main()

This is the version with task arrays; for comparison we used a @delayed version of the shuffle, where the graph is materialized in the client (a sketch of such a baseline follows).
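
A possible shape of that baseline (a sketch reusing the functions defined above; the exact benchmark script is not shown here):

import distributed
from dask import delayed

def main_eager():
    client = distributed.Client("localhost:8786")

    in_parts = 1000
    out_parts = 1000

    # the whole graph (roughly a million tasks) is materialized in the client
    input = [delayed(create_data)(index) for index in range(in_parts)]
    parts = [delayed(make_partitions)(input[index], out_parts) for index in range(in_parts)]
    items = [delayed(get_item)(parts[index // out_parts], index % out_parts)
             for index in range(in_parts * out_parts)]
    joins = [delayed(join)(items[index::out_parts]) for index in range(out_parts)]

    fs = client.compute(joins)
    print(client.gather(fs))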

Results

(benchmark results plot)

  • The benchmark was executed on a cluster with 1 node (= 24 cores; denoted cls=1n) and 7 nodes (= 168 cores; denoted cls=7n), one worker per core.
  • "v" is the size of the input and output partitions.
  • "ta" (orange) is the version with task arrays.
  • "eager" (blue) is the @delayed version.
  • Both versions were executed with the rsds server.
  • The task array version is implemented up to "Phase 1", i.e. task arrays are materialized in the server.
  • Times are for a full run of the benchmark, including the computation.

Code
