diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index f010dd8b..c2fde30b 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -30,7 +30,7 @@ mod common; use common::{ collect_id_name, collect_id_value, create_handler, create_test_env, row_count, setup_handler, }; -use datafusion::arrow::array::{Int32Array, StringArray}; +use datafusion::arrow::array::{Array, Int32Array, StringArray}; use paimon::catalog::Identifier; use paimon::Catalog; @@ -75,6 +75,106 @@ async fn test_pk_basic_write_read() { ); } +/// Partial-update merge engine: keep latest non-null value for each field. +#[tokio::test] +async fn test_pk_partial_update_fixed_bucket_e2e() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_partial_update ( + id INT NOT NULL, v_int INT, v_str STRING, + PRIMARY KEY (id) + ) WITH ('bucket' = '1', 'merge-engine' = 'partial-update')", + ) + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_partial_update VALUES + (1, 10, 'old-1'), + (2, 20, 'old-2')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_partial_update VALUES + (1, CAST(NULL AS INT), 'new-1'), + (2, 200, CAST(NULL AS STRING)), + (3, 30, CAST(NULL AS STRING))", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_partial_update VALUES + (1, 111, CAST(NULL AS STRING))", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = handler + .sql("SELECT id, v_int, v_str FROM paimon.test_db.t_partial_update ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let ints = batch + 
.column_by_name("v_int") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let strs = batch + .column_by_name("v_str") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push(( + ids.value(i), + if ints.is_null(i) { + None + } else { + Some(ints.value(i)) + }, + if strs.is_null(i) { + None + } else { + Some(strs.value(i).to_string()) + }, + )); + } + } + + assert_eq!( + rows, + vec![ + (1, Some(111), Some("new-1".to_string())), + (2, Some(200), Some("old-2".to_string())), + (3, Some(30), None), + ] + ); +} + // ======================= Dedup Within Single Commit ======================= /// Duplicate keys in a single INSERT — last value wins (Deduplicate engine). diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index 21d3ee1b..c1abb224 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -66,6 +66,8 @@ const DEFAULT_DYNAMIC_BUCKET_TARGET_ROW_NUM: i64 = 200_000; pub enum MergeEngine { /// Keep the row with the highest sequence number (default). Deduplicate, + /// Merge same-key rows field-by-field, usually keeping non-null updates. + PartialUpdate, /// Keep the first row for each key (ignore later updates). 
FirstRow, } @@ -124,6 +126,7 @@ impl<'a> CoreOptions<'a> { None => Ok(MergeEngine::Deduplicate), Some(v) => match v.to_ascii_lowercase().as_str() { "deduplicate" => Ok(MergeEngine::Deduplicate), + "partial-update" => Ok(MergeEngine::PartialUpdate), "first-row" => Ok(MergeEngine::FirstRow), other => Err(crate::Error::Unsupported { message: format!("Unsupported merge-engine: '{other}'"), @@ -498,6 +501,14 @@ mod tests { } } + #[test] + fn test_merge_engine_accepts_partial_update() { + let options = HashMap::from([(MERGE_ENGINE_OPTION.to_string(), "partial-update".into())]); + let core = CoreOptions::new(&options); + + assert_eq!(core.merge_engine().unwrap(), MergeEngine::PartialUpdate); + } + #[test] fn test_commit_options_defaults() { let options = HashMap::new(); diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index 961e4148..bfefcebe 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -29,6 +29,9 @@ mod core_options; pub(crate) use core_options::TimeTravelSelector; pub use core_options::*; +mod partial_update; +pub(crate) use partial_update::PartialUpdateConfig; + mod schema; pub use schema::*; diff --git a/crates/paimon/src/spec/partial_update.rs b/crates/paimon/src/spec/partial_update.rs new file mode 100644 index 00000000..b7ae1b6d --- /dev/null +++ b/crates/paimon/src/spec/partial_update.rs @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +const MERGE_ENGINE_OPTION: &str = "merge-engine"; +const PARTIAL_UPDATE_ENGINE: &str = "partial-update"; +const IGNORE_DELETE_OPTION: &str = "ignore-delete"; +const IGNORE_DELETE_SUFFIX: &str = ".ignore-delete"; +const PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE_OPTION: &str = + "partial-update.remove-record-on-delete"; +const PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP_OPTION: &str = + "partial-update.remove-record-on-sequence-group"; +const FIELDS_DEFAULT_AGG_FUNCTION_OPTION: &str = "fields.default-aggregate-function"; +const FIELDS_PREFIX: &str = "fields."; +const SEQUENCE_GROUP_SUFFIX: &str = ".sequence-group"; +const AGGREGATION_FUNCTION_SUFFIX: &str = ".aggregate-function"; + +/// Minimal partial-update mode recognized by the current Rust implementation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum PartialUpdateMode { + Basic, +} + +/// Partial-update-specific option inspection and validation. +/// +/// PR1 only recognizes the basic mode: `merge-engine=partial-update` on a PK +/// table without delete, sequence-group, or aggregation controls. 
+#[derive(Debug, Clone, Copy)] +pub(crate) struct PartialUpdateConfig<'a> { + options: &'a HashMap, +} + +impl<'a> PartialUpdateConfig<'a> { + pub(crate) fn new(options: &'a HashMap) -> Self { + Self { options } + } + + pub(crate) fn is_enabled(&self) -> bool { + self.options + .get(MERGE_ENGINE_OPTION) + .is_some_and(|value| value.eq_ignore_ascii_case(PARTIAL_UPDATE_ENGINE)) + } + + pub(crate) fn validate_create_mode( + &self, + has_primary_keys: bool, + ) -> crate::Result> { + match self.validated_mode(has_primary_keys) { + Ok(mode) => Ok(mode), + Err(unsupported_options) => Err(crate::Error::ConfigInvalid { + message: format!( + "merge-engine=partial-update only supports the basic mode in this build; unsupported options: {}", + unsupported_options.join(", ") + ), + }), + } + } + + pub(crate) fn validate_runtime_mode( + &self, + has_primary_keys: bool, + table_name: &str, + ) -> crate::Result> { + match self.validated_mode(has_primary_keys) { + Ok(mode) => Ok(mode), + Err(unsupported_options) => Err(crate::Error::Unsupported { + message: format!( + "Table '{table_name}' uses merge-engine=partial-update options not supported by this build: {}", + unsupported_options.join(", ") + ), + }), + } + } + + fn validated_mode( + &self, + has_primary_keys: bool, + ) -> std::result::Result, Vec> { + if !has_primary_keys || !self.is_enabled() { + return Ok(None); + } + + let unsupported_options = self.unsupported_option_keys(); + if !unsupported_options.is_empty() { + return Err(unsupported_options); + } + + Ok(Some(PartialUpdateMode::Basic)) + } + + fn unsupported_option_keys(&self) -> Vec { + let mut keys: Vec = self + .options + .keys() + .filter(|key| is_unsupported_partial_update_option(key)) + .cloned() + .collect(); + keys.sort(); + keys + } +} + +fn is_unsupported_partial_update_option(key: &str) -> bool { + key == IGNORE_DELETE_OPTION + || key.ends_with(IGNORE_DELETE_SUFFIX) + || key == PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE_OPTION + || key == 
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP_OPTION + || key == FIELDS_DEFAULT_AGG_FUNCTION_OPTION + || is_fields_option_with_suffix(key, SEQUENCE_GROUP_SUFFIX) + || is_fields_option_with_suffix(key, AGGREGATION_FUNCTION_SUFFIX) +} + +fn is_fields_option_with_suffix(key: &str, suffix: &str) -> bool { + key.starts_with(FIELDS_PREFIX) && key.ends_with(suffix) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn partial_update_options(extra: &[(&str, &str)]) -> HashMap { + let mut options = HashMap::from([( + MERGE_ENGINE_OPTION.to_string(), + PARTIAL_UPDATE_ENGINE.to_string(), + )]); + options.extend( + extra + .iter() + .map(|(key, value)| ((*key).to_string(), (*value).to_string())), + ); + options + } + + #[test] + fn test_validate_create_mode_accepts_basic_pk_partial_update() { + let options = partial_update_options(&[]); + let config = PartialUpdateConfig::new(&options); + + assert_eq!( + config.validate_create_mode(true).unwrap(), + Some(PartialUpdateMode::Basic) + ); + } + + #[test] + fn test_validate_create_mode_ignores_non_pk_tables() { + let options = partial_update_options(&[(IGNORE_DELETE_OPTION, "true")]); + let config = PartialUpdateConfig::new(&options); + + assert_eq!(config.validate_create_mode(false).unwrap(), None); + } + + #[test] + fn test_validate_create_mode_rejects_unsupported_partial_update_options() { + for key in [ + IGNORE_DELETE_OPTION, + "partial-update.ignore-delete", + PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE_OPTION, + PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP_OPTION, + "fields.price.sequence-group", + "fields.price.aggregate-function", + FIELDS_DEFAULT_AGG_FUNCTION_OPTION, + ] { + let options = partial_update_options(&[(key, "value")]); + let config = PartialUpdateConfig::new(&options); + let err = config.validate_create_mode(true).unwrap_err(); + + assert!( + matches!(err, crate::Error::ConfigInvalid { ref message } if message.contains(key)), + "expected create-time rejection to mention '{key}', got {err:?}" + ); + } + } + + 
#[test] + fn test_validate_runtime_mode_rejects_unsupported_partial_update_options() { + let options = + partial_update_options(&[("fields.price.aggregate-function", "last_non_null")]); + let config = PartialUpdateConfig::new(&options); + let err = config.validate_runtime_mode(true, "default.t").unwrap_err(); + + assert!( + matches!(err, crate::Error::Unsupported { ref message } if message.contains("fields.price.aggregate-function")), + "expected runtime rejection to mention the unsupported option, got {err:?}" + ); + } +} diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index 370a14cc..21c3364b 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -17,6 +17,7 @@ use crate::spec::core_options::CoreOptions; use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType}; +use crate::spec::PartialUpdateConfig; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use std::collections::{HashMap, HashSet}; @@ -264,6 +265,7 @@ impl Schema { let partition_keys = Self::normalize_partition_keys(&partition_keys, &mut options)?; let fields = Self::normalize_fields(&fields, &partition_keys, &primary_keys)?; Self::validate_blob_fields(&fields, &partition_keys, &options)?; + PartialUpdateConfig::new(&options).validate_create_mode(!primary_keys.is_empty())?; Ok(Self { fields, @@ -892,6 +894,29 @@ mod tests { assert_eq!(schema.fields().len(), 2); } + #[test] + fn test_partial_update_schema_validation_rejects_unsupported_options() { + for (key, value) in [ + ("ignore-delete", "true"), + ("fields.value.sequence-group", "g1"), + ("fields.default-aggregate-function", "last_non_null"), + ] { + let err = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("merge-engine", "partial-update") + .option(key, value) + .build() + .unwrap_err(); + + assert!( + matches!(err, crate::Error::ConfigInvalid { ref 
message } if message.contains(key)), + "partial-update create-time validation should reject '{key}', got {err:?}" + ); + } + } + #[test] fn test_schema_builder_column_row_type() { let row_type = RowType::new(vec![DataField::new( diff --git a/crates/paimon/src/table/bucket_assigner_cross.rs b/crates/paimon/src/table/bucket_assigner_cross.rs index e60c9ebe..a021761b 100644 --- a/crates/paimon/src/table/bucket_assigner_cross.rs +++ b/crates/paimon/src/table/bucket_assigner_cross.rs @@ -147,19 +147,19 @@ impl GlobalPartitionIndex { } /// Assign a bucket for the given primary key targeting `new_partition`. - fn assign(&mut self, pk_bytes: &[u8], new_partition: &[u8]) -> AssignResult { + fn assign(&mut self, pk_bytes: &[u8], new_partition: &[u8]) -> Result { if let Some((existing_partition, existing_bucket)) = self.key_to_location.get(pk_bytes) { if existing_partition == new_partition { - return AssignResult::SamePartition { + return Ok(AssignResult::SamePartition { bucket: *existing_bucket, - }; + }); } // Key exists in a different partition match self.merge_engine { MergeEngine::FirstRow => { // FIRST_ROW: keep old data, discard new row - return AssignResult::Skip; + return Ok(AssignResult::Skip); } MergeEngine::Deduplicate => { let old_partition = existing_partition.clone(); @@ -181,11 +181,16 @@ impl GlobalPartitionIndex { self.key_to_location .insert(pk_bytes.to_vec(), (new_partition.to_vec(), new_bucket)); - return AssignResult::CrossPartition { + return Ok(AssignResult::CrossPartition { old_partition, old_bucket, new_bucket, - }; + }); + } + MergeEngine::PartialUpdate => { + return Err(crate::Error::Unsupported { + message: "CrossPartitionAssigner does not support merge-engine=partial-update yet".to_string(), + }); } } } @@ -193,7 +198,7 @@ impl GlobalPartitionIndex { let bucket = self.assign_bucket_in_partition(new_partition); self.key_to_location .insert(pk_bytes.to_vec(), (new_partition.to_vec(), bucket)); - AssignResult::SamePartition { bucket } + 
Ok(AssignResult::SamePartition { bucket }) } fn assign_bucket_in_partition(&mut self, partition: &[u8]) -> i32 { @@ -309,7 +314,7 @@ impl BucketAssigner for CrossPartitionAssigner { let mut skips = Vec::new(); for row_idx in 0..num_rows { - match global_index.assign(&pk_bytes_vec[row_idx], &partition_bytes_vec[row_idx]) { + match global_index.assign(&pk_bytes_vec[row_idx], &partition_bytes_vec[row_idx])? { AssignResult::SamePartition { bucket } => { buckets.push(bucket); } diff --git a/crates/paimon/src/table/kv_file_reader.rs b/crates/paimon/src/table/kv_file_reader.rs index 1ad0f087..775a5f0a 100644 --- a/crates/paimon/src/table/kv_file_reader.rs +++ b/crates/paimon/src/table/kv_file_reader.rs @@ -24,11 +24,13 @@ //! Reference: Java Paimon `SortMergeReaderWithMinHeap`. use super::data_file_reader::DataFileReader; -use super::sort_merge::{DeduplicateMergeFunction, SortMergeReaderBuilder}; +use super::sort_merge::{ + DeduplicateMergeFunction, PartialUpdateMergeFunction, SortMergeReaderBuilder, +}; use crate::arrow::build_target_arrow_schema; use crate::io::FileIO; use crate::spec::{ - BigIntType, DataField, DataType as PaimonDataType, Predicate, TinyIntType, + BigIntType, DataField, DataType as PaimonDataType, MergeEngine, Predicate, TinyIntType, SEQUENCE_NUMBER_FIELD_ID, SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_ID, VALUE_KIND_FIELD_NAME, }; @@ -55,6 +57,7 @@ pub(crate) struct KeyValueReadConfig { pub read_type: Vec, pub predicates: Vec, pub primary_keys: Vec, + pub merge_engine: MergeEngine, pub sequence_fields: Vec, } @@ -252,9 +255,8 @@ impl KeyValueFileReader { .data_deletion_files() .is_some_and(|files| files.iter().any(Option::is_some)) { - Err(Error::UnexpectedError { + Err(Error::Unsupported { message: "KeyValueFileReader does not support deletion vectors".to_string(), - source: None, })?; } @@ -303,7 +305,13 @@ impl KeyValueFileReader { user_sequence_indices.clone(), value_indices.clone(), merge_output_schema.clone(), - 
Box::new(DeduplicateMergeFunction), + match self.config.merge_engine { + MergeEngine::Deduplicate => Box::new(DeduplicateMergeFunction), + MergeEngine::PartialUpdate => Box::new(PartialUpdateMergeFunction), + MergeEngine::FirstRow => Err(Error::Unsupported { + message: "KeyValueFileReader does not support merge-engine=first-row; first-row reads should use the non-KV path".to_string(), + })?, + }, ) .build()?; diff --git a/crates/paimon/src/table/kv_file_writer.rs b/crates/paimon/src/table/kv_file_writer.rs index ef740157..dd8dc329 100644 --- a/crates/paimon/src/table/kv_file_writer.rs +++ b/crates/paimon/src/table/kv_file_writer.rs @@ -184,16 +184,16 @@ impl KeyValueFileWriter { source: None, })?; - // Deduplicate: for consecutive rows with the same PK, pick the winner. // After sorting by PK + seq fields + auto-seq (all ascending): - // Deduplicate → keep last row per key group (highest seq) - // FirstRow → keep first row per key group (lowest seq) - let deduped_indices = self.dedup_sorted_indices(&combined, &sorted_indices)?; - let deduped_num_rows = deduped_indices.len(); - - // Extract min_key / max_key from deduped endpoints. - let first_row = deduped_indices[0] as usize; - let last_row = deduped_indices[deduped_num_rows - 1] as usize; + // Deduplicate → keep last row per key group (highest seq) + // FirstRow → keep first row per key group (lowest seq) + // PartialUpdate → keep all rows for read-side field-wise merge + let selected_indices = self.select_flush_indices(&combined, &sorted_indices)?; + let selected_num_rows = selected_indices.len(); + + // Extract min_key / max_key from selected endpoints. + let first_row = selected_indices[0] as usize; + let last_row = selected_indices[selected_num_rows - 1] as usize; let min_key = self.extract_key_binary_row(&combined, first_row)?; let max_key = self.extract_key_binary_row(&combined, last_row)?; @@ -228,11 +228,11 @@ impl KeyValueFileWriter { ) .await?; - // Chunked write using deduped indices. 
- let deduped_u32 = arrow_array::UInt32Array::from(deduped_indices); - for chunk_start in (0..deduped_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) { - let chunk_len = Self::FLUSH_CHUNK_ROWS.min(deduped_num_rows - chunk_start); - let chunk_indices = deduped_u32.slice(chunk_start, chunk_len); + // Chunked write using selected indices. + let selected_u32 = arrow_array::UInt32Array::from(selected_indices); + for chunk_start in (0..selected_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) { + let chunk_len = Self::FLUSH_CHUNK_ROWS.min(selected_num_rows - chunk_start); + let chunk_indices = selected_u32.slice(chunk_start, chunk_len); let mut physical_columns: Vec> = Vec::new(); // Sequence numbers for this chunk. @@ -293,20 +293,20 @@ impl KeyValueFileWriter { let file_size = writer.close().await? as i64; - // Compute key_stats on deduped data (not the raw combined batch). - let deduped_key_columns: Vec> = - self.config - .primary_key_indices - .iter() - .map(|&idx| { - arrow_select::take::take(combined.column(idx).as_ref(), &deduped_u32, None) - .map_err(|e| crate::Error::DataInvalid { - message: format!("Failed to take key column for stats: {e}"), - source: None, - }) - }) - .collect::>>()?; - let deduped_key_batch = RecordBatch::try_new( + // Compute key_stats on selected output rows (not the raw combined batch). 
+ let selected_key_columns: Vec> = self + .config + .primary_key_indices + .iter() + .map(|&idx| { + arrow_select::take::take(combined.column(idx).as_ref(), &selected_u32, None) + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to take key column for stats: {e}"), + source: None, + }) + }) + .collect::>>()?; + let selected_key_batch = RecordBatch::try_new( Arc::new(ArrowSchema::new( self.config .primary_key_indices @@ -314,15 +314,15 @@ impl KeyValueFileWriter { .map(|&idx| user_schema.field(idx).clone()) .collect::>(), )), - deduped_key_columns, + selected_key_columns, ) .map_err(|e| crate::Error::DataInvalid { - message: format!("Failed to build deduped key batch for stats: {e}"), + message: format!("Failed to build selected key batch for stats: {e}"), source: None, })?; let stats_col_indices: Vec = (0..self.config.primary_key_indices.len()).collect(); let key_stats = compute_column_stats( - &deduped_key_batch, + &selected_key_batch, &stats_col_indices, &self.config.primary_key_types, )?; @@ -331,7 +331,7 @@ impl KeyValueFileWriter { let meta = DataFileMeta { file_name, file_size, - row_count: deduped_num_rows as i64, + row_count: selected_num_rows as i64, min_key, max_key, key_stats, @@ -358,7 +358,26 @@ impl KeyValueFileWriter { Ok(()) } - /// Deduplicate sorted indices by primary key using the configured merge engine. + /// Select output row indices from sorted inputs according to merge engine. + /// + /// Input: `sorted_indices` ordered by PK + seq fields + auto-seq (all ascending). + /// Output: row indices to write in sorted PK order. 
+ fn select_flush_indices( + &self, + batch: &RecordBatch, + sorted_indices: &arrow_array::UInt32Array, + ) -> Result> { + match self.config.merge_engine { + MergeEngine::Deduplicate | MergeEngine::FirstRow => { + self.dedup_sorted_indices(batch, sorted_indices) + } + MergeEngine::PartialUpdate => Ok((0..sorted_indices.len()) + .map(|idx| sorted_indices.value(idx)) + .collect()), + } + } + + /// Deduplicate sorted indices by primary key for Deduplicate / FirstRow engines. /// /// Input: `sorted_indices` ordered by PK + seq fields + auto-seq (all ascending). /// Output: a Vec of original row indices to keep, in sorted PK order. @@ -412,6 +431,9 @@ impl KeyValueFileWriter { MergeEngine::Deduplicate => group_winner = cur, // FirstRow: keep first (lowest seq), so don't update. MergeEngine::FirstRow => {} + MergeEngine::PartialUpdate => unreachable!( + "partial-update should use select_flush_indices and skip dedup" + ), } } else { // New key group — emit the winner of the previous group. @@ -470,3 +492,96 @@ pub(crate) fn build_physical_schema(user_schema: &ArrowSchema) -> Arc KeyValueFileWriter { + KeyValueFileWriter::new( + FileIOBuilder::new("memory").build().unwrap(), + KeyValueWriteConfig { + table_location: "memory:/kv-first-row".to_string(), + partition_path: String::new(), + bucket: 0, + schema_id: 0, + file_compression: "none".to_string(), + file_compression_zstd_level: 0, + write_buffer_size: 1024, + primary_key_indices: vec![0], + primary_key_types: vec![DataType::Int(IntType::new())], + sequence_field_indices: vec![1], + merge_engine: MergeEngine::FirstRow, + }, + 0, + ) + } + + #[test] + fn test_dedup_sorted_indices_keeps_first_row_for_first_row_engine() { + let schema = Arc::new(ArrowSchema::new(vec![ + Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)), + Arc::new(ArrowField::new("seq", ArrowDataType::Int64, false)), + Arc::new(ArrowField::new("value", ArrowDataType::Int32, false)), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + 
Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as Arc, + Arc::new(Int64Array::from(vec![10, 20, 5, 6])) as Arc, + Arc::new(Int32Array::from(vec![100, 200, 300, 400])) as Arc, + ], + ) + .unwrap(); + let sorted_indices = UInt32Array::from(vec![0, 1, 2, 3]); + + let deduped = first_row_writer() + .dedup_sorted_indices(&batch, &sorted_indices) + .unwrap(); + + assert_eq!(deduped, vec![0, 2]); + } + + #[test] + fn test_select_flush_indices_keeps_all_rows_for_partial_update_engine() { + let schema = Arc::new(ArrowSchema::new(vec![ + Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)), + Arc::new(ArrowField::new("seq", ArrowDataType::Int64, false)), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 1])) as Arc, + Arc::new(Int64Array::from(vec![10, 20])) as Arc, + ], + ) + .unwrap(); + let sorted_indices = UInt32Array::from(vec![0, 1]); + let writer = KeyValueFileWriter::new( + FileIOBuilder::new("memory").build().unwrap(), + KeyValueWriteConfig { + table_location: "memory:/kv-partial-update".to_string(), + partition_path: String::new(), + bucket: 0, + schema_id: 0, + file_compression: "none".to_string(), + file_compression_zstd_level: 0, + write_buffer_size: 1024, + primary_key_indices: vec![0], + primary_key_types: vec![DataType::Int(IntType::new())], + sequence_field_indices: vec![1], + merge_engine: MergeEngine::PartialUpdate, + }, + 0, + ); + + let selected = writer + .select_flush_indices(&batch, &sorted_indices) + .unwrap(); + + assert_eq!(selected, vec![0, 1]); + } +} diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index e62fc31c..c47b967b 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -338,6 +338,28 @@ mod tests { ) } + fn partial_update_dv_pk_table() -> Table { + let file_io = FileIOBuilder::new("file").build().unwrap(); + let table_schema = TableSchema::new( + 0, + &Schema::builder() + .column("id", 
DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("merge-engine", "partial-update") + .option("deletion-vectors.enabled", "true") + .build() + .unwrap(), + ); + Table::new( + file_io, + Identifier::new("default", "partial_update_dv_t"), + "/tmp/test-partial-update-dv-read-builder".to_string(), + table_schema, + None, + ) + } + #[test] fn test_exact_filter_pushdown_is_true_for_partition_only_filter() { let table = simple_table(); @@ -695,4 +717,20 @@ mod tests { assert_eq!(ranges[0].from(), 0); assert_eq!(ranges[0].to(), 5); } + + #[test] + fn test_direct_table_read_rejects_partial_update_with_deletion_vectors() { + let table = partial_update_dv_pk_table(); + let err = match TableRead::new(&table, table.schema().fields().to_vec(), Vec::new()) + .to_arrow(&[]) + { + Ok(_) => panic!("partial-update+DV read should fail fast"), + Err(err) => err, + }; + + assert!( + matches!(err, crate::Error::Unsupported { ref message } if message.contains("deletion-vectors.enabled=true")), + "expected partial-update+DV read to fail fast with Unsupported, got {err:?}" + ); + } } diff --git a/crates/paimon/src/table/sort_merge.rs b/crates/paimon/src/table/sort_merge.rs index b7dcbd20..dd913455 100644 --- a/crates/paimon/src/table/sort_merge.rs +++ b/crates/paimon/src/table/sort_merge.rs @@ -29,7 +29,7 @@ use crate::spec::RowKind; use crate::table::ArrowRecordBatchStream; use crate::Error; -use arrow_array::{ArrayRef, Int64Array, Int8Array, RecordBatch}; +use arrow_array::{new_null_array, ArrayRef, Int64Array, Int8Array, RecordBatch}; use arrow_row::{RowConverter, Rows, SortField}; use arrow_schema::SchemaRef; use arrow_select::interleave::interleave; @@ -41,6 +41,31 @@ use std::cmp::Ordering; // MergeFunction // --------------------------------------------------------------------------- +/// Buffered batches used by the merge reader. 
+/// +/// Source batches keep the internal read schema, while materialized batches +/// already match the merge output schema. +#[derive(Clone)] +pub(crate) enum BufferedBatch { + Source(RecordBatch), + Materialized(RecordBatch), +} + +impl BufferedBatch { + fn column_for_output<'a>( + &'a self, + output_col_idx: usize, + source_output_col_indices: &[usize], + ) -> &'a dyn arrow_array::Array { + match self { + Self::Source(batch) => batch + .column(source_output_col_indices[output_col_idx]) + .as_ref(), + Self::Materialized(batch) => batch.column(output_col_idx).as_ref(), + } + } +} + /// A row reference as an index into the batch buffer. pub(crate) struct MergeRow { /// Index into the shared batch buffer. @@ -52,15 +77,56 @@ pub(crate) struct MergeRow { pub user_sequences: Vec>, } +impl MergeRow { + #[allow(dead_code)] + fn source_batch<'a>( + &self, + batch_buffer: &'a [BufferedBatch], + ) -> crate::Result<&'a RecordBatch> { + match batch_buffer.get(self.batch_idx) { + Some(BufferedBatch::Source(batch)) => Ok(batch), + Some(BufferedBatch::Materialized(_)) => Err(Error::UnexpectedError { + message: format!( + "Merge row unexpectedly referenced a materialized batch at index {}", + self.batch_idx + ), + source: None, + }), + None => Err(Error::UnexpectedError { + message: format!( + "Merge row referenced batch index {} outside the current buffer", + self.batch_idx + ), + source: None, + }), + } + } +} + +/// Merge result for rows sharing the same primary key. +pub(crate) enum MergeResult { + /// Reuse an existing source row from the batch buffer. + SourceRow { batch_idx: usize, row_idx: usize }, + /// Emit a synthesized one-row batch matching the merge output schema. + #[allow(dead_code)] + MaterializedRow(RecordBatch), + /// Omit this key from the output. + Omit, +} + /// Merge function applied to rows sharing the same primary key. /// -/// For deduplicate: returns the single winner (batch_idx, row_idx), or None -/// if the winning row should be filtered out (e.g. 
DELETE). +/// Deduplicate-style engines can keep returning a source row. Future +/// field-wise engines may instead materialize a new output row. pub(crate) trait MergeFunction: Send + Sync { - /// Pick the winning row from same-key candidates. - /// Returns `Some((batch_idx, row_idx))` of the winner, or `None` if the - /// key should be omitted from output (e.g. winner is a DELETE row). - fn pick_winner(&self, rows: &[MergeRow]) -> crate::Result>; + /// Merge all rows sharing the same key into a final output result. + fn merge( + &self, + rows: &[MergeRow], + batch_buffer: &[BufferedBatch], + source_output_col_indices: &[usize], + output_schema: &SchemaRef, + ) -> crate::Result; } /// Deduplicate merge: keeps the row with the highest sequence. @@ -71,19 +137,28 @@ pub(crate) trait MergeFunction: Send + Sync { /// Filters out DELETE and UPDATE_BEFORE rows. pub(crate) struct DeduplicateMergeFunction; +fn compare_sequence_order(lhs: &MergeRow, rhs: &MergeRow) -> Ordering { + match (lhs.user_sequences.is_empty(), rhs.user_sequences.is_empty()) { + (false, false) => lhs + .user_sequences + .cmp(&rhs.user_sequences) + .then_with(|| lhs.sequence_number.cmp(&rhs.sequence_number)), + _ => lhs.sequence_number.cmp(&rhs.sequence_number), + } +} + impl MergeFunction for DeduplicateMergeFunction { - fn pick_winner(&self, rows: &[MergeRow]) -> crate::Result> { + fn merge( + &self, + rows: &[MergeRow], + _batch_buffer: &[BufferedBatch], + _source_output_col_indices: &[usize], + _output_schema: &SchemaRef, + ) -> crate::Result { let winner = rows .iter() .reduce(|best, r| { - // Compare user sequences lexicographically first (if present), then system sequence. 
- let ord = match (r.user_sequences.is_empty(), best.user_sequences.is_empty()) { - (false, false) => r - .user_sequences - .cmp(&best.user_sequences) - .then_with(|| r.sequence_number.cmp(&best.sequence_number)), - _ => r.sequence_number.cmp(&best.sequence_number), - }; + let ord = compare_sequence_order(r, best); // >= semantics: last-writer-wins for equal values. if ord.is_ge() { r @@ -93,10 +168,96 @@ impl MergeFunction for DeduplicateMergeFunction { }) .expect("merge called with empty rows"); if RowKind::from_value(winner.value_kind)?.is_add() { - Ok(Some((winner.batch_idx, winner.row_idx))) + Ok(MergeResult::SourceRow { + batch_idx: winner.batch_idx, + row_idx: winner.row_idx, + }) } else { - Ok(None) + Ok(MergeResult::Omit) + } + } +} + +/// Basic partial-update merge: for each non-key column, keep the latest +/// non-null value ordered by user sequence (if configured) then system sequence. +/// +/// DELETE / UPDATE_BEFORE rows are treated as unsupported in this mode. +pub(crate) struct PartialUpdateMergeFunction; + +impl MergeFunction for PartialUpdateMergeFunction { + fn merge( + &self, + rows: &[MergeRow], + batch_buffer: &[BufferedBatch], + source_output_col_indices: &[usize], + output_schema: &SchemaRef, + ) -> crate::Result { + if rows.is_empty() { + return Err(Error::UnexpectedError { + message: "merge called with empty rows".to_string(), + source: None, + }); + } + + let mut ordered_row_indices: Vec = (0..rows.len()).collect(); + ordered_row_indices.sort_by(|&lhs_idx, &rhs_idx| { + compare_sequence_order(&rows[lhs_idx], &rows[rhs_idx]) + .then_with(|| lhs_idx.cmp(&rhs_idx)) + }); + + let mut latest_non_null_by_col: Vec> = + vec![None; output_schema.fields().len()]; + + for row_idx in ordered_row_indices { + let row = &rows[row_idx]; + if !RowKind::from_value(row.value_kind)?.is_add() { + return Err(crate::Error::Unsupported { + message: "merge-engine=partial-update basic mode does not support DELETE or UPDATE_BEFORE rows".to_string(), + }); + } + + 
for (output_col_idx, latest_non_null) in latest_non_null_by_col.iter_mut().enumerate() { + let source_array = batch_buffer[row.batch_idx] + .column_for_output(output_col_idx, source_output_col_indices); + if !source_array.is_null(row.row_idx) { + *latest_non_null = Some((row.batch_idx, row.row_idx)); + } + } } + + let output_columns: Vec = output_schema + .fields() + .iter() + .enumerate() + .map(|(output_col_idx, field)| { + Ok(match latest_non_null_by_col[output_col_idx] { + Some((batch_idx, row_idx)) => batch_buffer[batch_idx] + .column_for_output(output_col_idx, source_output_col_indices) + .slice(row_idx, 1), + None => { + if !field.is_nullable() { + return Err(Error::DataInvalid { + message: format!( + "merge-engine=partial-update produced NULL for non-nullable field '{}'", + field.name() + ), + source: None, + }); + } + new_null_array(field.data_type(), 1) + } + }) + }) + .collect::>>()?; + + let batch = RecordBatch::try_new(output_schema.clone(), output_columns).map_err(|e| { + Error::UnexpectedError { + message: format!("Failed to build partial-update materialized row: {e}"), + source: Some(Box::new(e)), + } + })?; + + Ok(MergeResult::MaterializedRow(batch)) } } @@ -405,8 +566,9 @@ fn sort_merge_stream( return Ok(futures::stream::empty().boxed()); } - // Output column indices: key columns + value columns (skip _SEQUENCE_NUMBER). - let output_col_indices: Vec = key_indices + // Output column indices for source batches: key columns + value columns + // (skip system columns like _SEQUENCE_NUMBER). + let source_output_col_indices: Vec = key_indices .iter() .chain(value_indices.iter()) .copied() @@ -440,7 +602,7 @@ fn sort_merge_stream( // Each cursor's current batch gets an entry; when a cursor advances // to a new batch, the old one stays in the buffer until the output // batch is flushed. - let mut batch_buffer: Vec = Vec::new(); + let mut batch_buffer: Vec = Vec::new(); // Map from stream_idx -> current batch_buffer index. 
         let mut stream_batch_idx: Vec<Option<usize>> = vec![None; num_streams];
