From 1ab4835261bc118514ca1cafd84b749c222999af Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 15 Apr 2026 19:33:12 +0800 Subject: [PATCH 1/3] feat: add primary-key table read/write support with sort-merge deduplication Add KV file reader/writer, sort-merge reader with LoserTree, and RowKind support to enable reading and writing primary-key tables. Includes integration tests and DataFusion pk_tables tests. --- Cargo.toml | 1 + .../integration_tests/tests/append_tables.rs | 27 - crates/integration_tests/tests/read_tables.rs | 75 +- .../datafusion/tests/pk_tables.rs | 1148 +++++++++++++++ crates/paimon/Cargo.toml | 1 + crates/paimon/src/spec/binary_row.rs | 55 + crates/paimon/src/spec/core_options.rs | 52 + crates/paimon/src/spec/mod.rs | 3 + crates/paimon/src/spec/predicate.rs | 12 + crates/paimon/src/spec/row_kind.rs | 51 + crates/paimon/src/spec/schema.rs | 10 + crates/paimon/src/spec/stats.rs | 56 + crates/paimon/src/table/kv_file_reader.rs | 328 +++++ crates/paimon/src/table/kv_file_writer.rs | 461 ++++++ crates/paimon/src/table/mod.rs | 3 + crates/paimon/src/table/sort_merge.rs | 1267 +++++++++++++++++ crates/paimon/src/table/table_commit.rs | 11 +- crates/paimon/src/table/table_read.rs | 37 +- crates/paimon/src/table/table_scan.rs | 25 +- crates/paimon/src/table/table_write.rs | 427 +++++- crates/paimon/src/table/write_builder.rs | 3 + dev/spark/provision.py | 37 + 22 files changed, 4007 insertions(+), 83 deletions(-) create mode 100644 crates/integrations/datafusion/tests/pk_tables.rs create mode 100644 crates/paimon/src/spec/row_kind.rs create mode 100644 crates/paimon/src/table/kv_file_reader.rs create mode 100644 crates/paimon/src/table/kv_file_writer.rs create mode 100644 crates/paimon/src/table/sort_merge.rs diff --git a/Cargo.toml b/Cargo.toml index fadca8bb..b4011b71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ arrow-buffer = "58.0" arrow-schema = "58.0" arrow-cast = "58.0" arrow-ord = "58.0" +arrow-row = "58.0" arrow-select = "58.0" 
datafusion = "53.0.0" datafusion-ffi = "53.0.0" diff --git a/crates/integration_tests/tests/append_tables.rs b/crates/integration_tests/tests/append_tables.rs index b2185de9..981c44d9 100644 --- a/crates/integration_tests/tests/append_tables.rs +++ b/crates/integration_tests/tests/append_tables.rs @@ -563,33 +563,6 @@ async fn test_partitioned_fixed_bucket_write_read() { assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30, 40]); } -// --------------------------------------------------------------------------- -// Unsupported: primary key table should be rejected -// --------------------------------------------------------------------------- - -#[tokio::test] -async fn test_reject_primary_key_table() { - let schema = Schema::builder() - .column("id", DataType::Int(IntType::new())) - .column("value", DataType::Int(IntType::new())) - .primary_key(["id"]) - .build() - .unwrap(); - let table_schema = TableSchema::new(0, &schema); - - let file_io = memory_file_io(); - let path = "memory:/append_reject_pk"; - let table = make_table(&file_io, path, table_schema); - - let result = table.new_write_builder().new_write(); - assert!(result.is_err()); - let err = result.err().unwrap(); - assert!( - matches!(&err, paimon::Error::Unsupported { message } if message.contains("primary keys")), - "Expected Unsupported error for PK table, got: {err:?}" - ); -} - #[tokio::test] async fn test_reject_fixed_bucket_without_bucket_key() { let schema = Schema::builder() diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 9ed39cf2..b89268c1 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -1478,9 +1478,9 @@ async fn test_read_complex_type_table() { // PK-without-DV and non-PK-with-DV tests // --------------------------------------------------------------------------- -/// Reading a primary-key table without deletion vectors should return an Unsupported error. 
+/// Reading a primary-key table without deletion vectors should work via sort-merge reader. #[tokio::test] -async fn test_read_pk_table_without_dv_returns_error() { +async fn test_read_pk_table_without_dv_via_sort_merge() { let catalog = create_file_system_catalog(); let table = get_table_from_catalog(&catalog, "simple_pk_table").await; @@ -1493,16 +1493,73 @@ async fn test_read_pk_table_without_dv_returns_error() { ); let read = table.new_read_builder().new_read(); - let result = read + let stream = read .expect("new_read should succeed") - .to_arrow(plan.splits()); - let err = result - .err() - .expect("Reading PK table without DV should fail"); + .to_arrow(plan.splits()) + .expect("to_arrow should succeed for PK table via sort-merge"); + let batches: Vec<_> = futures::TryStreamExt::try_collect(stream) + .await + .expect("Reading PK table without DV should succeed via sort-merge reader"); assert!( - matches!(&err, Error::Unsupported { message } if message.contains("primary-key")), - "Expected Unsupported error about primary-key tables, got: {err:?}" + !batches.is_empty(), + "PK table read should return non-empty results" + ); + + let actual = extract_id_name(&batches); + let expected = vec![ + (1, "alice".to_string()), + (2, "bob".to_string()), + (3, "carol".to_string()), + ]; + assert_eq!( + actual, expected, + "PK table without DV should return correct rows via sort-merge reader" + ); +} + +/// Reading a first-row merge engine PK table should return only the first-inserted row per key. +/// The table has been compacted so all files are level > 0, and the scan skips level-0 files. 
+#[tokio::test] +async fn test_read_first_row_pk_table() { + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "first_row_pk_table").await; + + let read_builder = table.new_read_builder(); + let scan = read_builder.new_scan(); + let plan = scan.plan().await.expect("Failed to plan scan"); + assert!( + !plan.splits().is_empty(), + "first-row PK table should have splits to read" + ); + + let read = table.new_read_builder().new_read(); + let stream = read + .expect("new_read should succeed") + .to_arrow(plan.splits()) + .expect("to_arrow should succeed for first-row PK table"); + + let batches: Vec<_> = futures::TryStreamExt::try_collect(stream) + .await + .expect("Reading first-row PK table should succeed"); + assert!( + !batches.is_empty(), + "first-row PK table read should return non-empty results" + ); + + let actual = extract_id_name(&batches); + // first-row keeps the earliest row per key: + // commit 1: (1, alice), (2, bob), (3, carol) + // commit 2: (2, bob-v2), (3, carol-v2), (4, dave) — id=2,3 ignored, id=4 is new + let expected = vec![ + (1, "alice".to_string()), + (2, "bob".to_string()), + (3, "carol".to_string()), + (4, "dave".to_string()), + ]; + assert_eq!( + actual, expected, + "first-row PK table should keep earliest row per key" ); } diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs new file mode 100644 index 00000000..c93f5e29 --- /dev/null +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -0,0 +1,1148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! E2E integration tests for primary-key tables via DataFusion SQL. +//! +//! Covers: basic write+read, dedup within/across commits, partitioned PK tables, +//! multi-bucket, column projection, FirstRow merge engine, sequence.field, +//! INSERT OVERWRITE, filter pushdown, and error cases. + +use std::sync::Arc; + +use datafusion::arrow::array::{Int32Array, StringArray}; +use datafusion::prelude::SessionContext; +use paimon::{CatalogOptions, FileSystemCatalog, Options}; +use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, PaimonSqlHandler}; +use tempfile::TempDir; + +// ======================= Helpers ======================= + +fn create_test_env() -> (TempDir, Arc) { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let warehouse = format!("file://{}", temp_dir.path().display()); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + let catalog = FileSystemCatalog::new(options).expect("Failed to create catalog"); + (temp_dir, Arc::new(catalog)) +} + +fn create_handler(catalog: Arc) -> PaimonSqlHandler { + let ctx = SessionContext::new(); + ctx.register_catalog( + "paimon", + Arc::new(PaimonCatalogProvider::new(catalog.clone())), + ); + ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new())) + .expect("Failed to register relation planner"); + PaimonSqlHandler::new(ctx, catalog, "paimon") +} + +async fn setup_handler() -> (TempDir, PaimonSqlHandler) { + let (tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + handler + .sql("CREATE 
SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA failed"); + (tmp, handler) +} + +async fn collect_id_name(handler: &PaimonSqlHandler, sql: &str) -> Vec<(i32, String)> { + let batches = handler.sql(sql).await.unwrap().collect().await.unwrap(); + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("id column"); + let names = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("name column"); + for i in 0..batch.num_rows() { + rows.push((ids.value(i), names.value(i).to_string())); + } + } + rows.sort_by_key(|(id, _)| *id); + rows +} + +async fn collect_id_value(handler: &PaimonSqlHandler, sql: &str) -> Vec<(i32, i32)> { + let batches = handler.sql(sql).await.unwrap().collect().await.unwrap(); + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("id column"); + let vals = batch + .column_by_name("value") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("value column"); + for i in 0..batch.num_rows() { + rows.push((ids.value(i), vals.value(i))); + } + } + rows.sort_by_key(|(id, _)| *id); + rows +} + +async fn row_count(handler: &PaimonSqlHandler, sql: &str) -> usize { + let batches = handler.sql(sql).await.unwrap().collect().await.unwrap(); + batches.iter().map(|b| b.num_rows()).sum() +} + +// ======================= Basic PK Write + Read ======================= + +/// Basic: CREATE TABLE with PK, INSERT, SELECT — verifies round-trip. 
+#[tokio::test] +async fn test_pk_basic_write_read() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t1 ( + id INT NOT NULL, name STRING, + PRIMARY KEY (id) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.t1 VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_id_name( + &handler, + "SELECT id, name FROM paimon.test_db.t1 ORDER BY id", + ) + .await; + + assert_eq!( + rows, + vec![ + (1, "alice".to_string()), + (2, "bob".to_string()), + (3, "carol".to_string()), + ] + ); +} + +// ======================= Dedup Within Single Commit ======================= + +/// Duplicate keys in a single INSERT — last value wins (Deduplicate engine). +#[tokio::test] +async fn test_pk_dedup_within_single_commit() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_dedup ( + id INT NOT NULL, value INT, + PRIMARY KEY (id) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.t_dedup VALUES (1, 10), (2, 20), (1, 100), (2, 200)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_id_value( + &handler, + "SELECT id, value FROM paimon.test_db.t_dedup ORDER BY id", + ) + .await; + + // Last occurrence wins for deduplicate merge engine + assert_eq!(rows, vec![(1, 100), (2, 200)]); +} + +// ======================= Dedup Across Commits ======================= + +/// Two commits with overlapping keys — second commit's values win. 
+#[tokio::test] +async fn test_pk_dedup_across_commits() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_cross ( + id INT NOT NULL, name STRING, + PRIMARY KEY (id) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + // First commit + handler + .sql("INSERT INTO paimon.test_db.t_cross VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Second commit: update id=1,3, add id=4 + handler + .sql("INSERT INTO paimon.test_db.t_cross VALUES (1, 'alice-v2'), (3, 'carol-v2'), (4, 'dave')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_id_name( + &handler, + "SELECT id, name FROM paimon.test_db.t_cross ORDER BY id", + ) + .await; + + assert_eq!( + rows, + vec![ + (1, "alice-v2".to_string()), + (2, "bob".to_string()), + (3, "carol-v2".to_string()), + (4, "dave".to_string()), + ] + ); +} + +// ======================= Three Commits ======================= + +/// Three successive commits — verifies sequence number tracking across commits. 
+#[tokio::test] +async fn test_pk_three_commits() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_three ( + id INT NOT NULL, value INT, + PRIMARY KEY (id) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.t_three VALUES (1, 10), (2, 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.t_three VALUES (2, 200), (3, 30)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.t_three VALUES (1, 100), (3, 300), (4, 40)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_id_value( + &handler, + "SELECT id, value FROM paimon.test_db.t_three ORDER BY id", + ) + .await; + + assert_eq!(rows, vec![(1, 100), (2, 200), (3, 300), (4, 40)]); +} + +// ======================= Partitioned PK Table ======================= + +/// Partitioned PK table: dedup happens per-partition independently. +#[tokio::test] +async fn test_pk_partitioned_write_read() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_part ( + dt STRING, id INT NOT NULL, name STRING, + PRIMARY KEY (dt, id) + ) PARTITIONED BY (dt STRING) + WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_part VALUES \ + ('2024-01-01', 1, 'alice'), ('2024-01-01', 2, 'bob'), \ + ('2024-01-02', 1, 'carol'), ('2024-01-02', 2, 'dave')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_id_name( + &handler, + "SELECT id, name FROM paimon.test_db.t_part ORDER BY id, name", + ) + .await; + + assert_eq!( + rows, + vec![ + (1, "alice".to_string()), + (1, "carol".to_string()), + (2, "bob".to_string()), + (2, "dave".to_string()), + ] + ); +} + +/// Partitioned PK table: dedup across commits within same partition. 
+#[tokio::test] +async fn test_pk_partitioned_dedup_across_commits() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_part_dedup ( + dt STRING, id INT NOT NULL, name STRING, + PRIMARY KEY (dt, id) + ) PARTITIONED BY (dt STRING) + WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_part_dedup VALUES \ + ('2024-01-01', 1, 'alice'), ('2024-01-01', 2, 'bob'), \ + ('2024-01-02', 1, 'carol')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Update within partition 2024-01-01 + handler + .sql( + "INSERT INTO paimon.test_db.t_part_dedup VALUES \ + ('2024-01-01', 1, 'alice-v2'), ('2024-01-02', 2, 'dave')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = handler + .sql("SELECT dt, id, name FROM paimon.test_db.t_part_dedup ORDER BY dt, id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let dts = batch + .column_by_name("dt") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let names = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push(( + dts.value(i).to_string(), + ids.value(i), + names.value(i).to_string(), + )); + } + } + + assert_eq!( + rows, + vec![ + ("2024-01-01".to_string(), 1, "alice-v2".to_string()), + ("2024-01-01".to_string(), 2, "bob".to_string()), + ("2024-01-02".to_string(), 1, "carol".to_string()), + ("2024-01-02".to_string(), 2, "dave".to_string()), + ] + ); +} + +/// Partition filter on PK table — only matching partition returned. 
+#[tokio::test] +async fn test_pk_partitioned_filter() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_part_filter ( + dt STRING, id INT NOT NULL, name STRING, + PRIMARY KEY (dt, id) + ) PARTITIONED BY (dt STRING) + WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_part_filter VALUES \ + ('2024-01-01', 1, 'alice'), ('2024-01-01', 2, 'bob'), \ + ('2024-01-02', 3, 'carol')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_id_name( + &handler, + "SELECT id, name FROM paimon.test_db.t_part_filter WHERE dt = '2024-01-01' ORDER BY id", + ) + .await; + + assert_eq!(rows, vec![(1, "alice".to_string()), (2, "bob".to_string())]); +} + +// ======================= Multi-Bucket PK Table ======================= + +/// Multiple buckets: rows are distributed by PK hash, dedup still works. +#[tokio::test] +async fn test_pk_multi_bucket() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_mbucket ( + id INT NOT NULL, value INT, + PRIMARY KEY (id) + ) WITH ('bucket' = '4')", + ) + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_mbucket VALUES \ + (1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60), (7, 70), (8, 80)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Update some keys + handler + .sql("INSERT INTO paimon.test_db.t_mbucket VALUES (2, 200), (5, 500), (8, 800)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_id_value( + &handler, + "SELECT id, value FROM paimon.test_db.t_mbucket ORDER BY id", + ) + .await; + + assert_eq!( + rows, + vec![ + (1, 10), + (2, 200), + (3, 30), + (4, 40), + (5, 500), + (6, 60), + (7, 70), + (8, 800), + ] + ); +} + +// ======================= Column Projection ======================= + +/// SELECT only a subset of columns from a PK table. 
+#[tokio::test] +async fn test_pk_column_projection() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_proj ( + id INT NOT NULL, name STRING, value INT, + PRIMARY KEY (id) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.t_proj VALUES (1, 'alice', 10), (2, 'bob', 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Update id=1 + handler + .sql("INSERT INTO paimon.test_db.t_proj VALUES (1, 'alice-v2', 100)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Project only name + let batches = handler + .sql("SELECT name FROM paimon.test_db.t_proj ORDER BY name") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut names = Vec::new(); + for batch in &batches { + let arr = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + names.push(arr.value(i).to_string()); + } + } + names.sort(); + assert_eq!(names, vec!["alice-v2", "bob"]); + + // Project only value + let rows = collect_id_value( + &handler, + "SELECT id, value FROM paimon.test_db.t_proj ORDER BY id", + ) + .await; + assert_eq!(rows, vec![(1, 100), (2, 20)]); +} + +// ======================= Sequence Field ======================= + +/// sequence.field: dedup uses the specified field instead of system sequence number. 
+#[tokio::test] +async fn test_pk_sequence_field() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_seqf ( + id INT NOT NULL, version INT, name STRING, + PRIMARY KEY (id) + ) WITH ('bucket' = '1', 'sequence.field' = 'version')", + ) + .await + .unwrap(); + + // First commit: version=2 for id=1 + handler + .sql("INSERT INTO paimon.test_db.t_seqf VALUES (1, 2, 'alice-v2'), (2, 1, 'bob-v1')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Second commit: version=1 for id=1 (older), version=2 for id=2 (newer) + handler + .sql("INSERT INTO paimon.test_db.t_seqf VALUES (1, 1, 'alice-v1'), (2, 2, 'bob-v2')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_id_name( + &handler, + "SELECT id, name FROM paimon.test_db.t_seqf ORDER BY id", + ) + .await; + + assert_eq!( + rows, + vec![ + (1, "alice-v2".to_string()), // version=2 wins over version=1 + (2, "bob-v2".to_string()), // version=2 wins over version=1 + ] + ); +} + +// ======================= INSERT OVERWRITE ======================= + +/// INSERT OVERWRITE on a partitioned PK table replaces the partition. 
+#[tokio::test] +async fn test_pk_insert_overwrite_partitioned() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_overwrite ( + dt STRING, id INT NOT NULL, name STRING, + PRIMARY KEY (dt, id) + ) PARTITIONED BY (dt STRING) + WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_overwrite VALUES \ + ('2024-01-01', 1, 'alice'), ('2024-01-01', 2, 'bob'), \ + ('2024-01-02', 3, 'carol')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Overwrite partition 2024-01-01 + handler + .sql("INSERT OVERWRITE paimon.test_db.t_overwrite VALUES ('2024-01-01', 10, 'new_alice')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = handler + .sql("SELECT dt, id, name FROM paimon.test_db.t_overwrite ORDER BY dt, id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let dts = batch + .column_by_name("dt") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let names = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push(( + dts.value(i).to_string(), + ids.value(i), + names.value(i).to_string(), + )); + } + } + + assert_eq!( + rows, + vec![ + ("2024-01-01".to_string(), 10, "new_alice".to_string()), + ("2024-01-02".to_string(), 3, "carol".to_string()), + ] + ); +} + +// ======================= Composite Primary Key ======================= + +/// Composite PK with multiple columns. 
+#[tokio::test] +async fn test_pk_composite_key() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_composite ( + region STRING NOT NULL, id INT NOT NULL, value INT, + PRIMARY KEY (region, id) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_composite VALUES \ + ('us', 1, 10), ('eu', 1, 20), ('us', 2, 30)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Update (us, 1) — (eu, 1) should be untouched + handler + .sql("INSERT INTO paimon.test_db.t_composite VALUES ('us', 1, 100)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = handler + .sql("SELECT region, id, value FROM paimon.test_db.t_composite ORDER BY region, id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let regions = batch + .column_by_name("region") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let vals = batch + .column_by_name("value") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((regions.value(i).to_string(), ids.value(i), vals.value(i))); + } + } + + assert_eq!( + rows, + vec![ + ("eu".to_string(), 1, 20), // untouched + ("us".to_string(), 1, 100), // updated + ("us".to_string(), 2, 30), // untouched + ] + ); +} + +// ======================= Empty Table Read ======================= + +/// Reading an empty PK table returns zero rows. 
+#[tokio::test] +async fn test_pk_empty_table_read() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_empty ( + id INT NOT NULL, name STRING, + PRIMARY KEY (id) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + let count = row_count(&handler, "SELECT id, name FROM paimon.test_db.t_empty").await; + assert_eq!(count, 0); +} + +// ======================= Large Batch Dedup ======================= + +/// Many rows with overlapping keys in a single commit. +#[tokio::test] +async fn test_pk_large_batch_dedup() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_large ( + id INT NOT NULL, value INT, + PRIMARY KEY (id) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + // Insert 100 rows, then overwrite all with new values + let mut values1 = Vec::new(); + let mut values2 = Vec::new(); + for i in 1..=100 { + values1.push(format!("({i}, {i})")); // id=i, value=i + values2.push(format!("({i}, {})", i * 10)); // id=i, value=i*10 + } + + handler + .sql(&format!( + "INSERT INTO paimon.test_db.t_large VALUES {}", + values1.join(", ") + )) + .await + .unwrap() + .collect() + .await + .unwrap(); + + handler + .sql(&format!( + "INSERT INTO paimon.test_db.t_large VALUES {}", + values2.join(", ") + )) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let count = row_count(&handler, "SELECT * FROM paimon.test_db.t_large").await; + assert_eq!(count, 100, "Dedup should keep exactly 100 unique keys"); + + // Spot-check a few values + let rows = collect_id_value( + &handler, + "SELECT id, value FROM paimon.test_db.t_large WHERE id IN (1, 50, 100) ORDER BY id", + ) + .await; + assert_eq!(rows, vec![(1, 10), (50, 500), (100, 1000)]); +} + +// ======================= Partitioned + Multi-Bucket ======================= + +/// Partitioned PK table with multiple buckets. 
+#[tokio::test] +async fn test_pk_partitioned_multi_bucket() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_part_mb ( + dt STRING, id INT NOT NULL, value INT, + PRIMARY KEY (dt, id) + ) PARTITIONED BY (dt STRING) + WITH ('bucket' = '2')", + ) + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_part_mb VALUES \ + ('2024-01-01', 1, 10), ('2024-01-01', 2, 20), \ + ('2024-01-01', 3, 30), ('2024-01-01', 4, 40), \ + ('2024-01-02', 1, 100), ('2024-01-02', 2, 200)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Update across partitions + handler + .sql( + "INSERT INTO paimon.test_db.t_part_mb VALUES \ + ('2024-01-01', 2, 222), ('2024-01-02', 1, 111)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = handler + .sql("SELECT dt, id, value FROM paimon.test_db.t_part_mb ORDER BY dt, id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let dts = batch + .column_by_name("dt") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let vals = batch + .column_by_name("value") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((dts.value(i).to_string(), ids.value(i), vals.value(i))); + } + } + + assert_eq!( + rows, + vec![ + ("2024-01-01".to_string(), 1, 10), + ("2024-01-01".to_string(), 2, 222), + ("2024-01-01".to_string(), 3, 30), + ("2024-01-01".to_string(), 4, 40), + ("2024-01-02".to_string(), 1, 111), + ("2024-01-02".to_string(), 2, 200), + ] + ); +} + +// ======================= Error Cases ======================= + +/// PK table with bucket=-1 (dynamic bucket) should be rejected. 
+#[tokio::test] +async fn test_pk_reject_dynamic_bucket() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_dyn ( + id INT NOT NULL, name STRING, + PRIMARY KEY (id) + )", + ) + .await + .unwrap(); + + // bucket defaults to -1, PK write should fail + let result = handler + .sql("INSERT INTO paimon.test_db.t_dyn VALUES (1, 'alice')") + .await; + + // The error may come from sql() or collect() + let is_err = match result { + Err(_) => true, + Ok(df) => df.collect().await.is_err(), + }; + assert!(is_err, "PK table with dynamic bucket should reject writes"); +} + +/// PK table with changelog-producer=input should be rejected. +#[tokio::test] +async fn test_pk_reject_changelog_producer_input() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_changelog ( + id INT NOT NULL, name STRING, + PRIMARY KEY (id) + ) WITH ('bucket' = '1', 'changelog-producer' = 'input')", + ) + .await + .unwrap(); + + let result = handler + .sql("INSERT INTO paimon.test_db.t_changelog VALUES (1, 'alice')") + .await; + + let is_err = match result { + Err(_) => true, + Ok(df) => df.collect().await.is_err(), + }; + assert!( + is_err, + "PK table with changelog-producer=input should reject writes" + ); +} + +// ======================= String Primary Key ======================= + +/// PK table with STRING primary key. 
+#[tokio::test] +async fn test_pk_string_key() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_strpk ( + code STRING NOT NULL, name STRING, + PRIMARY KEY (code) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql( + "INSERT INTO paimon.test_db.t_strpk VALUES \ + ('A001', 'alice'), ('B002', 'bob'), ('C003', 'carol')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Update A001 + handler + .sql("INSERT INTO paimon.test_db.t_strpk VALUES ('A001', 'alice-v2')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = handler + .sql("SELECT code, name FROM paimon.test_db.t_strpk ORDER BY code") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let codes = batch + .column_by_name("code") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let names = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((codes.value(i).to_string(), names.value(i).to_string())); + } + } + + assert_eq!( + rows, + vec![ + ("A001".to_string(), "alice-v2".to_string()), + ("B002".to_string(), "bob".to_string()), + ("C003".to_string(), "carol".to_string()), + ] + ); +} + +// ======================= Multiple Value Columns ======================= + +/// PK table with many value columns — verifies all columns survive dedup. 
+#[tokio::test] +async fn test_pk_multiple_value_columns() { + let (_tmp, handler) = setup_handler().await; + + handler + .sql( + "CREATE TABLE paimon.test_db.t_multi_val ( + id INT NOT NULL, col_a INT, col_b STRING, col_c INT, + PRIMARY KEY (id) + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.t_multi_val VALUES (1, 10, 'x', 100), (2, 20, 'y', 200)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.t_multi_val VALUES (1, 11, 'xx', 111)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = handler + .sql("SELECT id, col_a, col_b, col_c FROM paimon.test_db.t_multi_val ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let as_ = batch + .column_by_name("col_a") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let bs = batch + .column_by_name("col_b") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + let cs = batch + .column_by_name("col_c") + .and_then(|c| c.as_any().downcast_ref::()) + .unwrap(); + for i in 0..batch.num_rows() { + rows.push(( + ids.value(i), + as_.value(i), + bs.value(i).to_string(), + cs.value(i), + )); + } + } + + assert_eq!( + rows, + vec![ + (1, 11, "xx".to_string(), 111), // updated + (2, 20, "y".to_string(), 200), // untouched + ] + ); +} diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index c3b18b12..b86814b6 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -64,6 +64,7 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } futures = "0.3" diff --git a/crates/paimon/src/spec/binary_row.rs 
b/crates/paimon/src/spec/binary_row.rs index b7f9e94a..890eea6f 100644 --- a/crates/paimon/src/spec/binary_row.rs +++ b/crates/paimon/src/spec/binary_row.rs @@ -288,6 +288,61 @@ impl BinaryRow { hash_by_words(&self.data) } + /// Read a Datum from the given position based on the DataType. + /// Returns `None` if the field is null. + pub fn get_datum( + &self, + pos: usize, + data_type: &crate::spec::DataType, + ) -> crate::Result> { + if self.is_null_at(pos) { + return Ok(None); + } + use crate::spec::{DataType, Datum}; + let datum = match data_type { + DataType::Boolean(_) => Datum::Bool(self.get_boolean(pos)?), + DataType::TinyInt(_) => Datum::TinyInt(self.get_byte(pos)?), + DataType::SmallInt(_) => Datum::SmallInt(self.get_short(pos)?), + DataType::Int(_) => Datum::Int(self.get_int(pos)?), + DataType::BigInt(_) => Datum::Long(self.get_long(pos)?), + DataType::Float(_) => Datum::Float(self.get_float(pos)?), + DataType::Double(_) => Datum::Double(self.get_double(pos)?), + DataType::Date(_) => Datum::Date(self.get_int(pos)?), + DataType::Time(_) => Datum::Time(self.get_int(pos)?), + DataType::VarChar(_) | DataType::Char(_) => { + Datum::String(self.get_string(pos)?.to_string()) + } + DataType::Binary(_) | DataType::VarBinary(_) => { + Datum::Bytes(self.get_binary(pos)?.to_vec()) + } + DataType::Decimal(dt) => { + let unscaled = self.get_decimal_unscaled(pos, dt.precision())?; + Datum::Decimal { + unscaled, + precision: dt.precision(), + scale: dt.scale(), + } + } + DataType::Timestamp(ts) => { + let (millis, nanos) = self.get_timestamp_raw(pos, ts.precision())?; + Datum::Timestamp { millis, nanos } + } + DataType::LocalZonedTimestamp(ts) => { + let (millis, nanos) = self.get_timestamp_raw(pos, ts.precision())?; + Datum::LocalZonedTimestamp { millis, nanos } + } + _ => { + return Err(crate::Error::Unsupported { + message: format!( + "BinaryRow::get_datum: unsupported data type {:?} at pos {pos}", + data_type + ), + }); + } + }; + Ok(Some(datum)) + } + /// Build a 
BinaryRow from typed Datum values using `BinaryRowBuilder`. /// `None` entries are written as null fields. pub fn from_datums(datums: &[(Option<&crate::spec::Datum>, &crate::spec::DataType)]) -> Self { diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index eb8da398..ae9d7cc2 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -36,6 +36,10 @@ const FILE_COMPRESSION_OPTION: &str = "file.compression"; const FILE_COMPRESSION_ZSTD_LEVEL_OPTION: &str = "file.compression.zstd-level"; const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled"; const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size"; +const SEQUENCE_FIELD_OPTION: &str = "sequence.field"; +const MERGE_ENGINE_OPTION: &str = "merge-engine"; +const CHANGELOG_PRODUCER_OPTION: &str = "changelog-producer"; +const ROWKIND_FIELD_OPTION: &str = "rowkind.field"; const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10; const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000; const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000; @@ -48,6 +52,17 @@ const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__"; const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024; const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024; +/// Merge engine for primary-key tables. +/// +/// Reference: Java `CoreOptions.MergeEngine`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MergeEngine { + /// Keep the row with the highest sequence number (default). + Deduplicate, + /// Keep the first row for each key (ignore later updates). + FirstRow, +} + /// Typed accessors for common table options. /// /// This mirrors pypaimon's `CoreOptions` pattern while staying lightweight. @@ -76,6 +91,43 @@ impl<'a> CoreOptions<'a> { .unwrap_or(false) } + /// Returns the user-specified sequence field names, if configured. + /// When set, the values of these columns are used as `_SEQUENCE_NUMBER` instead of auto-increment. 
+ /// Multiple fields can be comma-separated (e.g. `"col_a,col_b"`). + pub fn sequence_fields(&self) -> Vec<&str> { + self.options + .get(SEQUENCE_FIELD_OPTION) + .map(|s| s.split(',').map(str::trim).collect()) + .unwrap_or_default() + } + + /// Merge engine for primary-key tables. Default is `Deduplicate`. + pub fn merge_engine(&self) -> crate::Result { + match self.options.get(MERGE_ENGINE_OPTION) { + None => Ok(MergeEngine::Deduplicate), + Some(v) => match v.to_ascii_lowercase().as_str() { + "deduplicate" => Ok(MergeEngine::Deduplicate), + "first-row" => Ok(MergeEngine::FirstRow), + other => Err(crate::Error::Unsupported { + message: format!("Unsupported merge-engine: '{other}'"), + }), + }, + } + } + + /// Changelog producer setting. Default is "none". + pub fn changelog_producer(&self) -> &str { + self.options + .get(CHANGELOG_PRODUCER_OPTION) + .map(String::as_str) + .unwrap_or("none") + } + + /// The `rowkind.field` option: a user column whose value encodes the row kind. + pub fn rowkind_field(&self) -> Option<&str> { + self.options.get(ROWKIND_FIELD_OPTION).map(String::as_str) + } + pub fn data_evolution_enabled(&self) -> bool { self.options .get(DATA_EVOLUTION_ENABLED_OPTION) diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index f30df9f3..961e4148 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -72,3 +72,6 @@ pub use predicate::{ pub(crate) mod murmur_hash; mod partition_statistics; pub use partition_statistics::PartitionStatistics; + +mod row_kind; +pub use row_kind::RowKind; diff --git a/crates/paimon/src/spec/predicate.rs b/crates/paimon/src/spec/predicate.rs index 8135c7df..4af4bfb9 100644 --- a/crates/paimon/src/spec/predicate.rs +++ b/crates/paimon/src/spec/predicate.rs @@ -622,6 +622,18 @@ impl PredicateBuilder { self.leaf(field, PredicateOperator::IsNotNull, vec![]) } + /// Build a partition predicate: AND of equal/is_null for each (field_name, datum) pair. 
+ pub fn partition_predicate(&self, fields: &[(&str, Option)]) -> Result { + let predicates: Vec = fields + .iter() + .map(|(name, value)| match value { + Some(v) => self.equal(name, v.clone()), + None => self.is_null(name), + }) + .collect::>>()?; + Ok(Predicate::and(predicates)) + } + // -- set operators -- pub fn is_in(&self, field: &str, literals: Vec) -> Result { diff --git a/crates/paimon/src/spec/row_kind.rs b/crates/paimon/src/spec/row_kind.rs new file mode 100644 index 00000000..1e3ee5de --- /dev/null +++ b/crates/paimon/src/spec/row_kind.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Row kind for primary-key table changelog semantics. +//! +//! Reference: [org.apache.paimon.types.RowKind](https://github.com/apache/paimon/blob/release-1.3/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java) + +/// The kind of a row in a changelog, matching Java Paimon's `RowKind`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(i8)] +pub enum RowKind { + Insert = 0, + UpdateBefore = 1, + UpdateAfter = 2, + Delete = 3, +} + +impl RowKind { + /// Create a `RowKind` from its byte value. 
+ pub fn from_value(value: i8) -> crate::Result { + match value { + 0 => Ok(RowKind::Insert), + 1 => Ok(RowKind::UpdateBefore), + 2 => Ok(RowKind::UpdateAfter), + 3 => Ok(RowKind::Delete), + _ => Err(crate::Error::DataInvalid { + message: format!("Invalid RowKind value: {value}, expected 0-3"), + source: None, + }), + } + } + + /// Whether this row kind represents an addition (INSERT or UPDATE_AFTER). + pub fn is_add(&self) -> bool { + matches!(self, RowKind::Insert | RowKind::UpdateAfter) + } +} diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index f1dba002..f923434a 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -141,6 +141,16 @@ pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID"; pub const ROW_ID_FIELD_ID: i32 = i32::MAX - 5; +pub const SEQUENCE_NUMBER_FIELD_NAME: &str = "_SEQUENCE_NUMBER"; + +/// Must match Java Paimon's `SpecialFields.SEQUENCE_NUMBER` (Integer.MAX_VALUE - 1). +pub const SEQUENCE_NUMBER_FIELD_ID: i32 = i32::MAX - 1; + +pub const VALUE_KIND_FIELD_NAME: &str = "_VALUE_KIND"; + +/// Must match Java Paimon's `SpecialFields.VALUE_KIND` (Integer.MAX_VALUE - 2). +pub const VALUE_KIND_FIELD_ID: i32 = i32::MAX - 2; + /// Data field for paimon table. /// /// Impl Reference: diff --git a/crates/paimon/src/spec/stats.rs b/crates/paimon/src/spec/stats.rs index adc32022..2c389a39 100644 --- a/crates/paimon/src/spec/stats.rs +++ b/crates/paimon/src/spec/stats.rs @@ -18,6 +18,9 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt::{Display, Formatter}; +use super::{extract_datum_from_arrow, BinaryRowBuilder, DataType, Datum}; +use arrow_array::RecordBatch; + /// Deserialize `_NULL_COUNTS` which in Avro is `["null", {"type":"array","items":["null","long"]}]`. /// Preserves null items as `None` (meaning "unknown") rather than collapsing to 0. 
fn deserialize_null_counts<'de, D>(deserializer: D) -> Result>, D::Error> @@ -98,3 +101,56 @@ impl Display for BinaryTableStats { todo!() } } + +/// Compute per-column independent min/max/null_count for the specified columns +/// in a RecordBatch. Each entry in `col_indices` is the column index in the batch, +/// and the corresponding entry in `col_types` is its Paimon DataType. +pub fn compute_column_stats( + batch: &RecordBatch, + col_indices: &[usize], + col_types: &[DataType], +) -> crate::Result { + let num_cols = col_indices.len(); + let num_rows = batch.num_rows(); + let mut min_datums: Vec> = vec![None; num_cols]; + let mut max_datums: Vec> = vec![None; num_cols]; + let mut null_counts: Vec> = vec![Some(0); num_cols]; + + for row_idx in 0..num_rows { + for (pos, (&col_idx, data_type)) in col_indices.iter().zip(col_types.iter()).enumerate() { + let datum = extract_datum_from_arrow(batch, row_idx, col_idx, data_type)?; + match datum { + Some(d) => { + if min_datums[pos].as_ref().is_none_or(|m| d < *m) { + min_datums[pos] = Some(d.clone()); + } + if max_datums[pos].as_ref().is_none_or(|m| d > *m) { + max_datums[pos] = Some(d); + } + } + None => { + *null_counts[pos].as_mut().unwrap() += 1; + } + } + } + } + + let mut min_builder = BinaryRowBuilder::new(num_cols as i32); + let mut max_builder = BinaryRowBuilder::new(num_cols as i32); + for (pos, data_type) in col_types.iter().enumerate() { + match &min_datums[pos] { + Some(d) => min_builder.write_datum(pos, d, data_type), + None => min_builder.set_null_at(pos), + } + match &max_datums[pos] { + Some(d) => max_builder.write_datum(pos, d, data_type), + None => max_builder.set_null_at(pos), + } + } + + Ok(BinaryTableStats::new( + min_builder.build_serialized(), + max_builder.build_serialized(), + null_counts, + )) +} diff --git a/crates/paimon/src/table/kv_file_reader.rs b/crates/paimon/src/table/kv_file_reader.rs new file mode 100644 index 00000000..1ad0f087 --- /dev/null +++ 
b/crates/paimon/src/table/kv_file_reader.rs
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Key-value file reader for primary-key tables using sort-merge with LoserTree.
+//!
+//! Each data file in a split is read as a separate sorted stream. The streams
+//! are merged by primary key using a LoserTree, and rows with the same key are
+//! deduplicated by keeping the one with the highest `_SEQUENCE_NUMBER`.
+//!
+//! Reference: Java Paimon `SortMergeReaderWithLoserTree`.
+
+use super::data_file_reader::DataFileReader;
+use super::sort_merge::{DeduplicateMergeFunction, SortMergeReaderBuilder};
+use crate::arrow::build_target_arrow_schema;
+use crate::io::FileIO;
+use crate::spec::{
+    BigIntType, DataField, DataType as PaimonDataType, Predicate, TinyIntType,
+    SEQUENCE_NUMBER_FIELD_ID, SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_ID,
+    VALUE_KIND_FIELD_NAME,
+};
+use crate::table::schema_manager::SchemaManager;
+use crate::table::ArrowRecordBatchStream;
+use crate::{DataSplit, Error};
+use arrow_array::RecordBatch;
+
+use async_stream::try_stream;
+use futures::StreamExt;
+
+/// Reads primary-key table data files using sort-merge deduplication.
+pub(crate) struct KeyValueFileReader { + file_io: FileIO, + config: KeyValueReadConfig, +} + +/// Configuration for [`KeyValueFileReader`], grouping table schema and +/// key/predicate parameters. +pub(crate) struct KeyValueReadConfig { + pub schema_manager: SchemaManager, + pub table_schema_id: i64, + pub table_fields: Vec, + pub read_type: Vec, + pub predicates: Vec, + pub primary_keys: Vec, + pub sequence_fields: Vec, +} + +impl KeyValueFileReader { + pub(crate) fn new(file_io: FileIO, config: KeyValueReadConfig) -> Self { + // Only keep predicates that reference primary key columns. + // Non-PK predicates applied before merge can cause incorrect results. + // Use project_field_index_inclusive: AND keeps PK children, OR requires all PK. + let pk_set: std::collections::HashSet<&str> = + config.primary_keys.iter().map(|s| s.as_str()).collect(); + let mapping: Vec> = config + .table_fields + .iter() + .enumerate() + .map(|(i, f)| { + if pk_set.contains(f.name()) { + Some(i) + } else { + None + } + }) + .collect(); + let pk_predicates = config + .predicates + .into_iter() + .filter_map(|p| p.project_field_index_inclusive(&mapping)) + .collect(); + + Self { + file_io, + config: KeyValueReadConfig { + predicates: pk_predicates, + ..config + }, + } + } + + pub fn read(self, data_splits: &[DataSplit]) -> crate::Result { + // Build the internal read type for thin-mode files. + // Physical file schema: [_SEQUENCE_NUMBER, _VALUE_KIND, all_user_cols...] 
+ // We need: _SEQ + _VK + union(read_type, primary_keys) + let seq_field = DataField::new( + SEQUENCE_NUMBER_FIELD_ID, + SEQUENCE_NUMBER_FIELD_NAME.to_string(), + PaimonDataType::BigInt(BigIntType::new()), + ); + let value_kind_field = DataField::new( + VALUE_KIND_FIELD_ID, + VALUE_KIND_FIELD_NAME.to_string(), + PaimonDataType::TinyInt(TinyIntType::new()), + ); + + let key_names: std::collections::HashSet<&str> = self + .config + .primary_keys + .iter() + .map(|s| s.as_str()) + .collect(); + + // Collect key fields from table schema. + let key_fields: Vec = self + .config + .primary_keys + .iter() + .map(|pk| { + self.config + .table_fields + .iter() + .find(|f| f.name() == pk) + .cloned() + .ok_or_else(|| Error::UnexpectedError { + message: format!("Primary key column '{pk}' not found in table schema"), + source: None, + }) + }) + .collect::>>()?; + + // User columns = read_type fields + any key fields not already in read_type + // + any sequence fields not already included. + let read_type_names: std::collections::HashSet<&str> = + self.config.read_type.iter().map(|f| f.name()).collect(); + let mut user_fields: Vec = self.config.read_type.clone(); + for kf in &key_fields { + if !read_type_names.contains(kf.name()) { + user_fields.push(kf.clone()); + } + } + // Add sequence fields if not already present. + for sf_name in &self.config.sequence_fields { + if user_fields.iter().all(|f| f.name() != sf_name.as_str()) { + let sf = self + .config + .table_fields + .iter() + .find(|f| f.name() == sf_name.as_str()) + .cloned() + .ok_or_else(|| Error::UnexpectedError { + message: format!("Sequence field '{sf_name}' not found in table schema"), + source: None, + })?; + user_fields.push(sf); + } + } + + // Internal read type: [_SEQ, _VK, user_fields...] 
+ let mut internal_read_type: Vec = Vec::new(); + internal_read_type.push(seq_field); + internal_read_type.push(value_kind_field); + internal_read_type.extend(user_fields.clone()); + + let internal_schema = build_target_arrow_schema(&internal_read_type)?; + + // Output schema: user's read_type order + let output_schema = build_target_arrow_schema(&self.config.read_type)?; + + // Indices within internal_schema (offset 2 for _SEQ and _VK). + let seq_index = 0; + let value_kind_index = 1; + let key_indices: Vec = self + .config + .primary_keys + .iter() + .map(|pk| { + user_fields + .iter() + .position(|f| f.name() == pk) + .map(|p| p + 2) + .unwrap() + }) + .collect(); + let value_fields: Vec = user_fields + .iter() + .filter(|f| !key_names.contains(f.name())) + .cloned() + .collect(); + let value_indices: Vec = user_fields + .iter() + .enumerate() + .filter(|(_, f)| !key_names.contains(f.name())) + .map(|(i, _)| i + 2) + .collect(); + + // If sequence.field is configured, find each field's index in the internal schema. + let user_sequence_indices: Vec = self + .config + .sequence_fields + .iter() + .filter_map(|sf| { + user_fields + .iter() + .position(|f| f.name() == sf.as_str()) + .map(|p| p + 2) + }) + .collect(); + + // Build the reorder mapping: merge output is [keys..., values...], + // but user wants them in read_type order. 
+ let num_keys = key_fields.len(); + let mut reorder_map: Vec = vec![0; self.config.read_type.len()]; + for (out_idx, field) in self.config.read_type.iter().enumerate() { + if key_names.contains(field.name()) { + // Find position in key_fields + let key_pos = key_fields + .iter() + .position(|kf| kf.name() == field.name()) + .unwrap(); + reorder_map[out_idx] = key_pos; + } else { + // Find position in value_fields + let val_pos = value_fields + .iter() + .position(|vf| vf.name() == field.name()) + .unwrap(); + reorder_map[out_idx] = num_keys + val_pos; + } + } + + let splits: Vec = data_splits.to_vec(); + let file_io = self.file_io; + let schema_manager = self.config.schema_manager; + let table_schema_id = self.config.table_schema_id; + let table_fields = self.config.table_fields; + let predicates = self.config.predicates; + + // Build the merge output schema (keys + values, no system columns). + let mut merge_output_fields: Vec = Vec::new(); + merge_output_fields.extend(key_fields); + merge_output_fields.extend(value_fields); + let merge_output_schema = build_target_arrow_schema(&merge_output_fields)?; + + Ok(try_stream! { + for split in &splits { + // DV mode should not reach KeyValueFileReader. + if split + .data_deletion_files() + .is_some_and(|files| files.iter().any(Option::is_some)) + { + Err(Error::UnexpectedError { + message: "KeyValueFileReader does not support deletion vectors".to_string(), + source: None, + })?; + } + + // Create one stream per data file. 
+ let mut file_streams: Vec = Vec::new(); + + for file_meta in split.data_files().to_vec() { + let data_fields: Option> = if file_meta.schema_id != table_schema_id { + let data_schema = schema_manager.schema(file_meta.schema_id).await?; + Some(data_schema.fields().to_vec()) + } else { + None + }; + + let reader = DataFileReader::new( + file_io.clone(), + schema_manager.clone(), + table_schema_id, + table_fields.clone(), + internal_read_type.clone(), + predicates.clone(), + ); + + let stream = reader.read_single_file_stream( + split, + file_meta, + data_fields, + None, + None, + )?; + file_streams.push(stream); + } + + if file_streams.is_empty() { + continue; + } + + // Always go through sort-merge even for single file, + // because a single file may contain duplicate keys. + let mut merge_stream = SortMergeReaderBuilder::new( + file_streams, + internal_schema.clone(), + key_indices.clone(), + seq_index, + value_kind_index, + user_sequence_indices.clone(), + value_indices.clone(), + merge_output_schema.clone(), + Box::new(DeduplicateMergeFunction), + ) + .build()?; + + while let Some(batch) = merge_stream.next().await { + let batch = batch?; + // Reorder columns from [keys..., values...] to read_type order. + let columns: Vec<_> = reorder_map + .iter() + .map(|&src| batch.column(src).clone()) + .collect(); + let reordered = RecordBatch::try_new(output_schema.clone(), columns) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to reorder merged RecordBatch: {e}"), + source: Some(Box::new(e)), + })?; + yield reordered; + } + } + } + .boxed()) + } +} diff --git a/crates/paimon/src/table/kv_file_writer.rs b/crates/paimon/src/table/kv_file_writer.rs new file mode 100644 index 00000000..3309820b --- /dev/null +++ b/crates/paimon/src/table/kv_file_writer.rs @@ -0,0 +1,461 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Key-value file writer for primary-key tables. +//! +//! Buffers data in memory, sorts by primary key on flush, and prepends +//! `_SEQUENCE_NUMBER` and `_VALUE_KIND` columns. +//! +//! Uses thin-mode (`data-file.thin-mode`): the physical file schema is +//! `[_SEQUENCE_NUMBER, _VALUE_KIND, all_user_cols...]` — primary key columns +//! are NOT duplicated. The read path extracts keys from the value portion. +//! +//! 
Reference: [org.apache.paimon.io.KeyValueDataFileWriterImpl](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java) + +use crate::arrow::format::create_format_writer; +use crate::io::FileIO; +use crate::spec::stats::{compute_column_stats, BinaryTableStats}; +use crate::spec::{ + extract_datum_from_arrow, BinaryRowBuilder, DataFileMeta, DataType, MergeEngine, + EMPTY_SERIALIZED_ROW, SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_NAME, +}; +use crate::Result; +use arrow_array::{Int64Array, Int8Array, RecordBatch}; +use arrow_ord::sort::{lexsort_to_indices, SortColumn, SortOptions}; +use arrow_row::{RowConverter, SortField}; +use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; +use chrono::Utc; +use std::sync::Arc; + +/// Internal writer for primary-key tables that buffers data in memory, +/// sorts by primary key on flush, and prepends `_SEQUENCE_NUMBER` and `_VALUE_KIND` columns. +pub(crate) struct KeyValueFileWriter { + file_io: FileIO, + config: KeyValueWriteConfig, + /// Next sequence number to assign (bucket-local, always auto-incremented). + next_sequence_number: i64, + /// Buffered batches (user schema). + buffer: Vec, + /// Approximate buffered bytes. + buffer_bytes: usize, + /// Completed file metadata. + written_files: Vec, +} + +/// Configuration for [`KeyValueFileWriter`], grouping file-location, schema, +/// and key/merge parameters. +pub(crate) struct KeyValueWriteConfig { + pub table_location: String, + pub partition_path: String, + pub bucket: i32, + pub schema_id: i64, + pub file_compression: String, + pub file_compression_zstd_level: i32, + pub write_buffer_size: i64, + /// Primary key column indices in the user schema. + pub primary_key_indices: Vec, + /// Paimon DataTypes for each primary key column (same order as primary_key_indices). 
+ pub primary_key_types: Vec, + /// Sequence field column indices in the user schema (empty if not configured). + pub sequence_field_indices: Vec, + /// Merge engine for deduplication. + pub merge_engine: MergeEngine, + /// Column index in user schema that provides the row kind value. + /// Resolved from: `rowkind.field` option > `_VALUE_KIND` column > None (all INSERT). + pub value_kind_col_index: Option, +} + +impl KeyValueFileWriter { + pub(crate) fn new( + file_io: FileIO, + config: KeyValueWriteConfig, + next_sequence_number: i64, + ) -> Self { + Self { + file_io, + config, + next_sequence_number, + buffer: Vec::new(), + buffer_bytes: 0, + written_files: Vec::new(), + } + } + + /// Buffer a RecordBatch. Flushes when buffer exceeds write_buffer_size. + /// Sequence numbers are assigned per-bucket on flush, matching Java Paimon behavior. + pub(crate) async fn write(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + let batch_bytes: usize = batch + .columns() + .iter() + .map(|c| c.get_buffer_memory_size()) + .sum(); + self.buffer.push(batch.clone()); + self.buffer_bytes += batch_bytes; + + if self.buffer_bytes as i64 >= self.config.write_buffer_size { + self.flush().await?; + } + Ok(()) + } + + /// Number of rows per chunk when writing sorted data to parquet. + const FLUSH_CHUNK_ROWS: usize = 4096; + + /// Sort buffered data by primary key + sequence fields + auto-seq, deduplicate + /// by merge engine, prepend _SEQUENCE_NUMBER/_VALUE_KIND, and write to a parquet file. + /// + /// Uses chunked writing: after sorting and dedup, data is materialized and written + /// in small chunks so that only `combined`(1x) + one chunk lives in memory at a time. + pub(crate) async fn flush(&mut self) -> Result<()> { + if self.buffer.is_empty() { + return Ok(()); + } + + let batches = std::mem::take(&mut self.buffer); + self.buffer_bytes = 0; + + // Concatenate all buffered batches, then immediately free the originals. 
+ let user_schema = batches[0].schema(); + let combined = + arrow_select::concat::concat_batches(&user_schema, &batches).map_err(|e| { + crate::Error::DataInvalid { + message: format!("Failed to concat batches: {e}"), + source: None, + } + })?; + drop(batches); + + let num_rows = combined.num_rows(); + if num_rows == 0 { + return Ok(()); + } + + // Assign auto-incremented sequence numbers BEFORE sorting (arrival order). + let start_seq = self.next_sequence_number; + let end_seq = start_seq + num_rows as i64 - 1; + self.next_sequence_number = end_seq + 1; + let seq_array: Arc = + Arc::new(Int64Array::from((start_seq..=end_seq).collect::>())); + + // Sort by: primary key columns + sequence field columns + auto-increment seq. + let mut sort_columns: Vec = Vec::new(); + for &idx in &self.config.primary_key_indices { + sort_columns.push(SortColumn { + values: combined.column(idx).clone(), + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }); + } + for &idx in &self.config.sequence_field_indices { + sort_columns.push(SortColumn { + values: combined.column(idx).clone(), + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }); + } + sort_columns.push(SortColumn { + values: seq_array.clone(), + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }); + let sorted_indices = + lexsort_to_indices(&sort_columns, None).map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to sort by primary key: {e}"), + source: None, + })?; + + // Deduplicate: for consecutive rows with the same PK, pick the winner. + // After sorting by PK + seq fields + auto-seq (all ascending): + // Deduplicate → keep last row per key group (highest seq) + // FirstRow → keep first row per key group (lowest seq) + let deduped_indices = self.dedup_sorted_indices(&combined, &sorted_indices)?; + let deduped_num_rows = deduped_indices.len(); + + // Extract min_key / max_key from deduped endpoints. 
+ let first_row = deduped_indices[0] as usize; + let last_row = deduped_indices[deduped_num_rows - 1] as usize; + let min_key = self.extract_key_binary_row(&combined, first_row)?; + let max_key = self.extract_key_binary_row(&combined, last_row)?; + + // Build physical schema (thin-mode): [_SEQUENCE_NUMBER, _VALUE_KIND, all_user_cols...] + let user_fields = user_schema.fields(); + let mut physical_fields: Vec> = Vec::new(); + physical_fields.push(Arc::new(ArrowField::new( + SEQUENCE_NUMBER_FIELD_NAME, + ArrowDataType::Int64, + false, + ))); + physical_fields.push(Arc::new(ArrowField::new( + VALUE_KIND_FIELD_NAME, + ArrowDataType::Int8, + false, + ))); + for field in user_fields.iter() { + physical_fields.push(field.clone()); + } + let physical_schema = Arc::new(ArrowSchema::new(physical_fields)); + + // Open parquet writer. + let file_name = format!( + "data-{}-{}.parquet", + uuid::Uuid::new_v4(), + self.written_files.len() + ); + let bucket_dir = if self.config.partition_path.is_empty() { + format!( + "{}/bucket-{}", + self.config.table_location, self.config.bucket + ) + } else { + format!( + "{}/{}/bucket-{}", + self.config.table_location, self.config.partition_path, self.config.bucket + ) + }; + self.file_io.mkdirs(&format!("{bucket_dir}/")).await?; + let file_path = format!("{}/{}", bucket_dir, file_name); + let output = self.file_io.new_output(&file_path)?; + let mut writer = create_format_writer( + &output, + physical_schema.clone(), + &self.config.file_compression, + self.config.file_compression_zstd_level, + ) + .await?; + + // Chunked write using deduped indices. + let deduped_u32 = arrow_array::UInt32Array::from(deduped_indices); + for chunk_start in (0..deduped_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) { + let chunk_len = Self::FLUSH_CHUNK_ROWS.min(deduped_num_rows - chunk_start); + let chunk_indices = deduped_u32.slice(chunk_start, chunk_len); + + let mut physical_columns: Vec> = Vec::new(); + // Sequence numbers for this chunk. 
+ physical_columns.push( + arrow_select::take::take(seq_array.as_ref(), &chunk_indices, None).map_err( + |e| crate::Error::DataInvalid { + message: format!("Failed to reorder sequence numbers: {e}"), + source: None, + }, + )?, + ); + // Value kind column. + match self.config.value_kind_col_index { + Some(vk_idx) => { + physical_columns.push( + arrow_select::take::take( + combined.column(vk_idx).as_ref(), + &chunk_indices, + None, + ) + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to reorder value kind column: {e}"), + source: None, + })?, + ); + } + None => { + // All rows are INSERT (value_kind = 0). + physical_columns.push(Arc::new(Int8Array::from(vec![0i8; chunk_len]))); + } + } + // All user columns. + for idx in 0..combined.num_columns() { + physical_columns.push( + arrow_select::take::take(combined.column(idx).as_ref(), &chunk_indices, None) + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to reorder by sort indices: {e}"), + source: None, + })?, + ); + } + + let chunk_batch = RecordBatch::try_new(physical_schema.clone(), physical_columns) + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to create physical batch: {e}"), + source: None, + })?; + writer.write(&chunk_batch).await?; + } + + let file_size = writer.close().await? as i64; + + // Compute key_stats on deduped data (not the raw combined batch). 
+ let deduped_key_columns: Vec> = + self.config + .primary_key_indices + .iter() + .map(|&idx| { + arrow_select::take::take(combined.column(idx).as_ref(), &deduped_u32, None) + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to take key column for stats: {e}"), + source: None, + }) + }) + .collect::>>()?; + let deduped_key_batch = RecordBatch::try_new( + Arc::new(ArrowSchema::new( + self.config + .primary_key_indices + .iter() + .map(|&idx| user_schema.field(idx).clone()) + .collect::>(), + )), + deduped_key_columns, + ) + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to build deduped key batch for stats: {e}"), + source: None, + })?; + let stats_col_indices: Vec = (0..self.config.primary_key_indices.len()).collect(); + let key_stats = compute_column_stats( + &deduped_key_batch, + &stats_col_indices, + &self.config.primary_key_types, + )?; + + // Sequence numbers span the full assigned range. + let meta = DataFileMeta { + file_name, + file_size, + row_count: deduped_num_rows as i64, + min_key, + max_key, + key_stats, + value_stats: BinaryTableStats::new( + EMPTY_SERIALIZED_ROW.clone(), + EMPTY_SERIALIZED_ROW.clone(), + vec![], + ), + min_sequence_number: start_seq, + max_sequence_number: end_seq, + schema_id: self.config.schema_id, + level: 0, + extra_files: vec![], + creation_time: Some(Utc::now()), + delete_row_count: Some(0), + embedded_index: None, + file_source: Some(0), // FileSource.APPEND + value_stats_cols: Some(vec![]), + external_path: None, + first_row_id: None, + write_cols: None, + }; + self.written_files.push(meta); + Ok(()) + } + + /// Deduplicate sorted indices by primary key using the configured merge engine. + /// + /// Input: `sorted_indices` ordered by PK + seq fields + auto-seq (all ascending). + /// Output: a Vec of original row indices to keep, in sorted PK order. 
+ fn dedup_sorted_indices( + &self, + batch: &RecordBatch, + sorted_indices: &arrow_array::UInt32Array, + ) -> Result> { + let n = sorted_indices.len(); + if n == 0 { + return Ok(vec![]); + } + + // Convert PK columns to arrow-row Rows for efficient comparison. + let sort_fields: Vec = self + .config + .primary_key_indices + .iter() + .map(|&idx| SortField::new(batch.schema().field(idx).data_type().clone())) + .collect(); + let converter = + RowConverter::new(sort_fields).map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to create RowConverter for dedup: {e}"), + source: Some(Box::new(e)), + })?; + let key_columns: Vec> = self + .config + .primary_key_indices + .iter() + .map(|&idx| batch.column(idx).clone()) + .collect(); + let rows = + converter + .convert_columns(&key_columns) + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to convert key columns for dedup: {e}"), + source: Some(Box::new(e)), + })?; + + let mut result: Vec = Vec::with_capacity(n); + // Track the start of the current key group and the candidate winner. + let mut group_winner = sorted_indices.value(0); + + for i in 1..n { + let cur = sorted_indices.value(i); + if rows.row(group_winner as usize) == rows.row(cur as usize) { + // Same key group — update winner based on merge engine. + match self.config.merge_engine { + // Deduplicate: keep last (highest seq), which is the current row + // since we sorted ascending. + MergeEngine::Deduplicate => group_winner = cur, + // FirstRow: keep first (lowest seq), so don't update. + MergeEngine::FirstRow => {} + } + } else { + // New key group — emit the winner of the previous group. + result.push(group_winner); + group_winner = cur; + } + } + // Emit the last group's winner. + result.push(group_winner); + Ok(result) + } + + /// Flush remaining buffer and return all written file metadata. 
+ pub(crate) async fn prepare_commit(&mut self) -> Result> { + self.flush().await?; + Ok(std::mem::take(&mut self.written_files)) + } + + /// Extract primary key columns from a batch at a given row index into a serialized BinaryRow. + fn extract_key_binary_row(&self, batch: &RecordBatch, row_idx: usize) -> Result> { + let num_keys = self.config.primary_key_indices.len(); + let mut builder = BinaryRowBuilder::new(num_keys as i32); + for (pos, (&col_idx, data_type)) in self + .config + .primary_key_indices + .iter() + .zip(self.config.primary_key_types.iter()) + .enumerate() + { + match extract_datum_from_arrow(batch, row_idx, col_idx, data_type)? { + Some(datum) => builder.write_datum(pos, &datum, data_type), + None => builder.set_null_at(pos), + } + } + Ok(builder.build_serialized()) + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 2ec5cd96..8fb8ef33 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -27,12 +27,15 @@ mod data_file_writer; #[cfg(feature = "fulltext")] mod full_text_search_builder; pub(crate) mod global_index_scanner; +mod kv_file_reader; +mod kv_file_writer; mod read_builder; pub(crate) mod rest_env; pub(crate) mod row_id_predicate; pub(crate) mod schema_manager; pub(crate) mod snapshot_commit; mod snapshot_manager; +mod sort_merge; mod source; mod stats_filter; pub(crate) mod table_commit; diff --git a/crates/paimon/src/table/sort_merge.rs b/crates/paimon/src/table/sort_merge.rs new file mode 100644 index 00000000..b7dcbd20 --- /dev/null +++ b/crates/paimon/src/table/sort_merge.rs @@ -0,0 +1,1267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
// The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Sort-merge reader with LoserTree for primary-key table reads.
//!
//! Merges multiple sorted `ArrowRecordBatchStream`s by primary key using a
//! tournament tree (LoserTree), applying a [`MergeFunction`] to deduplicate
//! rows sharing the same key.
//!
//! Reference:
//! - Java Paimon: `SortMergeReaderWithMinHeap`
//! - DataFusion: `SortPreservingMergeStream` (LoserTree layout)
//! - Arrow-row: `RowConverter` for efficient key comparison

use crate::spec::RowKind;
use crate::table::ArrowRecordBatchStream;
use crate::Error;
use arrow_array::{ArrayRef, Int64Array, Int8Array, RecordBatch};
use arrow_row::{RowConverter, Rows, SortField};
use arrow_schema::SchemaRef;
use arrow_select::interleave::interleave;
use async_stream::try_stream;
use futures::StreamExt;
use std::cmp::Ordering;

// ---------------------------------------------------------------------------
// MergeFunction
// ---------------------------------------------------------------------------

/// A row reference as an index into the batch buffer.
pub(crate) struct MergeRow {
    /// Index into the shared batch buffer.
    pub batch_idx: usize,
    /// Row offset within that buffered batch.
    pub row_idx: usize,
    /// System `_SEQUENCE_NUMBER` value of this row.
    pub sequence_number: i64,
    /// `_VALUE_KIND` byte of this row (RowKind discriminant; 0 = INSERT).
    pub value_kind: i8,
    /// User-defined sequence values from `sequence.field` (empty if not configured).
    pub user_sequences: Vec<Option<i128>>,
}

/// Merge function applied to rows sharing the same primary key.
+/// +/// For deduplicate: returns the single winner (batch_idx, row_idx), or None +/// if the winning row should be filtered out (e.g. DELETE). +pub(crate) trait MergeFunction: Send + Sync { + /// Pick the winning row from same-key candidates. + /// Returns `Some((batch_idx, row_idx))` of the winner, or `None` if the + /// key should be omitted from output (e.g. winner is a DELETE row). + fn pick_winner(&self, rows: &[MergeRow]) -> crate::Result>; +} + +/// Deduplicate merge: keeps the row with the highest sequence. +/// When `sequence.field` is configured (one or more fields), compares user +/// sequences lexicographically first, then falls back to system +/// `_SEQUENCE_NUMBER` as tie-breaker. +/// When sequence numbers are equal, keeps the last-added row (last-writer-wins). +/// Filters out DELETE and UPDATE_BEFORE rows. +pub(crate) struct DeduplicateMergeFunction; + +impl MergeFunction for DeduplicateMergeFunction { + fn pick_winner(&self, rows: &[MergeRow]) -> crate::Result> { + let winner = rows + .iter() + .reduce(|best, r| { + // Compare user sequences lexicographically first (if present), then system sequence. + let ord = match (r.user_sequences.is_empty(), best.user_sequences.is_empty()) { + (false, false) => r + .user_sequences + .cmp(&best.user_sequences) + .then_with(|| r.sequence_number.cmp(&best.sequence_number)), + _ => r.sequence_number.cmp(&best.sequence_number), + }; + // >= semantics: last-writer-wins for equal values. + if ord.is_ge() { + r + } else { + best + } + }) + .expect("merge called with empty rows"); + if RowKind::from_value(winner.value_kind)?.is_add() { + Ok(Some((winner.batch_idx, winner.row_idx))) + } else { + Ok(None) + } + } +} + +// --------------------------------------------------------------------------- +// SortMergeCursor +// --------------------------------------------------------------------------- + +/// Cursor tracking position within a single stream's current RecordBatch. 
+struct SortMergeCursor { + batch: RecordBatch, + /// Row-encoded keys for the current batch (via arrow-row). + rows: Rows, + offset: usize, +} + +impl SortMergeCursor { + fn is_finished(&self) -> bool { + self.offset >= self.rows.num_rows() + } + + fn current_row(&self) -> arrow_row::Row<'_> { + self.rows.row(self.offset) + } + + fn advance(&mut self) { + self.offset += 1; + } + + fn sequence_number(&self, seq_index: usize) -> i64 { + let col = self.batch.column(seq_index); + let arr = col + .as_any() + .downcast_ref::() + .expect("_SEQUENCE_NUMBER column must be Int64"); + arr.value(self.offset) + } + + fn value_kind(&self, value_kind_index: usize) -> i8 { + let col = self.batch.column(value_kind_index); + match col.as_any().downcast_ref::() { + Some(arr) if !col.is_null(self.offset) => arr.value(self.offset), + _ => 0, // default to INSERT for NULL or missing _VALUE_KIND + } + } + + /// Read the user-defined sequence field value (cast to i64 for ordering). + /// Returns None if the column is NULL at this row. + /// + /// Supports the same types as Java Paimon's `UserDefinedSeqComparator`: + /// TinyInt, SmallInt, Int, BigInt, Timestamp, Date, Decimal. + fn user_sequence(&self, user_seq_index: usize) -> Option { + let col = self.batch.column(user_seq_index); + if col.is_null(self.offset) { + return None; + } + use arrow_array::*; + let any = col.as_any(); + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + // Timestamps are stored as i64 internally (micros, millis, seconds, nanos). 
+ if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset) as i128); + } + // Decimal128: use raw i128 value for ordering (same precision/scale within a column). + if let Some(arr) = any.downcast_ref::() { + return Some(arr.value(self.offset)); + } + None + } +} + +// --------------------------------------------------------------------------- +// LoserTree +// --------------------------------------------------------------------------- + +/// A LoserTree (tournament tree) for k-way merge. +/// +/// Layout follows DataFusion's `SortPreservingMergeStream`: +/// - `nodes[0]` = overall winner index +/// - `nodes[1..k]` = loser at each internal node +/// +/// Reference: +struct LoserTree { + /// nodes[0] = winner, nodes[1..] = losers + nodes: Vec, + num_streams: usize, +} + +impl LoserTree { + fn new(num_streams: usize) -> Self { + Self { + nodes: vec![usize::MAX; num_streams], + num_streams, + } + } + + fn winner(&self) -> usize { + self.nodes[0] + } + + /// Leaf node index for a given stream index. + fn leaf_index(&self, stream_idx: usize) -> usize { + (self.num_streams + stream_idx) / 2 + } + + fn parent_index(node_idx: usize) -> usize { + node_idx / 2 + } + + /// Build the tree from scratch given a comparison function. + /// `is_gt(a, b)` returns true if stream `a` > stream `b`. 
+ fn init(&mut self, is_gt: impl Fn(usize, usize) -> bool) { + self.nodes.fill(usize::MAX); + for i in 0..self.num_streams { + let mut winner = i; + let mut cmp_node = self.leaf_index(i); + while cmp_node != 0 && self.nodes[cmp_node] != usize::MAX { + let challenger = self.nodes[cmp_node]; + if is_gt(winner, challenger) { + self.nodes[cmp_node] = winner; + winner = challenger; + } + cmp_node = Self::parent_index(cmp_node); + } + self.nodes[cmp_node] = winner; + } + } + + /// Update the tree after the winner has been consumed/advanced. + fn update(&mut self, is_gt: impl Fn(usize, usize) -> bool) { + let mut winner = self.nodes[0]; + let mut cmp_node = self.leaf_index(winner); + while cmp_node != 0 { + let challenger = self.nodes[cmp_node]; + if is_gt(winner, challenger) { + self.nodes[cmp_node] = winner; + winner = challenger; + } + cmp_node = Self::parent_index(cmp_node); + } + self.nodes[0] = winner; + } +} + +// --------------------------------------------------------------------------- +// SortMergeReader +// --------------------------------------------------------------------------- + +/// Configuration for building a [`SortMergeReader`]. +pub(crate) struct SortMergeReaderBuilder { + streams: Vec, + /// Full schema of the input streams (key + seq + value_kind + value columns). + input_schema: SchemaRef, + /// Indices of primary key columns in input_schema. + key_indices: Vec, + /// Index of _SEQUENCE_NUMBER column in input_schema. + seq_index: usize, + /// Index of _VALUE_KIND column in input_schema. + value_kind_index: usize, + /// Indices of user-defined sequence field columns in input_schema (if configured). + user_sequence_indices: Vec, + /// Indices of user value columns in input_schema (output columns). + value_indices: Vec, + /// Output schema (key + value columns, no system columns). 
+ output_schema: SchemaRef, + merge_function: Box, + batch_size: usize, +} + +impl SortMergeReaderBuilder { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + streams: Vec, + input_schema: SchemaRef, + key_indices: Vec, + seq_index: usize, + value_kind_index: usize, + user_sequence_indices: Vec, + value_indices: Vec, + output_schema: SchemaRef, + merge_function: Box, + ) -> Self { + Self { + streams, + input_schema, + key_indices, + seq_index, + value_kind_index, + user_sequence_indices, + value_indices, + output_schema, + merge_function, + batch_size: 1024, + } + } + + #[allow(dead_code)] + pub(crate) fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + + /// Build the sort-merge stream. + pub(crate) fn build(self) -> crate::Result { + let sort_fields: Vec = self + .key_indices + .iter() + .map(|&idx| SortField::new(self.input_schema.field(idx).data_type().clone())) + .collect(); + + let row_converter = RowConverter::new(sort_fields).map_err(|e| Error::UnexpectedError { + message: format!("Failed to create RowConverter: {e}"), + source: Some(Box::new(e)), + })?; + + sort_merge_stream( + self.streams, + row_converter, + self.key_indices, + self.seq_index, + self.value_kind_index, + self.user_sequence_indices, + self.value_indices, + self.output_schema, + self.merge_function, + self.batch_size, + ) + } +} + +/// Convert a RecordBatch's key columns into arrow-row `Rows`. +fn convert_batch_keys( + batch: &RecordBatch, + key_indices: &[usize], + converter: &mut RowConverter, +) -> crate::Result { + let key_columns: Vec = key_indices + .iter() + .map(|&idx| batch.column(idx).clone()) + .collect(); + converter + .convert_columns(&key_columns) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to convert key columns to Rows: {e}"), + source: Some(Box::new(e)), + }) +} + +/// Compare two cursors by their current key. 
`None` cursors are treated as +/// greater than any value (exhausted streams sink to the bottom). +fn compare_cursors(cursors: &[Option], a: usize, b: usize) -> Ordering { + match (&cursors[a], &cursors[b]) { + (None, None) => Ordering::Equal, + (None, _) => Ordering::Greater, + (_, None) => Ordering::Less, + (Some(ca), Some(cb)) => ca.current_row().cmp(&cb.current_row()), + } +} + +/// The main sort-merge stream implementation. +/// +/// Uses an interleave-based output strategy (like DataFusion's BatchBuilder): +/// instead of slicing individual rows and concatenating, we record +/// `(batch_idx, row_idx)` indices and use `arrow_select::interleave` to +/// gather all output rows in one pass per column. +#[allow(clippy::too_many_arguments)] +fn sort_merge_stream( + mut streams: Vec, + mut row_converter: RowConverter, + key_indices: Vec, + seq_index: usize, + value_kind_index: usize, + user_sequence_indices: Vec, + value_indices: Vec, + output_schema: SchemaRef, + merge_function: Box, + batch_size: usize, +) -> crate::Result { + let num_streams = streams.len(); + if num_streams == 0 { + return Ok(futures::stream::empty().boxed()); + } + + // Output column indices: key columns + value columns (skip _SEQUENCE_NUMBER). + let output_col_indices: Vec = key_indices + .iter() + .chain(value_indices.iter()) + .copied() + .collect(); + + Ok(try_stream! { + // Initialize cursors: read first non-empty batch from each stream. + // Loop to skip empty batches (e.g. from predicate filtering). + let mut cursors: Vec> = Vec::with_capacity(num_streams); + for stream in &mut streams { + let mut found = false; + while let Some(batch_result) = stream.next().await { + let batch = batch_result?; + if batch.num_rows() > 0 { + let rows = convert_batch_keys(&batch, &key_indices, &mut row_converter)?; + cursors.push(Some(SortMergeCursor { batch, rows, offset: 0 })); + found = true; + break; + } + } + if !found { + cursors.push(None); + } + } + + // Build loser tree. 
+ let mut tree = LoserTree::new(num_streams); + tree.init(|a, b| compare_cursors(&cursors, a, b).then_with(|| a.cmp(&b)).is_gt()); + + // Batch buffer: stores RecordBatches referenced by output indices. + // Each cursor's current batch gets an entry; when a cursor advances + // to a new batch, the old one stays in the buffer until the output + // batch is flushed. + let mut batch_buffer: Vec = Vec::new(); + // Map from stream_idx -> current batch_buffer index. + let mut stream_batch_idx: Vec> = vec![None; num_streams]; + + // Register initial batches. + for (i, cursor) in cursors.iter().enumerate() { + if let Some(c) = cursor { + let idx = batch_buffer.len(); + batch_buffer.push(c.batch.clone()); + stream_batch_idx[i] = Some(idx); + } + } + + // Output indices: (batch_buffer_idx, row_idx) for interleave. + let mut output_indices: Vec<(usize, usize)> = Vec::with_capacity(batch_size); + + loop { + let winner_idx = tree.winner(); + // Check if all streams are exhausted. + if cursors[winner_idx].is_none() { + break; + } + + // Capture the winner's key for grouping same-key rows. + let winner_key = { + let cursor = cursors[winner_idx].as_ref().unwrap(); + cursor.current_row().owned() + }; + + // Collect all rows with the same key across all streams. + let mut same_key_rows: Vec = Vec::new(); + + loop { + let current_winner = tree.winner(); + let matches = match &cursors[current_winner] { + None => false, + Some(c) => c.current_row().cmp(&winner_key.row()) == Ordering::Equal, + }; + if !matches { + break; + } + + // Record this row. + { + let cursor = cursors[current_winner].as_ref().unwrap(); + let buf_idx = stream_batch_idx[current_winner].unwrap(); + same_key_rows.push(MergeRow { + batch_idx: buf_idx, + row_idx: cursor.offset, + sequence_number: cursor.sequence_number(seq_index), + value_kind: cursor.value_kind(value_kind_index), + user_sequences: user_sequence_indices.iter().map(|&idx| cursor.user_sequence(idx)).collect(), + }); + } + + // Advance the cursor. 
+ { + let cursor = cursors[current_winner].as_mut().unwrap(); + cursor.advance(); + if cursor.is_finished() { + // Try to get next non-empty batch from this stream. + // Loop to skip empty batches. + cursors[current_winner] = None; + while let Some(batch_result) = streams[current_winner].next().await { + let batch = batch_result?; + if batch.num_rows() > 0 { + let rows = convert_batch_keys(&batch, &key_indices, &mut row_converter)?; + let buf_idx = batch_buffer.len(); + batch_buffer.push(batch.clone()); + stream_batch_idx[current_winner] = Some(buf_idx); + cursors[current_winner] = Some(SortMergeCursor { batch, rows, offset: 0 }); + break; + } + } + } + } + + // Update loser tree after advancing. + tree.update(|a, b| compare_cursors(&cursors, a, b).then_with(|| a.cmp(&b)).is_gt()); + } + + // Apply merge function to pick the winner row. + // Returns None if the winning row is a DELETE/UPDATE_BEFORE — skip it. + if let Some((win_batch_idx, win_row_idx)) = merge_function.pick_winner(&same_key_rows)? { + output_indices.push((win_batch_idx, win_row_idx)); + } + + // Yield a batch when we've accumulated enough rows. + if output_indices.len() >= batch_size { + let batch = build_output_interleave( + &output_schema, + &batch_buffer, + &output_col_indices, + &output_indices, + )?; + output_indices.clear(); + // Compact batch buffer: only keep batches still referenced by cursors. + // SAFETY: output_indices was just cleared above, so no stale references + // exist into the buffer. The yield below happens after compaction. + compact_batch_buffer( + &mut batch_buffer, + &mut stream_batch_idx, + &cursors, + ); + yield batch; + } + } + + // Yield remaining rows. + if !output_indices.is_empty() { + let batch = build_output_interleave( + &output_schema, + &batch_buffer, + &output_col_indices, + &output_indices, + )?; + yield batch; + } + } + .boxed()) +} + +/// Build an output RecordBatch using `interleave` to gather rows from the +/// batch buffer in one pass per column. 
fn build_output_interleave(
    schema: &SchemaRef,
    batch_buffer: &[RecordBatch],
    output_col_indices: &[usize],
    indices: &[(usize, usize)],
) -> crate::Result<RecordBatch> {
    let columns = output_col_indices
        .iter()
        .map(|&col_idx| {
            // Expose this column from every buffered batch, then gather the
            // selected rows in output order with one interleave kernel call.
            let arrays: Vec<&dyn arrow_array::Array> = batch_buffer
                .iter()
                .map(|b| b.column(col_idx).as_ref())
                .collect();
            interleave(&arrays, indices).map_err(|e| Error::UnexpectedError {
                message: format!("Failed to interleave column {col_idx}: {e}"),
                source: Some(Box::new(e)),
            })
        })
        .collect::<crate::Result<Vec<ArrayRef>>>()?;

    RecordBatch::try_new(schema.clone(), columns).map_err(|e| Error::UnexpectedError {
        message: format!("Failed to build interleaved RecordBatch: {e}"),
        source: Some(Box::new(e)),
    })
}

/// Compact the batch buffer by removing batches no longer referenced by any
/// cursor, and updating indices accordingly.
fn compact_batch_buffer(
    batch_buffer: &mut Vec<RecordBatch>,
    stream_batch_idx: &mut [Option<usize>],
    cursors: &[Option<SortMergeCursor>],
) {
    // Mark the buffer slots still referenced by a live cursor.
    let mut alive = vec![false; batch_buffer.len()];
    for (stream, cursor) in cursors.iter().enumerate() {
        if cursor.is_some() {
            if let Some(slot) = stream_batch_idx[stream] {
                alive[slot] = true;
            }
        }
    }

    // Rebuild the buffer from survivors, remembering where each old slot moved.
    let mut remap: Vec<Option<usize>> = vec![None; batch_buffer.len()];
    let mut survivors: Vec<RecordBatch> = Vec::new();
    for (old_slot, keep) in alive.iter().enumerate() {
        if *keep {
            remap[old_slot] = Some(survivors.len());
            survivors.push(batch_buffer[old_slot].clone());
        }
    }
    *batch_buffer = survivors;

    // Point every live stream at its batch's new slot; exhausted streams are
    // cleared so a stale index can never resurface.
    for (stream, cursor) in cursors.iter().enumerate() {
        if cursor.is_some() {
            if let Some(old_slot) = stream_batch_idx[stream] {
                stream_batch_idx[stream] = remap[old_slot];
            }
        } else {
            stream_batch_idx[stream] = None;
        }
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, Int32Array, Int64Array, Int8Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use futures::TryStreamExt;
    use std::sync::Arc;

    /// Input schema: pk + system columns + one value column.
    fn make_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("pk", DataType::Int32, false),
            Field::new("_SEQUENCE_NUMBER", DataType::Int64, false),
            Field::new("_VALUE_KIND", DataType::Int8, false),
            Field::new("value", DataType::Utf8, true),
        ]))
    }

    /// Output schema: user-visible columns only (no system columns).
    fn make_output_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("pk", DataType::Int32, false),
            Field::new("value", DataType::Utf8, true),
        ]))
    }

    /// INSERT-only batch helper; delegates to `make_batch_with_kind`.
    fn make_batch(
        schema: &SchemaRef,
        pks: Vec<i32>,
        seqs: Vec<i64>,
        values: Vec<Option<&str>>,
    ) -> RecordBatch {
        let len = pks.len();
        make_batch_with_kind(schema, pks, seqs, vec![0i8; len], values)
    }

    fn make_batch_with_kind(
        schema: &SchemaRef,
        pks: Vec<i32>,
        seqs: Vec<i64>,
        kinds: Vec<i8>,
        values: Vec<Option<&str>>,
    ) -> RecordBatch {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(pks)),
                Arc::new(Int64Array::from(seqs)),
                Arc::new(Int8Array::from(kinds)),
                Arc::new(StringArray::from(values)),
            ],
        )
        .unwrap()
    }

    fn stream_from_batches(batches: Vec<RecordBatch>) -> ArrowRecordBatchStream {
        futures::stream::iter(batches.into_iter().map(Ok)).boxed()
    }

    #[tokio::test]
    async fn test_loser_tree_basic() {
        // 3 streams, verify init produces correct winner
        let schema = make_schema();
        let s0 = stream_from_batches(vec![make_batch(
            &schema,
            vec![1, 3],
            vec![1, 1],
            vec![Some("a"), Some("c")],
        )]);
+ let s1 = stream_from_batches(vec![make_batch( + &schema, + vec![2, 4], + vec![1, 1], + vec![Some("b"), Some("d")], + )]); + let s2 = stream_from_batches(vec![make_batch(&schema, vec![5], vec![1], vec![Some("e")])]); + + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![s0, s1, s2], + schema, + vec![0], // key: pk + 1, // seq index + 2, // value_kind index + vec![], // no user sequence fields + vec![3], // value index + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let pks: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + assert_eq!(pks, vec![1, 2, 3, 4, 5]); + } + + #[tokio::test] + async fn test_deduplicate_merge() { + // Two streams with overlapping keys, different sequence numbers + let schema = make_schema(); + let s0 = stream_from_batches(vec![make_batch( + &schema, + vec![1, 2, 3], + vec![1, 1, 1], + vec![Some("old_a"), Some("old_b"), Some("old_c")], + )]); + let s1 = stream_from_batches(vec![make_batch( + &schema, + vec![1, 2, 4], + vec![2, 2, 2], + vec![Some("new_a"), Some("new_b"), Some("new_d")], + )]); + + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![s0, s1], + schema, + vec![0], + 1, + 2, // value_kind index + vec![], // no user sequence fields + vec![3], // value index + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let pks: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + let values: Vec = result + .iter() + .flat_map(|b| { + let arr = b.column(1).as_any().downcast_ref::().unwrap(); + (0..arr.len()) + .map(|i| arr.value(i).to_string()) + .collect::>() + }) + .collect(); + + assert_eq!(pks, vec![1, 
2, 3, 4]); + // key 1,2: newer seq wins; key 3: only in s0; key 4: only in s1 + assert_eq!(values, vec!["new_a", "new_b", "old_c", "new_d"]); + } + + #[tokio::test] + async fn test_empty_streams() { + let schema = make_schema(); + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![], + schema, + vec![0], + 1, + 2, // value_kind index + vec![], // no user sequence fields + vec![3], // value index + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert!(result.is_empty()); + } + + #[tokio::test] + async fn test_single_stream_no_duplicates() { + let schema = make_schema(); + let s0 = stream_from_batches(vec![make_batch( + &schema, + vec![1, 2, 3], + vec![1, 1, 1], + vec![Some("a"), Some("b"), Some("c")], + )]); + + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![s0], + schema, + vec![0], + 1, + 2, + vec![], + vec![3], + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let pks: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + assert_eq!(pks, vec![1, 2, 3]); + } + + #[tokio::test] + async fn test_multi_batch_per_stream() { + let schema = make_schema(); + // Stream 0: two batches + let s0 = stream_from_batches(vec![ + make_batch(&schema, vec![1, 3], vec![1, 1], vec![Some("a"), Some("c")]), + make_batch(&schema, vec![5, 7], vec![1, 1], vec![Some("e"), Some("g")]), + ]); + // Stream 1: two batches + let s1 = stream_from_batches(vec![ + make_batch(&schema, vec![2, 4], vec![1, 1], vec![Some("b"), Some("d")]), + make_batch(&schema, vec![6], vec![1], vec![Some("f")]), + ]); + + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![s0, s1], + schema, + vec![0], + 1, + 2, + vec![], + vec![3], + 
output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let pks: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + assert_eq!(pks, vec![1, 2, 3, 4, 5, 6, 7]); + } + + #[tokio::test] + async fn test_batch_size_boundary() { + let schema = make_schema(); + let s0 = stream_from_batches(vec![make_batch( + &schema, + vec![1, 2, 3, 4, 5], + vec![1, 1, 1, 1, 1], + vec![Some("a"), Some("b"), Some("c"), Some("d"), Some("e")], + )]); + + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![s0], + schema, + vec![0], + 1, + 2, + vec![], + vec![3], + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .with_batch_size(2) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Should produce 3 batches: [2, 2, 1] rows + assert_eq!(result.len(), 3); + assert_eq!(result[0].num_rows(), 2); + assert_eq!(result[1].num_rows(), 2); + assert_eq!(result[2].num_rows(), 1); + } + + #[tokio::test] + async fn test_multi_sequence_fields() { + // Schema: pk, _SEQUENCE_NUMBER, _VALUE_KIND, seq1, seq2, value + let schema = Arc::new(Schema::new(vec![ + Field::new("pk", DataType::Int32, false), + Field::new("_SEQUENCE_NUMBER", DataType::Int64, false), + Field::new("_VALUE_KIND", DataType::Int8, false), + Field::new("seq1", DataType::Int64, false), + Field::new("seq2", DataType::Int64, false), + Field::new("value", DataType::Utf8, true), + ])); + let output_schema = Arc::new(Schema::new(vec![ + Field::new("pk", DataType::Int32, false), + Field::new("value", DataType::Utf8, true), + ])); + + // pk=1: s0 has (seq1=10, seq2=1), s1 has (seq1=10, seq2=2) → s1 wins (second field higher) + // pk=2: s0 has (seq1=20, seq2=1), s1 has (seq1=10, seq2=99) → s0 wins (first field higher) + let s0 = stream_from_batches(vec![RecordBatch::try_new( + schema.clone(), + vec![ + 
Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(Int64Array::from(vec![1, 1])), + Arc::new(Int8Array::from(vec![0, 0])), + Arc::new(Int64Array::from(vec![10, 20])), + Arc::new(Int64Array::from(vec![1, 1])), + Arc::new(StringArray::from(vec!["old_a", "winner_b"])), + ], + ) + .unwrap()]); + let s1 = stream_from_batches(vec![RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(Int64Array::from(vec![2, 2])), + Arc::new(Int8Array::from(vec![0, 0])), + Arc::new(Int64Array::from(vec![10, 10])), + Arc::new(Int64Array::from(vec![2, 99])), + Arc::new(StringArray::from(vec!["winner_a", "loser_b"])), + ], + ) + .unwrap()]); + + let result = SortMergeReaderBuilder::new( + vec![s0, s1], + schema, + vec![0], // key: pk + 1, // seq index + 2, // value_kind index + vec![3, 4], // user sequence fields: seq1, seq2 + vec![5], // value index + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let values: Vec = result + .iter() + .flat_map(|b| { + let arr = b.column(1).as_any().downcast_ref::().unwrap(); + (0..arr.len()) + .map(|i| arr.value(i).to_string()) + .collect::>() + }) + .collect(); + assert_eq!(values, vec!["winner_a", "winner_b"]); + } + + #[tokio::test] + async fn test_delete_row_filtered() { + let schema = make_schema(); + // Stream 0: pk=1 INSERT (seq=1), pk=2 INSERT (seq=1) + let s0 = stream_from_batches(vec![make_batch_with_kind( + &schema, + vec![1, 2], + vec![1, 1], + vec![0, 0], + vec![Some("a"), Some("b")], + )]); + // Stream 1: pk=1 DELETE (seq=2) — should win and be filtered out + let s1 = stream_from_batches(vec![make_batch_with_kind( + &schema, + vec![1], + vec![2], + vec![3], // DELETE + vec![Some("a")], + )]); + + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![s0, s1], + schema, + vec![0], + 1, + 2, + vec![], + vec![3], + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + 
.unwrap() + .try_collect::>() + .await + .unwrap(); + + let pks: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + // pk=1 deleted, only pk=2 remains + assert_eq!(pks, vec![2]); + } + + #[tokio::test] + async fn test_single_stream_duplicate_keys() { + let schema = make_schema(); + // Single stream with duplicate pk=1 (seq 1 and 2), unique pk=2 + let s0 = stream_from_batches(vec![make_batch( + &schema, + vec![1, 1, 2], + vec![1, 2, 1], + vec![Some("old"), Some("new"), Some("only")], + )]); + + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![s0], + schema, + vec![0], + 1, + 2, + vec![], + vec![3], + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let pks: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + let values: Vec = result + .iter() + .flat_map(|b| { + let arr = b.column(1).as_any().downcast_ref::().unwrap(); + (0..arr.len()) + .map(|i| arr.value(i).to_string()) + .collect::>() + }) + .collect(); + + assert_eq!(pks, vec![1, 2]); + assert_eq!(values, vec!["new", "only"]); + } + + #[tokio::test] + async fn test_single_row_per_stream() { + let schema = make_schema(); + let s0 = stream_from_batches(vec![make_batch(&schema, vec![3], vec![1], vec![Some("c")])]); + let s1 = stream_from_batches(vec![make_batch(&schema, vec![1], vec![1], vec![Some("a")])]); + let s2 = stream_from_batches(vec![make_batch(&schema, vec![2], vec![1], vec![Some("b")])]); + + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![s0, s1, s2], + schema, + vec![0], + 1, + 2, + vec![], + vec![3], + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let pks: 
Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + let values: Vec = result + .iter() + .flat_map(|b| { + let arr = b.column(1).as_any().downcast_ref::().unwrap(); + (0..arr.len()) + .map(|i| arr.value(i).to_string()) + .collect::>() + }) + .collect(); + + assert_eq!(pks, vec![1, 2, 3]); + assert_eq!(values, vec!["a", "b", "c"]); + } + + /// Helper to create an empty batch with the test schema. + fn make_empty_batch(schema: &SchemaRef) -> RecordBatch { + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(Vec::::new())), + Arc::new(Int64Array::from(Vec::::new())), + Arc::new(Int8Array::from(Vec::::new())), + Arc::new(StringArray::from(Vec::>::new())), + ], + ) + .unwrap() + } + + #[tokio::test] + async fn test_empty_batches_skipped() { + // Regression: empty batches (e.g. from predicate filtering) must be + // skipped, not treated as stream exhaustion. + let schema = make_schema(); + + // Stream 0: empty batch at start, then real data + let s0 = stream_from_batches(vec![ + make_empty_batch(&schema), + make_batch(&schema, vec![1, 3], vec![1, 1], vec![Some("a"), Some("c")]), + ]); + // Stream 1: data, empty batch in the middle, then more data + let s1 = stream_from_batches(vec![ + make_batch(&schema, vec![2], vec![1], vec![Some("b")]), + make_empty_batch(&schema), + make_empty_batch(&schema), + make_batch(&schema, vec![4], vec![1], vec![Some("d")]), + ]); + + let output_schema = make_output_schema(); + let result = SortMergeReaderBuilder::new( + vec![s0, s1], + schema, + vec![0], + 1, + 2, + vec![], + vec![3], + output_schema, + Box::new(DeduplicateMergeFunction), + ) + .build() + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let pks: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + let values: Vec = result + .iter() + 
.flat_map(|b| { + let arr = b.column(1).as_any().downcast_ref::().unwrap(); + (0..arr.len()) + .map(|i| arr.value(i).to_string()) + .collect::>() + }) + .collect(); + + assert_eq!(pks, vec![1, 2, 3, 4]); + assert_eq!(values, vec!["a", "b", "c", "d"]); + } +} diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 25b1d942..5aa4b590 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -177,14 +177,11 @@ impl TableCommit { partition: &HashMap>, ) -> Result { let pb = PredicateBuilder::new(&self.table.schema().partition_fields()); - let predicates: Vec = partition + let fields: Vec<(&str, Option)> = partition .iter() - .map(|(key, value)| match value { - Some(v) => pb.equal(key, v.clone()), - None => pb.is_null(key), - }) - .collect::>>()?; - Ok(Predicate::and(predicates)) + .map(|(key, value)| (key.as_str(), value.clone())) + .collect(); + pb.partition_predicate(&fields) } /// Drop specific partitions (OVERWRITE with only deletes). diff --git a/crates/paimon/src/table/table_read.rs b/crates/paimon/src/table/table_read.rs index cab22f9b..47ee360a 100644 --- a/crates/paimon/src/table/table_read.rs +++ b/crates/paimon/src/table/table_read.rs @@ -17,11 +17,12 @@ use super::data_evolution_reader::DataEvolutionReader; use super::data_file_reader::DataFileReader; +use super::kv_file_reader::{KeyValueFileReader, KeyValueReadConfig}; use super::read_builder::split_scan_predicates; use super::{ArrowRecordBatchStream, Table}; use crate::arrow::filtering::reader_pruning_predicates; use crate::spec::{CoreOptions, DataField, Predicate}; -use crate::{DataSplit, Error}; +use crate::DataSplit; /// Table read: reads data from splits (e.g. produced by [TableScan::plan]). 
/// @@ -76,17 +77,37 @@ impl<'a> TableRead<'a> { let deletion_vectors_enabled = core_options.deletion_vectors_enabled(); let data_evolution = core_options.data_evolution_enabled(); + // PK table without DV: route by merge engine. + // Exhaustive match ensures new MergeEngine variants trigger a compile error. if has_primary_keys && !deletion_vectors_enabled { - return Err(Error::Unsupported { - message: format!( - "Reading primary-key tables without deletion vectors is not yet supported. Primary keys: {:?}", - self.table.schema.primary_keys() - ), - }); + match core_options.merge_engine()? { + crate::spec::MergeEngine::Deduplicate => { + let reader = KeyValueFileReader::new( + self.table.file_io.clone(), + KeyValueReadConfig { + schema_manager: self.table.schema_manager().clone(), + table_schema_id: self.table.schema().id(), + table_fields: self.table.schema.fields().to_vec(), + read_type: self.read_type().to_vec(), + predicates: self.data_predicates.clone(), + primary_keys: self.table.schema.primary_keys().to_vec(), + sequence_fields: core_options + .sequence_fields() + .iter() + .map(|s| s.to_string()) + .collect(), + }, + ); + return reader.read(data_splits); + } + crate::spec::MergeEngine::FirstRow => { + // Fall through to DataFileReader — scan already skips level-0 + } + } } + // PK table with DV or append-only table: DataFileReader / DataEvolutionReader if data_evolution { - // TODO: data evolution mode does not support read-side predicate pruning yet. let reader = DataEvolutionReader::new( self.table.file_io.clone(), self.table.schema_manager().clone(), diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index a21b214f..3312375a 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -126,7 +126,7 @@ fn manifest_partition_predicate_may_match( /// Reads all manifest entries for a snapshot (base + delta manifest lists, then each manifest file). 
/// Applies filters during concurrent manifest reading to reduce entries early: /// - Manifest-file-level partition stats pruning (skip entire manifest files) -/// - DV level-0 filtering per entry +/// - Level-0 filtering per entry (DV mode or FirstRow engine) /// - Partition predicate filtering per entry /// - Data-level stats pruning per entry (current schema only, cross-schema fail-open) #[allow(clippy::too_many_arguments)] @@ -134,7 +134,7 @@ async fn read_all_manifest_entries( file_io: &FileIO, table_path: &str, snapshot: &Snapshot, - deletion_vectors_enabled: bool, + skip_level_zero: bool, has_primary_keys: bool, partition_predicate: Option<&Predicate>, partition_fields: &[DataField], @@ -170,7 +170,7 @@ async fn read_all_manifest_entries( let filtered: Vec = entries .into_iter() .filter(|entry| { - if deletion_vectors_enabled && has_primary_keys && entry.file().level == 0 { + if skip_level_zero && has_primary_keys && entry.file().level == 0 { return false; } if has_primary_keys && entry.bucket() < 0 { @@ -461,6 +461,21 @@ impl<'a> TableScan<'a> { let deletion_vectors_enabled = core_options.deletion_vectors_enabled(); let data_evolution_enabled = core_options.data_evolution_enabled(); + let has_primary_keys = !self.table.schema().primary_keys().is_empty(); + + // Skip level-0 files for PK tables when: + // - DV mode: level-0 files are unmerged, DV handles dedup at higher levels + // - FirstRow engine without DV: reads go through DataFileReader (no merge), + // so only compacted (level > 0) files are safe to read directly + let skip_level_zero = if has_primary_keys { + deletion_vectors_enabled + || core_options + .merge_engine() + .is_ok_and(|e| e == crate::spec::MergeEngine::FirstRow) + } else { + false + }; + let partition_fields = self.table.schema().partition_fields(); let pushdown_data_predicates = if data_evolution_enabled { @@ -469,8 +484,6 @@ impl<'a> TableScan<'a> { self.data_predicates.as_slice() }; - let has_primary_keys = 
!self.table.schema().primary_keys().is_empty(); - let bucket_key_fields: Vec = if self.bucket_predicate.is_none() || !core_options.is_default_bucket_function() { Vec::new() @@ -504,7 +517,7 @@ impl<'a> TableScan<'a> { file_io, table_path, snapshot, - deletion_vectors_enabled, + skip_level_zero, has_primary_keys, self.partition_predicate.as_ref(), &partition_fields, diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 3d3cbf28..728209d8 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -20,14 +20,16 @@ //! Reference: [pypaimon TableWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/table_write.py) //! and [pypaimon FileStoreWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_write.py) +use crate::spec::DataFileMeta; use crate::spec::PartitionComputer; use crate::spec::{ extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions, DataField, DataType, Datum, - EMPTY_SERIALIZED_ROW, + MergeEngine, Predicate, PredicateBuilder, EMPTY_SERIALIZED_ROW, }; use crate::table::commit_message::CommitMessage; use crate::table::data_file_writer::DataFileWriter; -use crate::table::Table; +use crate::table::kv_file_writer::{KeyValueFileWriter, KeyValueWriteConfig}; +use crate::table::{SnapshotManager, Table, TableScan}; use crate::Result; use arrow_array::RecordBatch; use std::collections::HashMap; @@ -41,9 +43,31 @@ fn schema_contains_blob_type(fields: &[DataField]) -> bool { .any(|field| field.data_type().contains_blob_type()) } +/// Enum to hold either an append-only writer or a key-value writer. 
+enum FileWriter { + Append(DataFileWriter), + KeyValue(KeyValueFileWriter), +} + +impl FileWriter { + async fn write(&mut self, batch: &RecordBatch) -> Result<()> { + match self { + FileWriter::Append(w) => w.write(batch).await, + FileWriter::KeyValue(w) => w.write(batch).await, + } + } + + async fn prepare_commit(mut self) -> Result> { + match self { + FileWriter::Append(ref mut w) => w.prepare_commit().await, + FileWriter::KeyValue(ref mut w) => w.prepare_commit().await, + } + } +} + /// TableWrite writes Arrow RecordBatches to Paimon data files. /// -/// Each (partition, bucket) pair gets its own `DataFileWriter` held in a HashMap. +/// Each (partition, bucket) pair gets its own writer held in a HashMap. /// Batches are routed to the correct writer based on partition/bucket. /// /// Call `prepare_commit()` to close all writers and collect @@ -52,7 +76,7 @@ fn schema_contains_blob_type(fields: &[DataField]) -> bool { /// Reference: [pypaimon BatchTableWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/table_write.py) pub struct TableWrite { table: Table, - partition_writers: HashMap, + partition_writers: HashMap, partition_computer: PartitionComputer, partition_keys: Vec, partition_field_indices: Vec, @@ -63,6 +87,18 @@ pub struct TableWrite { file_compression: String, file_compression_zstd_level: i32, write_buffer_size: i64, + /// Primary key column indices in the user schema (empty for append-only). + primary_key_indices: Vec, + /// Paimon DataTypes for each primary key column (same order as primary_key_indices). + primary_key_types: Vec, + /// Sequence field column indices in the user schema (empty if not configured). + sequence_field_indices: Vec, + /// Merge engine for primary-key tables. + merge_engine: MergeEngine, + /// Column index in user schema for row kind (resolved from rowkind.field or _VALUE_KIND). + value_kind_col_index: Option, + /// Cache of per-partition bucket→max_sequence_number, lazily populated on first write. 
+ partition_seq_cache: HashMap, HashMap>, } impl TableWrite { @@ -78,14 +114,35 @@ impl TableWrite { }); } - if !schema.primary_keys().is_empty() { + if core_options.data_evolution_enabled() { return Err(crate::Error::Unsupported { - message: "TableWrite does not support tables with primary keys".to_string(), + message: "TableWrite does not support data-evolution.enabled mode".to_string(), }); } let total_buckets = core_options.bucket(); - if total_buckets != -1 && core_options.bucket_key().is_none() { + let has_primary_keys = !schema.primary_keys().is_empty(); + + if has_primary_keys { + if total_buckets < 1 { + return Err(crate::Error::Unsupported { + message: format!( + "KeyValueFileWriter does not support bucket={total_buckets}, only fixed bucket (>= 1) is supported" + ), + }); + } + if core_options + .changelog_producer() + .eq_ignore_ascii_case("input") + { + return Err(crate::Error::Unsupported { + message: "KeyValueFileWriter does not support changelog-producer=input" + .to_string(), + }); + } + } + + if !has_primary_keys && total_buckets != -1 && core_options.bucket_key().is_none() { return Err(crate::Error::Unsupported { message: "Append tables with fixed bucket must configure 'bucket-key'".to_string(), }); @@ -118,6 +175,36 @@ impl TableWrite { ) .unwrap(); + let primary_key_indices: Vec = schema + .primary_keys() + .iter() + .filter_map(|pk| fields.iter().position(|f| f.name() == pk)) + .collect(); + + let primary_key_types: Vec = primary_key_indices + .iter() + .map(|&idx| fields[idx].data_type().clone()) + .collect(); + + let sequence_field_indices: Vec = core_options + .sequence_fields() + .iter() + .filter_map(|sf| fields.iter().position(|f| f.name() == *sf)) + .collect(); + + let merge_engine = core_options.merge_engine()?; + + if has_primary_keys && core_options.rowkind_field().is_some() { + return Err(crate::Error::Unsupported { + message: "KeyValueFileWriter does not support rowkind.field".to_string(), + }); + } + + // Resolve value_kind column 
from _VALUE_KIND in user schema, if present. + let value_kind_col_index = fields + .iter() + .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME); + Ok(Self { table: table.clone(), partition_writers: HashMap::new(), @@ -131,9 +218,63 @@ impl TableWrite { file_compression, file_compression_zstd_level, write_buffer_size, + primary_key_indices, + primary_key_types, + sequence_field_indices, + merge_engine, + value_kind_col_index, + partition_seq_cache: HashMap::new(), }) } + /// Scan the latest snapshot for a specific partition and return a map of + /// bucket → (max_sequence_number + 1) for each bucket in that partition. + async fn scan_partition_sequence_numbers( + table: &Table, + partition_bytes: &[u8], + ) -> crate::Result> { + let snapshot_manager = + SnapshotManager::new(table.file_io().clone(), table.location().to_string()); + let latest_snapshot = snapshot_manager.get_latest_snapshot().await?; + let mut bucket_seq: HashMap = HashMap::new(); + if let Some(snapshot) = latest_snapshot { + let partition_predicate = Self::build_partition_predicate(table, partition_bytes)?; + let scan = TableScan::new(table, partition_predicate, vec![], None, None, None); + let entries = scan.plan_manifest_entries(&snapshot).await?; + for entry in &entries { + let bucket = entry.bucket(); + let max_seq = entry.file().max_sequence_number; + let current = bucket_seq.entry(bucket).or_insert(0); + if max_seq + 1 > *current { + *current = max_seq + 1; + } + } + } + Ok(bucket_seq) + } + + /// Build a partition predicate from serialized partition bytes. 
+ fn build_partition_predicate( + table: &Table, + partition_bytes: &[u8], + ) -> crate::Result> { + let partition_fields = table.schema().partition_fields(); + if partition_fields.is_empty() { + return Ok(None); + } + let partition_row = BinaryRow::from_serialized_bytes(partition_bytes)?; + let fields: Vec<(&str, Option)> = partition_fields + .iter() + .enumerate() + .map(|(pos, field)| { + let datum = partition_row.get_datum(pos, field.data_type())?; + Ok((field.name(), datum)) + }) + .collect::>>()?; + let pred_builder = PredicateBuilder::new(table.schema().fields()); + Ok(Some(pred_builder.partition_predicate(&fields)?)) + } + /// Write an Arrow RecordBatch. Rows are routed to the correct partition and bucket. pub async fn write_arrow_batch(&mut self, batch: &RecordBatch) -> Result<()> { if batch.num_rows() == 0 { @@ -199,7 +340,7 @@ impl TableWrite { Ok(result) } - /// Write a batch directly to the DataFileWriter for the given (partition, bucket). + /// Write a batch directly to the writer for the given (partition, bucket). async fn write_bucket( &mut self, partition_bytes: Vec, @@ -208,7 +349,7 @@ impl TableWrite { ) -> Result<()> { let key = (partition_bytes, bucket); if !self.partition_writers.contains_key(&key) { - self.create_writer(key.0.clone(), key.1)?; + self.create_writer(key.0.clone(), key.1).await?; } let writer = self.partition_writers.get_mut(&key).unwrap(); writer.write(&batch).await @@ -225,12 +366,12 @@ impl TableWrite { /// Close all writers and collect CommitMessages for use with TableCommit. /// Writers are cleared after this call, allowing the TableWrite to be reused. 
pub async fn prepare_commit(&mut self) -> Result> { - let writers: Vec<(PartitionBucketKey, DataFileWriter)> = + let writers: Vec<(PartitionBucketKey, FileWriter)> = self.partition_writers.drain().collect(); let futures: Vec<_> = writers .into_iter() - .map(|((partition_bytes, bucket), mut writer)| async move { + .map(|((partition_bytes, bucket), writer)| async move { let files = writer.prepare_commit().await?; Ok::<_, crate::Error>((partition_bytes, bucket, files)) }) @@ -287,7 +428,7 @@ impl TableWrite { Ok((partition_bytes, bucket)) } - fn create_writer(&mut self, partition_bytes: Vec, bucket: i32) -> Result<()> { + async fn create_writer(&mut self, partition_bytes: Vec, bucket: i32) -> Result<()> { let partition_path = if self.partition_keys.is_empty() { String::new() } else { @@ -295,20 +436,55 @@ impl TableWrite { self.partition_computer.generate_partition_path(&row)? }; - let writer = DataFileWriter::new( - self.table.file_io().clone(), - self.table.location().to_string(), - partition_path, - bucket, - self.schema_id, - self.target_file_size, - self.file_compression.clone(), - self.file_compression_zstd_level, - self.write_buffer_size, - Some(0), // file_source: APPEND - None, // first_row_id: assigned by commit - None, // write_cols: full-row write - ); + let writer = if self.primary_key_indices.is_empty() { + FileWriter::Append(DataFileWriter::new( + self.table.file_io().clone(), + self.table.location().to_string(), + partition_path, + bucket, + self.schema_id, + self.target_file_size, + self.file_compression.clone(), + self.file_compression_zstd_level, + self.write_buffer_size, + Some(0), // file_source: APPEND + None, // first_row_id: assigned by commit + None, // write_cols: full-row write + )) + } else { + // Lazily scan partition sequence numbers on first writer creation per partition. 
+ if !self.partition_seq_cache.contains_key(&partition_bytes) { + let bucket_seq = + Self::scan_partition_sequence_numbers(&self.table, &partition_bytes).await?; + self.partition_seq_cache + .insert(partition_bytes.clone(), bucket_seq); + } + let next_seq = self + .partition_seq_cache + .get(&partition_bytes) + .and_then(|m| m.get(&bucket)) + .copied() + .unwrap_or(0); + + FileWriter::KeyValue(KeyValueFileWriter::new( + self.table.file_io().clone(), + KeyValueWriteConfig { + table_location: self.table.location().to_string(), + partition_path, + bucket, + schema_id: self.schema_id, + file_compression: self.file_compression.clone(), + file_compression_zstd_level: self.file_compression_zstd_level, + write_buffer_size: self.write_buffer_size, + primary_key_indices: self.primary_key_indices.clone(), + primary_key_types: self.primary_key_types.clone(), + sequence_field_indices: self.sequence_field_indices.clone(), + merge_engine: self.merge_engine, + value_kind_col_index: self.value_kind_col_index, + }, + next_seq, + )) + }; self.partition_writers .insert((partition_bytes, bucket), writer); @@ -859,4 +1035,203 @@ mod tests { let total_rows: i64 = messages[0].new_files.iter().map(|f| f.row_count).sum(); assert_eq!(total_rows, 4); } + + // ----------------------------------------------------------------------- + // Primary-key table write tests + // ----------------------------------------------------------------------- + + fn test_pk_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("bucket", "1") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_pk_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_pk_table"), + table_path.to_string(), + test_pk_schema(), + None, + ) + } + + #[tokio::test] + async fn test_pk_write_and_commit() { + let file_io = 
test_file_io(); + let table_path = "memory:/test_pk_write"; + setup_dirs(&file_io, table_path).await; + + let table = test_pk_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].new_files.len(), 1); + + let file = &messages[0].new_files[0]; + assert_eq!(file.row_count, 3); + assert_eq!(file.level, 0); + assert_eq!(file.min_sequence_number, 0); + assert_eq!(file.max_sequence_number, 2); + // min_key and max_key should be non-empty (serialized BinaryRow) + assert!(!file.min_key.is_empty()); + assert!(!file.max_key.is_empty()); + + // Commit + let commit = TableCommit::new(table.clone(), "test-user".to_string()); + commit.commit(messages).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.total_record_count(), Some(3)); + } + + #[tokio::test] + async fn test_pk_write_sorted_output() { + let file_io = test_file_io(); + let table_path = "memory:/test_pk_sorted"; + setup_dirs(&file_io, table_path).await; + + let table = test_pk_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + // Write unsorted data + let batch = make_batch(vec![5, 2, 4, 1, 3], vec![50, 20, 40, 10, 30]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + let commit = TableCommit::new(table.clone(), "test-user".to_string()); + commit.commit(messages).await.unwrap(); + + // Read back using sort-merge reader — should be sorted by PK + let rb = table.new_read_builder(); + let scan = rb.new_scan(); + let plan = scan.plan().await.unwrap(); + let read = 
rb.new_read().unwrap(); + let batches: Vec = + futures::TryStreamExt::try_collect(read.to_arrow(plan.splits()).unwrap()) + .await + .unwrap(); + + let ids: Vec = batches + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + assert_eq!(ids, vec![1, 2, 3, 4, 5]); + + let values: Vec = batches + .iter() + .flat_map(|b| { + b.column(1) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + assert_eq!(values, vec![10, 20, 30, 40, 50]); + } + + #[tokio::test] + async fn test_pk_write_dedup_across_commits() { + let file_io = test_file_io(); + let table_path = "memory:/test_pk_dedup"; + setup_dirs(&file_io, table_path).await; + + let table = test_pk_table(&file_io, table_path); + + // First commit: id=1,2,3 + let mut tw1 = TableWrite::new(&table).unwrap(); + tw1.write_arrow_batch(&make_batch(vec![1, 2, 3], vec![10, 20, 30])) + .await + .unwrap(); + let msgs1 = tw1.prepare_commit().await.unwrap(); + let commit = TableCommit::new(table.clone(), "test-user".to_string()); + commit.commit(msgs1).await.unwrap(); + + // Second commit: id=2,3,4 with updated values, higher sequence numbers + let mut tw2 = TableWrite::new(&table).unwrap(); + tw2.write_arrow_batch(&make_batch(vec![2, 3, 4], vec![200, 300, 400])) + .await + .unwrap(); + let msgs2 = tw2.prepare_commit().await.unwrap(); + commit.commit(msgs2).await.unwrap(); + + // Read back — dedup should keep newer values for id=2,3 + let rb = table.new_read_builder(); + let scan = rb.new_scan(); + let plan = scan.plan().await.unwrap(); + let read = rb.new_read().unwrap(); + let batches: Vec = + futures::TryStreamExt::try_collect(read.to_arrow(plan.splits()).unwrap()) + .await + .unwrap(); + + let ids: Vec = batches + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + let values: Vec = batches + .iter() + .flat_map(|b| 
{ + b.column(1) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + + assert_eq!(ids, vec![1, 2, 3, 4]); + assert_eq!(values, vec![10, 200, 300, 400]); + } + + #[tokio::test] + async fn test_pk_write_sequence_number_in_file() { + let file_io = test_file_io(); + let table_path = "memory:/test_pk_seq"; + setup_dirs(&file_io, table_path).await; + + let table = test_pk_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + let batch = make_batch(vec![1, 2], vec![10, 20]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + let file = &messages[0].new_files[0]; + // Fresh table, seq starts at 0, 2 rows → min=0, max=1 + assert_eq!(file.min_sequence_number, 0); + assert_eq!(file.max_sequence_number, 1); + } } diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs index 6feda239..57a4820c 100644 --- a/crates/paimon/src/table/write_builder.rs +++ b/crates/paimon/src/table/write_builder.rs @@ -45,6 +45,9 @@ impl<'a> WriteBuilder<'a> { } /// Create a new TableWrite for writing Arrow data. + /// + /// For primary-key tables, sequence numbers are lazily scanned per partition + /// when the first writer for that partition is created. pub fn new_write(&self) -> crate::Result { TableWrite::new(self.table) } diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 82483e21..2323b54a 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -790,5 +790,42 @@ def main(): ) + # ===== First-Row merge engine PK table ===== + # first-row keeps the earliest inserted row per key; later duplicates are ignored. + # After compaction, level-0 files are promoted so the batch reader (which skips + # level-0 for first-row) can see the compacted data. 
+ spark.sql( + """ + CREATE TABLE IF NOT EXISTS first_row_pk_table ( + id INT, + name STRING + ) USING paimon + TBLPROPERTIES ( + 'primary-key' = 'id', + 'bucket' = '1', + 'merge-engine' = 'first-row' + ) + """ + ) + spark.sql( + """ + INSERT INTO first_row_pk_table VALUES + (1, 'alice'), + (2, 'bob'), + (3, 'carol') + """ + ) + spark.sql( + """ + INSERT INTO first_row_pk_table VALUES + (2, 'bob-v2'), + (3, 'carol-v2'), + (4, 'dave') + """ + ) + # Compact to promote level-0 files so the batch reader can see them. + spark.sql("CALL sys.compact('default.first_row_pk_table')") + + if __name__ == "__main__": main() From 7ede3c99703975c5a2e512601efb8a296bc1ab6a Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 16 Apr 2026 11:53:11 +0800 Subject: [PATCH 2/3] Fix comment --- .../datafusion/tests/pk_tables.rs | 112 +++++++++++++++++- crates/paimon/src/table/table_commit.rs | 6 +- crates/paimon/src/table/table_scan.rs | 21 +++- crates/paimon/src/table/table_write.rs | 3 +- 4 files changed, 137 insertions(+), 5 deletions(-) diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index c93f5e29..629c13b6 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -25,7 +25,8 @@ use std::sync::Arc; use datafusion::arrow::array::{Int32Array, StringArray}; use datafusion::prelude::SessionContext; -use paimon::{CatalogOptions, FileSystemCatalog, Options}; +use paimon::catalog::Identifier; +use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, PaimonSqlHandler}; use tempfile::TempDir; @@ -1146,3 +1147,112 @@ async fn test_pk_multiple_value_columns() { ] ); } + +// ======================= FirstRow Engine: INSERT OVERWRITE ======================= + +/// INSERT OVERWRITE on a partitioned FirstRow-engine PK table should delete +/// level-0 files. 
Before the fix, `skip_level_zero` was applied in the overwrite +/// scan path, causing level-0 files to survive the overwrite. +/// +/// Verifies via TableScan (scan_all_files) that the overwrite correctly produces +/// delete entries for level-0 files, leaving only the new file per partition. +#[tokio::test] +async fn test_pk_first_row_insert_overwrite() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + handler + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA failed"); + + handler + .sql( + "CREATE TABLE paimon.test_db.t_fr_ow ( + dt STRING, id INT NOT NULL, name STRING, + PRIMARY KEY (dt, id) + ) PARTITIONED BY (dt STRING) + WITH ('bucket' = '1', 'merge-engine' = 'first-row')", + ) + .await + .unwrap(); + + // First commit: two partitions, creates level-0 files + handler + .sql( + "INSERT INTO paimon.test_db.t_fr_ow VALUES \ + ('2024-01-01', 1, 'alice'), ('2024-01-01', 2, 'bob'), \ + ('2024-01-02', 3, 'carol')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Verify via scan_all_files: 2 level-0 files (one per partition) + let table = catalog + .get_table(&Identifier::new("test_db", "t_fr_ow")) + .await + .unwrap(); + let plan = table + .new_read_builder() + .new_scan() + .with_scan_all_files() + .plan() + .await + .unwrap(); + let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum(); + assert_eq!(file_count, 2, "After INSERT: 2 level-0 files (one per partition)"); + + // INSERT OVERWRITE partition 2024-01-01 — must delete old level-0 file + handler + .sql("INSERT OVERWRITE paimon.test_db.t_fr_ow VALUES ('2024-01-01', 10, 'new_alice')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let table = catalog + .get_table(&Identifier::new("test_db", "t_fr_ow")) + .await + .unwrap(); + let plan = table + .new_read_builder() + .new_scan() + .with_scan_all_files() + .plan() + .await + .unwrap(); + let file_count: usize = plan.splits().iter().map(|s| 
s.data_files().len()).sum(); + assert_eq!( + file_count, 2, + "After OVERWRITE: 2 files (1 replaced for 2024-01-01 + 1 unchanged for 2024-01-02)" + ); + + // Second overwrite on the same partition — no stale files should accumulate + handler + .sql("INSERT OVERWRITE paimon.test_db.t_fr_ow VALUES ('2024-01-01', 20, 'newer_alice')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let table = catalog + .get_table(&Identifier::new("test_db", "t_fr_ow")) + .await + .unwrap(); + let plan = table + .new_read_builder() + .new_scan() + .with_scan_all_files() + .plan() + .await + .unwrap(); + let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum(); + assert_eq!( + file_count, 2, + "After second OVERWRITE: still 2 files (no stale level-0 files accumulated)" + ); +} diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 5aa4b590..5a91ee87 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -477,7 +477,8 @@ impl TableCommit { None, None, None, - ); + ) + .with_scan_all_files(); let current_entries = scan.plan_manifest_entries(snap).await?; for entry in current_entries { entries.push(entry.with_kind(FileKind::Delete)); @@ -567,7 +568,8 @@ impl TableCommit { }; // Read all current files from the latest snapshot. - let scan = TableScan::new(&self.table, None, vec![], None, None, None); + let scan = + TableScan::new(&self.table, None, vec![], None, None, None).with_scan_all_files(); let existing_entries = scan.plan_manifest_entries(snap).await?; // Build index: (partition, bucket, first_row_id, row_count) diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 3312375a..9b2a7e50 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -308,6 +308,10 @@ pub struct TableScan<'a> { /// When set, the scan will try to return only enough splits to satisfy the limit. 
limit: Option, row_ranges: Option>, + /// When true, disables level-0 filtering so all files are visible. + /// Used by non-read paths (overwrite, truncate, writer restore) that need + /// the complete file set. Normal read scans leave this as `false`. + scan_all_files: bool, } impl<'a> TableScan<'a> { @@ -326,9 +330,19 @@ impl<'a> TableScan<'a> { bucket_predicate, limit, row_ranges, + scan_all_files: false, } } + /// Disable level-0 filtering so all files are visible. + /// + /// Used by non-read paths (overwrite, truncate, writer restore) that need + /// the complete file set regardless of merge engine or DV settings. + pub fn with_scan_all_files(mut self) -> Self { + self.scan_all_files = true; + self + } + /// Set row ranges for scan-time filtering. /// /// This replaces any existing row_ranges. Typically used to inject @@ -467,7 +481,12 @@ impl<'a> TableScan<'a> { // - DV mode: level-0 files are unmerged, DV handles dedup at higher levels // - FirstRow engine without DV: reads go through DataFileReader (no merge), // so only compacted (level > 0) files are safe to read directly - let skip_level_zero = if has_primary_keys { + // + // Non-read paths (overwrite, truncate, writer restore) set scan_all_files=true + // to see all files including level-0, matching Java's CommitScanner behavior. 
+ let skip_level_zero = if self.scan_all_files { + false + } else if has_primary_keys { deletion_vectors_enabled || core_options .merge_engine() diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 728209d8..77b8e97d 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -239,7 +239,8 @@ impl TableWrite { let mut bucket_seq: HashMap = HashMap::new(); if let Some(snapshot) = latest_snapshot { let partition_predicate = Self::build_partition_predicate(table, partition_bytes)?; - let scan = TableScan::new(table, partition_predicate, vec![], None, None, None); + let scan = TableScan::new(table, partition_predicate, vec![], None, None, None) + .with_scan_all_files(); let entries = scan.plan_manifest_entries(&snapshot).await?; for entry in &entries { let bucket = entry.bucket(); From dc9471d099e14d07dd5e3988098c7cb893e6304e Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 16 Apr 2026 12:07:38 +0800 Subject: [PATCH 3/3] minor --- crates/integrations/datafusion/tests/pk_tables.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index 629c13b6..753963d1 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -1202,7 +1202,10 @@ async fn test_pk_first_row_insert_overwrite() { .await .unwrap(); let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum(); - assert_eq!(file_count, 2, "After INSERT: 2 level-0 files (one per partition)"); + assert_eq!( + file_count, 2, + "After INSERT: 2 level-0 files (one per partition)" + ); // INSERT OVERWRITE partition 2024-01-01 — must delete old level-0 file handler