From 44bd80ec7c70d4aed1af6db1bcf819b9110a2a2d Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 16:03:08 +0800 Subject: [PATCH 01/23] support data evolution row id --- crates/integration_tests/tests/read_tables.rs | 124 +++++++ crates/paimon/src/arrow/reader.rs | 142 +++++++- crates/paimon/src/lib.rs | 4 +- crates/paimon/src/spec/schema.rs | 12 + crates/paimon/src/table/mod.rs | 2 +- crates/paimon/src/table/read_builder.rs | 20 ++ crates/paimon/src/table/source.rs | 340 ++++++++++++++++++ crates/paimon/src/table/table_scan.rs | 30 +- 8 files changed, 664 insertions(+), 10 deletions(-) diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 008b081e..f38b573d 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -2000,3 +2000,127 @@ async fn test_bucket_predicate_filtering_long_string_key() { actual.len() ); } + +// --------------------------------------------------------------------------- +// Data Evolution Row ID Range Filter integration tests +// --------------------------------------------------------------------------- + +async fn scan_and_read_with_row_ranges( + table: &paimon::Table, + row_ranges: Vec, +) -> (Plan, Vec) { + let mut read_builder = table.new_read_builder(); + read_builder.with_row_ranges(row_ranges); + let scan = read_builder.new_scan(); + let plan = scan.plan().await.expect("Failed to plan scan"); + + let read = read_builder.new_read().expect("Failed to create read"); + let stream = read + .to_arrow(plan.splits()) + .expect("Failed to create arrow stream"); + let batches: Vec<_> = stream + .try_collect() + .await + .expect("Failed to collect batches"); + + (plan, batches) +} + +fn extract_id_name_value(batches: &[RecordBatch]) -> Vec<(i32, String, i32)> { + let mut rows = Vec::new(); + for batch in batches { + let id = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("id"); + let name = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("name"); + let value = batch + .column_by_name("value") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("value"); + for i in 0..batch.num_rows() { + rows.push((id.value(i), name.value(i).to_string(), value.value(i))); + } + } + rows.sort_by_key(|(id, _, _)| *id); + rows +} + +#[tokio::test] +async fn test_read_data_evolution_table_with_row_ranges() { + use paimon::RowRange; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "data_evolution_table").await; + + let (full_plan, full_batches) = scan_and_read(&catalog, "data_evolution_table", None).await; + let full_rows = extract_id_name_value(&full_batches); + let full_row_count: usize = full_batches.iter().map(|b| b.num_rows()).sum(); + assert!(full_row_count > 0); + + let mut min_row_id = i64::MAX; + let mut max_row_id_exclusive = i64::MIN; + for split in full_plan.splits() { + for file in split.data_files() { + if let Some(fid) = file.first_row_id { + min_row_id = min_row_id.min(fid); + max_row_id_exclusive = max_row_id_exclusive.max(fid + file.row_count); + } + } + } + assert!(min_row_id < max_row_id_exclusive); + + let mid = min_row_id + (max_row_id_exclusive - min_row_id) / 2; + let (filtered_plan, filtered_batches) = + scan_and_read_with_row_ranges(&table, vec![RowRange::new(min_row_id, mid)]).await; + + let filtered_row_count: usize = filtered_batches.iter().map(|b| b.num_rows()).sum(); + let filtered_rows = extract_id_name_value(&filtered_batches); + + assert!( + filtered_row_count < full_row_count || mid >= max_row_id_exclusive, + "filtered={filtered_row_count}, full={full_row_count}" + ); + for row in &filtered_rows { + assert!( + full_rows.contains(row), + "Filtered row {row:?} not in full result" + ); + } + assert!(filtered_plan.splits().len() <= full_plan.splits().len()); +} + +#[tokio::test] +async fn test_read_data_evolution_table_with_empty_row_ranges() { + use paimon::RowRange; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "data_evolution_table").await; + + let (plan, batches) = + scan_and_read_with_row_ranges(&table, vec![RowRange::new(999_999, 1_000_000)]).await; + + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(row_count, 0); + assert!(plan.splits().is_empty()); +} + +#[tokio::test] +async fn test_read_data_evolution_table_with_full_row_ranges() { + use paimon::RowRange; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "data_evolution_table").await; + + let (_, full_batches) = scan_and_read(&catalog, "data_evolution_table", None).await; + let full_rows = extract_id_name_value(&full_batches); + + let (_, filtered_batches) = + scan_and_read_with_row_ranges(&table, vec![RowRange::new(0, i64::MAX)]).await; + let filtered_rows = extract_id_name_value(&filtered_batches); + + assert_eq!(filtered_rows, full_rows); +} diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 91a67c62..471c1b5c 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -22,9 +22,12 @@ use crate::arrow::filtering::{ use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX}; use crate::deletion_vector::{DeletionVector, DeletionVectorFactory}; use crate::io::{FileIO, FileRead, FileStatus}; -use crate::spec::{DataField, DataFileMeta, DataType, Datum, Predicate, PredicateOperator}; +use crate::spec::{ + DataField, DataFileMeta, DataType, Datum, Predicate, PredicateOperator, ROW_ID_FIELD_NAME, +}; use crate::table::schema_manager::SchemaManager; use crate::table::ArrowRecordBatchStream; +use crate::table::RowRange; use crate::{DataSplit, Error}; use arrow_array::{ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, @@ -183,6 +186,7 @@ impl ArrowReader { predicates: predicates.clone(), batch_size, dv, + row_ranges: None, }, )?; while let Some(batch) = stream.next().await { @@ -215,8 +219,20 @@ impl ArrowReader { let schema_manager = self.schema_manager; let table_schema_id = self.table_schema_id; + // Check if _ROW_ID is requested; if so, strip it from the file read type + // and compute it after reading. + let row_id_index = read_type.iter().position(|f| f.name() == ROW_ID_FIELD_NAME); + let file_read_type: Vec = read_type + .iter() + .filter(|f| f.name() != ROW_ID_FIELD_NAME) + .cloned() + .collect(); + let output_schema = build_target_arrow_schema(&read_type)?; + Ok(try_stream! { for split in splits { + let row_ranges = split.row_ranges().map(|r| r.to_vec()); + if split.raw_convertible() || split.data_files().len() == 1 { for file_meta in split.data_files().to_vec() { let data_fields: Option> = if file_meta.schema_id != table_schema_id { @@ -226,36 +242,65 @@ impl ArrowReader { None }; + let file_base_row_id = file_meta.first_row_id.unwrap_or(0); + let mut current_row_id = file_base_row_id; + let mut stream = read_single_file_stream( file_io.clone(), SingleFileReadRequest { split: split.clone(), file_meta, - read_type: read_type.clone(), + read_type: file_read_type.clone(), table_fields: table_fields.clone(), data_fields, predicates: Vec::new(), batch_size, dv: None, + row_ranges: row_ranges.clone(), }, )?; while let Some(batch) = stream.next().await { - yield batch?; + let batch = batch?; + let num_rows = batch.num_rows(); + if let Some(idx) = row_id_index { + let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; + yield batch; + } else { + yield batch; + } + current_row_id += num_rows as i64; } } } else { + let group_base_row_id = split + .data_files() + .iter() + .filter_map(|f| f.first_row_id) + .min() + .unwrap_or(0); + let mut current_row_id = group_base_row_id; + // Multiple files need column-wise merge. let mut merge_stream = merge_files_by_columns( &file_io, &split, - &read_type, + &file_read_type, &table_fields, schema_manager.clone(), table_schema_id, batch_size, + row_ranges.clone(), )?; while let Some(batch) = merge_stream.next().await { - yield batch?; + let batch = batch?; + let num_rows = batch.num_rows(); + if let Some(idx) = row_id_index { + let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; + yield batch; + } else { + yield batch; + } + current_row_id += num_rows as i64; } } } @@ -273,6 +318,7 @@ struct SingleFileReadRequest { predicates: Vec, batch_size: Option, dv: Option>, + row_ranges: Option>, } /// Read a single parquet file from a split, returning a lazy stream of batches. @@ -298,6 +344,7 @@ fn read_single_file_stream( predicates, batch_size, dv, + row_ranges, } = request; let target_schema = build_target_arrow_schema(&read_type)?; @@ -393,6 +440,17 @@ fn read_single_file_stream( ); } } + if let Some(ref ranges) = row_ranges { + let range_selection = build_row_ranges_selection( + batch_stream_builder.metadata().row_groups(), + ranges, + file_meta.first_row_id.unwrap_or(0), + ); + row_selection = intersect_optional_row_selections( + row_selection, + Some(range_selection), + ); + } if let Some(row_selection) = row_selection { batch_stream_builder = batch_stream_builder.with_row_selection(row_selection); } @@ -494,6 +552,7 @@ fn read_single_file_stream( /// per file. Each poll slices up to `batch_size` rows from each file's current batch, /// assembles columns from the winning files, and yields the merged batch. When a file's /// current batch is exhausted, the next batch is read from its stream on demand. +#[allow(clippy::too_many_arguments)] fn merge_files_by_columns( file_io: &FileIO, split: &DataSplit, @@ -502,6 +561,7 @@ fn merge_files_by_columns( schema_manager: SchemaManager, table_schema_id: i64, batch_size: Option, + row_ranges: Option>, ) -> crate::Result { let data_files = split.data_files(); if data_files.is_empty() { @@ -647,6 +707,7 @@ fn merge_files_by_columns( predicates: Vec::new(), batch_size, dv: None, + row_ranges: row_ranges.clone(), }, )?; file_streams.insert(file_idx, stream); @@ -1355,6 +1416,77 @@ fn exact_parquet_value<'a, T>( } } +/// Append a computed `_ROW_ID` column at the given position. Each row's value is `base_row_id + offset`. +fn append_row_id_column( + batch: RecordBatch, + base_row_id: i64, + insert_index: usize, + output_schema: &Arc, +) -> crate::Result { + let num_rows = batch.num_rows(); + let row_ids: Vec = (0..num_rows as i64).map(|i| base_row_id + i).collect(); + let row_id_array: Arc = Arc::new(Int64Array::from(row_ids)); + + let mut columns: Vec> = Vec::with_capacity(batch.num_columns() + 1); + let batch_cols: Vec> = batch.columns().to_vec(); + + for (i, col) in batch_cols.iter().enumerate() { + if i == insert_index { + columns.push(row_id_array.clone()); + } + columns.push(col.clone()); + } + if insert_index >= batch.num_columns() { + columns.push(row_id_array); + } + + RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| Error::UnexpectedError { + message: format!("Failed to build RecordBatch with _ROW_ID column: {e}"), + source: Some(Box::new(e)), + }) +} + +/// Build a Parquet [RowSelection] from inclusive `[from, to]` row ranges. +fn build_row_ranges_selection( + row_group_metadata_list: &[RowGroupMetaData], + row_ranges: &[RowRange], + file_first_row_id: i64, +) -> RowSelection { + let total_rows: i64 = row_group_metadata_list.iter().map(|rg| rg.num_rows()).sum(); + + let mut local_ranges: Vec<(usize, usize)> = row_ranges + .iter() + .filter_map(|r| { + let local_start = (r.from() - file_first_row_id).max(0) as usize; + let local_end = (r.to() + 1 - file_first_row_id).max(0).min(total_rows) as usize; + if local_start < local_end { + Some((local_start, local_end)) + } else { + None + } + }) + .collect(); + local_ranges.sort_by_key(|&(s, _)| s); + + let mut selectors: Vec = Vec::new(); + let mut cursor: usize = 0; + for (start, end) in &local_ranges { + if *start > cursor { + selectors.push(RowSelector::skip(*start - cursor)); + } + let select_start = (*start).max(cursor); + if *end > select_start { + selectors.push(RowSelector::select(*end - select_start)); + } + cursor = cursor.max(*end); + } + let total = total_rows as usize; + if cursor < total { + selectors.push(RowSelector::skip(total - cursor)); + } + selectors.into() +} + /// Builds a Parquet [RowSelection] from deletion vector. /// Only rows not in the deletion vector are selected; deleted rows are skipped at read time. /// todo: Uses [DeletionVectorIterator] with [advance_to](DeletionVectorIterator::advance_to) when skipping row groups similar to iceberg-rust diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index fe340f3d..8771466e 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -39,6 +39,6 @@ pub use catalog::CatalogFactory; pub use catalog::FileSystemCatalog; pub use table::{ - DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, SnapshotManager, - Table, TableRead, TableScan, TagManager, + DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, RowRange, + SnapshotManager, Table, TableRead, TableScan, TagManager, }; diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index c1047ad4..48c5b224 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -110,6 +110,14 @@ impl TableSchema { } } +/// Name of the virtual `_ROW_ID` column. +/// +/// Reference: Java's `SpecialFields.ROW_ID` (field id = `Integer.MAX_VALUE - 5`). +pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID"; + +/// Field id for the virtual `_ROW_ID` column, matching Java's `Integer.MAX_VALUE - 5`. +pub const ROW_ID_FIELD_ID: i32 = i32::MAX - 5; + /// Data field for paimon table. /// /// Impl Reference: @@ -160,6 +168,10 @@ impl DataField { self } + pub fn is_row_id_field(&self) -> bool { + self.name == ROW_ID_FIELD_NAME + } + pub fn with_description(mut self, new_description: Option) -> Self { self.description = new_description; self diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 481c757b..f128bd2d 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -33,7 +33,7 @@ use futures::stream::BoxStream; pub use read_builder::{ReadBuilder, TableRead}; pub use schema_manager::SchemaManager; pub use snapshot_manager::SnapshotManager; -pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan}; +pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange}; pub use table_scan::TableScan; pub use tag_manager::TagManager; diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 4a405ea1..160aa429 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -25,6 +25,7 @@ use super::{ArrowRecordBatchStream, Table, TableScan}; use crate::arrow::filtering::reader_pruning_predicates; use crate::arrow::ArrowReaderBuilder; use crate::spec::{CoreOptions, DataField, Predicate}; +use crate::table::source::RowRange; use crate::Result; use crate::{DataSplit, Error}; use std::collections::{HashMap, HashSet}; @@ -105,6 +106,7 @@ pub struct ReadBuilder<'a> { projected_fields: Option>, filter: NormalizedFilter, limit: Option, + row_ranges: Option>, } impl<'a> ReadBuilder<'a> { @@ -114,6 +116,7 @@ impl<'a> ReadBuilder<'a> { projected_fields: None, filter: NormalizedFilter::default(), limit: None, + row_ranges: None, } } @@ -145,6 +148,12 @@ impl<'a> ReadBuilder<'a> { self } + /// Set row ID ranges `[from, to]` (inclusive) for filtering in data evolution mode. + pub fn with_row_ranges(&mut self, ranges: Vec) -> &mut Self { + self.row_ranges = Some(ranges); + self + } + /// Push a row-limit hint down to scan planning. /// /// This allows the scan to generate fewer splits when possible. The hint is @@ -166,6 +175,7 @@ impl<'a> ReadBuilder<'a> { self.filter.data_predicates.clone(), self.filter.bucket_predicate.clone(), self.limit, + self.row_ranges.clone(), ) } @@ -207,6 +217,16 @@ impl<'a> ReadBuilder<'a> { }); } + if name == crate::spec::ROW_ID_FIELD_NAME { + // Virtual _ROW_ID column: computed at read time from first_row_id + offset. + resolved.push(DataField::new( + crate::spec::ROW_ID_FIELD_ID, + crate::spec::ROW_ID_FIELD_NAME.to_string(), + crate::spec::DataType::BigInt(Default::default()), + )); + continue; + } + let field = field_map .get(name.as_str()) .ok_or_else(|| Error::ColumnNotExist { diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs index a137a9ca..50894166 100644 --- a/crates/paimon/src/table/source.rs +++ b/crates/paimon/src/table/source.rs @@ -22,6 +22,333 @@ use crate::spec::{BinaryRow, DataFileMeta}; use crate::table::stats_filter::group_by_overlapping_row_id; use serde::{Deserialize, Serialize}; +// ======================= RowRange =============================== + +/// An inclusive row ID range `[from, to]` for filtering reads in data evolution mode. +/// +/// Both `from` and `to` are inclusive, aligned with Java's `Range` class. +/// +/// Reference: Java's `org.apache.paimon.utils.Range` and pypaimon's `IndexedSplit` with `row_ranges`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RowRange { + from: i64, + to: i64, +} + +impl RowRange { + pub fn new(from: i64, to: i64) -> Self { + debug_assert!(from <= to, "RowRange from ({from}) must be <= to ({to})"); + Self { from, to } + } + + pub fn from(&self) -> i64 { + self.from + } + + pub fn to(&self) -> i64 { + self.to + } + + pub fn count(&self) -> i64 { + self.to - self.from + 1 + } + + pub fn is_empty(&self) -> bool { + self.from > self.to + } + + /// Check overlap with an inclusive file range `[file_start, file_end]`. + pub fn overlaps_inclusive(&self, file_start: i64, file_end_inclusive: i64) -> bool { + self.from <= file_end_inclusive && self.to >= file_start + } + + /// Intersect with an inclusive file range `[file_start, file_end]`. + pub fn intersect_inclusive( + &self, + file_start: i64, + file_end_inclusive: i64, + ) -> Option { + let from = self.from.max(file_start); + let to = self.to.min(file_end_inclusive); + if from <= to { + Some(RowRange::new(from, to)) + } else { + None + } + } +} + +/// Returns `true` (fail-open) if the file has no `first_row_id`. +pub fn any_range_overlaps_file(ranges: &[RowRange], file: &DataFileMeta) -> bool { + match file.row_id_range() { + None => true, + Some((file_start, file_end)) => ranges + .iter() + .any(|r| r.overlaps_inclusive(file_start, file_end)), + } +} + +pub fn intersect_ranges_with_file(ranges: &[RowRange], file: &DataFileMeta) -> Vec { + match file.row_id_range() { + None => Vec::new(), + Some((file_start, file_end)) => ranges + .iter() + .filter_map(|r| r.intersect_inclusive(file_start, file_end)) + .collect(), + } +} + +pub fn merge_row_ranges(mut ranges: Vec) -> Vec { + if ranges.len() <= 1 { + return ranges; + } + ranges.sort_by_key(|r| r.from); + let mut merged: Vec = Vec::with_capacity(ranges.len()); + let mut current = ranges[0].clone(); + for r in &ranges[1..] { + // Adjacent or overlapping: [0,5] and [6,10] should merge to [0,10] + if r.from <= current.to + 1 { + current.to = current.to.max(r.to); + } else { + merged.push(current); + current = r.clone(); + } + } + merged.push(current); + merged +} + +#[cfg(test)] +mod row_range_tests { + use super::*; + + fn file_meta_with_row_id(first_row_id: Option, row_count: i64) -> DataFileMeta { + DataFileMeta { + file_name: "test.parquet".into(), + file_size: 128, + row_count, + min_key: Vec::new(), + max_key: Vec::new(), + key_stats: crate::spec::stats::BinaryTableStats::new( + Vec::new(), + Vec::new(), + Vec::new(), + ), + value_stats: crate::spec::stats::BinaryTableStats::new( + Vec::new(), + Vec::new(), + Vec::new(), + ), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level: 0, + extra_files: Vec::new(), + creation_time: Some(chrono::Utc::now()), + delete_row_count: None, + embedded_index: None, + first_row_id, + write_cols: None, + external_path: None, + file_source: None, + value_stats_cols: None, + } + } + + #[test] + fn test_row_range_overlaps_inclusive_touching() { + // [5, 10] overlaps [10, 15] because row 10 is in both + let r = RowRange::new(5, 10); + assert!(r.overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_adjacent_no_overlap() { + // [5, 9] does NOT overlap [10, 15] + let r = RowRange::new(5, 9); + assert!(!r.overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_disjoint_before() { + let r = RowRange::new(5, 8); + assert!(!r.overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_disjoint_after() { + let r = RowRange::new(20, 30); + assert!(!r.overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_subset() { + assert!(RowRange::new(12, 14).overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_superset() { + assert!(RowRange::new(5, 20).overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_partial_left() { + assert!(RowRange::new(8, 12).overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_partial_right() { + assert!(RowRange::new(14, 20).overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_intersect_inclusive_no_overlap() { + assert_eq!(RowRange::new(0, 5).intersect_inclusive(10, 15), None); + } + + #[test] + fn test_row_range_intersect_inclusive_partial() { + assert_eq!( + RowRange::new(8, 12).intersect_inclusive(10, 15), + Some(RowRange::new(10, 12)) + ); + } + + #[test] + fn test_row_range_intersect_inclusive_subset() { + assert_eq!( + RowRange::new(11, 14).intersect_inclusive(10, 15), + Some(RowRange::new(11, 14)) + ); + } + + #[test] + fn test_row_range_intersect_inclusive_superset() { + assert_eq!( + RowRange::new(5, 20).intersect_inclusive(10, 15), + Some(RowRange::new(10, 15)) + ); + } + + #[test] + fn test_row_range_intersect_inclusive_touching_end() { + assert_eq!( + RowRange::new(5, 10).intersect_inclusive(10, 15), + Some(RowRange::new(10, 10)) + ); + } + + #[test] + fn test_merge_row_ranges_non_overlapping() { + let merged = merge_row_ranges(vec![RowRange::new(0, 4), RowRange::new(10, 15)]); + assert_eq!(merged, vec![RowRange::new(0, 4), RowRange::new(10, 15)]); + } + + #[test] + fn test_merge_row_ranges_overlapping() { + let merged = merge_row_ranges(vec![RowRange::new(0, 10), RowRange::new(5, 15)]); + assert_eq!(merged, vec![RowRange::new(0, 15)]); + } + + #[test] + fn test_merge_row_ranges_adjacent() { + // [0,5] and [6,10] are adjacent and should merge to [0,10] + let merged = merge_row_ranges(vec![RowRange::new(0, 5), RowRange::new(6, 10)]); + assert_eq!(merged, vec![RowRange::new(0, 10)]); + } + + #[test] + fn test_merge_row_ranges_unsorted() { + let merged = merge_row_ranges(vec![ + RowRange::new(10, 20), + RowRange::new(0, 5), + RowRange::new(3, 12), + ]); + assert_eq!(merged, vec![RowRange::new(0, 20)]); + } + + #[test] + fn test_merge_row_ranges_single() { + assert_eq!( + merge_row_ranges(vec![RowRange::new(5, 10)]), + vec![RowRange::new(5, 10)] + ); + } + + #[test] + fn test_merge_row_ranges_empty() { + assert!(merge_row_ranges(Vec::new()).is_empty()); + } + + #[test] + fn test_any_range_overlaps_file_with_overlap() { + // file row_id_range = [10, 14] + let file = file_meta_with_row_id(Some(10), 5); + assert!(any_range_overlaps_file( + &[RowRange::new(0, 5), RowRange::new(12, 20)], + &file + )); + } + + #[test] + fn test_any_range_overlaps_file_no_overlap() { + // file row_id_range = [10, 14] + let file = file_meta_with_row_id(Some(10), 5); + assert!(!any_range_overlaps_file( + &[RowRange::new(0, 5), RowRange::new(20, 30)], + &file + )); + } + + #[test] + fn test_any_range_overlaps_file_no_first_row_id() { + let file = file_meta_with_row_id(None, 5); + assert!(any_range_overlaps_file(&[RowRange::new(0, 5)], &file)); + } + + #[test] + fn test_intersect_ranges_with_file_partial_overlap() { + // file row_id_range = [10, 19] + let file = file_meta_with_row_id(Some(10), 10); + let result = + intersect_ranges_with_file(&[RowRange::new(5, 14), RowRange::new(18, 25)], &file); + assert_eq!(result, vec![RowRange::new(10, 14), RowRange::new(18, 19)]); + } + + #[test] + fn test_intersect_ranges_with_file_no_overlap() { + // file row_id_range = [10, 14] + let file = file_meta_with_row_id(Some(10), 5); + assert!( + intersect_ranges_with_file(&[RowRange::new(0, 5), RowRange::new(20, 30)], &file) + .is_empty() + ); + } + + #[test] + fn test_intersect_ranges_with_file_full_overlap() { + // file row_id_range = [10, 14] + let file = file_meta_with_row_id(Some(10), 5); + assert_eq!( + intersect_ranges_with_file(&[RowRange::new(0, 100)], &file), + vec![RowRange::new(10, 14)] + ); + } + + #[test] + fn test_intersect_ranges_with_file_no_first_row_id() { + let file = file_meta_with_row_id(None, 5); + assert!(intersect_ranges_with_file(&[RowRange::new(0, 100)], &file).is_empty()); + } + + #[test] + fn test_row_range_count_and_empty() { + let r = RowRange::new(5, 10); + assert_eq!(r.count(), 6); // rows 5,6,7,8,9,10 + assert!(!r.is_empty()); + } +} + // ======================= DeletionFile =============================== /// Deletion file for a data file: describes a region in a file that stores deletion vector bitmap. @@ -110,6 +437,7 @@ pub struct DataSplit { /// `false` when files need column-wise merge (e.g. data evolution) or /// key-value merge (e.g. primary key tables without deletion vectors). raw_convertible: bool, + row_ranges: Option>, } impl DataSplit { @@ -143,6 +471,10 @@ impl DataSplit { self.raw_convertible } + pub fn row_ranges(&self) -> Option<&[RowRange]> { + self.row_ranges.as_deref() + } + /// Returns the deletion file for the data file at the given index, if any. `None` at that index means no deletion file. pub fn deletion_file_for_data_file_index(&self, index: usize) -> Option<&DeletionFile> { self.data_deletion_files @@ -274,6 +606,7 @@ pub struct DataSplitBuilder { /// Same length as data_files; `None` at index i = no deletion file for data_files[i]. data_deletion_files: Option>>, raw_convertible: bool, + row_ranges: Option>, } impl DataSplitBuilder { @@ -287,6 +620,7 @@ impl DataSplitBuilder { data_files: None, data_deletion_files: None, raw_convertible: false, + row_ranges: None, } } @@ -329,6 +663,11 @@ impl DataSplitBuilder { self } + pub fn with_row_ranges(mut self, row_ranges: Vec) -> Self { + self.row_ranges = Some(row_ranges); + self + } + pub fn build(self) -> crate::Result { if self.snapshot_id == -1 { return Err(crate::Error::UnexpectedError { @@ -381,6 +720,7 @@ impl DataSplitBuilder { data_files, data_deletion_files: self.data_deletion_files, raw_convertible: self.raw_convertible, + row_ranges: self.row_ranges, }) } } diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index da2dc240..8ad8384e 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -34,7 +34,10 @@ use crate::spec::{ ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot, }; use crate::table::bin_pack::split_for_batch; -use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan}; +use crate::table::source::{ + any_range_overlaps_file, intersect_ranges_with_file, merge_row_ranges, DataSplit, + DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange, +}; use crate::table::SnapshotManager; use crate::table::TagManager; use crate::Error; @@ -304,6 +307,7 @@ pub struct TableScan<'a> { /// Optional limit on the number of rows to return. /// When set, the scan will try to return only enough splits to satisfy the limit. limit: Option, + row_ranges: Option>, } impl<'a> TableScan<'a> { @@ -313,6 +317,7 @@ impl<'a> TableScan<'a> { data_predicates: Vec, bucket_predicate: Option, limit: Option, + row_ranges: Option>, ) -> Self { Self { table, @@ -320,6 +325,7 @@ impl<'a> TableScan<'a> { data_predicates, bucket_predicate, limit, + row_ranges, } } @@ -621,6 +627,16 @@ impl<'a> TableScan<'a> { .collect() }; + // Filter groups by row ID ranges. + let row_id_groups = if let Some(ref ranges) = self.row_ranges { + row_id_groups + .into_iter() + .filter(|group| group.iter().any(|f| any_range_overlaps_file(ranges, f))) + .collect() + } else { + row_id_groups + }; + let (singles, multis): (Vec<_>, Vec<_>) = row_id_groups .into_iter() .partition(|group| group.len() == 1); @@ -657,11 +673,21 @@ impl<'a> TableScan<'a> { .with_bucket(bucket) .with_bucket_path(bucket_path.clone()) .with_total_buckets(total_buckets) - .with_data_files(file_group) + .with_data_files(file_group.clone()) .with_raw_convertible(raw_convertible); if let Some(files) = data_deletion_files { builder = builder.with_data_deletion_files(files); } + if let Some(ref ranges) = self.row_ranges { + let mut split_ranges = Vec::new(); + for file in &file_group { + split_ranges.extend(intersect_ranges_with_file(ranges, file)); + } + let split_ranges = merge_row_ranges(split_ranges); + if !split_ranges.is_empty() { + builder = builder.with_row_ranges(split_ranges); + } + } splits.push(builder.build()?); } } From d4f8cad6fd9b01bc45548a2ed2f0ac59465ebd1d Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 16:33:06 +0800 Subject: [PATCH 02/23] clean code --- crates/integration_tests/tests/read_tables.rs | 52 +++++++++++++++++++ crates/paimon/src/arrow/reader.rs | 4 +- crates/paimon/src/spec/schema.rs | 4 -- crates/paimon/src/table/read_builder.rs | 2 +- 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index f38b573d..b910ca26 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -2124,3 +2124,55 @@ async fn test_read_data_evolution_table_with_full_row_ranges() { assert_eq!(filtered_rows, full_rows); } + +#[tokio::test] +async fn test_read_data_evolution_table_with_row_id_projection() { + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "data_evolution_table").await; + + // Project _ROW_ID along with regular columns + let mut read_builder = table.new_read_builder(); + read_builder.with_projection(&["_ROW_ID", "id", "name"]); + let scan = read_builder.new_scan(); + let plan = scan.plan().await.expect("Failed to plan scan"); + + let read = read_builder.new_read().expect("Failed to create read"); + let stream = read + .to_arrow(plan.splits()) + .expect("Failed to create arrow stream"); + let batches: Vec = stream + .try_collect() + .await + .expect("Failed to collect batches"); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert!(total_rows > 0, "Should have rows"); + + // Verify _ROW_ID column exists and contains non-negative values + let mut row_ids: Vec = Vec::new(); + for batch in &batches { + let row_id_col = batch + .column_by_name("_ROW_ID") + .expect("_ROW_ID column should exist"); + let row_id_array = row_id_col + .as_any() + .downcast_ref::() + .expect("_ROW_ID should be Int64"); + for i in 0..batch.num_rows() { + row_ids.push(row_id_array.value(i)); + } + } + + assert_eq!(row_ids.len(), total_rows); + assert!( + row_ids.iter().all(|&id| id >= 0), + "All _ROW_ID values should be non-negative" + ); + // _ROW_ID values should be unique + let unique: std::collections::HashSet = row_ids.iter().copied().collect(); + assert_eq!( + unique.len(), + row_ids.len(), + "_ROW_ID values should be unique" + ); +} diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 471c1b5c..9f88c5bb 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -1458,7 +1458,9 @@ fn build_row_ranges_selection( .iter() .filter_map(|r| { let local_start = (r.from() - file_first_row_id).max(0) as usize; - let local_end = (r.to() + 1 - file_first_row_id).max(0).min(total_rows) as usize; + // Clamp to() to file boundary before +1 to avoid i64 overflow when to() == i64::MAX + let clamped_to = r.to().min(file_first_row_id + total_rows - 1); + let local_end = (clamped_to + 1 - file_first_row_id) as usize; if local_start < local_end { Some((local_start, local_end)) } else { diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index 48c5b224..bd1244c0 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -110,12 +110,8 @@ impl TableSchema { } } -/// Name of the virtual `_ROW_ID` column. -/// -/// Reference: Java's `SpecialFields.ROW_ID` (field id = `Integer.MAX_VALUE - 5`). pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID"; -/// Field id for the virtual `_ROW_ID` column, matching Java's `Integer.MAX_VALUE - 5`. pub const ROW_ID_FIELD_ID: i32 = i32::MAX - 5; /// Data field for paimon table. diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 160aa429..af43fc3e 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -222,7 +222,7 @@ impl<'a> ReadBuilder<'a> { resolved.push(DataField::new( crate::spec::ROW_ID_FIELD_ID, crate::spec::ROW_ID_FIELD_NAME.to_string(), - crate::spec::DataType::BigInt(Default::default()), + crate::spec::DataType::BigInt(crate::spec::BigIntType::with_nullable(false)), )); continue; } From 32f3f0389b66401c2516c7bb9ef63b113bee9aed Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 17:45:04 +0800 Subject: [PATCH 03/23] fix: skip parquet-level row_ranges filtering when _ROW_ID is projected --- crates/paimon/src/arrow/reader.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 9f88c5bb..06d3207a 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -232,6 +232,10 @@ impl ArrowReader { Ok(try_stream! { for split in splits { let row_ranges = split.row_ranges().map(|r| r.to_vec()); + // When _ROW_ID is requested, skip Parquet-level row_ranges filtering + // because RowSelection changes which rows appear in the output, + // making positional _ROW_ID tracking incorrect. + let file_row_ranges = if row_id_index.is_some() { None } else { row_ranges.clone() }; if split.raw_convertible() || split.data_files().len() == 1 { for file_meta in split.data_files().to_vec() { @@ -256,7 +260,7 @@ impl ArrowReader { predicates: Vec::new(), batch_size, dv: None, - row_ranges: row_ranges.clone(), + row_ranges: file_row_ranges.clone(), }, )?; while let Some(batch) = stream.next().await { @@ -289,7 +293,7 @@ impl ArrowReader { schema_manager.clone(), table_schema_id, batch_size, - row_ranges.clone(), + file_row_ranges.clone(), )?; while let Some(batch) = merge_stream.next().await { let batch = batch?; From d5e99b510c444ad5b09471a95db379cfdf207a45 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 17:49:47 +0800 Subject: [PATCH 04/23] fix: disable limit pushdown when row_ranges is set --- crates/paimon/src/table/table_scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 8ad8384e..c189ddaf 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -695,7 +695,7 @@ impl<'a> TableScan<'a> { // Apply limit pushdown only when there are no data predicates. // With data predicates, merged_row_count() reflects pre-filter row counts, // so stopping early could return fewer rows than the limit after filtering. - let splits = if self.data_predicates.is_empty() { + let splits = if self.data_predicates.is_empty() && self.row_ranges.is_none() { self.apply_limit_pushdown(splits) } else { splits From 6d79bfeec49cd705537a24dd36045968cc17c5fe Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 18:01:54 +0800 Subject: [PATCH 05/23] fix: apply row_ranges post-read filter when _ROW_ID is projected --- Cargo.toml | 1 + crates/paimon/Cargo.toml | 1 + crates/paimon/src/arrow/reader.rs | 53 +++++++++++++++++++++++++++++-- 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cfba61f4..af0926c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ arrow-array = { version = "57.0", features = ["ffi"] } arrow-schema = "57.0" arrow-cast = "57.0" arrow-ord = "57.0" +arrow-select = "57.0" datafusion = "52.3.0" datafusion-ffi = "52.3.0" parquet = "57.0" diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index d5d75aeb..383a251d 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -59,6 +59,7 @@ arrow-array = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } futures = "0.3" parquet = { workspace = true, features = ["async", "zstd"] } async-stream = "0.3.6" diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 06d3207a..ecf3f5f4 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -268,7 +268,12 @@ impl ArrowReader { let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; - yield batch; + // Post-read row_ranges filter: _ROW_ID disables Parquet-level + // filtering, so filter here using the computed _ROW_ID values. + let batch = filter_batch_by_row_id_ranges(&batch, idx, &row_ranges)?; + if batch.num_rows() > 0 { + yield batch; + } } else { yield batch; } @@ -300,7 +305,10 @@ impl ArrowReader { let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; - yield batch; + let batch = filter_batch_by_row_id_ranges(&batch, idx, &row_ranges)?; + if batch.num_rows() > 0 { + yield batch; + } } else { yield batch; } @@ -1420,6 +1428,47 @@ fn exact_parquet_value<'a, T>( } } +/// Filter a batch by row_ranges using the _ROW_ID column values. +/// Keeps only rows whose _ROW_ID falls within any of the given ranges. +fn filter_batch_by_row_id_ranges( + batch: &RecordBatch, + row_id_col_index: usize, + row_ranges: &Option>, +) -> crate::Result { + let ranges = match row_ranges { + Some(r) => r, + None => return Ok(batch.clone()), + }; + + let row_id_array = batch + .column(row_id_col_index) + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::UnexpectedError { + message: "_ROW_ID column is not Int64".to_string(), + source: None, + })?; + + let mut mask = vec![false; batch.num_rows()]; + for (i, &row_id) in row_id_array.values().iter().enumerate() { + if ranges + .iter() + .any(|r| row_id >= r.from() && row_id <= r.to()) + { + mask[i] = true; + } + } + + let filter = BooleanArray::from(mask); + let filtered = arrow_select::filter::filter_record_batch(batch, &filter).map_err(|e| { + Error::UnexpectedError { + message: format!("Failed to filter batch by _ROW_ID ranges: {e}"), + source: Some(Box::new(e)), + } + })?; + Ok(filtered) +} + /// Append a computed `_ROW_ID` column at the given position. Each row's value is `base_row_id + offset`. fn append_row_id_column( batch: RecordBatch, From 2cf0c78bb839bc73e7a208f1895d0ee4835155ea Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 18:09:03 +0800 Subject: [PATCH 06/23] fix: skip row_ranges filtering for files without first_row_id (fail-open) --- crates/paimon/src/arrow/reader.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index ecf3f5f4..33d0b86e 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -246,8 +246,11 @@ impl ArrowReader { None }; + // Skip row_ranges for files without first_row_id (fail-open) + let has_row_id = file_meta.first_row_id.is_some(); let file_base_row_id = file_meta.first_row_id.unwrap_or(0); let mut current_row_id = file_base_row_id; + let effective_row_ranges = if has_row_id { file_row_ranges.clone() } else { None }; let mut stream = read_single_file_stream( file_io.clone(), @@ -260,7 +263,7 @@ impl ArrowReader { predicates: Vec::new(), batch_size, dv: None, - row_ranges: file_row_ranges.clone(), + row_ranges: effective_row_ranges, }, )?; while let Some(batch) = stream.next().await { @@ -268,10 +271,12 @@ impl ArrowReader { let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; - // Post-read row_ranges filter: _ROW_ID disables Parquet-level - // filtering, so filter here using the computed _ROW_ID values. - let batch = filter_batch_by_row_id_ranges(&batch, idx, &row_ranges)?; - if batch.num_rows() > 0 { + if has_row_id { + let batch = filter_batch_by_row_id_ranges(&batch, idx, &row_ranges)?; + if batch.num_rows() > 0 { + yield batch; + } + } else { yield batch; } } else { From 3f0b3d929bfb6f683271b410a944e04b83b85432 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 18:17:06 +0800 Subject: [PATCH 07/23] fix: return null _ROW_ID for files without first_row_id instead of faking from 0 --- crates/paimon/src/arrow/reader.rs | 51 +++++++++++++++++++++---- crates/paimon/src/table/read_builder.rs | 2 +- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 33d0b86e..f9c7ce1f 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -270,13 +270,15 @@ impl ArrowReader { let batch = batch?; let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { - let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; if has_row_id { + let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; let batch = filter_batch_by_row_id_ranges(&batch, idx, &row_ranges)?; if batch.num_rows() > 0 { yield batch; } } else { + // No first_row_id: fill _ROW_ID with nulls + let batch = append_null_row_id_column(batch, idx, &output_schema)?; yield batch; } } else { @@ -290,9 +292,9 @@ impl ArrowReader { .data_files() .iter() .filter_map(|f| f.first_row_id) - .min() - .unwrap_or(0); - let mut current_row_id = group_base_row_id; + .min(); + let has_group_row_id = group_base_row_id.is_some(); + let mut current_row_id = group_base_row_id.unwrap_or(0); // Multiple files need column-wise merge. let mut merge_stream = merge_files_by_columns( @@ -303,15 +305,20 @@ impl ArrowReader { schema_manager.clone(), table_schema_id, batch_size, - file_row_ranges.clone(), + if has_group_row_id { file_row_ranges.clone() } else { None }, )?; while let Some(batch) = merge_stream.next().await { let batch = batch?; let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { - let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; - let batch = filter_batch_by_row_id_ranges(&batch, idx, &row_ranges)?; - if batch.num_rows() > 0 { + if has_group_row_id { + let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; + let batch = filter_batch_by_row_id_ranges(&batch, idx, &row_ranges)?; + if batch.num_rows() > 0 { + yield batch; + } + } else { + let batch = append_null_row_id_column(batch, idx, &output_schema)?; yield batch; } } else { @@ -1474,6 +1481,34 @@ fn filter_batch_by_row_id_ranges( Ok(filtered) } +/// Append a null `_ROW_ID` column for files without `first_row_id`. +fn append_null_row_id_column( + batch: RecordBatch, + insert_index: usize, + output_schema: &Arc, +) -> crate::Result { + let num_rows = batch.num_rows(); + let null_array: Arc = Arc::new(Int64Array::new_null(num_rows)); + + let mut columns: Vec> = Vec::with_capacity(batch.num_columns() + 1); + let batch_cols: Vec> = batch.columns().to_vec(); + + for (i, col) in batch_cols.iter().enumerate() { + if i == insert_index { + columns.push(null_array.clone()); + } + columns.push(col.clone()); + } + if insert_index >= batch.num_columns() { + columns.push(null_array); + } + + RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| Error::UnexpectedError { + message: format!("Failed to build RecordBatch with null _ROW_ID column: {e}"), + source: Some(Box::new(e)), + }) +} + /// Append a computed `_ROW_ID` column at the given position. Each row's value is `base_row_id + offset`. fn append_row_id_column( batch: RecordBatch, diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index af43fc3e..1d067850 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -222,7 +222,7 @@ impl<'a> ReadBuilder<'a> { resolved.push(DataField::new( crate::spec::ROW_ID_FIELD_ID, crate::spec::ROW_ID_FIELD_NAME.to_string(), - crate::spec::DataType::BigInt(crate::spec::BigIntType::with_nullable(false)), + crate::spec::DataType::BigInt(crate::spec::BigIntType::with_nullable(true)), )); continue; } From 60022ef1a5adb115dfffa1502f24df3a26262a8e Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 18:19:29 +0800 Subject: [PATCH 08/23] clean code --- crates/paimon/src/arrow/reader.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index f9c7ce1f..cec166da 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -232,9 +232,6 @@ impl ArrowReader { Ok(try_stream! { for split in splits { let row_ranges = split.row_ranges().map(|r| r.to_vec()); - // When _ROW_ID is requested, skip Parquet-level row_ranges filtering - // because RowSelection changes which rows appear in the output, - // making positional _ROW_ID tracking incorrect. let file_row_ranges = if row_id_index.is_some() { None } else { row_ranges.clone() }; if split.raw_convertible() || split.data_files().len() == 1 { From c23c392240aba7be8b6ec9d014542f443aa9d469 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 19:28:00 +0800 Subject: [PATCH 09/23] fix: use saturating_add to avoid i64 overflow in merge_row_ranges --- crates/paimon/src/table/source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs index 50894166..a26149d5 100644 --- a/crates/paimon/src/table/source.rs +++ b/crates/paimon/src/table/source.rs @@ -107,7 +107,7 @@ pub fn merge_row_ranges(mut ranges: Vec) -> Vec { let mut current = ranges[0].clone(); for r in &ranges[1..] { // Adjacent or overlapping: [0,5] and [6,10] should merge to [0,10] - if r.from <= current.to + 1 { + if r.from <= current.to.saturating_add(1) { current.to = current.to.max(r.to); } else { merged.push(current); From af4034cbccd87fb0e094244129a4a96b527b3f17 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 19:49:31 +0800 Subject: [PATCH 10/23] refactor: extract insert_column_at to deduplicate append_row_id_column and append_null_row_id_column --- crates/paimon/src/arrow/reader.rs | 55 +++++++++++++------------------ 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index cec166da..fab6d572 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -1478,62 +1478,51 @@ fn filter_batch_by_row_id_ranges( Ok(filtered) } -/// Append a null `_ROW_ID` column for files without `first_row_id`. -fn append_null_row_id_column( +/// Insert a column into a RecordBatch at the given position. +fn insert_column_at( batch: RecordBatch, + column: Arc, insert_index: usize, output_schema: &Arc, ) -> crate::Result { - let num_rows = batch.num_rows(); - let null_array: Arc = Arc::new(Int64Array::new_null(num_rows)); - let mut columns: Vec> = Vec::with_capacity(batch.num_columns() + 1); - let batch_cols: Vec> = batch.columns().to_vec(); - - for (i, col) in batch_cols.iter().enumerate() { + for (i, col) in batch.columns().iter().enumerate() { if i == insert_index { - columns.push(null_array.clone()); + columns.push(column.clone()); } columns.push(col.clone()); } if insert_index >= batch.num_columns() { - columns.push(null_array); + columns.push(column); } - RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| Error::UnexpectedError { - message: format!("Failed to build RecordBatch with null _ROW_ID column: {e}"), + message: format!("Failed to insert column into RecordBatch: {e}"), source: Some(Box::new(e)), }) } -/// Append a computed `_ROW_ID` column at the given position. Each row's value is `base_row_id + offset`. +/// Append a computed `_ROW_ID` column. Each row's value is `base_row_id + offset`. fn append_row_id_column( batch: RecordBatch, base_row_id: i64, insert_index: usize, output_schema: &Arc, ) -> crate::Result { - let num_rows = batch.num_rows(); - let row_ids: Vec = (0..num_rows as i64).map(|i| base_row_id + i).collect(); - let row_id_array: Arc = Arc::new(Int64Array::from(row_ids)); - - let mut columns: Vec> = Vec::with_capacity(batch.num_columns() + 1); - let batch_cols: Vec> = batch.columns().to_vec(); - - for (i, col) in batch_cols.iter().enumerate() { - if i == insert_index { - columns.push(row_id_array.clone()); - } - columns.push(col.clone()); - } - if insert_index >= batch.num_columns() { - columns.push(row_id_array); - } + let row_ids: Vec = (0..batch.num_rows() as i64) + .map(|i| base_row_id + i) + .collect(); + let array: Arc = Arc::new(Int64Array::from(row_ids)); + insert_column_at(batch, array, insert_index, output_schema) +} - RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| Error::UnexpectedError { - message: format!("Failed to build RecordBatch with _ROW_ID column: {e}"), - source: Some(Box::new(e)), - }) +/// Append a null `_ROW_ID` column for files without `first_row_id`. +fn append_null_row_id_column( + batch: RecordBatch, + insert_index: usize, + output_schema: &Arc, +) -> crate::Result { + let array: Arc = Arc::new(Int64Array::new_null(batch.num_rows())); + insert_column_at(batch, array, insert_index, output_schema) } /// Build a Parquet [RowSelection] from inclusive `[from, to]` row ranges. From 62ef85396cea7b4c5926531ee7bfeb7a067f426d Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 20:08:49 +0800 Subject: [PATCH 11/23] refactor: align _ROW_ID with Java - use pre-computed row IDs with RowSelection instead of skipping IO filtering - Replace post-read filter approach with pre-computed selected row ID sequence - RowSelection is always applied at Parquet level for IO optimization - Row IDs are assigned from the pre-computed sequence matching RowSelection output - Extract insert_column_at to deduplicate column insertion logic - Empty row_ranges treated as None (no filtering) - Use saturating_add to prevent overflow in merge_row_ranges and build_row_ranges_selection - Compute row_ranges before moving file_group to avoid clone - Remove arrow-select dependency (no longer needed) --- Cargo.toml | 1 - crates/paimon/Cargo.toml | 1 - crates/paimon/src/arrow/reader.rs | 139 +++++++++++------------- crates/paimon/src/table/read_builder.rs | 6 +- crates/paimon/src/table/source.rs | 2 +- crates/paimon/src/table/table_scan.rs | 29 +++-- 6 files changed, 89 insertions(+), 89 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index af0926c5..cfba61f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ arrow-array = { version = "57.0", features = ["ffi"] } arrow-schema = "57.0" arrow-cast = "57.0" arrow-ord = "57.0" -arrow-select = "57.0" datafusion = "52.3.0" datafusion-ffi = "52.3.0" parquet = "57.0" diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 383a251d..d5d75aeb 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -59,7 +59,6 @@ arrow-array = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } -arrow-select = { workspace = true } futures = "0.3" parquet = { workspace = true, features = ["async", "zstd"] } async-stream = "0.3.6" diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index fab6d572..b95f999a 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -232,7 +232,6 @@ impl ArrowReader { Ok(try_stream! { for split in splits { let row_ranges = split.row_ranges().map(|r| r.to_vec()); - let file_row_ranges = if row_id_index.is_some() { None } else { row_ranges.clone() }; if split.raw_convertible() || split.data_files().len() == 1 { for file_meta in split.data_files().to_vec() { @@ -245,9 +244,21 @@ impl ArrowReader { // Skip row_ranges for files without first_row_id (fail-open) let has_row_id = file_meta.first_row_id.is_some(); - let file_base_row_id = file_meta.first_row_id.unwrap_or(0); - let mut current_row_id = file_base_row_id; - let effective_row_ranges = if has_row_id { file_row_ranges.clone() } else { None }; + let effective_row_ranges = if has_row_id { row_ranges.clone() } else { None }; + + // Pre-compute selected row IDs for _ROW_ID column. + // RowSelection is always applied at Parquet level for IO optimization. + // The row ID sequence matches the rows that survive RowSelection. + let selected_row_ids = if row_id_index.is_some() && has_row_id { + Some(expand_selected_row_ids( + file_meta.first_row_id.unwrap(), + file_meta.row_count, + &effective_row_ranges, + )) + } else { + None + }; + let mut row_id_offset: usize = 0; let mut stream = read_single_file_stream( file_io.clone(), @@ -267,21 +278,18 @@ impl ArrowReader { let batch = batch?; let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { - if has_row_id { - let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; - let batch = filter_batch_by_row_id_ranges(&batch, idx, &row_ranges)?; - if batch.num_rows() > 0 { - yield batch; - } + if let Some(ref ids) = selected_row_ids { + let batch_ids = &ids[row_id_offset..row_id_offset + num_rows]; + row_id_offset += num_rows; + let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); + yield insert_column_at(batch, array, idx, &output_schema)?; } else { // No first_row_id: fill _ROW_ID with nulls - let batch = append_null_row_id_column(batch, idx, &output_schema)?; - yield batch; + yield append_null_row_id_column(batch, idx, &output_schema)?; } } else { yield batch; } - current_row_id += num_rows as i64; } } } else { @@ -291,7 +299,19 @@ impl ArrowReader { .filter_map(|f| f.first_row_id) .min(); let has_group_row_id = group_base_row_id.is_some(); - let mut current_row_id = group_base_row_id.unwrap_or(0); + let group_row_count = split.data_files().iter().map(|f| f.row_count).max().unwrap_or(0); + let effective_row_ranges = if has_group_row_id { row_ranges.clone() } else { None }; + + let selected_row_ids = if row_id_index.is_some() && has_group_row_id { + Some(expand_selected_row_ids( + group_base_row_id.unwrap(), + group_row_count, + &effective_row_ranges, + )) + } else { + None + }; + let mut row_id_offset: usize = 0; // Multiple files need column-wise merge. let mut merge_stream = merge_files_by_columns( @@ -302,26 +322,23 @@ impl ArrowReader { schema_manager.clone(), table_schema_id, batch_size, - if has_group_row_id { file_row_ranges.clone() } else { None }, + effective_row_ranges, )?; while let Some(batch) = merge_stream.next().await { let batch = batch?; let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { - if has_group_row_id { - let batch = append_row_id_column(batch, current_row_id, idx, &output_schema)?; - let batch = filter_batch_by_row_id_ranges(&batch, idx, &row_ranges)?; - if batch.num_rows() > 0 { - yield batch; - } + if let Some(ref ids) = selected_row_ids { + let batch_ids = &ids[row_id_offset..row_id_offset + num_rows]; + row_id_offset += num_rows; + let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); + yield insert_column_at(batch, array, idx, &output_schema)?; } else { - let batch = append_null_row_id_column(batch, idx, &output_schema)?; - yield batch; + yield append_null_row_id_column(batch, idx, &output_schema)?; } } else { yield batch; } - current_row_id += num_rows as i64; } } } @@ -1437,45 +1454,28 @@ fn exact_parquet_value<'a, T>( } } -/// Filter a batch by row_ranges using the _ROW_ID column values. -/// Keeps only rows whose _ROW_ID falls within any of the given ranges. -fn filter_batch_by_row_id_ranges( - batch: &RecordBatch, - row_id_col_index: usize, +/// Expand row_ranges into a flat sequence of selected row IDs for a file. +/// When row_ranges is None, returns all row IDs [first_row_id, first_row_id + row_count). +fn expand_selected_row_ids( + first_row_id: i64, + row_count: i64, row_ranges: &Option>, -) -> crate::Result { - let ranges = match row_ranges { - Some(r) => r, - None => return Ok(batch.clone()), - }; - - let row_id_array = batch - .column(row_id_col_index) - .as_any() - .downcast_ref::() - .ok_or_else(|| Error::UnexpectedError { - message: "_ROW_ID column is not Int64".to_string(), - source: None, - })?; - - let mut mask = vec![false; batch.num_rows()]; - for (i, &row_id) in row_id_array.values().iter().enumerate() { - if ranges - .iter() - .any(|r| row_id >= r.from() && row_id <= r.to()) - { - mask[i] = true; +) -> Vec { + let file_end = first_row_id + row_count - 1; + match row_ranges { + None => (first_row_id..=file_end).collect(), + Some(ranges) => { + let mut ids = Vec::new(); + for r in ranges { + let from = r.from().max(first_row_id); + let to = r.to().min(file_end); + for id in from..=to { + ids.push(id); + } + } + ids } } - - let filter = BooleanArray::from(mask); - let filtered = arrow_select::filter::filter_record_batch(batch, &filter).map_err(|e| { - Error::UnexpectedError { - message: format!("Failed to filter batch by _ROW_ID ranges: {e}"), - source: Some(Box::new(e)), - } - })?; - Ok(filtered) } /// Insert a column into a RecordBatch at the given position. @@ -1501,20 +1501,6 @@ fn insert_column_at( }) } -/// Append a computed `_ROW_ID` column. Each row's value is `base_row_id + offset`. -fn append_row_id_column( - batch: RecordBatch, - base_row_id: i64, - insert_index: usize, - output_schema: &Arc, -) -> crate::Result { - let row_ids: Vec = (0..batch.num_rows() as i64) - .map(|i| base_row_id + i) - .collect(); - let array: Arc = Arc::new(Int64Array::from(row_ids)); - insert_column_at(batch, array, insert_index, output_schema) -} - /// Append a null `_ROW_ID` column for files without `first_row_id`. fn append_null_row_id_column( batch: RecordBatch, @@ -1532,13 +1518,16 @@ fn build_row_ranges_selection( file_first_row_id: i64, ) -> RowSelection { let total_rows: i64 = row_group_metadata_list.iter().map(|rg| rg.num_rows()).sum(); + if total_rows == 0 { + return vec![].into(); + } let mut local_ranges: Vec<(usize, usize)> = row_ranges .iter() .filter_map(|r| { let local_start = (r.from() - file_first_row_id).max(0) as usize; // Clamp to() to file boundary before +1 to avoid i64 overflow when to() == i64::MAX - let clamped_to = r.to().min(file_first_row_id + total_rows - 1); + let clamped_to = r.to().min(file_first_row_id.saturating_add(total_rows - 1)); let local_end = (clamped_to + 1 - file_first_row_id) as usize; if local_start < local_end { Some((local_start, local_end)) diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 1d067850..3f433f35 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -150,7 +150,11 @@ impl<'a> ReadBuilder<'a> { /// Set row ID ranges `[from, to]` (inclusive) for filtering in data evolution mode. pub fn with_row_ranges(&mut self, ranges: Vec) -> &mut Self { - self.row_ranges = Some(ranges); + self.row_ranges = if ranges.is_empty() { + None + } else { + Some(ranges) + }; self } diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs index a26149d5..19444118 100644 --- a/crates/paimon/src/table/source.rs +++ b/crates/paimon/src/table/source.rs @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize}; /// Both `from` and `to` are inclusive, aligned with Java's `Range` class. /// /// Reference: Java's `org.apache.paimon.utils.Range` and pypaimon's `IndexedSplit` with `row_ranges`. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RowRange { from: i64, to: i64, diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index c189ddaf..899c4fff 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -667,26 +667,35 @@ impl<'a> TableScan<'a> { .collect::>>() }); + // Compute row_ranges before moving file_group to avoid clone + let split_row_ranges = if let Some(ref ranges) = self.row_ranges { + let mut split_ranges = Vec::new(); + for file in &file_group { + split_ranges.extend(intersect_ranges_with_file(ranges, file)); + } + let split_ranges = merge_row_ranges(split_ranges); + if split_ranges.is_empty() { + None + } else { + Some(split_ranges) + } + } else { + None + }; + let mut builder = DataSplitBuilder::new() .with_snapshot(snapshot_id) .with_partition(partition_row.clone()) .with_bucket(bucket) .with_bucket_path(bucket_path.clone()) .with_total_buckets(total_buckets) - .with_data_files(file_group.clone()) + .with_data_files(file_group) .with_raw_convertible(raw_convertible); if let Some(files) = data_deletion_files { builder = builder.with_data_deletion_files(files); } - if let Some(ref ranges) = self.row_ranges { - let mut split_ranges = Vec::new(); - for file in &file_group { - split_ranges.extend(intersect_ranges_with_file(ranges, file)); - } - let split_ranges = merge_row_ranges(split_ranges); - if !split_ranges.is_empty() { - builder = builder.with_row_ranges(split_ranges); - } + if let Some(row_ranges) = split_row_ranges { + builder = builder.with_row_ranges(row_ranges); } splits.push(builder.build()?); } From 435854c58c599ea53cfb6502be443854f5a55c44 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 20:14:19 +0800 Subject: [PATCH 12/23] style: remove unnecessary comments --- crates/paimon/src/arrow/reader.rs | 9 --------- crates/paimon/src/table/read_builder.rs | 1 - crates/paimon/src/table/source.rs | 7 +------ 3 files changed, 1 insertion(+), 16 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index b95f999a..dd9f106a 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -219,8 +219,6 @@ impl ArrowReader { let schema_manager = self.schema_manager; let table_schema_id = self.table_schema_id; - // Check if _ROW_ID is requested; if so, strip it from the file read type - // and compute it after reading. let row_id_index = read_type.iter().position(|f| f.name() == ROW_ID_FIELD_NAME); let file_read_type: Vec = read_type .iter() @@ -242,13 +240,9 @@ impl ArrowReader { None }; - // Skip row_ranges for files without first_row_id (fail-open) let has_row_id = file_meta.first_row_id.is_some(); let effective_row_ranges = if has_row_id { row_ranges.clone() } else { None }; - // Pre-compute selected row IDs for _ROW_ID column. - // RowSelection is always applied at Parquet level for IO optimization. - // The row ID sequence matches the rows that survive RowSelection. let selected_row_ids = if row_id_index.is_some() && has_row_id { Some(expand_selected_row_ids( file_meta.first_row_id.unwrap(), @@ -284,7 +278,6 @@ impl ArrowReader { let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); yield insert_column_at(batch, array, idx, &output_schema)?; } else { - // No first_row_id: fill _ROW_ID with nulls yield append_null_row_id_column(batch, idx, &output_schema)?; } } else { @@ -1455,7 +1448,6 @@ fn exact_parquet_value<'a, T>( } /// Expand row_ranges into a flat sequence of selected row IDs for a file. -/// When row_ranges is None, returns all row IDs [first_row_id, first_row_id + row_count). fn expand_selected_row_ids( first_row_id: i64, row_count: i64, @@ -1526,7 +1518,6 @@ fn build_row_ranges_selection( .iter() .filter_map(|r| { let local_start = (r.from() - file_first_row_id).max(0) as usize; - // Clamp to() to file boundary before +1 to avoid i64 overflow when to() == i64::MAX let clamped_to = r.to().min(file_first_row_id.saturating_add(total_rows - 1)); let local_end = (clamped_to + 1 - file_first_row_id) as usize; if local_start < local_end { diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 3f433f35..c9be32cd 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -222,7 +222,6 @@ impl<'a> ReadBuilder<'a> { } if name == crate::spec::ROW_ID_FIELD_NAME { - // Virtual _ROW_ID column: computed at read time from first_row_id + offset. resolved.push(DataField::new( crate::spec::ROW_ID_FIELD_ID, crate::spec::ROW_ID_FIELD_NAME.to_string(), diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs index 19444118..d1432b92 100644 --- a/crates/paimon/src/table/source.rs +++ b/crates/paimon/src/table/source.rs @@ -25,10 +25,6 @@ use serde::{Deserialize, Serialize}; // ======================= RowRange =============================== /// An inclusive row ID range `[from, to]` for filtering reads in data evolution mode. -/// -/// Both `from` and `to` are inclusive, aligned with Java's `Range` class. -/// -/// Reference: Java's `org.apache.paimon.utils.Range` and pypaimon's `IndexedSplit` with `row_ranges`. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RowRange { from: i64, @@ -78,7 +74,7 @@ impl RowRange { } } -/// Returns `true` (fail-open) if the file has no `first_row_id`. +/// Returns `true` if the file has no `first_row_id`. pub fn any_range_overlaps_file(ranges: &[RowRange], file: &DataFileMeta) -> bool { match file.row_id_range() { None => true, @@ -106,7 +102,6 @@ pub fn merge_row_ranges(mut ranges: Vec) -> Vec { let mut merged: Vec = Vec::with_capacity(ranges.len()); let mut current = ranges[0].clone(); for r in &ranges[1..] { - // Adjacent or overlapping: [0,5] and [6,10] should merge to [0,10] if r.from <= current.to.saturating_add(1) { current.to = current.to.max(r.to); } else { From 6f1845ebd1c3a1b4119899bfac7f65f33c913baf Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 20:23:22 +0800 Subject: [PATCH 13/23] fix: guard against clamped_to underflow and extract attach_row_id helper --- crates/paimon/src/arrow/reader.rs | 40 +++++++++++++++++++------------ 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index dd9f106a..e539ee65 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -270,16 +270,8 @@ impl ArrowReader { )?; while let Some(batch) = stream.next().await { let batch = batch?; - let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { - if let Some(ref ids) = selected_row_ids { - let batch_ids = &ids[row_id_offset..row_id_offset + num_rows]; - row_id_offset += num_rows; - let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); - yield insert_column_at(batch, array, idx, &output_schema)?; - } else { - yield append_null_row_id_column(batch, idx, &output_schema)?; - } + yield attach_row_id(batch, idx, &selected_row_ids, &mut row_id_offset, &output_schema)?; } else { yield batch; } @@ -1471,6 +1463,24 @@ fn expand_selected_row_ids( } /// Insert a column into a RecordBatch at the given position. +fn attach_row_id( + batch: RecordBatch, + row_id_index: usize, + selected_row_ids: &Option>, + row_id_offset: &mut usize, + output_schema: &Arc, +) -> crate::Result { + if let Some(ref ids) = selected_row_ids { + let num_rows = batch.num_rows(); + let batch_ids = &ids[*row_id_offset..*row_id_offset + num_rows]; + *row_id_offset += num_rows; + let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); + insert_column_at(batch, array, row_id_index, output_schema) + } else { + append_null_row_id_column(batch, row_id_index, output_schema) + } +} + fn insert_column_at( batch: RecordBatch, column: Arc, @@ -1514,17 +1524,17 @@ fn build_row_ranges_selection( return vec![].into(); } + let file_end_row_id = file_first_row_id.saturating_add(total_rows - 1); let mut local_ranges: Vec<(usize, usize)> = row_ranges .iter() .filter_map(|r| { + if r.to() < file_first_row_id || r.from() > file_end_row_id { + return None; + } let local_start = (r.from() - file_first_row_id).max(0) as usize; - let clamped_to = r.to().min(file_first_row_id.saturating_add(total_rows - 1)); + let clamped_to = r.to().min(file_end_row_id); let local_end = (clamped_to + 1 - file_first_row_id) as usize; - if local_start < local_end { - Some((local_start, local_end)) - } else { - None - } + Some((local_start, local_end)) }) .collect(); local_ranges.sort_by_key(|&(s, _)| s); From bfc11c0fe48bf5a694c39ec2a10d881f0f6d2338 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 20:28:53 +0800 Subject: [PATCH 14/23] fix: NULL-fill branch in merge_files_by_columns respects row_ranges --- crates/integration_tests/tests/read_tables.rs | 52 +++++++++++++++++++ crates/paimon/src/arrow/reader.rs | 8 ++- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index b910ca26..aa182a5e 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -2176,3 +2176,55 @@ async fn test_read_data_evolution_table_with_row_id_projection() { "_ROW_ID values should be unique" ); } + +#[tokio::test] +async fn test_read_data_evolution_table_only_row_id_with_row_ranges() { + use paimon::RowRange; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "data_evolution_table").await; + + // Get full row ID range + let full_rb = table.new_read_builder(); + let full_plan = full_rb.new_scan().plan().await.expect("plan"); + let mut min_row_id = i64::MAX; + let mut max_row_id = i64::MIN; + for split in full_plan.splits() { + for file in split.data_files() { + if let Some(fid) = file.first_row_id { + min_row_id = min_row_id.min(fid); + max_row_id = max_row_id.max(fid + file.row_count - 1); + } + } + } + + // Project only _ROW_ID with a partial row range + let mid = min_row_id + (max_row_id - min_row_id) / 2; + let mut read_builder = table.new_read_builder(); + read_builder.with_projection(&["_ROW_ID"]); + read_builder.with_row_ranges(vec![RowRange::new(min_row_id, mid)]); + let scan = read_builder.new_scan(); + let plan = scan.plan().await.expect("plan"); + + let read = read_builder.new_read().expect("read"); + let stream = read.to_arrow(plan.splits()).expect("stream"); + let batches: Vec = stream.try_collect().await.expect("collect"); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert!(total_rows > 0, "Should have rows"); + // Should have fewer rows than full table due to row_ranges filtering + let full_read = table.new_read_builder().new_read().expect("read"); + let full_count: usize = full_read + .to_arrow(full_plan.splits()) + .expect("stream") + .try_collect::>() + .await + .expect("collect") + .iter() + .map(|b| b.num_rows()) + .sum(); + assert!( + total_rows <= full_count, + "Row range filtered count ({total_rows}) should be <= full count ({full_count})" + ); +} diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index e539ee65..15b04307 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -679,8 +679,12 @@ fn merge_files_by_columns( // column that no file contains yet), we still need to emit NULL-filled rows to // preserve the correct row count. if active_file_indices.is_empty() { - // All files in a merge group cover the same rows; use the first file's row_count. - let total_rows = data_files[0].row_count as usize; + let first_row_id = data_files[0].first_row_id.unwrap_or(0); + let file_row_count = data_files[0].row_count; + let total_rows = match &row_ranges { + Some(ranges) => expand_selected_row_ids(first_row_id, file_row_count, &Some(ranges.clone())).len(), + None => file_row_count as usize, + }; let mut emitted = 0; while emitted < total_rows { let rows_to_emit = (total_rows - emitted).min(output_batch_size); From fddb6b66406f6645294ed458b31ed35be0f98316 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 20:41:07 +0800 Subject: [PATCH 15/23] perf: lazy row ID computation when no row_ranges, avoid full-file allocation --- crates/paimon/src/arrow/reader.rs | 98 ++++++++++++++++--------------- 1 file changed, 51 insertions(+), 47 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 15b04307..b5f97ffe 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -244,14 +244,18 @@ impl ArrowReader { let effective_row_ranges = if has_row_id { row_ranges.clone() } else { None }; let selected_row_ids = if row_id_index.is_some() && has_row_id { - Some(expand_selected_row_ids( - file_meta.first_row_id.unwrap(), - file_meta.row_count, - &effective_row_ranges, - )) + effective_row_ranges.as_ref().map(|ranges| { + expand_selected_row_ids( + file_meta.first_row_id.unwrap(), + file_meta.row_count, + ranges, + ) + }) } else { None }; + let file_base_row_id = file_meta.first_row_id.unwrap_or(0); + let mut row_id_cursor = file_base_row_id; let mut row_id_offset: usize = 0; let mut stream = read_single_file_stream( @@ -270,8 +274,18 @@ impl ArrowReader { )?; while let Some(batch) = stream.next().await { let batch = batch?; + let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { - yield attach_row_id(batch, idx, &selected_row_ids, &mut row_id_offset, &output_schema)?; + if !has_row_id { + yield append_null_row_id_column(batch, idx, &output_schema)?; + } else if let Some(ref ids) = selected_row_ids { + yield attach_row_id(batch, idx, ids, &mut row_id_offset, &output_schema)?; + } else { + let row_ids: Vec = (row_id_cursor..row_id_cursor + num_rows as i64).collect(); + row_id_cursor += num_rows as i64; + let array: Arc = Arc::new(Int64Array::from(row_ids)); + yield insert_column_at(batch, array, idx, &output_schema)?; + } } else { yield batch; } @@ -288,17 +302,19 @@ impl ArrowReader { let effective_row_ranges = if has_group_row_id { row_ranges.clone() } else { None }; let selected_row_ids = if row_id_index.is_some() && has_group_row_id { - Some(expand_selected_row_ids( - group_base_row_id.unwrap(), - group_row_count, - &effective_row_ranges, - )) + effective_row_ranges.as_ref().map(|ranges| { + expand_selected_row_ids( + group_base_row_id.unwrap(), + group_row_count, + ranges, + ) + }) } else { None }; + let mut row_id_cursor = group_base_row_id.unwrap_or(0); let mut row_id_offset: usize = 0; - // Multiple files need column-wise merge. let mut merge_stream = merge_files_by_columns( &file_io, &split, @@ -313,13 +329,15 @@ impl ArrowReader { let batch = batch?; let num_rows = batch.num_rows(); if let Some(idx) = row_id_index { - if let Some(ref ids) = selected_row_ids { - let batch_ids = &ids[row_id_offset..row_id_offset + num_rows]; - row_id_offset += num_rows; - let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); - yield insert_column_at(batch, array, idx, &output_schema)?; - } else { + if !has_group_row_id { yield append_null_row_id_column(batch, idx, &output_schema)?; + } else if let Some(ref ids) = selected_row_ids { + yield attach_row_id(batch, idx, ids, &mut row_id_offset, &output_schema)?; + } else { + let row_ids: Vec = (row_id_cursor..row_id_cursor + num_rows as i64).collect(); + row_id_cursor += num_rows as i64; + let array: Arc = Arc::new(Int64Array::from(row_ids)); + yield insert_column_at(batch, array, idx, &output_schema)?; } } else { yield batch; @@ -682,7 +700,7 @@ fn merge_files_by_columns( let first_row_id = data_files[0].first_row_id.unwrap_or(0); let file_row_count = data_files[0].row_count; let total_rows = match &row_ranges { - Some(ranges) => expand_selected_row_ids(first_row_id, file_row_count, &Some(ranges.clone())).len(), + Some(ranges) => expand_selected_row_ids(first_row_id, file_row_count, ranges).len(), None => file_row_count as usize, }; let mut emitted = 0; @@ -1444,45 +1462,31 @@ fn exact_parquet_value<'a, T>( } /// Expand row_ranges into a flat sequence of selected row IDs for a file. -fn expand_selected_row_ids( - first_row_id: i64, - row_count: i64, - row_ranges: &Option>, -) -> Vec { +fn expand_selected_row_ids(first_row_id: i64, row_count: i64, row_ranges: &[RowRange]) -> Vec { let file_end = first_row_id + row_count - 1; - match row_ranges { - None => (first_row_id..=file_end).collect(), - Some(ranges) => { - let mut ids = Vec::new(); - for r in ranges { - let from = r.from().max(first_row_id); - let to = r.to().min(file_end); - for id in from..=to { - ids.push(id); - } - } - ids + let mut ids = Vec::new(); + for r in row_ranges { + let from = r.from().max(first_row_id); + let to = r.to().min(file_end); + for id in from..=to { + ids.push(id); } } + ids } -/// Insert a column into a RecordBatch at the given position. fn attach_row_id( batch: RecordBatch, row_id_index: usize, - selected_row_ids: &Option>, + selected_row_ids: &[i64], row_id_offset: &mut usize, output_schema: &Arc, ) -> crate::Result { - if let Some(ref ids) = selected_row_ids { - let num_rows = batch.num_rows(); - let batch_ids = &ids[*row_id_offset..*row_id_offset + num_rows]; - *row_id_offset += num_rows; - let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); - insert_column_at(batch, array, row_id_index, output_schema) - } else { - append_null_row_id_column(batch, row_id_index, output_schema) - } + let num_rows = batch.num_rows(); + let batch_ids = &selected_row_ids[*row_id_offset..*row_id_offset + num_rows]; + *row_id_offset += num_rows; + let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); + insert_column_at(batch, array, row_id_index, output_schema) } fn insert_column_at( From e9f756601304d7102ef519eea9f68db15949fec5 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 20:45:06 +0800 Subject: [PATCH 16/23] fix: merge overlapping row_ranges in expand_selected_row_ids to match RowSelection --- crates/paimon/src/arrow/reader.rs | 5 +++-- crates/paimon/src/table/mod.rs | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index b5f97ffe..34246400 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -27,7 +27,7 @@ use crate::spec::{ }; use crate::table::schema_manager::SchemaManager; use crate::table::ArrowRecordBatchStream; -use crate::table::RowRange; +use crate::table::{merge_row_ranges, RowRange}; use crate::{DataSplit, Error}; use arrow_array::{ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, @@ -1464,8 +1464,9 @@ fn exact_parquet_value<'a, T>( /// Expand row_ranges into a flat sequence of selected row IDs for a file. fn expand_selected_row_ids(first_row_id: i64, row_count: i64, row_ranges: &[RowRange]) -> Vec { let file_end = first_row_id + row_count - 1; + let merged = merge_row_ranges(row_ranges.to_vec()); let mut ids = Vec::new(); - for r in row_ranges { + for r in &merged { let from = r.from().max(first_row_id); let to = r.to().min(file_end); for id in from..=to { diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index f128bd2d..afcefc0b 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -33,7 +33,9 @@ use futures::stream::BoxStream; pub use read_builder::{ReadBuilder, TableRead}; pub use schema_manager::SchemaManager; pub use snapshot_manager::SnapshotManager; -pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange}; +pub use source::{ + merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange, +}; pub use table_scan::TableScan; pub use tag_manager::TagManager; From 142e3c032a6dfb7c9b0937c7cd00c6c802954a36 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 20:47:08 +0800 Subject: [PATCH 17/23] add debug_assert for merge group invariants aligned with Java checkArgument --- crates/paimon/src/arrow/reader.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 34246400..2b06bc1d 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -292,13 +292,26 @@ impl ArrowReader { } } } else { - let group_base_row_id = split - .data_files() + let files = split.data_files(); + debug_assert!( + files.iter().all(|f| f.first_row_id.is_some()), + "All files in a field merge split should have first_row_id" + ); + debug_assert!( + files.iter().all(|f| f.row_count == files[0].row_count), + "All files in a field merge split should have the same row count" + ); + debug_assert!( + files.iter().all(|f| f.first_row_id == files[0].first_row_id), + "All files in a field merge split should have the same first row id" + ); + + let group_base_row_id = files .iter() .filter_map(|f| f.first_row_id) .min(); let has_group_row_id = group_base_row_id.is_some(); - let group_row_count = split.data_files().iter().map(|f| f.row_count).max().unwrap_or(0); + let group_row_count = files.iter().map(|f| f.row_count).max().unwrap_or(0); let effective_row_ranges = if has_group_row_id { row_ranges.clone() } else { None }; let selected_row_ids = if row_id_index.is_some() && has_group_row_id { From ac911b519f7713eca1eb906682dfce2d5839635e Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 20:56:06 +0800 Subject: [PATCH 18/23] use assert! instead of debug_assert! for merge group invariants --- crates/paimon/src/arrow/reader.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 2b06bc1d..5040605f 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -293,15 +293,15 @@ impl ArrowReader { } } else { let files = split.data_files(); - debug_assert!( + assert!( files.iter().all(|f| f.first_row_id.is_some()), "All files in a field merge split should have first_row_id" ); - debug_assert!( + assert!( files.iter().all(|f| f.row_count == files[0].row_count), "All files in a field merge split should have the same row count" ); - debug_assert!( + assert!( files.iter().all(|f| f.first_row_id == files[0].first_row_id), "All files in a field merge split should have the same first row id" ); From d90879ea7edb609fa2d5cb90ffd7faa4d8df623a Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 21:31:07 +0800 Subject: [PATCH 19/23] remove unused is_row_id_field and is_empty methods --- crates/paimon/src/spec/schema.rs | 4 ---- crates/paimon/src/table/source.rs | 5 ----- 2 files changed, 9 deletions(-) diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index bd1244c0..39d6aeec 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -164,10 +164,6 @@ impl DataField { self } - pub fn is_row_id_field(&self) -> bool { - self.name == ROW_ID_FIELD_NAME - } - pub fn with_description(mut self, new_description: Option) -> Self { self.description = new_description; self diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs index d1432b92..376fe1a3 100644 --- a/crates/paimon/src/table/source.rs +++ b/crates/paimon/src/table/source.rs @@ -49,10 +49,6 @@ impl RowRange { self.to - self.from + 1 } - pub fn is_empty(&self) -> bool { - self.from > self.to - } - /// Check overlap with an inclusive file range `[file_start, file_end]`. pub fn overlaps_inclusive(&self, file_start: i64, file_end_inclusive: i64) -> bool { self.from <= file_end_inclusive && self.to >= file_start @@ -340,7 +336,6 @@ mod row_range_tests { fn test_row_range_count_and_empty() { let r = RowRange::new(5, 10); assert_eq!(r.count(), 6); // rows 5,6,7,8,9,10 - assert!(!r.is_empty()); } } From 11584471c04553a373ff8ba95727ee4934378331 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 22:15:55 +0800 Subject: [PATCH 20/23] address review: assert in RowRange::new, handle row_count==0, remove redundant merge, avoid clone in merge_row_ranges, update limit comment --- crates/paimon/src/arrow/reader.rs | 8 +++++--- crates/paimon/src/table/source.rs | 9 +++++---- crates/paimon/src/table/table_scan.rs | 5 ++--- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 5040605f..4524ed45 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -27,7 +27,7 @@ use crate::spec::{ }; use crate::table::schema_manager::SchemaManager; use crate::table::ArrowRecordBatchStream; -use crate::table::{merge_row_ranges, RowRange}; +use crate::table::RowRange; use crate::{DataSplit, Error}; use arrow_array::{ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, @@ -1476,10 +1476,12 @@ fn exact_parquet_value<'a, T>( /// Expand row_ranges into a flat sequence of selected row IDs for a file. fn expand_selected_row_ids(first_row_id: i64, row_count: i64, row_ranges: &[RowRange]) -> Vec { + if row_count == 0 { + return Vec::new(); + } let file_end = first_row_id + row_count - 1; - let merged = merge_row_ranges(row_ranges.to_vec()); let mut ids = Vec::new(); - for r in &merged { + for r in row_ranges { let from = r.from().max(first_row_id); let to = r.to().min(file_end); for id in from..=to { diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs index 376fe1a3..f45fa049 100644 --- a/crates/paimon/src/table/source.rs +++ b/crates/paimon/src/table/source.rs @@ -33,7 +33,7 @@ pub struct RowRange { impl RowRange { pub fn new(from: i64, to: i64) -> Self { - debug_assert!(from <= to, "RowRange from ({from}) must be <= to ({to})"); + assert!(from <= to, "RowRange from ({from}) must be <= to ({to})"); Self { from, to } } @@ -96,13 +96,14 @@ pub fn merge_row_ranges(mut ranges: Vec) -> Vec { } ranges.sort_by_key(|r| r.from); let mut merged: Vec = Vec::with_capacity(ranges.len()); - let mut current = ranges[0].clone(); - for r in &ranges[1..] { + let mut iter = ranges.into_iter(); + let mut current = iter.next().unwrap(); + for r in iter { if r.from <= current.to.saturating_add(1) { current.to = current.to.max(r.to); } else { merged.push(current); - current = r.clone(); + current = r; } } merged.push(current); diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 899c4fff..4e18745c 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -701,9 +701,8 @@ impl<'a> TableScan<'a> { } } - // Apply limit pushdown only when there are no data predicates. - // With data predicates, merged_row_count() reflects pre-filter row counts, - // so stopping early could return fewer rows than the limit after filtering. + // With data predicates or row_ranges, merged_row_count() reflects pre-filter + // row counts, so stopping early could return fewer rows than the limit. let splits = if self.data_predicates.is_empty() && self.row_ranges.is_none() { self.apply_limit_pushdown(splits) } else { From 481fe2c5ed8ef60112d685cd892ed21ac00a234d Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 23:25:33 +0800 Subject: [PATCH 21/23] support _ROW_ID in DataFusion SQL for data evolution tables --- .../integrations/datafusion/src/table/mod.rs | 12 ++- .../datafusion/tests/read_tables.rs | 74 +++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index a76174dd..3df1bda2 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -54,9 +54,17 @@ impl PaimonTableProvider { /// /// Loads the table schema and converts it to Arrow for DataFusion. pub fn try_new(table: Table) -> DFResult { - let fields = table.schema().fields(); + let mut fields = table.schema().fields().to_vec(); + let core_options = paimon::spec::CoreOptions::new(table.schema().options()); + if core_options.data_evolution_enabled() { + fields.push(paimon::spec::DataField::new( + paimon::spec::ROW_ID_FIELD_ID, + paimon::spec::ROW_ID_FIELD_NAME.to_string(), + paimon::spec::DataType::BigInt(paimon::spec::BigIntType::with_nullable(true)), + )); + } let schema = - paimon::arrow::build_target_arrow_schema(fields).map_err(to_datafusion_error)?; + paimon::arrow::build_target_arrow_schema(&fields).map_err(to_datafusion_error)?; Ok(Self { table, schema }) } diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index d438720c..f6a9f507 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -599,3 +599,77 @@ async fn test_read_complex_type_table_via_datafusion() { assert_eq!(rows[2].2, "{}"); assert_eq!(rows[2].3, "{name: carol, value: 300}"); } + +#[tokio::test] +async fn test_select_row_id_from_data_evolution_table() { + use datafusion::arrow::array::Int64Array; + + let ctx = create_context("data_evolution_table").await; + + let batches = ctx + .sql("SELECT _ROW_ID, id, name FROM data_evolution_table") + .await + .expect("SQL should parse") + .collect() + .await + .expect("query should execute"); + + assert!(!batches.is_empty()); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert!(total_rows > 0); + + for batch in &batches { + let row_id_col = batch + .column_by_name("_ROW_ID") + .expect("_ROW_ID column should exist"); + let row_id_array = row_id_col + .as_any() + .downcast_ref::() + .expect("_ROW_ID should be Int64"); + for i in 0..batch.num_rows() { + assert!( + row_id_array.is_valid(i), + "_ROW_ID should not be null for data evolution table" + ); + assert!(row_id_array.value(i) >= 0); + } + } +} + +#[tokio::test] +async fn test_filter_row_id_from_data_evolution_table() { + use datafusion::arrow::array::Int64Array; + + let ctx = create_context("data_evolution_table").await; + + let all_batches = ctx + .sql("SELECT _ROW_ID FROM data_evolution_table") + .await + .expect("SQL") + .collect() + .await + .expect("collect"); + let all_count: usize = all_batches.iter().map(|b| b.num_rows()).sum(); + + let filtered_batches = ctx + .sql("SELECT _ROW_ID, id FROM data_evolution_table WHERE _ROW_ID = 0") + .await + .expect("SQL") + .collect() + .await + .expect("collect"); + let filtered_count: usize = filtered_batches.iter().map(|b| b.num_rows()).sum(); + + assert!(filtered_count <= all_count); + for batch in &filtered_batches { + let row_id_array = batch + .column_by_name("_ROW_ID") + .expect("_ROW_ID") + .as_any() + .downcast_ref::() + .expect("Int64"); + for i in 0..batch.num_rows() { + assert_eq!(row_id_array.value(i), 0); + } + } +} From de7db6708636807ab724e935657961da5169becf Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 23:40:41 +0800 Subject: [PATCH 22/23] fix: quote _ROW_ID in SQL to preserve case sensitivity --- crates/integrations/datafusion/tests/read_tables.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index f6a9f507..b7c99028 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -607,7 +607,7 @@ async fn test_select_row_id_from_data_evolution_table() { let ctx = create_context("data_evolution_table").await; let batches = ctx - .sql("SELECT _ROW_ID, id, name FROM data_evolution_table") + .sql(r#"SELECT "_ROW_ID", id, name FROM data_evolution_table"#) .await .expect("SQL should parse") .collect() @@ -643,7 +643,7 @@ async fn test_filter_row_id_from_data_evolution_table() { let ctx = create_context("data_evolution_table").await; let all_batches = ctx - .sql("SELECT _ROW_ID FROM data_evolution_table") + .sql(r#"SELECT "_ROW_ID" FROM data_evolution_table"#) .await .expect("SQL") .collect() @@ -652,7 +652,7 @@ async fn test_filter_row_id_from_data_evolution_table() { let all_count: usize = all_batches.iter().map(|b| b.num_rows()).sum(); let filtered_batches = ctx - .sql("SELECT _ROW_ID, id FROM data_evolution_table WHERE _ROW_ID = 0") + .sql(r#"SELECT "_ROW_ID", id FROM data_evolution_table WHERE "_ROW_ID" = 0"#) .await .expect("SQL") .collect() From 1d3cf974d939485eeaf2471f27fbe29d17fb3c82 Mon Sep 17 00:00:00 2001 From: XiaoHongbo <1346652787@qq.com> Date: Mon, 6 Apr 2026 23:54:58 +0800 Subject: [PATCH 23/23] fix: always pass column names to ReadBuilder to include _ROW_ID in SELECT * --- crates/integrations/datafusion/src/table/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 3df1bda2..fdc4f0ea 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -103,7 +103,6 @@ impl TableProvider for PaimonTableProvider { filters: &[Expr], limit: Option, ) -> DFResult> { - // Convert projection indices to column names and compute projected schema let (projected_schema, projected_columns) = if let Some(indices) = projection { let fields: Vec = indices .iter() @@ -112,7 +111,13 @@ impl TableProvider for PaimonTableProvider { let column_names: Vec = fields.iter().map(|f| f.name().clone()).collect(); (Arc::new(Schema::new(fields)), Some(column_names)) } else { - (self.schema.clone(), None) + let column_names: Vec = self + .schema + .fields() + .iter() + .map(|f| f.name().clone()) + .collect(); + (self.schema.clone(), Some(column_names)) }; // Plan splits eagerly so we know partition count upfront.