diff --git a/datafusion/core/tests/data/partitioned_table_arrow_stream/part=123/data.arrow b/datafusion/core/tests/data/partitioned_table_arrow_stream/part=123/data.arrow new file mode 100644 index 000000000000..bad9e3de4a57 Binary files /dev/null and b/datafusion/core/tests/data/partitioned_table_arrow_stream/part=123/data.arrow differ diff --git a/datafusion/core/tests/data/partitioned_table_arrow_stream/part=456/data.arrow b/datafusion/core/tests/data/partitioned_table_arrow_stream/part=456/data.arrow new file mode 100644 index 000000000000..4a07fbfa47f3 Binary files /dev/null and b/datafusion/core/tests/data/partitioned_table_arrow_stream/part=456/data.arrow differ diff --git a/datafusion/core/tests/execution/mod.rs b/datafusion/core/tests/execution/mod.rs index 8770b2a20105..f33ef87aa302 100644 --- a/datafusion/core/tests/execution/mod.rs +++ b/datafusion/core/tests/execution/mod.rs @@ -18,3 +18,4 @@ mod coop; mod datasource_split; mod logical_plan; +mod register_arrow; diff --git a/datafusion/core/tests/execution/register_arrow.rs b/datafusion/core/tests/execution/register_arrow.rs new file mode 100644 index 000000000000..4ce16dc0906c --- /dev/null +++ b/datafusion/core/tests/execution/register_arrow.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Integration tests for register_arrow API + +use datafusion::{execution::options::ArrowReadOptions, prelude::*}; +use datafusion_common::Result; + +#[tokio::test] +async fn test_register_arrow_auto_detects_format() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.register_arrow( + "file_format", + "../../datafusion/datasource-arrow/tests/data/example.arrow", + ArrowReadOptions::default(), + ) + .await?; + + ctx.register_arrow( + "stream_format", + "../../datafusion/datasource-arrow/tests/data/example_stream.arrow", + ArrowReadOptions::default(), + ) + .await?; + + let file_result = ctx.sql("SELECT * FROM file_format ORDER BY f0").await?; + let stream_result = ctx.sql("SELECT * FROM stream_format ORDER BY f0").await?; + + let file_batches = file_result.collect().await?; + let stream_batches = stream_result.collect().await?; + + assert_eq!(file_batches.len(), stream_batches.len()); + assert_eq!(file_batches[0].schema(), stream_batches[0].schema()); + + let file_rows: usize = file_batches.iter().map(|b| b.num_rows()).sum(); + let stream_rows: usize = stream_batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(file_rows, stream_rows); + + Ok(()) +} + +#[tokio::test] +async fn test_register_arrow_join_file_and_stream() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.register_arrow( + "file_table", + "../../datafusion/datasource-arrow/tests/data/example.arrow", + ArrowReadOptions::default(), + ) + .await?; + + ctx.register_arrow( + "stream_table", + "../../datafusion/datasource-arrow/tests/data/example_stream.arrow", + ArrowReadOptions::default(), + ) + .await?; + + let result = ctx + .sql( + "SELECT a.f0, a.f1, b.f0, b.f1 + FROM file_table a + JOIN stream_table b ON a.f0 = b.f0 + WHERE a.f0 <= 2 + ORDER BY a.f0", + ) + .await?; + let batches = result.collect().await?; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2); + + Ok(()) +} diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs index 0b093485c1ce..191529816481 100644 --- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs +++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs @@ -284,12 +284,12 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> { // Create a test factory let factory = Arc::new(UppercaseAdapterFactory {}); - // Test ArrowSource + // Test ArrowFileSource { let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); let table_schema = TableSchema::new(schema, vec![]); - let source = ArrowSource::new(table_schema); + let source = ArrowSource::new_file_source(table_schema); let source_with_adapter = source .clone() .with_schema_adapter_factory(factory.clone()) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index dc1f5cf72da7..594517177164 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -20,15 +20,15 @@ //! 
Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format) use std::any::Any; -use std::borrow::Cow; use std::collections::HashMap; use std::fmt::{self, Debug}; +use std::io::{Seek, SeekFrom}; use std::sync::Arc; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; use arrow::ipc::convert::fb_to_schema; -use arrow::ipc::reader::FileReader; +use arrow::ipc::reader::{FileReader, StreamReader}; use arrow::ipc::writer::IpcWriteOptions; use arrow::ipc::{root_as_message, CompressionType}; use datafusion_common::error::Result; @@ -62,7 +62,9 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; use futures::stream::BoxStream; use futures::StreamExt; -use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; +use object_store::{ + path::Path, GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore, +}; use tokio::io::AsyncWriteExt; /// Initial writing buffer size. Note this is just a size hint for efficiency. It @@ -72,8 +74,8 @@ const INITIAL_BUFFER_BYTES: usize = 1048576; /// If the buffered Arrow data exceeds this size, it is flushed to object store const BUFFER_FLUSH_BYTES: usize = 1024000; +/// Factory struct used to create [`ArrowFormat`] #[derive(Default, Debug)] -/// Factory struct used to create [ArrowFormat] pub struct ArrowFormatFactory; impl ArrowFormatFactory { @@ -108,7 +110,7 @@ impl GetExt for ArrowFormatFactory { } } -/// Arrow `FileFormat` implementation. +/// Arrow [`FileFormat`] implementation. #[derive(Default, Debug)] pub struct ArrowFormat; @@ -151,12 +153,18 @@ impl FileFormat for ArrowFormat { let schema = match r.payload { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(mut file, _) => { - let reader = FileReader::try_new(&mut file, None)?; - reader.schema() - } - GetResultPayload::Stream(stream) => { - infer_schema_from_file_stream(stream).await? + match FileReader::try_new(&mut file, None) { + Ok(reader) => reader.schema(), + Err(_) => { + // Not the Arrow IPC file format; FileReader consumed some bytes + // while probing, so rewind to the beginning of the file and + // try the stream format reader instead + file.seek(SeekFrom::Start(0))?; + StreamReader::try_new(&mut file, None)?.schema() + } + } } + GetResultPayload::Stream(stream) => infer_stream_schema(stream).await?, }; schemas.push(schema.as_ref().clone()); } @@ -176,14 +184,33 @@ impl FileFormat for ArrowFormat { async fn create_physical_plan( &self, - _state: &dyn Session, + state: &dyn Session, conf: FileScanConfig, ) -> Result<Arc<dyn ExecutionPlan>> { + let object_store = state.runtime_env().object_store(&conf.object_store_url)?; + let object_location = &conf + .file_groups + .first() + .ok_or_else(|| internal_datafusion_err!("No file groups found"))? + .files() + .first() + .ok_or_else(|| internal_datafusion_err!("No files found in file group"))?
+ .object_meta + .location; + let table_schema = TableSchema::new( Arc::clone(conf.file_schema()), conf.table_partition_cols().clone(), ); - let source = Arc::new(ArrowSource::new(table_schema)); + + let source: Arc<dyn FileSource> = + match is_object_in_arrow_ipc_file_format(object_store, object_location).await + { + Ok(true) => Arc::new(ArrowSource::new_file_source(table_schema)), + Ok(false) => Arc::new(ArrowSource::new_stream_file_source(table_schema)), + Err(e) => Err(e)?, + }; + let config = FileScanConfigBuilder::from(conf) .with_source(source) .build(); @@ -208,11 +235,11 @@ impl FileFormat for ArrowFormat { } fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> { - Arc::new(ArrowSource::new(table_schema)) + Arc::new(ArrowSource::new_file_source(table_schema)) } } -/// Implements [`FileSink`] for writing to arrow_ipc files +/// Implements [`FileSink`] for Arrow IPC files struct ArrowFileSink { config: FileSinkConfig, } @@ -349,94 +376,122 @@ impl DataSink for ArrowFileSink { } } +// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs. +// See + const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; -/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs. -/// See -async fn infer_schema_from_file_stream( +async fn infer_stream_schema( mut stream: BoxStream<'static, object_store::Result<Bytes>>, ) -> Result<SchemaRef> { - // Expected format: - // <magic number "ARROW1"> - 6 bytes - // <empty padding bytes [to 8 byte boundary]> - 2 bytes - // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0 - // <metadata_size: int32> - 4 bytes - // <metadata_flatbuffer: bytes> - // <rest of file bytes> - - // So in first read we need at least all known sized sections, - // which is 6 + 2 + 4 + 4 = 16 bytes. - let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?; - - // Files should start with these magic bytes - if bytes[0..6] != ARROW_MAGIC { - return Err(ArrowError::ParseError( - "Arrow file does not contain correct header".to_string(), - ))?; - } - - // Since continuation marker bytes added in later versions - let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER { - (&bytes[12..16], 16) + // 16 bytes covers the preamble and metadata length no matter + // which version or format is used + let bytes = extend_bytes_to_n_length_from_stream(vec![], 16, &mut stream).await?; + + // The preamble length is everything before the metadata length + let preamble_len = if bytes[0..6] == ARROW_MAGIC { + // File format starts with magic number "ARROW1" + if bytes[8..12] == CONTINUATION_MARKER { + // Continuation marker was added in v0.15.0 + 12 + } else { + // File format before v0.15.0 + 8 + } + } else if bytes[0..4] == CONTINUATION_MARKER { + // Stream format after v0.15.0 starts with continuation marker + 4 } else { - (&bytes[8..12], 12) + // Stream format before v0.15.0 does not have a preamble + 0 }; - let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]]; - let meta_len = i32::from_le_bytes(meta_len); - - // Read bytes for Schema message - let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize { - // Need to read more bytes to decode Message - let mut block_data = Vec::with_capacity(meta_len as usize); - // In case we had some spare bytes in our initial read chunk - block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]); - let size_to_read = meta_len as usize - block_data.len(); - let block_data = - collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?; - Cow::Owned(block_data) - } else { - // Already have the bytes we need - let
end_index = meta_len as usize + rest_of_bytes_start_index; - let block_data = &bytes[rest_of_bytes_start_index..end_index]; - Cow::Borrowed(block_data) - }; + let meta_len_bytes: [u8; 4] = bytes[preamble_len..preamble_len + 4] + .try_into() + .map_err(|err| { + ArrowError::ParseError(format!( + "Unable to read IPC message metadata length: {err:?}" + )) + })?; + + let meta_len = i32::from_le_bytes([ + meta_len_bytes[0], + meta_len_bytes[1], + meta_len_bytes[2], + meta_len_bytes[3], + ]); + + if meta_len < 0 { + return Err(ArrowError::ParseError( + "IPC message metadata length is negative".to_string(), + ) + .into()); + } + + let bytes = extend_bytes_to_n_length_from_stream( + bytes, + preamble_len + 4 + (meta_len as usize), + &mut stream, + ) + .await?; - // Decode Schema message - let message = root_as_message(&block_data).map_err(|err| { - ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}")) + let message = root_as_message(&bytes[preamble_len + 4..]).map_err(|err| { + ArrowError::ParseError(format!("Unable to read IPC message metadata: {err:?}")) })?; - let ipc_schema = message.header_as_schema().ok_or_else(|| { - ArrowError::IpcError("Unable to read IPC message as schema".to_string()) + let fb_schema = message.header_as_schema().ok_or_else(|| { + ArrowError::IpcError("Unable to read IPC message schema".to_string()) })?; - let schema = fb_to_schema(ipc_schema); + let schema = fb_to_schema(fb_schema); Ok(Arc::new(schema)) } -async fn collect_at_least_n_bytes( - stream: &mut BoxStream<'static, object_store::Result<Bytes>>, +async fn extend_bytes_to_n_length_from_stream( + bytes: Vec<u8>, n: usize, - extend_from: Option<Vec<u8>>, + stream: &mut BoxStream<'static, object_store::Result<Bytes>>, ) -> Result<Vec<u8>> { - let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n)); - // If extending existing buffer then ensure we read n additional bytes - let n = n + buf.len(); - while let Some(bytes) = stream.next().await.transpose()? { - buf.extend_from_slice(&bytes); + if bytes.len() >= n { + return Ok(bytes); + } + + let mut buf = bytes; + + while let Some(b) = stream.next().await.transpose()? { + buf.extend_from_slice(&b); + if buf.len() >= n { break; } } + if buf.len() < n { return Err(ArrowError::ParseError( "Unexpected end of byte stream for Arrow IPC file".to_string(), - ))?; + ) + .into()); } + Ok(buf) } +async fn is_object_in_arrow_ipc_file_format( + store: Arc<dyn ObjectStore>, + object_location: &Path, +) -> Result<bool> { + let get_opts = GetOptions { + range: Some(GetRange::Bounded(0..6)), + ..Default::default() + }; + let bytes = store + .get_opts(object_location, get_opts) + .await?
+ .bytes() + .await?; + Ok(bytes.len() >= 6 && bytes[0..6] == ARROW_MAGIC) +} + #[cfg(test)] mod tests { use super::*; @@ -529,80 +584,143 @@ mod tests { #[tokio::test] async fn test_infer_schema_stream() -> Result<()> { - let mut bytes = std::fs::read("tests/data/example.arrow")?; - bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file - let location = Path::parse("example.arrow")?; - let in_memory_store: Arc = Arc::new(InMemory::new()); - in_memory_store.put(&location, bytes.into()).await?; - - let state = MockSession::new(); - let object_meta = ObjectMeta { - location, - last_modified: DateTime::default(), - size: u64::MAX, - e_tag: None, - version: None, - }; - - let arrow_format = ArrowFormat {}; - let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"]; - - // Test chunk sizes where too small so we keep having to read more bytes - // And when large enough that first read contains all we need - for chunk_size in [7, 3000] { - let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size)); - let inferred_schema = arrow_format + for file in ["example.arrow", "example_stream.arrow"] { + let mut bytes = std::fs::read(format!("tests/data/{file}"))?; + bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file + let location = Path::parse(file)?; + let in_memory_store: Arc = Arc::new(InMemory::new()); + in_memory_store.put(&location, bytes.into()).await?; + + let state = MockSession::new(); + let object_meta = ObjectMeta { + location, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + let arrow_format = ArrowFormat {}; + let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"]; + + // Test chunk sizes where too small so we keep having to read more bytes + // And when large enough that first read contains all we need + for chunk_size in [7, 3000] { + let store = + Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size)); + let inferred_schema = arrow_format + .infer_schema( + &state, + &(store.clone() as Arc), + std::slice::from_ref(&object_meta), + ) + .await?; + let actual_fields = inferred_schema + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect::>(); + assert_eq!(expected, actual_fields); + } + } + Ok(()) + } + + #[tokio::test] + async fn test_infer_schema_short_stream() -> Result<()> { + for file in ["example.arrow", "example_stream.arrow"] { + let mut bytes = std::fs::read(format!("tests/data/{file}"))?; + bytes.truncate(20); // should cause error that file shorter than expected + let location = Path::parse(file)?; + let in_memory_store: Arc = Arc::new(InMemory::new()); + in_memory_store.put(&location, bytes.into()).await?; + + let state = MockSession::new(); + let object_meta = ObjectMeta { + location, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + let arrow_format = ArrowFormat {}; + + let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7)); + let err = arrow_format .infer_schema( &state, &(store.clone() as Arc), std::slice::from_ref(&object_meta), ) - .await?; - let actual_fields = inferred_schema - .fields() - .iter() - .map(|f| format!("{}: {:?}", f.name(), f.data_type())) - .collect::>(); - assert_eq!(expected, actual_fields); + .await; + + assert!(err.is_err()); + assert_eq!( "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file", err.unwrap_err().to_string().lines().next().unwrap()); } Ok(()) } #[tokio::test] - async 
fn test_infer_schema_short_stream() -> Result<()> { - let mut bytes = std::fs::read("tests/data/example.arrow")?; - bytes.truncate(20); // should cause error that file shorter than expected - let location = Path::parse("example.arrow")?; - let in_memory_store: Arc = Arc::new(InMemory::new()); - in_memory_store.put(&location, bytes.into()).await?; - - let state = MockSession::new(); - let object_meta = ObjectMeta { - location, - last_modified: DateTime::default(), - size: u64::MAX, - e_tag: None, - version: None, - }; - - let arrow_format = ArrowFormat {}; - - let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7)); - let err = arrow_format - .infer_schema( - &state, - &(store.clone() as Arc), - std::slice::from_ref(&object_meta), - ) - .await; + async fn test_format_detection_file_format() -> Result<()> { + let store = Arc::new(InMemory::new()); + let path = Path::from("test.arrow"); + + let file_bytes = std::fs::read("tests/data/example.arrow")?; + store.put(&path, file_bytes.into()).await?; + + let is_file = is_object_in_arrow_ipc_file_format(store.clone(), &path).await?; + assert!(is_file, "Should detect file format"); + Ok(()) + } + + #[tokio::test] + async fn test_format_detection_stream_format() -> Result<()> { + let store = Arc::new(InMemory::new()); + let path = Path::from("test_stream.arrow"); + + let stream_bytes = std::fs::read("tests/data/example_stream.arrow")?; + store.put(&path, stream_bytes.into()).await?; - assert!(err.is_err()); - assert_eq!( - "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file", - err.unwrap_err().to_string().lines().next().unwrap() + let is_file = is_object_in_arrow_ipc_file_format(store.clone(), &path).await?; + + assert!(!is_file, "Should detect stream format (not file)"); + + Ok(()) + } + + #[tokio::test] + async fn test_format_detection_corrupted_file() -> Result<()> { + let store = Arc::new(InMemory::new()); + let path = Path::from("corrupted.arrow"); + + store + .put(&path, Bytes::from(vec![0x43, 0x4f, 0x52, 0x41]).into()) + .await?; + + let is_file = is_object_in_arrow_ipc_file_format(store.clone(), &path).await?; + + assert!( + !is_file, + "Corrupted file should not be detected as file format" ); Ok(()) } + + #[tokio::test] + async fn test_format_detection_empty_file() -> Result<()> { + let store = Arc::new(InMemory::new()); + let path = Path::from("empty.arrow"); + + store.put(&path, Bytes::new().into()).await?; + + let result = is_object_in_arrow_ipc_file_format(store.clone(), &path).await; + + // currently errors because it tries to read 0..6 from an empty file + assert!(result.is_err(), "Empty file should error"); + + Ok(()) + } } diff --git a/datafusion/datasource-arrow/src/mod.rs b/datafusion/datasource-arrow/src/mod.rs index 18bb8792c3ff..0f38579d6f0c 100644 --- a/datafusion/datasource-arrow/src/mod.rs +++ b/datafusion/datasource-arrow/src/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! [`ArrowFormat`]: Apache Arrow file format abstractions + // Make sure fast / cheap clones on Arc are explicit: // https://github.com/apache/datafusion/issues/11143 #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs index b3253d43f49a..b1c7eb81d1a3 100644 --- a/datafusion/datasource-arrow/src/source.rs +++ b/datafusion/datasource-arrow/src/source.rs @@ -15,19 +15,34 @@ // specific language governing permissions and limitations // under the License. 
-use std::any::Any; +//! Execution plan for reading Arrow IPC files +//! +//! # Naming Note +//! +//! The naming in this module can be confusing: +//! - `ArrowFileSource` / `ArrowFileOpener` handle the Arrow IPC **file format** +//! (with footer, supports parallel reading) +//! - `ArrowStreamFileSource` / `ArrowStreamFileOpener` handle the Arrow IPC **stream format** +//! (without footer, sequential only) +//! +//! Despite the name "ArrowStreamFileSource", it still reads from files; the "Stream" +//! refers to the Arrow IPC stream format, not streaming I/O. Both formats can be stored +//! in files on disk or object storage. + use std::sync::Arc; +use std::{any::Any, io::Cursor}; -use datafusion_datasource::as_file_source; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +use datafusion_datasource::{as_file_source, TableSchema}; use arrow::buffer::Buffer; -use arrow_ipc::reader::FileDecoder; +use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader}; use datafusion_common::error::Result; use datafusion_common::{exec_datafusion_err, Statistics}; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::PartitionedFile; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_datasource::file_stream::FileOpenFuture; @@ -36,19 +51,18 @@ use futures::StreamExt; use itertools::Itertools; use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; -/// Arrow configuration struct that is given to DataSourceExec -/// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow +/// `FileSource` for Arrow IPC file format. Supports range-based parallel reading. #[derive(Clone)] -pub struct ArrowSource { - table_schema: datafusion_datasource::TableSchema, +pub(crate) struct ArrowFileSource { + table_schema: TableSchema, metrics: ExecutionPlanMetricsSet, projected_statistics: Option<Statistics>, schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, } -impl ArrowSource { - /// Initialize an ArrowSource with the provided schema - pub fn new(table_schema: impl Into<datafusion_datasource::TableSchema>) -> Self { +impl ArrowFileSource { + /// Initialize an ArrowFileSource with the provided schema + pub fn new(table_schema: impl Into<TableSchema>) -> Self { Self { table_schema: table_schema.into(), metrics: ExecutionPlanMetricsSet::new(), @@ -58,20 +72,20 @@ impl ArrowSource { } } -impl From<ArrowSource> for Arc<dyn FileSource> { - fn from(source: ArrowSource) -> Self { +impl From<ArrowFileSource> for Arc<dyn FileSource> { + fn from(source: ArrowFileSource) -> Self { as_file_source(source) } } -impl FileSource for ArrowSource { +impl FileSource for ArrowFileSource { fn create_file_opener( &self, object_store: Arc<dyn ObjectStore>, base_config: &FileScanConfig, _partition: usize, ) -> Arc<dyn FileOpener> { - Arc::new(ArrowOpener { + Arc::new(ArrowFileOpener { object_store, projection: base_config.file_column_projection_indices(), }) @@ -81,7 +95,7 @@ impl FileSource for ArrowSource { self } - fn table_schema(&self) -> &datafusion_datasource::TableSchema { + fn table_schema(&self) -> &TableSchema { &self.table_schema } @@ -129,13 +143,163 @@ impl FileSource for ArrowSource { } } -/// The struct arrow that implements `[FileOpener]` trait -pub struct ArrowOpener { - pub object_store: Arc<dyn ObjectStore>, - pub projection: Option<Vec<usize>>, +/// `FileSource` for Arrow IPC stream format. Supports only sequential reading.
+#[derive(Clone)] +pub(crate) struct ArrowStreamFileSource { + table_schema: TableSchema, + metrics: ExecutionPlanMetricsSet, + projected_statistics: Option<Statistics>, + schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, } -impl FileOpener for ArrowOpener { +impl ArrowStreamFileSource { + /// Initialize an ArrowStreamFileSource with the provided schema + pub fn new(table_schema: impl Into<TableSchema>) -> Self { + Self { + table_schema: table_schema.into(), + metrics: ExecutionPlanMetricsSet::new(), + projected_statistics: None, + schema_adapter_factory: None, + } + } +} + +impl From<ArrowStreamFileSource> for Arc<dyn FileSource> { + fn from(source: ArrowStreamFileSource) -> Self { + as_file_source(source) + } +} + +impl FileSource for ArrowStreamFileSource { + fn create_file_opener( + &self, + object_store: Arc<dyn ObjectStore>, + base_config: &FileScanConfig, + _partition: usize, + ) -> Arc<dyn FileOpener> { + Arc::new(ArrowStreamFileOpener { + object_store, + projection: base_config.file_column_projection_indices(), + }) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> { + Arc::new(Self { ..self.clone() }) + } + + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { + let mut conf = self.clone(); + conf.projected_statistics = Some(statistics); + Arc::new(conf) + } + + fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> { + Arc::new(Self { ..self.clone() }) + } + + fn repartitioned( + &self, + _target_partitions: usize, + _repartition_file_min_size: usize, + _output_ordering: Option<LexOrdering>, + _config: &FileScanConfig, + ) -> Result<Option<FileScanConfig>> { + // The stream format has no footer, so range-based parallel reading is + // not practical: each partition would either need the whole file indexed + // up front or would have to re-read everything up to its own offset, + // duplicating I/O. Instead we keep a single partition and read sequentially. + Ok(None) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn statistics(&self) -> Result<Statistics> { + let statistics = &self.projected_statistics; + Ok(statistics + .clone() + .expect("projected_statistics must be set")) + } + + fn file_type(&self) -> &str { + "arrow_stream" + } + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + schema_adapter_factory: Some(schema_adapter_factory), + ..self.clone() + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.clone() + } + + fn table_schema(&self) -> &TableSchema { + &self.table_schema + } +} + +/// `FileOpener` for Arrow IPC stream format. Supports only sequential reading.
+pub(crate) struct ArrowStreamFileOpener { + object_store: Arc<dyn ObjectStore>, + projection: Option<Vec<usize>>, +} + +impl FileOpener for ArrowStreamFileOpener { + fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> { + if partitioned_file.range.is_some() { + return Err(exec_datafusion_err!( + "ArrowStreamFileOpener does not support range-based reading" + )); + } + let object_store = Arc::clone(&self.object_store); + let projection = self.projection.clone(); + Ok(Box::pin(async move { + let r = object_store + .get(&partitioned_file.object_meta.location) + .await?; + match r.payload { + #[cfg(not(target_arch = "wasm32"))] + GetResultPayload::File(file, _) => Ok(futures::stream::iter( + StreamReader::try_new(file.try_clone()?, projection.clone())?, + ) + .map(|r| r.map_err(Into::into)) + .boxed()), + GetResultPayload::Stream(_) => { + let bytes = r.bytes().await?; + let cursor = Cursor::new(bytes); + Ok(futures::stream::iter(StreamReader::try_new( + cursor, + projection.clone(), + )?) + .map(|r| r.map_err(Into::into)) + .boxed()) + } + } + })) + } +} + +/// `FileOpener` for Arrow IPC file format. Supports range-based parallel reading. +pub(crate) struct ArrowFileOpener { + object_store: Arc<dyn ObjectStore>, + projection: Option<Vec<usize>>, +} + +impl FileOpener for ArrowFileOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> { let object_store = Arc::clone(&self.object_store); let projection = self.projection.clone(); @@ -148,23 +312,20 @@ impl FileOpener for ArrowOpener { .await?; match r.payload { #[cfg(not(target_arch = "wasm32"))] - GetResultPayload::File(file, _) => { - let arrow_reader = arrow::ipc::reader::FileReader::try_new( - file, projection, - )?; - Ok(futures::stream::iter(arrow_reader) - .map(|r| r.map_err(Into::into)) - .boxed()) - } + GetResultPayload::File(file, _) => Ok(futures::stream::iter( + FileReader::try_new(file.try_clone()?, projection.clone())?, + ) + .map(|r| r.map_err(Into::into)) + .boxed()), GetResultPayload::Stream(_) => { let bytes = r.bytes().await?; - let cursor = std::io::Cursor::new(bytes); - let arrow_reader = arrow::ipc::reader::FileReader::try_new( - cursor, projection, - )?; - Ok(futures::stream::iter(arrow_reader) - .map(|r| r.map_err(Into::into)) - .boxed()) + let cursor = Cursor::new(bytes); + Ok(futures::stream::iter(FileReader::try_new( + cursor, + projection.clone(), + )?)
+ .map(|r| r.map_err(Into::into)) + .boxed()) } } } @@ -272,3 +433,361 @@ impl FileOpener for ArrowOpener { })) } } + +/// `FileSource` wrapper for both Arrow IPC file and stream formats +#[derive(Clone)] +pub struct ArrowSource { + pub inner: Arc<dyn FileSource>, +} + +impl ArrowSource { + /// Creates a new [`ArrowSource`] + pub fn new(inner: Arc<dyn FileSource>) -> Self { + Self { inner } + } + + /// Creates an [`ArrowSource`] for file format + pub fn new_file_source(table_schema: impl Into<TableSchema>) -> Self { + Self { + inner: Arc::new(ArrowFileSource::new(table_schema)), + } + } + + /// Creates an [`ArrowSource`] for stream format + pub fn new_stream_file_source(table_schema: impl Into<TableSchema>) -> Self { + Self { + inner: Arc::new(ArrowStreamFileSource::new(table_schema)), + } + } +} + +impl FileSource for ArrowSource { + fn create_file_opener( + &self, + object_store: Arc<dyn ObjectStore>, + base_config: &FileScanConfig, + partition: usize, + ) -> Arc<dyn FileOpener> { + self.inner + .create_file_opener(object_store, base_config, partition) + } + + fn as_any(&self) -> &dyn Any { + self.inner.as_any() + } + + fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> { + Arc::new(Self { + inner: self.inner.with_batch_size(batch_size), + }) + } + + fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> { + Arc::new(Self { + inner: self.inner.with_projection(config), + }) + } + + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { + Arc::new(Self { + inner: self.inner.with_statistics(statistics), + }) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + self.inner.metrics() + } + + fn statistics(&self) -> Result<Statistics> { + self.inner.statistics() + } + + fn file_type(&self) -> &str { + self.inner.file_type() + } + + fn with_schema_adapter_factory( + &self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Result<Arc<dyn FileSource>> { + Ok(Arc::new(Self { + inner: self + .inner + .with_schema_adapter_factory(schema_adapter_factory)?, + })) + } + + fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { + self.inner.schema_adapter_factory() + } + + fn repartitioned( + &self, + target_partitions: usize, + repartition_file_min_size: usize, + output_ordering: Option<LexOrdering>, + config: &FileScanConfig, + ) -> Result<Option<FileScanConfig>> { + self.inner.repartitioned( + target_partitions, + repartition_file_min_size, + output_ordering, + config, + ) + } + + fn table_schema(&self) -> &TableSchema { + self.inner.table_schema() + } +} + +/// `FileOpener` wrapper for both Arrow IPC file and stream formats +pub struct ArrowOpener { + pub inner: Arc<dyn FileOpener>, +} + +impl FileOpener for ArrowOpener { + fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> { + self.inner.open(partitioned_file) + } +} + +impl ArrowOpener { + /// Creates a new [`ArrowOpener`] + pub fn new(inner: Arc<dyn FileOpener>) -> Self { + Self { inner } + } + + pub fn new_file_opener( + object_store: Arc<dyn ObjectStore>, + projection: Option<Vec<usize>>, + ) -> Self { + Self { + inner: Arc::new(ArrowFileOpener { + object_store, + projection, + }), + } + } + + pub fn new_stream_file_opener( + object_store: Arc<dyn ObjectStore>, + projection: Option<Vec<usize>>, + ) -> Self { + Self { + inner: Arc::new(ArrowStreamFileOpener { + object_store, + projection, + }), + } + } +} + +impl From<ArrowSource> for Arc<dyn FileSource> { + fn from(source: ArrowSource) -> Self { + as_file_source(source) + } +} + +#[cfg(test)] +mod tests { + use std::{fs::File, io::Read}; + + use arrow::datatypes::{DataType, Field, Schema}; + use arrow_ipc::reader::{FileReader, StreamReader}; + use bytes::Bytes; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; + use datafusion_execution::object_store::ObjectStoreUrl; + use object_store::memory::InMemory; + + use super::*; + + #[tokio::test] +
async fn test_file_opener_without_ranges() -> Result<()> { + for filename in ["example.arrow", "example_stream.arrow"] { + let path = format!("tests/data/{filename}"); + let path_str = path.as_str(); + let mut file = File::open(path_str)?; + let file_size = file.metadata()?.len(); + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + let bytes = Bytes::from(buffer); + + let object_store = Arc::new(InMemory::new()); + let partitioned_file = PartitionedFile::new(filename, file_size); + object_store + .put(&partitioned_file.object_meta.location, bytes.into()) + .await?; + + let schema = match FileReader::try_new(File::open(path_str)?, None) { + Ok(reader) => reader.schema(), + Err(_) => StreamReader::try_new(File::open(path_str)?, None)?.schema(), + }; + + let source: Arc = if filename.contains("stream") { + Arc::new(ArrowStreamFileSource::new(schema)) + } else { + Arc::new(ArrowFileSource::new(schema)) + }; + + let scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + source.clone(), + ) + .build(); + + let file_opener = source.create_file_opener(object_store, &scan_config, 0); + let mut stream = file_opener.open(partitioned_file)?.await?; + + assert!(stream.next().await.is_some()); + } + + Ok(()) + } + + #[tokio::test] + async fn test_file_opener_with_ranges() -> Result<()> { + let filename = "example.arrow"; + let path = format!("tests/data/{filename}"); + let path_str = path.as_str(); + let mut file = File::open(path_str)?; + let file_size = file.metadata()?.len(); + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + let bytes = Bytes::from(buffer); + + let object_store = Arc::new(InMemory::new()); + let partitioned_file = PartitionedFile::new_with_range( + filename.into(), + file_size, + 0, + (file_size - 1) as i64, + ); + object_store + .put(&partitioned_file.object_meta.location, bytes.into()) + .await?; + + let schema = FileReader::try_new(File::open(path_str)?, None)?.schema(); + + let source = Arc::new(ArrowFileSource::new(schema)); + + let scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + source.clone(), + ) + .build(); + + let file_opener = source.create_file_opener(object_store, &scan_config, 0); + let mut stream = file_opener.open(partitioned_file)?.await?; + + assert!(stream.next().await.is_some()); + + Ok(()) + } + + #[tokio::test] + async fn test_stream_opener_errors_with_ranges() -> Result<()> { + let filename = "example_stream.arrow"; + let path = format!("tests/data/{filename}"); + let path_str = path.as_str(); + let mut file = File::open(path_str)?; + let file_size = file.metadata()?.len(); + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + let bytes = Bytes::from(buffer); + + let object_store = Arc::new(InMemory::new()); + let partitioned_file = PartitionedFile::new_with_range( + filename.into(), + file_size, + 0, + (file_size - 1) as i64, + ); + object_store + .put(&partitioned_file.object_meta.location, bytes.into()) + .await?; + + let schema = StreamReader::try_new(File::open(path_str)?, None)?.schema(); + + let source = Arc::new(ArrowStreamFileSource::new(schema)); + + let scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + source.clone(), + ) + .build(); + + let file_opener = source.create_file_opener(object_store, &scan_config, 0); + let result = file_opener.open(partitioned_file); + assert!(result.is_err()); + + Ok(()) + } + + #[tokio::test] + async fn test_arrow_stream_repartitioning_not_supported() -> Result<()> { + let 
schema = + Arc::new(Schema::new(vec![Field::new("f0", DataType::Int64, false)])); + let source = ArrowStreamFileSource::new(schema); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(source.clone()) as Arc, + ) + .build(); + + for target_partitions in [2, 4, 8, 16] { + let result = + source.repartitioned(target_partitions, 1024 * 1024, None, &config)?; + + assert!( + result.is_none(), + "Stream format should not support repartitioning with {target_partitions} partitions", + ); + } + + Ok(()) + } + + #[tokio::test] + async fn test_stream_opener_with_projection() -> Result<()> { + let filename = "example_stream.arrow"; + let path = format!("tests/data/{filename}"); + let path_str = path.as_str(); + let mut file = File::open(path_str)?; + let file_size = file.metadata()?.len(); + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + let bytes = Bytes::from(buffer); + + let object_store = Arc::new(InMemory::new()); + let partitioned_file = PartitionedFile::new(filename, file_size); + object_store + .put(&partitioned_file.object_meta.location, bytes.into()) + .await?; + + let opener = ArrowStreamFileOpener { + object_store, + projection: Some(vec![0]), // just the first column + }; + + let mut stream = opener.open(partitioned_file)?.await?; + + if let Some(batch) = stream.next().await { + let batch = batch?; + assert_eq!( + batch.num_columns(), + 1, + "Projection should result in 1 column" + ); + } else { + panic!("Expected at least one batch"); + } + + Ok(()) + } +} diff --git a/datafusion/datasource-arrow/tests/data/example_stream.arrow b/datafusion/datasource-arrow/tests/data/example_stream.arrow new file mode 100644 index 000000000000..dbe10596f3a9 Binary files /dev/null and b/datafusion/datasource-arrow/tests/data/example_stream.arrow differ diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt index b3975e0c3f47..6392c61c4b8c 100644 --- a/datafusion/sqllogictest/test_files/arrow_files.slt +++ b/datafusion/sqllogictest/test_files/arrow_files.slt @@ -128,3 +128,228 @@ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/ # Errors in partition filters should be reported query error Divide by zero error SELECT f0 FROM arrow_partitioned WHERE CASE WHEN true THEN 1 / 0 ELSE part END = 1; + +############# +## Arrow IPC stream format support +############# + +# Test CREATE EXTERNAL TABLE with stream format +statement ok +CREATE EXTERNAL TABLE arrow_stream +STORED AS ARROW +LOCATION '../datasource-arrow/tests/data/example_stream.arrow'; + +# physical plan for stream format +query TT +EXPLAIN SELECT * FROM arrow_stream +---- +logical_plan TableScan: arrow_stream projection=[f0, f1, f2] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/datasource-arrow/tests/data/example_stream.arrow]]}, projection=[f0, f1, f2], file_type=arrow_stream + +# stream format should return same data as file format +query ITB +SELECT * FROM arrow_stream +---- +1 foo true +2 bar NULL +3 baz false +4 NULL true + +# Verify both file and stream formats return identical results +query ITB +SELECT * FROM arrow_simple ORDER BY f0 +---- +1 foo true +2 bar NULL +3 baz false +4 NULL true + +query ITB +SELECT * FROM arrow_stream ORDER BY f0 +---- +1 foo true +2 bar NULL +3 baz false +4 NULL true + +# Both formats should support projection pushdown +query IT +SELECT f0, f1 FROM arrow_simple ORDER BY f0 +---- +1 foo +2 bar +3 baz +4 NULL + +query IT +SELECT f0, 
f1 FROM arrow_stream ORDER BY f0 +---- +1 foo +2 bar +3 baz +4 NULL + +# Both formats should support filtering +query ITB +SELECT * FROM arrow_simple WHERE f0 > 2 ORDER BY f0 +---- +3 baz false +4 NULL true + +query ITB +SELECT * FROM arrow_stream WHERE f0 > 2 ORDER BY f0 +---- +3 baz false +4 NULL true + +# Test aggregations on stream format +query I +SELECT COUNT(*) FROM arrow_stream +---- +4 + +query I +SELECT SUM(f0) FROM arrow_stream +---- +10 + +query I +SELECT MAX(f0) FROM arrow_stream +---- +4 + +query I +SELECT MIN(f0) FROM arrow_stream WHERE f0 IS NOT NULL +---- +1 + +# Test aggregations on file format for comparison +query I +SELECT COUNT(*) FROM arrow_simple +---- +4 + +query I +SELECT SUM(f0) FROM arrow_simple +---- +10 + +# Test joins between file and stream formats +query ITBITB +SELECT a.f0, a.f1, a.f2, b.f0, b.f1, b.f2 +FROM arrow_simple a +JOIN arrow_stream b ON a.f0 = b.f0 +WHERE a.f0 <= 2 +ORDER BY a.f0 +---- +1 foo true 1 foo true +2 bar NULL 2 bar NULL + +# Test that both formats work in UNION +query ITB +SELECT * FROM arrow_simple WHERE f0 = 1 +UNION ALL +SELECT * FROM arrow_stream WHERE f0 = 2 +ORDER BY f0 +---- +1 foo true +2 bar NULL + +# Test GROUP BY on stream format +query BI +SELECT f2, COUNT(*) as cnt FROM arrow_stream GROUP BY f2 ORDER BY f2 +---- +false 1 +true 2 +NULL 1 + +# Test DISTINCT on stream format +query B +SELECT DISTINCT f2 FROM arrow_stream ORDER BY f2 +---- +false +true +NULL + +# Test subquery with stream format +query I +SELECT f0 FROM arrow_simple WHERE f0 IN (SELECT f0 FROM arrow_stream WHERE f0 < 3) ORDER BY f0 +---- +1 +2 + +# ARROW partitioned table (stream format) +statement ok +CREATE EXTERNAL TABLE arrow_partitioned_stream ( + part Int, + f0 Bigint, + f1 String, + f2 Boolean +) +STORED AS ARROW +LOCATION '../core/tests/data/partitioned_table_arrow_stream/' +PARTITIONED BY (part); + +# select wildcard +query ITBI +SELECT * FROM arrow_partitioned_stream ORDER BY f0; +---- +1 foo true 123 +2 bar false 123 +3 baz true 456 +4 NULL NULL 456 + +# select all fields +query IITB +SELECT part, f0, f1, f2 FROM arrow_partitioned_stream ORDER BY f0; +---- +123 1 foo true +123 2 bar false +456 3 baz true +456 4 NULL NULL + +# select without partition column +query IB +SELECT f0, f2 FROM arrow_partitioned_stream ORDER BY f0 +---- +1 true +2 false +3 true +4 NULL + +# select only partition column +query I +SELECT part FROM arrow_partitioned_stream ORDER BY part +---- +123 +123 +456 +456 + +# select without any table-related columns in projection +query I +SELECT 1 FROM arrow_partitioned_stream +---- +1 +1 +1 +1 + +# select with partition filter +query I +SELECT f0 FROM arrow_partitioned_stream WHERE part = 123 ORDER BY f0 +---- +1 +2 + +# select with partition filter should scan only one directory +query TT +EXPLAIN SELECT f0 FROM arrow_partitioned_stream WHERE part = 456 +---- +logical_plan TableScan: arrow_partitioned_stream projection=[f0], full_filters=[arrow_partitioned_stream.part = Int32(456)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow_stream/part=456/data.arrow]]}, projection=[f0], file_type=arrow_stream + + +# Errors in partition filters should be reported +query error Divide by zero error +SELECT f0 FROM arrow_partitioned_stream WHERE CASE WHEN true THEN 1 / 0 ELSE part END = 1;
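For readers of this patch, the format dispatch ultimately rests on one fact: an Arrow IPC file-format object begins with the 6-byte magic "ARROW1", while a stream-format object does not. The sketch below illustrates that same check against a local file using only the standard library; it is not code from this PR (the PR's is_object_in_arrow_ipc_file_format performs the equivalent ranged read through the object store), and the function name here is hypothetical.

use std::fs::File;
use std::io::{ErrorKind, Read};

/// Arrow IPC file-format magic number ("ARROW1").
const ARROW_MAGIC: [u8; 6] = *b"ARROW1";

/// Returns true when `path` starts with the IPC file-format magic,
/// false when it looks like the stream format (or is shorter than 6 bytes).
fn sniff_arrow_ipc_file_format(path: &str) -> std::io::Result<bool> {
    let mut header = [0u8; 6];
    let mut file = File::open(path)?;
    match file.read_exact(&mut header) {
        Ok(()) => Ok(header == ARROW_MAGIC),
        // Fewer than 6 bytes available: cannot be the file format.
        Err(e) if e.kind() == ErrorKind::UnexpectedEof => Ok(false),
        Err(e) => Err(e),
    }
}

Based on that answer, create_physical_plan wraps the scan in ArrowSource::new_file_source (footer-based, range-parallel reads) or ArrowSource::new_stream_file_source (sequential only).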