
Provide an async ParquetWriter for arrow #1269

Closed
alamb opened this issue Feb 4, 2022 · 19 comments · Fixed by #3957
Labels: enhancement, help wanted, parquet

@alamb (Contributor) commented Feb 4, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Just as it is nice to be able to read parquet files using Rust async IO (#111), it would be nice to be able to write them as well.

Describe the solution you'd like
An async writer similar in spirit to the reader created in #111.

@alamb added the parquet, enhancement, and help wanted labels on Feb 4, 2022
@xudong963 (Member) commented

I'll try it @alamb

@alamb (Contributor, Author) commented Feb 5, 2022

Thanks @xudong963 - I personally suggest starting with the API design. As in, write an example program showing how you would use the feature, something like the following (sorting out what types to use for streams, etc.):

async fn main() {
    // get output stream
    let writer = AsyncParquet::new(...);
    // write batches to the writer somehow (not sure how??)
    for batch in batches {
        ...
    }
}

@xudong963 (Member) commented

Thanks for your nice suggestion, @alamb! I'll start the task tomorrow.

@xudong963 (Member) commented

I think we need to provide these new APIs and structs:

pub struct AsyncArrowWriter<W: AsyncWrite + Unpin + Send> {
    writer: FileStream<W>,
    ...
}

pub struct FileStream<W: AsyncWrite + Unpin + Send> {
    writer: W,
}

impl<W: AsyncWrite + Unpin + Send> FileStream<W> {
    pub async fn write(&mut self, ...) -> Result<()> {
        // 1. first, write the header
        write_header().await?;
        // 2. then, write the row groups
        write_row_groups().await?;
        // 3. finally, write the metadata
        write_metadata().await?;
        Ok(())
    }
}

I noticed that we have a buffer optimization in the sync write part. I'm not sure if we need to keep the buffer in the async write part.
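For illustration only, usage of such an API might look like the following sketch; the constructor, the `close` method, and the error type here are assumptions, not part of the outline above:

use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::Schema;

// Hypothetical usage of the AsyncArrowWriter outlined above. The
// constructor and `close` method are assumed for illustration only.
async fn write_file(
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
) -> Result<(), Box<dyn std::error::Error>> {
    let file = tokio::fs::File::create("data.parquet").await?;
    let mut writer = AsyncArrowWriter::try_new(file, schema)?;
    for batch in batches {
        // each write may flush one or more complete row groups
        writer.write(&batch).await?;
    }
    // write any remaining buffered data and the footer metadata
    writer.close().await?;
    Ok(())
}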

@tustvold (Contributor) commented Feb 6, 2022

> I noticed that we have a buffer optimization in the sync write part. I'm not sure if we need to keep the buffer in the async write part.

Are you referring to the buffer added in #1214? It would be really cool to keep that if possible. A RecordBatch is likely significantly smaller than the ideal size for a row group.

@xudong963 (Member) commented

@tustvold Yep, thanks! I'll take a look at #1214 and reorganize my thoughts.

@ShiKaiWi (Member) commented

@tustvold Hi, this feature would help us a lot if it were implemented. Is anyone working on this issue? I'd be glad to try it if not.

@xudong963 (Member) commented

@ShiKaiWi Thanks! Feel free to try it. I'm busy with other things.

@tustvold (Contributor) commented

I wonder if this could simply be implemented by adding an inner_mut method to ArrowWriter.

This would then allow providing a Vec<u8> as the writer, and then periodically gaining access to it and flushing its contents asynchronously. We could provide an AsyncWriter that encapsulates this logic, but we could also just provide a code example in a doc comment.

The nature of parquet is that an entire row group is buffered up and written in one shot, as data for different columns cannot be interleaved, so I'm not sure it is possible to do better than this.
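A minimal sketch of that idea follows. Note that `inner_mut` is the proposed accessor and does not exist on `ArrowWriter` at the time of writing; error plumbing is collapsed into a boxed error for brevity:

use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use parquet::arrow::ArrowWriter;
use tokio::io::{AsyncWrite, AsyncWriteExt};

// Sketch: run the sync ArrowWriter against an in-memory Vec<u8> and
// periodically drain that buffer to an async sink.
async fn write_batches<W>(
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
    mut sink: W,
) -> Result<(), Box<dyn std::error::Error>>
where
    W: AsyncWrite + Unpin,
{
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, None)?;
    for batch in batches {
        writer.write(&batch)?;
        // Drain whatever encoded bytes the sync writer has flushed so far.
        let buffer = writer.inner_mut(); // proposed accessor, does not exist yet
        if !buffer.is_empty() {
            sink.write_all(buffer).await?;
            buffer.clear();
        }
    }
    // Closing flushes the final row group and the footer into the buffer.
    let buffer = writer.into_inner()?;
    sink.write_all(&buffer).await?;
    sink.shutdown().await?;
    Ok(())
}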

@ShiKaiWi (Member) commented

@tustvold Thanks for your quick response. Here are some of my thoughts on your proposal:

> This would then allow providing a Vec<u8> as the writer, and then periodically gaining access to it and flushing its contents asynchronously. We could provide an AsyncWriter that encapsulates this logic, but we could also just provide a code example in a doc comment.

I guess periodically gaining access is not the most elegant approach (though it can indeed solve the problem we encounter), because the polling frequency can't be determined easily: polling too frequently may introduce overhead, while polling too rarely may lead to high memory usage in the inner Vec<u8> writer.

> The nature of parquet is that an entire row group is buffered up and written in one shot, as data for different columns cannot be interleaved, so I'm not sure it is possible to do better than this.

For now, I don't know the details of the writer very well, but I guess the buffer logic can still be kept in the async writer.

@tustvold (Contributor) commented Mar 15, 2023

> because the polling frequency can't be determined easily

Checking after each call to write should be fine. A more sophisticated writer could track the number of written rows and only check once they exceed the max row group size; in practice this is highly unlikely to make a tangible performance difference.
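As a sketch of that more sophisticated variant (all names here are assumed, not part of the crate):

// Only inspect the in-memory buffer once enough rows have accumulated
// that the sync writer may have flushed a complete row group into it.
struct FlushTracker {
    rows_since_check: usize,
    max_row_group_size: usize, // mirrors the writer property of the same name
}

impl FlushTracker {
    /// Returns true when the caller should inspect and drain the buffer.
    fn record(&mut self, batch_rows: usize) -> bool {
        self.rows_since_check += batch_rows;
        if self.rows_since_check >= self.max_row_group_size {
            self.rows_since_check = 0;
            true
        } else {
            false
        }
    }
}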

> may lead to high memory usage in the inner Vec<u8> writer

Assuming a single RecordBatch does not exceed the maximum size of a row group, the above approach should be optimal in this respect. In practice, the parquet data is so much smaller than the corresponding arrow data that the size of this buffer is likely irrelevant in the grand scheme of things.

> guess the buffer logic can still be kept in the async writer

If we can share as much between the sync and async implementations as possible, that would be beneficial. The separate async read path, whilst a necessary evil, is a non-trivial maintenance burden.

@tustvold (Contributor) commented

FYI I've created #3871 to track reducing the buffering of arrow data. This won't change the need to buffer an entire row group's data before flushing, but it would allow buffering parquet-encoded data instead of arrow data, which may reduce memory pressure.

@ShiKaiWi (Member) commented Mar 15, 2023

> Checking after each call to write should be fine. A more sophisticated writer could track the number of written rows and only check once they exceed the max row group size; in practice this is highly unlikely to make a tangible performance difference.

👍. This approach is good enough to solve the problem we encounter.

> share as much between the sync and async implementations as possible

After digging into the code, I find this is the hardest part of implementing an async writer.

I guess your suggested approach is the best workaround until most of the code can be shared between the sync and async implementations.

@ShiKaiWi (Member) commented

@tustvold Maybe I can solve this issue by implementing your proposal:

> We could provide an AsyncWriter that encapsulates this logic, but we could also just provide a code example in a doc comment.

Please let me know when you make the decision.

@tustvold (Contributor) commented

> Please let me know when you make the decision.

How about we implement an AsyncWriter using the buffering approach, and see what it looks like?

@ShiKaiWi (Member) commented

> > Please let me know when you make the decision.
>
> How about we implement an AsyncWriter using the buffering approach, and see what it looks like?

Ok. I'll give it a try.

@ShiKaiWi (Member) commented Mar 16, 2023

> > Please let me know when you make the decision.
>
> How about we implement an AsyncWriter using the buffering approach, and see what it looks like?

@tustvold I found it hard to implement the AsyncWriter by sharing code with the current sync writer, so I tried your initial proposal, which looks simple and pretty: ShiKaiWi@f62cedc
What do you think of the demo? I'll polish it up and make a PR if you feel it is OK.

@tustvold (Contributor) commented

Looks good to me. I would consider putting it in a separate async_writer module behind the async feature flag, but otherwise it looks 👌
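For illustration, that layout might look something like the following sketch (the module path and re-export are assumptions, not the final code):

// Sketch only: in parquet/src/arrow/mod.rs, gate the new module behind
// the crate's existing `async` feature flag.
#[cfg(feature = "async")]
mod async_writer;

#[cfg(feature = "async")]
pub use async_writer::AsyncArrowWriter;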

@ShiKaiWi (Member) commented Mar 16, 2023

> I would consider putting it in a separate async_writer module

Of course, I will do that in the formal PR.

tustvold pushed a commit that referenced this issue Mar 28, 2023
* feat: support async writer

* fix: clippy warnings and test failure

* fix: broken docs

* feat: flush the inner async writer when threshold is exceeded