Saving and Loading Large Datasets #2784

Open
CSSFrancis opened this issue Jun 28, 2021 · 26 comments

@CSSFrancis
Member

As more users are starting to work with larger datasets (specifically datasets where even one copy doesn't fit into RAM) and operating on clusters with multiple cores, there appear to be a couple of fairly severe bottlenecks with regard to the HDF5 file format.

Problems with HDF5

Specifically, the main issues with h5py (and HDF5), as best I can summarize them, are:

  • Compression of the data holds the GIL/operates on one core (and is very slow)
    • Any h5py operation holds the GIL
  • Saving when using a multiprocessing/distributed dask scheduler throws an error: Multiprocess writes using to_hdf5 (dask/dask#3074)
    • There doesn't seem to be a prescribed solution to this other than using zarr
    • Parallel h5py support is offered, but it seems to be mostly an MPI implementation, which seems tricky to set up correctly and isn't very optimized: https://docs.h5py.org/en/latest/mpi.html

All of this severely limits parallel operations in dask, where I/O is a large overhead.

Solutions

  • Adding the ability to save/load large datasets in the zarr file format rather than hdf5.

    • zarr follows much of the same structure as hdf5 but is designed for parallel operations and fast compression/decompression of chunks to help speed up I/O operations.
    • Maybe throw a warning, if a user is saving a lazy object over 15 GB, that zarr might work better than h5py.
  • Otherwise, looking into parallel h5py support might be rewarding, but I haven't found a clear way to implement that yet.

I wouldn't suggest that zarr replace the hdf5 format we are currently using, rather that for larger-data applications we give the ability to use a file type that might better suit each user's needs.
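For illustration, a minimal sketch of what the zarr path could look like with dask (the file name and compressor settings here are just placeholders, not a proposed HyperSpy API):

import dask.array as da
from numcodecs import Blosc

# Stand-in for a lazily loaded dataset that doesn't fit in RAM.
data = da.random.random((100, 100, 256, 256), chunks=(10, 10, 256, 256))

# Each zarr chunk is compressed independently and Blosc releases the GIL,
# so the threaded (or distributed) scheduler can write chunks in parallel.
compressor = Blosc(cname="zstd", clevel=3, shuffle=Blosc.SHUFFLE)
data.to_zarr("dataset.zarr", component="data", compressor=compressor)

# Reading back is just as lazy:
loaded = da.from_zarr("dataset.zarr", component="data")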

Extra Information

Based on a conversation here: abTEM/abTEM#27

Just pinging some people who might have additional thoughts...
@magnunor @pc494 @din14970 @jacobjma @ericpre

@din14970
Contributor

Looks interesting; I had honestly never heard of zarr before. I've followed discussions with @uellue, who was very much in the anti-HDF5 camp from the viewpoint of performance and pointed me to the following articles, which are worth a read: https://cyrille.rossant.net/should-you-use-hdf5/ and https://cyrille.rossant.net/moving-away-hdf5/. He was in favor of storing array data in raw binary form and the metadata in a separate file or simple header; I would be curious about his opinion on zarr. I think the video makes some of the main drawbacks of zarr and advantages of hdf5 clear:

  • Limited support for zarr in languages other than Python. There are still quite a lot of TEM people working with MATLAB, and I can imagine that some of the newer people entering the field may also be using Julia, as it is being taught as a first programming language at some universities. Some of the "real" computing guys might still prefer C/C++. It would be good if we can all at least open each other's datasets.
  • 20 years of existence should come with some expectation of stability for HDF5. Then again, it seems the format has gone through quite a few redesigns in the past.
  • Interoperability with other scientific fields. If you get an hdf5 file, you more or less know what you can do with it: you can open it and explore the data. In principle you could build a service that takes in hdf5 data from various sources and extracts information from it. It would be convenient if we could all just use the same format, and the hdf5 format is pretty much a de-facto standard.

On the last point, while I was initially of the opinion that a common data format would be super important, I now believe that the most important thing is having good open file readers and writers. Not to minimize all the other stuff that Hyperspy offers, but this is probably one of its biggest selling points: it can open almost any experimental file. This makes discussions on file formats moot - use what works.

To conclude I would mostly agree with you: keep the default hdf5, most users will probably be best served by that. On the other hand, offer zarr support for those who need it and who know what they are doing.

@ericpre
Member

ericpre commented Jun 28, 2021

I have started to use a zarr backend in this branch, which seems to address the issues you mentioned above, pretty much in the way you described them. This branch doesn't change the hspy format; it simply copies the hdf5 dataset to a zarr group, which is then used for processing.

When looking at this, I came across this issue and these posts: https://medium.com/pangeo/cloud-performant-reading-of-netcdf4-hdf5-data-using-the-zarr-library-1a95c5c92314 and https://medium.com/pangeo/cloud-performant-netcdf4-hdf5-with-zarr-fsspec-and-intake-3d3a3e7cb935. This area seems to be evolving quickly, but maybe there will soon be a solution to read hdf5 files efficiently and easily!

@CSSFrancis
Member Author

I have started to use a zarr backend in this branch, which seems to address the issues you mentioned above, pretty much in the way you described them. This branch doesn't change the hspy format; it simply copies the hdf5 dataset to a zarr group, which is then used for processing.

Very cool! I will check out the branch and see how it works in my case. This seems like a good step in the right direction.

When looking at this, I came across this issue and these posts: https://medium.com/pangeo/cloud-performant-reading-of-netcdf4-hdf5-data-using-the-zarr-library-1a95c5c92314 and https://medium.com/pangeo/cloud-performant-netcdf4-hdf5-with-zarr-fsspec-and-intake-3d3a3e7cb935. This area seems to be evolving quickly, but maybe there will soon be a solution to read hdf5 files efficiently and easily!

Hmm, that is interesting. I've been working on reading .seq binary files from a DE detector into hyperspy here. Right now I've got a very chunk-agnostic approach, which I think is similar to what they are talking about. While the chunked approach to saving data might be faster in some cases, being more agnostic about how the binary data is saved has the added benefit of letting you choose the fast axis upon loading, regardless of how the dataset was saved.

@din14970
Contributor

@CSSFrancis I had a glancing look at what you did here and thought you might be able to get some ideas from what I made here and here. Maybe I'm misunderstanding the file structure, as I didn't delve too deeply, and perhaps my approach isn't applicable and this comment irrelevant, but from my (limited) experience np.memmap works quite nicely with dask: it allows you to avoid a Python loop over the file, you get random access, it looks cleaner and it is much, much faster.

@CSSFrancis
Member Author

CSSFrancis commented Jun 28, 2021

@din14970 Thanks for the suggestion. I've considered it a couple of times but never very earnestly. The part that I have had trouble recreating without a for loop is iterating over a binary file in a non-linear way. I don't know if memmap can do that, or if it is faster than repeatedly calling fromfile. What I have been doing is indexing all of the images in the chunks and then reading each chunk from those indexes.

What I really need is a memmap function which can take in multiple offsets. From my understanding that is what this issue is referring to: zarr-developers/zarr-python#556 ... If something like that exists then it could be used in place of saving in chunks.

@din14970
Contributor

@CSSFrancis the memmap strategy will work if there is a predictable header/block size between frames. Whatever the repeating element in the file is, you just create a custom dtype for it and that's it: you then memmap over the entire dataset (possibly with one offset for a main header at the beginning) and you get the array data with memmap["array"], or whatever you called the "array" part in your dtype. I'm not sure what these chunks are in the context of this dataset, but once you memmap, pass the memmap into a dask array, and rechunk, you don't have to care anymore where things came from and can just operate on the dask array. No? But I guess that is what you mean by non-linear... If there is no predictable offset in between data then of course you have to loop, but then I wonder who would create such a file format. There must be a way for fast random access if there is a program with which you can browse the data.

@CSSFrancis
Member Author

CSSFrancis commented Jun 28, 2021

@din14970
From my experience, not rechunking the data is a huge advantage. When using memmap, your original chunks are defined by which images are next to each other in the binary format, which can be horrendous if you are trying to apply an operation not along a fast axis. If you have a dataset that is (100, 100, 256, 256) then you most likely have a chunk structure like (1, 100, 256, 256), but it might be more ideal to have something like (10, 10, 256, 256). That rechunk is fairly computationally costly.

In the case that you have something huge like (8000, 100, 100, 256, 256) (t, x, y, kx, ky), then you might have a chunk structure like (1, 1, 100, 256, 256), and now rechunking is most of the computational cost of every single operation.

Datasets like this aren't exactly common these days, but they are definitely going to become more and more common with faster and faster detectors. In that case it might be important to include something like chunk_shape when loading a dataset.
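To make the cost concrete, a small sketch of the situation described above (the raw file name and shapes are made up):

import numpy as np
import dask.array as da

# Frames are stored on disk in acquisition order, so the "natural" chunking
# follows the fast axis of the file.
data = np.memmap("stack.raw", dtype=np.uint16, mode="r",
                 shape=(100, 100, 256, 256))
arr = da.from_array(data, chunks=(1, 100, 256, 256))  # one scan row per chunk

# Rechunking to a more isotropic navigation chunking inserts a
# rechunk-split/rechunk-merge layer into the task graph, which shuffles
# data between chunks before any real work starts.
arr2 = arr.rechunk((10, 10, 256, 256))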

The rechunking operation also causes a little bit of cross talk which isn't ideal.

Maybe memmap solves these problems with a creative backend that I don't really understand. Ideally, when you call rechunk on something that has only been loaded, it would reindex the data and create a new task graph rather than the rechunk-merge that occurs now.

Edit: So I guess this is almost exactly what memmap does. I'm not sure if Dask takes complete advantage of this but hopefully it is fairly efficient.

I'm going to try and benchmark a couple of these things and hopefully this will give (at least for me) a clearer idea of what works best.

@din14970
Contributor

To be honest, I've never actually experienced significant slowdowns due to rechunking, only from trying to do operations on inappropriately chunked data. I tend to always rechunk depending on the operation I want to do... But I've never actually delved into the implementation details, so you could certainly be right. All I know is that converting a lazily loaded .hspy file to a .blo file took an hour with the previous loop implementation, but 3 minutes with the memmap implementation. Also, loading and accessing the tvips data is fast, but only provided the signal dimensions are not chunked up. Would be interested to see your benchmarks!

@uellue

uellue commented Jun 29, 2021

What I really need is a memmap function which can take in multiple offsets.

If you want to skip frame headers that have a constant size and occur in constant intervals, one can memory map the data as NumPy bytes in a 2D shape where each frame is a row, then slice the data out like arr[:, header_bytes:], which creates a view, and then use NumPy.ndarray.view() to reinterpret this view as the actual dtype you want and reshape() it into the desired shape.
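A minimal sketch of that approach, assuming a hypothetical raw file where each record is a 64-byte frame header followed by a 256x256 uint16 frame:

import numpy as np

header_bytes = 64
frame_shape = (256, 256)
frame_bytes = frame_shape[0] * frame_shape[1] * 2  # uint16
n_frames = 100 * 100

# Memory map the whole file as raw bytes, one record per row.
raw = np.memmap("frames.raw", dtype=np.uint8, mode="r",
                shape=(n_frames, header_bytes + frame_bytes))

# Slice off the headers (still a view, nothing is read yet), reinterpret the
# payload as uint16 and reshape into (scan, ky, kx). Recent NumPy versions
# allow the dtype change because the last axis is contiguous; older versions
# may require copying the payload first.
frames = raw[:, header_bytes:].view(np.uint16).reshape((n_frames,) + frame_shape)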

@uellue

uellue commented Jun 29, 2021

Edit: So I guess this is almost exactly what memmap does. I'm not sure if Dask takes complete advantage of this but hopefully it is fairly efficient.

Creating Dask arrays from memory-mapped chunks works extremely well: dask/dask#7355 https://docs.dask.org/en/latest/array-creation.html#memory-mapping

From what I could see, the performance is better than any other method if the data is in the file system cache. If not, Linux does an excellent job with loading the data efficiently, while Windows is not so good at it.
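For reference, the pattern from the linked dask documentation looks roughly like this (the file name and layout are made up): each chunk opens its own memory map inside the task, so no large NumPy array is ever materialised up front.

import numpy as np
import dask
import dask.array as da

def mmap_chunk(filename, shape, dtype, offset):
    # The memmap is created inside the task, on whichever worker runs it.
    return np.memmap(filename, mode="r", shape=shape, dtype=dtype, offset=offset)

# Hypothetical layout: 100 scan rows of 100 frames of 256x256 uint16,
# stored contiguously, mapped one dask chunk per scan row.
dtype = np.dtype(np.uint16)
row_shape = (1, 100, 256, 256)
row_bytes = int(np.prod(row_shape)) * dtype.itemsize

blocks = [
    da.from_delayed(
        dask.delayed(mmap_chunk)("stack.raw", row_shape, dtype, i * row_bytes),
        shape=row_shape,
        dtype=dtype,
    )
    for i in range(100)
]
arr = da.concatenate(blocks, axis=0)  # lazy (100, 100, 256, 256) array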

@uellue

uellue commented Jun 29, 2021

He was in favor of storing array data in raw binary form and the metadata in a separate file or simple header; I would be curious about his opinion on zarr.

@sk1p did some tests with zarr and it looked good, as far as I can recall. The trickiest part to do efficiently is compression, since that requires some form of chunking (potential read amplification), breaks the direct correspondence between index in the dataset and offset in the file, and only allows simple memory mapping if the compression is handled by the operating system. I tried memory mapping NTFS-compressed files, and it worked fairly well overall, but the compression ratio was less than that of a chunked HDF5 file. Unfortunately this is also not very portable. @sk1p, you tried compression with zarr as well, right? I have in the back of my mind that it did a lot better than HDF5?

@uellue

uellue commented Jun 29, 2021

to be honest I've never actually experienced significant slowdowns due to rechunking, only by trying to do operations on inappropriately chunked data.

On the threaded executor, rechunking is quite efficient. It becomes an issue on distributed Dask clusters since it requires data to be sent around through the network. The biggest performance hit I've seen so far came from read amplification with compressed HDF5. A whole chunk has to be read and decompressed whenever any part of the chunk is accessed, and the default HDF5 chunk cache is so small (1 MB) that it is not effective on larger files and chunks. One has to be very careful with the access pattern and process chunk by chunk to avoid this. Since HDF5 can chunk in any way, and applications may have other constraints on access pattern and chunk size, this makes code that reads HDF5 efficiently rather complex.

@uellue

uellue commented Jun 29, 2021

Since HDF5 can chunk in any way and applications may have other constraints on access pattern and chunk size, this makes code that reads HDF5 efficiently rather complex.

...and for applications like interactive frame picking to explore a 4D STEM dataset, there is no way to avoid read amplification (edit: except one chunk per frame). It crawls with compressed HDF5, and flies with memory mapping, since memory mapping is optimal for accessing arbitrarily small subsets of a dataset, provided it fits in the file system cache.

@din14970
Contributor

@uellue

If you want to skip frame headers that have a constant size and occur in constant intervals, one can memory map the data as NumPy bytes in a 2D shape where each frame is a row, then slice the data out like arr[:, header_bytes:], which creates a view, and then use NumPy.ndarray.view() to reinterpret this view as the actual dtype you want and reshape() it into the desired shape.

Is there any specific reason to do it this way? It seems more complicated. It's also possible to memmap the dataset in the desired output shape, with a much cleaner distinction between frame header and data. For example, if you don't care about the information in the frame header at all, and suppose these are 64 bytes with frames of 256x256 16-bit unsigned integer values, you could create a dtype:

import numpy as np

record_dtype = [
    ("HEADER", bytes, 64),
    ("FRAME", np.uint16, (256, 256)),
]

Suppose you then load a 4D-STEM dataset with:

ds = np.memmap(file, dtype=record_dtype, mode="r", offset=main_header_offset, shape=(scan_y, scan_x))

Then you have the dataset already in a good shape with

# (scan_y, scan_x, 256, 256)
ds["FRAME"].shape

Frame header info is also automatically in the right shape:

# (scan_y, scan_x)
ds["HEADER"].shape

@CSSFrancis
Member Author

It seems like there are a couple of different best-case scenarios: compressed data vs. uncompressed.

Let me see if I have this correct...

For uncompressed data:

  • Store data in a simple binary format
  • Read the file using memmap
  • Then rechunking is fairly efficient and the data is rather flexible

For compressed data:

  • Store data in a chunked structure (probably zarr; hdf5 for smaller data)
  • Rechunking operations are more costly/RAM-intensive (probably avoid them unless multithreading)
  • Smaller data size --> more efficient I/O

Overall, it seems like both cases have their benefits. I think most of the data comes in as binary data streams, so viewing the data from that stream should be fast, as should rechunking etc. (assuming the files are memory mapped).

Then saving the data to a chunked format should give a reduced data size and faster I/O, especially if you are operating mostly along some fast axis.

I would propose that we create some additional documentation, basically condensing this thread:

  • Explaining common reasons for slow lazy operations
  • Binary vs compressed data type
  • When/ how to rechunk data
  • File formats --> which are compressed vs not.

@uellue

uellue commented Jun 30, 2021

@din14970 Oh, that is neat as well! I didn't know one could "dispatch" data to different categories like this in a dtype definition. Interesting! Whether this method or the folding method is easier probably depends on what one is more used to... Would be interesting to know if there is a performance difference, too!

@uellue

uellue commented Jun 30, 2021

@CSSFrancis regarding your nice summary: an additional point, for compressed chunked data in combination with Dask arrays, could be that the chunks in the dataset shouldn't necessarily correspond to the chunks in the Dask array. In HDF5 they are currently implemented to correspond 1:1, afaik.

However, for Dask arrays a good size is about 100 MB to balance Dask overheads (1-2 ms per chunk afaik) with actual processing (1-2 GB/s for typical NumPy array operations, roughly). For compression, in contrast, 1 MB or smaller could be a good size to match typical L3 caches per core and to reduce the leverage for read amplification. Gzip has a block size of 32 kB, for example.

Maybe it could be worth experimenting with that a bit. The key would be that the created Dask array is not backed by NumPy arrays, but by objects that implement the array interface and only read the required data when it is accessed. Most important would be to implement slicing so that only the required slice is actually read. That's the "secret" that makes https://docs.dask.org/en/latest/array-creation.html#memory-mapping work so well, and the same approach could perhaps be used for compressed files with small chunks. Dask can handle that, in principle: https://docs.dask.org/en/latest/array-sparse.html
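As an aside, with a zarr store this decoupling is already straightforward, because zarr's slicing only reads and decompresses the storage chunks that overlap the request. A minimal sketch, assuming a hypothetical store holding a single array:

import zarr
import dask.array as da

# Storage chunks are small, e.g. (1, 1, 256, 256), to keep compression blocks
# near L3-cache size and limit read amplification.
z = zarr.open("data.zarr", mode="r")

# Expose much larger chunks to Dask, e.g. ~100 MB, to keep scheduler overhead
# per chunk negligible; each Dask chunk decompresses only the storage chunks
# it actually covers.
arr = da.from_array(z, chunks=(16, 16, 256, 256))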

However, accessing parts of an array in an HDF5 file with a slice immediately creates a NumPy array, i.e. it is not lazy like the memory map. Maybe one could write a lazy wrapper for a slice of an HDF5 dataset that implements slicing by calculating the "slice of a slice" and only reads that part from the HDF5 dataset?

@uellue

uellue commented Jun 30, 2021

In the context of the mmap example, the delayed function that creates the array chunks would return the lazy wrapper / proxy instead of slicing the HDF5 file immediately. Dask would do its thing with this wrapper, accepting it as "some kind of array". The wrapper would dispatch most array functions simply by reading its "native" slice from the HDF5 file and applying the array function to that. Only slicing would get a special implementation that first calculates the "slice of the slice", and only dispatches that smaller slice to the HDF5 file.

Since Dask will possibly re-use the proxy object for several calculations in a task tree and dispose of it after it doesn't need it anymore, the proxy object should probably cache the "native" slice to avoid decompressing it several times -- the HDF5 chunk cache is too small to cache good-sized Dask array chunks.
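A minimal sketch of what such a proxy could look like (simplified to step-1 slices and non-negative integer indices; file and dataset names are placeholders, and the notebook attached further down is the actual experiment):

import dask
import dask.array as da
import h5py
import numpy as np

class LazyHDF5Chunk:
    """One dask chunk of an HDF5 dataset, read only when actually indexed."""

    def __init__(self, filename, dataset_name, region, dtype):
        self.filename = filename
        self.dataset_name = dataset_name
        self.region = region  # tuple of slices into the full dataset
        self.shape = tuple(s.stop - s.start for s in region)
        self.ndim = len(self.shape)
        self.dtype = np.dtype(dtype)
        self._cache = None  # cache the full "native" slice once it is read

    def _read(self, selection):
        with h5py.File(self.filename, "r") as f:
            return f[self.dataset_name][selection]

    def __getitem__(self, key):
        # Compose the requested slice with this chunk's region ("slice of a
        # slice") so that only the needed sub-region is read from the file.
        if not isinstance(key, tuple):
            key = (key,)
        key = key + (slice(None),) * (self.ndim - len(key))
        composed = []
        for outer, inner in zip(self.region, key):
            if isinstance(inner, slice):
                start, stop, _ = inner.indices(outer.stop - outer.start)
                composed.append(slice(outer.start + start, outer.start + stop))
            else:
                composed.append(outer.start + int(inner))
        return self._read(tuple(composed))

    def __array__(self, dtype=None):
        # Fallback: materialise (and cache) the whole chunk.
        if self._cache is None:
            self._cache = self._read(tuple(self.region))
        return self._cache if dtype is None else self._cache.astype(dtype)

# Build a dask array whose single chunk is a proxy instead of a NumPy array.
region = (slice(0, 16), slice(0, 16), slice(0, 256), slice(0, 256))
proxy = dask.delayed(LazyHDF5Chunk)("data.hspy", "data", region, np.uint16)
block = da.from_delayed(proxy, shape=(16, 16, 256, 256), dtype=np.uint16)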

@din14970
Contributor

I must say that after reading some of the responses, my first reaction could be characterized by this fragment.
Jokes aside, it would be helpful if we could draw up some diagrams for the proposed documentation explaining the data layouts and data flows. These tend to be much more informative than long text-based explanations.

@uellue

uellue commented Jul 1, 2021

Here is a notebook that compares the "lazy HDF5 reader" method with normal loading from Hyperspy.

Summary of results: small chunks in the HDF5 file, larger Dask array chunks and "lazy slicing" to make sure only the required data is loaded do indeed give the best performance, as discussed above!

LazyHDF5Slice.zip

@uellue

uellue commented Jul 1, 2021

If this is of interest, I could prepare a pull request to use this method for opening HDF5 files instead of the current implementation.

@uellue

uellue commented Jul 2, 2021

FYI, today I worked a bit more on the code. It is now faster than the current HDF5 loading in Hyperspy under almost all circumstances. It is generally insensitive to the chunking of the HDF5 file and the Dask array within wide margins, minus the inherent read amplification of large HDF5 chunks. The new code does particularly well for small HDF5 chunks where the current implementation in Hyperspy fails due to massive Dask overheads.

LazyHDF5Slice.zip

The only scenario where the current HDF5 loader does better is re-chunking the Dask array in a way that increases the chunk size, since it triggers read amplification and transfers from chunk assembly. That should be avoidable in practice since there is little reason to rechunk now.

@CSSFrancis it doesn't solve the issues with writing HDF5 that you describe, but it does solve some performance "gotchas" for reading. In particular, it should greatly improve the issues with HDF5 described in LiberTEM/LiberTEM#984.

@magnunor this should be particularly good for 4D STEM data since it makes extraction of slices from the data very efficient, independent of the size and shape of the Dask array chunks. The HDF5 chunks can now be very small, which should greatly reduce read amplification.

Again, let me know if this is interesting, and I can integrate this in Hyperspy!

@ericpre
Member

ericpre commented Jul 5, 2021

@uellue: would it be better to submit a PR to https://github.com/h5py/h5py or https://github.com/dask/dask to improve reading hdf5 files?
The speed improvements that you show in your notebook are substantial, and it would be great if anyone reading hdf5 in Python could benefit from these changes!

@magnunor
Contributor

magnunor commented Jul 6, 2021

I ran some quick and dirty benchmarking using one of my 4D-STEM-DPC datasets.

The same dataset (256 x 256 x 256 x 256), where the hdf5 file has been chunked in two different ways:

  • (1, 1, 256, 256): "small chunk"
  • (32, 32, 32, 32): "large chunk"

The benchmarking script: https://gist.github.com/magnunor/2e209902d81d7a39186e7b99aa999f86

Results:

Get one nav position
lazyhdf5slice, large chunk 0.5438826084136963
lazyhdf5slice, small chunk 0.004204750061035156
hyperspy lazy, large chunk 0.6513786315917969
hyperspy lazy, small chunk 0.010268449783325195

Get one whole chunk
lazyhdf5slice, large chunk 0.3614935874938965
lazyhdf5slice, small chunk 0.0018658638000488281
hyperspy lazy, large chunk 0.7069470882415771
hyperspy lazy, small chunk 1.164686679840088

Get sum of whole dataset
lazyhdf5slice, large chunk 24.830284595489502
lazyhdf5slice, small chunk 23.582030296325684
hyperspy lazy, large chunk 40.935097217559814
hyperspy lazy, small chunk 64.95020604133606

Using the same datasets with pyXem's center_of_mass (https://github.com/pyxem/pyxem/blob/de140bb5dc19c0e9d1a91a725c656c2bd2c056d0/pyxem/signals/diffraction2d.py#L893). Note that there might be some rechunking happening here.

Benchmarking script: https://gist.github.com/magnunor/71c95289f48bcbc7a22476f8ff6911d4

lazyhdf5slice, large chunk 56.57200646400452
lazyhdf5slice, small chunk 277.5361018180847
hyperspy lazy, large chunk 42.39361882209778
hyperspy lazy, small chunk 66.91742897033691

@uellue

uellue commented Jul 6, 2021

@magnunor thank you for testing it, interesting results! I've tried it out and found two items that improved performance dramatically:

  • Start with a single chunk Dask array from hdf5_dask_array() and let the center_of_mass() code handle the chunking. That way the re-chunking is guaranteed to reduce the chunk size in every dimension.
  • Implement two more methods from the NumPy array interface so that they are not delegated to the wrapped array and loading the data is avoided.

I had to change it to chunk_calculations=None since I got a zero division error otherwise, just FYI. That means the tests are not comparable, probably?

Still, the issue with increasing the chunk size is apparently pretty serious, since code "out there" does indeed rechunk and we can't foresee or control how that will be done! We'll probably have to rechunk for the Dask-LiberTEM integration too, by the way.

I'd like to find out why exactly the current Hyperspy method has no issue with rechunking, while the notebook code suffers. I'll think about it and play a bit more!

@CSSFrancis
Member Author

I started a PR adding support for zarr here: #2798.

I think this is a nice feature to offer, especially if someone is operating with larger datasets and needs to use something like dask-distributed.
