From 3e6a5709c97a9e886428c20c5f1310397153d610 Mon Sep 17 00:00:00 2001
From: Cora Sutton
Date: Sun, 2 Nov 2025 17:01:51 -0600
Subject: [PATCH 01/26] Infer stream ipc format for arrow data sources

---
 .../datasource-arrow/src/file_format.rs      | 222 ++++++++++--------
 .../tests/data/example_stream.arrow          | Bin 0 -> 1480 bytes
 2 files changed, 126 insertions(+), 96 deletions(-)
 create mode 100644 datafusion/datasource-arrow/tests/data/example_stream.arrow

diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs
index 3b8564080421..4c6818af559c 100644
--- a/datafusion/datasource-arrow/src/file_format.rs
+++ b/datafusion/datasource-arrow/src/file_format.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::ArrowError;
 use arrow::ipc::convert::fb_to_schema;
-use arrow::ipc::reader::FileReader;
+use arrow::ipc::reader::{FileReader, StreamReader};
 use arrow::ipc::writer::IpcWriteOptions;
 use arrow::ipc::{root_as_message, CompressionType};
 use datafusion_common::error::Result;
@@ -150,12 +150,12 @@ impl FileFormat for ArrowFormat {
             let schema = match r.payload {
                 #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(mut file, _) => {
-                    let reader = FileReader::try_new(&mut file, None)?;
-                    reader.schema()
-                }
-                GetResultPayload::Stream(stream) => {
-                    infer_schema_from_file_stream(stream).await?
+                    match FileReader::try_new(&mut file, None) {
+                        Ok(reader) => reader.schema(),
+                        Err(_) => StreamReader::try_new(&mut file, None)?.schema(),
+                    }
                 }
+                GetResultPayload::Stream(stream) => infer_ipc_schema(stream).await?,
             };
             schemas.push(schema.as_ref().clone());
         }
@@ -344,40 +344,69 @@ impl DataSink for ArrowFileSink {
     }
 }

+/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
+/// See
+
 const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
 const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

-/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
-/// See
-async fn infer_schema_from_file_stream(
+async fn infer_ipc_schema(
     mut stream: BoxStream<'static, object_store::Result<Bytes>>,
 ) -> Result<SchemaRef> {
-    // Expected format:
-    // <magic number "ARROW1"> - 6 bytes
-    // <empty padding bytes [to 8 byte boundary]> - 2 bytes
-    // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
-    // <metadata_size: int32> - 4 bytes
-    // <metadata_flatbuffer: bytes>
-    // <rest of file bytes>
-
-    // So in first read we need at least all known sized sections,
-    // which is 6 + 2 + 4 + 4 = 16 bytes.
+    // Expected IPC format is either:
+    //
+    // stream:
+    // <continuation: 0xFFFFFFFF> - 4 bytes (added in v0.15.0+)
+    // <metadata_size: int32> - 4 bytes
+    // <rest of stream bytes>
+    //
+    // file:
+    // <magic number "ARROW1"> - 6 bytes
+    // <empty padding bytes [to 8 byte boundary]> - 2 bytes
+    // <stream bytes>
+
+    // Perform the initial read such that we always have the metadata size
     let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;

-    // Files should start with these magic bytes
-    if bytes[0..6] != ARROW_MAGIC {
-        return Err(ArrowError::ParseError(
-            "Arrow file does not contain correct header".to_string(),
-        ))?;
-    }
-
-    // Since continuation marker bytes added in later versions
-    let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
-        (&bytes[12..16], 16)
+    // The preamble size is everything before the metadata size
+    let preamble_size = if bytes[0..6] == ARROW_MAGIC {
+        // File format starts with magic number "ARROW1"
+        if bytes[8..12] == CONTINUATION_MARKER {
+            // Continuation marker was added in v0.15.0
+            12
+        } else {
+            // File format before v0.15.0
+            8
+        }
+    } else if bytes[0..4] == CONTINUATION_MARKER {
+        // Stream format after v0.15.0 starts with continuation marker
+        4
     } else {
-        (&bytes[8..12], 12)
+        // Stream format before v0.15.0 does not have a preamble
+        0
     };

+    return infer_ipc_schema_ignoring_preamble_bytes(bytes, preamble_size, stream).await;
+}
+
+/// Infer schema from IPC format, ignoring the preamble bytes
+async fn infer_ipc_schema_ignoring_preamble_bytes(
+    bytes: Vec<u8>,
+    preamble_size: usize,
+    mut stream: BoxStream<'static, object_store::Result<Bytes>>,
+) -> Result<SchemaRef> {
+    let (meta_len, rest_of_bytes_start_index): ([u8; 4], usize) = (
+        bytes[preamble_size..preamble_size + 4]
+            .try_into()
+            .map_err(|err| {
+                ArrowError::ParseError(format!(
+                    "Unable to read IPC message as metadata length: {err:?}"
+                ))
+            })?,
+        preamble_size + 4,
+    );
+
     let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
     let meta_len = i32::from_le_bytes(meta_len);

@@ -524,79 +553,80 @@ mod tests {

     #[tokio::test]
     async fn test_infer_schema_stream() -> Result<()> {
-        let mut bytes = std::fs::read("tests/data/example.arrow")?;
-        bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
-        let location = Path::parse("example.arrow")?;
-        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
-        in_memory_store.put(&location, bytes.into()).await?;
-
-        let state = MockSession::new();
-        let object_meta = ObjectMeta {
-            location,
-            last_modified: DateTime::default(),
-            size: u64::MAX,
-            e_tag: None,
-            version: None,
-        };
-
-        let arrow_format = ArrowFormat {};
-        let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];
-
-        // Test chunk sizes where too small so we keep having to read more bytes
-        // And when large enough that first read contains all we need
-        for chunk_size in [7, 3000] {
-            let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
-            let inferred_schema = arrow_format
-                .infer_schema(
-                    &state,
-                    &(store.clone() as Arc<dyn ObjectStore>),
-                    std::slice::from_ref(&object_meta),
-                )
-                .await?;
-            let actual_fields = inferred_schema
-                .fields()
-                .iter()
-                .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
-                .collect::<Vec<_>>();
-            assert_eq!(expected, actual_fields);
-        }
+        for file in ["example.arrow", "example_stream.arrow"] {
+            let mut bytes = std::fs::read(format!("tests/data/{}", file))?;
+            bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
+            let location = Path::parse(file)?;
+            let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+            in_memory_store.put(&location, bytes.into()).await?;
+
+            let state =
MockSession::new(); + let object_meta = ObjectMeta { + location, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + let arrow_format = ArrowFormat {}; + let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"]; + + // Test chunk sizes where too small so we keep having to read more bytes + // And when large enough that first read contains all we need + for chunk_size in [7, 3000] { + let store = + Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size)); + let inferred_schema = arrow_format + .infer_schema( + &state, + &(store.clone() as Arc), + std::slice::from_ref(&object_meta), + ) + .await?; + let actual_fields = inferred_schema + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect::>(); + assert_eq!(expected, actual_fields); + } + } Ok(()) } #[tokio::test] async fn test_infer_schema_short_stream() -> Result<()> { - let mut bytes = std::fs::read("tests/data/example.arrow")?; - bytes.truncate(20); // should cause error that file shorter than expected - let location = Path::parse("example.arrow")?; - let in_memory_store: Arc = Arc::new(InMemory::new()); - in_memory_store.put(&location, bytes.into()).await?; - - let state = MockSession::new(); - let object_meta = ObjectMeta { - location, - last_modified: DateTime::default(), - size: u64::MAX, - e_tag: None, - version: None, - }; - - let arrow_format = ArrowFormat {}; - - let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7)); - let err = arrow_format - .infer_schema( - &state, - &(store.clone() as Arc), - std::slice::from_ref(&object_meta), - ) - .await; + for file in ["example.arrow", "example_stream.arrow"] { + let mut bytes = std::fs::read(format!("tests/data/{}", file))?; + bytes.truncate(20); // should cause error that file shorter than expected + let location = Path::parse(file)?; + let in_memory_store: Arc = Arc::new(InMemory::new()); + in_memory_store.put(&location, bytes.into()).await?; + + let state = MockSession::new(); + let object_meta = ObjectMeta { + location, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + let arrow_format = ArrowFormat {}; - assert!(err.is_err()); - assert_eq!( - "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file", - err.unwrap_err().to_string().lines().next().unwrap() - ); + let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7)); + let err = arrow_format + .infer_schema( + &state, + &(store.clone() as Arc), + std::slice::from_ref(&object_meta), + ) + .await; + + assert!(err.is_err()); + assert_eq!( "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file", err.unwrap_err().to_string().lines().next().unwrap()); + } Ok(()) } diff --git a/datafusion/datasource-arrow/tests/data/example_stream.arrow b/datafusion/datasource-arrow/tests/data/example_stream.arrow new file mode 100644 index 0000000000000000000000000000000000000000..dbe10596f3a9d8af7aaeb16bc41ce9c2b308ff69 GIT binary patch literal 1480 zcmbVL&2G~`5T3T71PFygB_hPZ+H({o<>$aD55OBB6s_ZpFmk++;{YM5)Q94V)FY3; zfrsGOoB6)=E)7l_DV=1zGvCb4=ULBIRaIRbiaZs{LphO-JdzeHkyD4s0lg0Fw#VDB z{~U=9i`AQds{laRWO|eLA zPINfz5BdXzQ&66IGuVIKf2q##9gm@PT;vG$M`#b)f_}%`a6UK}n1thc75NPu<0R8F zkzcU8uzk+Gdd9e)KM|KV;R`(5sM5k@a=u1T2*3x4!{i%pN@c76lhWv|$U-`VdE}<^JC563HLVvY)`*XAB+p~5@$jMn{ j&!O+#JFcTsYhzu+`r9?Mm-TqB?Cs=z-?v$T^Gob6VUM_D literal 0 HcmV?d00001 From 0ad62ed330e78514663707eb43d6cccf51d05287 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sun, 2 Nov 2025 17:57:32 -0600 
Subject: [PATCH 02/26] Allow FileOpener for ArrowSource to open both IPC
 formats

---
 .../datasource-arrow/src/file_format.rs   | 25 ++++++
 datafusion/datasource-arrow/src/source.rs | 80 ++++++++++++++++---
 2 files changed, 93 insertions(+), 12 deletions(-)

diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs
index 4c6818af559c..b0eef70d6664 100644
--- a/datafusion/datasource-arrow/src/file_format.rs
+++ b/datafusion/datasource-arrow/src/file_format.rs
@@ -23,8 +23,10 @@ use std::any::Any;
 use std::borrow::Cow;
 use std::collections::HashMap;
 use std::fmt::{self, Debug};
+use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;

+use arrow::array::{RecordBatch, RecordBatchReader};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::ArrowError;
 use arrow::ipc::convert::fb_to_schema;
@@ -461,6 +463,29 @@ async fn collect_at_least_n_bytes(
     Ok(buf)
 }

+/// Creates a [`RecordBatchReader`] for Arrow IPC formatted data from a
+/// [`Read`] and [`Seek`] reader. Detects whether `reader`'s format
+/// is IPC file or IPC stream using the `ARROW1` magic number.
+pub fn try_new_record_batch_reader<R: Read + Seek>(
+    mut reader: R,
+    projection: Option<Vec<usize>>,
+) -> Result<Box<dyn RecordBatchReader<Item = Result<RecordBatch, ArrowError>> + Send>> {
+    let stream_position = reader.stream_position()?;
+    let mut buffer = [0; 6];
+    if reader.read(&mut buffer)? < 6 {
+        return Err(ArrowError::ParseError(
+            "Unexpected end of byte stream for Arrow IPC file".to_string(),
+        ))?;
+    }
+    reader.seek(SeekFrom::Start(stream_position))?;
+
+    if buffer == ARROW_MAGIC {
+        return Ok(Box::new(FileReader::try_new(reader, projection)?));
+    }
+
+    Ok(Box::new(StreamReader::try_new(reader, projection)?))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs
index f254b7e3ff30..6e78eff65950 100644
--- a/datafusion/datasource-arrow/src/source.rs
+++ b/datafusion/datasource-arrow/src/source.rs
@@ -37,6 +37,8 @@ use futures::StreamExt;
 use itertools::Itertools;
 use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

+use crate::try_new_record_batch_reader;
+
 /// Arrow configuration struct that is given to DataSourceExec
 /// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow
 #[derive(Clone, Default)]
@@ -136,22 +138,22 @@ impl FileOpener for ArrowOpener {
             match r.payload {
                 #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(file, _) => {
-                    let arrow_reader = arrow::ipc::reader::FileReader::try_new(
-                        file, projection,
-                    )?;
-                    Ok(futures::stream::iter(arrow_reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    Ok(futures::stream::iter(try_new_record_batch_reader(
+                        file.try_clone()?,
+                        projection.clone(),
+                    )?)
+                    .map(|r| r.map_err(Into::into))
+                    .boxed())
                 }
                 GetResultPayload::Stream(_) => {
                     let bytes = r.bytes().await?;
                     let cursor = std::io::Cursor::new(bytes);
-                    let arrow_reader = arrow::ipc::reader::FileReader::try_new(
-                        cursor, projection,
-                    )?;
-                    Ok(futures::stream::iter(arrow_reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    Ok(futures::stream::iter(try_new_record_batch_reader(
+                        cursor,
+                        projection.clone(),
+                    )?)
+ .map(|r| r.map_err(Into::into)) + .boxed()) } } } @@ -259,3 +261,57 @@ impl FileOpener for ArrowOpener { })) } } + +#[cfg(test)] +mod tests { + use std::{fs::File, io::Read}; + + use arrow_ipc::reader::{FileReader, StreamReader}; + use bytes::Bytes; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; + use datafusion_execution::object_store::ObjectStoreUrl; + use object_store::memory::InMemory; + + use super::*; + + #[tokio::test] + async fn test_file_opener_with_ipc_file() -> Result<()> { + for filename in ["example.arrow", "example_stream.arrow"] { + let path = format!("tests/data/{}", filename); + let path_str = path.as_str(); + let mut file = File::open(path_str)?; + let file_size = file.metadata()?.len(); + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + let bytes = Bytes::from(buffer); + + let object_store = Arc::new(InMemory::new()); + let partitioned_file = PartitionedFile::new(filename, file_size); + object_store + .put(&partitioned_file.object_meta.location, bytes.into()) + .await?; + + let schema = match FileReader::try_new(File::open(path_str)?, None) { + Ok(reader) => reader.schema(), + Err(_) => StreamReader::try_new(File::open(path_str)?, None)?.schema(), + }; + + let source = Arc::new(ArrowSource::default()); + + let scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + schema, + source.clone(), + ) + .build(); + + let file_opener = source.create_file_opener(object_store, &scan_config, 0); + let mut stream = file_opener.open(partitioned_file)?.await?; + + assert!(stream.next().await.is_some()); + } + + Ok(()) + } +} From 34ccba4210f239e24d33a9caadd0ea6f80f6b6d7 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sun, 2 Nov 2025 20:06:13 -0600 Subject: [PATCH 03/26] Split reading file vs stream because repartitioning + ranges --- .../core/src/datasource/physical_plan/mod.rs | 2 +- .../datasource-arrow/src/file_format.rs | 75 ++--- datafusion/datasource-arrow/src/source.rs | 270 ++++++++++++++++-- 3 files changed, 287 insertions(+), 60 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 1ac292e260fd..13b2c26353ab 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -37,7 +37,7 @@ pub use datafusion_datasource_parquet::{ParquetFileMetrics, ParquetFileReaderFac pub use json::{JsonOpener, JsonSource}; -pub use arrow::{ArrowOpener, ArrowSource}; +pub use arrow::{ArrowFileOpener, ArrowFileSource, ArrowStreamOpener, ArrowStreamSource}; pub use csv::{CsvOpener, CsvSource}; pub use datafusion_datasource::file::FileSource; pub use datafusion_datasource::file_groups::FileGroup; diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index b0eef70d6664..f16e7600d2b1 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -23,10 +23,8 @@ use std::any::Any; use std::borrow::Cow; use std::collections::HashMap; use std::fmt::{self, Debug}; -use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; -use arrow::array::{RecordBatch, RecordBatchReader}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; use arrow::ipc::convert::fb_to_schema; @@ -51,7 +49,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::LexRequirement; -use 
crate::source::ArrowSource;
+use crate::source::{ArrowFileSource, ArrowStreamSource};
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion_datasource::file_compression_type::FileCompressionType;
@@ -63,7 +61,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
 use futures::stream::BoxStream;
 use futures::StreamExt;
-use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
+use object_store::{GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore};
 use tokio::io::AsyncWriteExt;

 /// Initial writing buffer size. Note this is just a size hint for efficiency. It
@@ -177,10 +175,39 @@ impl FileFormat for ArrowFormat {

     async fn create_physical_plan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         conf: FileScanConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let source = Arc::new(ArrowSource::default());
+        let is_stream_format = if let Some(first_group) = conf.file_groups.first() {
+            if let Some(first_file) = first_group.files().first() {
+                let object_store =
+                    state.runtime_env().object_store(&conf.object_store_url)?;
+
+                let get_opts = GetOptions {
+                    range: Some(GetRange::Bounded(0..6)),
+                    ..Default::default()
+                };
+                let result = object_store
+                    .get_opts(&first_file.object_meta.location, get_opts)
+                    .await?;
+                let bytes = result.bytes().await?;
+
+                // assume stream format if the file is too short
+                // or the file does not start with the magic number
+                bytes.len() < 6 || bytes[0..6] != ARROW_MAGIC
+            } else {
+                false // no files, default to file format
+            }
+        } else {
+            false // no file groups, default to file format
+        };
+
+        let source: Arc<dyn FileSource> = if is_stream_format {
+            Arc::new(ArrowStreamSource::default())
+        } else {
+            Arc::new(ArrowFileSource::default())
+        };
+
         let config = FileScanConfigBuilder::from(conf)
             .with_source(source)
             .build();
@@ -205,7 +232,9 @@ impl FileFormat for ArrowFormat {
     }

     fn file_source(&self) -> Arc<dyn FileSource> {
-        Arc::new(ArrowSource::default())
+        // defaulting to the file format source since it's
+        // more capable in general
+        Arc::new(ArrowFileSource::default())
     }
 }

@@ -346,8 +375,8 @@ impl DataSink for ArrowFileSink {
     }
 }

-/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
-/// See
+// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
+// See

 const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
 const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
@@ -389,7 +418,7 @@ async fn infer_ipc_schema(
         0
     };

-    return infer_ipc_schema_ignoring_preamble_bytes(bytes, preamble_size, stream).await;
+    infer_ipc_schema_ignoring_preamble_bytes(bytes, preamble_size, stream).await
 }

 /// Infer schema from IPC format, ignoring the preamble bytes
@@ -458,34 +487,12 @@ async fn collect_at_least_n_bytes(
     if buf.len() < n {
         return Err(ArrowError::ParseError(
             "Unexpected end of byte stream for Arrow IPC file".to_string(),
-        ))?;
+        )
+        .into());
     }
     Ok(buf)
 }

-/// Creates a [`RecordBatchReader`] for Arrow IPC formatted data from a
-/// [`Read`] and [`Seek`] reader. Detects whether `reader`'s format
-/// is IPC file or IPC stream using the `ARROW1` magic number.
-pub fn try_new_record_batch_reader<R: Read + Seek>(
-    mut reader: R,
-    projection: Option<Vec<usize>>,
-) -> Result<Box<dyn RecordBatchReader<Item = Result<RecordBatch, ArrowError>> + Send>> {
-    let stream_position = reader.stream_position()?;
-    let mut buffer = [0; 6];
-    if reader.read(&mut buffer)? < 6 {
-        return Err(ArrowError::ParseError(
-            "Unexpected end of byte stream for Arrow IPC file".to_string(),
-        ))?;
-    }
-    reader.seek(SeekFrom::Start(stream_position))?;
-
-    if buffer == ARROW_MAGIC {
-        return Ok(Box::new(FileReader::try_new(reader, projection)?));
-    }
-
-    Ok(Box::new(StreamReader::try_new(reader, projection)?))
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs
index 6e78eff65950..463c02780a03 100644
--- a/datafusion/datasource-arrow/src/source.rs
+++ b/datafusion/datasource-arrow/src/source.rs
@@ -15,15 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::any::Any;
 use std::sync::Arc;
+use std::{any::Any, io::Cursor};

 use datafusion_datasource::as_file_source;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::TableSchema;

 use arrow::buffer::Buffer;
-use arrow_ipc::reader::FileDecoder;
+use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader};
 use datafusion_common::error::Result;
 use datafusion_common::{exec_datafusion_err, Statistics};
 use datafusion_datasource::file::FileSource;
@@ -37,31 +37,29 @@ use futures::StreamExt;
 use itertools::Itertools;
 use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

-use crate::try_new_record_batch_reader;
-
-/// Arrow configuration struct that is given to DataSourceExec
+/// Arrow IPC File format source - supports range-based parallel reading
 /// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow
 #[derive(Clone, Default)]
-pub struct ArrowSource {
+pub struct ArrowFileSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }

-impl From<ArrowSource> for Arc<dyn FileSource> {
-    fn from(source: ArrowSource) -> Self {
+impl From<ArrowFileSource> for Arc<dyn FileSource> {
+    fn from(source: ArrowFileSource) -> Self {
         as_file_source(source)
     }
 }

-impl FileSource for ArrowSource {
+impl FileSource for ArrowFileSource {
     fn create_file_opener(
         &self,
         object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
         _partition: usize,
     ) -> Arc<dyn FileOpener> {
-        Arc::new(ArrowOpener {
+        Arc::new(ArrowFileOpener {
             object_store,
             projection: base_config.file_column_projection_indices(),
         })
@@ -78,6 +76,7 @@ impl FileSource for ArrowSource {
     fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
+
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
         conf.projected_statistics = Some(statistics);
@@ -118,13 +117,150 @@ impl FileSource for ArrowSource {
     }
 }

+/// Arrow IPC Stream format source - supports only sequential reading
+#[derive(Clone, Default)]
+pub struct ArrowStreamSource {
+    metrics: ExecutionPlanMetricsSet,
+    projected_statistics: Option<Statistics>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl From<ArrowStreamSource> for Arc<dyn FileSource> {
+    fn from(source: ArrowStreamSource) -> Self {
+        as_file_source(source)
+    }
+}
+
+impl FileSource for ArrowStreamSource {
+    fn create_file_opener(
+        &self,
+        object_store: Arc<dyn ObjectStore>,
+        base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        Arc::new(ArrowStreamOpener {
+            object_store,
+            projection: base_config.file_column_projection_indices(),
+        })
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        Arc::new(Self { ..self.clone() })
+    }
+
+    fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
+        Arc::new(Self { ..self.clone() })
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        let mut conf = self.clone();
+        conf.projected_statistics = Some(statistics);
+        Arc::new(conf)
+    }
+
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
+        Arc::new(Self { ..self.clone() })
+    }
+
+    fn repartitioned(
+        &self,
+        _target_partitions: usize,
+        _repartition_file_min_size: usize,
+        _output_ordering: Option<LexOrdering>,
+        _config: &FileScanConfig,
+    ) -> Result<Option<FileScanConfig>> {
+        // Stream format doesn't support range-based parallel reading
+        // because it lacks a footer that would be needed to make range-based
+        // seeking practical. Without that, you would either need to read
+        // the entire file and index it up front before doing parallel reading
+        // or else each partition would need to read the entire file up to the
+        // correct offset which is a lot of duplicate I/O. We're opting to avoid
+        // that entirely by only acting on a single partition and reading sequentially.
+        Ok(None)
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        let statistics = &self.projected_statistics;
+        Ok(statistics
+            .clone()
+            .expect("projected_statistics must be set"))
+    }
+
+    fn file_type(&self) -> &str {
+        "arrow_stream"
+    }
+
+    fn with_schema_adapter_factory(
+        &self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Result<Arc<dyn FileSource>> {
+        Ok(Arc::new(Self {
+            schema_adapter_factory: Some(schema_adapter_factory),
+            ..self.clone()
+        }))
+    }
+
+    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+        self.schema_adapter_factory.clone()
+    }
+}
+
+/// FileOpener for Arrow IPC Stream format - always reads sequentially
+pub struct ArrowStreamOpener {
+    pub object_store: Arc<dyn ObjectStore>,
+    pub projection: Option<Vec<usize>>,
+}
+
+impl FileOpener for ArrowStreamOpener {
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
+        if partitioned_file.range.is_some() {
+            return Err(exec_datafusion_err!(
+                "ArrowStreamOpener does not support range-based reading"
+            ));
+        }
+        let object_store = self.object_store.clone();
+        let projection = self.projection.clone();
+        Ok(Box::pin(async move {
+            let r = object_store
+                .get(&partitioned_file.object_meta.location)
+                .await?;
+            match r.payload {
+                #[cfg(not(target_arch = "wasm32"))]
+                GetResultPayload::File(file, _) => Ok(futures::stream::iter(
+                    StreamReader::try_new(file.try_clone()?, projection.clone())?,
+                )
+                .map(|r| r.map_err(Into::into))
+                .boxed()),
+                GetResultPayload::Stream(_) => {
+                    let bytes = r.bytes().await?;
+                    let cursor = Cursor::new(bytes);
+                    Ok(futures::stream::iter(StreamReader::try_new(
+                        cursor,
+                        projection.clone(),
+                    )?)
+                    .map(|r| r.map_err(Into::into))
+                    .boxed())
+                }
+            }
+        }))
+    }
+}
+
+/// The struct arrow that implements `[FileOpener]` trait for Arrow IPC File format
+pub struct ArrowFileOpener {
+    pub object_store: Arc<dyn ObjectStore>,
+    pub projection: Option<Vec<usize>>,
+}
+
+impl FileOpener for ArrowFileOpener {
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
         let object_store = Arc::clone(&self.object_store);
         let projection = self.projection.clone();
@@ -137,18 +273,15 @@ impl FileOpener for ArrowOpener {
             .await?;
             match r.payload {
                 #[cfg(not(target_arch = "wasm32"))]
-                GetResultPayload::File(file, _) => {
-                    Ok(futures::stream::iter(try_new_record_batch_reader(
-                        file.try_clone()?,
-                        projection.clone(),
-                    )?)
- .map(|r| r.map_err(Into::into)) - .boxed()) - } + GetResultPayload::File(file, _) => Ok(futures::stream::iter( + FileReader::try_new(file.try_clone()?, projection.clone())?, + ) + .map(|r| r.map_err(Into::into)) + .boxed()), GetResultPayload::Stream(_) => { let bytes = r.bytes().await?; - let cursor = std::io::Cursor::new(bytes); - Ok(futures::stream::iter(try_new_record_batch_reader( + let cursor = Cursor::new(bytes); + Ok(futures::stream::iter(FileReader::try_new( cursor, projection.clone(), )?) @@ -275,7 +408,7 @@ mod tests { use super::*; #[tokio::test] - async fn test_file_opener_with_ipc_file() -> Result<()> { + async fn test_file_opener_without_ranges() -> Result<()> { for filename in ["example.arrow", "example_stream.arrow"] { let path = format!("tests/data/{}", filename); let path_str = path.as_str(); @@ -297,7 +430,11 @@ mod tests { Err(_) => StreamReader::try_new(File::open(path_str)?, None)?.schema(), }; - let source = Arc::new(ArrowSource::default()); + let source: Arc = if filename.contains("stream") { + Arc::new(ArrowStreamSource::default()) + } else { + Arc::new(ArrowFileSource::default()) + }; let scan_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), @@ -314,4 +451,87 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_file_opener_with_ranges() -> Result<()> { + let filename = "example.arrow"; + let path = format!("tests/data/{}", filename); + let path_str = path.as_str(); + let mut file = File::open(path_str)?; + let file_size = file.metadata()?.len(); + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + let bytes = Bytes::from(buffer); + + let object_store = Arc::new(InMemory::new()); + let partitioned_file = PartitionedFile::new_with_range( + filename.into(), + file_size, + 0, + (file_size - 1) as i64, + ); + object_store + .put(&partitioned_file.object_meta.location, bytes.into()) + .await?; + + let schema = FileReader::try_new(File::open(path_str)?, None)?.schema(); + + let source = Arc::new(ArrowFileSource::default()); + + let scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + schema, + source.clone(), + ) + .build(); + + let file_opener = source.create_file_opener(object_store, &scan_config, 0); + let mut stream = file_opener.open(partitioned_file)?.await?; + + assert!(stream.next().await.is_some()); + + Ok(()) + } + + #[tokio::test] + async fn test_stream_opener_errors_with_ranges() -> Result<()> { + let filename = "example_stream.arrow"; + let path = format!("tests/data/{}", filename); + let path_str = path.as_str(); + let mut file = File::open(path_str)?; + let file_size = file.metadata()?.len(); + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + let bytes = Bytes::from(buffer); + + let object_store = Arc::new(InMemory::new()); + let partitioned_file = PartitionedFile::new_with_range( + filename.into(), + file_size, + 0, + (file_size - 1) as i64, + ); + object_store + .put(&partitioned_file.object_meta.location, bytes.into()) + .await?; + + let schema = StreamReader::try_new(File::open(path_str)?, None)?.schema(); + + let source = Arc::new(ArrowStreamSource::default()); + + let scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + schema, + source.clone(), + ) + .build(); + + let file_opener = source.create_file_opener(object_store, &scan_config, 0); + let result = file_opener.open(partitioned_file); + assert!(result.is_err()); + + Ok(()) + } } From 99ebe628c188b03fc9c205f6521f2f79a5716aea Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: 
Sun, 2 Nov 2025 20:23:25 -0600 Subject: [PATCH 04/26] Fix rewind bug --- .../datasource-arrow/src/file_format.rs | 9 ++- .../sqllogictest/test_files/arrow_files.slt | 73 +++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index f16e7600d2b1..12ecbc6d9297 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -23,6 +23,7 @@ use std::any::Any; use std::borrow::Cow; use std::collections::HashMap; use std::fmt::{self, Debug}; +use std::io::{Seek, SeekFrom}; use std::sync::Arc; use arrow::datatypes::{Schema, SchemaRef}; @@ -152,7 +153,13 @@ impl FileFormat for ArrowFormat { GetResultPayload::File(mut file, _) => { match FileReader::try_new(&mut file, None) { Ok(reader) => reader.schema(), - Err(_) => StreamReader::try_new(&mut file, None)?.schema(), + Err(_) => { + // not in the file format, but FileReader read some bytes + // while trying to parse the file and so we need to rewind + // it to the beginning of the file + file.seek(SeekFrom::Start(0))?; + StreamReader::try_new(&mut file, None)?.schema() + } } } GetResultPayload::Stream(stream) => infer_ipc_schema(stream).await?, diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt index b3975e0c3f47..5368871126af 100644 --- a/datafusion/sqllogictest/test_files/arrow_files.slt +++ b/datafusion/sqllogictest/test_files/arrow_files.slt @@ -128,3 +128,76 @@ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/ # Errors in partition filters should be reported query error Divide by zero error SELECT f0 FROM arrow_partitioned WHERE CASE WHEN true THEN 1 / 0 ELSE part END = 1; + +############# +## Arrow IPC stream format support +############# + +# Test CREATE EXTERNAL TABLE with stream format +statement ok +CREATE EXTERNAL TABLE arrow_stream +STORED AS ARROW +LOCATION '../datasource-arrow/tests/data/example_stream.arrow'; + +# physical plan for stream format +query TT +EXPLAIN SELECT * FROM arrow_stream +---- +logical_plan TableScan: arrow_stream projection=[f0, f1, f2] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/datasource-arrow/tests/data/example_stream.arrow]]}, projection=[f0, f1, f2], file_type=arrow_stream + +# stream format should return same data as file format +query ITB +SELECT * FROM arrow_stream +---- +1 foo true +2 bar NULL +3 baz false +4 NULL true + +# Verify both file and stream formats return identical results +query ITB +SELECT * FROM arrow_simple ORDER BY f0 +---- +1 foo true +2 bar NULL +3 baz false +4 NULL true + +query ITB +SELECT * FROM arrow_stream ORDER BY f0 +---- +1 foo true +2 bar NULL +3 baz false +4 NULL true + +# Both formats should support projection pushdown +query IT +SELECT f0, f1 FROM arrow_simple ORDER BY f0 +---- +1 foo +2 bar +3 baz +4 NULL + +query IT +SELECT f0, f1 FROM arrow_stream ORDER BY f0 +---- +1 foo +2 bar +3 baz +4 NULL + +# Both formats should support filtering +query ITB +SELECT * FROM arrow_simple WHERE f0 > 2 ORDER BY f0 +---- +3 baz false +4 NULL true + +query ITB +SELECT * FROM arrow_stream WHERE f0 > 2 ORDER BY f0 +---- +3 baz false +4 NULL true From 936b2e3375cb5e509bab0e20387d2f8ce8440660 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sun, 2 Nov 2025 20:56:40 -0600 Subject: [PATCH 05/26] Remove a comment that isn't needed anymore --- datafusion/datasource-arrow/src/file_format.rs | 1 - 
1 file changed, 1 deletion(-) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index 12ecbc6d9297..62da43129ad2 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -428,7 +428,6 @@ async fn infer_ipc_schema( infer_ipc_schema_ignoring_preamble_bytes(bytes, preamble_size, stream).await } -/// Infer schema from IPC format, ignoring the preamble bytes async fn infer_ipc_schema_ignoring_preamble_bytes( bytes: Vec, preamble_size: usize, From a8bc19d39373b3b70e7faaa1f8f6f1bbc78bbc0b Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sun, 2 Nov 2025 21:28:33 -0600 Subject: [PATCH 06/26] Stray reference left over from Rename Symbol fail --- .../schema_adapter/schema_adapter_integration_tests.rs | 6 +++--- datafusion/datasource/src/file.rs | 3 ++- datafusion/datasource/src/source.rs | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index c3c92a9028d6..1f7c3a871de4 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -23,7 +23,7 @@ use bytes::{BufMut, BytesMut}; use datafusion::common::Result; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::{ - ArrowSource, CsvSource, FileSource, JsonSource, ParquetSource, + ArrowFileSource, CsvSource, FileSource, JsonSource, ParquetSource, }; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; @@ -282,9 +282,9 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> { // Create a test factory let factory = Arc::new(UppercaseAdapterFactory {}); - // Test ArrowSource + // Test ArrowFileSource { - let source = ArrowSource::default(); + let source = ArrowFileSource::default(); let source_with_adapter = source .clone() .with_schema_adapter_factory(factory.clone()) diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index d6ade3b8b210..2f9708228181 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -44,7 +44,8 @@ pub fn as_file_source(source: T) -> Arc /// file format specific behaviors for elements in [`DataSource`] /// /// See more details on specific implementations: -/// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html) +/// * [`ArrowFileSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowFileSource.html) +/// * [`ArrowStreamSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowStreamSource.html) /// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html) /// * [`CsvSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html) /// * [`JsonSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.JsonSource.html) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 11a8a3867b80..fb2f1c00454d 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -95,7 +95,7 @@ use datafusion_physical_plan::filter_pushdown::{ /// │ ┊ ┊ /// │ ┊ ┊ /// ┌──────────▼──────────┐ 
┌──────────▼──────────┐ -/// │ │ │ ArrowSource │ +/// │ │ │ ArrowFileSource │ /// │ FileSource(trait) ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│ ... │ /// │ │ │ ParquetSource │ /// └─────────────────────┘ └─────────────────────┘ @@ -104,7 +104,7 @@ use datafusion_physical_plan::filter_pushdown::{ /// │ /// │ /// ┌──────────▼──────────┐ -/// │ ArrowSource │ +/// │ ArrowFileSource │ /// │ ... │ /// │ ParquetSource │ /// └─────────────────────┘ From 3c003957df41160d66c90e076788a53fcaacb7c8 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Mon, 3 Nov 2025 19:50:15 -0600 Subject: [PATCH 07/26] Address clippy error --- datafusion/datasource-arrow/src/source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index 463c02780a03..f6e7fec33d7e 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -226,7 +226,7 @@ impl FileOpener for ArrowStreamOpener { "ArrowStreamOpener does not support range-based reading" )); } - let object_store = self.object_store.clone(); + let object_store = Arc::clone(&self.object_store); let projection = self.projection.clone(); Ok(Box::pin(async move { let r = object_store From 917c6c339526c191b030babac61e32d826117276 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Mon, 3 Nov 2025 19:56:17 -0600 Subject: [PATCH 08/26] Address additional clippy errors --- datafusion/datasource-arrow/src/file_format.rs | 4 ++-- datafusion/datasource-arrow/src/source.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index 62da43129ad2..ac3930500d1d 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -592,7 +592,7 @@ mod tests { #[tokio::test] async fn test_infer_schema_stream() -> Result<()> { for file in ["example.arrow", "example_stream.arrow"] { - let mut bytes = std::fs::read(format!("tests/data/{}", file))?; + let mut bytes = std::fs::read(format!("tests/data/{file}"))?; bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file let location = Path::parse(file)?; let in_memory_store: Arc = Arc::new(InMemory::new()); @@ -636,7 +636,7 @@ mod tests { #[tokio::test] async fn test_infer_schema_short_stream() -> Result<()> { for file in ["example.arrow", "example_stream.arrow"] { - let mut bytes = std::fs::read(format!("tests/data/{}", file))?; + let mut bytes = std::fs::read(format!("tests/data/{file}"))?; bytes.truncate(20); // should cause error that file shorter than expected let location = Path::parse(file)?; let in_memory_store: Arc = Arc::new(InMemory::new()); diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index f6e7fec33d7e..701d0321ef54 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -410,7 +410,7 @@ mod tests { #[tokio::test] async fn test_file_opener_without_ranges() -> Result<()> { for filename in ["example.arrow", "example_stream.arrow"] { - let path = format!("tests/data/{}", filename); + let path = format!("tests/data/{filename}"); let path_str = path.as_str(); let mut file = File::open(path_str)?; let file_size = file.metadata()?.len(); @@ -455,7 +455,7 @@ mod tests { #[tokio::test] async fn test_file_opener_with_ranges() -> Result<()> { let filename = "example.arrow"; - let path = 
format!("tests/data/{}", filename); + let path = format!("tests/data/{filename}"); let path_str = path.as_str(); let mut file = File::open(path_str)?; let file_size = file.metadata()?.len(); @@ -497,7 +497,7 @@ mod tests { #[tokio::test] async fn test_stream_opener_errors_with_ranges() -> Result<()> { let filename = "example_stream.arrow"; - let path = format!("tests/data/{}", filename); + let path = format!("tests/data/{filename}"); let path_str = path.as_str(); let mut file = File::open(path_str)?; let file_size = file.metadata()?.len(); From 07593b4048861a8f87130447cee0b4a20ac57218 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Thu, 6 Nov 2025 17:59:56 -0600 Subject: [PATCH 09/26] Pull out the stream format check into an independent function --- .../datasource-arrow/src/file_format.rs | 67 ++++++++++--------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index ac3930500d1d..acc519f41821 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -62,7 +62,9 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; use futures::stream::BoxStream; use futures::StreamExt; -use object_store::{GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore}; +use object_store::{ + path::Path, GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore, +}; use tokio::io::AsyncWriteExt; /// Initial writing buffer size. Note this is just a size hint for efficiency. It @@ -185,35 +187,24 @@ impl FileFormat for ArrowFormat { state: &dyn Session, conf: FileScanConfig, ) -> Result> { - let is_stream_format = if let Some(first_group) = conf.file_groups.first() { - if let Some(first_file) = first_group.files().first() { - let object_store = - state.runtime_env().object_store(&conf.object_store_url)?; - - let get_opts = GetOptions { - range: Some(GetRange::Bounded(0..6)), - ..Default::default() - }; - let result = object_store - .get_opts(&first_file.object_meta.location, get_opts) - .await?; - let bytes = result.bytes().await?; - - // assume stream format if the file is too short - // or the file does not start with the magic number - bytes.len() < 6 || bytes[0..6] != ARROW_MAGIC - } else { - false // no files, default to file format - } - } else { - false // no file groups, default to file format - }; - - let source: Arc = if is_stream_format { - Arc::new(ArrowStreamSource::default()) - } else { - Arc::new(ArrowFileSource::default()) - }; + let object_store = state.runtime_env().object_store(&conf.object_store_url)?; + let object_location = &conf + .file_groups + .first() + .ok_or_else(|| internal_datafusion_err!("No files found in file group"))? + .files() + .first() + .ok_or_else(|| internal_datafusion_err!("No files found in file group"))? 
+ .object_meta + .location; + + let source: Arc = + match is_object_in_arrow_ipc_file_format(object_store, object_location).await + { + Ok(true) => Arc::new(ArrowFileSource::default()), + Ok(false) => Arc::new(ArrowStreamSource::default()), + Err(e) => Err(e)?, + }; let config = FileScanConfigBuilder::from(conf) .with_source(source) @@ -499,6 +490,22 @@ async fn collect_at_least_n_bytes( Ok(buf) } +async fn is_object_in_arrow_ipc_file_format( + store: Arc, + object_location: &Path, +) -> Result { + let get_opts = GetOptions { + range: Some(GetRange::Bounded(0..6)), + ..Default::default() + }; + let bytes = store + .get_opts(object_location, get_opts) + .await? + .bytes() + .await?; + Ok(bytes.len() >= 6 && bytes[0..6] == ARROW_MAGIC) +} + #[cfg(test)] mod tests { use super::*; From 0446c32f52099ffdb233613c30c69d69519c784c Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Thu, 6 Nov 2025 20:38:51 -0600 Subject: [PATCH 10/26] Refactor schema inference --- .../datasource-arrow/src/file_format.rs | 111 +++++++++--------- 1 file changed, 53 insertions(+), 58 deletions(-) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index acc519f41821..549301c0e2cb 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -20,7 +20,6 @@ //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format) use std::any::Any; -use std::borrow::Cow; use std::collections::HashMap; use std::fmt::{self, Debug}; use std::io::{Seek, SeekFrom}; @@ -164,7 +163,7 @@ impl FileFormat for ArrowFormat { } } } - GetResultPayload::Stream(stream) => infer_ipc_schema(stream).await?, + GetResultPayload::Stream(stream) => infer_stream_schema(stream).await?, }; schemas.push(schema.as_ref().clone()); } @@ -379,7 +378,7 @@ impl DataSink for ArrowFileSink { const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; -async fn infer_ipc_schema( +async fn infer_stream_schema( mut stream: BoxStream<'static, object_store::Result>, ) -> Result { // Expected IPC format is either: @@ -395,11 +394,12 @@ async fn infer_ipc_schema( // - 2 bytes // - // Perform the initial read such that we always have the metadata size - let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?; + // 16 bytes covers the preamble and metadata length no matter + // which version or format is used + let bytes = extend_bytes_to_n_length_from_stream(vec![], 16, &mut stream).await?; - // The preamble size is everything before the metadata size - let preamble_size = if bytes[0..6] == ARROW_MAGIC { + // The preamble length is everything before the metadata length + let preamble_len = if bytes[0..6] == ARROW_MAGIC { // File format starts with magic number "ARROW1" if bytes[8..12] == CONTINUATION_MARKER { // Continuation marker was added in v0.15.0 @@ -416,77 +416,72 @@ async fn infer_ipc_schema( 0 }; - infer_ipc_schema_ignoring_preamble_bytes(bytes, preamble_size, stream).await -} + let meta_len_bytes: [u8; 4] = bytes[preamble_len..preamble_len + 4] + .try_into() + .map_err(|err| { + ArrowError::ParseError(format!( + "Unable to read IPC message metadata length: {err:?}" + )) + })?; -async fn infer_ipc_schema_ignoring_preamble_bytes( - bytes: Vec, - preamble_size: usize, - mut stream: BoxStream<'static, object_store::Result>, -) -> Result { - let (meta_len, rest_of_bytes_start_index): ([u8; 4], usize) = ( - 
bytes[preamble_size..preamble_size + 4] - .try_into() - .map_err(|err| { - ArrowError::ParseError(format!( - "Unable to read IPC message as metadata length: {err:?}" - )) - })?, - preamble_size + 4, - ); - - let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]]; - let meta_len = i32::from_le_bytes(meta_len); - - // Read bytes for Schema message - let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize { - // Need to read more bytes to decode Message - let mut block_data = Vec::with_capacity(meta_len as usize); - // In case we had some spare bytes in our initial read chunk - block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]); - let size_to_read = meta_len as usize - block_data.len(); - let block_data = - collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?; - Cow::Owned(block_data) - } else { - // Already have the bytes we need - let end_index = meta_len as usize + rest_of_bytes_start_index; - let block_data = &bytes[rest_of_bytes_start_index..end_index]; - Cow::Borrowed(block_data) - }; + let meta_len = i32::from_le_bytes([ + meta_len_bytes[0], + meta_len_bytes[1], + meta_len_bytes[2], + meta_len_bytes[3], + ]); + + if meta_len < 0 { + return Err(ArrowError::ParseError( + "IPC message metadata length is negative".to_string(), + ) + .into()); + } + + let bytes = extend_bytes_to_n_length_from_stream( + bytes, + preamble_len + 4 + (meta_len as usize), + &mut stream, + ) + .await?; - // Decode Schema message - let message = root_as_message(&block_data).map_err(|err| { - ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}")) + let message = root_as_message(&bytes[preamble_len + 4..]).map_err(|err| { + ArrowError::ParseError(format!("Unable to read IPC message metadata: {err:?}")) })?; - let ipc_schema = message.header_as_schema().ok_or_else(|| { - ArrowError::IpcError("Unable to read IPC message as schema".to_string()) + let fb_schema = message.header_as_schema().ok_or_else(|| { + ArrowError::IpcError("Unable to read IPC message schema".to_string()) })?; - let schema = fb_to_schema(ipc_schema); + let schema = fb_to_schema(fb_schema); Ok(Arc::new(schema)) } -async fn collect_at_least_n_bytes( - stream: &mut BoxStream<'static, object_store::Result>, +async fn extend_bytes_to_n_length_from_stream( + bytes: Vec, n: usize, - extend_from: Option>, + stream: &mut BoxStream<'static, object_store::Result>, ) -> Result> { - let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n)); - // If extending existing buffer then ensure we read n additional bytes - let n = n + buf.len(); - while let Some(bytes) = stream.next().await.transpose()? { - buf.extend_from_slice(&bytes); + if bytes.len() >= n { + return Ok(bytes); + } + + let mut buf = bytes; + + while let Some(b) = stream.next().await.transpose()? 
{ + buf.extend_from_slice(&b); + if buf.len() >= n { break; } } + if buf.len() < n { return Err(ArrowError::ParseError( "Unexpected end of byte stream for Arrow IPC file".to_string(), ) .into()); } + Ok(buf) } From 7409462651dfedaf7a511c28960c867bdda0d783 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Thu, 6 Nov 2025 20:51:44 -0600 Subject: [PATCH 11/26] Let's move the `into()` outside the parens --- datafusion/datasource-arrow/src/file_format.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index 549301c0e2cb..f6e889e89c68 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -478,8 +478,8 @@ async fn extend_bytes_to_n_length_from_stream( if buf.len() < n { return Err(ArrowError::ParseError( "Unexpected end of byte stream for Arrow IPC file".to_string(), - ) - .into()); + )) + .into(); } Ok(buf) From 3f72b0c4cf2db0138c8eedab6293d74202e4e5cf Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Thu, 6 Nov 2025 20:53:07 -0600 Subject: [PATCH 12/26] Err, no, on the inside --- datafusion/datasource-arrow/src/file_format.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index f6e889e89c68..549301c0e2cb 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -478,8 +478,8 @@ async fn extend_bytes_to_n_length_from_stream( if buf.len() < n { return Err(ArrowError::ParseError( "Unexpected end of byte stream for Arrow IPC file".to_string(), - )) - .into(); + ) + .into()); } Ok(buf) From c6d4a06823beabad14e13d2261c980da51791bf6 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Thu, 6 Nov 2025 21:49:41 -0600 Subject: [PATCH 13/26] Also include a test for arrow stream source --- .../schema_adapter_integration_tests.rs | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index 1f7c3a871de4..a990600e41fc 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -23,7 +23,7 @@ use bytes::{BufMut, BytesMut}; use datafusion::common::Result; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::{ - ArrowFileSource, CsvSource, FileSource, JsonSource, ParquetSource, + ArrowFileSource, ArrowStreamSource, CsvSource, FileSource, JsonSource, ParquetSource, }; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; @@ -301,6 +301,25 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> { ); } + // Test ArrowStreamSource + { + let source = ArrowStreamSource::default(); + let source_with_adapter = source + .clone() + .with_schema_adapter_factory(factory.clone()) + .unwrap(); + + let base_source: Arc = source.into(); + assert!(base_source.schema_adapter_factory().is_none()); + assert!(source_with_adapter.schema_adapter_factory().is_some()); + + let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap(); + assert_eq!( + format!("{:?}", retrieved_factory.as_ref()), + format!("{:?}", factory.as_ref()) + ); + } + // Test ParquetSource #[cfg(feature = "parquet")] { From 
4e28ef9d067b5479bc504585664e2fde3210721a Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Thu, 6 Nov 2025 22:46:05 -0600 Subject: [PATCH 14/26] Add a bunch more tests --- .../part=123/data.arrow | Bin 0 -> 1608 bytes .../part=456/data.arrow | Bin 0 -> 1608 bytes datafusion/core/tests/execution/mod.rs | 1 + .../core/tests/execution/register_arrow.rs | 90 +++++++++++ .../datasource-arrow/src/file_format.rs | 62 +++++++ datafusion/datasource-arrow/src/source.rs | 66 ++++++++ .../sqllogictest/test_files/arrow_files.slt | 152 ++++++++++++++++++ 7 files changed, 371 insertions(+) create mode 100644 datafusion/core/tests/data/partitioned_table_arrow_stream/part=123/data.arrow create mode 100644 datafusion/core/tests/data/partitioned_table_arrow_stream/part=456/data.arrow create mode 100644 datafusion/core/tests/execution/register_arrow.rs diff --git a/datafusion/core/tests/data/partitioned_table_arrow_stream/part=123/data.arrow b/datafusion/core/tests/data/partitioned_table_arrow_stream/part=123/data.arrow new file mode 100644 index 0000000000000000000000000000000000000000..bad9e3de4a57fc4c0bd169f0275e9ca8b1b9d656 GIT binary patch literal 1608 zcmbtT&2G~`5T3T7L=Y7Ym530BXwMxS4Nu*H!FR0@95pJM+!XuV?DIuFsE%9ul8>bV38VPd#u%rxv3FmImPa9`A$y zen@nH+u;MNW1Q(5bR}pbQXQ-F*}`P2r7p#*(ff_~3=Etoq&`z(JQ7+i6#%`;GDu}5 ziwa_3d6R>UEUTCew;v!le>3Y`dADsZNvg7V*2Z$FV_D9sb$Nm!w-oy&3neR4*am0mypYA3 z`x(MN)M!S-=aXO(2=qYsz&pxl5=}y}rDM#W(-L8<=_6VbY>)_JCR zW--0T?k4&L9OFt!MD!E<5WIhL$L$5%^N+~B3wd8oK_~RUqJ6tRY&QbI_u&)#_u-|7 zZQ`Anf^c|coR5GxB801dj7l+U`yZ8n;+j3sb;ktKa z#{VYso4fqyPZM_P<)#l89sQ-3n_jvK$$ke`O}(SW-$h5Rz5Tvd13A0wt{MrudWr@ z^oyF6T-idEGM1%;`C2Fq^F&{;?}ztqpT8EHono3*hiWLn5# z#U0L9vM93e9POJbmPw`=4PTCeQ6SI-;ZpA?qfs;p#Rl_t3bJ}4j5d7+4M82yFU&j7 zHP0<3_|)D+e}H3TX&w>%2HywoZ0_0JV1ND*8M}~o<{WfJj||#1JH&ni5Mm!c!+#f1 zI@l=Qi!lg?*T%*Oh>)uO3z>8D=qdH-3GCU+#vuf-D`FPuwTs!|mFMcZ@{w{|cU)LeN d-iK@Ob$INy_f-D4t?6F7yVieKf1Y11>>p)(?fC!z literal 0 HcmV?d00001 diff --git a/datafusion/core/tests/execution/mod.rs b/datafusion/core/tests/execution/mod.rs index 8770b2a20105..f33ef87aa302 100644 --- a/datafusion/core/tests/execution/mod.rs +++ b/datafusion/core/tests/execution/mod.rs @@ -18,3 +18,4 @@ mod coop; mod datasource_split; mod logical_plan; +mod register_arrow; diff --git a/datafusion/core/tests/execution/register_arrow.rs b/datafusion/core/tests/execution/register_arrow.rs new file mode 100644 index 000000000000..4ce16dc0906c --- /dev/null +++ b/datafusion/core/tests/execution/register_arrow.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Integration tests for register_arrow API + +use datafusion::{execution::options::ArrowReadOptions, prelude::*}; +use datafusion_common::Result; + +#[tokio::test] +async fn test_register_arrow_auto_detects_format() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.register_arrow( + "file_format", + "../../datafusion/datasource-arrow/tests/data/example.arrow", + ArrowReadOptions::default(), + ) + .await?; + + ctx.register_arrow( + "stream_format", + "../../datafusion/datasource-arrow/tests/data/example_stream.arrow", + ArrowReadOptions::default(), + ) + .await?; + + let file_result = ctx.sql("SELECT * FROM file_format ORDER BY f0").await?; + let stream_result = ctx.sql("SELECT * FROM stream_format ORDER BY f0").await?; + + let file_batches = file_result.collect().await?; + let stream_batches = stream_result.collect().await?; + + assert_eq!(file_batches.len(), stream_batches.len()); + assert_eq!(file_batches[0].schema(), stream_batches[0].schema()); + + let file_rows: usize = file_batches.iter().map(|b| b.num_rows()).sum(); + let stream_rows: usize = stream_batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(file_rows, stream_rows); + + Ok(()) +} + +#[tokio::test] +async fn test_register_arrow_join_file_and_stream() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.register_arrow( + "file_table", + "../../datafusion/datasource-arrow/tests/data/example.arrow", + ArrowReadOptions::default(), + ) + .await?; + + ctx.register_arrow( + "stream_table", + "../../datafusion/datasource-arrow/tests/data/example_stream.arrow", + ArrowReadOptions::default(), + ) + .await?; + + let result = ctx + .sql( + "SELECT a.f0, a.f1, b.f0, b.f1 + FROM file_table a + JOIN stream_table b ON a.f0 = b.f0 + WHERE a.f0 <= 2 + ORDER BY a.f0", + ) + .await?; + let batches = result.collect().await?; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2); + + Ok(()) +} diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index 549301c0e2cb..c7c123f339d5 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -670,4 +670,66 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_format_detection_file_format() -> Result<()> { + let store = Arc::new(InMemory::new()); + let path = Path::from("test.arrow"); + + let file_bytes = std::fs::read("tests/data/example.arrow")?; + store.put(&path, file_bytes.into()).await?; + + let is_file = is_object_in_arrow_ipc_file_format(store.clone(), &path).await?; + assert!(is_file, "Should detect file format"); + Ok(()) + } + + #[tokio::test] + async fn test_format_detection_stream_format() -> Result<()> { + let store = Arc::new(InMemory::new()); + let path = Path::from("test_stream.arrow"); + + let stream_bytes = std::fs::read("tests/data/example_stream.arrow")?; + store.put(&path, stream_bytes.into()).await?; + + let is_file = is_object_in_arrow_ipc_file_format(store.clone(), &path).await?; + + assert!(!is_file, "Should detect stream format (not file)"); + + Ok(()) + } + + #[tokio::test] + async fn test_format_detection_corrupted_file() -> Result<()> { + let store = Arc::new(InMemory::new()); + let path = Path::from("corrupted.arrow"); + + store + .put(&path, Bytes::from(vec![0x43, 0x4f, 0x52, 0x41]).into()) + .await?; + + let is_file = is_object_in_arrow_ipc_file_format(store.clone(), &path).await?; + + assert!( + !is_file, + "Corrupted file should not be detected as file format" + ); + + Ok(()) + } + + 
#[tokio::test] + async fn test_format_detection_empty_file() -> Result<()> { + let store = Arc::new(InMemory::new()); + let path = Path::from("empty.arrow"); + + store.put(&path, Bytes::new().into()).await?; + + let result = is_object_in_arrow_ipc_file_format(store.clone(), &path).await; + + // currently errors because it tries to read 0..6 from an empty file + assert!(result.is_err(), "Empty file should error"); + + Ok(()) + } } diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index 701d0321ef54..e2a6d5c02242 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -399,6 +399,7 @@ impl FileOpener for ArrowFileOpener { mod tests { use std::{fs::File, io::Read}; + use arrow::datatypes::{DataType, Field, Schema}; use arrow_ipc::reader::{FileReader, StreamReader}; use bytes::Bytes; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; @@ -534,4 +535,69 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_arrow_stream_repartitioning_not_supported() -> Result<()> { + let source = ArrowStreamSource::default(); + let schema = + Arc::new(Schema::new(vec![Field::new("f0", DataType::Int64, false)])); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + schema, + Arc::new(source.clone()) as Arc, + ) + .build(); + + for target_partitions in [2, 4, 8, 16] { + let result = + source.repartitioned(target_partitions, 1024 * 1024, None, &config)?; + + assert!( + result.is_none(), + "Stream format should not support repartitioning with {target_partitions} partitions", + ); + } + + Ok(()) + } + + #[tokio::test] + async fn test_stream_opener_with_projection() -> Result<()> { + let filename = "example_stream.arrow"; + let path = format!("tests/data/{filename}"); + let path_str = path.as_str(); + let mut file = File::open(path_str)?; + let file_size = file.metadata()?.len(); + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + let bytes = Bytes::from(buffer); + + let object_store = Arc::new(InMemory::new()); + let partitioned_file = PartitionedFile::new(filename, file_size); + object_store + .put(&partitioned_file.object_meta.location, bytes.into()) + .await?; + + let opener = ArrowStreamOpener { + object_store, + projection: Some(vec![0]), // just the first column + }; + + let mut stream = opener.open(partitioned_file)?.await?; + + if let Some(batch) = stream.next().await { + let batch = batch?; + assert_eq!( + batch.num_columns(), + 1, + "Projection should result in 1 column" + ); + } else { + panic!("Expected at least one batch"); + } + + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt index 5368871126af..6392c61c4b8c 100644 --- a/datafusion/sqllogictest/test_files/arrow_files.slt +++ b/datafusion/sqllogictest/test_files/arrow_files.slt @@ -201,3 +201,155 @@ SELECT * FROM arrow_stream WHERE f0 > 2 ORDER BY f0 ---- 3 baz false 4 NULL true + +# Test aggregations on stream format +query I +SELECT COUNT(*) FROM arrow_stream +---- +4 + +query I +SELECT SUM(f0) FROM arrow_stream +---- +10 + +query I +SELECT MAX(f0) FROM arrow_stream +---- +4 + +query I +SELECT MIN(f0) FROM arrow_stream WHERE f0 IS NOT NULL +---- +1 + +# Test aggregations on file format for comparison +query I +SELECT COUNT(*) FROM arrow_simple +---- +4 + +query I +SELECT SUM(f0) FROM arrow_simple +---- +10 + +# Test joins between file and stream formats +query ITBITB +SELECT a.f0, a.f1, a.f2, b.f0, 
b.f1, b.f2 +FROM arrow_simple a +JOIN arrow_stream b ON a.f0 = b.f0 +WHERE a.f0 <= 2 +ORDER BY a.f0 +---- +1 foo true 1 foo true +2 bar NULL 2 bar NULL + +# Test that both formats work in UNION +query ITB +SELECT * FROM arrow_simple WHERE f0 = 1 +UNION ALL +SELECT * FROM arrow_stream WHERE f0 = 2 +ORDER BY f0 +---- +1 foo true +2 bar NULL + +# Test GROUP BY on stream format +query BI +SELECT f2, COUNT(*) as cnt FROM arrow_stream GROUP BY f2 ORDER BY f2 +---- +false 1 +true 2 +NULL 1 + +# Test DISTINCT on stream format +query B +SELECT DISTINCT f2 FROM arrow_stream ORDER BY f2 +---- +false +true +NULL + +# Test subquery with stream format +query I +SELECT f0 FROM arrow_simple WHERE f0 IN (SELECT f0 FROM arrow_stream WHERE f0 < 3) ORDER BY f0 +---- +1 +2 + +# ARROW partitioned table (stream format) +statement ok +CREATE EXTERNAL TABLE arrow_partitioned_stream ( + part Int, + f0 Bigint, + f1 String, + f2 Boolean +) +STORED AS ARROW +LOCATION '../core/tests/data/partitioned_table_arrow_stream/' +PARTITIONED BY (part); + +# select wildcard +query ITBI +SELECT * FROM arrow_partitioned_stream ORDER BY f0; +---- +1 foo true 123 +2 bar false 123 +3 baz true 456 +4 NULL NULL 456 + +# select all fields +query IITB +SELECT part, f0, f1, f2 FROM arrow_partitioned_stream ORDER BY f0; +---- +123 1 foo true +123 2 bar false +456 3 baz true +456 4 NULL NULL + +# select without partition column +query IB +SELECT f0, f2 FROM arrow_partitioned_stream ORDER BY f0 +---- +1 true +2 false +3 true +4 NULL + +# select only partition column +query I +SELECT part FROM arrow_partitioned_stream ORDER BY part +---- +123 +123 +456 +456 + +# select without any table-related columns in projection +query I +SELECT 1 FROM arrow_partitioned_stream +---- +1 +1 +1 +1 + +# select with partition filter +query I +SELECT f0 FROM arrow_partitioned_stream WHERE part = 123 ORDER BY f0 +---- +1 +2 + +# select with partition filter should scan only one directory +query TT +EXPLAIN SELECT f0 FROM arrow_partitioned_stream WHERE part = 456 +---- +logical_plan TableScan: arrow_partitioned_stream projection=[f0], full_filters=[arrow_partitioned_stream.part = Int32(456)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow_stream/part=456/data.arrow]]}, projection=[f0], file_type=arrow_stream + + +# Errors in partition filters should be reported +query error Divide by zero error +SELECT f0 FROM arrow_partitioned_stream WHERE CASE WHEN true THEN 1 / 0 ELSE part END = 1; From 589a1d26f25859f998167b802c1c3c9371a39d2a Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sat, 8 Nov 2025 21:47:48 -0600 Subject: [PATCH 15/26] Document rename of `ArrowSource` and addition of `ArrowStreamSource` --- docs/source/library-user-guide/upgrading.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 0b227000f73d..0d182a4f29d7 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -125,6 +125,14 @@ Users may need to update their paths to account for these changes. See [issue #17713] for more details. +### Renaming of `ArrowSource` to `ArrowFileSource` and adding `ArrowStreamSource` + +Support has been added for querying files in the [Arrow IPC stream format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format). 
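+The two layouts come from the two `arrow-ipc` writers. As an illustrative
+sketch (using the `arrow` crate directly; `file_a`, `file_b`, `schema`, and
+`batch` are assumed to already exist):
+
+```rust
+use arrow::ipc::writer::{FileWriter, StreamWriter};
+
+// File format: ends with a footer, so readers can seek to any batch.
+let mut file_writer = FileWriter::try_new(file_a, &schema)?;
+file_writer.write(&batch)?;
+file_writer.finish()?;
+
+// Stream format: framed messages only, so readers must scan sequentially.
+let mut stream_writer = StreamWriter::try_new(file_b, &schema)?;
+stream_writer.write(&batch)?;
+stream_writer.finish()?;
+```
+
+That footer is what later allows range-based parallel scans of the file
+format, while the stream format is limited to a single sequential reader.
+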
To better support this feature, `ArrowSource` has been split into two separate sources: `ArrowFileSource` and `ArrowStreamSource`. + +See [issue #16688] for details. + +[issue #16688]: https://github.com/apache/datafusion/issues/16688 + ### `FileScanConfig::projection` renamed to `FileScanConfig::projection_exprs` The `projection` field in `FileScanConfig` has been renamed to `projection_exprs` and its type has changed from `Option>` to `Option`. This change enables more powerful projection pushdown capabilities by supporting arbitrary physical expressions rather than just column indices. From 0ac2f6d9ae53f1e34c717278382b8427defb262f Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sat, 8 Nov 2025 21:53:33 -0600 Subject: [PATCH 16/26] Rename ArrowStreamSource to ArrowStreamFileSource --- .../core/src/datasource/physical_plan/mod.rs | 4 +++- .../schema_adapter_integration_tests.rs | 7 ++++--- datafusion/datasource-arrow/src/file_format.rs | 4 ++-- datafusion/datasource-arrow/src/source.rs | 14 +++++++------- datafusion/datasource/src/file.rs | 2 +- docs/source/library-user-guide/upgrading.md | 4 ++-- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 13b2c26353ab..948c5bc2eb15 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -37,7 +37,9 @@ pub use datafusion_datasource_parquet::{ParquetFileMetrics, ParquetFileReaderFac pub use json::{JsonOpener, JsonSource}; -pub use arrow::{ArrowFileOpener, ArrowFileSource, ArrowStreamOpener, ArrowStreamSource}; +pub use arrow::{ + ArrowFileOpener, ArrowFileSource, ArrowStreamFileSource, ArrowStreamOpener, +}; pub use csv::{CsvOpener, CsvSource}; pub use datafusion_datasource::file::FileSource; pub use datafusion_datasource::file_groups::FileGroup; diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index a990600e41fc..87fdf8b4d20d 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -23,7 +23,8 @@ use bytes::{BufMut, BytesMut}; use datafusion::common::Result; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::{ - ArrowFileSource, ArrowStreamSource, CsvSource, FileSource, JsonSource, ParquetSource, + ArrowFileSource, ArrowStreamFileSource, CsvSource, FileSource, JsonSource, + ParquetSource, }; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; @@ -301,9 +302,9 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> { ); } - // Test ArrowStreamSource + // Test ArrowStreamFileSource { - let source = ArrowStreamSource::default(); + let source = ArrowStreamFileSource::default(); let source_with_adapter = source .clone() .with_schema_adapter_factory(factory.clone()) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index c7c123f339d5..ae0aaf5d50b3 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -49,7 +49,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::LexRequirement; -use crate::source::{ArrowFileSource, 
ArrowStreamSource}; +use crate::source::{ArrowFileSource, ArrowStreamFileSource}; use async_trait::async_trait; use bytes::Bytes; use datafusion_datasource::file_compression_type::FileCompressionType; @@ -201,7 +201,7 @@ impl FileFormat for ArrowFormat { match is_object_in_arrow_ipc_file_format(object_store, object_location).await { Ok(true) => Arc::new(ArrowFileSource::default()), - Ok(false) => Arc::new(ArrowStreamSource::default()), + Ok(false) => Arc::new(ArrowStreamFileSource::default()), Err(e) => Err(e)?, }; diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index e2a6d5c02242..49d3878d06b9 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -119,19 +119,19 @@ impl FileSource for ArrowFileSource { /// Arrow IPC Stream format source - supports only sequential reading #[derive(Clone, Default)] -pub struct ArrowStreamSource { +pub struct ArrowStreamFileSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option, schema_adapter_factory: Option>, } -impl From for Arc { - fn from(source: ArrowStreamSource) -> Self { +impl From for Arc { + fn from(source: ArrowStreamFileSource) -> Self { as_file_source(source) } } -impl FileSource for ArrowStreamSource { +impl FileSource for ArrowStreamFileSource { fn create_file_opener( &self, object_store: Arc, @@ -432,7 +432,7 @@ mod tests { }; let source: Arc = if filename.contains("stream") { - Arc::new(ArrowStreamSource::default()) + Arc::new(ArrowStreamFileSource::default()) } else { Arc::new(ArrowFileSource::default()) }; @@ -520,7 +520,7 @@ mod tests { let schema = StreamReader::try_new(File::open(path_str)?, None)?.schema(); - let source = Arc::new(ArrowStreamSource::default()); + let source = Arc::new(ArrowStreamFileSource::default()); let scan_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), @@ -538,7 +538,7 @@ mod tests { #[tokio::test] async fn test_arrow_stream_repartitioning_not_supported() -> Result<()> { - let source = ArrowStreamSource::default(); + let source = ArrowStreamFileSource::default(); let schema = Arc::new(Schema::new(vec![Field::new("f0", DataType::Int64, false)])); diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 2f9708228181..27f9e4fb6fe1 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -45,7 +45,7 @@ pub fn as_file_source(source: T) -> Arc /// /// See more details on specific implementations: /// * [`ArrowFileSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowFileSource.html) -/// * [`ArrowStreamSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowStreamSource.html) +/// * [`ArrowStreamFileSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowStreamFileSource.html) /// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html) /// * [`CsvSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html) /// * [`JsonSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.JsonSource.html) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 0d182a4f29d7..1312d50fc4a5 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -125,9 +125,9 @@ Users may need to update their paths 
to account for these changes. See [issue #17713] for more details. -### Renaming of `ArrowSource` to `ArrowFileSource` and adding `ArrowStreamSource` +### Renaming of `ArrowSource` to `ArrowFileSource` and adding `ArrowStreamFileSource` -Support has been added for querying files in the [Arrow IPC stream format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format). To better support this feature, `ArrowSource` has been split into two separate sources: `ArrowFileSource` and `ArrowStreamSource`. +Support has been added for querying files in the [Arrow IPC stream format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format). To better support this feature, `ArrowSource` has been split into two separate sources: `ArrowFileSource` and `ArrowStreamFileSource`. See [issue #16688] for details. From c8e8bd1b8bb6b656072888ef332072bb848d6c53 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sat, 8 Nov 2025 22:56:56 -0600 Subject: [PATCH 17/26] Keep same public interface but switch between formats --- .../core/src/datasource/physical_plan/mod.rs | 4 +- .../schema_adapter_integration_tests.rs | 7 +- .../datasource-arrow/src/file_format.rs | 8 +- datafusion/datasource-arrow/src/source.rs | 125 ++++++++++++++++-- datafusion/datasource/src/file.rs | 3 +- datafusion/datasource/src/source.rs | 4 +- docs/source/library-user-guide/upgrading.md | 4 +- 7 files changed, 127 insertions(+), 28 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 948c5bc2eb15..1ac292e260fd 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -37,9 +37,7 @@ pub use datafusion_datasource_parquet::{ParquetFileMetrics, ParquetFileReaderFac pub use json::{JsonOpener, JsonSource}; -pub use arrow::{ - ArrowFileOpener, ArrowFileSource, ArrowStreamFileSource, ArrowStreamOpener, -}; +pub use arrow::{ArrowOpener, ArrowSource}; pub use csv::{CsvOpener, CsvSource}; pub use datafusion_datasource::file::FileSource; pub use datafusion_datasource::file_groups::FileGroup; diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index 87fdf8b4d20d..a80cc48a46ce 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -23,8 +23,7 @@ use bytes::{BufMut, BytesMut}; use datafusion::common::Result; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::{ - ArrowFileSource, ArrowStreamFileSource, CsvSource, FileSource, JsonSource, - ParquetSource, + ArrowSource, CsvSource, FileSource, JsonSource, ParquetSource, }; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; @@ -285,7 +284,7 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> { // Test ArrowFileSource { - let source = ArrowFileSource::default(); + let source = ArrowSource::default_file_source(); let source_with_adapter = source .clone() .with_schema_adapter_factory(factory.clone()) @@ -304,7 +303,7 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> { // Test ArrowStreamFileSource { - let source = ArrowStreamFileSource::default(); + let source = ArrowSource::default_stream_file_source(); let source_with_adapter = source .clone() .with_schema_adapter_factory(factory.clone()) diff --git 
a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index ae0aaf5d50b3..8bc52d3914d1 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -49,7 +49,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::LexRequirement; -use crate::source::{ArrowFileSource, ArrowStreamFileSource}; +use crate::source::ArrowSource; use async_trait::async_trait; use bytes::Bytes; use datafusion_datasource::file_compression_type::FileCompressionType; @@ -200,8 +200,8 @@ impl FileFormat for ArrowFormat { let source: Arc = match is_object_in_arrow_ipc_file_format(object_store, object_location).await { - Ok(true) => Arc::new(ArrowFileSource::default()), - Ok(false) => Arc::new(ArrowStreamFileSource::default()), + Ok(true) => Arc::new(ArrowSource::default_file_source()), + Ok(false) => Arc::new(ArrowSource::default_stream_file_source()), Err(e) => Err(e)?, }; @@ -231,7 +231,7 @@ impl FileFormat for ArrowFormat { fn file_source(&self) -> Arc { // defaulting to the file format source since it's // more capable in general - Arc::new(ArrowFileSource::default()) + ArrowSource::default_file_source().into() } } diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index 49d3878d06b9..bb91ba5c5fa9 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -40,7 +40,7 @@ use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; /// Arrow IPC File format source - supports range-based parallel reading /// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow #[derive(Clone, Default)] -pub struct ArrowFileSource { +pub(crate) struct ArrowFileSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option, schema_adapter_factory: Option>, @@ -119,7 +119,7 @@ impl FileSource for ArrowFileSource { /// Arrow IPC Stream format source - supports only sequential reading #[derive(Clone, Default)] -pub struct ArrowStreamFileSource { +pub(crate) struct ArrowStreamFileSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option, schema_adapter_factory: Option>, @@ -138,7 +138,7 @@ impl FileSource for ArrowStreamFileSource { base_config: &FileScanConfig, _partition: usize, ) -> Arc { - Arc::new(ArrowStreamOpener { + Arc::new(ArrowStreamFileOpener { object_store, projection: base_config.file_column_projection_indices(), }) @@ -214,12 +214,12 @@ impl FileSource for ArrowStreamFileSource { } /// FileOpener for Arrow IPC Stream format - always reads sequentially -pub struct ArrowStreamOpener { - pub object_store: Arc, - pub projection: Option>, +pub(crate) struct ArrowStreamFileOpener { + object_store: Arc, + projection: Option>, } -impl FileOpener for ArrowStreamOpener { +impl FileOpener for ArrowStreamFileOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result { if partitioned_file.range.is_some() { return Err(exec_datafusion_err!( @@ -255,9 +255,9 @@ impl FileOpener for ArrowStreamOpener { } /// The struct arrow that implements `[FileOpener]` trait for Arrow IPC File format -pub struct ArrowFileOpener { - pub object_store: Arc, - pub projection: Option>, +pub(crate) struct ArrowFileOpener { + object_store: Arc, + projection: Option>, } impl FileOpener for ArrowFileOpener { @@ -395,6 +395,109 @@ impl FileOpener for ArrowFileOpener { } } +#[derive(Clone)] +pub struct 
ArrowSource { + pub inner: Arc, +} + +impl Default for ArrowSource { + fn default() -> Self { + Self::default_file_source() + } +} + +impl ArrowSource { + pub fn new(inner: Arc) -> Self { + Self { inner } + } + + pub fn default_file_source() -> Self { + Self { + inner: Arc::new(ArrowFileSource::default()), + } + } + + pub fn default_stream_file_source() -> Self { + Self { + inner: Arc::new(ArrowStreamFileSource::default()), + } + } +} + +impl FileSource for ArrowSource { + fn create_file_opener( + &self, + object_store: Arc, + base_config: &FileScanConfig, + partition: usize, + ) -> Arc { + self.inner + .create_file_opener(object_store, base_config, partition) + } + + fn as_any(&self) -> &dyn Any { + self.inner.as_any() + } + + fn with_batch_size(&self, batch_size: usize) -> Arc { + Arc::new(Self { + inner: self.inner.with_batch_size(batch_size), + }) + } + + fn with_schema(&self, schema: TableSchema) -> Arc { + Arc::new(Self { + inner: self.inner.with_schema(schema), + }) + } + + fn with_projection(&self, config: &FileScanConfig) -> Arc { + Arc::new(Self { + inner: self.inner.with_projection(config), + }) + } + + fn with_statistics(&self, statistics: Statistics) -> Arc { + Arc::new(Self { + inner: self.inner.with_statistics(statistics), + }) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + self.inner.metrics() + } + + fn statistics(&self) -> Result { + self.inner.statistics() + } + + fn file_type(&self) -> &str { + self.inner.file_type() + } +} + +pub struct ArrowOpener { + pub inner: Arc, +} + +impl FileOpener for ArrowOpener { + fn open(&self, partitioned_file: PartitionedFile) -> Result { + self.inner.open(partitioned_file) + } +} + +impl ArrowOpener { + pub fn new(inner: Arc) -> Self { + Self { inner } + } +} + +impl From for Arc { + fn from(source: ArrowSource) -> Self { + as_file_source(source) + } +} + #[cfg(test)] mod tests { use std::{fs::File, io::Read}; @@ -580,7 +683,7 @@ mod tests { .put(&partitioned_file.object_meta.location, bytes.into()) .await?; - let opener = ArrowStreamOpener { + let opener = ArrowStreamFileOpener { object_store, projection: Some(vec![0]), // just the first column }; diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 27f9e4fb6fe1..d6ade3b8b210 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -44,8 +44,7 @@ pub fn as_file_source(source: T) -> Arc /// file format specific behaviors for elements in [`DataSource`] /// /// See more details on specific implementations: -/// * [`ArrowFileSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowFileSource.html) -/// * [`ArrowStreamFileSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowStreamFileSource.html) +/// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html) /// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html) /// * [`CsvSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html) /// * [`JsonSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.JsonSource.html) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index fb2f1c00454d..e9639901590c 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -95,7 +95,7 @@ use datafusion_physical_plan::filter_pushdown::{ /// │ ┊ ┊ /// │ ┊ ┊ 
/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ -/// │ │ │ ArrowFileSource │ +/// │ │ │ ArrowSource │ /// │ FileSource(trait) ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│ ... │ /// │ │ │ ParquetSource │ /// └─────────────────────┘ └─────────────────────┘ @@ -104,7 +104,7 @@ use datafusion_physical_plan::filter_pushdown::{ /// │ /// │ /// ┌──────────▼──────────┐ -/// │ ArrowFileSource │ +/// │ ArrowSource │ /// │ ... │ /// │ ParquetSource │ /// └─────────────────────┘ diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 1312d50fc4a5..ab6b81d831bc 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -125,9 +125,9 @@ Users may need to update their paths to account for these changes. See [issue #17713] for more details. -### Renaming of `ArrowSource` to `ArrowFileSource` and adding `ArrowStreamFileSource` +### Added Arrow IPC stream format support -Support has been added for querying files in the [Arrow IPC stream format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format). To better support this feature, `ArrowSource` has been split into two separate sources: `ArrowFileSource` and `ArrowStreamFileSource`. +Support has been added for querying files in the [Arrow IPC stream format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format). See [issue #16688] for details. From 6f01b80a43e63d2bbc5a519c9533c66afee2116d Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sat, 8 Nov 2025 23:00:47 -0600 Subject: [PATCH 18/26] whitespace fixes --- datafusion/datasource/src/source.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index e9639901590c..11a8a3867b80 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -95,7 +95,7 @@ use datafusion_physical_plan::filter_pushdown::{ /// │ ┊ ┊ /// │ ┊ ┊ /// ┌──────────▼──────────┐ ┌──────────▼──────────┐ -/// │ │ │ ArrowSource │ +/// │ │ │ ArrowSource │ /// │ FileSource(trait) ◄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄│ ... │ /// │ │ │ ParquetSource │ /// └─────────────────────┘ └─────────────────────┘ @@ -104,7 +104,7 @@ use datafusion_physical_plan::filter_pushdown::{ /// │ /// │ /// ┌──────────▼──────────┐ -/// │ ArrowSource │ +/// │ ArrowSource │ /// │ ... │ /// │ ParquetSource │ /// └─────────────────────┘ From 6b5ba4bf028fc8a4c684fcc2a6781c4b87091d5c Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sat, 8 Nov 2025 23:01:37 -0600 Subject: [PATCH 19/26] Remove note from upgrade guide --- docs/source/library-user-guide/upgrading.md | 6 ------ 1 file changed, 6 deletions(-) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index ab6b81d831bc..d6ef5c3d32df 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -125,12 +125,6 @@ Users may need to update their paths to account for these changes. See [issue #17713] for more details. -### Added Arrow IPC stream format support - -Support has been added for querying files in the [Arrow IPC stream format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format). - -See [issue #16688] for details. 
- [issue #16688]: https://github.com/apache/datafusion/issues/16688 ### `FileScanConfig::projection` renamed to `FileScanConfig::projection_exprs` From 2e857c7028b04ae2e83bbfe407212c6efc1b96d5 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sat, 8 Nov 2025 23:19:44 -0600 Subject: [PATCH 20/26] Improve the docs a little --- datafusion/datasource-arrow/src/file_format.rs | 18 +++--------------- datafusion/datasource-arrow/src/mod.rs | 2 ++ datafusion/datasource-arrow/src/source.rs | 15 ++++++++++----- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index 8bc52d3914d1..451ba6441fea 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -73,8 +73,8 @@ const INITIAL_BUFFER_BYTES: usize = 1048576; /// If the buffered Arrow data exceeds this size, it is flushed to object store const BUFFER_FLUSH_BYTES: usize = 1024000; +/// Factory struct used to create [`ArrowFormat`] #[derive(Default, Debug)] -/// Factory struct used to create [ArrowFormat] pub struct ArrowFormatFactory; impl ArrowFormatFactory { @@ -109,7 +109,7 @@ impl GetExt for ArrowFormatFactory { } } -/// Arrow `FileFormat` implementation. +/// Arrow [`FileFormat`] implementation. #[derive(Default, Debug)] pub struct ArrowFormat; @@ -235,7 +235,7 @@ impl FileFormat for ArrowFormat { } } -/// Implements [`FileSink`] for writing to arrow_ipc files +/// Implements [`FileSink`] for Arrow IPC files struct ArrowFileSink { config: FileSinkConfig, } @@ -381,18 +381,6 @@ const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; async fn infer_stream_schema( mut stream: BoxStream<'static, object_store::Result>, ) -> Result { - // Expected IPC format is either: - // - // stream: - // - 4 bytes (added in v0.15.0+) - // - 4 bytes - // - // - // - // file: - // - 6 bytes - // - 2 bytes - // // 16 bytes covers the preamble and metadata length no matter // which version or format is used diff --git a/datafusion/datasource-arrow/src/mod.rs b/datafusion/datasource-arrow/src/mod.rs index 18bb8792c3ff..0f38579d6f0c 100644 --- a/datafusion/datasource-arrow/src/mod.rs +++ b/datafusion/datasource-arrow/src/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! [`ArrowFormat`]: Apache Arrow file format abstractions + // Make sure fast / cheap clones on Arc are explicit: // https://github.com/apache/datafusion/issues/11143 #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index bb91ba5c5fa9..f2f2b77cfe89 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! Execution plan for reading Arrow IPC files + use std::sync::Arc; use std::{any::Any, io::Cursor}; @@ -37,8 +39,7 @@ use futures::StreamExt; use itertools::Itertools; use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; -/// Arrow IPC File format source - supports range-based parallel reading -/// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow +/// `FileSource` for Arrow IPC file format. Supports range-based parallel reading. 
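+///
+/// Range-based reads are possible because the file format ends with a
+/// footer indexing every record batch. Rough on-disk layout (a sketch of
+/// the IPC spec, not byte-accurate):
+///
+/// ```text
+/// ARROW1 <padding> <schema + record batches> <footer> <footer len> ARROW1
+/// ```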
#[derive(Clone, Default)] pub(crate) struct ArrowFileSource { metrics: ExecutionPlanMetricsSet, @@ -117,7 +118,7 @@ impl FileSource for ArrowFileSource { } } -/// Arrow IPC Stream format source - supports only sequential reading +/// `FileSource` for Arrow IPC stream format. Supports only sequential reading. #[derive(Clone, Default)] pub(crate) struct ArrowStreamFileSource { metrics: ExecutionPlanMetricsSet, @@ -213,7 +214,6 @@ impl FileSource for ArrowStreamFileSource { } } -/// FileOpener for Arrow IPC Stream format - always reads sequentially pub(crate) struct ArrowStreamFileOpener { object_store: Arc, projection: Option>, @@ -254,7 +254,6 @@ impl FileOpener for ArrowStreamFileOpener { } } -/// The struct arrow that implements `[FileOpener]` trait for Arrow IPC File format pub(crate) struct ArrowFileOpener { object_store: Arc, projection: Option>, @@ -395,6 +394,7 @@ impl FileOpener for ArrowFileOpener { } } +/// `FileSource` wrapper for both Arrow IPC file and stream formats #[derive(Clone)] pub struct ArrowSource { pub inner: Arc, @@ -407,16 +407,19 @@ impl Default for ArrowSource { } impl ArrowSource { + /// Creates a new [`ArrowSource`] pub fn new(inner: Arc) -> Self { Self { inner } } + /// Creates an [`ArrowSource`] for file format pub fn default_file_source() -> Self { Self { inner: Arc::new(ArrowFileSource::default()), } } + /// Creates an [`ArrowSource`] for stream format pub fn default_stream_file_source() -> Self { Self { inner: Arc::new(ArrowStreamFileSource::default()), @@ -476,6 +479,7 @@ impl FileSource for ArrowSource { } } +/// `FileOpener` wrapper for both Arrow IPC file and stream formats pub struct ArrowOpener { pub inner: Arc, } @@ -487,6 +491,7 @@ impl FileOpener for ArrowOpener { } impl ArrowOpener { + /// Creates a new [`ArrowOpener`] pub fn new(inner: Arc) -> Self { Self { inner } } From c67c881e063a19b2dd042841a695b0f5265ab296 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sat, 8 Nov 2025 23:23:31 -0600 Subject: [PATCH 21/26] Add a note about the confusing naming --- datafusion/datasource-arrow/src/source.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index f2f2b77cfe89..e9aa7b84e40d 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -16,6 +16,18 @@ // under the License. //! Execution plan for reading Arrow IPC files +//! +//! # Naming Note +//! +//! The naming in this module can be confusing: +//! - `ArrowFileSource` / `ArrowFileOpener` handle the Arrow IPC **file format** +//! (with footer, supports parallel reading) +//! - `ArrowStreamFileSource` / `ArrowStreamFileOpener` handle the Arrow IPC **stream format** +//! (without footer, sequential only) +//! +//! Despite the name "ArrowStreamFileSource", it still reads from files - the "Stream" +//! refers to the Arrow IPC stream format, not streaming I/O. Both formats can be stored +//! in files on disk or object storage. 
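+//!
+//! The split mirrors the two reader types in `arrow-ipc` (orientation only,
+//! not a statement about this module's internals):
+//!
+//! ```text
+//! ArrowFileSource        -> arrow::ipc::reader::FileReader   (seekable)
+//! ArrowStreamFileSource  -> arrow::ipc::reader::StreamReader (sequential)
+//! ```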
use std::sync::Arc; use std::{any::Any, io::Cursor}; From 882e9faf40035829218d378d946632d2601dc7f5 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sat, 8 Nov 2025 23:25:44 -0600 Subject: [PATCH 22/26] Stray line in the upgrade doc --- docs/source/library-user-guide/upgrading.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index d6ef5c3d32df..0b227000f73d 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -125,8 +125,6 @@ Users may need to update their paths to account for these changes. See [issue #17713] for more details. -[issue #16688]: https://github.com/apache/datafusion/issues/16688 - ### `FileScanConfig::projection` renamed to `FileScanConfig::projection_exprs` The `projection` field in `FileScanConfig` has been renamed to `projection_exprs` and its type has changed from `Option>` to `Option`. This change enables more powerful projection pushdown capabilities by supporting arbitrary physical expressions rather than just column indices. From 9144d9ef4efe691dcb07c8059040208229364ae7 Mon Sep 17 00:00:00 2001 From: Cora Sutton Date: Sat, 8 Nov 2025 23:35:42 -0600 Subject: [PATCH 23/26] Add initializers and fill out shared interface --- datafusion/datasource-arrow/src/source.rs | 57 ++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index e9aa7b84e40d..4313a2cdfb56 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -43,6 +43,7 @@ use datafusion_common::{exec_datafusion_err, Statistics}; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::PartitionedFile; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_datasource::file_stream::FileOpenFuture; @@ -183,7 +184,7 @@ impl FileSource for ArrowStreamFileSource { &self, _target_partitions: usize, _repartition_file_min_size: usize, - _output_ordering: Option, + _output_ordering: Option, _config: &FileScanConfig, ) -> Result> { // Stream format doesn't support range-based parallel reading @@ -489,6 +490,36 @@ impl FileSource for ArrowSource { fn file_type(&self) -> &str { self.inner.file_type() } + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc, + ) -> Result> { + Ok(Arc::new(Self { + inner: self + .inner + .with_schema_adapter_factory(schema_adapter_factory)?, + })) + } + + fn schema_adapter_factory(&self) -> Option> { + self.inner.schema_adapter_factory() + } + + fn repartitioned( + &self, + target_partitions: usize, + repartition_file_min_size: usize, + output_ordering: Option, + config: &FileScanConfig, + ) -> Result> { + self.inner.repartitioned( + target_partitions, + repartition_file_min_size, + output_ordering, + config, + ) + } } /// `FileOpener` wrapper for both Arrow IPC file and stream formats @@ -507,6 +538,30 @@ impl ArrowOpener { pub fn new(inner: Arc) -> Self { Self { inner } } + + pub fn new_file_opener( + object_store: Arc, + projection: Option>, + ) -> Self { + Self { + inner: Arc::new(ArrowFileOpener { + object_store, + projection, + }), + } + } + + pub fn new_stream_file_opener( + object_store: Arc, + projection: Option>, + ) -> Self { + Self { + inner: Arc::new(ArrowStreamFileOpener { + object_store, + 
projection: Option<Vec<usize>>,
+    ) -> Self {
+        Self {
+            inner: Arc::new(ArrowStreamFileOpener {
+                object_store,
+                projection,
+            }),
+        }
+    }
+}

From d5122f0d79d0638a83f9cbb07db38a1e01402dbb Mon Sep 17 00:00:00 2001
From: Cora Sutton
Date: Sun, 9 Nov 2025 01:44:56 -0600
Subject: [PATCH 25/26] Type error and linting fix

---
 .../tests/schema_adapter/schema_adapter_integration_tests.rs | 2 +-
 datafusion/datasource-arrow/src/source.rs                    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
index 5131fa650b6f..191529816481 100644
--- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
+++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
@@ -289,7 +289,7 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
         let schema =
             Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
         let table_schema = TableSchema::new(schema, vec![]);
-        let source = ArrowSource::new(table_schema);
+        let source = ArrowSource::new_file_source(table_schema);
         let source_with_adapter = source
             .clone()
             .with_schema_adapter_factory(factory.clone())
diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs
index edd637bbabe2..9be58a99418f 100644
--- a/datafusion/datasource-arrow/src/source.rs
+++ b/datafusion/datasource-arrow/src/source.rs
@@ -535,7 +535,7 @@ impl FileSource for ArrowSource {
     }
 
     fn table_schema(&self) -> &TableSchema {
-        &self.inner.table_schema()
+        self.inner.table_schema()
     }
 }

From 20c8377e31d176393b8644fe8f163ac1c1f8f0f6 Mon Sep 17 00:00:00 2001
From: Cora Sutton
Date: Sun, 9 Nov 2025 01:47:25 -0600
Subject: [PATCH 26/26] Docs

---
 datafusion/datasource-arrow/src/source.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs
index 9be58a99418f..b1c7eb81d1a3 100644
--- a/datafusion/datasource-arrow/src/source.rs
+++ b/datafusion/datasource-arrow/src/source.rs
@@ -252,6 +252,7 @@ impl FileSource for ArrowStreamFileSource {
     }
 }
 
+/// `FileOpener` for Arrow IPC stream format. Supports only sequential reading.
 pub(crate) struct ArrowStreamFileOpener {
     object_store: Arc<dyn ObjectStore>,
     projection: Option<Vec<usize>>,
@@ -292,6 +293,7 @@ impl FileOpener for ArrowFileOpener {
+/// `FileOpener` for Arrow IPC file format. Supports range-based parallel reading.
 pub(crate) struct ArrowFileOpener {
     object_store: Arc<dyn ObjectStore>,
     projection: Option<Vec<usize>>,