Consolidating all tasks that write to a file on a single worker #2163

Open
shoyer opened this issue Aug 6, 2018 · 5 comments
Comments

@shoyer
Member

shoyer commented Aug 6, 2018

A common pattern when using xarray with dask is to have a large number of tasks writing to a smaller number of files, e.g., an xarray.Dataset consisting of a handful of dask arrays gets stored into a single netCDF file.

This works pretty well with the non-distributed version of dask, but doing it with dask-distributed presents two challenges:

  1. There's significant overhead associated with opening/closing netCDF files (comparable to the cost of writing a single chunk), so we'd really prefer to avoid doing so for every write task -- yet this is what we currently do. I haven't measured this directly, but I have a strong suspicion this is part of why xarray can be so slow at writing netCDF files with dask-distributed.
  2. We need to coordinate a bunch of distributed locks to ensure that we don't try to write to the same file from multiple processes at the same time. These are tricky to reason about (see fix distributed writes pydata/xarray#1793 and xarray.backends refactor pydata/xarray#2261) and unfortunately we still don't have it right yet in xarray -- we only ever got this working with netCDF4-Python, not h5netcdf or scipy.

It would be nice if we could simply consolidate all tasks that involve writing to a single file onto a single worker. This would avoid the necessity to reopen files, pass around open files between processes or worry about distributed locks.

Does dask-distributed have any sort of existing machinery that would facilitate this? In particular, I wonder if this could be a good use-case for actors (#2133).
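For context, the closest existing machinery I'm aware of is the `workers=` keyword on `Client.compute`/`Client.persist`, which restricts where tasks may run. A rough sketch (the scheduler and worker addresses are placeholders, and this pins the whole collection rather than just the write tasks):

```python
import dask.array as da
from dask.distributed import Client

client = Client("tcp://scheduler:8786")        # placeholder address

arr = da.random.random((20000, 20000), chunks=(2000, 2000))
writes = arr.sum()                             # stand-in for a graph that writes to one file

# Restrict this graph to a single worker, so all of the "write" tasks
# run in one process and could, in principle, share an open file handle.
future = client.compute(writes, workers=["tcp://worker-1:34567"])
result = future.result()
```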

@mrocklin
Member

mrocklin commented Aug 6, 2018

Actors are experimental and may be removed at any time. I don't recommend that XArray depend on them. They're also advanced technology, and probably bring along problems that are hard to foresee. That being said, yes, they would be a possible solution here to manage otherwise uncomfortable state.

I wonder how much of this problem could be removed by consolidating data beforehand into a single task or into a chain of dependent tasks? Are files going to be much larger than an individual task? Would creating artificial dependencies between tasks help in some way?

Do you have more information about what is wrong with distributed locking? This approach seems simplest to me if it is cheap.
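For concreteness, the locking approach under discussion looks roughly like this; `write_block` and its arguments are made-up stand-ins for whatever the backend actually does:

```python
from dask.distributed import Lock

def write_block(block, filename, region):
    lock = Lock(filename)      # one cluster-wide lock per output file
    with lock:
        # open the file, write `block` into `region`, then close it again
        ...

# Every write task acquires the lock, so only one process touches the
# file at a time -- at the cost of reopening the file for each chunk.
```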

@shoyer
Member Author

shoyer commented Aug 6, 2018

Are files going to be much larger than an individual task?

Often, yes. It's pretty common to encounter netCDF files consisting of 1-20 arrays with total size in the 200MB-10GB range. This is solidly in the "medium data" range where streaming computation is valuable.

We could encourage changing best practices to write smaller files, but users will be surprised/disappointed if switching to dask-distributed suddenly means they can't write netCDF files that don't fit in memory on a single node.

This probably does make sense when using netCDF backends like scipy that don't (yet?) support writes without loading the entire file into memory.

Would creating artificial dependencies between tasks help in some way?

Yes, I think this could also work nicely, at least to resolve any need for locking. The downside is that we would need a priori knowledge of the proper task ordering to handle streaming computation use cases.
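For illustration, a rough sketch of the artificial-dependencies idea with dask.delayed; `write_chunk` and the toy `chunks` list are hypothetical:

```python
import numpy as np
import dask

@dask.delayed
def write_chunk(previous, chunk, filename, region):
    # `previous` exists only to force ordering; its value is ignored.
    ...  # open the file, write `chunk` into `region`, close the file
    return region

chunks = [np.random.random(1000) for _ in range(4)]   # toy data

token = None
for i, chunk in enumerate(chunks):
    token = write_chunk(token, chunk, "out.nc", region=i)

token.compute()   # the writes now run strictly one after another
```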

Do you have more information about what is wrong with distributed locking? This approach seems simplest to me if it is cheap.

I'm pretty sure that with more futzing/refactoring I could get locks and reopening files for every operation working. The overhead could be minimized with appropriate (automatic?) rechunking.

Maybe this is the better way to go.
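For example, the rechunking step could be as simple as coarsening the chunks before writing, so each open/write/close cycle amortizes over more data (the chunk sizes below are arbitrary placeholders):

```python
import dask.array as da

arr = da.random.random((40000, 40000), chunks=(1000, 1000))   # many small chunks
coarse = arr.rechunk((10000, 10000))                          # fewer, larger writes
```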

@mrocklin
Member

mrocklin commented Aug 6, 2018 via email

@jakirkham
Member

If you do go the copying direction, this may be helpful.

@jakirkham
Member

Other things to consider: I had been playing with the idea of sending the whole graph over to a worker to run (dask/dask#3275). Maybe something with Variable makes sense. We could also revisit locking for each write (dask/dask#3179). Maybe something involving per-worker resources could let us force tasks onto workers with the needed resource. That might work well, but we will probably have to do a few passes to get syntax that succinctly captures the intent.
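A rough sketch of the per-worker-resources idea; the `FILEWRITE` resource name and the addresses are made up:

```python
# Start one worker that advertises the resource:
#   dask-worker tcp://scheduler:8786 --resources "FILEWRITE=1"
import dask.array as da
from dask.distributed import Client

client = Client("tcp://scheduler:8786")                  # placeholder address
writes = da.random.random((10000, 10000), chunks=(1000, 1000)).sum()

# Ask the scheduler to place these tasks only on workers that have
# the FILEWRITE resource, i.e. the single worker started above.
future = client.compute(writes, resources={"FILEWRITE": 1})
```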
