
Converting large files #152

Closed
Nimi42 opened this issue Mar 29, 2019 · 45 comments

@Nimi42

Nimi42 commented Mar 29, 2019

When converting large files to Parquet or a pandas DataFrame, I always get a memory error. I was wondering if it is possible to have some kind of low-memory mode, or even better, a streaming mode.

Essentially I want Parquet files, but I saw that asammdf converts the .dat, .mf4, etc. files to a pandas DataFrame under the hood anyway and uses the result to export to Parquet.

So I was playing around with the code, trying to cast the columns to more appropriate dtypes.

import numpy as np
import pandas as pd

def _downcast(self, src_series):
    # shrink numeric columns to the smallest dtype that can hold their values
    if np.issubdtype(src_series.dtype, np.unsignedinteger):
        res_series = pd.to_numeric(src_series, downcast='unsigned')
    elif np.issubdtype(src_series.dtype, np.signedinteger):
        # signed columns that never go negative can be stored as unsigned
        if src_series.min() < 0:
            res_series = pd.to_numeric(src_series, downcast='signed')
        else:
            res_series = pd.to_numeric(src_series, downcast='unsigned')
    elif np.issubdtype(src_series.dtype, np.floating):
        res_series = pd.to_numeric(src_series, downcast='float')
    else:
        # non-numeric (e.g. string) columns are converted to categoricals
        res_series = src_series.astype('category')

    return res_series

It saves some memory, but unfortunately it is not enough. I have some files that are 5 GB or larger; when converted to a DataFrame they get inflated to more than 20 GB.

Any help is appreciated.

@danielhrisca
Owner

Hello @Nimi42 ,

I will have a look at this over the weekend. The dataframe approach was quick and convenient; I'm sure we can get a significant reduction in memory usage at the expense of slower execution.

@danielhrisca
Owner

@Nimi42 It looks like the parquet format is not friendly towards adding new columns/channels dask/fastparquet#244

@danielhrisca
Owner

danielhrisca commented Mar 30, 2019

@Nimi42
I've added the argument reduce_memory_usage to to_dataframe https://asammdf.readthedocs.io/en/development/api.html#asammdf.mdf.MDF.to_dataframe.

results on my test file:

Convert to pandas.DataFrame     | Time [ms] | RAM [MB]
asammdf 5.3.0.dev0 reduce=True  | 12977     | 1925
asammdf 5.3.0.dev0 reduce=False | 13852     | 3140
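A minimal call, for reference (the file name is a placeholder):

from asammdf import MDF

mdf = MDF("test_file.mf4")
# downcast numeric channels where possible while building the dataframe
df = mdf.to_dataframe(reduce_memory_usage=True)
print(df.memory_usage(deep=True).sum() / 1024 ** 2, "MB")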

@danielhrisca
Owner

@Nimi42

I've made some improvements to the export method:

  • you can use compression for the output (see the compression keyword argument https://asammdf.readthedocs.io/en/development/api.html#asammdf.mdf.MDF.export)
  • the reduce_memory_usage argument was added to the export method and can be used with parquet when the single_time_base argument is True
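A sketch of a call that combines both options (the file name is a placeholder and the compression value is just one assumed example):

from asammdf import MDF

mdf = MDF("measurement.mf4")
# assumed usage: parquet output on a single time base with downcast dtypes;
# "GZIP" is an example compression value, see the export() docs for the options
mdf.export(
    fmt="parquet",
    filename="measurement.parquet",
    single_time_base=True,
    reduce_memory_usage=True,
    compression="GZIP",
)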

@Nimi42
Author

Nimi42 commented Apr 1, 2019

@danielhrisca

Wow... You're damn quick.

I've been researching a bit over the weekend and found the following resources.
Not sure if this is a viable solution, though.

The first one is an issue on the fastparquet GitHub: dask/fastparquet#72. If I understand it correctly, it might still be possible to write massive files.

The second one is a question (and answer) on Stack Overflow. It uses pyarrow, though. I think pyarrow is going to include streaming soon, or already does, but the proposed answer splits the output into a collection of many files, which is OK for Parquet according to one of the commenters (Wes McKinney), who I think is part of the pyarrow development team. https://stackoverflow.com/questions/46157709/converting-hdf5-to-parquet-without-loading-into-memory

Edit:
Damn…. I just learned that Wes McKinney is one of the authors of Pandas.
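For reference, the chunked-write idea from that answer boils down to reusing a single pyarrow ParquetWriter with a fixed schema; a rough sketch (the chunk generator and file name are made up):

import pyarrow as pa
import pyarrow.parquet as pq

writer = None
for df_chunk in produce_dataframe_chunks():  # hypothetical source of pandas chunks
    table = pa.Table.from_pandas(df_chunk)
    if writer is None:
        # the schema of the first chunk is reused for every following row group
        writer = pq.ParquetWriter("output.parquet", table.schema)
    writer.write_table(table)
if writer is not None:
    writer.close()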

@danielhrisca
Owner

Do you still get a MemoryError with the new code?

@Nimi42
Author

Nimi42 commented Apr 1, 2019

@danielhrisca

I did not try it yet. But I have some files here that are about 4 GB in size, which were inflated to over 20 GB, and I have another one which is 8 GB. I have not even tried that one yet.

Since you can optimise the performance at the appropriate level, this might already be enough. I just wanted to throw in some options. I will definitely try your new version out first.

cheers

@danielhrisca
Owner

danielhrisca commented Apr 1, 2019

I will look into them. It would be a lot better if we could add the channels sequentially to the parquet file.

@Nimi42
Author

Nimi42 commented Apr 2, 2019

I've tested the new code. It's about as memory efficient as my workaround, but faster. Unfortunately, Spark can't handle the unsigned integers in the parquet files. I changed my workaround later to avoid unsigned integers, but I guess it would be too much to expect asammdf to account for Spark behaviour.

Damn… sorry to bother you with all these requests. I just really need these features.

@Nimi42
Author

Nimi42 commented Apr 3, 2019

I can convert the respective columns manually with pyarrow afterwards, so I guess that's left for me to handle.

@legout

legout commented Apr 3, 2019

I will look into them. It would be a lot better if we could add the channels sequentially to the parquet file.

Hi Daniel,

as you already know, I also had/have memory problems loading large MDF files into pandas dataframes. Therefore I was playing around with pyarrow last week. I ended up with a function that converts the data/samples of all channels of a data group into a pyarrow table. This table can easily be written to parquet files (one for each data group if not using a unique timebase) without the memory issues.

Furthermore, converting these pyarrow tables to a pandas dataframe is also very fast and memory friendly.

I'll post my function here next week, when I am back from holidays. However, implementing this is straightforward, so maybe you have already worked it out on your own.
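A rough sketch of that per-data-group idea (assuming pyarrow and asammdf's get_group, which returns a group's channels as a pandas dataframe; file names are placeholders):

import pyarrow as pa
import pyarrow.parquet as pq
from asammdf import MDF

mdf = MDF("file.mf4")

# one parquet file per data group, since the channels inside a group
# share the same timebase
for index in range(len(mdf.groups)):
    df = mdf.get_group(index)
    table = pa.Table.from_pandas(df)
    pq.write_table(table, f"file_group_{index}.parquet")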

Regards,
legout

@danielhrisca
Owner

Hello @legout ,

my concern was indeed related to the single_time_base functionality.

@Nimi42
Author

Nimi42 commented Apr 3, 2019

I must admit that I do not understand the MDF format very well. What is a data group? All signals with a common time base?

@danielhrisca
Owner

MDF is designed to handle signals with different sampling rates. Inside a channel group all the channels have the same timebase.

Parquet, as far as I can see, is designed for signals with the same sampling rate. This is why it is difficult to export an MDF file, which may contain different sampling rates, to parquet.
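To make this concrete, the different rasters can be inspected through each group's master channel (a sketch; the file name is a placeholder):

from asammdf import MDF

mdf = MDF("file.mf4")
# every channel group has its own master (time) channel, so lengths and
# rasters usually differ from group to group
for index in range(len(mdf.groups)):
    master = mdf.get_master(index)
    print(index, len(master), master[:3])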

@legout

legout commented Apr 3, 2019

Hello @legout ,

my concern was indeed related to the single_time_base functionality.

Hi Daniel,

when using single_time_base, there will be only one pyarrow table. It is possible to add columns/channels to this table and/or to write the table, or just some of its columns/channels, to a parquet file.
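For illustration, appending a column to an existing pyarrow table and writing only selected columns might look like this (assuming a reasonably recent pyarrow; the column names are made up):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"timestamps": [0.0, 0.1, 0.2]})

# append another channel as a new column
table = table.append_column("speed", pa.array([10.0, 11.5, 12.0]))

# write the whole table, or only some of its columns, to parquet
pq.write_table(table, "full.parquet")
pq.write_table(table.select(["speed"]), "speed_only.parquet")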

@Nimi42
Author

Nimi42 commented Apr 3, 2019

Multiple parquet files for one MDF file are definitely a valid use case, as long as it is possible to roughly control the size of each chunk.

@danielhrisca
Owner

This still means that you have to hold the table in memory until all columns are ready before writing the parquet file, doesn't it? Can you write new columns to an existing parquet file/folder structure?

@Nimi42
Author

Nimi42 commented Apr 3, 2019

I guess it would be too complicated to read the MDF file partially in terms of rows and write multiple parquet files? (When using a single time base for all columns.)

@legout

legout commented Apr 3, 2019

Yes. The table is still in memory. However, using pandas to write the data to parquet produces a lot of overhead. Therefore, using pyarrow is much more memory friendly.

@danielhrisca
Owner

Ok, I will have a crack at this

@legout

legout commented Apr 3, 2019

I guess it would be too complicated to read the MDF file partially in terms of rows and write multiple parquet files? (When using a single time base for all columns.)

I think this won't reduce memory usage (when using pyarrow), because even when splitting the table into several row groups for writing, you have to hold the whole table in memory (as mentioned by Daniel).

@Nimi42
Author

Nimi42 commented Apr 3, 2019

I think this won't reduce memory usage (when using pyarrow), because even when splitting the table into several row groups for writing, you have to hold the whole table in memory (as mentioned by Daniel).

Right... but there is a method called cut. I was thinking of something along the lines of loading the MDF file partially (not necessarily with the cut method, I just want to illustrate the point), writing out a parquet file, then loading the next part of the MDF file and creating another parquet file with the same schema. For frameworks like Spark it shouldn't make a difference.

In the ideal case, you'd be able to create parquet files of a particular chunk size.
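A rough illustration of that idea using MDF.cut (the chunk boundaries, file names and export keywords are illustrative):

from asammdf import MDF

mdf = MDF("big_file.dat")

# hypothetical row-wise chunking: cut the measurement into 60 s slices and
# export every slice to its own parquet file with the same schema
start, stop, step = 0.0, 600.0, 60.0
index = 0
while start < stop:
    chunk = mdf.cut(start=start, stop=start + step)
    chunk.export(
        fmt="parquet",
        filename=f"part_{index:03d}.parquet",
        single_time_base=True,
    )
    start += step
    index += 1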

@danielhrisca
Owner

This would hurt performance a lot

@legout

legout commented Apr 3, 2019

I think this won't reduce memory usage (when using pyarrow), because even when splitting the table into several row groups for writing, you have to hold the whole table in memory (as mentioned by Daniel).

Right... but there is a method called cut. I was thinking of something along the lines of loading the MDF file partially (not necessarily with the cut method, I just want to illustrate the point), writing out a parquet file, then loading the next part of the MDF file and creating another parquet file with the same schema. For frameworks like Spark it shouldn't make a difference.

In the ideal case, you'd be able to create parquet files of a particular chunk size.

What do you want to do with those chunked parquet files? Load all the data from the parquet files into a pandas dataframe? If so, you'll have the same memory issues as before.

@Nimi42
Author

Nimi42 commented Apr 3, 2019

This would hurt performance a lot

That's too bad. Unfortunately, I do have MDF files that do not fit into memory as a whole. So I can't think of any other way.

Is it possible to somehow get the min and max values of a unified time base? Then I'd implement my own function using cut.

@Nimi42
Author

Nimi42 commented Apr 3, 2019

What do you want to do with those chunked parquet files? Load all the data from the parquet files into a pandas dataframe? If so, you'll have the same memory issues as before.

Machine learning and analysis with Spark. It would run on a cluster that can handle big data (many of these MDF files), let's say 500 MDF files pre-processed and converted to parquet.

@legout

legout commented Apr 3, 2019

Then I'd suggest splitting the MDF files into "column" chunks using mdf.select instead of splitting into "row" chunks. Write these "column" chunks into parquet files and concatenate the data into one Spark dataframe inside the cluster.

@Nimi42
Author

Nimi42 commented Apr 3, 2019

Then I'd suggest splitting the MDF files into "column" chunks using mdf.select instead of splitting into "row" chunks. Write these "column" chunks into parquet files and concatenate the data into one Spark dataframe inside the cluster.

That should work. I'll try that for now.

@Nimi42
Author

Nimi42 commented Apr 3, 2019

@legout

Now that I think about it... it would not work. If I take a number of channels and convert them to parquet, they would not have the same time base.

I guess it would be easy to calculate a unified time base and use it to convert a subset of channels to parquet, but at the moment asammdf does not provide a method to get this unified time base, and more importantly, to use it to convert that subset.

It would need some slight changes in the library to make that workflow possible.

@legout

legout commented Apr 3, 2019

You can calculate it on your own.

from collections import defaultdict

import numpy as np
import asammdf

mdf = asammdf.MDF('file.mf4')
channels = list(mdf.channels_db.keys())  # replace this by a list of channels if not all channels should be used
raster = 0.1  # you can also use a specific channel name
time_from_zero = True  # shift the unified time base so that it starts at zero

# group the requested channels by the data group they belong to
ch_grp_idx = defaultdict(list)
for channel in channels:
    grp, idx = mdf.channels_db[channel][0]
    ch_grp_idx[grp].append([channel, grp, idx])

if isinstance(raster, str):
    # use the timestamps of the given channel as the new time base
    grp, idx = mdf.channels_db[raster][0]
    timeNew = mdf.get(group=grp, index=idx).timestamps
else:
    # build a fixed-step time base spanning all involved data groups
    tmin = []
    tmax = []
    grp_idx = []
    for grp in ch_grp_idx:
        idx = mdf.masters_db[grp]
        grp_idx.append([None, grp, idx])
    signals = mdf.select(grp_idx)
    for signal in signals:
        if len(signal.samples) != 0:
            tmin.append(signal.samples.min())
            tmax.append(signal.samples.max())
    timeNew = np.arange(np.min(tmin), np.max(tmax) + raster, raster)

if time_from_zero:
    timeNew = np.round(timeNew - timeNew[0], 3)
else:
    timeNew = np.round(timeNew, 3)

@Nimi42
Author

Nimi42 commented Apr 3, 2019

@legout

That is true, but the to_dataframe method does not allow me to use my own time base. I can hack it into the library, but the API at the moment is not built for such a use case (that is what I am getting at).

But thanks for the code. I'll use it as a workaround for now.

@Nimi42
Author

Nimi42 commented Apr 4, 2019

@legout @danielhrisca

Thanks guys. I probably shouldn't use the thread for this, but I just wanted to thank you for your help.
I've played around with your suggestions and now have a reliable way to convert large .dat files to parquet.

@legout

legout commented Apr 4, 2019

No problem.
If you don't mind, can you share your code with me/us?

@Nimi42
Author

Nimi42 commented Apr 4, 2019

@legout
Of course. I just want to clean it up, if that's OK.

@Nimi42
Author

Nimi42 commented Apr 5, 2019

I should note that it is necessary to change the asammdf library code to accept your own time base. I posted the code here, but I felt it was too much for this thread, so I've deleted it again. You can message me if necessary. Cheers

@legout

legout commented Apr 5, 2019

Hi Nimi42,

thanks so far. Can you please contact me, as it seems you did not make your e-mail address public.

Regards,
legout

@danielhrisca
Owner

You can calculate it on your own.

A faster way to find the min and max timestamps is this https://github.com/danielhrisca/asammdf/blob/development/asammdf/mdf.py#L2468

@danielhrisca
Owner

That is true, but the to_dataframe method does not allow me to use my own time base. I can hack it into the library, but the API at the moment is not built for such a use case (that is what I am getting at).

I will implement something similar to the new resample method for to_dataframe as well https://asammdf.readthedocs.io/en/development/api.html#asammdf.mdf.MDF.resample

@Nimi42
Author

Nimi42 commented Apr 8, 2019

@Nimi42
I've made some improvements to the export method:
  • you can use compression for the output (see the compression keyword argument https://asammdf.readthedocs.io/en/development/api.html#asammdf.mdf.MDF.export)
  • the reduce_memory_usage argument was added to the export method and can be used with parquet when the single_time_base argument is True

I took a look at the code. If I understand correctly, the savings are just from numerical types. Another big memory saving I got was from using the pandas categorical type for objects. I have a lot of columns in my data that are composed of repeated strings. That might not be the case for everybody, but an approach like the following might be interesting.

https://www.dataquest.io/blog/pandas-big-data/

We’ll write a loop to iterate over each object column, check if the number of unique values is less than 50%, and if so, convert it to the category type.

# gl_obj holds only the object-dtype columns of the original dataframe;
# converted_obj collects the (possibly categorical) converted columns
for col in gl_obj.columns:
    num_unique_values = len(gl_obj[col].unique())
    num_total_values = len(gl_obj[col])
    if num_unique_values / num_total_values < 0.5:
        # mostly repeated strings: store as a pandas categorical
        converted_obj.loc[:, col] = gl_obj[col].astype('category')
    else:
        converted_obj.loc[:, col] = gl_obj[col]

@danielhrisca
Owner

@Nimi42

good idea, please check if the latest development code works as intended

@danielhrisca
Owner

I will implement something similar to the new resample method for to_dataframe as well https://asammdf.readthedocs.io/en/development/api.html#asammdf.mdf.MDF.resample

Now you can use an arbitrary array or a channel's timestamps as the raster in to_dataframe (see the to_dataframe documentation).
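For example (the channel name is made up):

import numpy as np
from asammdf import MDF

mdf = MDF("measurement.mf4")

# explicit timestamp array as the raster
df_fixed = mdf.to_dataframe(raster=np.arange(0.0, 60.0, 0.1))

# or reuse another channel's timestamps by name
df_aligned = mdf.to_dataframe(raster="EngineSpeed")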

@danielhrisca
Owner

I think we can close this. Re-open if necessary.

@danielhrisca
Owner

In the development branch (soon to be 5.15.0) I have implemented the method iter_to_dataframe, which can be used to yield pandas dataframes that should not exceed 200 MB of RAM each.

The memory requirement is to have enough RAM for:

  • the MDF object
  • all master channel arrays
  • the union of all the master channel arrays
  • the yielded dataframe (~200 MB)

Please try it out and see if it works correctly.
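A sketch of how the chunks could be written straight to parquet parts (the file and directory names are placeholders; iter_to_dataframe is called with default arguments):

from pathlib import Path

from asammdf import MDF

out_dir = Path("big_file_parquet")
out_dir.mkdir(exist_ok=True)

mdf = MDF("big_file.mf4")
# every yielded dataframe stays around 200 MB, so it can be written out one
# piece at a time; Spark and friends can read the resulting directory of parts
for index, df in enumerate(mdf.iter_to_dataframe()):
    df.to_parquet(out_dir / f"part_{index:04d}.parquet")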

@ronitch

ronitch commented Jun 1, 2022

Thanks guys. I probably shouldn't use the thread for this, but I just wanted to thank you for your help. I've played around with your suggestions and now have a reliable way to convert large .dat files to parquet.

@Nimi42
Hi! Could you please share the code with me? It will be extremely helpful. Thanks!

@hongmofangjay

I should note that it is necessary to change the asammdf library code to accept your own time base. I posted the code here, but I felt it was too much for this thread, so I've deleted it again. You can message me if necessary. Cheers

Please share your code with me! Memory usage expands four times when I convert mf4 files. I need your hints. Thank you
