From c8f7e9e4e42deb16ff4c252a289485d12e50e10a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 7 May 2026 17:11:33 +0800 Subject: [PATCH 1/2] [core] fix inaccurate record count in snapshot for data-evolution tables --- .../apache/paimon/operation/FileStoreCommitImpl.java | 10 ++++++++-- .../apache/paimon/table/DataEvolutionTableTest.java | 9 +++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 2e931d16cf73..945fa27fd555 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -932,8 +932,14 @@ CommitResult tryCommitOnce( deltaFiles = assigned.assignedEntries; } - // the added records subtract the deleted records from - long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles); + long deltaRecordCount; + if (options.dataEvolutionEnabled()) { + // for data evolution table, we use row id to track the inserted records num + deltaRecordCount = nextRowIdStart - firstRowIdStart; + } else { + // the added records subtract the deleted records from + deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles); + } long totalRecordCount = previousTotalRecordCount + deltaRecordCount; // write new delta files into manifest files diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java index 118a4fdefede..b7bfe07ea6a9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java @@ -102,6 +102,11 @@ public void testBasic() throws Exception { assertThat(noMergedRowCount).isEqualTo(2); assertThat(mergedRowCount).isEqualTo(1); + // assert record count tracked by snapshot. The second commit reuses rowId 0, so + // totalRecordCount should be 1 instead of 2. + assertThat(getTableDefault().snapshotManager().latestSnapshot().totalRecordCount()) + .isEqualTo(1L); + RecordReader reader = readBuilder.newRead().createReader(plan); assertThat(reader).isInstanceOf(DataEvolutionFileReader.class); reader.forEachRemaining( @@ -951,6 +956,10 @@ public void testCompact() throws Exception { assertThat(entries.size()).isEqualTo(1); assertThat(entries.get(0).file().nonNullFirstRowId()).isEqualTo(0); assertThat(entries.get(0).file().rowCount()).isEqualTo(500000L); + + // assert record count tracked by snapshot. Compact commit should not introduce new rowIds, + // and totalRecordCount should still match the total inserted records (5 * 100000). + assertThat(table.snapshotManager().latestSnapshot().totalRecordCount()).isEqualTo(500000L); } @Test From 0bbfc23192f8768f5be193292f394b38866c6584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 7 May 2026 19:35:40 +0800 Subject: [PATCH 2/2] fix for insert overwrite & drop partition --- .../paimon/operation/FileStoreCommitImpl.java | 7 +- .../commit/DataEvolutionCommitUtils.java | 66 ++++++++++ .../paimon/table/DataEvolutionTableTest.java | 123 ++++++++++++++++++ .../paimon/table/DataEvolutionTestBase.java | 11 ++ 4 files changed, 205 insertions(+), 2 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/commit/DataEvolutionCommitUtils.java diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 945fa27fd555..3c8c4dfdfeb1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -98,6 +98,7 @@ import static org.apache.paimon.manifest.ManifestEntry.nullableRecordCount; import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd; import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete; +import static org.apache.paimon.operation.commit.DataEvolutionCommitUtils.dataEvolutionDeltaRecordCount; import static org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions; import static org.apache.paimon.operation.commit.RowTrackingCommitUtils.assignRowTracking; import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; @@ -934,8 +935,10 @@ CommitResult tryCommitOnce( long deltaRecordCount; if (options.dataEvolutionEnabled()) { - // for data evolution table, we use row id to track the inserted records num - deltaRecordCount = nextRowIdStart - firstRowIdStart; + // inserted rows count can be calculated by nextRowId + deltaRecordCount = + dataEvolutionDeltaRecordCount( + deltaFiles, nextRowIdStart - firstRowIdStart, commitKind); } else { // the added records subtract the deleted records from deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/DataEvolutionCommitUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/DataEvolutionCommitUtils.java new file mode 100644 index 000000000000..4d23621f7509 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/DataEvolutionCommitUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.utils.RangeHelper; + +import java.util.List; +import java.util.stream.Collectors; + +/** Utils for data evolution commit. */ +public class DataEvolutionCommitUtils { + + /** + * To calculate the delta record count of data evolution. We cannot determine row count changes + * based solely on delta files. For example, delta files generated by INSERT OVERWRITE and + * compaction operations both contain ADD and DELETE records. + */ + public static long dataEvolutionDeltaRecordCount( + List deltaFiles, long insertedRowNum, Snapshot.CommitKind commitKind) { + if (commitKind == Snapshot.CommitKind.COMPACT) { + return 0; + } + + // Delete files could be produced by INSERT OVERWRITE and Drop partition operations. + // When reaches here, delete files should not contain partial records. + return insertedRowNum - dataEvolutionRecordCount(deltaFiles, FileKind.DELETE); + } + + public static long dataEvolutionRecordCount( + List manifestEntries, FileKind kind) { + List entries = + manifestEntries.stream().filter(e -> e.kind() == kind).collect(Collectors.toList()); + + long sum = 0L; + RangeHelper rangeHelper = + new RangeHelper<>(entry -> entry.file().nonNullRowIdRange()); + List> ranges = rangeHelper.mergeOverlappingRanges(entries); + for (List group : ranges) { + long maxCount = 0; + for (ManifestEntry file : group) { + maxCount = Math.max(maxCount, file.rowCount()); + } + sum += maxCount; + } + return sum; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java index b7bfe07ea6a9..72fa15f336dd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java @@ -54,8 +54,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -1005,4 +1007,125 @@ public void testIndexPath() throws Exception { null)); assertThat(path4.toString()).isEqualTo(testExternalpath2); } + + @Test + public void testInsertOverwrite() throws Exception { + Schema schema = schemaWithPartition(); + catalog.createTable(identifier(), schema, true); + FileStoreTable table = getTableDefault(); + + // Write 10 rows to partition "a" + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = builder.newWrite()) { + for (int i = 0; i < 10; i++) { + write.write( + GenericRow.of( + i, BinaryString.fromString("v" + i), BinaryString.fromString("a"))); + } + try (BatchTableCommit commit = builder.newCommit()) { + commit.commit(write.prepareCommit()); + } + } + assertThat(table.snapshotManager().latestSnapshot().totalRecordCount()).isEqualTo(10L); + + // Write 5 rows to partition "b" + builder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = builder.newWrite()) { + for (int i = 0; i < 5; i++) { + write.write( + GenericRow.of( + i + 10, + BinaryString.fromString("v" + (i + 10)), + BinaryString.fromString("b"))); + } + try (BatchTableCommit commit = builder.newCommit()) { + commit.commit(write.prepareCommit()); + } + } + assertThat(table.snapshotManager().latestSnapshot().totalRecordCount()).isEqualTo(15L); + + // INSERT OVERWRITE partition "a" with 3 new rows + Map overwritePartition = new HashMap<>(); + overwritePartition.put("pt", "a"); + BatchWriteBuilder overwriteBuilder = + table.newBatchWriteBuilder().withOverwrite(overwritePartition); + try (BatchTableWrite write = overwriteBuilder.newWrite()) { + for (int i = 0; i < 3; i++) { + write.write( + GenericRow.of( + i + 100, + BinaryString.fromString("new" + i), + BinaryString.fromString("a"))); + } + try (BatchTableCommit commit = overwriteBuilder.newCommit()) { + commit.commit(write.prepareCommit()); + } + } + + // totalRecordCount should be: 15 (previous) - 10 (deleted from partition "a") + 3 (new) = 8 + assertThat(table.snapshotManager().latestSnapshot().totalRecordCount()).isEqualTo(8L); + + // Verify data: partition "a" should have 3 rows, partition "b" should have 5 rows + ReadBuilder readBuilder = table.newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger rowCount = new AtomicInteger(0); + reader.forEachRemaining(r -> rowCount.incrementAndGet()); + assertThat(rowCount.get()).isEqualTo(8); + } + + @Test + public void testDropPartition() throws Exception { + Schema schema = schemaWithPartition(); + catalog.createTable(identifier(), schema, true); + FileStoreTable table = getTableDefault(); + + // Write 10 rows to partition "a" + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = builder.newWrite()) { + for (int i = 0; i < 10; i++) { + write.write( + GenericRow.of( + i, BinaryString.fromString("v" + i), BinaryString.fromString("a"))); + } + try (BatchTableCommit commit = builder.newCommit()) { + commit.commit(write.prepareCommit()); + } + } + assertThat(table.snapshotManager().latestSnapshot().totalRecordCount()).isEqualTo(10L); + + // Write 5 rows to partition "b" + builder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = builder.newWrite()) { + for (int i = 0; i < 5; i++) { + write.write( + GenericRow.of( + i + 10, + BinaryString.fromString("v" + (i + 10)), + BinaryString.fromString("b"))); + } + try (BatchTableCommit commit = builder.newCommit()) { + commit.commit(write.prepareCommit()); + } + } + assertThat(table.snapshotManager().latestSnapshot().totalRecordCount()).isEqualTo(15L); + + // Drop partition "a" + Map partitionToDrop = new HashMap<>(); + partitionToDrop.put("pt", "a"); + try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { + commit.truncatePartitions(Collections.singletonList(partitionToDrop)); + } + + // totalRecordCount should be: 15 (previous) - 10 (deleted from partition "a") = 5 + assertThat(table.snapshotManager().latestSnapshot().totalRecordCount()).isEqualTo(5L); + + // Verify data: only partition "b" should remain with 5 rows + ReadBuilder readBuilder = table.newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + AtomicInteger rowCount = new AtomicInteger(0); + reader.forEachRemaining(r -> rowCount.incrementAndGet()); + assertThat(rowCount.get()).isEqualTo(5); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTestBase.java index aeb3912b183f..8c6998aa39b1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTestBase.java @@ -50,6 +50,17 @@ protected Schema schemaDefault() { return schemaBuilder.build(); } + protected Schema schemaWithPartition() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("pt", DataTypes.STRING()); + schemaBuilder.partitionKeys("pt"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + return schemaBuilder.build(); + } + protected void write(long count) throws Exception { createTableDefault();