Merged
31 commits
4dedab8
upgrade arrow-rs (temp to github url until 57.1.0 is released)
vustef Nov 19, 2025
c1b3f85
Add cargo.lock; Disable datafusion due to dependency mismatch
vustef Nov 19, 2025
f430f94
fix unit tests
vustef Nov 19, 2025
47ee5e1
ignore datafusion crates with rustfmt too
vustef Nov 19, 2025
e3c5d48
make ignored cargos standalone
vustef Nov 19, 2025
5536adc
-||-
vustef Nov 19, 2025
e634cdb
ignore in rustfmt requires nightly
vustef Nov 19, 2025
31f5c92
comment out dependencies
vustef Nov 19, 2025
140c916
cargo fmt
vustef Nov 19, 2025
ed9e98c
-||-
vustef Nov 19, 2025
789c3de
Fix clippy
vustef Nov 19, 2025
cf59465
fix clippy
vustef Nov 19, 2025
dbfde33
clippy in CI
vustef Nov 19, 2025
c616e01
fix CI clippy command
vustef Nov 19, 2025
2fe3774
switch working directory
vustef Nov 19, 2025
b118fbc
Disable python bindings CI tests
vustef Nov 19, 2025
7d8199b
Disable only certain jobs in python bindings CI
vustef Nov 19, 2025
cb2150a
Remove unused dependency
vustef Nov 19, 2025
ac7dbdd
forgot cargo.lock
vustef Nov 19, 2025
9712482
Support for _pos using arrow reader row numbers feature
vustef Nov 19, 2025
a708f11
clippy and fmt
vustef Nov 19, 2025
dcb3960
update arrow-rs
vustef Nov 20, 2025
cf315bb
Merge branch 'vs-upgrade-arrow-57-1' of github.com:RelationalAI/icebe…
vustef Nov 20, 2025
879b6e1
pos in incremental scan too
vustef Nov 20, 2025
57b1d54
with_virtual for adding to hash map
vustef Nov 20, 2025
1a6d349
remove todo comment, it's fine
vustef Nov 20, 2025
8a228f5
Merge branch 'main' of github.com:RelationalAI/iceberg-rust into vs-pos
vustef Nov 20, 2025
aa28a83
revert back some changes
vustef Nov 20, 2025
ace9baf
-||-
vustef Nov 20, 2025
d75584d
add `with_pos_column` in tests
vustef Nov 20, 2025
5003e5d
cargo fmt
vustef Nov 20, 2025
34 changes: 30 additions & 4 deletions crates/iceberg/src/arrow/incremental.rs
@@ -23,12 +23,14 @@ use arrow_schema::Schema as ArrowSchema;
use futures::channel::mpsc::channel;
use futures::stream::select;
use futures::{Stream, StreamExt, TryStreamExt};
use parquet::arrow::arrow_reader::ArrowReaderOptions;

use crate::arrow::reader::process_record_batch_stream;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{ArrowReader, StreamsInto};
use crate::delete_vector::DeleteVector;
use crate::io::FileIO;
use crate::metadata_columns::{RESERVED_FIELD_ID_UNDERSCORE_POS, row_pos_field};
use crate::runtime::spawn;
use crate::scan::ArrowRecordBatchStream;
use crate::scan::incremental::{
@@ -172,11 +174,29 @@ async fn process_incremental_append_task(
batch_size: Option<usize>,
file_io: FileIO,
) -> Result<ArrowRecordBatchStream> {
let mut virtual_columns = Vec::new();

// Check if _pos column is requested and add it as a virtual column
let has_pos_column = task
.base
.project_field_ids
.contains(&RESERVED_FIELD_ID_UNDERSCORE_POS);
if has_pos_column {
// Add _pos as a virtual column to be produced by the Parquet reader
virtual_columns.push(Arc::clone(row_pos_field()));
}

let arrow_reader_options = if !virtual_columns.is_empty() {
Some(ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?)
} else {
None
};

let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder(
&task.base.data_file_path,
file_io,
true,
None, // arrow_reader_options
arrow_reader_options,
)
.await?;

@@ -194,13 +214,19 @@ async fn process_incremental_append_task(
// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion,
// column re-ordering, and virtual field addition (like _file)
let mut record_batch_transformer =
let mut record_batch_transformer_builder =
RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids)
.with_constant(
crate::metadata_columns::RESERVED_FIELD_ID_FILE,
crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()),
)?
.build();
)?;

if has_pos_column {
record_batch_transformer_builder =
record_batch_transformer_builder.with_virtual_field(Arc::clone(row_pos_field()))?;
}

let mut record_batch_transformer = record_batch_transformer_builder.build();

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
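
Taken on its own, the `_pos` plumbing added in this file reduces to a small decision: request the Parquet reader's row-number virtual column only when the projection contains the reserved `_pos` field ID. A minimal sketch of that logic, assuming the `with_virtual_columns` reader option from the arrow-rs upgrade this PR depends on and the crate-internal `row_pos_field()` helper introduced further down (not PR code verbatim):

```rust
use std::sync::Arc;

use parquet::arrow::arrow_reader::ArrowReaderOptions;

use crate::Result;
use crate::metadata_columns::{RESERVED_FIELD_ID_UNDERSCORE_POS, row_pos_field};

/// Sketch only: mirrors the options construction in the hunk above.
fn pos_reader_options(project_field_ids: &[i32]) -> Result<Option<ArrowReaderOptions>> {
    if project_field_ids.contains(&RESERVED_FIELD_ID_UNDERSCORE_POS) {
        // Ask the Parquet reader to synthesize `_pos` (row position within the file).
        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![Arc::clone(row_pos_field())])?;
        Ok(Some(options))
    } else {
        Ok(None)
    }
}
```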
26 changes: 23 additions & 3 deletions crates/iceberg/src/arrow/reader.rs
@@ -57,7 +57,9 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::metadata_columns::{
RESERVED_FIELD_ID_FILE, RESERVED_FIELD_ID_UNDERSCORE_POS, is_metadata_field, row_pos_field,
};
use crate::runtime::spawn;
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Type};
@@ -246,13 +248,24 @@ impl ArrowReader {
let delete_filter_rx =
delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));

let mut virtual_columns = Vec::new();

// Check if _pos column is requested and add it as a virtual column
let has_pos_column = task
.project_field_ids
.contains(&RESERVED_FIELD_ID_UNDERSCORE_POS);
if has_pos_column {
// Add _pos as a virtual column to be produced by the Parquet reader
virtual_columns.push(Arc::clone(row_pos_field()));
}

// Migrated tables lack field IDs, requiring us to inspect the schema to choose
// between field-ID-based or position-based projection
let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
&task.data_file_path,
file_io.clone(),
should_load_page_index,
None,
Some(ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?),
)
.await?;

@@ -298,7 +311,9 @@ impl ArrowReader {
add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
};

let options = ArrowReaderOptions::new().with_schema(arrow_schema);
let options = ArrowReaderOptions::new()
.with_schema(arrow_schema)
.with_virtual_columns(virtual_columns)?;

Self::create_parquet_record_batch_stream_builder(
&task.data_file_path,
Expand Down Expand Up @@ -345,6 +360,11 @@ impl ArrowReader {
PrimitiveLiteral::String(task.data_file_path.clone()),
)?;

if has_pos_column {
record_batch_transformer_builder =
record_batch_transformer_builder.with_virtual_field(Arc::clone(row_pos_field()))?;
}

if let (Some(partition_spec), Some(partition_data)) =
(task.partition_spec.clone(), task.partition.clone())
{
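
The distinct part of this file is the two-phase open: the file is first read with default options so its schema can be inspected (migrated tables may lack field IDs), and only then are the final reader options built, carrying both the resolved schema and the `_pos` virtual column. A condensed sketch of that second step, with placeholder arguments and the same error-conversion assumption the diff relies on:

```rust
use std::sync::Arc;

use arrow_schema::{Field, SchemaRef};
use parquet::arrow::arrow_reader::ArrowReaderOptions;

use crate::Result;

/// Sketch only: the schema override and the virtual columns are chained on the
/// same ArrowReaderOptions before the record batch stream builder is created.
fn final_reader_options(
    resolved_schema: SchemaRef,
    virtual_columns: Vec<Arc<Field>>,
) -> Result<ArrowReaderOptions> {
    Ok(ArrowReaderOptions::new()
        .with_schema(resolved_schema)
        .with_virtual_columns(virtual_columns)?)
}
```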
52 changes: 51 additions & 1 deletion crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -154,6 +154,7 @@ pub(crate) struct RecordBatchTransformerBuilder {
snapshot_schema: Arc<IcebergSchema>,
projected_iceberg_field_ids: Vec<i32>,
constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>,
virtual_fields: HashMap<i32, FieldRef>,
}

impl RecordBatchTransformerBuilder {
@@ -165,6 +166,7 @@ impl RecordBatchTransformerBuilder {
snapshot_schema,
projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(),
constant_fields: HashMap::new(),
virtual_fields: HashMap::new(),
}
}

@@ -180,6 +182,27 @@ impl RecordBatchTransformerBuilder {
Ok(self)
}

/// Add a virtual field for a specific field ID.
/// This is used for virtual/metadata fields like _pos that are produced by the Parquet reader.
///
/// # Arguments
/// * `field` - The Arrow field representing the virtual column
pub(crate) fn with_virtual_field(mut self, field: FieldRef) -> Result<Self> {
// Extract field ID from metadata
let field_id = field
.metadata()
.get(PARQUET_FIELD_ID_META_KEY)
.and_then(|id_str| id_str.parse::<i32>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"Virtual field must have a field ID in metadata",
)
})?;
self.virtual_fields.insert(field_id, field);
Ok(self)
}

/// Set partition spec and data together for identifying identity-transformed partition columns.
///
/// Both partition_spec and partition_data must be provided together since the spec defines
@@ -207,6 +230,7 @@ impl RecordBatchTransformerBuilder {
snapshot_schema: self.snapshot_schema,
projected_iceberg_field_ids: self.projected_iceberg_field_ids,
constant_fields: self.constant_fields,
virtual_fields: self.virtual_fields,
batch_transform: None,
}
}
@@ -250,6 +274,10 @@ pub(crate) struct RecordBatchTransformer {
// Includes both virtual/metadata fields (like _file) and identity-partitioned fields
// Avoids type conversions during batch processing
constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>,
// Virtual fields are metadata fields that are not present in the snapshot schema,
// but are present in the source schema (arrow reader produces them)
// Map from field_id to FieldRef
virtual_fields: HashMap<i32, FieldRef>,

// BatchTransform gets lazily constructed based on the schema of
// the first RecordBatch we receive from the file
@@ -292,6 +320,7 @@ impl RecordBatchTransformer {
self.snapshot_schema.as_ref(),
&self.projected_iceberg_field_ids,
&self.constant_fields,
&self.virtual_fields,
)?);

self.process_record_batch(record_batch)?
@@ -311,6 +340,7 @@ impl RecordBatchTransformer {
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
virtual_fields: &HashMap<i32, FieldRef>,
) -> Result<BatchTransform> {
let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
let field_id_to_mapped_schema_map =
@@ -321,7 +351,12 @@ impl RecordBatchTransformer {
let fields: Result<Vec<_>> = projected_iceberg_field_ids
.iter()
.map(|field_id| {
// Check if this is a constant field (virtual or partition)
// Check if this is a virtual field from Parquet reader
if let Some(virtual_field) = virtual_fields.get(field_id) {
return Ok(Arc::clone(virtual_field));
}

// Check if this is a constant field (metadata/virtual or partition)
if constant_fields.contains_key(field_id) {
// For metadata/virtual fields (like _file), get name from metadata_columns
// For partition fields, get name from schema (they exist in schema)
@@ -364,6 +399,7 @@ impl RecordBatchTransformer {
projected_iceberg_field_ids,
field_id_to_mapped_schema_map,
constant_fields,
virtual_fields,
)?,
target_schema,
}),
@@ -421,6 +457,7 @@ impl RecordBatchTransformer {
projected_iceberg_field_ids: &[i32],
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
virtual_fields: &HashMap<i32, FieldRef>,
) -> Result<Vec<ColumnSource>> {
let field_id_to_source_schema_map =
Self::build_field_id_to_arrow_schema_map(source_schema)?;
@@ -439,6 +476,19 @@ impl RecordBatchTransformer {
});
}

// Check if this is a virtual field from Parquet reader (like _pos)
// Virtual fields don't exist in snapshot schema, they come from source
if virtual_fields.contains_key(field_id) {
let source_index = field_id_to_source_schema_map
.get(field_id)
.map(|(_, idx)| *idx)
.ok_or(Error::new(
ErrorKind::Unexpected,
"Virtual field not found in source schema",
))?;
return Ok(ColumnSource::PassThrough { source_index });
}

let (target_field, _) =
field_id_to_mapped_schema_map
.get(field_id)
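
From a call site's perspective (see `reader.rs` and `incremental.rs` above), the builder now distinguishes constants injected by the transformer (like `_file`) from virtual columns the Parquet reader has already produced (like `_pos`). A sketch of that assembly, using placeholder arguments and the crate-internal types from this diff:

```rust
use std::sync::Arc;

use crate::Result;
use crate::arrow::record_batch_transformer::{
    RecordBatchTransformer, RecordBatchTransformerBuilder,
};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, row_pos_field};
use crate::spec::{PrimitiveLiteral, Schema};

/// Sketch only: `_file` is injected as a constant value, while `_pos` is a
/// virtual field already materialized by the Parquet reader.
fn build_transformer(
    snapshot_schema: Arc<Schema>,
    project_field_ids: &[i32],
    data_file_path: &str,
    has_pos_column: bool,
) -> Result<RecordBatchTransformer> {
    let mut builder = RecordBatchTransformerBuilder::new(snapshot_schema, project_field_ids)
        .with_constant(
            RESERVED_FIELD_ID_FILE,
            PrimitiveLiteral::String(data_file_path.to_string()),
        )?;

    if has_pos_column {
        builder = builder.with_virtual_field(Arc::clone(row_pos_field()))?;
    }

    // Each RecordBatch from the reader is then rewritten to the projected
    // schema via `transformer.process_record_batch(batch)`.
    Ok(builder.build())
}
```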
36 changes: 34 additions & 2 deletions crates/iceberg/src/metadata_columns.rs
@@ -27,7 +27,7 @@ use std::sync::Arc;

use arrow_schema::{DataType, Field};
use once_cell::sync::Lazy;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, RowNumber};

use crate::{Error, ErrorKind, Result};

@@ -37,6 +37,12 @@ pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1;
/// Reserved column name for the file path metadata column
pub const RESERVED_COL_NAME_FILE: &str = "_file";

/// Reserved field ID for the row position (_pos) metadata column
pub const RESERVED_FIELD_ID_UNDERSCORE_POS: i32 = i32::MAX - 2;

/// Reserved column name for the row position metadata column
pub const RESERVED_COL_NAME_UNDERSCORE_POS: &str = "_pos";

/// Reserved field ID for the file_path column used in delete file reading (positional deletes)
pub const RESERVED_FIELD_ID_FILE_PATH: i32 = i32::MAX - 200;

@@ -67,6 +73,27 @@ pub fn file_field() -> &'static Arc<Field> {
&FILE_FIELD
}

/// Lazy-initialized Arrow Field definition for the _pos metadata column.
/// Used for row position within a file.
static ROW_POS_FIELD: Lazy<Arc<Field>> = Lazy::new(|| {
Arc::new(
Field::new(RESERVED_COL_NAME_UNDERSCORE_POS, DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
RESERVED_FIELD_ID_UNDERSCORE_POS.to_string(),
)]))
.with_extension_type(RowNumber),
)
});

/// Returns the Arrow Field definition for the _pos metadata column.
///
/// # Returns
/// A reference to the _pos field definition
pub fn row_pos_field() -> &'static Arc<Field> {
&ROW_POS_FIELD
}

/// Lazy-initialized Arrow Field definition for the pos metadata column.
/// Used in positional delete records.
static POS_FIELD: Lazy<Arc<Field>> = Lazy::new(|| {
@@ -119,6 +146,7 @@ pub fn file_path_field() -> &'static Arc<Field> {
pub fn get_metadata_field(field_id: i32) -> Result<Arc<Field>> {
match field_id {
RESERVED_FIELD_ID_FILE => Ok(Arc::clone(file_field())),
RESERVED_FIELD_ID_UNDERSCORE_POS => Ok(Arc::clone(row_pos_field())),
RESERVED_FIELD_ID_FILE_PATH => Ok(Arc::clone(file_path_field())),
RESERVED_FIELD_ID_POS => Ok(Arc::clone(pos_field())),
_ => Err(Error::new(
@@ -138,6 +166,7 @@ pub fn get_metadata_field(field_id: i32) -> Result<Arc<Field>> {
pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
match column_name {
RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE),
RESERVED_COL_NAME_UNDERSCORE_POS => Ok(RESERVED_FIELD_ID_UNDERSCORE_POS),
RESERVED_COL_NAME_FILE_PATH => Ok(RESERVED_FIELD_ID_FILE_PATH),
RESERVED_COL_NAME_POS => Ok(RESERVED_FIELD_ID_POS),
_ => Err(Error::new(
@@ -157,7 +186,10 @@ pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
pub fn is_metadata_field(field_id: i32) -> bool {
matches!(
field_id,
RESERVED_FIELD_ID_FILE | RESERVED_FIELD_ID_FILE_PATH | RESERVED_FIELD_ID_POS
RESERVED_FIELD_ID_FILE
| RESERVED_FIELD_ID_UNDERSCORE_POS
| RESERVED_FIELD_ID_FILE_PATH
| RESERVED_FIELD_ID_POS
)
}

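
The new constants and `row_pos_field()` slot into the existing lookup helpers, so `_pos` resolves the same way `_file` does. A small crate-internal sketch of that round trip (the assertions are illustrative, not taken from the PR's tests):

```rust
use crate::Result;
use crate::metadata_columns::{
    RESERVED_COL_NAME_UNDERSCORE_POS, RESERVED_FIELD_ID_UNDERSCORE_POS, get_metadata_field_id,
    is_metadata_field, row_pos_field,
};

/// Sketch only: `_pos` maps to its reserved field ID (i32::MAX - 2), is treated
/// as a metadata field, and its Arrow field is a non-nullable Int64.
fn check_pos_metadata_column() -> Result<()> {
    let field_id = get_metadata_field_id(RESERVED_COL_NAME_UNDERSCORE_POS)?;
    assert_eq!(field_id, RESERVED_FIELD_ID_UNDERSCORE_POS);
    assert!(is_metadata_field(field_id));

    let field = row_pos_field();
    assert_eq!(field.name(), RESERVED_COL_NAME_UNDERSCORE_POS);
    assert!(!field.is_nullable());
    Ok(())
}
```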
45 changes: 44 additions & 1 deletion crates/iceberg/src/scan/incremental/mod.rs
@@ -29,7 +29,8 @@ use crate::arrow::{
use crate::delete_file_index::DeleteFileIndex;
use crate::io::FileIO;
use crate::metadata_columns::{
RESERVED_COL_NAME_FILE, get_metadata_field_id, is_metadata_column_name,
RESERVED_COL_NAME_FILE, RESERVED_COL_NAME_UNDERSCORE_POS, get_metadata_field_id,
is_metadata_column_name,
};
use crate::scan::DeleteFileContext;
use crate::scan::cache::ExpressionEvaluatorCache;
@@ -153,6 +154,48 @@ impl<'a> IncrementalTableScanBuilder<'a> {
self
}

/// Include the _pos metadata column in the incremental scan.
///
/// This is a convenience method that adds the _pos column to the current selection.
/// If no columns are currently selected (select_all), this will select all columns plus _pos.
/// If specific columns are selected, this adds _pos to that selection.
///
/// The _pos column contains the row position within the file, produced by the underlying
/// Parquet reader as a virtual column.
///
/// # Example
/// ```no_run
/// # use iceberg::table::Table;
/// # async fn example(table: Table) -> iceberg::Result<()> {
/// // Select id, name, and _pos for incremental scan
/// let scan = table
/// .incremental_scan(None, None)
/// .select(["id", "name"])
/// .with_pos_column()
/// .build()?;
/// # Ok(())
/// # }
/// ```
pub fn with_pos_column(mut self) -> Self {
let mut columns = self.column_names.unwrap_or_else(|| {
// No explicit selection - get all column names from schema
self.table
.metadata()
.current_schema()
.as_struct()
.fields()
.iter()
.map(|f| f.name.clone())
.collect()
});

// Add _pos column
columns.push(RESERVED_COL_NAME_UNDERSCORE_POS.to_string());

self.column_names = Some(columns);
self
}

/// Set the `from_snapshot_id` for the incremental scan.
pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
self.from_snapshot_id = Some(from_snapshot_id);
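
Downstream, the new column can be pulled out of the resulting batches by name. A hedged end-to-end sketch: `incremental_scan(..)`, `select(..)`, and `with_pos_column()` come from this fork's builder as shown above, while the `to_arrow()` stream method is an assumption modeled on the regular table scan API:

```rust
use arrow_array::Int64Array;
use futures::TryStreamExt;
use iceberg::Result;
use iceberg::table::Table;

/// Sketch only: scan a snapshot range, project `id` plus `_pos`, and read the
/// per-file row positions back out of each Arrow batch.
async fn read_with_positions(table: Table) -> Result<()> {
    let scan = table
        .incremental_scan(None, None)
        .select(["id"])
        .with_pos_column()
        .build()?;

    let mut batches = scan.to_arrow().await?; // assumed, mirroring TableScan::to_arrow
    while let Some(batch) = batches.try_next().await? {
        if let Some(column) = batch.column_by_name("_pos") {
            // `_pos` is materialized as a non-nullable Int64 column.
            let pos = column
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("_pos is Int64");
            println!(
                "{} rows, first _pos = {:?}",
                pos.len(),
                pos.iter().next().flatten()
            );
        }
    }
    Ok(())
}
```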