diff --git a/crates/iceberg/src/arrow/incremental.rs b/crates/iceberg/src/arrow/incremental.rs index b83335a628..0dcb8309f7 100644 --- a/crates/iceberg/src/arrow/incremental.rs +++ b/crates/iceberg/src/arrow/incremental.rs @@ -15,22 +15,17 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use arrow_array::{RecordBatch, UInt64Array}; -use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use arrow_schema::Schema as ArrowSchema; use futures::channel::mpsc::channel; use futures::stream::select; use futures::{SinkExt, Stream, StreamExt, TryStreamExt}; -use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; -use crate::arrow::{ - ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_FILE_PATH, - RESERVED_FIELD_ID_POS, StreamsInto, -}; +use crate::arrow::{ArrowReader, StreamsInto}; use crate::delete_vector::DeleteVector; use crate::io::FileIO; use crate::runtime::spawn; @@ -43,19 +38,6 @@ use crate::{Error, ErrorKind, Result}; /// Default batch size for incremental delete operations. const DEFAULT_BATCH_SIZE: usize = 1024; -/// Creates the schema for positional delete records containing the "pos" column. -/// The pos field includes the reserved field ID as metadata. -fn create_pos_delete_schema() -> Arc { - let pos_field = - Field::new(RESERVED_COL_NAME_POS, DataType::UInt64, false).with_metadata(HashMap::from([ - ( - PARQUET_FIELD_ID_META_KEY.to_string(), - RESERVED_FIELD_ID_POS.to_string(), - ), - ])); - Arc::new(ArrowSchema::new(vec![pos_field])) -} - /// The type of incremental batch: appended data or deleted records. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum IncrementalBatchType { @@ -254,10 +236,15 @@ async fn process_incremental_append_task( record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); // RecordBatchTransformer performs any transformations required on the RecordBatches - // that come back from the file, such as type promotion, default column insertion - // and column re-ordering + // that come back from the file, such as type promotion, default column insertion, + // column re-ordering, and virtual field addition (like _file) let mut record_batch_transformer = - RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids).build(); + RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids) + .with_constant( + crate::metadata_columns::RESERVED_FIELD_ID_FILE, + crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()), + )? 
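+            // The task's data file path is registered as the `_file` constant, so the
+            // transformer can materialize it (run-end encoded) for every output batch.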
+ .build(); if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); @@ -295,7 +282,9 @@ fn process_incremental_delete_task( delete_vector: DeleteVector, batch_size: Option, ) -> Result { - let schema = create_pos_delete_schema(); + let schema = Arc::new(ArrowSchema::new(vec![Arc::clone( + crate::metadata_columns::pos_field(), + )])); let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); @@ -315,14 +304,7 @@ fn process_incremental_delete_task( "Failed to create RecordBatch for DeleteVector", ) }) - .and_then(|batch| { - ArrowReader::add_file_path_column( - batch, - &file_path, - RESERVED_COL_NAME_FILE_PATH, - RESERVED_FIELD_ID_FILE_PATH, - ) - }) + .and_then(|batch| ArrowReader::add_file_path_column(batch, &file_path)) }); Ok(Box::pin(stream) as ArrowRecordBatchStream) @@ -333,7 +315,9 @@ fn process_incremental_deleted_file_task( total_records: u64, batch_size: Option, ) -> Result { - let schema = create_pos_delete_schema(); + let schema = Arc::new(ArrowSchema::new(vec![Arc::clone( + crate::metadata_columns::pos_field(), + )])); let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); @@ -352,14 +336,7 @@ fn process_incremental_deleted_file_task( "Failed to create RecordBatch for deleted file", ) }) - .and_then(|batch| { - ArrowReader::add_file_path_column( - batch, - &file_path, - RESERVED_COL_NAME_FILE_PATH, - RESERVED_FIELD_ID_FILE_PATH, - ) - }) + .and_then(|batch| ArrowReader::add_file_path_column(batch, &file_path)) }); Ok(Box::pin(stream) as ArrowRecordBatchStream) diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index db85cd5730..15b386109d 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -37,6 +37,8 @@ mod incremental; pub use incremental::*; pub use reader::*; pub use value::*; + +// Re-export delete file constants for convenience /// Partition value calculator for computing partition values pub mod partition_value_calculator; pub use partition_value_calculator::*; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 0dc1ccf68f..f5d5f5682a 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -24,13 +24,12 @@ use std::sync::Arc; use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene}; use arrow_array::{ - Array, ArrayRef, BooleanArray, Datum as ArrowDatum, Int32Array, RecordBatch, RunArray, Scalar, - StringArray, + Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar, StringArray, }; use arrow_cast::cast::cast; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ - ArrowError, DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use arrow_string::like::starts_with; use bytes::Bytes; @@ -57,33 +56,12 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; +use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; +use crate::spec::{Datum, NameMapping, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Type}; use 
crate::utils::available_parallelism; use crate::{Error, ErrorKind}; -/// Reserved field ID for the file path (_file) column per Iceberg spec -/// This is dead code for now but will be used when we add the _file column support. -#[allow(dead_code)] -pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646; - -/// Column name for the file path metadata column per Iceberg spec -/// This is dead code for now but will be used when we add the _file column support. -#[allow(dead_code)] -pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file"; - -/// Reserved field ID for the file path column used in delete file reading. -pub(crate) const RESERVED_FIELD_ID_FILE_PATH: i32 = 2147483546; - -/// Column name for the file path metadata column used in delete file reading. -pub(crate) const RESERVED_COL_NAME_FILE_PATH: &str = "file_path"; - -/// Reserved field ID for the position column used in delete file reading. -pub(crate) const RESERVED_FIELD_ID_POS: i32 = 2147483545; - -/// Column name for the position metadata column used in delete file reading. -pub(crate) const RESERVED_COL_NAME_POS: &str = "pos"; - /// Builder to create ArrowReader pub struct ArrowReaderBuilder { batch_size: Option, @@ -282,12 +260,20 @@ impl ArrowReader { initial_stream_builder }; + // Filter out metadata fields for Parquet projection (they don't exist in files) + let project_field_ids_without_metadata: Vec = task + .project_field_ids + .iter() + .filter(|&&id| !is_metadata_field(id)) + .copied() + .collect(); + // Create projection mask based on field IDs // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false) // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match) // - If fallback IDs: position-based projection (missing_field_ids=true) let projection_mask = Self::get_arrow_projection_mask( - &task.project_field_ids, + &project_field_ids_without_metadata, &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), @@ -298,16 +284,20 @@ impl ArrowReader { record_batch_stream_builder.with_projection(projection_mask.clone()); // RecordBatchTransformer performs any transformations required on the RecordBatches - // that come back from the file, such as type promotion, default column insertion - // and column re-ordering. + // that come back from the file, such as type promotion, default column insertion, + // column re-ordering, partition constants, and virtual field addition (like _file) let mut record_batch_transformer_builder = - RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()); + RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()) + .with_constant( + RESERVED_FIELD_ID_FILE, + PrimitiveLiteral::String(task.data_file_path.clone()), + )?; if let (Some(partition_spec), Some(partition_data)) = (task.partition_spec.clone(), task.partition.clone()) { record_batch_transformer_builder = - record_batch_transformer_builder.with_partition(partition_spec, partition_data); + record_batch_transformer_builder.with_partition(partition_spec, partition_data)?; } let mut record_batch_transformer = record_batch_transformer_builder.build(); @@ -448,7 +438,10 @@ impl ArrowReader { record_batch_stream_builder .build()? .map(move |batch| match batch { - Ok(batch) => record_batch_transformer.process_record_batch(batch), + Ok(batch) => { + // Process the record batch (type promotion, column reordering, virtual fields, etc.) 
+ record_batch_transformer.process_record_batch(batch) + } Err(err) => Err(err.into()), }); @@ -597,103 +590,35 @@ impl ArrowReader { Ok(results.into()) } - /// Helper function to add a `_file` column to a RecordBatch. - /// Takes the array and field to add, reducing code duplication. - fn create_file_field( - batch: RecordBatch, - file_array: ArrayRef, - file_field: Field, - field_id: i32, - ) -> Result { + /// Adds a file_path column to the RecordBatch containing the file path. + /// Used for positional delete records to track which file each delete applies to. + /// Materializes the file path string for each row (no compression). + pub(crate) fn add_file_path_column(batch: RecordBatch, file_path: &str) -> Result { + let num_rows = batch.num_rows(); + + // Create a StringArray with the file path repeated num_rows times + let file_array = StringArray::from(vec![file_path; num_rows]); + + // Use the lazy field definition for file_path (from metadata_columns) + let file_field = crate::metadata_columns::file_path_field(); + + // Add the column using the predefined field (already has metadata) let mut columns = batch.columns().to_vec(); - columns.push(file_array); + columns.push(Arc::new(file_array)); let mut fields: Vec<_> = batch.schema().fields().iter().cloned().collect(); - fields.push(Arc::new(file_field.with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - field_id.to_string(), - )])))); + fields.push(Arc::clone(file_field)); let schema = Arc::new(ArrowSchema::new(fields)); RecordBatch::try_new(schema, columns).map_err(|e| { Error::new( ErrorKind::Unexpected, - "Failed to add _file column to RecordBatch", + "Failed to add file_path column to RecordBatch", ) .with_source(e) }) } - /// Adds a `_file` column to the RecordBatch containing the file path. - /// Uses Run-End Encoding (REE) for maximum memory efficiency when the same - /// file path is repeated across all rows. - /// Note: This is only used in tests for now, for production usage we use the - /// non-REE version as it is Julia-compatible. - #[allow(dead_code)] - pub(crate) fn add_file_path_column_ree( - batch: RecordBatch, - file_path: &str, - field_name: &str, - field_id: i32, - ) -> Result { - let num_rows = batch.num_rows(); - - // Use Run-End Encoded array for optimal memory efficiency - // For a constant value repeated num_rows times, this stores: - // - run_ends: [num_rows] (one i32) for non-empty batches, or [] for empty batches - // - values: [file_path] (one string) for non-empty batches, or [] for empty batches - let run_ends = if num_rows == 0 { - Int32Array::from(Vec::::new()) - } else { - Int32Array::from(vec![num_rows as i32]) - }; - let values = if num_rows == 0 { - StringArray::from(Vec::<&str>::new()) - } else { - StringArray::from(vec![file_path]) - }; - - let file_array = RunArray::try_new(&run_ends, &values).map_err(|e| { - Error::new( - ErrorKind::Unexpected, - "Failed to create RunArray for _file column", - ) - .with_source(e) - })?; - - // Per Iceberg spec, the _file column has reserved field ID RESERVED_FIELD_ID_FILE - // DataType is RunEndEncoded with Int32 run ends and Utf8 values - // Note: values field is nullable to match what RunArray::try_new(..) expects. 
- let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); - let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); - let file_field = Field::new( - field_name, - DataType::RunEndEncoded(run_ends_field, values_field), - false, - ); - - Self::create_file_field(batch, Arc::new(file_array), file_field, field_id) - } - - /// Adds a `_file` column to the RecordBatch containing the file path. - /// Materializes the file path string for each row (no compression). - pub(crate) fn add_file_path_column( - batch: RecordBatch, - file_path: &str, - field_name: &str, - field_id: i32, - ) -> Result { - let num_rows = batch.num_rows(); - - // Create a StringArray with the file path repeated num_rows times - let file_array = StringArray::from(vec![file_path; num_rows]); - - // Per Iceberg spec, the _file column has reserved field ID RESERVED_FIELD_ID_FILE - let file_field = Field::new(field_name, DataType::Utf8, false); - - Self::create_file_field(batch, Arc::new(file_array), file_field, field_id) - } - fn build_field_id_set_and_map( parquet_schema: &SchemaDescriptor, predicate: &BoundPredicate, @@ -1868,7 +1793,6 @@ mod tests { use arrow_array::cast::AsArray; use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit}; - use as_any::Downcast; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::arrow::{ArrowWriter, ProjectionMask}; @@ -1882,9 +1806,7 @@ mod tests { use crate::ErrorKind; use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY}; - use crate::arrow::{ - ArrowReader, ArrowReaderBuilder, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, - }; + use crate::arrow::{ArrowReader, ArrowReaderBuilder}; use crate::delete_vector::DeleteVector; use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Predicate, Reference}; @@ -2695,170 +2617,6 @@ message schema { assert!(col_b.is_null(2)); } - #[test] - fn test_add_file_path_column_ree() { - use arrow_array::{Array, Int32Array, RecordBatch, StringArray}; - use arrow_schema::{DataType, Field, Schema}; - - // Create a simple test batch with 2 columns and 3 rows - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, false), - ])); - - let id_array = Int32Array::from(vec![1, 2, 3]); - let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]); - - let batch = RecordBatch::try_new(schema.clone(), vec![ - Arc::new(id_array), - Arc::new(name_array), - ]) - .unwrap(); - - assert_eq!(batch.num_columns(), 2); - assert_eq!(batch.num_rows(), 3); - - // Add file path column with REE - let file_path = "/path/to/data/file.parquet"; - let result = ArrowReader::add_file_path_column_ree( - batch, - file_path, - RESERVED_COL_NAME_FILE, - RESERVED_FIELD_ID_FILE, - ); - assert!(result.is_ok(), "Should successfully add file path column"); - - let new_batch = result.unwrap(); - - // Verify the new batch has 3 columns - assert_eq!(new_batch.num_columns(), 3); - assert_eq!(new_batch.num_rows(), 3); - - // Verify schema has the _file column - let schema = new_batch.schema(); - assert_eq!(schema.fields().len(), 3); - - let file_field = schema.field(2); - assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); - assert!(!file_field.is_nullable()); - - // Verify the field has the correct metadata - let metadata = file_field.metadata(); - assert_eq!( - metadata.get(PARQUET_FIELD_ID_META_KEY), - 
Some(&RESERVED_FIELD_ID_FILE.to_string()) - ); - - // Verify the data type is RunEndEncoded - match file_field.data_type() { - DataType::RunEndEncoded(run_ends_field, values_field) => { - assert_eq!(run_ends_field.name(), "run_ends"); - assert_eq!(run_ends_field.data_type(), &DataType::Int32); - assert!(!run_ends_field.is_nullable()); - - assert_eq!(values_field.name(), "values"); - assert_eq!(values_field.data_type(), &DataType::Utf8); - } - _ => panic!("Expected RunEndEncoded data type for _file column"), - } - - // Verify the original columns are intact - let id_col = new_batch - .column(0) - .as_primitive::(); - assert_eq!(id_col.values(), &[1, 2, 3]); - - let name_col = new_batch.column(1).as_string::(); - assert_eq!(name_col.value(0), "Alice"); - assert_eq!(name_col.value(1), "Bob"); - assert_eq!(name_col.value(2), "Charlie"); - - // Verify the file path column contains the correct value - // The _file column is a RunArray, so we need to decode it - let file_col = new_batch.column(2); - let run_array = file_col - .as_any() - .downcast_ref::>() - .expect("Expected RunArray for _file column"); - - // Verify the run array structure (should be optimally encoded) - let run_ends = run_array.run_ends(); - assert_eq!(run_ends.values().len(), 1, "Should have only 1 run end"); - assert_eq!( - run_ends.values()[0], - new_batch.num_rows() as i32, - "Run end should equal number of rows" - ); - - // Check that the single value in the RunArray is the expected file path - let values = run_array.values(); - let string_values = values.as_string::(); - assert_eq!(string_values.len(), 1, "Should have only 1 value"); - assert_eq!(string_values.value(0), file_path); - - assert!( - string_values - .downcast_ref::() - .unwrap() - .iter() - .all(|v| v == Some(file_path)) - ) - } - - #[test] - fn test_add_file_path_column_ree_empty_batch() { - use arrow_array::RecordBatch; - use arrow_schema::{DataType, Field, Schema}; - use parquet::arrow::PARQUET_FIELD_ID_META_KEY; - - // Create an empty batch - let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); - - let id_array = arrow_array::Int32Array::from(Vec::::new()); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap(); - - assert_eq!(batch.num_rows(), 0); - - // Add file path column to empty batch with REE - let file_path = "/empty/file.parquet"; - let result = ArrowReader::add_file_path_column_ree( - batch, - file_path, - RESERVED_COL_NAME_FILE, - RESERVED_FIELD_ID_FILE, - ); - - // Should succeed with empty RunArray for empty batches - assert!(result.is_ok()); - let new_batch = result.unwrap(); - assert_eq!(new_batch.num_rows(), 0); - assert_eq!(new_batch.num_columns(), 2); - - // Verify the _file column exists with correct schema - let schema = new_batch.schema(); - let file_field = schema.field(1); - assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); - - // Should use RunEndEncoded even for empty batches - match file_field.data_type() { - DataType::RunEndEncoded(run_ends_field, values_field) => { - assert_eq!(run_ends_field.data_type(), &DataType::Int32); - assert_eq!(values_field.data_type(), &DataType::Utf8); - } - _ => panic!("Expected RunEndEncoded data type for _file column"), - } - - // Verify metadata with reserved field ID - assert_eq!( - file_field.metadata().get(PARQUET_FIELD_ID_META_KEY), - Some(&RESERVED_FIELD_ID_FILE.to_string()) - ); - - // Verify the file path column is empty but properly structured - let file_path_column = new_batch.column(1); - 
assert_eq!(file_path_column.len(), 0); - } - #[test] fn test_add_file_path_column_special_characters() { use arrow_array::{Int32Array, RecordBatch}; @@ -2871,12 +2629,7 @@ message schema { // Test with file path containing special characters (materialized version) let file_path = "/path/with spaces/and-dashes/file_name.parquet"; - let result = ArrowReader::add_file_path_column( - batch, - file_path, - RESERVED_COL_NAME_FILE, - RESERVED_FIELD_ID_FILE, - ); + let result = ArrowReader::add_file_path_column(batch, file_path); assert!(result.is_ok()); let new_batch = result.unwrap(); @@ -2912,12 +2665,7 @@ message schema { // Add file path column with materialization let file_path = "/path/to/data/file.parquet"; - let result = ArrowReader::add_file_path_column( - batch, - file_path, - RESERVED_COL_NAME_FILE, - RESERVED_FIELD_ID_FILE, - ); + let result = ArrowReader::add_file_path_column(batch, file_path); assert!(result.is_ok(), "Should successfully add file path column"); let new_batch = result.unwrap(); @@ -2931,14 +2679,17 @@ message schema { assert_eq!(schema.fields().len(), 3); let file_field = schema.field(2); - assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + assert_eq!( + file_field.name(), + crate::metadata_columns::RESERVED_COL_NAME_FILE_PATH + ); assert!(!file_field.is_nullable()); // Verify the field has the correct metadata let metadata = file_field.metadata(); assert_eq!( metadata.get(PARQUET_FIELD_ID_META_KEY), - Some(&RESERVED_FIELD_ID_FILE.to_string()) + Some(&crate::metadata_columns::RESERVED_FIELD_ID_FILE_PATH.to_string()) ); // Verify the data type is Utf8 (materialized strings) @@ -2978,12 +2729,7 @@ message schema { // Add file path column to empty batch (materialized version) let file_path = "/empty/file.parquet"; - let result = ArrowReader::add_file_path_column( - batch, - file_path, - RESERVED_COL_NAME_FILE, - RESERVED_FIELD_ID_FILE, - ); + let result = ArrowReader::add_file_path_column(batch, file_path); // Should succeed with empty StringArray assert!(result.is_ok()); @@ -2994,7 +2740,10 @@ message schema { // Verify the _file column exists with correct schema let schema = new_batch.schema(); let file_field = schema.field(1); - assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + assert_eq!( + file_field.name(), + crate::metadata_columns::RESERVED_COL_NAME_FILE_PATH + ); // Should use Utf8 (materialized strings) assert_eq!(file_field.data_type(), &DataType::Utf8); @@ -3002,7 +2751,7 @@ message schema { // Verify metadata with reserved field ID assert_eq!( file_field.metadata().get(PARQUET_FIELD_ID_META_KEY), - Some(&RESERVED_FIELD_ID_FILE.to_string()) + Some(&crate::metadata_columns::RESERVED_FIELD_ID_FILE_PATH.to_string()) ); // Verify the file path column is empty but properly structured diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index a20adb6a5a..8a3018b66a 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -19,18 +19,19 @@ use std::collections::HashMap; use std::sync::Arc; use arrow_array::{ - Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, - Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray, - StructArray, + Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, + Float32Array, Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, + RunArray, StringArray, 
StructArray, }; use arrow_buffer::NullBuffer; use arrow_cast::cast; use arrow_schema::{ - DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, + DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, }; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::schema_to_arrow_schema; +use crate::metadata_columns::get_metadata_field; use crate::spec::{ Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform, }; @@ -146,13 +147,13 @@ enum SchemaComparison { /// Builder for RecordBatchTransformer to improve ergonomics when constructing with optional parameters. /// -/// See [`RecordBatchTransformer`] for details on partition spec and partition data. +/// Constant fields are pre-computed for both virtual/metadata fields (like _file) and +/// identity-partitioned fields to avoid duplicate work during batch processing. #[derive(Debug)] pub(crate) struct RecordBatchTransformerBuilder { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - partition_spec: Option>, - partition_data: Option, + constant_fields: HashMap, } impl RecordBatchTransformerBuilder { @@ -163,32 +164,49 @@ impl RecordBatchTransformerBuilder { Self { snapshot_schema, projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(), - partition_spec: None, - partition_data: None, + constant_fields: HashMap::new(), } } + /// Add a constant value for a specific field ID. + /// This is used for virtual/metadata fields like _file that have constant values per batch. + /// + /// # Arguments + /// * `field_id` - The field ID to associate with the constant + /// * `value` - The constant value for this field + pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Result { + let arrow_type = RecordBatchTransformer::primitive_literal_to_arrow_type(&value)?; + self.constant_fields.insert(field_id, (arrow_type, value)); + Ok(self) + } + /// Set partition spec and data together for identifying identity-transformed partition columns. /// /// Both partition_spec and partition_data must be provided together since the spec defines /// which fields are identity-partitioned, and the data provides their constant values. - /// One without the other cannot produce a valid constants map. + /// This method computes the partition constants and merges them into constant_fields. 
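+    ///
+    /// # Example
+    ///
+    /// A minimal sketch, assuming `schema`, `field_ids`, `spec`, and `partition_data`
+    /// are already in scope; it shows partition constants and the `_file` constant
+    /// being combined on one builder:
+    ///
+    /// ```ignore
+    /// let mut transformer = RecordBatchTransformerBuilder::new(schema, &field_ids)
+    ///     .with_constant(
+    ///         crate::metadata_columns::RESERVED_FIELD_ID_FILE,
+    ///         PrimitiveLiteral::String("s3://bucket/data/file.parquet".to_string()),
+    ///     )?
+    ///     .with_partition(spec, partition_data)?
+    ///     .build();
+    /// ```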
pub(crate) fn with_partition( mut self, partition_spec: Arc, partition_data: Struct, - ) -> Self { - self.partition_spec = Some(partition_spec); - self.partition_data = Some(partition_data); - self + ) -> Result { + // Compute partition constants for identity-transformed fields + let partition_constants = constants_map(&partition_spec, &partition_data); + + // Add partition constants to constant_fields (compute REE types from literals) + for (field_id, value) in partition_constants { + let arrow_type = RecordBatchTransformer::primitive_literal_to_arrow_type(&value)?; + self.constant_fields.insert(field_id, (arrow_type, value)); + } + + Ok(self) } pub(crate) fn build(self) -> RecordBatchTransformer { RecordBatchTransformer { snapshot_schema: self.snapshot_schema, projected_iceberg_field_ids: self.projected_iceberg_field_ids, - partition_spec: self.partition_spec, - partition_data: self.partition_data, + constant_fields: self.constant_fields, batch_transform: None, } } @@ -228,16 +246,10 @@ impl RecordBatchTransformerBuilder { pub(crate) struct RecordBatchTransformer { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - - /// Partition spec for identifying identity-transformed partition columns (spec rule #1). - /// Only fields with identity transforms use partition data constants; non-identity transforms - /// (bucket, truncate, etc.) must read source columns from data files. - partition_spec: Option>, - - /// Partition data providing constant values for identity-transformed partition columns (spec rule #1). - /// For example, in a file at path `dept=engineering/file.parquet`, this would contain - /// the value "engineering" for the dept field. - partition_data: Option, + // Pre-computed constant field information: field_id -> (arrow_type, value) + // Includes both virtual/metadata fields (like _file) and identity-partitioned fields + // Avoids type conversions during batch processing + constant_fields: HashMap, // BatchTransform gets lazily constructed based on the schema of // the first RecordBatch we receive from the file @@ -279,8 +291,7 @@ impl RecordBatchTransformer { record_batch.schema_ref(), self.snapshot_schema.as_ref(), &self.projected_iceberg_field_ids, - self.partition_spec.as_ref().map(|s| s.as_ref()), - self.partition_data.as_ref(), + &self.constant_fields, )?); self.process_record_batch(record_batch)? @@ -299,8 +310,7 @@ impl RecordBatchTransformer { source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], - partition_spec: Option<&PartitionSpec>, - partition_data: Option<&Struct>, + constant_fields: &HashMap, ) -> Result { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); let field_id_to_mapped_schema_map = @@ -311,22 +321,39 @@ impl RecordBatchTransformer { let fields: Result> = projected_iceberg_field_ids .iter() .map(|field_id| { - Ok(field_id_to_mapped_schema_map - .get(field_id) - .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? 
- .0 - .clone()) + // Check if this is a constant field (virtual or partition) + if constant_fields.contains_key(field_id) { + // For metadata/virtual fields (like _file), get name from metadata_columns + // For partition fields, get name from schema (they exist in schema) + if let Ok(field) = get_metadata_field(*field_id) { + // This is a metadata/virtual field - use the predefined field + Ok(field) + } else { + // This is a partition constant field (exists in schema but uses constant value) + let field = &field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? + .0; + let (arrow_type, _) = constant_fields.get(field_id).unwrap(); + // Use the type from constant_fields (REE for constants) + let constant_field = + Field::new(field.name(), arrow_type.clone(), field.is_nullable()) + .with_metadata(field.metadata().clone()); + Ok(Arc::new(constant_field)) + } + } else { + // Regular field - use schema as-is + Ok(field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? + .0 + .clone()) + } }) .collect(); let target_schema = Arc::new(ArrowSchema::new(fields?)); - let constants_map = if let (Some(spec), Some(data)) = (partition_spec, partition_data) { - constants_map(spec, data) - } else { - HashMap::new() - }; - match Self::compare_schemas(source_schema, &target_schema) { SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough), SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }), @@ -336,8 +363,7 @@ impl RecordBatchTransformer { snapshot_schema, projected_iceberg_field_ids, field_id_to_mapped_schema_map, - constants_map, - partition_spec, + constant_fields, )?, target_schema, }), @@ -394,8 +420,7 @@ impl RecordBatchTransformer { snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], field_id_to_mapped_schema_map: HashMap, - constants_map: HashMap, - _partition_spec: Option<&PartitionSpec>, + constant_fields: &HashMap, ) -> Result> { let field_id_to_source_schema_map = Self::build_field_id_to_arrow_schema_map(source_schema)?; @@ -403,6 +428,17 @@ impl RecordBatchTransformer { projected_iceberg_field_ids .iter() .map(|field_id| { + // Check if this is a constant field (metadata/virtual or identity-partitioned) + // Constant fields always use their pre-computed constant values, regardless of whether + // they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata + // is authoritative and should be preferred over file data. + if let Some((arrow_type, value)) = constant_fields.get(field_id) { + return Ok(ColumnSource::Add { + value: Some(value.clone()), + target_type: arrow_type.clone(), + }); + } + let (target_field, _) = field_id_to_mapped_schema_map .get(field_id) @@ -451,13 +487,8 @@ impl RecordBatchTransformer { ); // Apply spec's fallback steps for "not present" fields. 
- let column_source = if let Some(constant_value) = constants_map.get(field_id) { - // Rule #1: Identity partition constant - ColumnSource::Add { - value: Some(constant_value.clone()), - target_type: target_type.clone(), - } - } else if let Some(source) = field_by_id { + // Rule #1 (constants) is handled at the beginning of this function + let column_source = if let Some(source) = field_by_id { source } else { // Rules #2, #3 and #4: @@ -471,6 +502,7 @@ impl RecordBatchTransformer { None } }); + ColumnSource::Add { value: default_value, target_type: target_type.clone(), @@ -539,83 +571,253 @@ impl RecordBatchTransformer { prim_lit: &Option, num_rows: usize, ) -> Result { - Ok(match (target_type, prim_lit) { - (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => { - Arc::new(BooleanArray::from(vec![*value; num_rows])) - } - (DataType::Boolean, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BooleanArray::from(vals)) - } - (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { - Arc::new(Int32Array::from(vec![*value; num_rows])) - } - (DataType::Int32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int32Array::from(vals)) - } - (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => { - Arc::new(Date32Array::from(vec![*value; num_rows])) - } - (DataType::Date32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Date32Array::from(vals)) - } - (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { - Arc::new(Int64Array::from(vec![*value; num_rows])) - } - (DataType::Int64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int64Array::from(vals)) - } - (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { - Arc::new(Float32Array::from(vec![value.0; num_rows])) - } - (DataType::Float32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float32Array::from(vals)) - } - (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => { - Arc::new(Float64Array::from(vec![value.0; num_rows])) - } - (DataType::Float64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float64Array::from(vals)) - } - (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { - Arc::new(StringArray::from(vec![value.clone(); num_rows])) - } - (DataType::Utf8, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(StringArray::from(vals)) - } - (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => { - Arc::new(BinaryArray::from_vec(vec![value; num_rows])) - } - (DataType::Binary, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BinaryArray::from_opt_vec(vals)) - } - (DataType::Struct(fields), None) => { - // Create a StructArray filled with nulls. Per Iceberg spec, optional struct fields - // default to null when added to the schema. We defer non-null default struct values - // and leave them as not implemented yet. 
- let null_arrays: Vec = fields - .iter() - .map(|field| Self::create_column(field.data_type(), &None, num_rows)) - .collect::>>()?; - - Arc::new(StructArray::new( - fields.clone(), - null_arrays, - Some(NullBuffer::new_null(num_rows)), + // Check if this is a RunEndEncoded type (for constant fields) + if let DataType::RunEndEncoded(_, values_field) = target_type { + // Helper to create a Run-End Encoded array + let create_ree_array = |values_array: ArrayRef| -> Result { + let run_ends = if num_rows == 0 { + Int32Array::from(Vec::::new()) + } else { + Int32Array::from(vec![num_rows as i32]) + }; + Ok(Arc::new( + RunArray::try_new(&run_ends, &values_array).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RunArray for constant value", + ) + .with_source(e) + })?, )) - } - (DataType::Null, _) => Arc::new(NullArray::new(num_rows)), - (dt, _) => { + }; + + // Create the values array based on the literal value + let values_array: ArrayRef = match (values_field.data_type(), prim_lit) { + (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => { + Arc::new(BooleanArray::from(vec![*v])) + } + (DataType::Boolean, None) => { + Arc::new(BooleanArray::from(vec![Option::::None])) + } + (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => { + Arc::new(Int32Array::from(vec![*v])) + } + (DataType::Int32, None) => Arc::new(Int32Array::from(vec![Option::::None])), + (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => { + Arc::new(Date32Array::from(vec![*v])) + } + (DataType::Date32, None) => Arc::new(Date32Array::from(vec![Option::::None])), + (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => { + Arc::new(Int64Array::from(vec![*v])) + } + (DataType::Int64, None) => Arc::new(Int64Array::from(vec![Option::::None])), + (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => { + Arc::new(Float32Array::from(vec![v.0])) + } + (DataType::Float32, None) => { + Arc::new(Float32Array::from(vec![Option::::None])) + } + (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => { + Arc::new(Float64Array::from(vec![v.0])) + } + (DataType::Float64, None) => { + Arc::new(Float64Array::from(vec![Option::::None])) + } + (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => { + Arc::new(StringArray::from(vec![v.as_str()])) + } + (DataType::Utf8, None) => Arc::new(StringArray::from(vec![Option::<&str>::None])), + (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => { + Arc::new(BinaryArray::from_vec(vec![v.as_slice()])) + } + (DataType::Binary, None) => { + Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None])) + } + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(v))) => { + Arc::new(arrow_array::Decimal128Array::from(vec![{ *v }])) + } + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(v))) => { + Arc::new(arrow_array::Decimal128Array::from(vec![*v as i128])) + } + (DataType::Decimal128(_, _), None) => { + Arc::new(arrow_array::Decimal128Array::from(vec![ + Option::::None, + ])) + } + (DataType::Struct(fields), None) => { + // Create a single-element StructArray with nulls + let null_arrays: Vec = fields + .iter() + .map(|f| { + // Recursively create null arrays for struct fields + // For primitive fields in structs, use simple null arrays (not REE within struct) + match f.data_type() { + DataType::Boolean => { + Arc::new(BooleanArray::from(vec![Option::::None])) + as ArrayRef + } + DataType::Int32 | DataType::Date32 => { + Arc::new(Int32Array::from(vec![Option::::None])) + } + DataType::Int64 => { + Arc::new(Int64Array::from(vec![Option::::None])) + } + 
DataType::Float32 => { + Arc::new(Float32Array::from(vec![Option::::None])) + } + DataType::Float64 => { + Arc::new(Float64Array::from(vec![Option::::None])) + } + DataType::Utf8 => { + Arc::new(StringArray::from(vec![Option::<&str>::None])) + } + DataType::Binary => { + Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None])) + } + _ => panic!("Unsupported struct field type: {:?}", f.data_type()), + } + }) + .collect(); + Arc::new(arrow_array::StructArray::new( + fields.clone(), + null_arrays, + Some(arrow_buffer::NullBuffer::new_null(1)), + )) + } + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Unsupported constant type combination: {:?} with {:?}", + values_field.data_type(), + prim_lit + ), + )); + } + }; + + // Wrap in Run-End Encoding + create_ree_array(values_array) + } else { + // Non-REE type (simple arrays for non-constant fields) + Ok(match (target_type, prim_lit) { + (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => { + Arc::new(BooleanArray::from(vec![*value; num_rows])) + } + (DataType::Boolean, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(BooleanArray::from(vals)) + } + (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { + Arc::new(Int32Array::from(vec![*value; num_rows])) + } + (DataType::Int32, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Int32Array::from(vals)) + } + (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => { + Arc::new(Date32Array::from(vec![*value; num_rows])) + } + (DataType::Date32, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Date32Array::from(vals)) + } + (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { + Arc::new(Int64Array::from(vec![*value; num_rows])) + } + (DataType::Int64, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Int64Array::from(vals)) + } + (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { + Arc::new(Float32Array::from(vec![value.0; num_rows])) + } + (DataType::Float32, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Float32Array::from(vals)) + } + (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => { + Arc::new(Float64Array::from(vec![value.0; num_rows])) + } + (DataType::Float64, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Float64Array::from(vals)) + } + (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { + Arc::new(StringArray::from(vec![value.clone(); num_rows])) + } + (DataType::Utf8, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(StringArray::from(vals)) + } + (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => { + Arc::new(BinaryArray::from_vec(vec![value; num_rows])) + } + (DataType::Binary, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(BinaryArray::from_opt_vec(vals)) + } + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(value))) => { + Arc::new(Decimal128Array::from(vec![*value; num_rows])) + } + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(value))) => { + Arc::new(Decimal128Array::from(vec![*value as i128; num_rows])) + } + (DataType::Decimal128(_, _), None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Decimal128Array::from(vals)) + } + (DataType::Struct(fields), None) => { + // Create a StructArray filled with nulls + let null_arrays: Vec = fields + .iter() + .map(|field| Self::create_column(field.data_type(), &None, num_rows)) + .collect::>>()?; + + Arc::new(StructArray::new( + fields.clone(), + null_arrays, + Some(NullBuffer::new_null(num_rows)), 
+ )) + } + (DataType::Null, _) => Arc::new(NullArray::new(num_rows)), + (dt, _) => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("unexpected target column type {}", dt), + )); + } + }) + } + } + + /// Converts a PrimitiveLiteral to its corresponding Arrow DataType. + /// This is used for constant fields to determine the Arrow type. + /// For constant values, we use Run-End Encoding for all types to save memory. + fn primitive_literal_to_arrow_type(literal: &PrimitiveLiteral) -> Result { + // Helper to create REE type with the given values type + // Note: values field is nullable as Arrow expects this when building the + // final Arrow schema with `RunArray::try_new`. + let make_ree = |values_type: DataType| -> DataType { + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", values_type, true)); + DataType::RunEndEncoded(run_ends_field, values_field) + }; + + Ok(match literal { + PrimitiveLiteral::Boolean(_) => make_ree(DataType::Boolean), + PrimitiveLiteral::Int(_) => make_ree(DataType::Int32), + PrimitiveLiteral::Long(_) => make_ree(DataType::Int64), + PrimitiveLiteral::Float(_) => make_ree(DataType::Float32), + PrimitiveLiteral::Double(_) => make_ree(DataType::Float64), + PrimitiveLiteral::String(_) => make_ree(DataType::Utf8), + PrimitiveLiteral::Binary(_) => make_ree(DataType::Binary), + PrimitiveLiteral::Int128(_) => make_ree(DataType::Decimal128(38, 0)), + PrimitiveLiteral::UInt128(_) => make_ree(DataType::Decimal128(38, 0)), + PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => { return Err(Error::new( ErrorKind::Unexpected, - format!("unexpected target column type {}", dt), + "Cannot create arrow type for AboveMax/BelowMin literal", )); } }) @@ -639,6 +841,54 @@ mod test { }; use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type}; + /// Helper to extract string values from either StringArray or RunEndEncoded + /// Returns empty string for null values + fn get_string_value(array: &dyn Array, index: usize) -> String { + if let Some(string_array) = array.as_any().downcast_ref::() { + if string_array.is_null(index) { + String::new() + } else { + string_array.value(index).to_string() + } + } else if let Some(run_array) = array + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + let string_values = values + .as_any() + .downcast_ref::() + .expect("REE values should be StringArray"); + // For REE, all rows have the same value (index 0 in the values array) + if string_values.is_null(0) { + String::new() + } else { + string_values.value(0).to_string() + } + } else { + panic!("Expected StringArray or RunEndEncoded"); + } + } + + /// Helper to extract int values from either Int32Array or RunEndEncoded + fn get_int_value(array: &dyn Array, index: usize) -> i32 { + if let Some(int_array) = array.as_any().downcast_ref::() { + int_array.value(index) + } else if let Some(run_array) = array + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + let int_values = values + .as_any() + .downcast_ref::() + .expect("REE values should be Int32Array"); + int_values.value(0) + } else { + panic!("Expected Int32Array or RunEndEncoded"); + } + } + #[test] fn build_field_id_to_source_schema_map_works() { let arrow_schema = arrow_schema_already_same_as_target(); @@ -1137,6 +1387,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + 
.expect("Failed to add partition constants") .build(); // Create a Parquet RecordBatch with actual data @@ -1257,6 +1508,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ @@ -1271,30 +1523,23 @@ mod test { assert_eq!(result.num_columns(), 3); assert_eq!(result.num_rows(), 2); - let id_column = result - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(id_column.value(0), 100); - assert_eq!(id_column.value(1), 200); + // Use helpers to handle both simple and REE arrays + assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100); + assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200); - let dept_column = result - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - // This value MUST come from partition metadata (constant) - assert_eq!(dept_column.value(0), "engineering"); - assert_eq!(dept_column.value(1), "engineering"); + // dept column comes from partition metadata (constant) - will be REE + assert_eq!( + get_string_value(result.column(1).as_ref(), 0), + "engineering" + ); + assert_eq!( + get_string_value(result.column(1).as_ref(), 1), + "engineering" + ); - let name_column = result - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(name_column.value(0), "Alice"); - assert_eq!(name_column.value(1), "Bob"); + // name column comes from file + assert_eq!(get_string_value(result.column(2).as_ref(), 0), "Alice"); + assert_eq!(get_string_value(result.column(2).as_ref(), 1), "Bob"); } /// Test bucket partitioning with renamed source column. @@ -1372,6 +1617,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); // Create a Parquet RecordBatch with actual data @@ -1476,6 +1722,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ @@ -1492,48 +1739,37 @@ mod test { // Verify each column demonstrates the correct spec rule: // Normal case: id from Parquet by field ID - let id_column = result - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(id_column.value(0), 100); - assert_eq!(id_column.value(1), 200); - - // Rule #1: dept from partition metadata (identity transform) - let dept_column = result - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(dept_column.value(0), "engineering"); - assert_eq!(dept_column.value(1), "engineering"); + // Use helpers to handle both simple and REE arrays + assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100); + assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200); + + // Rule #1: dept from partition metadata (identity transform) - will be REE + assert_eq!( + get_string_value(result.column(1).as_ref(), 0), + "engineering" + ); + assert_eq!( + get_string_value(result.column(1).as_ref(), 1), + "engineering" + ); - // Rule #2: data from Parquet via name mapping - let data_column = result - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(data_column.value(0), "value1"); - assert_eq!(data_column.value(1), "value2"); + // Rule #2: data 
from Parquet via name mapping - will be regular array + assert_eq!(get_string_value(result.column(2).as_ref(), 0), "value1"); + assert_eq!(get_string_value(result.column(2).as_ref(), 1), "value2"); - // Rule #3: category from initial_default - let category_column = result - .column(3) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(category_column.value(0), "default_category"); - assert_eq!(category_column.value(1), "default_category"); + // Rule #3: category from initial_default - will be REE + assert_eq!( + get_string_value(result.column(3).as_ref(), 0), + "default_category" + ); + assert_eq!( + get_string_value(result.column(3).as_ref(), 1), + "default_category" + ); - // Rule #4: notes is null (no default, not in Parquet, not in partition) - let notes_column = result - .column(4) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(notes_column.is_null(0)); - assert!(notes_column.is_null(1)); + // Rule #4: notes is null (no default, not in Parquet, not in partition) - will be REE with null + // For null REE arrays, we still use the helper which handles extraction + assert_eq!(get_string_value(result.column(4).as_ref(), 0), ""); + assert_eq!(get_string_value(result.column(4).as_ref(), 1), ""); } } diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 9c9c7460fb..64f3fcc708 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -96,6 +96,7 @@ mod utils; pub mod writer; mod delete_vector; +pub mod metadata_columns; pub mod puffin; /// Utility functions and modules. diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs new file mode 100644 index 0000000000..388c449f4f --- /dev/null +++ b/crates/iceberg/src/metadata_columns.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metadata columns (virtual/reserved fields) for Iceberg tables. +//! +//! This module defines metadata columns that can be requested in projections +//! but are not stored in data files. Instead, they are computed on-the-fly +//! during reading. Examples include the _file column (file path) and future +//! columns like partition values or row numbers. 
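+//!
+//! A minimal sketch of how callers can resolve metadata columns by name (purely
+//! illustrative; it only uses the constants and helpers defined in this module):
+//!
+//! ```ignore
+//! use iceberg::metadata_columns::{
+//!     get_metadata_field_id, is_metadata_column_name, RESERVED_COL_NAME_FILE,
+//!     RESERVED_FIELD_ID_FILE,
+//! };
+//!
+//! assert!(is_metadata_column_name(RESERVED_COL_NAME_FILE));
+//! assert_eq!(
+//!     get_metadata_field_id(RESERVED_COL_NAME_FILE).unwrap(),
+//!     RESERVED_FIELD_ID_FILE
+//! );
+//! ```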
+ +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_schema::{DataType, Field}; +use once_cell::sync::Lazy; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::{Error, ErrorKind, Result}; + +/// Reserved field ID for the file path (_file) column per Iceberg spec +pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1; + +/// Reserved column name for the file path metadata column +pub const RESERVED_COL_NAME_FILE: &str = "_file"; + +/// Reserved field ID for the file_path column used in delete file reading (positional deletes) +pub const RESERVED_FIELD_ID_FILE_PATH: i32 = i32::MAX - 200; + +/// Column name for the file_path column used in delete file reading (positional deletes) +pub const RESERVED_COL_NAME_FILE_PATH: &str = "file_path"; + +/// Reserved field ID for the pos column used in delete file reading (positional deletes) +pub const RESERVED_FIELD_ID_POS: i32 = i32::MAX - 201; + +/// Column name for the pos column used in delete file reading (positional deletes) +pub const RESERVED_COL_NAME_POS: &str = "pos"; + +/// Lazy-initialized Arrow Field definition for the _file metadata column. +/// Uses Run-End Encoding for memory efficiency. +static FILE_FIELD: Lazy> = Lazy::new(|| { + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); + Arc::new( + Field::new( + RESERVED_COL_NAME_FILE, + DataType::RunEndEncoded(run_ends_field, values_field), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + RESERVED_FIELD_ID_FILE.to_string(), + )])), + ) +}); + +/// Returns the Arrow Field definition for the _file metadata column. +/// +/// # Returns +/// A reference to the _file field definition (RunEndEncoded type) +pub fn file_field() -> &'static Arc { + &FILE_FIELD +} + +/// Lazy-initialized Arrow Field definition for the pos metadata column. +/// Used in positional delete records. +static POS_FIELD: Lazy> = Lazy::new(|| { + Arc::new( + Field::new(RESERVED_COL_NAME_POS, DataType::UInt64, false).with_metadata(HashMap::from([ + ( + PARQUET_FIELD_ID_META_KEY.to_string(), + RESERVED_FIELD_ID_POS.to_string(), + ), + ])), + ) +}); + +/// Returns the Arrow Field definition for the pos metadata column. +/// +/// # Returns +/// A reference to the pos field definition +pub fn pos_field() -> &'static Arc { + &POS_FIELD +} + +/// Lazy-initialized Arrow Field definition for the file_path metadata column. +/// Used in positional delete records to track which file each delete applies to. +static FILE_PATH_FIELD: Lazy> = Lazy::new(|| { + Arc::new( + Field::new(RESERVED_COL_NAME_FILE_PATH, DataType::Utf8, false).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + RESERVED_FIELD_ID_FILE_PATH.to_string(), + )]), + ), + ) +}); + +/// Returns the Arrow Field definition for the file_path metadata column. +/// +/// # Returns +/// A reference to the file_path field definition +pub fn file_path_field() -> &'static Arc { + &FILE_PATH_FIELD +} + +/// Returns the Arrow Field definition for a metadata field ID. 
+/// +/// # Arguments +/// * `field_id` - The metadata field ID +/// +/// # Returns +/// The Arrow Field definition for the metadata column, or an error if not a metadata field +pub fn get_metadata_field(field_id: i32) -> Result> { + match field_id { + RESERVED_FIELD_ID_FILE => Ok(Arc::clone(file_field())), + RESERVED_FIELD_ID_FILE_PATH => Ok(Arc::clone(file_path_field())), + RESERVED_FIELD_ID_POS => Ok(Arc::clone(pos_field())), + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("Field ID {} is not a (supported) metadata field", field_id), + )), + } +} + +/// Returns the field ID for a metadata column name. +/// +/// # Arguments +/// * `column_name` - The metadata column name +/// +/// # Returns +/// The field ID of the metadata column, or an error if the column name is not recognized +pub fn get_metadata_field_id(column_name: &str) -> Result { + match column_name { + RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE), + RESERVED_COL_NAME_FILE_PATH => Ok(RESERVED_FIELD_ID_FILE_PATH), + RESERVED_COL_NAME_POS => Ok(RESERVED_FIELD_ID_POS), + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("Unknown metadata column name: {column_name}"), + )), + } +} + +/// Checks if a field ID is a metadata field. +/// +/// # Arguments +/// * `field_id` - The field ID to check +/// +/// # Returns +/// `true` if the field ID is a metadata field, `false` otherwise +pub fn is_metadata_field(field_id: i32) -> bool { + matches!( + field_id, + RESERVED_FIELD_ID_FILE | RESERVED_FIELD_ID_FILE_PATH | RESERVED_FIELD_ID_POS + ) +} + +/// Checks if a column name is a metadata column. +/// +/// # Arguments +/// * `column_name` - The column name to check +/// +/// # Returns +/// `true` if the column name is a metadata column, `false` otherwise +pub fn is_metadata_column_name(column_name: &str) -> bool { + get_metadata_field_id(column_name).is_ok() +} diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs index e074f49c98..d8e981ffd3 100644 --- a/crates/iceberg/src/scan/incremental/mod.rs +++ b/crates/iceberg/src/scan/incremental/mod.rs @@ -28,6 +28,9 @@ use crate::arrow::{ }; use crate::delete_file_index::DeleteFileIndex; use crate::io::FileIO; +use crate::metadata_columns::{ + RESERVED_COL_NAME_FILE, get_metadata_field_id, is_metadata_column_name, +}; use crate::scan::DeleteFileContext; use crate::scan::cache::ExpressionEvaluatorCache; use crate::scan::context::ManifestEntryContext; @@ -111,6 +114,45 @@ impl<'a> IncrementalTableScanBuilder<'a> { self } + /// Include the _file metadata column in the incremental scan. + /// + /// This is a convenience method that adds the _file column to the current selection. + /// If no columns are currently selected (select_all), this will select all columns plus _file. + /// If specific columns are selected, this adds _file to that selection. 
diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs
index e074f49c98..d8e981ffd3 100644
--- a/crates/iceberg/src/scan/incremental/mod.rs
+++ b/crates/iceberg/src/scan/incremental/mod.rs
@@ -28,6 +31,9 @@ use crate::arrow::{
 };
 use crate::delete_file_index::DeleteFileIndex;
 use crate::io::FileIO;
+use crate::metadata_columns::{
+    RESERVED_COL_NAME_FILE, get_metadata_field_id, is_metadata_column_name,
+};
 use crate::scan::DeleteFileContext;
 use crate::scan::cache::ExpressionEvaluatorCache;
 use crate::scan::context::ManifestEntryContext;
@@ -111,6 +114,45 @@ impl<'a> IncrementalTableScanBuilder<'a> {
         self
     }
 
+    /// Include the _file metadata column in the incremental scan.
+    ///
+    /// This is a convenience method that adds the _file column to the current selection.
+    /// If no columns are currently selected (select_all), this will select all columns plus _file.
+    /// If specific columns are selected, this adds _file to that selection.
+    ///
+    /// # Example
+    /// ```no_run
+    /// # use iceberg::table::Table;
+    /// # async fn example(table: Table) -> iceberg::Result<()> {
+    /// // Select id, name, and _file for incremental scan
+    /// let scan = table
+    ///     .incremental_scan(None, None)
+    ///     .select(["id", "name"])
+    ///     .with_file_column()
+    ///     .build()?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn with_file_column(mut self) -> Self {
+        let mut columns = self.column_names.unwrap_or_else(|| {
+            // No explicit selection - get all column names from schema
+            self.table
+                .metadata()
+                .current_schema()
+                .as_struct()
+                .fields()
+                .iter()
+                .map(|f| f.name.clone())
+                .collect()
+        });
+
+        // Add _file column
+        columns.push(RESERVED_COL_NAME_FILE.to_string());
+
+        self.column_names = Some(columns);
+        self
+    }
+
     /// Set the `from_snapshot_id` for the incremental scan.
     pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
         self.from_snapshot_id = Some(from_snapshot_id);
         self
     }
@@ -216,8 +258,13 @@ impl<'a> IncrementalTableScanBuilder<'a> {
         let schema = snapshot_to.schema(self.table.metadata())?;
 
+        // Check that all column names exist in the schema (skip metadata columns)
         if let Some(column_names) = self.column_names.as_ref() {
             for column_name in column_names {
+                // Skip metadata columns that don't exist in the schema
+                if is_metadata_column_name(column_name) {
+                    continue;
+                }
                 if schema.field_by_name(column_name).is_none() {
                     return Err(Error::new(
                         ErrorKind::DataInvalid,
@@ -241,6 +288,12 @@ impl<'a> IncrementalTableScanBuilder<'a> {
             });
 
             for column_name in column_names.iter() {
+                // Handle metadata columns (like "_file")
+                if is_metadata_column_name(column_name) {
+                    field_ids.push(get_metadata_field_id(column_name)?);
+                    continue;
+                }
+
                 let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                     Error::new(
                         ErrorKind::DataInvalid,
diff --git a/crates/iceberg/src/scan/incremental/task.rs b/crates/iceberg/src/scan/incremental/task.rs
index e05703ae28..01ee4997f0 100644
--- a/crates/iceberg/src/scan/incremental/task.rs
+++ b/crates/iceberg/src/scan/incremental/task.rs
@@ -39,7 +39,7 @@ pub struct BaseIncrementalFileScanTask {
     /// The format of the data file to scan.
     pub data_file_format: DataFileFormat,
     /// The schema of the data file to scan.
-    pub schema: crate::spec::SchemaRef,
+    pub schema: SchemaRef,
     /// The field ids to project.
     pub project_field_ids: Vec<i32>,
 }
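Because `build()` now routes reserved names through `get_metadata_field_id`, selecting "_file" by name and calling `with_file_column()` are equivalent ways to project the column. A hedged usage sketch (the snapshot IDs and column names are placeholders):

```rust
use iceberg::table::Table;

async fn two_ways_to_project_file(table: &Table) -> iceberg::Result<()> {
    // Name the reserved column directly in the projection...
    let by_name = table
        .incremental_scan(Some(1), Some(3)) // placeholder snapshot IDs
        .select(["n", "data", "_file"])
        .build()?;

    // ...or keep the regular selection and append `_file` with the helper.
    let by_helper = table
        .incremental_scan(Some(1), Some(3))
        .select(["n", "data"])
        .with_file_column()
        .build()?;

    let _ = (by_name.to_arrow().await?, by_helper.to_arrow().await?);
    Ok(())
}
```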
diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs
index 276ab9ef12..afa46b69b0 100644
--- a/crates/iceberg/src/scan/incremental/tests.rs
+++ b/crates/iceberg/src/scan/incremental/tests.rs
@@ -31,6 +31,7 @@ use uuid::Uuid;
 
 use crate::TableIdent;
 use crate::io::{FileIO, OutputFile};
+use crate::metadata_columns::RESERVED_COL_NAME_FILE;
 use crate::spec::{
     DataContentType, DataFileBuilder, DataFileFormat, ManifestEntry, ManifestListWriter,
     ManifestStatus, ManifestWriterBuilder, PartitionSpec, SchemaRef, Struct, TableMetadata,
@@ -2455,3 +2456,86 @@ async fn test_incremental_scan_includes_root_when_from_is_none() {
         "Scan with from=None should include root snapshot data"
     );
 }
+
+#[tokio::test]
+async fn test_incremental_scan_with_file_column() {
+    // Test that the _file metadata column works correctly in incremental scans
+
+    let fixture = IncrementalTestFixture::new(vec![
+        Operation::Add(vec![], "empty.parquet".to_string()),
+        Operation::Add(
+            vec![(1, "data1".to_string()), (2, "data2".to_string())],
+            "file1.parquet".to_string(),
+        ),
+        Operation::Add(
+            vec![(10, "data10".to_string())],
+            "file2.parquet".to_string(),
+        ),
+    ])
+    .await;
+
+    // Scan with _file column using the builder helper
+    let scan = fixture
+        .table
+        .incremental_scan(Some(1), Some(3))
+        .select(vec!["n", "data"])
+        .with_file_column()
+        .build()
+        .unwrap();
+
+    let stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = stream.try_collect().await.unwrap();
+
+    // Get append batches
+    let append_batches: Vec<_> = batches
+        .iter()
+        .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append)
+        .map(|(_, b)| b.clone())
+        .collect();
+
+    // Verify we have data and the _file column
+    assert!(!append_batches.is_empty(), "Should have append batches");
+
+    for batch in append_batches {
+        // Should have 3 columns: n, data, _file
+        assert_eq!(
+            batch.num_columns(),
+            3,
+            "Should have n, data, and _file columns"
+        );
+
+        // Verify _file column exists
+        let file_column = batch.column_by_name(RESERVED_COL_NAME_FILE);
+        assert!(file_column.is_some(), "_file column should exist");
+
+        // Verify _file column contains a file path
+        let file_col = file_column.unwrap();
+
+        // The _file column will be a RunEndEncoded array with the file path
+        if let Some(run_array) = file_col
+            .as_any()
+            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+        {
+            let values = run_array.values();
+            let string_values = values
+                .as_any()
+                .downcast_ref::<arrow_array::StringArray>()
+                .unwrap();
+            let file_path = string_values.value(0);
+
+            // Verify the file path ends with .parquet and points into the table's data directory
+            assert!(
+                file_path.ends_with(".parquet"),
+                "File path should end with .parquet: {}",
+                file_path
+            );
+            assert!(
+                file_path.contains("/data/"),
+                "File path should contain /data/: {}",
+                file_path
+            );
+        } else {
+            panic!("_file column should be RunEndEncoded array");
+        }
+    }
+}
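The incremental stream yields `(IncrementalBatchType, RecordBatch)` pairs, as the test above relies on, so a consumer can split appends from deletes and still read `_file` off the append side. A minimal consumption sketch (assumes `IncrementalBatchType` is re-exported at `iceberg::arrow`, matching the test's `crate::arrow` path; snapshot IDs are placeholders):

```rust
use futures::TryStreamExt;
use iceberg::arrow::IncrementalBatchType;
use iceberg::table::Table;

async fn split_incremental(table: &Table) -> iceberg::Result<()> {
    let scan = table
        .incremental_scan(Some(1), Some(3)) // placeholder snapshot IDs
        .select(["n", "data"])
        .with_file_column()
        .build()?;

    let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    for (batch_type, batch) in batches {
        if batch_type == IncrementalBatchType::Append {
            // Appended rows: the projected columns plus the `_file` path column.
            let has_file = batch.column_by_name("_file").is_some();
            println!("append: {} rows (has _file: {has_file})", batch.num_rows());
        } else {
            // Deleted records: `pos` plus a `file_path` column naming the data file.
            println!("delete: {} rows", batch.num_rows());
        }
    }
    Ok(())
}
```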
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 7acefadb47..96bbe249b2 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -39,6 +39,9 @@ use crate::delete_file_index::DeleteFileIndex;
 use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
+use crate::metadata_columns::{
+    RESERVED_COL_NAME_FILE, get_metadata_field_id, is_metadata_column_name,
+};
 use crate::runtime::spawn;
 use crate::spec::{DataContentType, SnapshotRef};
 use crate::table::Table;
@@ -127,6 +130,45 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Include the _file metadata column in the scan.
+    ///
+    /// This is a convenience method that adds the _file column to the current selection.
+    /// If no columns are currently selected (select_all), this will select all columns plus _file.
+    /// If specific columns are selected, this adds _file to that selection.
+    ///
+    /// # Example
+    /// ```no_run
+    /// # use iceberg::table::Table;
+    /// # async fn example(table: Table) -> iceberg::Result<()> {
+    /// // Select id, name, and _file
+    /// let scan = table
+    ///     .scan()
+    ///     .select(["id", "name"])
+    ///     .with_file_column()
+    ///     .build()?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn with_file_column(mut self) -> Self {
+        let mut columns = self.column_names.unwrap_or_else(|| {
+            // No explicit selection - get all column names from schema
+            self.table
+                .metadata()
+                .current_schema()
+                .as_struct()
+                .fields()
+                .iter()
+                .map(|f| f.name.clone())
+                .collect()
+        });
+
+        // Add _file column
+        columns.push(RESERVED_COL_NAME_FILE.to_string());
+
+        self.column_names = Some(columns);
+        self
+    }
+
     /// Set the snapshot to scan. When not set, it uses current snapshot.
     pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
         self.snapshot_id = Some(snapshot_id);
         self
     }
@@ -220,9 +262,13 @@ impl<'a> TableScanBuilder<'a> {
         let schema = snapshot.schema(self.table.metadata())?;
 
-        // Check that all column names exist in the schema.
+        // Check that all column names exist in the schema (skip reserved columns).
         if let Some(column_names) = self.column_names.as_ref() {
             for column_name in column_names {
+                // Skip reserved columns that don't exist in the schema
+                if is_metadata_column_name(column_name) {
+                    continue;
+                }
                 if schema.field_by_name(column_name).is_none() {
                     return Err(Error::new(
                         ErrorKind::DataInvalid,
@@ -243,6 +289,12 @@ impl<'a> TableScanBuilder<'a> {
             });
 
             for column_name in column_names.iter() {
+                // Handle metadata columns (like "_file")
+                if is_metadata_column_name(column_name) {
+                    field_ids.push(get_metadata_field_id(column_name)?);
+                    continue;
+                }
+
                 let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
                     Error::new(
                         ErrorKind::DataInvalid,
@@ -257,10 +309,10 @@ impl<'a> TableScanBuilder<'a> {
                     Error::new(
                         ErrorKind::FeatureUnsupported,
                         format!(
-                                "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
-                            ),
-                        )
-                    })?;
+                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
+                        ),
+                    )
+                })?;
 
                 field_ids.push(field_id);
             }
@@ -562,8 +614,10 @@ pub mod tests {
     use std::fs::File;
     use std::sync::Arc;
 
+    use arrow_array::cast::AsArray;
     use arrow_array::{
-        ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
+        Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch,
+        StringArray,
     };
     use futures::{TryStreamExt, stream};
     use minijinja::value::Value;
@@ -578,6 +632,7 @@ pub mod tests {
     use crate::arrow::ArrowReaderBuilder;
     use crate::expr::{BoundPredicate, Reference};
     use crate::io::{FileIO, OutputFile};
+    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
     use crate::scan::FileScanTask;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
@@ -1803,4 +1858,319 @@ pub mod tests {
         };
         test_fn(task);
     }
+
+    #[tokio::test]
+    async fn test_select_with_file_column() {
+        use arrow_array::cast::AsArray;
+
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select regular columns plus the _file column
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify we have 2 columns: x and _file
+        assert_eq!(batches[0].num_columns(), 2);
+
+        // Verify the x column exists and has correct data
+        let x_col = batches[0].column_by_name("x").unwrap();
+        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
+        assert_eq!(x_arr.value(0), 1);
+
+        // Verify the _file column exists
+        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
+        assert!(
+            file_col.is_some(),
+            "_file column should be present in the batch"
+        );
+
+        // Verify the _file column contains a file path
+        let file_col = file_col.unwrap();
+        assert!(
+            matches!(
+                file_col.data_type(),
+                arrow_schema::DataType::RunEndEncoded(_, _)
+            ),
+            "_file column should use RunEndEncoded type"
+        );
+
+        // Decode the RunArray to verify it contains the file path
+        let run_array = file_col
+            .as_any()
+            .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+            .expect("_file column should be a RunArray");
+
+        let values = run_array.values();
+        let string_values = values.as_string::<i32>();
+        assert_eq!(string_values.len(), 1, "Should have a single file path");
+
+        let file_path = string_values.value(0);
+        assert!(
+            file_path.ends_with(".parquet"),
+            "File path should end with .parquet, got: {}",
+            file_path
+        );
+    }
+
+    #[tokio::test]
+    async fn test_select_file_column_position() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select columns in specific order: x, _file, z
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_FILE, "z"])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 3);
+
+        // Verify column order: x at position 0, _file at position 1, z at position 2
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), "x");
+        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
+        assert_eq!(schema.field(2).name(), "z");
+
+        // Verify columns by name also works
+        assert!(batches[0].column_by_name("x").is_some());
+        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
+        assert!(batches[0].column_by_name("z").is_some());
+    }
+
+    #[tokio::test]
+    async fn test_select_file_column_only() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select only the _file column
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Should have exactly 1 column
+        assert_eq!(batches[0].num_columns(), 1);
+
+        // Verify it's the _file column
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
+
+        // Verify the batch has the correct number of rows
+        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
+        // Each file has 1024 rows, so total is 2048 rows
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 2048);
+    }
+
+    #[tokio::test]
+    async fn test_file_column_with_multiple_files() {
+        use std::collections::HashSet;
+
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select x and _file columns
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Collect all unique file paths from the batches
+        let mut file_paths = HashSet::new();
+        for batch in &batches {
+            let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
+            let run_array = file_col
+                .as_any()
+                .downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+                .expect("_file column should be a RunArray");
+
+            let values = run_array.values();
+            let string_values = values.as_string::<i32>();
+            for i in 0..string_values.len() {
+                file_paths.insert(string_values.value(i).to_string());
+            }
+        }
+
+        // We should have at least one file path (the scan reads 1.parquet and 3.parquet)
+        assert!(!file_paths.is_empty(), "Should have at least one file path");
+
+        // All paths should end with .parquet
+        for path in &file_paths {
+            assert!(
+                path.ends_with(".parquet"),
+                "All file paths should end with .parquet, got: {}",
+                path
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_file_column_at_start() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select _file at the start
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([RESERVED_COL_NAME_FILE, "x", "y"])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 3);
+
+        // Verify _file is at position 0
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
+        assert_eq!(schema.field(1).name(), "x");
+        assert_eq!(schema.field(2).name(), "y");
+    }
+
+    #[tokio::test]
+    async fn test_file_column_at_end() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select _file at the end
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", "y", RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 3);
+
+        // Verify _file is at position 2 (the end)
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), "x");
+        assert_eq!(schema.field(1).name(), "y");
+        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
+    }
+
+    #[tokio::test]
+    async fn test_select_with_repeated_column_names() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select with repeated column names - both regular columns and virtual columns
+        // Repeated columns should appear multiple times in the result (duplicates are allowed)
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([
+                "x",
+                RESERVED_COL_NAME_FILE,
+                "x", // x repeated
+                "y",
+                RESERVED_COL_NAME_FILE, // _file repeated
+                "y", // y repeated
+            ])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify we have exactly 6 columns (duplicates are allowed and preserved)
+        assert_eq!(
+            batches[0].num_columns(),
+            6,
+            "Should have exactly 6 columns with duplicates"
+        );
+
+        let schema = batches[0].schema();
+
+        // Verify columns appear in the exact order requested: x, _file, x, y, _file, y
+        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
+        assert_eq!(
+            schema.field(1).name(),
+            RESERVED_COL_NAME_FILE,
+            "Column 1 should be _file"
+        );
+        assert_eq!(
+            schema.field(2).name(),
+            "x",
+            "Column 2 should be x (duplicate)"
+        );
+        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
+        assert_eq!(
+            schema.field(4).name(),
+            RESERVED_COL_NAME_FILE,
+            "Column 4 should be _file (duplicate)"
+        );
+        assert_eq!(
+            schema.field(5).name(),
+            "y",
+            "Column 5 should be y (duplicate)"
+        );
+
+        // Verify all columns have correct data types
+        assert!(
+            matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64),
+            "Column x should be Int64"
+        );
+        assert!(
+            matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64),
+            "Column x (duplicate) should be Int64"
+        );
+        assert!(
+            matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64),
+            "Column y should be Int64"
+        );
+        assert!(
+            matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64),
+            "Column y (duplicate) should be Int64"
+        );
+        assert!(
+            matches!(
+                schema.field(1).data_type(),
+                arrow_schema::DataType::RunEndEncoded(_, _)
+            ),
+            "_file column should use RunEndEncoded type"
+        );
+        assert!(
+            matches!(
+                schema.field(4).data_type(),
+                arrow_schema::DataType::RunEndEncoded(_, _)
+            ),
+            "_file column (duplicate) should use RunEndEncoded type"
+        );
+    }
+}
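For consumers that want a plain path per row rather than the encoded column, the run array can be expanded manually. A rough sketch, assuming an unsliced `RunEndEncoded(Int32, Utf8)` column as produced by this patch; `file_path_per_row` is a hypothetical helper, not an API added here:

```rust
use arrow_array::types::Int32Type;
use arrow_array::{Array, ArrayRef, RunArray, StringArray};

/// Expand a `_file` column into one owned path per row.
/// Assumes the column is an unsliced run-end-encoded Utf8 array.
fn file_path_per_row(column: &ArrayRef) -> Vec<String> {
    let run = column
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .expect("_file should be run-end encoded");
    let paths = run
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("_file values should be Utf8");

    let mut out = Vec::with_capacity(run.len());
    let mut start = 0i32;
    for (run_idx, &end) in run.run_ends().values().iter().enumerate() {
        // Each run covers logical rows [start, end); all of them share one path.
        for _ in start..end {
            out.push(paths.value(run_idx).to_string());
        }
        start = end;
    }
    out
}
```

This still allocates one `String` per row; callers that only need the distinct set of paths can read the run array's values directly, as the tests above do.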