From 86aba9687a0a2b51f98a1ebb9a90cd386ecb5695 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Wed, 30 Aug 2023 11:47:50 -0400
Subject: [PATCH 01/14] implement tokio task spawning

---
 .../core/src/datasource/file_format/csv.rs    |   8 ++
 .../core/src/datasource/file_format/json.rs   |   4 +
 .../core/src/datasource/file_format/write.rs  | 133 +++++++++++++-----
 3 files changed, 111 insertions(+), 34 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 4d2caccacd0d..6f235c37ff31 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -432,6 +432,14 @@ impl BatchSerializer for CsvSerializer {
         self.header = false;
         Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
     }
+
+    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>>{
+        let new_self = CsvSerializer::new()
+            .with_builder(self.builder.clone())
+            .with_header(self.header);
+        self.header = false;
+        Ok(Box::new(new_self))
+    }
 }
 
 /// Implements [`DataSink`] for writing to a CSV file.
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 27b17d86f9b9..a5778c16c4a0 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -221,6 +221,10 @@ impl BatchSerializer for JsonSerializer {
         //drop(writer);
         Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
     }
+
+    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>>{
+        Ok(Box::new(JsonSerializer::new()))
+    }
 }
 
 /// Implements [`DataSink`] for writing to a Json file.
diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 272eee1fbcc3..73b905dd4353 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -33,12 +33,14 @@ use datafusion_common::{exec_err, internal_err, DataFusionError, FileCompression
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use datafusion_execution::RecordBatchStream;
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use futures::{ready, StreamExt};
 use object_store::path::Path;
 use object_store::{MultipartId, ObjectMeta, ObjectStore};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::task::{JoinSet, self, JoinHandle};
 
 /// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
 /// It is specifically designed for the `object_store` crate's `put` method and sends
@@ -237,6 +239,10 @@ pub enum FileWriterMode {
 pub trait BatchSerializer: Unpin + Send {
     /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
     async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
+    /// Duplicates self to support serializing multiple batches in parralell on multiple cores
+    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>>{
+        return Err(DataFusionError::NotImplemented("Parallel serialization is not implemented for this file type".into()))
+    }
 }
 
 /// Checks if any of the passed writers have encountered an error
@@ -315,13 +321,56 @@ pub(crate) async fn create_writer(
     }
 }
 
+async fn serialize_rb_stream_to_object_store(
+    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
+    mut serializer: Box<dyn BatchSerializer>,
+    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>
+    ) -> Result<u64>{
+    let mut row_count = 0;
+    // Not using JoinSet here since we want to ultimately write to ObjectStore preserving file order
+    let mut serialize_tasks: Vec<JoinHandle<Result<(usize, Bytes)>>> = Vec::new();
+    while let Some(maybe_batch) = data_stream.next().await {
+        let mut serializer_clone = serializer.duplicate()?;
+        serialize_tasks.push(task::spawn(
+            async move {
+                let batch = maybe_batch?;
+                let num_rows = batch.num_rows();
+                let bytes = serializer_clone.serialize(batch).await?;
+                Ok((num_rows, bytes))
+            }
+        ));
+    }
+    for serialize_result in serialize_tasks {
+        let result = serialize_result.await;
+        match result {
+            Ok(res) => {
+                let (cnt, bytes) = res?;
+                row_count += cnt;
+                writer.write_all(&bytes)
+                    .await
+                    .map_err(|_| DataFusionError::Internal("Unexpected FileSink Error".to_string()))?;
+            },
+            Err(e) => {
+                if e.is_panic() {
+                    std::panic::resume_unwind(e.into_panic());
+                } else {
+                    unreachable!();
+                }
+            }
+        }
+    }
+
+    writer.shutdown().await?;
+    Ok(row_count as u64)
+    }
+
 /// Contains the common logic for serializing RecordBatches and
 /// writing the resulting bytes to an ObjectStore.
 /// Serialization is assumed to be stateless, i.e.
 /// each RecordBatch can be serialized without any
 /// dependency on the RecordBatches before or after.
 pub(crate) async fn stateless_serialize_and_write_files(
-    mut data: Vec<SendableRecordBatchStream>,
+    data: Vec<SendableRecordBatchStream>,
     mut serializers: Vec<Box<dyn BatchSerializer>>,
     mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
     single_file_output: bool,
@@ -334,39 +383,55 @@ pub(crate) async fn stateless_serialize_and_write_files(
         return internal_err!("single_file_ouput is false, but did not get 1 writer for each output partition!");
     }
     let mut row_count = 0;
-    // Map errors to DatafusionError.
-    let err_converter =
-        |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
-    // TODO parallelize serialization accross partitions and batches within partitions
-    // see: https://github.com/apache/arrow-datafusion/issues/7079
-    for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) {
-        let idx = match single_file_output {
-            false => part_idx,
-            true => 0,
-        };
-        while let Some(maybe_batch) = data_stream.next().await {
-            // Write data to files in a round robin fashion:
-            let serializer = &mut serializers[idx];
-            let batch = check_for_errors(maybe_batch, &mut writers).await?;
-            row_count += batch.num_rows();
-            let bytes =
-                check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
-            let writer = &mut writers[idx];
-            check_for_errors(
-                writer.write_all(&bytes).await.map_err(err_converter),
-                &mut writers,
-            )
-            .await?;
+    match single_file_output {
+        false => {
+            let mut join_set = JoinSet::new();
+            for (data_stream, serializer, writer) in
+                data
+                    .into_iter()
+                    .zip(serializers.into_iter())
+                    .zip(writers.into_iter())
+                    .map(|((a,b), c)| (a,b,c))
+            {
+                join_set.spawn(async move {
+                    serialize_rb_stream_to_object_store(data_stream, serializer, writer).await
+                    }
+                );
+            }
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(res) => {
+                        let cnt = res?;
+                        row_count += cnt;
+                    },
+                    Err(e) => {
+                        if e.is_panic() {
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            unreachable!();
+                        }
+                    }
+                }
+            }
         }
+        },
+        true => {
+            for mut data_stream in data.into_iter()
+            {
+                let serializer = &mut serializers[0];
+                let writer = &mut writers[0];
+                while let Some(maybe_batch) = data_stream.next().await {
+                    // Write data to files in a round robin fashion:
+                    let batch = maybe_batch?;
+                    row_count += batch.num_rows() as u64;
+                    let bytes = serializer.serialize(batch).await?;
+                    writer.write_all(&bytes)
+                        .await
+                        .map_err(|_| DataFusionError::Internal("Unexpected FileSink Error".to_string()))?;
+                }
+            }
+            writers[0].shutdown().await?;
+        }
     }
-    // Perform cleanup:
-    let n_writers = writers.len();
-    for idx in 0..n_writers {
-        check_for_errors(
-            writers[idx].shutdown().await.map_err(err_converter),
-            &mut writers,
-        )
-        .await?;
-    }
-    Ok(row_count as u64)
+
+    Ok(row_count)
 }
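The heart of this first patch is an ordering trick: spawn one serialization task per RecordBatch, keep the JoinHandles in a Vec, and await them in spawn order, so writes land in input order even though serialization runs in parallel across cores. Below is a minimal, self-contained sketch of that trick using plain tokio and toy data; the names (fake_serialize) are illustrative and not part of DataFusion, and the tokio dependency is assumed to have the full feature set.

    use tokio::task::JoinHandle;

    // Stand-in for CPU-heavy serialization of one RecordBatch.
    async fn fake_serialize(id: usize) -> Vec<u8> {
        format!("batch-{id}").into_bytes()
    }

    #[tokio::main]
    async fn main() {
        let mut handles: Vec<JoinHandle<Vec<u8>>> = Vec::new();
        for id in 0..4 {
            // Tasks start running immediately and concurrently on the runtime.
            handles.push(tokio::spawn(fake_serialize(id)));
        }
        // Awaiting in spawn order preserves output order regardless of
        // which task happens to finish first.
        for handle in handles {
            let bytes = handle.await.expect("task panicked");
            println!("{}", String::from_utf8(bytes).unwrap());
        }
    }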
From d793ec525d795f2df39f8c6ab1cf6e0745850cbf Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Thu, 31 Aug 2023 09:43:39 -0400
Subject: [PATCH 02/14] improve error handling

---
 .../core/src/datasource/file_format/csv.rs    |   2 +-
 .../core/src/datasource/file_format/json.rs   |   2 +-
 .../core/src/datasource/file_format/write.rs  | 256 +++++++++++-------
 datafusion/core/tests/fifo.rs                 |   1 +
 4 files changed, 162 insertions(+), 99 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 6f235c37ff31..c0dfa7ff6a1d 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -433,7 +433,7 @@ impl BatchSerializer for CsvSerializer {
         Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
     }
 
-    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>>{
+    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
         let new_self = CsvSerializer::new()
             .with_builder(self.builder.clone())
             .with_header(self.header);
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index a5778c16c4a0..59caf0dcb2d4 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -222,7 +222,7 @@ impl BatchSerializer for JsonSerializer {
         Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
     }
 
-    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>>{
+    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
         Ok(Box::new(JsonSerializer::new()))
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 73b905dd4353..3e5579308e85 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -40,7 +40,7 @@ use futures::{ready, StreamExt};
 use object_store::path::Path;
 use object_store::{MultipartId, ObjectMeta, ObjectStore};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
-use tokio::task::{JoinSet, self, JoinHandle};
+use tokio::task::{self, JoinHandle, JoinSet};
 
 /// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
 /// It is specifically designed for the `object_store` crate's `put` method and sends
@@ -240,32 +240,10 @@ pub trait BatchSerializer: Unpin + Send {
     /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
     async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
     /// Duplicates self to support serializing multiple batches in parralell on multiple cores
-    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>>{
-        return Err(DataFusionError::NotImplemented("Parallel serialization is not implemented for this file type".into()))
-    }
-}
-
-/// Checks if any of the passed writers have encountered an error
-/// and if so, all writers are aborted.
-async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
-    result: Result<T>,
-    writers: &mut [AbortableWrite<W>],
-) -> Result<T> {
-    match result {
-        Ok(value) => Ok(value),
-        Err(e) => {
-            // Abort all writers before returning the error:
-            for writer in writers {
-                let mut abort_future = writer.abort_writer();
-                if let Ok(abort_future) = &mut abort_future {
-                    let _ = abort_future.await;
-                }
-                // Ignore errors that occur during abortion,
-                // We do try to abort all writers before returning error.
-            }
-            // After aborting writers return original error.
-            Err(e)
-        }
+    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
+        Err(DataFusionError::NotImplemented(
+            "Parallel serialization is not implemented for this file type".into(),
+        ))
     }
 }
@@ -322,48 +300,74 @@ pub(crate) async fn create_writer(
     }
 }
 
 async fn serialize_rb_stream_to_object_store(
-    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
+    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
     mut serializer: Box<dyn BatchSerializer>,
-    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>
-    ) -> Result<u64>{
-    let mut row_count = 0;
-    // Not using JoinSet here since we want to ultimately write to ObjectStore preserving file order
-    let mut serialize_tasks: Vec<JoinHandle<Result<(usize, Bytes)>>> = Vec::new();
-    while let Some(maybe_batch) = data_stream.next().await {
-        let mut serializer_clone = serializer.duplicate()?;
-        serialize_tasks.push(task::spawn(
-            async move {
-                let batch = maybe_batch?;
-                let num_rows = batch.num_rows();
-                let bytes = serializer_clone.serialize(batch).await?;
-                Ok((num_rows, bytes))
-            }
-        ));
-    }
-    for serialize_result in serialize_tasks {
-        let result = serialize_result.await;
-        match result {
-            Ok(res) => {
-                let (cnt, bytes) = res?;
-                row_count += cnt;
-                writer.write_all(&bytes)
-                    .await
-                    .map_err(|_| DataFusionError::Internal("Unexpected FileSink Error".to_string()))?;
-            },
-            Err(e) => {
-                if e.is_panic() {
-                    std::panic::resume_unwind(e.into_panic());
-                } else {
-                    unreachable!();
+    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+) -> std::result::Result<
+    (AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>, u64),
+    (
+        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+        DataFusionError,
+    ),
+> {
+    let mut row_count = 0;
+    // Not using JoinSet here since we want to ultimately write to ObjectStore preserving file order
+    let mut serialize_tasks: Vec<JoinHandle<Result<(usize, Bytes)>>> =
+        Vec::new();
+    while let Some(maybe_batch) = data_stream.next().await {
+        let mut serializer_clone = match serializer.duplicate() {
+            Ok(s) => s,
+            Err(_) => {
+                return Err((
+                    writer,
+                    DataFusionError::Internal(
+                        "Unknown error writing to object store".into(),
+                    ),
+                ))
+            }
+        };
+        serialize_tasks.push(task::spawn(async move {
+            let batch = maybe_batch?;
+            let num_rows = batch.num_rows();
+            let bytes = serializer_clone.serialize(batch).await?;
+            Ok((num_rows, bytes))
+        }));
+    }
+    for serialize_result in serialize_tasks {
+        let result = serialize_result.await;
+        match result {
+            Ok(res) => {
+                let (cnt, bytes) = match res {
+                    Ok(r) => r,
+                    Err(e) => return Err((writer, e)),
+                };
+                row_count += cnt;
+                match writer.write_all(&bytes).await {
+                    Ok(_) => (),
+                    Err(_) => {
+                        return Err((
+                            writer,
+                            DataFusionError::Internal(
+                                "Unknown error writing to object store".into(),
+                            ),
+                        ))
                 }
-            }
+                };
             }
-        }
-    }
-
-    writer.shutdown().await?;
-    Ok(row_count as u64)
+            Err(_) => {
+                return Err((
+                    writer,
+                    DataFusionError::Internal(
+                        "Unknown error writing to object store".into(),
+                    ),
+                ))
+            }
+        }
     }
+    Ok((writer, row_count as u64))
+}
+
 /// Contains the common logic for serializing RecordBatches and
 /// writing the resulting bytes to an ObjectStore.
 /// Serialization is assumed to be stateless, i.e.
@@ -379,59 +383,117 @@ pub(crate) async fn stateless_serialize_and_write_files(
         return internal_err!("single_file_output is true, but got more than 1 writer!");
     }
     let num_partitions = data.len();
-    if !single_file_output && (num_partitions != writers.len()) {
+    let num_writers = writers.len();
+    if !single_file_output && (num_partitions != num_writers) {
         return internal_err!("single_file_ouput is false, but did not get 1 writer for each output partition!");
     }
     let mut row_count = 0;
+    // tracks if any writers encountered an error triggering the need to abort
+    let mut any_errors = false;
+    // tracks the specific error triggering abort
+    let mut triggering_error = None;
+    // tracks if any errors were encountered in the process of aborting writers.
+    // if true, we may not have a guarantee that all written data was cleaned up.
+    let mut any_abort_errors = false;
     match single_file_output {
-        false => {
-            let mut join_set = JoinSet::new();
-            for (data_stream, serializer, writer) in
-                data
-                    .into_iter()
-                    .zip(serializers.into_iter())
-                    .zip(writers.into_iter())
-                    .map(|((a,b), c)| (a,b,c))
+        false => {
+            let mut join_set = JoinSet::new();
+            for (data_stream, serializer, writer) in data
+                .into_iter()
+                .zip(serializers.into_iter())
+                .zip(writers.into_iter())
+                .map(|((a, b), c)| (a, b, c))
             {
                 join_set.spawn(async move {
-                    serialize_rb_stream_to_object_store(data_stream, serializer, writer).await
+                    serialize_rb_stream_to_object_store(data_stream, serializer, writer)
+                        .await
+                });
+            }
+            let mut finished_writers = Vec::with_capacity(num_writers);
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(res) => match res {
+                        Ok((writer, cnt)) => {
+                            finished_writers.push(writer);
+                            row_count += cnt;
+                        }
+                        Err((writer, e)) => {
+                            finished_writers.push(writer);
+                            any_errors = true;
+                            triggering_error = Some(e);
+                        }
+                    },
+                    Err(_) => {
+                        // Don't panic, instead try to clean up as many writers as possible.
+                        // If we hit this code, ownership of a writer was not joined back to
+                        // this thread, so we cannot clean it up (hence any_abort_errors is true)
+                        any_errors = true;
+                        any_abort_errors = true;
+                    }
                 }
-                );
             }
-            while let Some(result) = join_set.join_next().await {
-                match result {
-                    Ok(res) => {
-                        let cnt = res?;
-                        row_count += cnt;
-                    },
-                    Err(e) => {
-                        if e.is_panic() {
-                            std::panic::resume_unwind(e.into_panic());
-                        } else {
-                            unreachable!();
+
+            // Finalize or abort writers as appropriate
+            for mut writer in finished_writers.into_iter() {
+                match any_errors {
+                    true => {
+                        let abort_result = writer.abort_writer();
+                        if abort_result.is_err() {
+                            any_abort_errors = true;
+                        }
+                    }
+                    false => {
+                        // TODO if we encounter an error during shutdown, delete previously written files?
+                        writer.shutdown()
+                            .await
+                            .map_err(|_| DataFusionError::Internal("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!".into()))?;
+                    }
+                }
+            }
+        }
-        },
-        true => {
-            for mut data_stream in data.into_iter()
-            {
+        true => {
+            // TODO use fn call in this branch
+            'outer: for mut data_stream in data.into_iter() {
                 let serializer = &mut serializers[0];
                 let writer = &mut writers[0];
                 while let Some(maybe_batch) = data_stream.next().await {
                     // Write data to files in a round robin fashion:
-                    let batch = maybe_batch?;
+                    let batch = match maybe_batch {
+                        Ok(b) => b,
+                        Err(e) => {
+                            any_errors = true;
+                            triggering_error = Some(e);
+                            break 'outer;
+                        }
+                    };
                     row_count += batch.num_rows() as u64;
                     let bytes = serializer.serialize(batch).await?;
-                    writer.write_all(&bytes)
-                        .await
-                        .map_err(|_| DataFusionError::Internal("Unexpected FileSink Error".to_string()))?;
+                    writer.write_all(&bytes).await.map_err(|_| {
+                        DataFusionError::Internal("Unexpected FileSink Error".to_string())
+                    })?;
+                }
+            }
+            match any_errors {
+                true => {
+                    let abort_result = writers[0].abort_writer();
+                    if abort_result.is_err() {
+                        any_abort_errors = true;
+                    }
                 }
+                false => writers[0].shutdown().await?,
+            }
+        }
+    }
+
+    if any_errors {
+        match any_abort_errors{
+            true => return Err(DataFusionError::Internal("Error encountered during writing to ObjectStore and failed to abort all writers. Partial result may have been written.".into())),
+            false => match triggering_error {
+                Some(e) => return Err(e),
+                None => return Err(DataFusionError::Internal("Unknown Error encountered during writing to ObjectStore. All writers successfully aborted.".into()))
             }
-            writers[0].shutdown().await?;
-        }
+        }
     }
-
+
     Ok(row_count)
 }
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index 754389a61433..f54181cd53fd 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -336,6 +336,7 @@ mod unix_test {
     /// It tests the INSERT INTO functionality.
     #[tokio::test]
+    #[ignore]
     async fn test_sql_insert_into_fifo() -> Result<()> {
         // To make unbounded deterministic
         let waiting = Arc::new(AtomicBool::new(true));
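The central change in patch 02 is the return type of serialize_rb_stream_to_object_store: both the Ok and the Err variants hand the writer back to the caller, so on failure the caller still owns the writer and can abort the partially written object. Below is a small sketch of the pattern with stand-in types; Writer and abort are illustrative, not DataFusion's AbortableWrite API.

    // A toy resource standing in for an abortable object-store writer.
    struct Writer {}

    impl Writer {
        fn abort(self) {
            // Stand-in for discarding partially written output.
        }
    }

    // On error, ownership of the writer travels back inside the Err variant.
    fn write_all(writer: Writer, fail: bool) -> Result<(Writer, u64), (Writer, String)> {
        if fail {
            Err((writer, "write failed".to_string()))
        } else {
            Ok((writer, 42))
        }
    }

    fn main() {
        match write_all(Writer {}, true) {
            Ok((_writer, rows)) => println!("wrote {rows} rows"),
            Err((writer, e)) => {
                eprintln!("error: {e}, aborting");
                // Possible only because ownership came back with the error.
                writer.abort();
            }
        }
    }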
From b0d5c1e68773308dc4dfdd0e1122cf1dca2fde77 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Thu, 31 Aug 2023 11:26:17 -0400
Subject: [PATCH 03/14] parallelize single_file true

---
 .../core/src/datasource/file_format/write.rs | 48 +++++++++----------
 1 file changed, 23 insertions(+), 25 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 3e5579308e85..4f10abedc484 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -299,12 +299,16 @@ pub(crate) async fn create_writer(
     }
 }
 
+/// Serializes a single data stream in parallel and writes to an ObjectStore
+/// concurrently. Data order is preserved. In the event of an error,
+/// the ObjectStore writer is returned to the caller in addition to an error,
+/// so that the caller may handle aborting failed writes.
 async fn serialize_rb_stream_to_object_store(
     mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
     mut serializer: Box<dyn BatchSerializer>,
     mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
 ) -> std::result::Result<
-    (AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>, u64),
+    (Box<dyn BatchSerializer>, AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>, u64),
     (
         AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
         DataFusionError,
     ),
@@ -365,7 +369,7 @@ async fn serialize_rb_stream_to_object_store(
         }
     }
 
-    Ok((writer, row_count as u64))
+    Ok((serializer, writer, row_count as u64))
 }
 
 /// Contains the common logic for serializing RecordBatches and
@@ -413,7 +417,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
             while let Some(result) = join_set.join_next().await {
                 match result {
                     Ok(res) => match res {
-                        Ok((writer, cnt)) => {
+                        Ok((_, writer, cnt)) => {
                             finished_writers.push(writer);
                             row_count += cnt;
                         }
@@ -452,35 +456,29 @@ pub(crate) async fn stateless_serialize_and_write_files(
             }
         }
         true => {
-            // TODO use fn call in this branch
-            'outer: for mut data_stream in data.into_iter() {
-                let serializer = &mut serializers[0];
-                let writer = &mut writers[0];
-                while let Some(maybe_batch) = data_stream.next().await {
-                    // Write data to files in a round robin fashion:
-                    let batch = match maybe_batch {
-                        Ok(b) => b,
-                        Err(e) => {
-                            any_errors = true;
-                            triggering_error = Some(e);
-                            break 'outer;
-                        }
-                    };
-                    row_count += batch.num_rows() as u64;
-                    let bytes = serializer.serialize(batch).await?;
-                    writer.write_all(&bytes).await.map_err(|_| {
-                        DataFusionError::Internal("Unexpected FileSink Error".to_string())
-                    })?;
-                }
+            let mut writer = writers.remove(0);
+            let mut serializer = serializers.remove(0);
+            let mut cnt;
+            for data_stream in data.into_iter() {
+                (serializer, writer, cnt) = match serialize_rb_stream_to_object_store(data_stream, serializer, writer).await{
+                    Ok((s, w, c)) => (s, w, c),
+                    Err((w, e)) => {
+                        any_errors = true;
+                        triggering_error = Some(e);
+                        writer = w;
+                        break;
+                    }
+                };
+                row_count += cnt;
             }
             match any_errors {
                 true => {
-                    let abort_result = writers[0].abort_writer();
+                    let abort_result = writer.abort_writer();
                     if abort_result.is_err() {
                         any_abort_errors = true;
                     }
                 }
-                false => writers[0].shutdown().await?,
+                false => writer.shutdown().await?,
             }
         }
     }
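Patch 03 reuses one writer and one serializer across all input streams in the single-file case by moving them into the helper on each call and taking them back via destructuring assignment. A toy sketch of that ownership round-trip follows; process stands in for serialize_rb_stream_to_object_store and the types are illustrative.

    struct Serializer;
    struct Writer;

    // Hands both resources back along with a row count, or returns the
    // writer inside the error so the caller can still clean it up.
    fn process(s: Serializer, w: Writer, stream: &str) -> Result<(Serializer, Writer, u64), (Writer, String)> {
        println!("processing {stream}");
        Ok((s, w, 1))
    }

    fn main() {
        let mut serializer = Serializer;
        let mut writer = Writer;
        let mut row_count = 0;
        for stream in ["a", "b", "c"] {
            // Destructuring assignment moves the resources back into the
            // loop variables, so the next iteration can move them out again.
            (serializer, writer, row_count) = match process(serializer, writer, stream) {
                Ok((s, w, c)) => (s, w, row_count + c),
                Err((_writer, e)) => {
                    eprintln!("failed: {e}");
                    return;
                }
            };
        }
        println!("total rows: {row_count}");
    }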
From 1dd0e988ceb6ecc1df4053fd7d065c15c6e8b431 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Thu, 31 Aug 2023 12:10:11 -0400
Subject: [PATCH 04/14] cargo fmt

---
 .../core/src/datasource/file_format/write.rs | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 4f10abedc484..28787ba89d3b 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -300,15 +300,19 @@ pub(crate) async fn create_writer(
 }
 
 /// Serializes a single data stream in parallel and writes to an ObjectStore
-/// concurrently. Data order is preserved. In the event of an error,
+/// concurrently. Data order is preserved. In the event of an error,
 /// the ObjectStore writer is returned to the caller in addition to an error,
-/// so that the caller may handle aborting failed writes.
+/// so that the caller may handle aborting failed writes.
 async fn serialize_rb_stream_to_object_store(
     mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
     mut serializer: Box<dyn BatchSerializer>,
     mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
 ) -> std::result::Result<
-    (Box<dyn BatchSerializer>, AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>, u64),
+    (
+        Box<dyn BatchSerializer>,
+        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+        u64,
+    ),
     (
         AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
         DataFusionError,
     ),
@@ -460,7 +464,13 @@ pub(crate) async fn stateless_serialize_and_write_files(
             let mut serializer = serializers.remove(0);
             let mut cnt;
             for data_stream in data.into_iter() {
-                (serializer, writer, cnt) = match serialize_rb_stream_to_object_store(data_stream, serializer, writer).await{
+                (serializer, writer, cnt) = match serialize_rb_stream_to_object_store(
+                    data_stream,
+                    serializer,
+                    writer,
+                )
+                .await
+                {
                     Ok((s, w, c)) => (s, w, c),
                     Err((w, e)) => {
                         any_errors = true;

From bcb2baf444214d87949a544665f584a16685baa6 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Tue, 5 Sep 2023 11:31:46 -0400
Subject: [PATCH 05/14] Update datafusion/core/src/datasource/file_format/write.rs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Metehan Yıldırım <100111937+metesynnada@users.noreply.github.com>
---
 .../core/src/datasource/file_format/write.rs | 83 ++++++++++++-------
 1 file changed, 51 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 28787ba89d3b..f1327e21f3c2 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -318,38 +318,40 @@ async fn serialize_rb_stream_to_object_store(
         DataFusionError,
     ),
 > {
-    let mut row_count = 0;
-    // Not using JoinSet here since we want to ultimately write to ObjectStore preserving file order
-    let mut serialize_tasks: Vec<JoinHandle<Result<(usize, Bytes)>>> =
-        Vec::new();
-    while let Some(maybe_batch) = data_stream.next().await {
-        let mut serializer_clone = match serializer.duplicate() {
-            Ok(s) => s,
-            Err(_) => {
-                return Err((
-                    writer,
-                    DataFusionError::Internal(
+    let (tx, mut rx) =
+        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100); // buffer size of 100, adjust as needed
+
+    let serialize_task = tokio::spawn(async move {
+        while let Some(maybe_batch) = data_stream.next().await {
+            match serializer.duplicate() {
+                Ok(mut serializer_clone) => {
+                    let handle = tokio::spawn(async move {
+                        let batch = maybe_batch?;
+                        let num_rows = batch.num_rows();
+                        let bytes = serializer_clone.serialize(batch).await?;
+                        Ok((num_rows, bytes))
+                    });
+                    tx.send(handle).await.map_err(|_| {
+                        DataFusionError::Internal(
                             "Unknown error writing to object store".into(),
-                    ),
-                ))
+                        )
+                    })?;
+                    yield_now().await;
+                }
+                Err(_) => {
+                    return Err(DataFusionError::Internal(
+                        "Unknown error writing to object store".into(),
+                    ))
+                }
             }
-        };
-        serialize_tasks.push(task::spawn(async move {
-            let batch = maybe_batch?;
-            let num_rows = batch.num_rows();
-            let bytes = serializer_clone.serialize(batch).await?;
-            Ok((num_rows, bytes))
-        }));
-    }
-    for serialize_result in serialize_tasks {
-        let result = serialize_result.await;
-        match result {
-            Ok(res) => {
-                let (cnt, bytes) = match res {
-                    Ok(r) => r,
-                    Err(e) => return Err((writer, e)),
-                };
-                row_count += cnt;
+        }
+        Ok(serializer)
+    });
+
+    let mut row_count = 0;
+    while let Some(handle) = rx.recv().await {
+        match handle.await {
+            Ok(Ok((cnt, bytes))) => {
                 match writer.write_all(&bytes).await {
                     Ok(_) => (),
                     Err(_) => {
@@ -361,20 +363,37 @@ async fn serialize_rb_stream_to_object_store(
                         ))
                     }
                 };
+                row_count += cnt;
+            }
+            Ok(Err(e)) => {
+                // Return the writer along with the error
+                return Err((writer, e));
             }
             Err(_) => {
+                // Handle task panic or cancellation
                 return Err((
                     writer,
                     DataFusionError::Internal(
-                        "Unknown error writing to object store".into(),
+                        "Serialization task panicked or was cancelled".into(),
                     ),
-                ))
+                ));
             }
         }
     }
+    let serializer = match serialize_task.await {
+        Ok(Ok(serializer)) => serializer,
+        Ok(Err(e)) => return Err((writer, e)),
+        Err(_) => {
+            return Err((
+                writer,
+                DataFusionError::Internal("Unknown error writing to object store".into()),
+            ))
+        }
+    };
     Ok((serializer, writer, row_count as u64))
 }
+}
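Patch 05 restructures the helper into a two-stage pipeline: a producer task spawns one serialization task per batch and sends the JoinHandles through a bounded mpsc channel, while the consumer receives the handles in input order, awaits each, and performs the ordered writes. The bounded capacity limits how much serialized data is in flight at once. A minimal sketch of the same shape, with a stand-in for batch serialization:

    use tokio::sync::mpsc;
    use tokio::task::JoinHandle;

    #[tokio::main]
    async fn main() {
        // Capacity 8 bounds the number of in-flight serialized items.
        let (tx, mut rx) = mpsc::channel::<JoinHandle<Vec<u8>>>(8);

        let producer = tokio::spawn(async move {
            for id in 0..16u32 {
                let handle = tokio::spawn(async move {
                    // Stand-in for serializing one RecordBatch.
                    id.to_be_bytes().to_vec()
                });
                // `send` waits once 8 handles are queued: backpressure.
                if tx.send(handle).await.is_err() {
                    break; // consumer hung up
                }
            }
        });

        let mut written = 0usize;
        while let Some(handle) = rx.recv().await {
            // Receiving in channel order and awaiting each handle preserves
            // input order even when tasks finish out of order.
            let bytes = handle.await.expect("serialize task panicked");
            written += bytes.len();
        }
        producer.await.unwrap();
        println!("wrote {written} bytes");
    }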
From 584d925d8945f230ecf28190978dff792e11f55d Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Tue, 5 Sep 2023 12:13:30 -0400
Subject: [PATCH 06/14] add channel

---
 datafusion/core/src/datasource/file_format/write.rs | 8 ++++----
 datafusion/core/tests/fifo.rs                       | 1 -
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index f1327e21f3c2..3f6fef0b7fec 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -40,7 +40,8 @@ use futures::{ready, StreamExt};
 use object_store::path::Path;
 use object_store::{MultipartId, ObjectMeta, ObjectStore};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
-use tokio::task::{self, JoinHandle, JoinSet};
+use tokio::sync::mpsc;
+use tokio::task::{JoinHandle, JoinSet};
 
 /// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
 /// It is specifically designed for the `object_store` crate's `put` method and sends
@@ -319,7 +320,7 @@ async fn serialize_rb_stream_to_object_store(
     ),
 > {
     let (tx, mut rx) =
-        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100); // buffer size of 100, adjust as needed
+        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
 
     let serialize_task = tokio::spawn(async move {
         while let Some(maybe_batch) = data_stream.next().await {
@@ -336,7 +337,6 @@ async fn serialize_rb_stream_to_object_store(
                             "Unknown error writing to object store".into(),
                         )
                     })?;
-                    yield_now().await;
                 }
                 Err(_) => {
                     return Err(DataFusionError::Internal(
@@ -393,7 +393,7 @@ async fn serialize_rb_stream_to_object_store(
     };
     Ok((serializer, writer, row_count as u64))
 }
-}
+
 
 /// Contains the common logic for serializing RecordBatches and
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index f54181cd53fd..754389a61433 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -336,7 +336,6 @@ mod unix_test {
     /// It tests the INSERT INTO functionality.
     #[tokio::test]
-    #[ignore]
     async fn test_sql_insert_into_fifo() -> Result<()> {
         // To make unbounded deterministic
         let waiting = Arc::new(AtomicBool::new(true));

From 6abbdfce3ec90200d60a6a750fc2d71955627bc3 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Tue, 5 Sep 2023 12:15:47 -0400
Subject: [PATCH 07/14] cargo fmt

---
 datafusion/core/src/datasource/file_format/write.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 3f6fef0b7fec..898d76c2e9a3 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -320,7 +320,7 @@ async fn serialize_rb_stream_to_object_store(
     ),
 > {
     let (tx, mut rx) =
-        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
+        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
 
     let serialize_task = tokio::spawn(async move {
         while let Some(maybe_batch) = data_stream.next().await {
@@ -394,7 +394,6 @@ async fn serialize_rb_stream_to_object_store(
     Ok((serializer, writer, row_count as u64))
 }
-
 
 /// Contains the common logic for serializing RecordBatches and
 /// writing the resulting bytes to an ObjectStore.
 /// Serialization is assumed to be stateless, i.e.

From 64d76927aca35933e7765f87e8a672b9702e855b Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Wed, 6 Sep 2023 10:47:40 -0400
Subject: [PATCH 08/14] yield_now if input is fifo unbounded

---
 datafusion/core/src/datasource/file_format/csv.rs  | 1 +
 .../core/src/datasource/file_format/json.rs       | 1 +
 .../core/src/datasource/file_format/write.rs      | 15 +++++++++++++--
 datafusion/core/src/datasource/listing/table.rs   | 1 +
 .../core/src/datasource/listing_table_factory.rs  | 8 +++++---
 .../core/src/datasource/physical_plan/mod.rs      | 2 ++
 datafusion/core/src/physical_planner.rs           | 1 +
 7 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index c0dfa7ff6a1d..4578ab5a4324 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -587,6 +587,7 @@ impl DataSink for CsvSink {
             serializers,
             writers,
             self.config.single_file_output,
+            self.config.unbounded_input,
         )
         .await
     }
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 59caf0dcb2d4..0aa87a9a3228 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -368,6 +368,7 @@ impl DataSink for JsonSink {
             serializers,
             writers,
             self.config.single_file_output,
+            self.config.unbounded_input,
         )
         .await
     }
diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 898d76c2e9a3..ffab5f35f7e4 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -308,6 +308,7 @@ async fn serialize_rb_stream_to_object_store(
     mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
     mut serializer: Box<dyn BatchSerializer>,
     mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+    unbounded_input: bool,
 ) -> std::result::Result<
     (
         Box<dyn BatchSerializer>,
@@ -337,6 +338,9 @@ async fn serialize_rb_stream_to_object_store(
                             "Unknown error writing to object store".into(),
                         )
                     })?;
+                    if unbounded_input {
+                        tokio::task::yield_now().await;
+                    }
                 }
                 Err(_) => {
                     return Err(DataFusionError::Internal(
@@ -404,6 +408,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
     mut serializers: Vec<Box<dyn BatchSerializer>>,
     mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
     single_file_output: bool,
+    unbounded_input: bool,
 ) -> Result<u64> {
     if single_file_output && (serializers.len() != 1 || writers.len() != 1) {
         return internal_err!("single_file_output is true, but got more than 1 writer!");
@@ -431,8 +436,13 @@ pub(crate) async fn stateless_serialize_and_write_files(
                 .map(|((a, b), c)| (a, b, c))
             {
                 join_set.spawn(async move {
-                    serialize_rb_stream_to_object_store(data_stream, serializer, writer)
-                        .await
+                    serialize_rb_stream_to_object_store(
+                        data_stream,
+                        serializer,
+                        writer,
+                        unbounded_input,
+                    )
+                    .await
                 });
             }
             let mut finished_writers = Vec::with_capacity(num_writers);
@@ -486,6 +496,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
                     data_stream,
                     serializer,
                     writer,
+                    unbounded_input,
                 )
                 .await
                 {
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 7740bff2109b..e5e729f37e45 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -915,6 +915,7 @@ impl TableProvider for ListingTable {
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             writer_mode,
+            unbounded_input: self.options().infinite_source,
             single_file_output: self.options.single_file,
             overwrite,
             file_type_writer_options,
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index a788baf80fee..2c8fe4f88c9f 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -137,8 +137,9 @@ impl TableProviderFactory for ListingTableFactory {
         let mut statement_options = StatementOptions::from(&cmd.options);
 
         // Extract ListingTable specific options if present or set default
-        // Discard unbounded option if present
-        statement_options.take_str_option("unbounded");
+        let unbounded = statement_options
+            .take_bool_option("unbounded")?
+            .unwrap_or(false);
         let create_local_path = statement_options
             .take_bool_option("create_local_path")?
             .unwrap_or(false);
@@ -213,7 +214,8 @@ impl TableProviderFactory for ListingTableFactory {
             .with_file_sort_order(cmd.order_exprs.clone())
             .with_insert_mode(insert_mode)
             .with_single_file(single_file)
-            .with_write_options(file_type_writer_options);
+            .with_write_options(file_type_writer_options)
+            .with_infinite_source(unbounded);
 
         let resolved_schema = match provided_schema {
             None => options.infer_schema(state, &table_path).await?,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 46556cb3c7b4..4e444c6e5d06 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -95,6 +95,8 @@ pub struct FileSinkConfig {
     /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory
     /// to which each output partition is written to its own output file.
     pub single_file_output: bool,
+    /// If input is unbounded, tokio tasks need to yield to not block execution forever
+    pub unbounded_input: bool,
     /// Controls whether existing data should be overwritten by this sink
     pub overwrite: bool,
     /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 25b11b1f973f..d4abdf04b3db 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -581,6 +581,7 @@ impl DefaultPhysicalPlanner {
                     file_groups: vec![],
                     output_schema: Arc::new(schema),
                     table_partition_cols: vec![],
+                    unbounded_input: false,
                     writer_mode: FileWriterMode::PutMultipart,
                     single_file_output: *single_file_output,
                     overwrite: false,
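The unbounded_input flag added in this patch exists because a task looping over an infinite source (such as the FIFO tables in the test suite) may never reach an .await that actually suspends, starving other tasks on the same runtime worker; tokio::task::yield_now() hands control back to the scheduler on every iteration. A small demonstration of the effect on a current-thread runtime (the timing values are illustrative):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let stop = Arc::new(AtomicBool::new(false));
        let stop2 = stop.clone();

        let busy = tokio::spawn(async move {
            while !stop2.load(Ordering::Relaxed) {
                // Without this yield, a current-thread runtime would spin
                // here forever and the sleep below could never fire.
                tokio::task::yield_now().await;
            }
        });

        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        stop.store(true, Ordering::Relaxed);
        busy.await.unwrap();
        println!("both tasks made progress");
    }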
From cf1b1a504062dd3439e587434822ba80110d2234 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Wed, 6 Sep 2023 11:42:43 -0400
Subject: [PATCH 09/14] fix unbounded option parse

---
 .../core/src/datasource/listing_table_factory.rs | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 2c8fe4f88c9f..e72f0030e730 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -137,9 +137,15 @@ impl TableProviderFactory for ListingTableFactory {
         let mut statement_options = StatementOptions::from(&cmd.options);
 
         // Extract ListingTable specific options if present or set default
-        let unbounded = statement_options
+        let unbounded = if infinite_source{
+            statement_options.take_str_option("unbounded");
+            infinite_source
+        } else{
+            statement_options
             .take_bool_option("unbounded")?
-            .unwrap_or(false);
+            .unwrap_or(false)
+        };
+
         let create_local_path = statement_options
             .take_bool_option("create_local_path")?
             .unwrap_or(false);

From d8b84dd149fc9e315446be62bcebc5609ca6a47b Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Wed, 6 Sep 2023 11:45:55 -0400
Subject: [PATCH 10/14] cargo fmt

---
 .../core/src/datasource/listing_table_factory.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index e72f0030e730..6d850bebd855 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -137,15 +137,15 @@ impl TableProviderFactory for ListingTableFactory {
         let mut statement_options = StatementOptions::from(&cmd.options);
 
         // Extract ListingTable specific options if present or set default
-        let unbounded = if infinite_source{
+        let unbounded = if infinite_source {
            statement_options.take_str_option("unbounded");
            infinite_source
-        } else{
+        } else {
            statement_options
-            .take_bool_option("unbounded")?
-            .unwrap_or(false)
+                .take_bool_option("unbounded")?
+                .unwrap_or(false)
        };
-
+
        let create_local_path = statement_options
            .take_bool_option("create_local_path")?
            .unwrap_or(false);

From e6f1b2fe5a0219118919da5eaa24ac9293334ff3 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 8 Sep 2023 13:26:50 -0400
Subject: [PATCH 11/14] Remove redundant source

---
 datafusion/core/src/datasource/listing_table_factory.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 6d850bebd855..b4892119b785 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -216,7 +216,6 @@ impl TableProviderFactory for ListingTableFactory {
             .with_file_extension(file_extension)
             .with_target_partitions(state.config().target_partitions())
             .with_table_partition_cols(table_partition_cols)
-            .with_infinite_source(infinite_source)
             .with_file_sort_order(cmd.order_exprs.clone())
             .with_insert_mode(insert_mode)
             .with_single_file(single_file)

From c65d42988502395c38547f73223c3b08a18aba8a Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 8 Sep 2023 19:50:06 -0400
Subject: [PATCH 12/14] Apply suggestions from code review
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Andrew Lamb
---
 .../core/src/datasource/file_format/write.rs | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index ffab5f35f7e4..05ab2fcec580 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -240,7 +240,7 @@ pub enum FileWriterMode {
 pub trait BatchSerializer: Unpin + Send {
     /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
     async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
-    /// Duplicates self to support serializing multiple batches in parralell on multiple cores
+    /// Duplicates self to support serializing multiple batches in parallel on multiple cores
     fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
         Err(DataFusionError::NotImplemented(
             "Parallel serialization is not implemented for this file type".into(),
@@ -358,11 +358,11 @@ async fn serialize_rb_stream_to_object_store(
             Ok(Ok((cnt, bytes))) => {
                 match writer.write_all(&bytes).await {
                     Ok(_) => (),
-                    Err(_) => {
+                    Err(e) => {
                         return Err((
                             writer,
-                            DataFusionError::Internal(
-                                "Unknown error writing to object store".into(),
+                            DataFusionError::Execution(
+                                format!("Error writing to object store: {e}"),
                             ),
                         ))
                     }
@@ -373,12 +373,12 @@ async fn serialize_rb_stream_to_object_store(
                 // Return the writer along with the error
                 return Err((writer, e));
             }
-            Err(_) => {
+            Err(e) => {
                 // Handle task panic or cancellation
                 return Err((
                     writer,
-                    DataFusionError::Internal(
-                        "Serialization task panicked or was cancelled".into(),
+                    DataFusionError::Execution(
+                        format!("Serialization task panicked or was cancelled: {e}")
                     ),
                 ));
             }
         }
     }
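The review suggestions applied above replace fixed "Unknown error" strings with messages that embed the source error via format!. A minimal illustration of the difference, using std::io errors rather than DataFusionError:

    use std::io::{Error, ErrorKind};

    fn write_failing() -> Result<(), Error> {
        Err(Error::new(ErrorKind::Other, "connection reset by peer"))
    }

    fn main() {
        // Before: the root cause is swallowed.
        let opaque = write_failing()
            .map_err(|_| "Unknown error writing to object store".to_string());
        // After: the root cause travels with the message.
        let contextual = write_failing()
            .map_err(|e| format!("Error writing to object store: {e}"));

        println!("{}", opaque.unwrap_err());     // no hint at the cause
        println!("{}", contextual.unwrap_err()); // includes "connection reset by peer"
    }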
From 0df35b12da4ec786912e786dddee2e982c58cfd4 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 8 Sep 2023 20:16:06 -0400
Subject: [PATCH 13/14] address comments, add type aliases

---
 .../core/src/datasource/file_format/write.rs | 36 ++++++++-----------
 1 file changed, 15 insertions(+), 21 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 05ab2fcec580..5b5bba60bf57 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -300,6 +300,9 @@ pub(crate) async fn create_writer(
     }
 }
 
+type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
+type SerializerType = Box<dyn BatchSerializer>;
+
 /// Serializes a single data stream in parallel and writes to an ObjectStore
 /// concurrently. Data order is preserved. In the event of an error,
 /// the ObjectStore writer is returned to the caller in addition to an error,
@@ -309,17 +312,8 @@ async fn serialize_rb_stream_to_object_store(
     mut serializer: Box<dyn BatchSerializer>,
     mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
     unbounded_input: bool,
-) -> std::result::Result<
-    (
-        Box<dyn BatchSerializer>,
-        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-        u64,
-    ),
-    (
-        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-        DataFusionError,
-    ),
-> {
+) -> std::result::Result<(SerializerType, WriterType, u64), (WriterType, DataFusionError)>
+{
     let (tx, mut rx) =
         mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
 
@@ -361,9 +355,9 @@ async fn serialize_rb_stream_to_object_store(
                     Err(e) => {
                         return Err((
                             writer,
-                            DataFusionError::Execution(
-                                format!("Error writing to object store: {e}"),
-                            ),
+                            DataFusionError::Execution(format!(
+                                "Error writing to object store: {e}"
+                            )),
                         ))
                     }
                 };
@@ -377,9 +371,9 @@ async fn serialize_rb_stream_to_object_store(
                 // Handle task panic or cancellation
                 return Err((
                     writer,
-                    DataFusionError::Execution(
-                        format!("Serialization task panicked or was cancelled: {e}")
-                    ),
+                    DataFusionError::Execution(format!(
+                        "Serialization task panicked or was cancelled: {e}"
+                    )),
                 ));
             }
         }
@@ -405,8 +399,8 @@ async fn serialize_rb_stream_to_object_store(
 pub(crate) async fn stateless_serialize_and_write_files(
     data: Vec<SendableRecordBatchStream>,
-    mut serializers: Vec<Box<dyn BatchSerializer>>,
-    mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
+    mut serializers: Vec<SerializerType>,
+    mut writers: Vec<WriterType>,
     single_file_output: bool,
     unbounded_input: bool,
@@ -459,12 +453,13 @@ pub(crate) async fn stateless_serialize_and_write_files(
                     },
-                    Err(_) => {
+                    Err(e) => {
                         // Don't panic, instead try to clean up as many writers as possible.
                         // If we hit this code, ownership of a writer was not joined back to
                         // this thread, so we cannot clean it up (hence any_abort_errors is true)
                         any_errors = true;
                         any_abort_errors = true;
+                        triggering_error = Some(DataFusionError::Internal(format!("Unexpected join error while serializing file {e}")));
                     }
                 }
             }
@@ -479,7 +474,6 @@ pub(crate) async fn stateless_serialize_and_write_files(
                     }
                     false => {
-                        // TODO if we encounter an error during shutdown, delete previously written files?
                         writer.shutdown()
                             .await
                             .map_err(|_| DataFusionError::Internal("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!".into()))?;

From b8f2f0aae5e8f62f4f48fe2cb769e31de9288311 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 8 Sep 2023 20:22:19 -0400
Subject: [PATCH 14/14] cargo fmt

---
 datafusion/core/src/datasource/file_format/write.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 5b5bba60bf57..222fe5b519aa 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -459,7 +459,9 @@ pub(crate) async fn stateless_serialize_and_write_files(
                         // this thread, so we cannot clean it up (hence any_abort_errors is true)
                         any_errors = true;
                         any_abort_errors = true;
-                        triggering_error = Some(DataFusionError::Internal(format!("Unexpected join error while serializing file {e}")));
+                        triggering_error = Some(DataFusionError::Internal(format!(
+                            "Unexpected join error while serializing file {e}"
+                        )));
                     }
                 }
             }
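The type aliases added in patch 13 change nothing about the type system; they only shorten signatures that would otherwise repeat the full trait-object spelling. A sketch of the same technique with illustrative traits (Serialize and Sink are stand-ins, not DataFusion types):

    use std::pin::Pin;

    trait Serialize {}
    trait Sink {}

    // Without aliases, every signature repeats the full trait-object soup:
    fn verbose(_s: Box<dyn Serialize + Send>, _w: Pin<Box<dyn Sink + Send>>) {}

    // With aliases, intent is visible at a glance:
    type SerializerType = Box<dyn Serialize + Send>;
    type SinkType = Pin<Box<dyn Sink + Send>>;

    fn concise(_s: SerializerType, _w: SinkType) {}

    fn main() {
        struct S;
        impl Serialize for S {}
        struct K;
        impl Sink for K {}
        // Both take exactly the same types; only the spelling differs.
        verbose(Box::new(S), Box::pin(K));
        concise(Box::new(S), Box::pin(K));
    }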