diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 008b081e..aa182a5e 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -2000,3 +2000,231 @@ async fn test_bucket_predicate_filtering_long_string_key() { actual.len() ); } + +// --------------------------------------------------------------------------- +// Data Evolution Row ID Range Filter integration tests +// --------------------------------------------------------------------------- + +async fn scan_and_read_with_row_ranges( + table: &paimon::Table, + row_ranges: Vec, +) -> (Plan, Vec) { + let mut read_builder = table.new_read_builder(); + read_builder.with_row_ranges(row_ranges); + let scan = read_builder.new_scan(); + let plan = scan.plan().await.expect("Failed to plan scan"); + + let read = read_builder.new_read().expect("Failed to create read"); + let stream = read + .to_arrow(plan.splits()) + .expect("Failed to create arrow stream"); + let batches: Vec<_> = stream + .try_collect() + .await + .expect("Failed to collect batches"); + + (plan, batches) +} + +fn extract_id_name_value(batches: &[RecordBatch]) -> Vec<(i32, String, i32)> { + let mut rows = Vec::new(); + for batch in batches { + let id = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("id"); + let name = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("name"); + let value = batch + .column_by_name("value") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("value"); + for i in 0..batch.num_rows() { + rows.push((id.value(i), name.value(i).to_string(), value.value(i))); + } + } + rows.sort_by_key(|(id, _, _)| *id); + rows +} + +#[tokio::test] +async fn test_read_data_evolution_table_with_row_ranges() { + use paimon::RowRange; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "data_evolution_table").await; 
+ + let (full_plan, full_batches) = scan_and_read(&catalog, "data_evolution_table", None).await; + let full_rows = extract_id_name_value(&full_batches); + let full_row_count: usize = full_batches.iter().map(|b| b.num_rows()).sum(); + assert!(full_row_count > 0); + + let mut min_row_id = i64::MAX; + let mut max_row_id_exclusive = i64::MIN; + for split in full_plan.splits() { + for file in split.data_files() { + if let Some(fid) = file.first_row_id { + min_row_id = min_row_id.min(fid); + max_row_id_exclusive = max_row_id_exclusive.max(fid + file.row_count); + } + } + } + assert!(min_row_id < max_row_id_exclusive); + + let mid = min_row_id + (max_row_id_exclusive - min_row_id) / 2; + let (filtered_plan, filtered_batches) = + scan_and_read_with_row_ranges(&table, vec![RowRange::new(min_row_id, mid)]).await; + + let filtered_row_count: usize = filtered_batches.iter().map(|b| b.num_rows()).sum(); + let filtered_rows = extract_id_name_value(&filtered_batches); + + assert!( + filtered_row_count < full_row_count || mid >= max_row_id_exclusive, + "filtered={filtered_row_count}, full={full_row_count}" + ); + for row in &filtered_rows { + assert!( + full_rows.contains(row), + "Filtered row {row:?} not in full result" + ); + } + assert!(filtered_plan.splits().len() <= full_plan.splits().len()); +} + +#[tokio::test] +async fn test_read_data_evolution_table_with_empty_row_ranges() { + use paimon::RowRange; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "data_evolution_table").await; + + let (plan, batches) = + scan_and_read_with_row_ranges(&table, vec![RowRange::new(999_999, 1_000_000)]).await; + + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(row_count, 0); + assert!(plan.splits().is_empty()); +} + +#[tokio::test] +async fn test_read_data_evolution_table_with_full_row_ranges() { + use paimon::RowRange; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, 
"data_evolution_table").await; + + let (_, full_batches) = scan_and_read(&catalog, "data_evolution_table", None).await; + let full_rows = extract_id_name_value(&full_batches); + + let (_, filtered_batches) = + scan_and_read_with_row_ranges(&table, vec![RowRange::new(0, i64::MAX)]).await; + let filtered_rows = extract_id_name_value(&filtered_batches); + + assert_eq!(filtered_rows, full_rows); +} + +#[tokio::test] +async fn test_read_data_evolution_table_with_row_id_projection() { + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "data_evolution_table").await; + + // Project _ROW_ID along with regular columns + let mut read_builder = table.new_read_builder(); + read_builder.with_projection(&["_ROW_ID", "id", "name"]); + let scan = read_builder.new_scan(); + let plan = scan.plan().await.expect("Failed to plan scan"); + + let read = read_builder.new_read().expect("Failed to create read"); + let stream = read + .to_arrow(plan.splits()) + .expect("Failed to create arrow stream"); + let batches: Vec = stream + .try_collect() + .await + .expect("Failed to collect batches"); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert!(total_rows > 0, "Should have rows"); + + // Verify _ROW_ID column exists and contains non-negative values + let mut row_ids: Vec = Vec::new(); + for batch in &batches { + let row_id_col = batch + .column_by_name("_ROW_ID") + .expect("_ROW_ID column should exist"); + let row_id_array = row_id_col + .as_any() + .downcast_ref::() + .expect("_ROW_ID should be Int64"); + for i in 0..batch.num_rows() { + row_ids.push(row_id_array.value(i)); + } + } + + assert_eq!(row_ids.len(), total_rows); + assert!( + row_ids.iter().all(|&id| id >= 0), + "All _ROW_ID values should be non-negative" + ); + // _ROW_ID values should be unique + let unique: std::collections::HashSet = row_ids.iter().copied().collect(); + assert_eq!( + unique.len(), + row_ids.len(), + "_ROW_ID values should be unique" + ); 
+} + +#[tokio::test] +async fn test_read_data_evolution_table_only_row_id_with_row_ranges() { + use paimon::RowRange; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "data_evolution_table").await; + + // Get full row ID range + let full_rb = table.new_read_builder(); + let full_plan = full_rb.new_scan().plan().await.expect("plan"); + let mut min_row_id = i64::MAX; + let mut max_row_id = i64::MIN; + for split in full_plan.splits() { + for file in split.data_files() { + if let Some(fid) = file.first_row_id { + min_row_id = min_row_id.min(fid); + max_row_id = max_row_id.max(fid + file.row_count - 1); + } + } + } + + // Project only _ROW_ID with a partial row range + let mid = min_row_id + (max_row_id - min_row_id) / 2; + let mut read_builder = table.new_read_builder(); + read_builder.with_projection(&["_ROW_ID"]); + read_builder.with_row_ranges(vec![RowRange::new(min_row_id, mid)]); + let scan = read_builder.new_scan(); + let plan = scan.plan().await.expect("plan"); + + let read = read_builder.new_read().expect("read"); + let stream = read.to_arrow(plan.splits()).expect("stream"); + let batches: Vec = stream.try_collect().await.expect("collect"); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert!(total_rows > 0, "Should have rows"); + // Should have fewer rows than full table due to row_ranges filtering + let full_read = table.new_read_builder().new_read().expect("read"); + let full_count: usize = full_read + .to_arrow(full_plan.splits()) + .expect("stream") + .try_collect::>() + .await + .expect("collect") + .iter() + .map(|b| b.num_rows()) + .sum(); + assert!( + total_rows <= full_count, + "Row range filtered count ({total_rows}) should be <= full count ({full_count})" + ); +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index a76174dd..fdc4f0ea 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ 
b/crates/integrations/datafusion/src/table/mod.rs @@ -54,9 +54,17 @@ impl PaimonTableProvider { /// /// Loads the table schema and converts it to Arrow for DataFusion. pub fn try_new(table: Table) -> DFResult { - let fields = table.schema().fields(); + let mut fields = table.schema().fields().to_vec(); + let core_options = paimon::spec::CoreOptions::new(table.schema().options()); + if core_options.data_evolution_enabled() { + fields.push(paimon::spec::DataField::new( + paimon::spec::ROW_ID_FIELD_ID, + paimon::spec::ROW_ID_FIELD_NAME.to_string(), + paimon::spec::DataType::BigInt(paimon::spec::BigIntType::with_nullable(true)), + )); + } let schema = - paimon::arrow::build_target_arrow_schema(fields).map_err(to_datafusion_error)?; + paimon::arrow::build_target_arrow_schema(&fields).map_err(to_datafusion_error)?; Ok(Self { table, schema }) } @@ -95,7 +103,6 @@ impl TableProvider for PaimonTableProvider { filters: &[Expr], limit: Option, ) -> DFResult> { - // Convert projection indices to column names and compute projected schema let (projected_schema, projected_columns) = if let Some(indices) = projection { let fields: Vec = indices .iter() @@ -104,7 +111,13 @@ impl TableProvider for PaimonTableProvider { let column_names: Vec = fields.iter().map(|f| f.name().clone()).collect(); (Arc::new(Schema::new(fields)), Some(column_names)) } else { - (self.schema.clone(), None) + let column_names: Vec = self + .schema + .fields() + .iter() + .map(|f| f.name().clone()) + .collect(); + (self.schema.clone(), Some(column_names)) }; // Plan splits eagerly so we know partition count upfront. 
diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index d438720c..b7c99028 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -599,3 +599,77 @@ async fn test_read_complex_type_table_via_datafusion() { assert_eq!(rows[2].2, "{}"); assert_eq!(rows[2].3, "{name: carol, value: 300}"); } + +#[tokio::test] +async fn test_select_row_id_from_data_evolution_table() { + use datafusion::arrow::array::Int64Array; + + let ctx = create_context("data_evolution_table").await; + + let batches = ctx + .sql(r#"SELECT "_ROW_ID", id, name FROM data_evolution_table"#) + .await + .expect("SQL should parse") + .collect() + .await + .expect("query should execute"); + + assert!(!batches.is_empty()); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert!(total_rows > 0); + + for batch in &batches { + let row_id_col = batch + .column_by_name("_ROW_ID") + .expect("_ROW_ID column should exist"); + let row_id_array = row_id_col + .as_any() + .downcast_ref::() + .expect("_ROW_ID should be Int64"); + for i in 0..batch.num_rows() { + assert!( + row_id_array.is_valid(i), + "_ROW_ID should not be null for data evolution table" + ); + assert!(row_id_array.value(i) >= 0); + } + } +} + +#[tokio::test] +async fn test_filter_row_id_from_data_evolution_table() { + use datafusion::arrow::array::Int64Array; + + let ctx = create_context("data_evolution_table").await; + + let all_batches = ctx + .sql(r#"SELECT "_ROW_ID" FROM data_evolution_table"#) + .await + .expect("SQL") + .collect() + .await + .expect("collect"); + let all_count: usize = all_batches.iter().map(|b| b.num_rows()).sum(); + + let filtered_batches = ctx + .sql(r#"SELECT "_ROW_ID", id FROM data_evolution_table WHERE "_ROW_ID" = 0"#) + .await + .expect("SQL") + .collect() + .await + .expect("collect"); + let filtered_count: usize = filtered_batches.iter().map(|b| 
b.num_rows()).sum(); + + assert!(filtered_count <= all_count); + for batch in &filtered_batches { + let row_id_array = batch + .column_by_name("_ROW_ID") + .expect("_ROW_ID") + .as_any() + .downcast_ref::() + .expect("Int64"); + for i in 0..batch.num_rows() { + assert_eq!(row_id_array.value(i), 0); + } + } +} diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index 91a67c62..4524ed45 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -22,9 +22,12 @@ use crate::arrow::filtering::{ use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX}; use crate::deletion_vector::{DeletionVector, DeletionVectorFactory}; use crate::io::{FileIO, FileRead, FileStatus}; -use crate::spec::{DataField, DataFileMeta, DataType, Datum, Predicate, PredicateOperator}; +use crate::spec::{ + DataField, DataFileMeta, DataType, Datum, Predicate, PredicateOperator, ROW_ID_FIELD_NAME, +}; use crate::table::schema_manager::SchemaManager; use crate::table::ArrowRecordBatchStream; +use crate::table::RowRange; use crate::{DataSplit, Error}; use arrow_array::{ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, @@ -183,6 +186,7 @@ impl ArrowReader { predicates: predicates.clone(), batch_size, dv, + row_ranges: None, }, )?; while let Some(batch) = stream.next().await { @@ -215,8 +219,18 @@ impl ArrowReader { let schema_manager = self.schema_manager; let table_schema_id = self.table_schema_id; + let row_id_index = read_type.iter().position(|f| f.name() == ROW_ID_FIELD_NAME); + let file_read_type: Vec = read_type + .iter() + .filter(|f| f.name() != ROW_ID_FIELD_NAME) + .cloned() + .collect(); + let output_schema = build_target_arrow_schema(&read_type)?; + Ok(try_stream! 
{ for split in splits { + let row_ranges = split.row_ranges().map(|r| r.to_vec()); + if split.raw_convertible() || split.data_files().len() == 1 { for file_meta in split.data_files().to_vec() { let data_fields: Option> = if file_meta.schema_id != table_schema_id { @@ -226,36 +240,121 @@ impl ArrowReader { None }; + let has_row_id = file_meta.first_row_id.is_some(); + let effective_row_ranges = if has_row_id { row_ranges.clone() } else { None }; + + let selected_row_ids = if row_id_index.is_some() && has_row_id { + effective_row_ranges.as_ref().map(|ranges| { + expand_selected_row_ids( + file_meta.first_row_id.unwrap(), + file_meta.row_count, + ranges, + ) + }) + } else { + None + }; + let file_base_row_id = file_meta.first_row_id.unwrap_or(0); + let mut row_id_cursor = file_base_row_id; + let mut row_id_offset: usize = 0; + let mut stream = read_single_file_stream( file_io.clone(), SingleFileReadRequest { split: split.clone(), file_meta, - read_type: read_type.clone(), + read_type: file_read_type.clone(), table_fields: table_fields.clone(), data_fields, predicates: Vec::new(), batch_size, dv: None, + row_ranges: effective_row_ranges, }, )?; while let Some(batch) = stream.next().await { - yield batch?; + let batch = batch?; + let num_rows = batch.num_rows(); + if let Some(idx) = row_id_index { + if !has_row_id { + yield append_null_row_id_column(batch, idx, &output_schema)?; + } else if let Some(ref ids) = selected_row_ids { + yield attach_row_id(batch, idx, ids, &mut row_id_offset, &output_schema)?; + } else { + let row_ids: Vec = (row_id_cursor..row_id_cursor + num_rows as i64).collect(); + row_id_cursor += num_rows as i64; + let array: Arc = Arc::new(Int64Array::from(row_ids)); + yield insert_column_at(batch, array, idx, &output_schema)?; + } + } else { + yield batch; + } } } } else { - // Multiple files need column-wise merge. 
+ let files = split.data_files(); + assert!( + files.iter().all(|f| f.first_row_id.is_some()), + "All files in a field merge split should have first_row_id" + ); + assert!( + files.iter().all(|f| f.row_count == files[0].row_count), + "All files in a field merge split should have the same row count" + ); + assert!( + files.iter().all(|f| f.first_row_id == files[0].first_row_id), + "All files in a field merge split should have the same first row id" + ); + + let group_base_row_id = files + .iter() + .filter_map(|f| f.first_row_id) + .min(); + let has_group_row_id = group_base_row_id.is_some(); + let group_row_count = files.iter().map(|f| f.row_count).max().unwrap_or(0); + let effective_row_ranges = if has_group_row_id { row_ranges.clone() } else { None }; + + let selected_row_ids = if row_id_index.is_some() && has_group_row_id { + effective_row_ranges.as_ref().map(|ranges| { + expand_selected_row_ids( + group_base_row_id.unwrap(), + group_row_count, + ranges, + ) + }) + } else { + None + }; + let mut row_id_cursor = group_base_row_id.unwrap_or(0); + let mut row_id_offset: usize = 0; + let mut merge_stream = merge_files_by_columns( &file_io, &split, - &read_type, + &file_read_type, &table_fields, schema_manager.clone(), table_schema_id, batch_size, + effective_row_ranges, )?; while let Some(batch) = merge_stream.next().await { - yield batch?; + let batch = batch?; + let num_rows = batch.num_rows(); + if let Some(idx) = row_id_index { + if !has_group_row_id { + yield append_null_row_id_column(batch, idx, &output_schema)?; + } else if let Some(ref ids) = selected_row_ids { + yield attach_row_id(batch, idx, ids, &mut row_id_offset, &output_schema)?; + } else { + let row_ids: Vec = (row_id_cursor..row_id_cursor + num_rows as i64).collect(); + row_id_cursor += num_rows as i64; + let array: Arc = Arc::new(Int64Array::from(row_ids)); + yield insert_column_at(batch, array, idx, &output_schema)?; + } + } else { + yield batch; + } } } } @@ -273,6 +372,7 @@ struct 
SingleFileReadRequest { predicates: Vec, batch_size: Option, dv: Option>, + row_ranges: Option>, } /// Read a single parquet file from a split, returning a lazy stream of batches. @@ -298,6 +398,7 @@ fn read_single_file_stream( predicates, batch_size, dv, + row_ranges, } = request; let target_schema = build_target_arrow_schema(&read_type)?; @@ -393,6 +494,17 @@ fn read_single_file_stream( ); } } + if let Some(ref ranges) = row_ranges { + let range_selection = build_row_ranges_selection( + batch_stream_builder.metadata().row_groups(), + ranges, + file_meta.first_row_id.unwrap_or(0), + ); + row_selection = intersect_optional_row_selections( + row_selection, + Some(range_selection), + ); + } if let Some(row_selection) = row_selection { batch_stream_builder = batch_stream_builder.with_row_selection(row_selection); } @@ -494,6 +606,7 @@ fn read_single_file_stream( /// per file. Each poll slices up to `batch_size` rows from each file's current batch, /// assembles columns from the winning files, and yields the merged batch. When a file's /// current batch is exhausted, the next batch is read from its stream on demand. +#[allow(clippy::too_many_arguments)] fn merge_files_by_columns( file_io: &FileIO, split: &DataSplit, @@ -502,6 +615,7 @@ fn merge_files_by_columns( schema_manager: SchemaManager, table_schema_id: i64, batch_size: Option, + row_ranges: Option>, ) -> crate::Result { let data_files = split.data_files(); if data_files.is_empty() { @@ -596,8 +710,12 @@ fn merge_files_by_columns( // column that no file contains yet), we still need to emit NULL-filled rows to // preserve the correct row count. if active_file_indices.is_empty() { - // All files in a merge group cover the same rows; use the first file's row_count. 
- let total_rows = data_files[0].row_count as usize; + let first_row_id = data_files[0].first_row_id.unwrap_or(0); + let file_row_count = data_files[0].row_count; + let total_rows = match &row_ranges { + Some(ranges) => expand_selected_row_ids(first_row_id, file_row_count, ranges).len(), + None => file_row_count as usize, + }; let mut emitted = 0; while emitted < total_rows { let rows_to_emit = (total_rows - emitted).min(output_batch_size); @@ -647,6 +765,7 @@ fn merge_files_by_columns( predicates: Vec::new(), batch_size, dv: None, + row_ranges: row_ranges.clone(), }, )?; file_streams.insert(file_idx, stream); @@ -1355,6 +1474,114 @@ fn exact_parquet_value<'a, T>( } } +/// Expand row_ranges into a flat sequence of selected row IDs for a file. +fn expand_selected_row_ids(first_row_id: i64, row_count: i64, row_ranges: &[RowRange]) -> Vec { + if row_count == 0 { + return Vec::new(); + } + let file_end = first_row_id + row_count - 1; + let mut ids = Vec::new(); + for r in row_ranges { + let from = r.from().max(first_row_id); + let to = r.to().min(file_end); + for id in from..=to { + ids.push(id); + } + } + ids +} + +fn attach_row_id( + batch: RecordBatch, + row_id_index: usize, + selected_row_ids: &[i64], + row_id_offset: &mut usize, + output_schema: &Arc, +) -> crate::Result { + let num_rows = batch.num_rows(); + let batch_ids = &selected_row_ids[*row_id_offset..*row_id_offset + num_rows]; + *row_id_offset += num_rows; + let array: Arc = Arc::new(Int64Array::from(batch_ids.to_vec())); + insert_column_at(batch, array, row_id_index, output_schema) +} + +fn insert_column_at( + batch: RecordBatch, + column: Arc, + insert_index: usize, + output_schema: &Arc, +) -> crate::Result { + let mut columns: Vec> = Vec::with_capacity(batch.num_columns() + 1); + for (i, col) in batch.columns().iter().enumerate() { + if i == insert_index { + columns.push(column.clone()); + } + columns.push(col.clone()); + } + if insert_index >= batch.num_columns() { + columns.push(column); + } + 
RecordBatch::try_new(output_schema.clone(), columns).map_err(|e| Error::UnexpectedError { + message: format!("Failed to insert column into RecordBatch: {e}"), + source: Some(Box::new(e)), + }) +} + +/// Append a null `_ROW_ID` column for files without `first_row_id`. +fn append_null_row_id_column( + batch: RecordBatch, + insert_index: usize, + output_schema: &Arc, +) -> crate::Result { + let array: Arc = Arc::new(Int64Array::new_null(batch.num_rows())); + insert_column_at(batch, array, insert_index, output_schema) +} + +/// Build a Parquet [RowSelection] from inclusive `[from, to]` row ranges. +fn build_row_ranges_selection( + row_group_metadata_list: &[RowGroupMetaData], + row_ranges: &[RowRange], + file_first_row_id: i64, +) -> RowSelection { + let total_rows: i64 = row_group_metadata_list.iter().map(|rg| rg.num_rows()).sum(); + if total_rows == 0 { + return vec![].into(); + } + + let file_end_row_id = file_first_row_id.saturating_add(total_rows - 1); + let mut local_ranges: Vec<(usize, usize)> = row_ranges + .iter() + .filter_map(|r| { + if r.to() < file_first_row_id || r.from() > file_end_row_id { + return None; + } + let local_start = (r.from() - file_first_row_id).max(0) as usize; + let clamped_to = r.to().min(file_end_row_id); + let local_end = (clamped_to + 1 - file_first_row_id) as usize; + Some((local_start, local_end)) + }) + .collect(); + local_ranges.sort_by_key(|&(s, _)| s); + + let mut selectors: Vec = Vec::new(); + let mut cursor: usize = 0; + for (start, end) in &local_ranges { + if *start > cursor { + selectors.push(RowSelector::skip(*start - cursor)); + } + let select_start = (*start).max(cursor); + if *end > select_start { + selectors.push(RowSelector::select(*end - select_start)); + } + cursor = cursor.max(*end); + } + let total = total_rows as usize; + if cursor < total { + selectors.push(RowSelector::skip(total - cursor)); + } + selectors.into() +} + /// Builds a Parquet [RowSelection] from deletion vector. 
/// Only rows not in the deletion vector are selected; deleted rows are skipped at read time. /// todo: Uses [DeletionVectorIterator] with [advance_to](DeletionVectorIterator::advance_to) when skipping row groups similar to iceberg-rust diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index fe340f3d..8771466e 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -39,6 +39,6 @@ pub use catalog::CatalogFactory; pub use catalog::FileSystemCatalog; pub use table::{ - DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, SnapshotManager, - Table, TableRead, TableScan, TagManager, + DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, RowRange, + SnapshotManager, Table, TableRead, TableScan, TagManager, }; diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index c1047ad4..39d6aeec 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -110,6 +110,10 @@ impl TableSchema { } } +pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID"; + +pub const ROW_ID_FIELD_ID: i32 = i32::MAX - 5; + /// Data field for paimon table. 
/// /// Impl Reference: diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 481c757b..afcefc0b 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -33,7 +33,9 @@ use futures::stream::BoxStream; pub use read_builder::{ReadBuilder, TableRead}; pub use schema_manager::SchemaManager; pub use snapshot_manager::SnapshotManager; -pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan}; +pub use source::{ + merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange, +}; pub use table_scan::TableScan; pub use tag_manager::TagManager; diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 4a405ea1..c9be32cd 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -25,6 +25,7 @@ use super::{ArrowRecordBatchStream, Table, TableScan}; use crate::arrow::filtering::reader_pruning_predicates; use crate::arrow::ArrowReaderBuilder; use crate::spec::{CoreOptions, DataField, Predicate}; +use crate::table::source::RowRange; use crate::Result; use crate::{DataSplit, Error}; use std::collections::{HashMap, HashSet}; @@ -105,6 +106,7 @@ pub struct ReadBuilder<'a> { projected_fields: Option>, filter: NormalizedFilter, limit: Option, + row_ranges: Option>, } impl<'a> ReadBuilder<'a> { @@ -114,6 +116,7 @@ impl<'a> ReadBuilder<'a> { projected_fields: None, filter: NormalizedFilter::default(), limit: None, + row_ranges: None, } } @@ -145,6 +148,16 @@ impl<'a> ReadBuilder<'a> { self } + /// Set row ID ranges `[from, to]` (inclusive) for filtering in data evolution mode. + pub fn with_row_ranges(&mut self, ranges: Vec) -> &mut Self { + self.row_ranges = if ranges.is_empty() { + None + } else { + Some(ranges) + }; + self + } + /// Push a row-limit hint down to scan planning. /// /// This allows the scan to generate fewer splits when possible. 
The hint is @@ -166,6 +179,7 @@ impl<'a> ReadBuilder<'a> { self.filter.data_predicates.clone(), self.filter.bucket_predicate.clone(), self.limit, + self.row_ranges.clone(), ) } @@ -207,6 +221,15 @@ impl<'a> ReadBuilder<'a> { }); } + if name == crate::spec::ROW_ID_FIELD_NAME { + resolved.push(DataField::new( + crate::spec::ROW_ID_FIELD_ID, + crate::spec::ROW_ID_FIELD_NAME.to_string(), + crate::spec::DataType::BigInt(crate::spec::BigIntType::with_nullable(true)), + )); + continue; + } + let field = field_map .get(name.as_str()) .ok_or_else(|| Error::ColumnNotExist { diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs index a137a9ca..f45fa049 100644 --- a/crates/paimon/src/table/source.rs +++ b/crates/paimon/src/table/source.rs @@ -22,6 +22,324 @@ use crate::spec::{BinaryRow, DataFileMeta}; use crate::table::stats_filter::group_by_overlapping_row_id; use serde::{Deserialize, Serialize}; +// ======================= RowRange =============================== + +/// An inclusive row ID range `[from, to]` for filtering reads in data evolution mode. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RowRange { + from: i64, + to: i64, +} + +impl RowRange { + pub fn new(from: i64, to: i64) -> Self { + assert!(from <= to, "RowRange from ({from}) must be <= to ({to})"); + Self { from, to } + } + + pub fn from(&self) -> i64 { + self.from + } + + pub fn to(&self) -> i64 { + self.to + } + + pub fn count(&self) -> i64 { + self.to - self.from + 1 + } + + /// Check overlap with an inclusive file range `[file_start, file_end]`. + pub fn overlaps_inclusive(&self, file_start: i64, file_end_inclusive: i64) -> bool { + self.from <= file_end_inclusive && self.to >= file_start + } + + /// Intersect with an inclusive file range `[file_start, file_end]`. 
+ pub fn intersect_inclusive( + &self, + file_start: i64, + file_end_inclusive: i64, + ) -> Option { + let from = self.from.max(file_start); + let to = self.to.min(file_end_inclusive); + if from <= to { + Some(RowRange::new(from, to)) + } else { + None + } + } +} + +/// Returns `true` if the file has no `first_row_id`. +pub fn any_range_overlaps_file(ranges: &[RowRange], file: &DataFileMeta) -> bool { + match file.row_id_range() { + None => true, + Some((file_start, file_end)) => ranges + .iter() + .any(|r| r.overlaps_inclusive(file_start, file_end)), + } +} + +pub fn intersect_ranges_with_file(ranges: &[RowRange], file: &DataFileMeta) -> Vec { + match file.row_id_range() { + None => Vec::new(), + Some((file_start, file_end)) => ranges + .iter() + .filter_map(|r| r.intersect_inclusive(file_start, file_end)) + .collect(), + } +} + +pub fn merge_row_ranges(mut ranges: Vec) -> Vec { + if ranges.len() <= 1 { + return ranges; + } + ranges.sort_by_key(|r| r.from); + let mut merged: Vec = Vec::with_capacity(ranges.len()); + let mut iter = ranges.into_iter(); + let mut current = iter.next().unwrap(); + for r in iter { + if r.from <= current.to.saturating_add(1) { + current.to = current.to.max(r.to); + } else { + merged.push(current); + current = r; + } + } + merged.push(current); + merged +} + +#[cfg(test)] +mod row_range_tests { + use super::*; + + fn file_meta_with_row_id(first_row_id: Option, row_count: i64) -> DataFileMeta { + DataFileMeta { + file_name: "test.parquet".into(), + file_size: 128, + row_count, + min_key: Vec::new(), + max_key: Vec::new(), + key_stats: crate::spec::stats::BinaryTableStats::new( + Vec::new(), + Vec::new(), + Vec::new(), + ), + value_stats: crate::spec::stats::BinaryTableStats::new( + Vec::new(), + Vec::new(), + Vec::new(), + ), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level: 0, + extra_files: Vec::new(), + creation_time: Some(chrono::Utc::now()), + delete_row_count: None, + embedded_index: None, + first_row_id, 
+ write_cols: None, + external_path: None, + file_source: None, + value_stats_cols: None, + } + } + + #[test] + fn test_row_range_overlaps_inclusive_touching() { + // [5, 10] overlaps [10, 15] because row 10 is in both + let r = RowRange::new(5, 10); + assert!(r.overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_adjacent_no_overlap() { + // [5, 9] does NOT overlap [10, 15] + let r = RowRange::new(5, 9); + assert!(!r.overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_disjoint_before() { + let r = RowRange::new(5, 8); + assert!(!r.overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_disjoint_after() { + let r = RowRange::new(20, 30); + assert!(!r.overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_subset() { + assert!(RowRange::new(12, 14).overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_superset() { + assert!(RowRange::new(5, 20).overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_partial_left() { + assert!(RowRange::new(8, 12).overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_overlaps_inclusive_partial_right() { + assert!(RowRange::new(14, 20).overlaps_inclusive(10, 15)); + } + + #[test] + fn test_row_range_intersect_inclusive_no_overlap() { + assert_eq!(RowRange::new(0, 5).intersect_inclusive(10, 15), None); + } + + #[test] + fn test_row_range_intersect_inclusive_partial() { + assert_eq!( + RowRange::new(8, 12).intersect_inclusive(10, 15), + Some(RowRange::new(10, 12)) + ); + } + + #[test] + fn test_row_range_intersect_inclusive_subset() { + assert_eq!( + RowRange::new(11, 14).intersect_inclusive(10, 15), + Some(RowRange::new(11, 14)) + ); + } + + #[test] + fn test_row_range_intersect_inclusive_superset() { + assert_eq!( + RowRange::new(5, 20).intersect_inclusive(10, 15), + Some(RowRange::new(10, 15)) + ); + } + + #[test] + fn 
test_row_range_intersect_inclusive_touching_end() { + assert_eq!( + RowRange::new(5, 10).intersect_inclusive(10, 15), + Some(RowRange::new(10, 10)) + ); + } + + #[test] + fn test_merge_row_ranges_non_overlapping() { + let merged = merge_row_ranges(vec![RowRange::new(0, 4), RowRange::new(10, 15)]); + assert_eq!(merged, vec![RowRange::new(0, 4), RowRange::new(10, 15)]); + } + + #[test] + fn test_merge_row_ranges_overlapping() { + let merged = merge_row_ranges(vec![RowRange::new(0, 10), RowRange::new(5, 15)]); + assert_eq!(merged, vec![RowRange::new(0, 15)]); + } + + #[test] + fn test_merge_row_ranges_adjacent() { + // [0,5] and [6,10] are adjacent and should merge to [0,10] + let merged = merge_row_ranges(vec![RowRange::new(0, 5), RowRange::new(6, 10)]); + assert_eq!(merged, vec![RowRange::new(0, 10)]); + } + + #[test] + fn test_merge_row_ranges_unsorted() { + let merged = merge_row_ranges(vec![ + RowRange::new(10, 20), + RowRange::new(0, 5), + RowRange::new(3, 12), + ]); + assert_eq!(merged, vec![RowRange::new(0, 20)]); + } + + #[test] + fn test_merge_row_ranges_single() { + assert_eq!( + merge_row_ranges(vec![RowRange::new(5, 10)]), + vec![RowRange::new(5, 10)] + ); + } + + #[test] + fn test_merge_row_ranges_empty() { + assert!(merge_row_ranges(Vec::new()).is_empty()); + } + + #[test] + fn test_any_range_overlaps_file_with_overlap() { + // file row_id_range = [10, 14] + let file = file_meta_with_row_id(Some(10), 5); + assert!(any_range_overlaps_file( + &[RowRange::new(0, 5), RowRange::new(12, 20)], + &file + )); + } + + #[test] + fn test_any_range_overlaps_file_no_overlap() { + // file row_id_range = [10, 14] + let file = file_meta_with_row_id(Some(10), 5); + assert!(!any_range_overlaps_file( + &[RowRange::new(0, 5), RowRange::new(20, 30)], + &file + )); + } + + #[test] + fn test_any_range_overlaps_file_no_first_row_id() { + let file = file_meta_with_row_id(None, 5); + assert!(any_range_overlaps_file(&[RowRange::new(0, 5)], &file)); + } + + #[test] + fn 
test_intersect_ranges_with_file_partial_overlap() { + // file row_id_range = [10, 19] + let file = file_meta_with_row_id(Some(10), 10); + let result = + intersect_ranges_with_file(&[RowRange::new(5, 14), RowRange::new(18, 25)], &file); + assert_eq!(result, vec![RowRange::new(10, 14), RowRange::new(18, 19)]); + } + + #[test] + fn test_intersect_ranges_with_file_no_overlap() { + // file row_id_range = [10, 14] + let file = file_meta_with_row_id(Some(10), 5); + assert!( + intersect_ranges_with_file(&[RowRange::new(0, 5), RowRange::new(20, 30)], &file) + .is_empty() + ); + } + + #[test] + fn test_intersect_ranges_with_file_full_overlap() { + // file row_id_range = [10, 14] + let file = file_meta_with_row_id(Some(10), 5); + assert_eq!( + intersect_ranges_with_file(&[RowRange::new(0, 100)], &file), + vec![RowRange::new(10, 14)] + ); + } + + #[test] + fn test_intersect_ranges_with_file_no_first_row_id() { + let file = file_meta_with_row_id(None, 5); + assert!(intersect_ranges_with_file(&[RowRange::new(0, 100)], &file).is_empty()); + } + + #[test] + fn test_row_range_count_and_empty() { + let r = RowRange::new(5, 10); + assert_eq!(r.count(), 6); // rows 5,6,7,8,9,10 + } +} + // ======================= DeletionFile =============================== /// Deletion file for a data file: describes a region in a file that stores deletion vector bitmap. @@ -110,6 +428,7 @@ pub struct DataSplit { /// `false` when files need column-wise merge (e.g. data evolution) or /// key-value merge (e.g. primary key tables without deletion vectors). raw_convertible: bool, + row_ranges: Option>, } impl DataSplit { @@ -143,6 +462,10 @@ impl DataSplit { self.raw_convertible } + pub fn row_ranges(&self) -> Option<&[RowRange]> { + self.row_ranges.as_deref() + } + /// Returns the deletion file for the data file at the given index, if any. `None` at that index means no deletion file. 
pub fn deletion_file_for_data_file_index(&self, index: usize) -> Option<&DeletionFile> { self.data_deletion_files @@ -274,6 +597,7 @@ pub struct DataSplitBuilder { /// Same length as data_files; `None` at index i = no deletion file for data_files[i]. data_deletion_files: Option>>, raw_convertible: bool, + row_ranges: Option>, } impl DataSplitBuilder { @@ -287,6 +611,7 @@ impl DataSplitBuilder { data_files: None, data_deletion_files: None, raw_convertible: false, + row_ranges: None, } } @@ -329,6 +654,11 @@ impl DataSplitBuilder { self } + pub fn with_row_ranges(mut self, row_ranges: Vec) -> Self { + self.row_ranges = Some(row_ranges); + self + } + pub fn build(self) -> crate::Result { if self.snapshot_id == -1 { return Err(crate::Error::UnexpectedError { @@ -381,6 +711,7 @@ impl DataSplitBuilder { data_files, data_deletion_files: self.data_deletion_files, raw_convertible: self.raw_convertible, + row_ranges: self.row_ranges, }) } } diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index da2dc240..4e18745c 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -34,7 +34,10 @@ use crate::spec::{ ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot, }; use crate::table::bin_pack::split_for_batch; -use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan}; +use crate::table::source::{ + any_range_overlaps_file, intersect_ranges_with_file, merge_row_ranges, DataSplit, + DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange, +}; use crate::table::SnapshotManager; use crate::table::TagManager; use crate::Error; @@ -304,6 +307,7 @@ pub struct TableScan<'a> { /// Optional limit on the number of rows to return. /// When set, the scan will try to return only enough splits to satisfy the limit. 
limit: Option, + row_ranges: Option>, } impl<'a> TableScan<'a> { @@ -313,6 +317,7 @@ impl<'a> TableScan<'a> { data_predicates: Vec, bucket_predicate: Option, limit: Option, + row_ranges: Option>, ) -> Self { Self { table, @@ -320,6 +325,7 @@ impl<'a> TableScan<'a> { data_predicates, bucket_predicate, limit, + row_ranges, } } @@ -621,6 +627,16 @@ impl<'a> TableScan<'a> { .collect() }; + // Filter groups by row ID ranges. + let row_id_groups = if let Some(ref ranges) = self.row_ranges { + row_id_groups + .into_iter() + .filter(|group| group.iter().any(|f| any_range_overlaps_file(ranges, f))) + .collect() + } else { + row_id_groups + }; + let (singles, multis): (Vec<_>, Vec<_>) = row_id_groups .into_iter() .partition(|group| group.len() == 1); @@ -651,6 +667,22 @@ impl<'a> TableScan<'a> { .collect::>>() }); + // Compute row_ranges before moving file_group to avoid clone + let split_row_ranges = if let Some(ref ranges) = self.row_ranges { + let mut split_ranges = Vec::new(); + for file in &file_group { + split_ranges.extend(intersect_ranges_with_file(ranges, file)); + } + let split_ranges = merge_row_ranges(split_ranges); + if split_ranges.is_empty() { + None + } else { + Some(split_ranges) + } + } else { + None + }; + let mut builder = DataSplitBuilder::new() .with_snapshot(snapshot_id) .with_partition(partition_row.clone()) @@ -662,14 +694,16 @@ impl<'a> TableScan<'a> { if let Some(files) = data_deletion_files { builder = builder.with_data_deletion_files(files); } + if let Some(row_ranges) = split_row_ranges { + builder = builder.with_row_ranges(row_ranges); + } splits.push(builder.build()?); } } - // Apply limit pushdown only when there are no data predicates. - // With data predicates, merged_row_count() reflects pre-filter row counts, - // so stopping early could return fewer rows than the limit after filtering. 
- let splits = if self.data_predicates.is_empty() { + // Skip limit pushdown with data predicates or row_ranges: merged_row_count() reflects + // pre-filter row counts, so stopping early could return fewer rows than the limit. + let splits = if self.data_predicates.is_empty() && self.row_ranges.is_none() { self.apply_limit_pushdown(splits) } else { splits