
Parallelize Loaders #96

Closed · zain-sohail opened this issue Jan 27, 2023 · 18 comments
Labels: enhancement (New feature or request)

Comments

@zain-sohail
Member

Describe the solution you'd like
The loaders should be able to load files in parallel. Ideally all loaders; otherwise, at least the flash and lab loaders.

Describe alternatives you've considered
With the abstract class, the data currently cannot be pickled and unpickled for the multiprocessing module, so this approach doesn't work. Using multiprocessing at the BaseLoader level would be ideal, as then things should work for all loaders.
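A minimal sketch of what this could look like, assuming a hypothetical `_load_single_file` worker and a `read_parallel` method on `BaseLoader`; keeping the per-file worker at module level lets the stdlib pickle used by multiprocessing serialize it, regardless of the abstract class:

```python
import multiprocessing as mp

# Hypothetical sketch: parallel per-file loading at the BaseLoader level.
# The worker is a module-level function, so multiprocessing's pickle-based
# serialization does not have to cope with the abstract class or any
# unpicklable instance state (open file handles, locks, ...).

def _load_single_file(path: str):
    # placeholder for the per-file parsing a concrete loader would do
    return path

class BaseLoader:
    def read_parallel(self, paths, n_workers: int = 4):
        with mp.Pool(processes=n_workers) as pool:
            return pool.map(_load_single_file, paths)
```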

@zain-sohail zain-sohail added the enhancement New feature or request label Jan 27, 2023
@steinnymir
Member

As discussed, I propose dropping the ABC library, if this creates problems.
The parallelization can be done in the hextof reader for the time being, but extending it to the base class is a good idea.

@rettigl
Member

rettigl commented Jan 31, 2023

To be honest, I don't quite understand the issue, as I don't know how your loaders work. Why do you need to pickle the class? Can't you create a delayed dataframe, like in the mpes and generic loaders?
Anyway, looking at this: cloudpipe/cloudpickle#367, I would expect pickling to work (?)

@zain-sohail
Member Author

To be honest, I don't quite understand the issue, as I don't know how your loaders work. Why do you need to pickle the class? Can't you create a delayed dataframe, like in the mpes and generic loaders? Anyway, looking at this: cloudpipe/cloudpickle#367, I would expect pickling to work (?)

Since we were using the multiprocessing module to parallelize the loading, it relies on pickling in the backend to serialize the data.
I have found another library that uses the 'loky' backend, which does use cloudpickle, and it seems to work. So I will close this issue for now.
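For reference, a hedged sketch of that approach, assuming the library in question is joblib (whose default 'loky' backend serializes callables with cloudpickle); `parse_h5_file` is a hypothetical stand-in for the per-file parsing:

```python
from joblib import Parallel, delayed

def parse_h5_file(path):
    # placeholder for the real per-file parsing
    return path

def load_files(paths, n_jobs=-1):
    # the "loky" backend uses cloudpickle, so it can ship callables that the
    # stdlib pickle used by multiprocessing rejects
    return Parallel(n_jobs=n_jobs, backend="loky")(
        delayed(parse_h5_file)(p) for p in paths
    )
```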

@zain-sohail
Member Author

@daviddoji we'd like to parallelize the loading of the files for all loaders. It could possibly be a method in the BaseLoader class. Do you have any suggestions regarding this?

@zain-sohail zain-sohail reopened this Feb 1, 2023
@daviddoji
Collaborator

@daviddoji we'd like to parallelize the loading of the files for all loaders. It could possibly be a method in the BaseLoader class. Do you have any suggestions regarding this?

Take into account that I'm using a node of the Maxwell cluster to do this, so in my use case, memory is less of an issue. Having said that, here are my suggestions:

  • If I want to bin the files, I'm using a combination of numba for the binning process and multiprocess to launch jobs for chunks of files. I can show you this at the next meeting.

  • If I'm interested in the data without binning, I'm using pasha, a library developed by a colleague from the DA group at XFEL. Think of it as a "parallel for loop" that allocates an array in memory and, while reading the files, dumps their content into the allocated array. I can also demonstrate it at the next meeting; see the sketch below.
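A rough sketch of the pasha pattern described in the second point, based on its documented alloc/map interface; the file list, array shape, and per-file read are made-up placeholders, and the functor handling may need adapting to pasha's actual API:

```python
import numpy as np
import pasha as psh

files = ["run_001.h5", "run_002.h5", "run_003.h5"]

# Allocate the output once in shared memory; workers write their slice
# directly into it (zero-copy).
out = psh.alloc(shape=(len(files), 1000), dtype=np.float64)

def kernel(worker_id, index, path):
    # In the real case this would read one hdf5 file (e.g. with h5py) and
    # dump its content into the preallocated slice.
    out[index] = np.zeros(1000)

# "Parallel for loop" over the files.
psh.map(kernel, files)
```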

@rettigl
Member

rettigl commented Feb 1, 2023

I think I still don't understand. What is the advantage compared to handling the data access and parallelization with dask?
Any code that tries to load all data into memory simultaneously is destined to break eventually on any machine, as datasets can become arbitrarily large.

@steinnymir
Member

Let me contextualize this a little:

What we want to achieve is to transform each .h5 file, composed of multiple tables with 3 different indexes (very fast, fast and very slow), into a single aligned table, which we save as a parquet file. This parquet file is then loaded again and we "explode" it so that all channels share the common "very fast" index.

We do this process using pandas and a multiindex for each file. As each file is independent of the others, we can do this in parallel, loading n files simultaneously and generating the n parquet files, which we can later concatenate and explode when needed.

This worked in the hextof processor, and @zainsohail04 is now trying to find a more elegant way to achieve it here, where the loader has a parent class which might be common to the loaders of different end-stations.

@daviddoji do you have suggestions in this direction? It is unrelated to the binning; only re-sorting tables with different indexes, if you will.
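A rough sketch of the per-file step described above, with made-up channel names and hdf5 paths: align channels recorded on different indexes onto the fast one, forward-fill the slow ones, and write one parquet per file. Since each file is independent, this function can then be mapped over the files with any of the parallel approaches above.

```python
import h5py
import pandas as pd

def h5_to_parquet(h5_path: str, parquet_path: str) -> None:
    with h5py.File(h5_path, "r") as f:
        # channel and index names are illustrative only
        fast = pd.DataFrame(
            {"dldPosX": f["dldPosX"][:]},
            index=pd.Index(f["pulseId_fast"][:], name="pulseId"),
        )
        slow = pd.DataFrame(
            {"delayStage": f["delayStage"][:]},
            index=pd.Index(f["pulseId_slow"][:], name="pulseId"),
        )
    # align on the common index and propagate the slow channel forward
    combined = fast.join(slow, how="outer").sort_index()
    combined["delayStage"] = combined["delayStage"].ffill()
    combined.to_parquet(parquet_path)
```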

@rettigl
Member

rettigl commented Feb 1, 2023

Why do you have to go via the parquet files? Can't you assemble and explode the columns on the fly and directly generate the delayed Dask dataframe from the h5 files, as the mpes loader does?
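For comparison, a hedged sketch of that on-the-fly route: wrap the per-file assembly in dask.delayed and concatenate the pieces into one lazy dataframe, so nothing is read until compute() and memory stays bounded by the partition size. `assemble_file` is a placeholder for the align/explode step.

```python
import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def assemble_file(path: str) -> pd.DataFrame:
    # placeholder: read the h5 file, align the three index levels,
    # explode everything onto the fast index
    return pd.DataFrame({"dldPosX": [], "delayStage": []})

def build_dataframe(paths, meta: pd.DataFrame) -> dd.DataFrame:
    parts = [assemble_file(p) for p in paths]
    # meta describes the column names/dtypes of each partition
    return dd.from_delayed(parts, meta=meta)
```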

@daviddoji
Collaborator

Let me contextualize this a little:

What we want to achieve is to transform each .h5 file, composed of multiple tables with 3 different indexes (very fast, fast and very slow), into a single aligned table, which we save as a parquet file. This parquet file is then loaded again and we "explode" it so that all channels share the common "very fast" index.

We do this process using pandas and a multiindex for each file. As each file is independent of the others, we can do this in parallel, loading n files simultaneously and generating the n parquet files, which we can later concatenate and explode when needed.

This worked in the hextof processor, and @zainsohail04 is now trying to find a more elegant way to achieve it here, where the loader has a parent class which might be common to the loaders of different end-stations.

@daviddoji do you have suggestions in this direction? It is unrelated to the binning; only re-sorting tables with different indexes, if you will.

I'd never use pandas for anything involving 'fast' access. Did you have a look at polars?
Otherwise, if the dataset fits into memory, pasha is a possible solution, which does parallelization and zero-copy of the data.

@steinnymir
Member

I looked a little into polars, as it might help to generate the parquets we need in the first step. However, it does not seem to support hdf5, and they suggest using pandas to load instead... any thoughts on this? @daviddoji

@daviddoji
Collaborator

I looked a little into polars, as it might help to generate the parquets we need in the first step. However, it does not seem to support hdf5, and they suggest using pandas to load instead... any thoughts on this? @daviddoji

Using pasha I'm able to load 12GiB of hdf5 files in 1.6 s
[screenshot of the timing measurement]

@zain-sohail
Member Author

I looked a little into polars, as it might help to generate the parquets we need in the first step. However, it does not seem to support hdf5, and they suggest using pandas to load instead... any thoughts on this? @daviddoji

We never really use the hdf5 reader from pandas to read our data, so I think directly using polars should work okay, no? We are just creating numpy arrays from the different groups that are defined in the config.

Using pasha I'm able to load 12GiB of hdf5 files in 1.6 s

This also looks very promising for our FlashLoader at the very least.

@daviddoji
Collaborator

As @steinnymir mentioned, you can't really read hdf5 directly into a polars df, but you can definitely use the same trick as I did above (creating a numpy array and then converting it to a df).
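A short sketch of that trick (group names are illustrative): read the channels with h5py into numpy arrays and hand those to a polars DataFrame directly.

```python
import h5py
import numpy as np
import polars as pl

def h5_to_polars(path: str, channels: list[str]) -> pl.DataFrame:
    with h5py.File(path, "r") as f:
        # read each configured group/dataset into a numpy array
        columns = {name: np.asarray(f[name]) for name in channels}
    # polars accepts a dict of numpy arrays without an hdf5 reader of its own
    return pl.DataFrame(columns)
```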

@steinnymir
Member

This looks interesting, and you are right @zainsohail04, we read the hdf5 files with h5py, not pandas, so we might be able to use polars directly.
If polars can better handle our forward-filling problem, this might be the solution altogether! Reading 12 GB in 1.6 seconds would be far beyond our dreams! 😄
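For illustration, a tiny example of the forward-filling step in polars, with made-up values: slow channels recorded only once per train get propagated onto the per-pulse rows.

```python
import polars as pl

df = pl.DataFrame(
    {
        "pulseId": [0, 1, 2, 3],
        "delayStage": [10.5, None, None, 11.0],
    }
)
# propagate the last seen value of the slow channel onto subsequent rows
filled = df.with_columns(pl.col("delayStage").forward_fill())
```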

@rettigl
Member

rettigl commented Feb 2, 2023

I think we should understand where the bottleneck lies; %prun can help.
12 GB in 1.6 seconds is almost 8 GB/s. Can any long-term storage be that fast? Even NVMe drives can maybe do 3-4 GB/s, as far as I know. This looks to me like either RAM caching, or the data are not being completely read...

@daviddoji
Collaborator

Definitely, you should time your loaders and, as @rettigl says, find where the bottleneck is.

Concerning the numbers I showed above, bear in mind that I ran my code on Maxwell, which uses GPFS as its filesystem. This is not an SSD in a laptop!
Regarding the code, low-level parallelization on shared memory is what gives you such speeds. And yes, the data is completely read (that's not a dask trick doing lazy operations ;))
The timeit magic, by default, turns off the garbage collector and caching.
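A small, hypothetical sketch of timing a loader end-to-end outside the notebook; `read_dataframe` stands in for whichever call is being measured. Repeating the measurement also makes the OS page cache visible: a second run over the same files usually measures RAM, not the filesystem.

```python
import time
import timeit

def read_dataframe(files):
    # hypothetical stand-in for the real loader call being measured
    time.sleep(0.1)

files = ["run_001.h5", "run_002.h5"]

runs = timeit.repeat(lambda: read_dataframe(files), number=1, repeat=3)
print(f"best of 3 runs: {min(runs):.2f} s")
```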

@steinnymir
Member

@zain-sohail is this still relevant? FLASH has fully parallel loaders, and we don't really have other loaders to parallelize...

@zain-sohail
Member Author

@zain-sohail is this still relevant? FLASH has fully parallel loaders, and we don't really have other loaders to parallelize...

I think improvements can definitely still be made, but the flash loader is plenty fast as of now, so closing for now.
