feat: Make AsyncArrowWriter accepts AsyncFileWriter #5753

Merged
merged 1 commit into from
May 14, 2024
74 changes: 60 additions & 14 deletions parquet/src/arrow/async_writer/mod.rs
@@ -59,13 +59,62 @@ use crate::{
 };
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use std::mem;
 use tokio::io::{AsyncWrite, AsyncWriteExt};

-/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncWrite`]
+/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files
+pub trait AsyncFileWriter: Send {
+    /// Write the provided bytes to the underlying writer
+    ///
+    /// The underlying writer CAN decide to buffer the data or write it immediately.
+    /// This design allows the writer implementer to control the buffering and I/O scheduling.
+    ///
+    /// The underlying writer MAY implement retry logic to prevent breaking users write process.
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;
Contributor

We should document atomicity requirements for this method

Member Author

I find that the current ArrowWriter design is also not safe to retry. I will update the behavior notes of write to make this clearer, and just take a Vec<u8> here to keep this PR simple.


+    /// Flush any buffered data to the underlying writer and finish writing process.
+    ///
+    /// After `complete` returns `Ok(())`, caller SHOULD not call write again.
+    fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
+}
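
For illustration only, here is a minimal sketch of what an implementation of this trait might look like. It is not the crate's code: the trait is redeclared locally using only the standard library, `Vec<u8>` stands in for `bytes::Bytes`, `String` stands in for the parquet error type, and `BufferedSink`, `block_on`, and `demo` are hypothetical names. The writer here chooses to buffer in `write` and only commit in `complete`, which the doc comments above appear to permit.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Std-only stand-in for `futures::future::BoxFuture` (assumption for this sketch).
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Local mirror of the trait in this diff, with `Vec<u8>` in place of
// `bytes::Bytes` and `String` in place of the parquet error type.
trait AsyncFileWriter: Send {
    fn write(&mut self, bs: Vec<u8>) -> BoxFuture<'_, Result<(), String>>;
    fn complete(&mut self) -> BoxFuture<'_, Result<(), String>>;
}

// Hypothetical writer that buffers every chunk and commits only on `complete`.
#[derive(Default)]
struct BufferedSink {
    pending: Vec<u8>,
    committed: Vec<u8>,
    completed: bool,
}

impl AsyncFileWriter for BufferedSink {
    fn write(&mut self, bs: Vec<u8>) -> BoxFuture<'_, Result<(), String>> {
        Box::pin(async move {
            // Enforce the "no write after complete" expectation from the docs.
            if self.completed {
                return Err("write after complete".to_string());
            }
            self.pending.extend_from_slice(&bs);
            Ok(())
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<(), String>> {
        Box::pin(async move {
            // Move the buffered bytes into the committed output.
            let pending = std::mem::take(&mut self.pending);
            self.committed.extend_from_slice(&pending);
            self.completed = true;
            Ok(())
        })
    }
}

// Minimal single-threaded executor: polls a future until it is ready.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Usage: write two chunks, complete, and return the committed bytes.
fn demo() -> Result<Vec<u8>, String> {
    let mut sink = BufferedSink::default();
    block_on(sink.write(b"PAR1".to_vec()))?;
    block_on(sink.write(b"data".to_vec()))?;
    block_on(sink.complete())?;
    Ok(sink.committed)
}
```

Because the trait only hands over owned chunks and a final `complete` call, a writer like this is free to decide when bytes actually reach storage.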

+impl AsyncFileWriter for Box<dyn AsyncFileWriter> {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
+        self.as_mut().write(bs)
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
+        self.as_mut().complete()
+    }
+}
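
The forwarding impl above is what lets a type-erased writer satisfy a generic `W: AsyncFileWriter` bound. The same pattern can be sketched without async or external crates. Everything in this block is hypothetical and for illustration only: `Sink` is a synchronous stand-in for the trait, and `Counting`, `write_all_chunks`, and `demo_boxed` are made-up names.

```rust
// Hypothetical, synchronous stand-in for `AsyncFileWriter`.
trait Sink {
    fn push(&mut self, bs: Vec<u8>);
    fn total(&self) -> usize;
}

// Illustrative concrete sink that only counts the bytes it receives.
struct Counting(usize);

impl Sink for Counting {
    fn push(&mut self, bs: Vec<u8>) {
        self.0 += bs.len();
    }

    fn total(&self) -> usize {
        self.0
    }
}

// Forwarding impl: without it, `Box<dyn Sink>` would not satisfy `W: Sink`.
impl Sink for Box<dyn Sink> {
    fn push(&mut self, bs: Vec<u8>) {
        // Dereference to the inner trait object and dispatch dynamically.
        (**self).push(bs)
    }

    fn total(&self) -> usize {
        (**self).total()
    }
}

// Generic consumer, playing the role of `AsyncArrowWriter<W>`.
fn write_all_chunks<W: Sink>(mut sink: W, chunks: Vec<Vec<u8>>) -> usize {
    for chunk in chunks {
        sink.push(chunk);
    }
    sink.total()
}

// The concrete sink type is erased behind a `Box` before being passed in.
fn demo_boxed() -> usize {
    let sink: Box<dyn Sink> = Box::new(Counting(0));
    write_all_chunks(sink, vec![vec![1, 2, 3], vec![4, 5]])
}
```

Callers that pick a writer at runtime can then use one concrete type, the boxed trait object, with the generic consumer.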

+impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
+        async move {
+            self.write_all(&bs).await?;
+            Ok(())
+        }
+        .boxed()
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
+        async move {
+            self.flush().await?;
+            self.shutdown().await?;
+            Ok(())
+        }
+        .boxed()
+    }
+}
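
With this blanket impl, existing `AsyncWrite` types keep working with `AsyncArrowWriter` without any change on the caller's side. A rough synchronous analogue, using `std::io::Write` in place of tokio's `AsyncWrite`, is sketched below. `FileWriter`, `write_chunk`, and `write_file` are hypothetical names, and since `std::io::Write` has no `shutdown`, `complete` only flushes in this sketch.

```rust
use std::io::{self, Write};

// Hypothetical synchronous analogue of `AsyncFileWriter`.
trait FileWriter {
    fn write_chunk(&mut self, bs: Vec<u8>) -> io::Result<()>;
    fn complete(&mut self) -> io::Result<()>;
}

// Blanket impl: every `std::io::Write` type gets `FileWriter` for free.
impl<T: Write> FileWriter for T {
    fn write_chunk(&mut self, bs: Vec<u8>) -> io::Result<()> {
        self.write_all(&bs)
    }

    fn complete(&mut self) -> io::Result<()> {
        // No `shutdown` in std, so finishing is just a flush here.
        self.flush()
    }
}

// Generic consumer bounded on the new trait rather than on `Write`.
fn write_file<W: FileWriter>(mut writer: W, chunks: Vec<Vec<u8>>) -> io::Result<W> {
    for chunk in chunks {
        writer.write_chunk(chunk)?;
    }
    writer.complete()?;
    Ok(writer)
}
```

Passing a plain `Vec<u8>` (which implements `std::io::Write`) to `write_file` works without any explicit impl, which mirrors how callers that already pass a tokio writer are unaffected by the new bound.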

+/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncFileWriter`]
 ///
 /// ## Memory Usage
 ///
-/// This writer eagerly writes data as soon as possible to the underlying [`AsyncWrite`],
+/// This writer eagerly writes data as soon as possible to the underlying [`AsyncFileWriter`],
 /// permitting fine-grained control over buffering and I/O scheduling. However, the columnar
 /// nature of parquet forces data for an entire row group to be buffered in memory, before
 /// it can be flushed. Depending on the data and the configured row group size, this buffering
@@ -97,7 +146,7 @@ pub struct AsyncArrowWriter<W> {
     async_writer: W,
 }

-impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
+impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
     /// Try to create a new Async Arrow Writer
     pub fn try_new(
         writer: W,
@@ -178,28 +227,25 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {

         // Force to flush the remaining data.
         self.do_write().await?;
-        self.async_writer.shutdown().await?;
+        self.async_writer.complete().await?;

         Ok(metadata)
     }

     /// Flush the data written by `sync_writer` into the `async_writer`
+    ///
+    /// # Notes
+    ///
+    /// This method will take the inner buffer from the `sync_writer` and write it into the
+    /// async writer. After the write, the inner buffer will be empty.
     async fn do_write(&mut self) -> Result<()> {
-        let buffer = self.sync_writer.inner_mut();
-
-        self.async_writer
-            .write_all(buffer.as_slice())
-            .await
-            .map_err(|e| ParquetError::External(Box::new(e)))?;
+        let buffer = mem::take(self.sync_writer.inner_mut());

         self.async_writer
-            .flush()
+            .write(Bytes::from(buffer))
             .await
             .map_err(|e| ParquetError::External(Box::new(e)))?;

-        // reuse the buffer.
-        buffer.clear();
-
         Ok(())
     }
 }
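
The change in `do_write` from borrowing the buffer to taking it with `mem::take` can be sketched in isolation. `SyncBuffer` and `drain_for_write` are hypothetical names used only for this illustration; they are not part of the crate.

```rust
use std::mem;

// Hypothetical stand-in for the synchronous writer's in-memory buffer.
struct SyncBuffer {
    inner: Vec<u8>,
}

impl SyncBuffer {
    fn inner_mut(&mut self) -> &mut Vec<u8> {
        &mut self.inner
    }
}

// Moves the buffered bytes out and leaves an empty `Vec` in their place,
// so the caller owns the data it is about to hand to the writer.
fn drain_for_write(buf: &mut SyncBuffer) -> Vec<u8> {
    mem::take(buf.inner_mut())
}
```

One consequence worth noting: the `Vec` left behind is a fresh, empty one with no capacity, so the allocation travels with the data to the writer instead of being kept for reuse, as the removed `buffer.clear()` did.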