Documentation for set_index(col, compute=True) is unclear/inaccurate #8415

Open
DahnJ opened this issue Nov 23, 2021 · 13 comments
Labels
dataframe, documentation, needs attention

Comments

@DahnJ
Contributor

DahnJ commented Nov 23, 2021

I think the documentation is currently unclear/inaccurate about the nature of the compute parameter for set_index:

df.set_index(col, compute=True)

The documentation currently contains this description:

compute: bool, default False

  • Whether or not to trigger an immediate computation. Defaults to False. Note, that even if you set compute=False, an immediate computation will still be triggered if divisions is None.

This would suggest that if I provide divisions and set compute=True, immediate computation will be triggered. This only seems to be the case when using shuffle=disk, however. Even then, it's not clear to me what is actually being computed.

Examples from the SO question where I originally asked about this (What does set_index(col, compute=True) do in Dask?):

import dask.datasets
df = dask.datasets.timeseries()

# Nothing gets submitted to the scheduler
df.set_index(
    'name', 
    divisions=('Alice', 'Michael', 'Zelda'), 
    compute=True
) 

Going down the stack of functions set_index actually calls, it appears that the only place where compute is actually used is rearrange_by_column_disk. And indeed:

# Still, nothing gets submitted
df.set_index(
    'name', 
    divisions=('Alice', 'Michael', 'Zelda'), 
    shuffle='tasks',
    compute=True
) 

# Something is computed here
df.set_index(
    'name', 
    divisions=('Alice', 'Michael', 'Zelda'), 
    shuffle='disk',
    compute=True
) 

If I'm correct, then I believe the documentation should reflect the fact that this setting only affects the shuffle=disk case. Also, I can't work out from the documentation what is actually being computed — "immediate computation" of what?

@scharlottej13
Contributor

Thanks for raising this issue and for the examples-- I was able to reproduce what you're describing and was actually about to open an issue after reading your SO post. @ian-r-rose and I had a good discussion on this, and it does seem that at the very least the documentation should be updated to reflect this behavior, but it's possible this could be a bug. @gjoseph92 maybe you have thoughts on this?

@ian-r-rose ian-r-rose added the dataframe and documentation labels Nov 24, 2021
@DahnJ
Contributor Author

DahnJ commented Nov 24, 2021

@scharlottej13 thanks for looking into it!

Btw, do please upvote any SO questions you find useful — it's a minor thing that nonetheless helps the question get recognized and lets the asker know that they're at least being useful :)

@gjoseph92
Collaborator

A quick skim of the code:

if compute:
    graph = HighLevelGraph.merge(df.dask, dsk1, dsk2)
    graph = HighLevelGraph.from_collections(name, graph, dependencies=[df])
    keys = [p, sorted(dsk2)]
    pp, values = compute_as_if_collection(DataFrame, graph, keys)
    dsk1 = {p: pp}
    dsk2 = dict(zip(sorted(dsk2), values))

makes me think that compute=True with shuffle='disk' causes it to immediately write all the data to disk, but not read any back. What gets left in the graph (run when you compute the resulting DataFrame) is reading that data back out of disk.
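
To illustrate the pattern (a toy sketch of my own, not the actual dask internals): the "write" half of the graph is computed eagerly and the concrete results are spliced back in, so the lazy remainder only has the cheap "read back" step left.

import dask

# Toy graph: pretend the "write" tasks are the expensive shuffle-to-disk work
write_tasks = {("write", i): (pow, i, 2) for i in range(4)}

# compute=True analogue: run the write half immediately with the synchronous scheduler...
written = dask.get(write_tasks, sorted(write_tasks))

# ...and splice the concrete results back in, so the remaining (lazy) graph
# only has to "read" literal values and combine them
read_tasks = {("read", i): value for i, value in enumerate(written)}
final = dask.get({**read_tasks, "combine": (sum, sorted(read_tasks))}, "combine")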

It's not immediately clear to me what the value of this is. It seems like it was added long, long ago in #378.

The documentation is certainly lacking here, but I'm not sure how much reason there even is to keep the feature around?

@ian-r-rose
Collaborator

The documentation is certainly lacking here, but I'm not sure how much reason there even is to keep the feature around?

I agree that it's not obvious that compute= is a useful feature to have. A quick flip through the code finds the same pattern in a few places:

  • DataFrame I/O
  • Array I/O
  • DataFrame shuffling
  • DataFrame head/tail

I don't think these are hugely valuable options, personally, though unfortunately compute=True is the default for a number of the dataframe methods, and they are not obscure, so it would take a long-ish deprecation cycle to remove them.

@DahnJ
Contributor Author

DahnJ commented Nov 29, 2021

Regarding the usefulness:

compute=True methods

I don't think these are hugely valuable options, personally, though unfortunately compute=True is the default for a number of the dataframe methods, and they are not obscure, so it would take a long-ish deprecation cycle to remove them.

I haven't gone through them all, but regarding I/O and head (and I suspect all methods where compute is True by default), the compute option is absolutely useful to me.

By default, df.head() computes the head. If I instead want to get a lazy object, perhaps because I want to visualize its computational graph, then I use compute=False. I have used that parameter for example in this question.
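
For example (a minimal sketch):

import dask.datasets

df = dask.datasets.timeseries()

lazy_head = df.head(compute=False)  # a lazy dask DataFrame instead of a pandas one
lazy_head.visualize()               # e.g. inspect its task graph (needs graphviz)
lazy_head.compute()                 # same result as df.head()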

Shuffle-related compute=None methods

I am still not 100% sure I understand, but if I understood @gjoseph92 correctly, then df.set_index(col, divisions=divisions, shuffle='disk', compute=True) is essentially akin to df.set_index(col, divisions=divisions).persist(), but "persists" to disk instead of worker memory?

If that's the case, then this does seem useful, but then the API is still somewhat confusing (perhaps it should instead be a parameter of persist?)

@gjoseph92
Collaborator

Agreed that compute=True is certainly useful in other places, head being a prime example. I was only referring to set_index specifically when I said that. @ian-r-rose for I/O, do you specifically mean O (to_csv, etc.)? I'm not seeing a compute in read_csv or read_parquet, but maybe there's another input function I'm not thinking of that does this.

If compute is actually just used in output functions + head/tail/introspection-type things, you could generalize and say it's really about making functions that you'd use for their side effects—that feel like they should run immediately—actually run immediately. Output has an obvious side effect; head/tail is probably most often used for the "side effect" of printing the first few rows in your notebook. Compare this to functions that produce some meaningful data you'd pass into other functions... like set_index.

df.set_index(col, divisions=divisions, shuffle='disk', compute=True) is essentially akin to df.set_index(col, divisions=divisions).persist(), but "persists" to disk instead of worker memory?

@DahnJ yes, basically. You'd probably use it in a similar way. Being pedantic, it is slightly different, in that you are writing 100% of the data to disk, but in an intermediate form where there's still a little work left to do to get it in its final shuffled form. It's basically writing [a, b, c, d]; when you read it back, you still have to do pd.concat([a, b, c, d]). But that's all.

If there were some more general way to have persist write to disk instead of to memory, then I think there'd be no use for the argument in set_index at all. As it is now, you could always do df.set_index(col, divisions=divisions).to_parquet(...), and then a read_parquet step for reading it back. I'm not sure how performance compares, but I might consider that better practice anyway, since you'd be using a "real" format for the data on disk, as opposed to whatever serialization scheme dask is using in the current version that may be incompatible with the next.
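
Concretely, that alternative would look something like this (just a sketch; the path and divisions are example values):

import dask.datasets
import dask.dataframe as dd

df = dask.datasets.timeseries()

# shuffle lazily, then persist the fully-shuffled result in a "real" on-disk format...
df.set_index('name', divisions=('Alice', 'Michael', 'Zelda')).to_parquet('indexed.parquet')

# ...and read it back whenever it's needed again
df_indexed = dd.read_parquet('indexed.parquet')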

@ian-r-rose
Collaborator

ian-r-rose commented Nov 29, 2021

@ian-r-rose for I/O, do you specifically mean O (to_csv, etc.)? I'm not seeing a compute in read_csv or read_parquet, but maybe there's another input function I'm not thinking of that does this.

Yes, I'm referring to "O".

If compute is actually just used in output functions + head/tail/introspection-type things, you could generalize and say it's really about making functions that you'd use for their side effects—that feel like they should run immediately—actually run immediately. Output has an obvious side effect; head/tail is probably most often used for the "side effect" of printing the first few rows in your notebook. Compare this to functions that produce some meaningful data you'd pass into other functions... like set_index.

I suppose you could make this argument, but it seems pretty slippery to me. I can say that I've been surprised by all of the above having compute=True as a default before, and it would make more sense to me to have all operations be lazy, and just ask users to do df.head().compute(). But I can appreciate that there is a lot of code out there which probably relies on the immediate execution, so it would be a tough sell to change the default.

@gjoseph92
Collaborator

Agreed, it is a little odd that in this lazy computing library, a handful of functions are not lazy by default. And that that is probably not going to change.

Back to this one in set_index though, do we think the compute=True option is worth having? (When you could achieve something similar, though maybe not quite as performant, with a to_* + read_* round-trip if you wanted.) I feel like the choice here is between figuring out how to document this odd option, or going through a deprecation cycle to remove it.

@DahnJ
Contributor Author

DahnJ commented Nov 30, 2021

To me, the interesting question would be whether using this option provides any improvements compared to using to_parquet. If not, then I'm in favour of deprecating.

Based on this quick benchmark, it seems that while the writing is faster, reading is much slower (possibly because of the necessity to still concat as mentioned by @gjoseph92 ), resulting in a similar overall performance.

import dask.datasets
import dask.dataframe as dd

df = dask.datasets.timeseries()

# `compute=True` 
# 8.97s
dfb = df.set_index('name', divisions=('Alice', 'Michael', 'Zelda'), shuffle='disk', compute=True)
# 36.9s
dfb.compute()


# `to_parquet`
# 49s
df.set_index('name', divisions=('Alice', 'Michael', 'Zelda'), shuffle='disk').to_parquet('test.temp')
# 1.21s
dd.read_parquet('test.temp').compute()

One more thing to possibly consider: where are the files written to? If to the workers' disks, then this is possibly still a significantly different operation from what to_parquet provides. If that was the case, having a disk option in persist again seems like a cleaner option.

An additional point is that I think it is quite likely that only very few people are using this option (and perhaps even fewer who actually do so on purpose), given that it's so hard to work out what it even does.

@gjoseph92
Collaborator

gjoseph92 commented Dec 1, 2021

Funny, running your benchmarks on my machine (2019 Intel MacBook Pro, 8-core, SSD), they only take 6-8sec, not the 45sec-1min you posted. And with compute=True, the dfb.compute() part is also ~3x slower than the set_index part. But parquet is slightly slower for me as well.

This makes sense, because with to_parquet you're doing all the same work as the compute=True case (write pieces to disk via Dask's shuffle, read them back), plus reading and writing the same amount of data again through parquet. So all this really tells us is that parquet is fast :)

The fact that with compute=True, you're only "persisting" the half of the operation that's fast-ish anyway makes me see even less value in it. If you're actually going to re-compute the shuffled DataFrame multiple times, parquet is clearly far, far faster—in your benchmark, you'd be re-computing a 1sec operation instead of a 37sec operation. Again this makes sense: if you're bothering to persist something, you want to optimize for subsequent re-use (reads) of it to be as fast as possible, even if that makes the persist slightly slower. Persisting the fully-shuffled data does just this.

One more thing to possibly consider: where are the files written to? If to the workers' disks, then this is possibly still a significantly different operation from what to_parquet provides.

Yup, written to workers' disks. It's true that with multiple worker machines, you probably couldn't get to_parquet to write bits of the data to each worker's disk. However, you also can't use shuffle="disk" with multiple worker machines for the same reason (they don't share a disk!). And you certainly couldn't use shuffle="disk", compute=True safely, because the tasks to read back in the data might not run on workers where the data was actually written! So basically, this comparison is only even relevant for single-machine use.


All this is making me lean towards deprecating the compute option. Its use is very fringe, its performance benefit seems dubious, yet everyone has to see and be confused by that parameter that's not relevant in most cases. And if we do #8435, it would become even more confusing (if there's both divisions="compute" and compute=False).

@DahnJ
Contributor Author

DahnJ commented Dec 1, 2021

All this is making me lean towards deprecating the compute option. Its use is very fringe, its performance benefit seems dubious, yet everyone has to see and be confused by that parameter that's not relevant in most cases.

This makes sense to me.

However, I still want to explicitly ask about the option of having persist with a disk option. I have only ever used LocalCluster so far, so I could easily utilize to_parquet, but what would I do if I use a distributed cluster, want to persist my data on the workers, but don't have enough memory?

I would probably open this as a separate issue since it's independent of this one, but I first wanted to ask here to see if I'm not missing something.

@scharlottej13
Contributor

Thanks all for the discussion! To summarize, it sounds like there are two options: (1) deprecate the compute option in set_index(), or (2) improve the documentation so it's clear that compute=True is only used when shuffling with disk, and therefore is only relevant when using a single machine. @jsignell, would you like to weigh in here?

@jsignell
Member

Thanks for the ping, I am all for deprecating surprising behaviors and eager computations. @DahnJ did you open an issue for adding a disk option to persist? That does seem like an interesting idea. I wonder whether it could be supported by having the workers just pick up the pieces of data they know about, even if they don't share a disk.

@scharlottej13 scharlottej13 removed their assignment Mar 1, 2022
@github-actions github-actions bot added the needs attention label Sep 2, 2024