
Issue opening h5py arrays #7930

Open

N4321D opened this issue Jul 23, 2021 · 6 comments


N4321D commented Jul 23, 2021

I used dask (and xarray) to combine a set of h5py files into a dataframe.
This worked great until I updated dask from 2.28 to 2021.07.1.

If I run the same script now, I always run out of memory shortly after loading the files, as soon as I do any operation on the dataframe.

I first thought it was an xarray issue (reported here: pydata/xarray#5585).
So I switched to a pure dask solution (based on https://docs.dask.org/en/latest/array-creation.html).

However, I keep running into the same problem.

I tried 3 different methods:

  1. Convert each h5py file to a dict of numpy arrays and load it as a pandas DataFrame with dask.delayed, then construct a dask dataframe from the delayed objects.
  2. Load the individual files as dask arrays and convert them to a dask dataframe per file, then concatenate the dataframes.
  3. Load the files as dask arrays, combine them into one big dask array, then convert that to a dataframe.

All three give the same result: the kernel dies due to memory.

I tried different chunk sizes. Smaller chunks just make everything slower (including the RAM overload); bigger chunks make it faster but still overload the memory. In either case the kernel crashes early in the process because of the memory overload.
Also, when I try to save the dataframe, nothing (or just a few kB) is written.
Therefore I suspect there is a problem with the creation of the dataframe, not with saving it.

At first I suspected the transformation from array to dataframe to be the issue,
but running calculations on the array itself, or saving it, causes the same problem.
That's why I think it has to do with loading the arrays from h5py.

If I am doing something fundamentally wrong here, please let me know. But this code worked great for me on earlier versions of dask.
Thank you!

Here is code to reproduce the issue. I have 16 GB of RAM, so you might need to adjust the number of files loaded if you have more RAM:

Create data files (note: this takes a couple of GB of disk space)

import numpy as np
import h5py

from shutil import copyfile


def makefile(data, n, nfiles):
    # Write `n` datasets into the first file, then copy that file `nfiles - 1` times.
    for i in range(nfiles):
        print(f"\rCreating File: {i} / {nfiles - 1}       ", end="")
        if i == 0:
            with h5py.File(f"{i}.h5", 'w') as file:
                for par in range(n):
                    file.create_dataset(f'data/{par}',
                                        data=data,
                                        dtype='f4',
                                        maxshape=(None,),
                                        chunks=(32000,),  # (dlength,)
                                        compression='gzip',
                                        compression_opts=5,
                                        fletcher32=True,
                                        shuffle=True,
                                        )
        else:
            copyfile('0.h5', f"{i}.h5")


data = np.random.randint(0, 0xFFFF, int(1e7))
makefile(data, 10, 100)    # ~100 files is enough to create an error on my 16GB RAM 24GB swap, increase if you have more RAM?
del data
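
(For scale: each dataset is 1e7 float32 values, roughly 40 MB uncompressed, so every file holds about 400 MB and 100 files about 40 GB of uncompressed data, far more than 16 GB of RAM if it all gets materialized at once.)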

Setup

import os
import h5py
import dask
import dask.dataframe as dd
import dask.array as da
import numpy as np
import pandas as pd


from dask.diagnostics import ProgressBar
ProgressBar().register()

file_list = sorted([f for f in os.listdir(".") if f.endswith('.h5')])

Method 1 (Pandas)

@dask.delayed
def hdfs_2_pd(f):
    # read every dataset in the file fully into memory and build a pandas DataFrame
    file = h5py.File(f, 'r')["data/"]
    return pd.DataFrame.from_dict({k: v[:] for k, v in file.items()})

ddf = dd.from_delayed([hdfs_2_pd(f) for f in file_list])

Method 2 (ddf per file)

out = []
for fn in file_list:
    f = h5py.File(fn, "r") # HDF5 file
    d = f['/data/']
    x = da.stack([da.from_array(i, chunks=(100000,)) for i in d.values()], axis=1)
    keys = d.keys()
    out.append(dd.from_dask_array(x, columns=keys))
ddf = dd.concat(out)

Method 3: combine arrays

out = []

for fn in file_list:
    f = h5py.File(fn, "r") # HDF5 file
    d = f['/data/']
    x = da.stack([da.from_array(i, chunks=(100000,)) for i in d.values()], axis=1)
    keys = d.keys()
    out.append(x)

combined = da.concatenate(out)  # concatenate all files along the row axis
ddf = dd.from_dask_array(combined, columns=keys)  # `keys` comes from the last file; all files share the same dataset names

Testing ddf

All these methods produce a dask dataframe. However, in the next steps memory fills up completely
and the kernel crashes:

ddf['0'].mean().compute()

output:
[ ] | 0% Completed | 6min 57.9s

ddf.to_hdf("combined.h5", 
           key='/data*',
           complevel=5, 
           complib="blosc:lz4hc", 
          )

output:
[# ] | 3% Completed | 13min 8.7s

ddf.to_parquet("conbined.pq", 
               compression="gzip", 
               append=False,
               overwrite=True,
)

output:
[## ] | 5% Completed | 4min 11.9s

da.to_hdf5('./combined_arr.h5', 
           {f"/test": combined}, 
           compression='gzip', 
           shuffle=True, 
           compression_opts=5)

output:
[# ] | 3% Completed | 5min 57.9s

Environment:

  • Dask version: '2021.07.0'
  • h5py version: '2.10.0'
  • Python version: Python 3.8.2 (Anaconda)
  • Operating System: Ubuntu 21.04
  • Install method (conda, pip, source): conda
jrbourbeau (Member) commented:

Thanks for raising an issue @N4321D. I wonder if you're running into #7583. Could you try using the distributed scheduler to see if that resolves your memory issues? This could involve adding

from dask.distributed import Client

client = Client()

to your code snippets.
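
For example, a minimal sketch of how that might slot into the snippets above (assuming the default LocalCluster settings; the dashboard it starts can also help inspect per-worker memory use):

from dask.distributed import Client

# start a local distributed cluster before building the dataframe;
# it becomes the default scheduler for later compute() calls
client = Client()

# build ddf with any of the three methods above, then compute as before
ddf['0'].mean().compute()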


N4321D commented Jul 23, 2021

Thanks for your quick reply.

With the client, calculating the mean works:
output:
32761.672892416

but I run into #7926 when saving.

Traceback (for the full trace, see #7926):

.....

HDF5ExtError: HDF5 error back trace

  File "H5F.c", line 509, in H5Fopen
    unable to open file
  File "H5Fint.c", line 1400, in H5F__open
    unable to open file
  File "H5Fint.c", line 1615, in H5F_open
    unable to lock the file
  File "H5FD.c", line 1640, in H5FD_lock
    driver lock request failed
  File "H5FDsec2.c", line 941, in H5FD_sec2_lock
    unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'

End of HDF5 error back trace

Unable to open/create file './delme/combined.h5'
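
One possible workaround to try while that issue is open, assuming the lock contention comes from several workers writing to the same HDF5 file, is the one-file-per-partition mode of dd.to_hdf (a * in the path is replaced by the partition number). This is only a sketch, not a verified fix for #7926:

ddf.to_hdf("combined-*.h5",   # one output file per partition
           key='/data',
           complevel=5,
           complib="blosc:lz4hc",
          )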

jsignell added the io label Jul 26, 2021
gabrielmpp commented:

I've had the same problem using xarray's open_mfdataset to read large netcdf files. Downgrading Dask to 2.28 solved it for me.

jsignell (Member) commented:

I'm wondering if your original approach would work better now that #7583 is resolved. Are you able to update to the latest dask and try again?


N4321D commented Sep 16, 2021

Thank you!
Using 2021.08.1 (the newest version on anaconda), it partly works now:

not using distributed:

  • Working: Calculating the mean
  • Working: Saving to h5 with the dask array (da.to_hdf5)
  • Not working: Saving to h5 with the dask dataframe (ddf.to_hdf) still overloads the memory and kills the kernel at ~50% when using the local cluster.

Using distributed cluster:

jsignell (Member) commented:

Ok thanks for the report!
