From c3a4df2ebabaf391ce8cc9b35a1af70ae190ff7e Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Wed, 13 May 2026 09:14:27 +0800 Subject: [PATCH] feat: support dynamic-bucket partial-update Signed-off-by: QuakeWang --- .../datafusion/tests/dynamic_bucket_tables.rs | 424 +++++++++++++++++- crates/paimon/src/spec/schema.rs | 36 ++ .../src/table/bucket_assigner_dynamic.rs | 2 +- crates/paimon/src/table/kv_file_writer.rs | 26 -- crates/paimon/src/table/table_write.rs | 72 ++- 5 files changed, 512 insertions(+), 48 deletions(-) diff --git a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs index 59804b6c..4c8cbcb5 100644 --- a/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs +++ b/crates/integrations/datafusion/tests/dynamic_bucket_tables.rs @@ -22,10 +22,10 @@ mod common; use common::{ collect_id_name, collect_id_value, create_sql_context, create_test_env, setup_sql_context, }; -use datafusion::arrow::array::{Int32Array, StringArray}; +use datafusion::arrow::array::{Array, Int32Array, StringArray}; use paimon::catalog::Identifier; use paimon::spec::{IndexManifest, IndexManifestEntry}; -use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options, SnapshotManager}; +use paimon::{Catalog, CatalogOptions, DataSplit, FileSystemCatalog, Options, SnapshotManager}; /// PK table with bucket=-1 (dynamic bucket) should write and read correctly. #[tokio::test] @@ -82,6 +82,268 @@ async fn test_pk_dynamic_bucket() { ); } +async fn collect_partial_update_rows( + sql_context: &paimon_datafusion::SQLContext, + sql: &str, +) -> Vec<(i32, Option, Option)> { + let batches = sql_context.sql(sql).await.unwrap().collect().await.unwrap(); + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let ints = batch + .column_by_name("v_int") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let strs = batch + .column_by_name("v_str") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push(( + ids.value(i), + if ints.is_null(i) { + None + } else { + Some(ints.value(i)) + }, + if strs.is_null(i) { + None + } else { + Some(strs.value(i).to_string()) + }, + )); + } + } + rows.sort_by_key(|row| row.0); + rows +} + +#[tokio::test] +async fn test_pk_dynamic_bucket_partial_update() { + let (_tmp, sql_context) = setup_sql_context().await; + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_dyn_partial_update ( + id INT NOT NULL, v_int INT, v_str STRING, + PRIMARY KEY (id) + ) WITH ('bucket' = '-1', 'merge-engine' = 'partial-update')", + ) + .await + .unwrap(); + + sql_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_partial_update VALUES + (1, 10, 'old-1'), + (2, 20, 'old-2')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + sql_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_partial_update VALUES + (1, CAST(NULL AS INT), 'new-1'), + (2, 200, CAST(NULL AS STRING)), + (3, 30, CAST(NULL AS STRING))", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + sql_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_partial_update VALUES + (1, 111, CAST(NULL AS STRING))", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_partial_update_rows( + &sql_context, + "SELECT id, v_int, v_str FROM paimon.test_db.t_dyn_partial_update", + ) + .await; + + assert_eq!( + rows, + vec![ + (1, Some(111), Some("new-1".to_string())), + (2, Some(200), Some("old-2".to_string())), + (3, Some(30), None), + ] + ); +} + +async fn latest_splits(table: &paimon::Table) -> Vec { + table + .new_read_builder() + .new_scan() + .with_scan_all_files() + .plan() + .await + .unwrap() + .splits() + .to_vec() +} + +async fn bucket_containing_id(table: &paimon::Table, id: i32) -> i32 { + let read_builder = table.new_read_builder(); + let read = read_builder.new_read().unwrap(); + let splits = latest_splits(table).await; + let mut buckets = Vec::new(); + for split in &splits { + let batches: Vec<_> = + futures::TryStreamExt::try_collect(read.to_arrow(std::slice::from_ref(split)).unwrap()) + .await + .unwrap(); + let contains_id = batches.iter().any(|batch| { + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + (0..batch.num_rows()).any(|i| ids.value(i) == id) + }); + if contains_id { + buckets.push(split.bucket()); + } + } + + buckets.sort(); + buckets.dedup(); + assert_eq!( + buckets.len(), + 1, + "id={id} should be readable from exactly one bucket" + ); + buckets[0] +} + +async fn index_bucket_count(table: &paimon::Table) -> usize { + let entries = read_hash_index_entries(table).await; + let mut buckets = entries.iter().map(|entry| entry.bucket).collect::>(); + buckets.sort(); + buckets.dedup(); + buckets.len() +} + +#[tokio::test] +async fn test_pk_dynamic_bucket_partial_update_restores_existing_bucket() { + let (_tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog.clone()).await; + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA failed"); + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_dyn_partial_route ( + id INT NOT NULL, v_int INT, v_str STRING, + PRIMARY KEY (id) + ) WITH ( + 'bucket' = '-1', + 'dynamic-bucket.target-row-num' = '1', + 'merge-engine' = 'partial-update' + )", + ) + .await + .unwrap(); + + sql_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_partial_route VALUES + (2, 20, 'old-2'), + (1, 10, 'old-1')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let table = catalog + .get_table(&Identifier::new("test_db", "t_dyn_partial_route")) + .await + .unwrap(); + assert_eq!( + index_bucket_count(&table).await, + 2, + "target row number 1 should put two new keys into two HASH index buckets" + ); + let id1_bucket = bucket_containing_id(&table, 1).await; + assert_ne!( + id1_bucket, 0, + "test setup writes id=1 second so missing index restore would allocate a different bucket" + ); + + sql_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_partial_route VALUES + (1, CAST(NULL AS INT), 'new-1')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let table = catalog + .get_table(&Identifier::new("test_db", "t_dyn_partial_route")) + .await + .unwrap(); + let id1_bucket_after = bucket_containing_id(&table, 1).await; + assert_eq!( + id1_bucket_after, id1_bucket, + "restored HASH index should route id=1 back to its original bucket" + ); + + let splits = latest_splits(&table).await; + let id1_data_files_in_bucket: usize = splits + .iter() + .filter(|split| split.bucket() == id1_bucket) + .map(|split| split.data_files().len()) + .sum(); + assert_eq!( + id1_data_files_in_bucket, 2, + "id=1 initial row and later partial update should be in the same bucket" + ); + let other_bucket_file_count: usize = splits + .iter() + .filter(|split| split.bucket() != id1_bucket) + .map(|split| split.data_files().len()) + .sum(); + assert_eq!( + other_bucket_file_count, 1, + "id=2 should remain in a separate bucket when target row number is 1" + ); + + let rows = collect_partial_update_rows( + &sql_context, + "SELECT id, v_int, v_str FROM paimon.test_db.t_dyn_partial_route", + ) + .await; + assert_eq!( + rows, + vec![ + (1, Some(10), Some("new-1".to_string())), + (2, Some(20), Some("old-2".to_string())), + ] + ); +} + /// Dynamic bucket with partitioned table. #[tokio::test] async fn test_pk_dynamic_bucket_partitioned() { @@ -160,6 +422,162 @@ async fn test_pk_dynamic_bucket_partitioned() { ); } +#[tokio::test] +async fn test_pk_dynamic_bucket_partitioned_partial_update() { + let (_tmp, sql_context) = setup_sql_context().await; + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_dyn_part_partial_update ( + dt STRING, id INT NOT NULL, v_int INT, v_str STRING, + PRIMARY KEY (dt, id) + ) PARTITIONED BY (dt) + WITH ('bucket' = '-1', 'merge-engine' = 'partial-update')", + ) + .await + .unwrap(); + + sql_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_part_partial_update VALUES + ('2024-01-01', 1, 10, 'old-a'), + ('2024-01-02', 1, 100, 'old-b')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + sql_context + .sql( + "INSERT INTO paimon.test_db.t_dyn_part_partial_update VALUES + ('2024-01-01', 1, CAST(NULL AS INT), 'new-a'), + ('2024-01-02', 1, 200, CAST(NULL AS STRING))", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = sql_context + .sql( + "SELECT dt, id, v_int, v_str + FROM paimon.test_db.t_dyn_part_partial_update + ORDER BY dt, id", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let dts = batch + .column_by_name("dt") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let ints = batch + .column_by_name("v_int") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let strs = batch + .column_by_name("v_str") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push(( + dts.value(i).to_string(), + ids.value(i), + if ints.is_null(i) { + None + } else { + Some(ints.value(i)) + }, + if strs.is_null(i) { + None + } else { + Some(strs.value(i).to_string()) + }, + )); + } + } + + assert_eq!( + rows, + vec![ + ( + "2024-01-01".to_string(), + 1, + Some(10), + Some("new-a".to_string()) + ), + ( + "2024-01-02".to_string(), + 1, + Some(200), + Some("old-b".to_string()) + ), + ] + ); +} + +#[tokio::test] +async fn test_rejects_cross_partition_dynamic_bucket_partial_update() { + let (_tmp, sql_context) = setup_sql_context().await; + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_cross_partial_update ( + dt STRING, id INT NOT NULL, v_int INT, + PRIMARY KEY (id) + ) PARTITIONED BY (dt) + WITH ('bucket' = '-1', 'merge-engine' = 'partial-update')", + ) + .await + .unwrap(); + + let result = sql_context + .sql("INSERT INTO paimon.test_db.t_cross_partial_update VALUES ('2024-01-01', 1, 10)") + .await + .unwrap() + .collect() + .await; + + let err = result.unwrap_err().to_string(); + assert!( + err.contains("cross-partition update"), + "expected cross-partition partial-update rejection, got: {err}" + ); +} + +#[tokio::test] +async fn test_rejects_partition_only_primary_key_partial_update() { + let (_tmp, sql_context) = setup_sql_context().await; + + let result = sql_context + .sql( + "CREATE TABLE paimon.test_db.t_partition_only_pk ( + dt STRING NOT NULL, v_int INT, + PRIMARY KEY (dt) + ) PARTITIONED BY (dt) + WITH ('bucket' = '-1', 'merge-engine' = 'partial-update')", + ) + .await; + + let err = result.unwrap_err().to_string(); + assert!( + err.contains("only one record in a partition"), + "expected partition-only primary key rejection, got: {err}" + ); +} + /// Dynamic bucket with three commits — verifies sequence number tracking. #[tokio::test] async fn test_pk_dynamic_bucket_three_commits() { @@ -225,7 +643,7 @@ async fn read_hash_index_entries(table: &paimon::Table) -> Vec Vec { let path = format!("{}/index/{}", table.location(), file_name); let input = table.file_io().new_input(&path).unwrap(); diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index ddada6ad..e6d5af99 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -356,6 +356,7 @@ impl Schema { Self::validate_no_duplicate_fields(&field_names)?; Self::validate_partition_keys(&field_names, partition_keys)?; Self::validate_primary_keys(&field_names, primary_keys)?; + Self::validate_primary_keys_not_partition_only(partition_keys, primary_keys)?; if primary_keys.is_empty() { return Ok(fields.to_vec()); @@ -445,6 +446,29 @@ impl Schema { Ok(()) } + fn validate_primary_keys_not_partition_only( + partition_keys: &[String], + primary_keys: &[String], + ) -> crate::Result<()> { + if primary_keys.is_empty() || partition_keys.is_empty() { + return Ok(()); + } + + let partition_set: HashSet<&str> = partition_keys.iter().map(String::as_str).collect(); + if primary_keys + .iter() + .all(|pk| partition_set.contains(pk.as_str())) + { + return Err(crate::Error::ConfigInvalid { + message: format!( + "Primary key constraint {primary_keys:?} should not be same with partition fields {partition_keys:?}, this will result in only one record in a partition" + ), + }); + } + + Ok(()) + } + fn validate_blob_fields( fields: &[DataField], partition_keys: &[String], @@ -826,6 +850,18 @@ mod tests { "primary key not in columns should be rejected" ); + // Primary key cannot be fully covered by partition keys. + let res = Schema::builder() + .column("a", DataType::Int(IntType::with_nullable(false))) + .column("b", DataType::Int(IntType::new())) + .partition_keys(["a", "b"]) + .primary_key(["a"]) + .build(); + assert!( + matches!(res, Err(crate::Error::ConfigInvalid { message }) if message.contains("only one record in a partition")), + "primary key fully covered by partition keys should be rejected" + ); + // primary-key in options and DDL at same time let res = Schema::builder() .column("a", DataType::Int(IntType::with_nullable(false))) diff --git a/crates/paimon/src/table/bucket_assigner_dynamic.rs b/crates/paimon/src/table/bucket_assigner_dynamic.rs index 6adafb2e..9db4db9c 100644 --- a/crates/paimon/src/table/bucket_assigner_dynamic.rs +++ b/crates/paimon/src/table/bucket_assigner_dynamic.rs @@ -41,7 +41,7 @@ const HASH_INDEX: &str = "HASH"; /// Read/write hash index files. /// -/// A hash index file is a flat binary file containing `i32` values in little-endian byte order. +/// A hash index file is a flat binary file containing `i32` values in big-endian byte order. /// Each value is the hash code of a primary key that belongs to the associated bucket. struct HashIndexFile; diff --git a/crates/paimon/src/table/kv_file_writer.rs b/crates/paimon/src/table/kv_file_writer.rs index 6b434455..a6d71800 100644 --- a/crates/paimon/src/table/kv_file_writer.rs +++ b/crates/paimon/src/table/kv_file_writer.rs @@ -78,7 +78,6 @@ pub(crate) struct KeyValueWriteConfig { pub sequence_field_indices: Vec, /// Merge engine for deduplication. pub merge_engine: MergeEngine, - pub dynamic_bucket_enabled: bool, pub deletion_vectors_enabled: bool, } @@ -100,15 +99,6 @@ impl KeyValueFileWriter { ), }); } - - if config.dynamic_bucket_enabled { - return Err(crate::Error::Unsupported { - message: format!( - "Table '{}' uses merge-engine=partial-update with bucket=-1, which is not supported yet; currently only fixed-bucket partial-update is supported", - config.table_name - ), - }); - } } Ok(Self { @@ -553,7 +543,6 @@ mod tests { primary_key_types: vec![DataType::Int(IntType::new())], sequence_field_indices: vec![1], merge_engine, - dynamic_bucket_enabled: false, deletion_vectors_enabled: false, } } @@ -621,21 +610,6 @@ mod tests { assert_eq!(selected, vec![0, 1]); } - #[test] - fn test_new_rejects_partial_update_dynamic_bucket() { - let mut config = test_write_config(MergeEngine::PartialUpdate); - config.dynamic_bucket_enabled = true; - - let err = KeyValueFileWriter::new(FileIOBuilder::new("memory").build().unwrap(), config, 0) - .err() - .unwrap(); - - assert!(matches!( - err, - crate::Error::Unsupported { message } if message.contains("bucket=-1") - )); - } - #[test] fn test_new_rejects_partial_update_with_deletion_vectors() { let mut config = test_write_config(MergeEngine::PartialUpdate); diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 167cc850..49c00198 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -139,13 +139,15 @@ impl TableWrite { let has_primary_keys = !schema.primary_keys().is_empty(); let is_dynamic_bucket = has_primary_keys && total_buckets == -1; - let is_cross_partition = is_dynamic_bucket && !schema.partition_keys().is_empty() && { - let pk_set: HashSet<&str> = schema.primary_keys().iter().map(String::as_str).collect(); - schema - .partition_keys() - .iter() - .any(|p| !pk_set.contains(p.as_str())) - }; + let is_dynamic_cross_partition = + is_dynamic_bucket && !schema.partition_keys().is_empty() && { + let pk_set: HashSet<&str> = + schema.primary_keys().iter().map(String::as_str).collect(); + schema + .partition_keys() + .iter() + .any(|p| !pk_set.contains(p.as_str())) + }; if has_primary_keys && !is_dynamic_bucket @@ -223,6 +225,14 @@ impl TableWrite { let merge_engine = core_options.merge_engine()?; + if is_dynamic_cross_partition && merge_engine == MergeEngine::PartialUpdate { + return Err(crate::Error::Unsupported { + message: + "merge-engine=partial-update with cross-partition update is not supported yet" + .to_string(), + }); + } + if has_primary_keys && core_options.rowkind_field().is_some() { return Err(crate::Error::Unsupported { message: "KeyValueFileWriter does not support rowkind.field".to_string(), @@ -231,7 +241,7 @@ impl TableWrite { let target_bucket_row_number = core_options.dynamic_bucket_target_row_num(); - let bucket_assigner = if is_cross_partition { + let bucket_assigner = if is_dynamic_cross_partition { BucketAssignerEnum::CrossPartition(Box::new(CrossPartitionAssigner::new( table.clone(), partition_field_indices, @@ -676,10 +686,6 @@ impl TableWrite { primary_key_types: self.primary_key_types.clone(), sequence_field_indices: self.sequence_field_indices.clone(), merge_engine: self.merge_engine, - dynamic_bucket_enabled: matches!( - self.bucket_assigner, - BucketAssignerEnum::Dynamic(_) | BucketAssignerEnum::CrossPartition(_) - ), deletion_vectors_enabled: CoreOptions::new(self.table.schema().options()) .deletion_vectors_enabled(), }, @@ -930,7 +936,7 @@ mod tests { } #[tokio::test] - async fn test_rejects_partial_update_dynamic_bucket_table_when_creating_writer() { + async fn test_allows_partial_update_dynamic_bucket_table() { let file_io = test_file_io(); let table_path = "memory:/test_partial_update_dynamic_bucket_table"; setup_dirs(&file_io, table_path).await; @@ -953,13 +959,10 @@ mod tests { ); let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); - let err = table_write + table_write .write_arrow_batch(&make_batch(vec![1], vec![10])) .await - .unwrap_err(); - assert!( - matches!(err, crate::Error::Unsupported { message } if message.contains("bucket=-1")) - ); + .unwrap(); } #[tokio::test] @@ -1900,6 +1903,39 @@ mod tests { )); } + #[test] + fn test_rejects_cross_partition_partial_update() { + let file_io = test_file_io(); + let table_path = "memory:/test_cross_partial_update"; + let schema = Schema::builder() + .column("pt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .partition_keys(["pt"]) + .option("merge-engine", "partial-update") + .build() + .unwrap(); + let table = Table::new( + file_io, + Identifier::new("default", "test_cross_partial_update"), + table_path.to_string(), + TableSchema::new(0, &schema), + None, + ); + + let err = match TableWrite::new(&table, "test-user".to_string()) { + Ok(_) => panic!("cross-partition partial-update should be rejected"), + Err(err) => err, + }; + + assert!(matches!( + err, + crate::Error::Unsupported { message } + if message.contains("cross-partition update") + )); + } + #[tokio::test] async fn test_cross_partition_write_same_partition() { let file_io = test_file_io();