@@ -448,7 +610,7 @@
         for (i, cursor) in cursors.iter().enumerate() {
             if let Some(c) = cursor {
                 let idx = batch_buffer.len();
-                batch_buffer.push(c.batch.clone());
+                batch_buffer.push(BufferedBatch::Source(c.batch.clone()));
                 stream_batch_idx[i] = Some(idx);
             }
         }
@@ -508,7 +670,7 @@
                     if batch.num_rows() > 0 {
                         let rows = convert_batch_keys(&batch, &key_indices, &mut row_converter)?;
                         let buf_idx = batch_buffer.len();
-                        batch_buffer.push(batch.clone());
+                        batch_buffer.push(BufferedBatch::Source(batch.clone()));
                         stream_batch_idx[current_winner] = Some(buf_idx);
                         cursors[current_winner] = Some(SortMergeCursor { batch, rows, offset: 0 });
                         break;
@@ -521,10 +683,36 @@
                 tree.update(|a, b| compare_cursors(&cursors, a, b).then_with(|| a.cmp(&b)).is_gt());
             }
 
-            // Apply merge function to pick the winner row.
-            // Returns None if the winning row is a DELETE/UPDATE_BEFORE — skip it.
-            if let Some((win_batch_idx, win_row_idx)) = merge_function.pick_winner(&same_key_rows)? {
-                output_indices.push((win_batch_idx, win_row_idx));
+            match merge_function.merge(
+                &same_key_rows,
+                &batch_buffer,
+                &source_output_col_indices,
+                &output_schema,
+            )?
{ + MergeResult::SourceRow { batch_idx, row_idx } => { + output_indices.push((batch_idx, row_idx)); + } + MergeResult::MaterializedRow(batch) => { + if batch.num_rows() != 1 { + Err(Error::UnexpectedError { + message: format!( + "Materialized merge result must contain exactly one row, got {}", + batch.num_rows() + ), + source: None, + })?; + } + if batch.schema().as_ref() != output_schema.as_ref() { + Err(Error::UnexpectedError { + message: "Materialized merge result schema does not match merge output schema".to_string(), + source: None, + })?; + } + let batch_idx = batch_buffer.len(); + batch_buffer.push(BufferedBatch::Materialized(batch)); + output_indices.push((batch_idx, 0)); + } + MergeResult::Omit => {} } // Yield a batch when we've accumulated enough rows. @@ -532,13 +720,14 @@ fn sort_merge_stream( let batch = build_output_interleave( &output_schema, &batch_buffer, - &output_col_indices, + &source_output_col_indices, &output_indices, )?; output_indices.clear(); - // Compact batch buffer: only keep batches still referenced by cursors. - // SAFETY: output_indices was just cleared above, so no stale references - // exist into the buffer. The yield below happens after compaction. + // Compact batch buffer after the pending output rows have been + // materialized. Source batches still referenced by cursors stay + // alive; materialized batches can be dropped here because they + // are referenced only by the flushed output_indices above. compact_batch_buffer( &mut batch_buffer, &mut stream_batch_idx, @@ -553,7 +742,7 @@ fn sort_merge_stream( let batch = build_output_interleave( &output_schema, &batch_buffer, - &output_col_indices, + &source_output_col_indices, &output_indices, )?; yield batch; @@ -566,20 +755,18 @@ fn sort_merge_stream( /// batch buffer in one pass per column. 
 fn build_output_interleave(
     schema: &SchemaRef,
-    batch_buffer: &[RecordBatch],
-    output_col_indices: &[usize],
+    batch_buffer: &[BufferedBatch],
+    source_output_col_indices: &[usize],
     indices: &[(usize, usize)],
 ) -> crate::Result<RecordBatch> {
-    let columns: Vec<ArrayRef> = output_col_indices
-        .iter()
-        .map(|&col_idx| {
-            // Collect all arrays for this column from the batch buffer.
+    let columns: Vec<ArrayRef> = (0..schema.fields().len())
+        .map(|output_col_idx| {
             let arrays: Vec<&dyn arrow_array::Array> = batch_buffer
                 .iter()
-                .map(|b| b.column(col_idx).as_ref())
+                .map(|batch| batch.column_for_output(output_col_idx, source_output_col_indices))
                 .collect();
             interleave(&arrays, indices).map_err(|e| Error::UnexpectedError {
-                message: format!("Failed to interleave column {col_idx}: {e}"),
+                message: format!("Failed to interleave output column {output_col_idx}: {e}"),
                 source: Some(Box::new(e)),
             })
         })
@@ -594,7 +781,7 @@
 /// Compact the batch buffer by removing batches no longer referenced by any
 /// cursor, and updating indices accordingly.
 fn compact_batch_buffer(
-    batch_buffer: &mut Vec<RecordBatch>,
+    batch_buffer: &mut Vec<BufferedBatch>,
     stream_batch_idx: &mut [Option<usize>],
     cursors: &[Option<SortMergeCursor>],
 ) {
@@ -610,7 +797,7 @@
     // Build old->new index mapping.
     let mut new_indices: Vec<Option<usize>> = vec![None; batch_buffer.len()];
-    let mut new_buffer: Vec<RecordBatch> = Vec::new();
+    let mut new_buffer: Vec<BufferedBatch> = Vec::new();
     for (old_idx, is_alive) in alive.iter().enumerate() {
         if *is_alive {
             new_indices[old_idx] = Some(new_buffer.len());
@@ -693,6 +880,41 @@ mod tests {
         futures::stream::iter(batches.into_iter().map(Ok)).boxed()
     }
 
+    struct MaterializingMergeFunction;
+
+    impl MergeFunction for MaterializingMergeFunction {
+        fn merge(
+            &self,
+            rows: &[MergeRow],
+            batch_buffer: &[BufferedBatch],
+            source_output_col_indices: &[usize],
+            output_schema: &SchemaRef,
+        ) -> crate::Result<MergeResult> {
+            let first = rows.first().expect("merge called with empty rows");
+            let source_batch = first.source_batch(batch_buffer)?;
+            let pk = source_batch
+                .column(source_output_col_indices[0])
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("pk column must be Int32")
+                .value(first.row_idx);
+
+            let batch = RecordBatch::try_new(
+                output_schema.clone(),
+                vec![
+                    Arc::new(Int32Array::from(vec![pk])) as ArrayRef,
+                    Arc::new(StringArray::from(vec![Some("merged")])) as ArrayRef,
+                ],
+            )
+            .map_err(|e| Error::UnexpectedError {
+                message: format!("Failed to build materialized merge batch: {e}"),
+                source: Some(Box::new(e)),
+            })?;
+
+            Ok(MergeResult::MaterializedRow(batch))
+        }
+    }
+
     #[tokio::test]
     async fn test_loser_tree_basic() {
         // 3 streams, verify init produces correct winner
@@ -1264,4 +1486,206 @@ mod tests {
         assert_eq!(pks, vec![1, 2, 3, 4]);
         assert_eq!(values, vec!["a", "b", "c", "d"]);
     }
+
+    #[tokio::test]
+    async fn test_materialized_merge_result_path() {
+        let schema = make_schema();
+        let s0 = stream_from_batches(vec![make_batch(
+            &schema,
+            vec![1, 2],
+            vec![1, 1],
+            vec![Some("old_a"), Some("old_b")],
+        )]);
+        let s1 = stream_from_batches(vec![make_batch(
+            &schema,
+            vec![1, 3],
+            vec![2, 1],
+            vec![Some("new_a"), Some("c")],
+        )]);
+
+        let result = SortMergeReaderBuilder::new(
+            vec![s0, s1],
+            schema,
+            vec![0],
+            1,
+            2,
+            vec![],
+            vec![3],
+
            make_output_schema(),
+            Box::new(MaterializingMergeFunction),
+        )
+        .build()
+        .unwrap()
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        let pks: Vec<i32> = result
+            .iter()
+            .flat_map(|b| {
+                b.column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+        let values: Vec<String> = result
+            .iter()
+            .flat_map(|b| {
+                let arr = b.column(1).as_any().downcast_ref::<StringArray>().unwrap();
+                (0..arr.len())
+                    .map(|i| arr.value(i).to_string())
+                    .collect::<Vec<_>>()
+            })
+            .collect();
+
+        assert_eq!(pks, vec![1, 2, 3]);
+        assert_eq!(values, vec!["merged", "merged", "merged"]);
+    }
+
+    #[tokio::test]
+    async fn test_partial_update_merge_keeps_latest_non_null_values() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("pk", DataType::Int32, false),
+            Field::new("_SEQUENCE_NUMBER", DataType::Int64, false),
+            Field::new("_VALUE_KIND", DataType::Int8, false),
+            Field::new("v_int", DataType::Int32, true),
+            Field::new("v_str", DataType::Utf8, true),
+        ]));
+        let output_schema = Arc::new(Schema::new(vec![
+            Field::new("pk", DataType::Int32, false),
+            Field::new("v_int", DataType::Int32, true),
+            Field::new("v_str", DataType::Utf8, true),
+        ]));
+
+        let s0 = stream_from_batches(vec![RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])),
+                Arc::new(Int64Array::from(vec![1, 1])),
+                Arc::new(Int8Array::from(vec![0, 0])),
+                Arc::new(Int32Array::from(vec![10, 20])),
+                Arc::new(StringArray::from(vec![Some("old-1"), Some("old-2")])),
+            ],
+        )
+        .unwrap()]);
+        let s1 = stream_from_batches(vec![RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int64Array::from(vec![2, 2, 1])),
+                Arc::new(Int8Array::from(vec![0, 0, 0])),
+                Arc::new(Int32Array::from(vec![None, Some(200), Some(30)])),
+                Arc::new(StringArray::from(vec![Some("new-1"), None, None])),
+            ],
+        )
+        .unwrap()]);
+
+        let result = SortMergeReaderBuilder::new(
+            vec![s0, s1],
+            schema,
+            vec![0],
+            1,
+            2,
+            vec![],
+            vec![3,
4],
+            output_schema,
+            Box::new(PartialUpdateMergeFunction),
+        )
+        .build()
+        .unwrap()
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+        let mut rows: Vec<(i32, Option<i32>, Option<String>)> = Vec::new();
+        for batch in &result {
+            let ids = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            let ints = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            let strs = batch
+                .column(2)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+            for i in 0..batch.num_rows() {
+                rows.push((
+                    ids.value(i),
+                    if ints.is_null(i) {
+                        None
+                    } else {
+                        Some(ints.value(i))
+                    },
+                    if strs.is_null(i) {
+                        None
+                    } else {
+                        Some(strs.value(i).to_string())
+                    },
+                ));
+            }
+        }
+        rows.sort_by_key(|row| row.0);
+
+        assert_eq!(
+            rows,
+            vec![
+                (1, Some(10), Some("new-1".to_string())),
+                (2, Some(200), Some("old-2".to_string())),
+                (3, Some(30), None),
+            ]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_partial_update_merge_rejects_delete_like_rows() {
+        let schema = make_schema();
+        let output_schema = make_output_schema();
+        let s0 = stream_from_batches(vec![make_batch_with_kind(
+            &schema,
+            vec![1],
+            vec![1],
+            vec![0],
+            vec![Some("old")],
+        )]);
+        let s1 = stream_from_batches(vec![make_batch_with_kind(
+            &schema,
+            vec![1],
+            vec![2],
+            vec![3],
+            vec![Some("delete")],
+        )]);
+
+        let err = SortMergeReaderBuilder::new(
+            vec![s0, s1],
+            schema,
+            vec![0],
+            1,
+            2,
+            vec![],
+            vec![3],
+            output_schema,
+            Box::new(PartialUpdateMergeFunction),
+        )
+        .build()
+        .unwrap()
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap_err();
+
+        assert!(matches!(
+            err,
+            Error::Unsupported { message }
+                if message.contains("partial-update basic mode does not support DELETE or UPDATE_BEFORE")
+        ));
+    }
 }
