Allow explicit chunk size for to_zarr #5953

Open · manzt opened this issue Feb 27, 2020 · 7 comments
manzt (Contributor) commented Feb 27, 2020

Description

I'm currently working on a project with zarr and dask where the on-disk dataset has less-than-ideal chunk sizes for computation with dask (specifically, creating image tiles). Fortunately, dask.array.from_zarr allows you to specify the chunk sizes explicitly when reading, so the computation can be optimized, but dask.array.to_zarr does not allow the reverse: you cannot choose the on-disk chunk sizes when creating an array.

Example

import numpy as np
import zarr
import dask.array as da

z = zarr.open('my_image.zarr')
print(z.shape)
# (50000, 50000, 4)
print(z.chunks)
# (512, 512, 4)

# Read with larger chunks than the store uses, then downsample the two
# spatial axes by a factor of 2.
img = da.from_zarr('my_image.zarr', chunks=(5000, 5000, 4))
downsampled = da.coarsen(np.mean, img, {0: 2, 1: 2})
downsampled.to_zarr("downsampled01.zarr", chunks=(512, 512, 4))
# TypeError: DelayedLeaf object got multiple values for keyword argument 'chunks'

A workaround at the moment is:

downsampled.rechunk((512, 512, 4)).to_zarr("downsampled01.zarr")

However, it seems likely that rechunking this aggressively just to write to an on-disk store adds a fair amount of overhead. Many thanks in advance for having a look at this. I love using dask!

mrocklin (Member) commented

> However, it seems likely that rechunking this aggressively just to write to an on-disk store adds a fair amount of overhead.

Not necessarily. We would need to rechunk anyway. I think that using rechunk before to_zarr is likely as good as we can do here. I'm inclined to leave things as they are. It might make sense to add a line in the docstring of to_zarr to point people to rechunk if they want to control chunking though.

I may not fully understand things. Is there a particular reason why you're concerned about rechunking, or is this just a general concern about uncertainty around rechunking performance?

manzt (Contributor, Author) commented Feb 28, 2020

Thanks for the response. This is mostly a general concern about rechunking performance, and I could very well be off in my understanding. Maybe I need to look into dask.array.to_zarr a bit further, but I'm guessing that in the current implementation each dask chunk is written to a corresponding chunk in the zarr store.

If the dask chunk sizes are a multiple of the zarr chunks, would it be possible for each dask chunk to write to several zarr chunks rather than needing to rechunk? Maybe this is quite minor in terms of performance...

I would like to use delayed + dask array like this, but if I understand correctly, using delayed with other dask collections should be avoided:

import dask
import dask.array as da
from dask import delayed
import zarr

# Create a mutable on-disk store with the small target chunks.
z = zarr.open("my_data.zarr", mode="w", shape=(50000, 50000),
              chunks=(512, 512), dtype="f8")
x = da.random.random((50000, 50000), chunks=(4096, 4096))

@delayed
def write_chunks(z, x, selection):
    z[selection] = x[selection]

tasks = []
for i in range(0, 50000, 4096):
    for j in range(0, 50000, 4096):
        task = write_chunks(z, x, (slice(i, i + 4096), slice(j, j + 4096)))
        tasks.append(task)

dask.compute(tasks)

In this scenario, each dask chunk is responsible for writing to 64 zarr chunks (8 × 8), and no two tasks should ever write to the same zarr chunk.
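
For illustration, a minimal sketch of the same idea written with da.store instead of delayed; the mode="w" and dtype arguments are assumptions added to make it self-contained, and lock=False is only safe here because every 4096-sized dask block boundary falls on a 512-aligned zarr chunk boundary.

import dask.array as da
import zarr

x = da.random.random((50000, 50000), chunks=(4096, 4096))

# Pre-create the target array with the desired on-disk chunking.
z = zarr.open("my_data.zarr", mode="w", shape=x.shape,
              chunks=(512, 512), dtype=x.dtype)

# Each dask block is written into the zarr region it covers. Because 4096 is
# a multiple of 512, no two blocks touch the same zarr chunk, so no lock is
# needed (this does not hold for arbitrary chunkings).
da.store(x, z, lock=False)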

mrocklin (Member) commented Mar 1, 2020

> If the dask chunk sizes are a multiple of the zarr chunks, would it be possible for each dask chunk to write to several zarr chunks rather than needing to rechunk?

That sounds like a reasonable use case that is not currently supported. cc @jakirkham, who works in this space, in case he has interest.

> I would like to use delayed + dask array like this, but if I understand correctly, using delayed with other dask collections should be avoided

Using Dask array with Dask delayed is definitely doable, but there are many ways to shoot yourself in the foot, and the approach you have proposed is one of them. I recommend not doing that: when you pass a Dask array into a delayed function, the dask array will become a numpy array.
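
A tiny sketch of that behaviour (the function name here is just illustrative): the delayed function receives a concrete numpy array, not the lazy dask array, so the whole array gets materialized.

import dask
import dask.array as da
from dask import delayed

@delayed
def kind_of(arr):
    # By the time this runs, `arr` has already been computed into numpy.
    return type(arr).__name__

x = da.ones((1000, 1000), chunks=(100, 100))
print(dask.compute(kind_of(x)))  # ('ndarray',)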

jakirkham (Member) commented

Yeah sounds reasonable. Are you interested in doing a PR? 😉

manzt (Contributor, Author) commented Mar 1, 2020

I am interested in doing a PR! However, considering my first suggestion seems to be a foot-gun, I would appreciate some guidance on the approach.

jsignell (Member) commented

If you are still interested in doing this PR, @manzt, it seems to me that you could just tweak to_zarr to accept chunks, and then pass chunks to zarr.create or zarr.array if #7379 merges.
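
If that change lands, the failing call from the original example would presumably just work; this is hypothetical usage of the proposed chunks= keyword, not current behaviour, and rechunk((512, 512, 4)).to_zarr(...) remains the workaround until then.

downsampled.to_zarr("downsampled01.zarr", chunks=(512, 512, 4))  # proposed, not yet supported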

jsignell added the io label on Mar 12, 2021
jakirkham (Member) commented

Yeah, I think the normal way to do this would be to rechunk, since Dask isn't doing anything with locking.

If the chunks in Zarr are smaller than those in Dask and every Zarr chunk fits entirely within a Dask chunk, that may be safe to do without locking. However, any deviation will require that we use Zarr locking, and I'm not totally sure how we would handle that when using the distributed scheduler.
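
As a rough sketch of the misaligned case, assuming da.store is the write path: with 4000-sized dask blocks, neighbouring blocks can share a 512-sized zarr chunk, so the writes have to be serialized with a lock.

import dask.array as da
import zarr

# 4000 is not a multiple of 512, so dask block edges cut through zarr chunks.
x = da.random.random((50000, 50000), chunks=(4000, 4000))
z = zarr.open("misaligned.zarr", mode="w", shape=x.shape,
              chunks=(512, 512), dtype=x.dtype)

# lock=True serializes writes with a single process-local lock. Under the
# distributed scheduler a cluster-wide lock (e.g. distributed.Lock) would be
# needed instead, which is the open question above.
da.store(x, z, lock=True)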
