
Use atop fusion in dask dataframe #4229

Merged: 64 commits, Dec 27, 2018

Conversation

mrocklin
Member

The high-level atop fusion layers can be used in dask.dataframe as well as dask.array. Today this makes task fusion cheaper on dataframes with many partitions. In the future it should make it easier to build more sophisticated high-level optimizations.

This currently builds on #4092

This currently breaks tests. There is a fair amount that we can clean up in both dask.dataframe's broadcast operations (map_partitions, elemwise) and in the dask.array.atop code.

  • Tests added / passed
  • Passes flake8 dask
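
The core idea can be illustrated with a toy model (the `Layer` and `fuse_layers` names here are hypothetical, not dask's actual API): each partitionwise operation is a layer that maps a function over partitions, and fusing a stack of layers composes the functions so the scheduler sees one task per partition instead of one per layer per partition.

```python
# Toy illustration of high-level partitionwise ("atop"/blockwise) fusion.
# Layer and fuse_layers are hypothetical names, not dask's implementation.
from functools import reduce

class Layer:
    """A partitionwise operation: apply `func` to each partition."""
    def __init__(self, func):
        self.func = func

def fuse_layers(layers):
    """Compose a stack of partitionwise layers into a single layer,
    so each partition is touched by one task instead of len(layers)."""
    def fused(part):
        return reduce(lambda x, f: f(x), (l.func for l in layers), part)
    return Layer(fused)

partitions = [[1, 2], [3, 4], [5, 6]]          # a 3-partition "dataframe"
stack = [Layer(lambda p: [x + 1 for x in p]),  # df + 1
         Layer(lambda p: [x * 2 for x in p])]  # ... * 2

# Unfused: len(stack) * len(partitions) tasks; fused: len(partitions) tasks.
fused = fuse_layers(stack)
result = [fused.func(p) for p in partitions]
print(result)  # [[4, 6], [8, 10], [12, 14]]
```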

@mrocklin
Member Author

OK, most dataframe tests pass here now.

@mrocklin mrocklin changed the title [WIP] Use atop fusion in dask dataframe Use atop fusion in dask dataframe Dec 10, 2018
@mrocklin
Member Author

OK, this is ready for review. There is some future work though:

  • Regression: I've turned off dataframe-parquet column projection for now (the previous implementation was very brittle). I think that we'll want to redo it anyway in short order.
  • I think that we should rename atop to blockwise, or something else (see "Better name for atop?" #4035).
  • There are issues around fusing diamond-like atop graphs. We might need to repeat some of the work of @jcrist and @eriknw at a higher level.
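
To make the column-projection regression concrete, here is a toy sketch of what that optimization does conceptually (all names here are hypothetical, not dask's optimizer): when the only consumers of a read task select specific columns, the read can be rewritten to load just those columns.

```python
# Toy sketch of column projection (hypothetical names; not dask's optimizer).
# If the graph only ever selects column 'x' from a read, rewrite the read
# to load just that column.

def read(columns=None):
    data = {"x": [1, 2, 3], "y": [4.0, 5.0, 6.0], "name": ["a", "b", "c"]}
    cols = columns if columns is not None else list(data)
    return {c: data[c] for c in cols}

def getitem(d, col):
    return d[col]

dsk = {
    "df": (read, None),
    "x": (getitem, "df", "x"),
}

def project_columns(dsk):
    """If every consumer of the read task is a single-column getitem,
    push the needed columns into the read call."""
    needed = {t[2] for t in dsk.values() if t[0] is getitem and t[1] == "df"}
    out = dict(dsk)
    out["df"] = (read, sorted(needed))
    return out

optimized = project_columns(dsk)
loaded = optimized["df"][0](optimized["df"][1])
print(list(loaded))  # ['x'] -- only the needed column is read
```

The brittleness mentioned above comes from deciding safely when this rewrite applies; a consumer that touches the whole dataframe must disable it.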

@mrocklin
Member Author

@jcrist @TomAugspurger: if either of you has time, it would be helpful if you could take a look at this.

@TomAugspurger
Member

TomAugspurger commented Dec 13, 2018 via email

assert len(b) <= 15


@pytest.mark.xfail(reason="need better high level fusion")
Member Author

This is future work. We need better fusion at the high-level-graph level. cc @jcrist?

Member

Sure, that'd be fun to work on. Is this blocking for this PR to merge, or just future work?

Member Author

Not blocking. Things are better with this PR (df + 1 + 2 + 3 + 4 happens in one task), but they could be much better still with better fusion. In particular, I've seen the use case in this failing test happen frequently. It seems that common use of pandas includes dozens of lines of modifying columns in place, all of which generate diamond-like graphs.
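
A toy example of why diamond-shaped graphs resist simple fusion (the `consumers` helper here is a sketch; `dask.optimization.fuse` does related work on low-level graphs): linear fusion can only merge a task into its consumer when the task has exactly one consumer, and the shared root of a diamond has two.

```python
# Toy illustration of why diamond-shaped graphs resist linear fusion.
# `consumers` is a hypothetical helper for this sketch.

def inc(x): return x + 1
def dec(x): return x - 1
def add(a, b): return a + b

# df['b'] = f(df['a']); df['c'] = g(df['a']); then use both: a diamond.
dsk = {
    "a": 10,
    "b": (inc, "a"),
    "c": (dec, "a"),
    "d": (add, "b", "c"),
}

def consumers(dsk):
    """Map each key to the list of tasks that depend on it."""
    deps = {k: [] for k in dsk}
    for k, v in dsk.items():
        if isinstance(v, tuple):
            for arg in v[1:]:
                if arg in deps:
                    deps[arg].append(k)
    return deps

# Linear fusion can only merge a task into its consumer when it has
# exactly one consumer.  'a' has two consumers (b and c), so the
# diamond stays as four tasks; collapsing it needs a smarter rewrite.
deps = consumers(dsk)
fusible = [k for k, cs in deps.items() if len(cs) == 1]
print(sorted(fusible))  # ['b', 'c'] -- 'a' cannot be fused linearly
```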

Member Author

The PR itself could also use review though.

@TomAugspurger (Member) left a comment

This PR looks good, but I haven't really kept up with the new high-level graph stuff.

Only request would be a docstring for the new broadcast method. And could you explain the reasoning behind that name? IIUC, it adds a task to each partition/block of the input args? I worry a bit about broadcast because it clashes with NumPy's concept (and with broadcasting data in distributed), but I haven't come up with a better name.

@mrocklin
Member Author

Only request would be a docstring for the new broadcast method. And could you explain the reasoning behind that name?

Renamed to partitionwise_graph and added a docstring. I also moved it into core.py.
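
A simplified sketch of what a partitionwise graph builder does (this is a hypothetical, low-level stand-in; the real partitionwise_graph in dask.dataframe.core emits a high-level Blockwise layer rather than a plain dict): one task per partition, where collection arguments contribute their i-th partition key and scalar arguments are broadcast to every task.

```python
# Simplified sketch of building a partitionwise graph (hypothetical helper;
# the real partitionwise_graph emits a Blockwise layer, not a plain dict).

def partitionwise_graph(func, name, npartitions, *args):
    """One task per partition: string args name a collection and contribute
    their i-th partition key; other args are broadcast to every task."""
    dsk = {}
    for i in range(npartitions):
        task_args = [(arg, i) if isinstance(arg, str) else arg
                     for arg in args]
        dsk[(name, i)] = (func, *task_args)
    return dsk

dsk = {("df", 0): [1, 2], ("df", 1): [3, 4]}
dsk.update(partitionwise_graph(lambda p, n: [x + n for x in p],
                               "add", 2, "df", 10))

def get(dsk, key):
    """Tiny recursive scheduler, just enough to run this sketch."""
    v = dsk[key]
    if isinstance(v, tuple) and callable(v[0]):
        return v[0](*[get(dsk, a) if a in dsk else a for a in v[1:]])
    return v

print([get(dsk, ("add", i)) for i in range(2)])  # [[11, 12], [13, 14]]
```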

@bluecoconut
Contributor

Just did a test locally and this branch had a dramatic impact (~2x improvement) in performance for the code below, and even has a lower memory footprint for the larger stat tests.

import dask
import dask.datasets
import numpy as np
import time
from distributed import Client

client = Client()
client

df = dask.datasets.timeseries()
df = df.repartition(npartitions=300)
df = client.persist(df)

def random_indexer(df):
    # Build a boolean mask by AND-ing together 1-15 random comparisons
    indexer = ~df.index.isnull()
    for i in range(np.random.randint(15) + 1):
        col = np.random.choice(['x', 'y'])
        value = np.random.uniform(-1, 1)
        op = np.random.choice([lambda x, y: x < y, lambda x, y: x > y])
        indexer = np.logical_and(indexer, op(df[col], value))
    return indexer

def random_statistic(indexer, df):
    # Compute a random reduction over a randomly chosen, filtered column
    col = np.random.choice(['x', 'y', 'name'])
    if col == 'name':
        op = np.random.choice([lambda x: x.unique().size, np.min, np.max])
    else:
        op = np.random.choice([lambda x: x.unique().size, np.min, np.max,
                               np.sum, np.mean])
    return op(df[col][indexer])

np.random.seed(137)
stats = []
for i in range(10):
    ind = random_indexer(df)
    for k in range(20):
        stats.append(random_statistic(ind, df))

st = time.time()
stat_computed = client.compute(stats)
ft = time.time()
print(ft-st)

st = time.time()
stat_results = client.gather(stat_computed)
ft = time.time()
print(ft-st)

For calculating 200 statistics

(10 unique filtering, 20 different statistics from that filtered subset)

npartitions | graph create (master) | graph create (high-level) | execution (master) | execution (high-level)
------------|-----------------------|---------------------------|--------------------|-----------------------
100         | 1.7s                  | 2.5s                      | 10.2s              | 5.6s
300         | 5.4s                  | 8.1s                      | 31s                | 15.3s
600         | 11.6s                 | 16s                       | 59s                | 29.35s

For calculating 2000 statistics

I also tried to increase the scale of stats (for i in range(100), 2000 stats total). I see the same garbage-collection warnings in both branches, and memory still grows a lot more than I anticipated.

However, on this branch the calculation with 2000 stats actually completes! Also, the "Bytes stored" number on the dashboard seems to be accurate (though quite large: from 1.5 GB after persist(df) to 7 GB stored after calculating the stats). I will need to dig into why so much data is left in the cluster next.

The biggest difference from current 1.0.0 master is that master doesn't complete this same calculation on my machine at all: it runs into the memory issue, heads into swap, and eventually kills workers.

@mrocklin
Member Author

Thank you for trying things out @bluecoconut and for the benchmark. It's always nice to see things work well on unseen problems :)

I look forward to finding out why graph construction was more expensive (I would have expected it to decrease).

I suspect that you'll see additional boosts as we get a bit better at fusing things at the high level (see the comments on the xfailed test).

@mrocklin
Member Author

Merging this tomorrow if there are no further comments.