diff --git a/crates/paimon/src/table/table_read.rs b/crates/paimon/src/table/table_read.rs
index 1499eafa..f168a10c 100644
--- a/crates/paimon/src/table/table_read.rs
+++ b/crates/paimon/src/table/table_read.rs
@@ -21,7 +21,7 @@ use super::kv_file_reader::{KeyValueFileReader, KeyValueReadConfig};
 use
super::read_builder::split_scan_predicates;
 use super::{ArrowRecordBatchStream, Table};
 use crate::arrow::filtering::reader_pruning_predicates;
-use crate::spec::{CoreOptions, DataField, Predicate};
+use crate::spec::{CoreOptions, DataField, MergeEngine, PartialUpdateConfig, Predicate};
 use crate::DataSplit;
 
 /// Table read: reads data from splits (e.g. produced by [TableScan::plan]).
@@ -73,18 +73,42 @@ impl<'a> TableRead<'a> {
     /// Returns an [`ArrowRecordBatchStream`].
     pub fn to_arrow(&self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
         let has_primary_keys = !self.table.schema.primary_keys().is_empty();
+        let table_name = self.table.identifier().full_name();
+        PartialUpdateConfig::new(self.table.schema().options())
+            .validate_runtime_mode(has_primary_keys, &table_name)?;
         let core_options = CoreOptions::new(self.table.schema.options());
+        let merge_engine = core_options.merge_engine()?;
+        let deletion_vectors_enabled = core_options.deletion_vectors_enabled();
+
+        if has_primary_keys
+            && merge_engine == MergeEngine::PartialUpdate
+            && deletion_vectors_enabled
+        {
+            return Err(crate::Error::Unsupported {
+                message: format!(
+                    "Table '{table_name}' uses merge-engine=partial-update with deletion-vectors.enabled=true, which is not supported yet"
+                ),
+            });
+        }
 
         // PK table with Deduplicate engine: splits containing level-0 files
         // need KeyValueFileReader for sort-merge dedup; splits with only
         // compacted files (level > 0) can use the faster DataFileReader.
         // FirstRow engine falls through — scan already skips level-0.
-        if has_primary_keys
-            && core_options
-                .merge_engine()
-                .is_ok_and(|e| e == crate::spec::MergeEngine::Deduplicate)
-        {
-            return self.read_pk_deduplicate(data_splits, &core_options);
+        if has_primary_keys {
+            return match merge_engine {
+                MergeEngine::Deduplicate => self.read_pk_deduplicate(data_splits, &core_options),
+                MergeEngine::PartialUpdate => {
+                    self.read_pk_partial_update(data_splits, &core_options)
+                }
+                MergeEngine::FirstRow => {
+                    if core_options.data_evolution_enabled() {
+                        self.read_with_evolution(data_splits)
+                    } else {
+                        self.read_raw(data_splits)
+                    }
+                }
+            };
         }
 
         if core_options.data_evolution_enabled() {
@@ -125,6 +149,18 @@
         ])))
     }
 
