9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -432,6 +432,14 @@ impl BatchSerializer for CsvSerializer {
self.header = false;
Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
}

fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
let new_self = CsvSerializer::new()
.with_builder(self.builder.clone())
.with_header(self.header);
self.header = false;
Ok(Box::new(new_self))
}
}

/// Implements [`DataSink`] for writing to a CSV file.
@@ -579,6 +587,7 @@ impl DataSink for CsvSink {
serializers,
writers,
self.config.single_file_output,
self.config.unbounded_input,
)
.await
}
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -221,6 +221,10 @@ impl BatchSerializer for JsonSerializer {
//drop(writer);
Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
}

fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
Ok(Box::new(JsonSerializer::new()))
}
}

/// Implements [`DataSink`] for writing to a Json file.
@@ -364,6 +368,7 @@ impl DataSink for JsonSink {
serializers,
writers,
self.config.single_file_output,
self.config.unbounded_input,
)
.await
}
278 changes: 219 additions & 59 deletions datafusion/core/src/datasource/file_format/write.rs
@@ -33,12 +33,15 @@ use datafusion_common::{exec_err, internal_err, DataFusionError, FileCompression

use async_trait::async_trait;
use bytes::Bytes;
use datafusion_execution::RecordBatchStream;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{ready, StreamExt};
use object_store::path::Path;
use object_store::{MultipartId, ObjectMeta, ObjectStore};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::task::{JoinHandle, JoinSet};

/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
/// It is specifically designed for the `object_store` crate's `put` method and sends
@@ -237,29 +240,11 @@ pub enum FileWriterMode {
pub trait BatchSerializer: Unpin + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
}

/// Checks if any of the passed writers have encountered an error
/// and if so, all writers are aborted.
async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
result: Result<T>,
writers: &mut [AbortableWrite<W>],
) -> Result<T> {
match result {
Ok(value) => Ok(value),
Err(e) => {
// Abort all writers before returning the error:
for writer in writers {
let mut abort_future = writer.abort_writer();
if let Ok(abort_future) = &mut abort_future {
let _ = abort_future.await;
}
// Ignore errors that occur during abortion,
// We do try to abort all writers before returning error.
}
// After aborting writers return original error.
Err(e)
}
/// Duplicates self to support serializing multiple batches in parallel on multiple cores
fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
Err(DataFusionError::NotImplemented(
"Parallel serialization is not implemented for this file type".into(),
))
}
}

@@ -315,58 +300,233 @@ pub(crate) async fn create_writer(
}
}

type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
type SerializerType = Box<dyn BatchSerializer>;

/// Serializes a single data stream in parallel and writes to an ObjectStore
/// concurrently. Data order is preserved. In the event of an error,
/// the ObjectStore writer is returned to the caller in addition to an error,
/// so that the caller may handle aborting failed writes.
async fn serialize_rb_stream_to_object_store(
Contributor: I have some ideas how to simplify this code, which I will try out shortly, but I also think it can be merged like this too.

Contributor: I tried to rewrite this as a futures::stream computation using buffered -- but I got stuck on some "higher-ranked lifetime error", so I think this is about as good as it is going to get. (A rough sketch of that buffered idea appears after this function.)

mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
mut serializer: Box<dyn BatchSerializer>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
unbounded_input: bool,
) -> std::result::Result<(SerializerType, WriterType, u64), (WriterType, DataFusionError)>
{
let (tx, mut rx) =
mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);

let serialize_task = tokio::spawn(async move {
while let Some(maybe_batch) = data_stream.next().await {
match serializer.duplicate() {
Ok(mut serializer_clone) => {
let handle = tokio::spawn(async move {
let batch = maybe_batch?;
let num_rows = batch.num_rows();
let bytes = serializer_clone.serialize(batch).await?;
Ok((num_rows, bytes))
});
tx.send(handle).await.map_err(|_| {
DataFusionError::Internal(
Contributor: Define the return error as a closure at the beginning of the method to make it common.

"Unknown error writing to object store".into(),
)
})?;
if unbounded_input {
tokio::task::yield_now().await;
}
}
Err(_) => {
return Err(DataFusionError::Internal(
"Unknown error writing to object store".into(),
))
}
}
}
Ok(serializer)
});

let mut row_count = 0;
while let Some(handle) = rx.recv().await {
match handle.await {
Ok(Ok((cnt, bytes))) => {
match writer.write_all(&bytes).await {
Ok(_) => (),
Err(e) => {
return Err((
writer,
DataFusionError::Execution(format!(
"Error writing to object store: {e}"
)),
))
}
};
row_count += cnt;
}
Ok(Err(e)) => {
// Return the writer along with the error
return Err((writer, e));
}
Err(e) => {
// Handle task panic or cancellation
return Err((
writer,
DataFusionError::Execution(format!(
"Serialization task panicked or was cancelled: {e}"
)),
));
}
}
}

let serializer = match serialize_task.await {
Ok(Ok(serializer)) => serializer,
Ok(Err(e)) => return Err((writer, e)),
Err(_) => {
return Err((
writer,
DataFusionError::Internal("Unknown error writing to object store".into()),
))
}
};
Ok((serializer, writer, row_count as u64))
}
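
As a rough illustration of the futures::stream / buffered rewrite mentioned in the review comments above, a minimal sketch might look like the following. This is not the PR's code: the stand-in serialization closure, the IN_FLIGHT constant, and the error mapping are assumptions for illustration only, and the PR notes a real attempt ran into higher-ranked lifetime errors.

```rust
use bytes::Bytes;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::SendableRecordBatchStream;
use futures::StreamExt;
use tokio::io::{AsyncWrite, AsyncWriteExt};

/// Sketch only: serialize each batch on its own task, keep at most
/// `IN_FLIGHT` tasks running, and write the results in input order.
async fn buffered_serialize_sketch<W>(data: SendableRecordBatchStream, mut writer: W) -> Result<u64>
where
    W: AsyncWrite + Unpin,
{
    const IN_FLIGHT: usize = 8; // assumed concurrency limit
    let mut results = data
        .map(|maybe_batch| {
            tokio::spawn(async move {
                let batch = maybe_batch?;
                let num_rows = batch.num_rows();
                // Stand-in for `serializer.duplicate()?.serialize(batch).await?`.
                let bytes = Bytes::from(format!("{batch:?}").into_bytes());
                Ok::<_, DataFusionError>((num_rows, bytes))
            })
        })
        // `buffered` polls up to IN_FLIGHT join handles concurrently and
        // yields their outputs in the original batch order.
        .buffered(IN_FLIGHT);

    let mut row_count = 0u64;
    while let Some(joined) = results.next().await {
        let (num_rows, bytes) = joined
            .map_err(|e| DataFusionError::Execution(format!("serialization task failed: {e}")))??;
        writer
            .write_all(&bytes)
            .await
            .map_err(|e| DataFusionError::Execution(format!("error writing to object store: {e}")))?;
        row_count += num_rows as u64;
    }
    Ok(row_count)
}
```

Order preservation here comes from `buffered` (as opposed to `buffer_unordered`), which mirrors what the hand-rolled channel of JoinHandles achieves in the PR.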

/// Contains the common logic for serializing RecordBatches and
/// writing the resulting bytes to an ObjectStore.
/// Serialization is assumed to be stateless, i.e.
/// each RecordBatch can be serialized without any
/// dependency on the RecordBatches before or after.
pub(crate) async fn stateless_serialize_and_write_files(
Contributor: The blocks inside the match serialize_rb_stream_to_object_store can be refactored into separate functions for better readability and maintainability:

pub(crate) async fn stateless_serialize_and_write_files(
    data: Vec<SendableRecordBatchStream>,
    mut serializers: Vec<Box<dyn BatchSerializer>>,
    mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
    single_file_output: bool,
    unbounded_input: bool,
) -> Result<u64> {
    if single_file_output {
        if serializers.len() != 1 || writers.len() != 1 {
            return internal_err!("single_file_output is true, but got more than 1 writer!");
        }
        return serialize_single_file(data, serializers.remove(0), writers.remove(0), unbounded_input).await;
    } else {
        if data.len() != writers.len() {
            return internal_err!("Did not get 1 writer for each output partition!");
        }
        return serialize_multiple_files(data, serializers, writers, unbounded_input).await;
    }
}

mut data: Vec<SendableRecordBatchStream>,
mut serializers: Vec<Box<dyn BatchSerializer>>,
mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
data: Vec<SendableRecordBatchStream>,
mut serializers: Vec<SerializerType>,
mut writers: Vec<WriterType>,
single_file_output: bool,
unbounded_input: bool,
) -> Result<u64> {
if single_file_output && (serializers.len() != 1 || writers.len() != 1) {
return internal_err!("single_file_output is true, but got more than 1 writer!");
}
let num_partitions = data.len();
if !single_file_output && (num_partitions != writers.len()) {
let num_writers = writers.len();
if !single_file_output && (num_partitions != num_writers) {
return internal_err!("single_file_ouput is false, but did not get 1 writer for each output partition!");
}
let mut row_count = 0;
// Map errors to DatafusionError.
let err_converter =
|_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
// TODO parallelize serialization accross partitions and batches within partitions
// see: https://github.com/apache/arrow-datafusion/issues/7079
for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) {
let idx = match single_file_output {
false => part_idx,
true => 0,
};
while let Some(maybe_batch) = data_stream.next().await {
// Write data to files in a round robin fashion:
let serializer = &mut serializers[idx];
let batch = check_for_errors(maybe_batch, &mut writers).await?;
row_count += batch.num_rows();
let bytes =
check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
let writer = &mut writers[idx];
check_for_errors(
writer.write_all(&bytes).await.map_err(err_converter),
&mut writers,
)
.await?;
// tracks if any writers encountered an error triggering the need to abort
let mut any_errors = false;
// tracks the specific error triggering abort
let mut triggering_error = None;
// tracks if any errors were encountered in the process of aborting writers.
// if true, we may not have a guarantee that all written data was cleaned up.
let mut any_abort_errors = false;
match single_file_output {
Contributor: Can be if single_file_output { ... } else { ... }

false => {
let mut join_set = JoinSet::new();
for (data_stream, serializer, writer) in data
.into_iter()
.zip(serializers.into_iter())
.zip(writers.into_iter())
.map(|((a, b), c)| (a, b, c))
{
join_set.spawn(async move {
serialize_rb_stream_to_object_store(
data_stream,
serializer,
writer,
unbounded_input,
)
.await
});
}
let mut finished_writers = Vec::with_capacity(num_writers);
while let Some(result) = join_set.join_next().await {
match result {
Ok(res) => match res {
Ok((_, writer, cnt)) => {
finished_writers.push(writer);
row_count += cnt;
}
Err((writer, e)) => {
finished_writers.push(writer);
any_errors = true;
triggering_error = Some(e);
}
},
Err(e) => {
// Don't panic, instead try to clean up as many writers as possible.
Contributor Author: Join errors (perhaps because a thread was killed by the OS?) could also result in non-atomic writes, since we would have no way to recover ownership of the writer and abort it.

Contributor: You should return the error after aborting the writer, in case an execution error occurs within the plans.

// If we hit this code, ownership of a writer was not joined back to
// this thread, so we cannot clean it up (hence any_abort_errors is true)
any_errors = true;
any_abort_errors = true;
triggering_error = Some(DataFusionError::Internal(format!(
"Unexpected join error while serializing file {e}"
)));
}
}
}

// Finalize or abort writers as appropriate
for mut writer in finished_writers.into_iter() {
match any_errors {
Contributor: Can be if any_errors { ... } else { ... }

true => {
let abort_result = writer.abort_writer();
if abort_result.is_err() {
any_abort_errors = true;
}
}
false => {
writer.shutdown()
Contributor Author: I think this is the trickiest case to ensure the write is atomic. Suppose we have two writers A and B. Writer A could successfully commit and shutdown. Then, before Writer B can complete, a network or hardware fault could prevent Writer B from either finalizing or Writer A from aborting.

For this to be atomic, we would need some way to simultaneously commit all or none of our multipart writers. I don't think ObjectStores (S3 etc.) support a way to do that.

Downstream table providers could make this atomic in practice via an atomic metadata operation, which is I believe how DeltaLake and friends work.

Contributor: Yes, I agree that if someone wants atomic commit/rollback they should build that in at a higher level than datafusion -- there isn't much we can do with just the object store API. (A sketch of that higher-level commit pattern appears after this file's diff.)

.await
.map_err(|_| DataFusionError::Internal("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!".into()))?;
}
}
}
}
true => {
let mut writer = writers.remove(0);
let mut serializer = serializers.remove(0);
let mut cnt;
for data_stream in data.into_iter() {
(serializer, writer, cnt) = match serialize_rb_stream_to_object_store(
data_stream,
serializer,
writer,
unbounded_input,
)
.await
{
Ok((s, w, c)) => (s, w, c),
Err((w, e)) => {
any_errors = true;
triggering_error = Some(e);
writer = w;
break;
}
};
row_count += cnt;
}
match any_errors {
true => {
let abort_result = writer.abort_writer();
if abort_result.is_err() {
any_abort_errors = true;
}
}
false => writer.shutdown().await?,
}
}
}
// Perform cleanup:
let n_writers = writers.len();
for idx in 0..n_writers {
check_for_errors(
writers[idx].shutdown().await.map_err(err_converter),
&mut writers,
)
.await?;

if any_errors {
match any_abort_errors{
true => return Err(DataFusionError::Internal("Error encountered during writing to ObjectStore and failed to abort all writers. Partial result may have been written.".into())),
false => match triggering_error {
Some(e) => return Err(e),
None => return Err(DataFusionError::Internal("Unknown error encountered during writing to ObjectStore. All writers successfully aborted.".into()))
}
}
}
Ok(row_count as u64)

Ok(row_count)
}
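
Following up on the atomicity discussion in the review comments above, the higher-level pattern they point at (write all data files first, then publish them with a single atomic metadata operation, as Delta Lake-style table formats do) might be sketched roughly as below. The `Catalog` trait and `write_then_commit` helper are hypothetical illustrations, not DataFusion or object_store APIs.

```rust
use std::fs;

/// Hypothetical catalog with a single atomic commit; until `commit`
/// succeeds, readers see none of the newly written files.
trait Catalog {
    fn commit(&self, files: &[String]) -> Result<(), String>;
}

/// Write every data file, then publish them all with one metadata commit.
/// A failure before the commit leaves only invisible, uncommitted files,
/// so readers never observe a partial result.
fn write_then_commit<C: Catalog>(
    catalog: &C,
    writes: Vec<(String, Vec<u8>)>,
) -> Result<(), String> {
    let mut written = Vec::with_capacity(writes.len());
    for (path, bytes) in writes {
        fs::write(&path, bytes).map_err(|e| e.to_string())?;
        written.push(path);
    }
    // All-or-nothing visibility lives in this single call, not in the writers.
    catalog.commit(&written)
}
```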
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -915,6 +915,7 @@ impl TableProvider for ListingTable {
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
writer_mode,
unbounded_input: self.options().infinite_source,
single_file_output: self.options.single_file,
overwrite,
file_type_writer_options,