diff --git a/crates/iceberg/src/arrow/incremental.rs b/crates/iceberg/src/arrow/incremental.rs
index d9c079a2f8..e2eba958f5 100644
--- a/crates/iceberg/src/arrow/incremental.rs
+++ b/crates/iceberg/src/arrow/incremental.rs
@@ -23,12 +23,14 @@ use arrow_schema::Schema as ArrowSchema;
 use futures::channel::mpsc::channel;
 use futures::stream::select;
 use futures::{Stream, StreamExt, TryStreamExt};
+use parquet::arrow::arrow_reader::ArrowReaderOptions;

 use crate::arrow::reader::process_record_batch_stream;
 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::arrow::{ArrowReader, StreamsInto};
 use crate::delete_vector::DeleteVector;
 use crate::io::FileIO;
+use crate::metadata_columns::{RESERVED_FIELD_ID_UNDERSCORE_POS, row_pos_field};
 use crate::runtime::spawn;
 use crate::scan::ArrowRecordBatchStream;
 use crate::scan::incremental::{
@@ -172,11 +174,29 @@ async fn process_incremental_append_task(
     batch_size: Option<usize>,
     file_io: FileIO,
 ) -> Result<ArrowRecordBatchStream> {
+    let mut virtual_columns = Vec::new();
+
+    // Check if _pos column is requested and add it as a virtual column
+    let has_pos_column = task
+        .base
+        .project_field_ids
+        .contains(&RESERVED_FIELD_ID_UNDERSCORE_POS);
+    if has_pos_column {
+        // Add _pos as a virtual column to be produced by the Parquet reader
+        virtual_columns.push(Arc::clone(row_pos_field()));
+    }
+
+    let arrow_reader_options = if !virtual_columns.is_empty() {
+        Some(ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?)
+    } else {
+        None
+    };
+
     let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder(
         &task.base.data_file_path,
         file_io,
         true,
-        None, // arrow_reader_options
+        arrow_reader_options,
     )
     .await?;

@@ -194,13 +214,19 @@ async fn process_incremental_append_task(
     // RecordBatchTransformer performs any transformations required on the RecordBatches
     // that come back from the file, such as type promotion, default column insertion,
     // column re-ordering, and virtual field addition (like _file)
-    let mut record_batch_transformer =
+    let mut record_batch_transformer_builder =
         RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids)
             .with_constant(
                 crate::metadata_columns::RESERVED_FIELD_ID_FILE,
                 crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()),
-            )?
-            .build();
+            )?;
+
+    if has_pos_column {
+        record_batch_transformer_builder =
+            record_batch_transformer_builder.with_virtual_field(Arc::clone(row_pos_field()))?;
+    }
+
+    let mut record_batch_transformer = record_batch_transformer_builder.build();

     if let Some(batch_size) = batch_size {
         record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 4bebf9b612..d6cc8ae9ca 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -57,7 +57,9 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
+use crate::metadata_columns::{
+    RESERVED_FIELD_ID_FILE, RESERVED_FIELD_ID_UNDERSCORE_POS, is_metadata_field, row_pos_field,
+};
 use crate::runtime::spawn;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, NameMapping, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Type};
@@ -246,13 +248,24 @@ impl ArrowReader {
         let delete_filter_rx =
             delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));

+        let mut virtual_columns = Vec::new();
+
+        // Check if _pos column is requested and add it as a virtual column
+        let has_pos_column = task
+            .project_field_ids
+            .contains(&RESERVED_FIELD_ID_UNDERSCORE_POS);
+        if has_pos_column {
+            // Add _pos as a virtual column to be produced by the Parquet reader
+            virtual_columns.push(Arc::clone(row_pos_field()));
+        }
+
         // Migrated tables lack field IDs, requiring us to inspect the schema to choose
         // between field-ID-based or position-based projection
         let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
             &task.data_file_path,
             file_io.clone(),
             should_load_page_index,
-            None,
+            Some(ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?),
         )
         .await?;

@@ -298,7 +311,9 @@ impl ArrowReader {
             add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
         };

-        let options = ArrowReaderOptions::new().with_schema(arrow_schema);
+        let options = ArrowReaderOptions::new()
+            .with_schema(arrow_schema)
+            .with_virtual_columns(virtual_columns)?;

         Self::create_parquet_record_batch_stream_builder(
             &task.data_file_path,
@@ -345,6 +360,11 @@ impl ArrowReader {
                 PrimitiveLiteral::String(task.data_file_path.clone()),
             )?;

+        if has_pos_column {
+            record_batch_transformer_builder =
+                record_batch_transformer_builder.with_virtual_field(Arc::clone(row_pos_field()))?;
+        }
+
         if let (Some(partition_spec), Some(partition_data)) =
             (task.partition_spec.clone(), task.partition.clone())
         {
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 1cee224903..4b4010bbba 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -154,6 +154,7 @@ pub(crate) struct RecordBatchTransformerBuilder {
     snapshot_schema: Arc<IcebergSchema>,
     projected_iceberg_field_ids: Vec<i32>,
     constant_fields: HashMap<i32, PrimitiveLiteral>,
+    virtual_fields: HashMap<i32, FieldRef>,
 }

 impl RecordBatchTransformerBuilder {
@@ -165,6 +166,7 @@ impl RecordBatchTransformerBuilder {
             snapshot_schema,
             projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(),
             constant_fields: HashMap::new(),
+            virtual_fields: HashMap::new(),
         }
     }

@@ -180,6 +182,27 @@ impl RecordBatchTransformerBuilder {
         Ok(self)
     }

+    /// Add a virtual field for a specific field ID.
+    /// This is used for virtual/metadata fields like _pos that are produced by the Parquet reader.
+    ///
+    /// # Arguments
+    /// * `field` - The Arrow field representing the virtual column
+    pub(crate) fn with_virtual_field(mut self, field: FieldRef) -> Result<Self> {
+        // Extract field ID from metadata
+        let field_id = field
+            .metadata()
+            .get(PARQUET_FIELD_ID_META_KEY)
+            .and_then(|id_str| id_str.parse::<i32>().ok())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Virtual field must have a field ID in metadata",
+                )
+            })?;
+        self.virtual_fields.insert(field_id, field);
+        Ok(self)
+    }
+
     /// Set partition spec and data together for identifying identity-transformed partition columns.
     ///
     /// Both partition_spec and partition_data must be provided together since the spec defines
@@ -207,6 +230,7 @@ impl RecordBatchTransformerBuilder {
             snapshot_schema: self.snapshot_schema,
             projected_iceberg_field_ids: self.projected_iceberg_field_ids,
             constant_fields: self.constant_fields,
+            virtual_fields: self.virtual_fields,
             batch_transform: None,
         }
     }
@@ -250,6 +274,10 @@ pub(crate) struct RecordBatchTransformer {
     // Includes both virtual/metadata fields (like _file) and identity-partitioned fields
     // Avoids type conversions during batch processing
     constant_fields: HashMap<i32, PrimitiveLiteral>,
+    // Virtual fields are metadata fields that are not present in the snapshot schema,
+    // but are present in the source schema (arrow reader produces them)
+    // Map from field_id to FieldRef
+    virtual_fields: HashMap<i32, FieldRef>,

     // BatchTransform gets lazily constructed based on the schema of
     // the first RecordBatch we receive from the file
@@ -292,6 +320,7 @@ impl RecordBatchTransformer {
                 self.snapshot_schema.as_ref(),
                 &self.projected_iceberg_field_ids,
                 &self.constant_fields,
+                &self.virtual_fields,
             )?);
         }
         self.process_record_batch(record_batch)?
@@ -311,6 +340,7 @@ impl RecordBatchTransformer {
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
         constant_fields: &HashMap<i32, PrimitiveLiteral>,
+        virtual_fields: &HashMap<i32, FieldRef>,
     ) -> Result<BatchTransform> {
         let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
         let field_id_to_mapped_schema_map =
@@ -321,7 +351,12 @@ impl RecordBatchTransformer {
         let fields: Result<Vec<FieldRef>> = projected_iceberg_field_ids
             .iter()
             .map(|field_id| {
-                // Check if this is a constant field (virtual or partition)
+                // Check if this is a virtual field from Parquet reader
+                if let Some(virtual_field) = virtual_fields.get(field_id) {
+                    return Ok(Arc::clone(virtual_field));
+                }
+
+                // Check if this is a constant field (metadata/virtual or partition)
                 if constant_fields.contains_key(field_id) {
                     // For metadata/virtual fields (like _file), get name from metadata_columns
                     // For partition fields, get name from schema (they exist in schema)
@@ -364,6 +399,7 @@ impl RecordBatchTransformer {
                     projected_iceberg_field_ids,
                     field_id_to_mapped_schema_map,
                     constant_fields,
+                    virtual_fields,
                 )?,
                 target_schema,
             }),
@@ -421,6 +457,7 @@ impl RecordBatchTransformer {
         projected_iceberg_field_ids: &[i32],
         field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
         constant_fields: &HashMap<i32, PrimitiveLiteral>,
+        virtual_fields: &HashMap<i32, FieldRef>,
     ) -> Result<Vec<ColumnSource>> {
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
@@ -439,6 +476,19 @@ impl RecordBatchTransformer {
                 });
             }

+            // Check if this is a virtual field from Parquet reader (like _pos)
+            // Virtual fields don't exist in snapshot schema, they come from source
+            if virtual_fields.contains_key(field_id) {
+                let source_index = field_id_to_source_schema_map
+                    .get(field_id)
+                    .map(|(_, idx)| *idx)
+                    .ok_or(Error::new(
+                        ErrorKind::Unexpected,
+                        "Virtual field not found in source schema",
+                    ))?;
+                return Ok(ColumnSource::PassThrough { source_index });
+            }
+
             let (target_field, _) = field_id_to_mapped_schema_map
                 .get(field_id)
diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs
index b80619375f..639d2028b9 100644
--- a/crates/iceberg/src/metadata_columns.rs
+++ b/crates/iceberg/src/metadata_columns.rs
@@ -27,7 +27,7 @@ use std::sync::Arc;

 use arrow_schema::{DataType, Field};
 use once_cell::sync::Lazy;
-use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, RowNumber};

 use crate::{Error, ErrorKind, Result};

@@ -37,6 +37,12 @@ pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1;

 /// Reserved column name for the file path metadata column
 pub const RESERVED_COL_NAME_FILE: &str = "_file";

+/// Reserved field ID for the row position (_pos) metadata column
+pub const RESERVED_FIELD_ID_UNDERSCORE_POS: i32 = i32::MAX - 2;
+
+/// Reserved column name for the row position metadata column
+pub const RESERVED_COL_NAME_UNDERSCORE_POS: &str = "_pos";
+
 /// Reserved field ID for the file_path column used in delete file reading (positional deletes)
 pub const RESERVED_FIELD_ID_FILE_PATH: i32 = i32::MAX - 200;

@@ -67,6 +73,27 @@ pub fn file_field() -> &'static Arc<Field> {
     &FILE_FIELD
 }

+/// Lazy-initialized Arrow Field definition for the _pos metadata column.
+/// Used for row position within a file.
+static ROW_POS_FIELD: Lazy<Arc<Field>> = Lazy::new(|| {
+    Arc::new(
+        Field::new(RESERVED_COL_NAME_UNDERSCORE_POS, DataType::Int64, false)
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                RESERVED_FIELD_ID_UNDERSCORE_POS.to_string(),
+            )]))
+            .with_extension_type(RowNumber),
+    )
+});
+
+/// Returns the Arrow Field definition for the _pos metadata column.
+///
+/// # Returns
+/// A reference to the _pos field definition
+pub fn row_pos_field() -> &'static Arc<Field> {
+    &ROW_POS_FIELD
+}
+
 /// Lazy-initialized Arrow Field definition for the pos metadata column.
 /// Used in positional delete records.
 static POS_FIELD: Lazy<Arc<Field>> = Lazy::new(|| {
@@ -119,6 +146,7 @@ pub fn file_path_field() -> &'static Arc<Field> {
 pub fn get_metadata_field(field_id: i32) -> Result<Arc<Field>> {
     match field_id {
         RESERVED_FIELD_ID_FILE => Ok(Arc::clone(file_field())),
+        RESERVED_FIELD_ID_UNDERSCORE_POS => Ok(Arc::clone(row_pos_field())),
         RESERVED_FIELD_ID_FILE_PATH => Ok(Arc::clone(file_path_field())),
         RESERVED_FIELD_ID_POS => Ok(Arc::clone(pos_field())),
         _ => Err(Error::new(
@@ -138,6 +166,7 @@ pub fn get_metadata_field(field_id: i32) -> Result<Arc<Field>> {
 pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
     match column_name {
         RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE),
+        RESERVED_COL_NAME_UNDERSCORE_POS => Ok(RESERVED_FIELD_ID_UNDERSCORE_POS),
         RESERVED_COL_NAME_FILE_PATH => Ok(RESERVED_FIELD_ID_FILE_PATH),
         RESERVED_COL_NAME_POS => Ok(RESERVED_FIELD_ID_POS),
         _ => Err(Error::new(
@@ -157,7 +186,10 @@ pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
 pub fn is_metadata_field(field_id: i32) -> bool {
     matches!(
         field_id,
-        RESERVED_FIELD_ID_FILE | RESERVED_FIELD_ID_FILE_PATH | RESERVED_FIELD_ID_POS
+        RESERVED_FIELD_ID_FILE
+            | RESERVED_FIELD_ID_UNDERSCORE_POS
+            | RESERVED_FIELD_ID_FILE_PATH
+            | RESERVED_FIELD_ID_POS
     )
 }

diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs
index d8e981ffd3..e9d5889d0f 100644
--- a/crates/iceberg/src/scan/incremental/mod.rs
+++ b/crates/iceberg/src/scan/incremental/mod.rs
@@ -29,7 +29,8 @@ use crate::arrow::{
 use crate::delete_file_index::DeleteFileIndex;
 use crate::io::FileIO;
 use crate::metadata_columns::{
-    RESERVED_COL_NAME_FILE, get_metadata_field_id, is_metadata_column_name,
+    RESERVED_COL_NAME_FILE, RESERVED_COL_NAME_UNDERSCORE_POS, get_metadata_field_id,
+    is_metadata_column_name,
 };
 use crate::scan::DeleteFileContext;
 use crate::scan::cache::ExpressionEvaluatorCache;
@@ -153,6 +154,48 @@ impl<'a> IncrementalTableScanBuilder<'a> {
         self
     }

+    /// Include the _pos metadata column in the incremental scan.
+    ///
+    /// This is a convenience method that adds the _pos column to the current selection.
+    /// If no columns are currently selected (select_all), this will select all columns plus _pos.
+    /// If specific columns are selected, this adds _pos to that selection.
+    ///
+    /// The _pos column contains the row position within the file, produced by the underlying
+    /// Parquet reader as a virtual column.
+    ///
+    /// # Example
+    /// ```no_run
+    /// # use iceberg::table::Table;
+    /// # async fn example(table: Table) -> iceberg::Result<()> {
+    /// // Select id, name, and _pos for incremental scan
+    /// let scan = table
+    ///     .incremental_scan(None, None)
+    ///     .select(["id", "name"])
+    ///     .with_pos_column()
+    ///     .build()?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn with_pos_column(mut self) -> Self {
+        let mut columns = self.column_names.unwrap_or_else(|| {
+            // No explicit selection - get all column names from schema
+            self.table
+                .metadata()
+                .current_schema()
+                .as_struct()
+                .fields()
+                .iter()
+                .map(|f| f.name.clone())
+                .collect()
+        });
+
+        // Add _pos column
+        columns.push(RESERVED_COL_NAME_UNDERSCORE_POS.to_string());
+
+        self.column_names = Some(columns);
+        self
+    }
+
     /// Set the `from_snapshot_id` for the incremental scan.
     pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
         self.from_snapshot_id = Some(from_snapshot_id);
diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs
index 4dad7d77e5..22fe3142cd 100644
--- a/crates/iceberg/src/scan/incremental/tests.rs
+++ b/crates/iceberg/src/scan/incremental/tests.rs
@@ -31,7 +31,7 @@ use uuid::Uuid;

 use crate::TableIdent;
 use crate::io::{FileIO, OutputFile};
-use crate::metadata_columns::RESERVED_COL_NAME_FILE;
+use crate::metadata_columns::{RESERVED_COL_NAME_FILE, RESERVED_COL_NAME_UNDERSCORE_POS};
 use crate::spec::{
     DataContentType, DataFileBuilder, DataFileFormat, ManifestEntry, ManifestListWriter,
     ManifestStatus, ManifestWriterBuilder, PartitionSpec, SchemaRef, Struct, TableMetadata,
@@ -2529,3 +2529,240 @@ async fn test_incremental_scan_with_file_column() {
         }
     }
 }
+
+#[tokio::test]
+async fn test_incremental_select_with_pos_column() {
+    use arrow_array::cast::AsArray;
+
+    // Create a fixture with test data
+    let fixture = IncrementalTestFixture::new(vec![
+        Operation::Add(vec![], "empty.parquet".to_string()), // Snapshot 1: empty
+        Operation::Add(
+            vec![
+                (1, "a".to_string()),
+                (2, "b".to_string()),
+                (3, "c".to_string()),
+            ],
+            "data-1.parquet".to_string(),
+        ), // Snapshot 2
+    ])
+    .await;
+
+    // Build an incremental scan with _pos column
+    let scan = fixture
+        .table
+        .incremental_scan(Some(1), Some(2))
+        .select(["n", RESERVED_COL_NAME_UNDERSCORE_POS])
+        .build()
+        .unwrap();
+
+    let stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = stream.try_collect().await.unwrap();
+
+    // Get append batches (we're only appending in this test)
+    let append_batches: Vec<_> = batches
+        .iter()
+        .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append)
+        .map(|(_, b)| b.clone())
+        .collect();
+
+    // Verify we have append batches
+    assert!(!append_batches.is_empty(), "Should have append batches");
+
+    for batch in append_batches {
+        // Should have 2 columns: n and _pos
+        assert_eq!(batch.num_columns(), 2, "Should have n and _pos columns");
+
+        // Verify the n column exists
+        assert!(batch.column_by_name("n").is_some(), "n column should exist");
+
+        // Verify the _pos column exists
+        let pos_col = batch.column_by_name(RESERVED_COL_NAME_UNDERSCORE_POS);
+        assert!(
+            pos_col.is_some(),
+            "_pos column should be present in the batch"
+        );
+
+        // Verify the _pos column has correct data type (Int64 from RowNumber extension)
+        let pos_col = pos_col.unwrap();
+        assert_eq!(
+            pos_col.data_type(),
+            &arrow_schema::DataType::Int64,
+            "_pos column should use Int64 type"
+        );
+
+        // Get the position values from the Int64Array
+        let pos_array = pos_col.as_primitive::<arrow_array::types::Int64Type>();
+
+        // Verify first position is 0
+        assert_eq!(pos_array.value(0), 0, "First row should have position 0");
+
+        // Verify positions are sequential
+        for i in 1..pos_array.len() {
+            assert_eq!(
+                pos_array.value(i),
+                i as i64,
+                "Row {} should have position {}",
+                i,
+                i
+            );
+        }
+
+        // Test variant 2: Use with_pos_column() method instead of selecting by name
+        let scan = fixture
+            .table
+            .incremental_scan(Some(1), Some(2))
+            .select(["n"])
+            .with_pos_column()
+            .build()
+            .unwrap();
+
+        let stream = scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+
+        // Get append batches
+        let append_batches: Vec<_> = batches
+            .iter()
+            .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append)
+            .map(|(_, b)| b.clone())
+            .collect();
+
+        // Verify we have append batches
+        assert!(!append_batches.is_empty(), "Should have append batches");
+
+        for batch in append_batches {
+            // Should have 2 columns: n and _pos
+            assert_eq!(
+                batch.num_columns(),
+                2,
+                "Should have n and _pos columns when using with_pos_column()"
+            );
+
+            // Verify the _pos column exists
+            let pos_col = batch.column_by_name(RESERVED_COL_NAME_UNDERSCORE_POS);
+            assert!(
+                pos_col.is_some(),
+                "_pos column should be present when using with_pos_column()"
+            );
+
+            // Verify the _pos column has correct data type
+            let pos_col = pos_col.unwrap();
+            assert_eq!(
+                pos_col.data_type(),
+                &arrow_schema::DataType::Int64,
+                "_pos column should use Int64 type"
+            );
+
+            // Verify positions are sequential
+            let pos_array = pos_col.as_primitive::<arrow_array::types::Int64Type>();
+            assert_eq!(pos_array.value(0), 0, "First row should have position 0");
+            for i in 1..pos_array.len() {
+                assert_eq!(
+                    pos_array.value(i),
+                    i as i64,
+                    "Row {} should have position {}",
+                    i,
+                    i
+                );
+            }
+        }
+    }
+}
+
+#[tokio::test]
+async fn test_incremental_select_with_pos_and_file_columns() {
+    use arrow_array::cast::AsArray;
+
+    // Create a fixture with test data
+    let fixture = IncrementalTestFixture::new(vec![
+        Operation::Add(vec![], "empty.parquet".to_string()), // Snapshot 1: empty
+        Operation::Add(
+            vec![
+                (1, "a".to_string()),
+                (2, "b".to_string()),
+                (3, "c".to_string()),
+            ],
+            "data-1.parquet".to_string(),
+        ), // Snapshot 2
+    ])
+    .await;
+
+    // Build an incremental scan with both _pos and _file columns
+    let scan = fixture
+        .table
+        .incremental_scan(Some(1), Some(2))
+        .select([
+            "n",
+            RESERVED_COL_NAME_FILE,
+            "data",
+            RESERVED_COL_NAME_UNDERSCORE_POS,
+        ])
+        .build()
+        .unwrap();
+
+    let stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = stream.try_collect().await.unwrap();
+
+    // Get append batches
+    let append_batches: Vec<_> = batches
+        .iter()
+        .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append)
+        .map(|(_, b)| b.clone())
+        .collect();
+
+    // Verify we have append batches
+    assert!(!append_batches.is_empty(), "Should have append batches");
+
+    for batch in append_batches {
+        // Should have 4 columns: n, _file, data, _pos
+        assert_eq!(
+            batch.num_columns(),
+            4,
+            "Should have n, _file, data, and _pos columns"
+        );
+
+        // Verify all columns exist
+        assert!(batch.column_by_name("n").is_some());
+        assert!(batch.column_by_name(RESERVED_COL_NAME_FILE).is_some());
+        assert!(batch.column_by_name("data").is_some());
+        assert!(
+            batch
+                .column_by_name(RESERVED_COL_NAME_UNDERSCORE_POS)
+                .is_some()
+        );
+
+        // Verify the _pos column has correct data type
+        let pos_col = batch
+            .column_by_name(RESERVED_COL_NAME_UNDERSCORE_POS)
+            .unwrap();
+        assert_eq!(
+            pos_col.data_type(),
+            &arrow_schema::DataType::Int64,
+            "_pos column should use Int64 type"
+        );
+
+        // Verify positions are sequential starting from 0
+        let pos_array = pos_col.as_primitive::<arrow_array::types::Int64Type>();
+        for i in 0..pos_array.len() {
+            assert_eq!(
+                pos_array.value(i),
+                i as i64,
+                "Row {} should have position {}",
+                i,
+                i
+            );
+        }
+
+        // Verify _file column contains a valid file path
+        let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
+        let string_array = file_col.as_string::<i32>();
+        for i in 0..batch.num_rows() {
+            let file_path = string_array.value(i);
+            assert!(
+                file_path.ends_with(".parquet"),
+                "File path should end with .parquet: {}",
+                file_path
+            );
+        }
+    }
+}
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 8ec3112015..14af028edd 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -40,7 +40,8 @@ use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluato
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
 use crate::metadata_columns::{
-    RESERVED_COL_NAME_FILE, get_metadata_field_id, is_metadata_column_name,
+    RESERVED_COL_NAME_FILE, RESERVED_COL_NAME_UNDERSCORE_POS, get_metadata_field_id,
+    is_metadata_column_name,
 };
 use crate::runtime::spawn;
 use crate::spec::{DataContentType, SnapshotRef};
@@ -169,6 +170,48 @@ impl<'a> TableScanBuilder<'a> {
         self
     }

+    /// Include the _pos metadata column in the scan.
+    ///
+    /// This is a convenience method that adds the _pos column to the current selection.
+    /// If no columns are currently selected (select_all), this will select all columns plus _pos.
+    /// If specific columns are selected, this adds _pos to that selection.
+    ///
+    /// The _pos column contains the row position within the file, produced by the underlying
+    /// Parquet reader as a virtual column.
+    ///
+    /// # Example
+    /// ```no_run
+    /// # use iceberg::table::Table;
+    /// # async fn example(table: Table) -> iceberg::Result<()> {
+    /// // Select id, name, and _pos
+    /// let scan = table
+    ///     .scan()
+    ///     .select(["id", "name"])
+    ///     .with_pos_column()
+    ///     .build()?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn with_pos_column(mut self) -> Self {
+        let mut columns = self.column_names.unwrap_or_else(|| {
+            // No explicit selection - get all column names from schema
+            self.table
+                .metadata()
+                .current_schema()
+                .as_struct()
+                .fields()
+                .iter()
+                .map(|f| f.name.clone())
+                .collect()
+        });
+
+        // Add _pos column
+        columns.push(RESERVED_COL_NAME_UNDERSCORE_POS.to_string());
+
+        self.column_names = Some(columns);
+        self
+    }
+
     /// Set the snapshot to scan. When not set, it uses current snapshot.
     pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
         self.snapshot_id = Some(snapshot_id);
@@ -632,7 +675,7 @@ pub mod tests {
     use crate::arrow::ArrowReaderBuilder;
     use crate::expr::{BoundPredicate, Reference};
     use crate::io::{FileIO, OutputFile};
-    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
+    use crate::metadata_columns::{RESERVED_COL_NAME_FILE, RESERVED_COL_NAME_UNDERSCORE_POS};
     use crate::scan::FileScanTask;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
@@ -2153,4 +2196,302 @@ pub mod tests {
             "_file column (duplicate) should use Utf8 type"
         );
     }
+
+    #[tokio::test]
+    async fn test_select_with_pos_column() {
+        use arrow_array::cast::AsArray;
+
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select regular columns plus the _pos column
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_UNDERSCORE_POS])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify we have 2 columns: x and _pos
+        assert_eq!(batches[0].num_columns(), 2);
+
+        // Verify the x column exists and has correct data
+        let x_col = batches[0].column_by_name("x").unwrap();
+        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
+        assert_eq!(x_arr.value(0), 1);
+
+        // Verify the _pos column exists
+        let pos_col = batches[0].column_by_name(RESERVED_COL_NAME_UNDERSCORE_POS);
+        assert!(
+            pos_col.is_some(),
+            "_pos column should be present in the batch"
+        );
+
+        // Verify the _pos column has correct data type (Int64 from RowNumber extension)
+        let pos_col = pos_col.unwrap();
+        assert_eq!(
+            pos_col.data_type(),
+            &arrow_schema::DataType::Int64,
+            "_pos column should use Int64 type"
+        );
+
+        // Get the position values from the Int64Array
+        let pos_array = pos_col.as_primitive::<arrow_array::types::Int64Type>();
+
+        // Verify first position is 0
+        assert_eq!(pos_array.value(0), 0, "First row should have position 0");
+
+        // Verify positions are sequential
+        for i in 1..pos_array.len().min(10) {
+            assert_eq!(
+                pos_array.value(i),
+                i as i64,
+                "Row {} should have position {}",
+                i,
+                i
+            );
+        }
+
+        // Test variant 2: Use with_pos_column() method instead of selecting by name
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x"])
+            .with_pos_column()
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify we have 2 columns: x and _pos
+        assert_eq!(batches[0].num_columns(), 2);
+
+        // Verify the _pos column exists
+        let pos_col = batches[0].column_by_name(RESERVED_COL_NAME_UNDERSCORE_POS);
+        assert!(
+            pos_col.is_some(),
+            "_pos column should be present when using with_pos_column()"
+        );
+
+        // Verify the _pos column has correct data type
+        let pos_col = pos_col.unwrap();
+        assert_eq!(
+            pos_col.data_type(),
+            &arrow_schema::DataType::Int64,
+            "_pos column should use Int64 type"
+        );
+
+        // Verify positions are sequential
+        let pos_array = pos_col.as_primitive::<arrow_array::types::Int64Type>();
+        assert_eq!(pos_array.value(0), 0, "First row should have position 0");
+        for i in 1..pos_array.len().min(10) {
+            assert_eq!(
+                pos_array.value(i),
+                i as i64,
+                "Row {} should have position {}",
+                i,
+                i
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_select_with_pos_and_file_columns() {
+        use arrow_array::cast::AsArray;
+
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Test 1: _file first, then _pos
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([
+                "x",
+                RESERVED_COL_NAME_FILE,
+                "y",
+                RESERVED_COL_NAME_UNDERSCORE_POS,
+            ])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify we have 4 columns: x, _file, y, _pos
+        assert_eq!(batches[0].num_columns(), 4);
+
+        // Verify column order
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
+        assert_eq!(
+            schema.field(1).name(),
+            RESERVED_COL_NAME_FILE,
+            "Column 1 should be _file"
+        );
+        assert_eq!(schema.field(2).name(), "y", "Column 2 should be y");
+        assert_eq!(
+            schema.field(3).name(),
+            RESERVED_COL_NAME_UNDERSCORE_POS,
+            "Column 3 should be _pos"
+        );
+
+        // Verify data types
+        assert_eq!(
+            schema.field(1).data_type(),
+            &arrow_schema::DataType::Utf8,
+            "_file column should use Utf8 type"
+        );
+        assert_eq!(
+            schema.field(3).data_type(),
+            &arrow_schema::DataType::Int64,
+            "_pos column should use Int64 type"
+        );
+
+        // Verify _file column has valid data
+        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE).unwrap();
+        let file_array = file_col.as_string::<i32>();
+        let file_path = file_array.value(0);
+        assert!(
+            file_path.ends_with(".parquet"),
+            "File path should end with .parquet"
+        );
+
+        // Verify _pos column has valid sequential data
+        let pos_col = batches[0]
+            .column_by_name(RESERVED_COL_NAME_UNDERSCORE_POS)
+            .unwrap();
+        let pos_array = pos_col.as_primitive::<arrow_array::types::Int64Type>();
+        assert_eq!(pos_array.value(0), 0, "First row should have position 0");
+        for i in 1..pos_array.len().min(10) {
+            assert_eq!(
+                pos_array.value(i),
+                i as i64,
+                "Row {} should have position {}",
+                i,
+                i
+            );
+        }
+
+        // Test 2: _pos first, then _file (reversed order)
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([
+                "x",
+                RESERVED_COL_NAME_UNDERSCORE_POS,
+                "y",
+                RESERVED_COL_NAME_FILE,
+            ])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify we have 4 columns in the new order
+        assert_eq!(batches[0].num_columns(), 4);
+
+        // Verify column order is now: x, _pos, y, _file
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
+        assert_eq!(
+            schema.field(1).name(),
+            RESERVED_COL_NAME_UNDERSCORE_POS,
+            "Column 1 should be _pos"
+        );
+        assert_eq!(schema.field(2).name(), "y", "Column 2 should be y");
+        assert_eq!(
+            schema.field(3).name(),
+            RESERVED_COL_NAME_FILE,
+            "Column 3 should be _file"
+        );
+
+        // Verify data is still correct
+        let pos_col = batches[0]
+            .column_by_name(RESERVED_COL_NAME_UNDERSCORE_POS)
+            .unwrap();
+        let pos_array = pos_col.as_primitive::<arrow_array::types::Int64Type>();
+        assert_eq!(pos_array.value(0), 0, "First row should have position 0");
+
+        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE).unwrap();
+        let file_array = file_col.as_string::<i32>();
+        let file_path = file_array.value(0);
+        assert!(
+            file_path.ends_with(".parquet"),
+            "File path should end with .parquet"
+        );
+
+        // Test 3: Both at the start
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([
+                RESERVED_COL_NAME_FILE,
+                RESERVED_COL_NAME_UNDERSCORE_POS,
+                "x",
+                "y",
+            ])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 4);
+        let schema = batches[0].schema();
+        assert_eq!(
+            schema.field(0).name(),
+            RESERVED_COL_NAME_FILE,
+            "Column 0 should be _file"
+        );
+        assert_eq!(
+            schema.field(1).name(),
+            RESERVED_COL_NAME_UNDERSCORE_POS,
+            "Column 1 should be _pos"
+        );
+        assert_eq!(schema.field(2).name(), "x", "Column 2 should be x");
+        assert_eq!(schema.field(3).name(), "y", "Column 3 should be y");
+
+        // Test 4: Both at the end
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([
+                "x",
+                "y",
+                RESERVED_COL_NAME_UNDERSCORE_POS,
+                RESERVED_COL_NAME_FILE,
+            ])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 4);
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), "x", "Column 0 should be x");
+        assert_eq!(schema.field(1).name(), "y", "Column 1 should be y");
+        assert_eq!(
+            schema.field(2).name(),
+            RESERVED_COL_NAME_UNDERSCORE_POS,
+            "Column 2 should be _pos"
+        );
+        assert_eq!(
+            schema.field(3).name(),
+            RESERVED_COL_NAME_FILE,
+            "Column 3 should be _file"
+        );
+    }
 }
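
The builder API added by this patch can be exercised end to end roughly as follows. This is a minimal usage sketch rather than part of the patch: it assumes an already-loaded `Table` whose schema has an `id` column (a hypothetical name), and it only uses calls that the patch's own doc examples and tests exercise (`select`, `with_pos_column`, `to_arrow`, and reading `_pos` back as an `Int64` array).

```rust
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use futures::TryStreamExt;
use iceberg::table::Table;

/// Sketch: request the `_pos` metadata column on a regular table scan and
/// print each row's position within its data file.
async fn print_row_positions(table: &Table) -> iceberg::Result<()> {
    let scan = table
        .scan()
        .select(["id"]) // hypothetical data column; any projection works
        .with_pos_column() // appends `_pos` to the current selection
        .build()?;

    let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    for batch in batches {
        // `_pos` is materialized by the Parquet reader as a non-null Int64 virtual column.
        let pos = batch
            .column_by_name("_pos")
            .expect("_pos was selected")
            .as_primitive::<Int64Type>();
        for i in 0..pos.len() {
            println!("row {i} is at file position {}", pos.value(i));
        }
    }
    Ok(())
}
```

The incremental scan builder gains the same `with_pos_column()` convenience; the only difference on the consuming side is that its `to_arrow()` stream yields `(IncrementalBatchType, RecordBatch)` pairs, as the new tests in `scan/incremental/tests.rs` show.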