+    /// Read PK table with PartialUpdate engine via KeyValueFileReader.
+    ///
+    /// Unlike Deduplicate, partial-update rows cannot be safely treated as raw
+    /// rows, so all splits go through the merge reader.
+    fn read_pk_partial_update(
+        &self,
+        data_splits: &[DataSplit],
+        core_options: &CoreOptions,
+    ) -> crate::Result<ArrowRecordBatchStream> {
+        self.read_kv(data_splits, core_options)
+    }
+
     /// Read splits via KeyValueFileReader (sort-merge dedup).
 fn read_kv(
     &self,
@@ -140,6 +176,7 @@
             read_type: self.read_type().to_vec(),
             predicates: self.data_predicates.clone(),
             primary_keys: self.table.schema.trimmed_primary_keys(),
+            merge_engine: core_options.merge_engine()?,
             sequence_fields: core_options
                 .sequence_fields()
                 .iter()
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index c6cea462..1a832984 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -31,8 +31,8 @@ use crate::io::FileIO;
 use crate::predicate_stats::data_leaf_may_match;
 use crate::spec::{
     bucket_dir_name, eval_row, BinaryRow, CoreOptions, DataField, DataFileMeta, FileKind,
-    IndexManifest, ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot,
-    TimeTravelSelector,
+    IndexManifest, ManifestEntry, ManifestFileMeta, PartialUpdateConfig, PartitionComputer,
+    Predicate, Snapshot, TimeTravelSelector,
 };
 use crate::table::bin_pack::split_for_batch;
 use crate::table::source::{
@@ -305,8 +305,25 @@
 pub(super) fn can_push_down_limit_hint_for_scan(
     data_predicates: &[Predicate],
     row_ranges: Option<&[RowRange]>,
+    partial_update_enabled: bool,
 ) -> bool {
-    data_predicates.is_empty() && row_ranges.is_none()
+    !partial_update_enabled && data_predicates.is_empty() && row_ranges.is_none()
+}
+
+fn should_skip_level_zero_for_scan(
+    scan_all_files: bool,
+    has_primary_keys: bool,
+    deletion_vectors_enabled: bool,
+    merge_engine: crate::Result<crate::spec::MergeEngine>,
+) -> bool {
+    if scan_all_files {
+        return false;
+    }
+    if !has_primary_keys {
+        return false;
+    }
+
+    deletion_vectors_enabled || merge_engine.is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow)
 }
 
 /// TableScan for full table scan (no incremental, no predicate).
@@ -499,16 +516,12 @@ impl<'a> TableScan<'a> { // // Non-read paths (overwrite, truncate, writer restore) set scan_all_files=true // to see all files including level-0, matching Java's CommitScanner behavior. - let skip_level_zero = if self.scan_all_files { - false - } else if has_primary_keys { - deletion_vectors_enabled - || core_options - .merge_engine() - .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow) - } else { - false - }; + let skip_level_zero = should_skip_level_zero_for_scan( + self.scan_all_files, + has_primary_keys, + deletion_vectors_enabled, + core_options.merge_engine(), + ); let partition_fields = self.table.schema().partition_fields(); @@ -562,7 +575,9 @@ impl<'a> TableScan<'a> { } fn can_push_down_limit_hint(&self, row_ranges: Option<&[RowRange]>) -> bool { - can_push_down_limit_hint_for_scan(&self.data_predicates, row_ranges) + let partial_update_enabled = !self.table.schema().primary_keys().is_empty() + && PartialUpdateConfig::new(self.table.schema().options()).is_enabled(); + can_push_down_limit_hint_for_scan(&self.data_predicates, row_ranges, partial_update_enabled) } async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result { @@ -804,7 +819,7 @@ impl<'a> TableScan<'a> { #[cfg(test)] mod tests { - use super::{partition_matches_predicate, TableScan}; + use super::{partition_matches_predicate, should_skip_level_zero_for_scan, TableScan}; use crate::catalog::Identifier; use crate::io::FileIOBuilder; use crate::spec::{ @@ -943,6 +958,25 @@ mod tests { ) } + fn limit_test_partial_update_pk_table() -> Table { + let file_io = FileIOBuilder::new("file").build().unwrap(); + let schema = PaimonSchema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("merge-engine", "partial-update") + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + Table::new( + file_io, + Identifier::new("test_db", "partial_update_table"), + 
"/tmp/test-partial-update-table".to_string(), + table_schema, + None, + ) + } + fn limit_test_split(file_name: &str, row_count: i64) -> DataSplit { let mut file = test_data_file_meta(Vec::new(), Vec::new(), Vec::new(), row_count); file.file_name = file_name.to_string(); @@ -1021,6 +1055,37 @@ mod tests { ); } + #[test] + fn test_partial_update_disables_limit_pushdown_hint() { + let table = limit_test_partial_update_pk_table(); + let scan = TableScan::new(&table, None, vec![], None, Some(10), None); + + assert!( + !scan.can_push_down_limit_hint(None), + "PK partial-update tables must not use limit pushdown hints" + ); + } + + #[test] + fn test_first_row_skips_level_zero_by_default() { + assert!(should_skip_level_zero_for_scan( + false, + true, + false, + Ok(crate::spec::MergeEngine::FirstRow), + )); + } + + #[test] + fn test_scan_all_files_disables_first_row_level_zero_skip() { + assert!(!should_skip_level_zero_for_scan( + true, + true, + false, + Ok(crate::spec::MergeEngine::FirstRow), + )); + } + #[test] fn test_partition_matches_predicate_decode_failure_fails_open() { let predicate = PredicateBuilder::new(&partition_string_field()) diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 539232b4..3ed0241a 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -23,8 +23,8 @@ use crate::spec::DataFileMeta; use crate::spec::PartitionComputer; use crate::spec::{ - BinaryRow, CoreOptions, DataField, DataType, Datum, MergeEngine, Predicate, PredicateBuilder, - EMPTY_SERIALIZED_ROW, POSTPONE_BUCKET, + BinaryRow, CoreOptions, DataField, DataType, Datum, MergeEngine, PartialUpdateConfig, + Predicate, PredicateBuilder, EMPTY_SERIALIZED_ROW, POSTPONE_BUCKET, }; use crate::table::bucket_assigner::{BucketAssignerEnum, PartitionBucketKey}; use crate::table::bucket_assigner_constant::ConstantBucketAssigner; @@ -128,6 +128,19 @@ impl TableWrite { let total_buckets = core_options.bucket(); let 
has_primary_keys = !schema.primary_keys().is_empty(); + let table_name = table.identifier().full_name(); + let partial_update_mode = PartialUpdateConfig::new(schema.options()) + .validate_runtime_mode(has_primary_keys, &table_name)?; + if partial_update_mode.is_some() && core_options.deletion_vectors_enabled() { + return Err(crate::Error::Unsupported { + message: "TableWrite does not support merge-engine=partial-update with deletion-vectors.enabled=true yet".to_string(), + }); + } + if partial_update_mode.is_some() && total_buckets == -1 { + return Err(crate::Error::Unsupported { + message: "TableWrite does not support merge-engine=partial-update with bucket=-1 yet; currently only fixed-bucket partial-update is supported".to_string(), + }); + } let is_dynamic_bucket = has_primary_keys && total_buckets == -1; let is_cross_partition = is_dynamic_bucket && !schema.partition_keys().is_empty() && { @@ -820,6 +833,85 @@ mod tests { ); } + #[test] + fn test_allows_partial_update_fixed_bucket_table() { + let table = Table::new( + test_file_io(), + Identifier::new("default", "test_partial_update_table"), + "memory:/test_partial_update_table".to_string(), + TableSchema::new( + 0, + &Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("bucket", "1") + .option("merge-engine", "partial-update") + .build() + .unwrap(), + ), + None, + ); + + TableWrite::new(&table, "test-user".to_string(), false).unwrap(); + } + + #[test] + fn test_rejects_partial_update_dynamic_bucket_table() { + let table = Table::new( + test_file_io(), + Identifier::new("default", "test_partial_update_dynamic_bucket_table"), + "memory:/test_partial_update_dynamic_bucket_table".to_string(), + TableSchema::new( + 0, + &Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("merge-engine", "partial-update") + .build() + 
.unwrap(), + ), + None, + ); + + let err = TableWrite::new(&table, "test-user".to_string(), false) + .err() + .unwrap(); + assert!( + matches!(err, crate::Error::Unsupported { message } if message.contains("bucket=-1")) + ); + } + + #[test] + fn test_rejects_partial_update_with_deletion_vectors() { + let table = Table::new( + test_file_io(), + Identifier::new("default", "test_partial_update_dv_table"), + "memory:/test_partial_update_dv_table".to_string(), + TableSchema::new( + 0, + &Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("bucket", "1") + .option("merge-engine", "partial-update") + .option("deletion-vectors.enabled", "true") + .build() + .unwrap(), + ), + None, + ); + + let err = TableWrite::new(&table, "test-user".to_string(), false) + .err() + .unwrap(); + assert!( + matches!(err, crate::Error::Unsupported { message } if message.contains("deletion-vectors.enabled=true")) + ); + } + #[tokio::test] async fn test_write_partitioned() { let file_io = test_file_io();