From 5e84445ab25fc76d4d2935da3318acb46a5fa43a Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sat, 16 May 2026 22:23:18 +0800 Subject: [PATCH 1/4] [core] Add test reproducing branch snapshot loader mismatch (Frankfurt regression) When a REST catalog returns the main table's snapshot for a branch query, SnapshotManager.latestSnapshotId() returns the wrong snapshot ID. This causes a mismatch between latestSnapshotId() (from REST) and safelyGetAllSnapshots() (from filesystem), which is the root cause of the Frankfurt eu-central-1 "safelyGetAllSnapshots did not include latest snapshot" error on tables with branches. The test verifies: 1. The mismatch when a buggy REST loader returns the main table's snapshot 2. No mismatch when the loader correctly returns the branch's snapshot 3. copyWithBranch correctly propagates the loader Co-Authored-By: Claude Opus 4.6 --- .../SnapshotManagerBranchLoaderTest.java | 346 ++++++++++++++++++ 1 file changed, 346 insertions(+) create mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerBranchLoaderTest.java diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerBranchLoaderTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerBranchLoaderTest.java new file mode 100644 index 000000000000..0f58a1d775de --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerBranchLoaderTest.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.Instant; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Reproduces the Frankfurt eu-central-1 regression: when a Paimon table has branches and the REST + * catalog (Bennett) returns the main table's snapshot for a branch query, {@link + * SnapshotManager#latestSnapshotId()} returns a snapshot ID that does not exist in the branch's + * snapshot directory. Any caller that compares {@code latestSnapshotId()} against the actual + * snapshots on disk (e.g. Morax orphan-clean validation) will fail with "safelyGetAllSnapshots did + * not include latest snapshot". + * + *

Root cause: the REST server may not correctly resolve branch-qualified table identifiers + * (e.g., {@code table$branch_cluster}) and instead returns the main table's snapshot. Paimon's + * {@link SnapshotManager#latestSnapshotId()} trusts the {@link SnapshotLoader} result without + * cross-checking the filesystem. + * + *

Workaround: for non-main branches, use {@link + * SnapshotManager#latestSnapshotIdFromFileSystem()} to bypass the REST catalog entirely. + */ +public class SnapshotManagerBranchLoaderTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private Path tablePath; + private FileStoreTable table; + private TableWriteImpl write; + private TableCommitImpl commit; + private long incrementalIdentifier; + + @BeforeEach + public void setUp() throws Exception { + fileIO = LocalFileIO.create(); + tablePath = new Path(tempDir.toString()); + + RowType rowType = + RowType.of( + new org.apache.paimon.types.DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.STRING() + }, + new String[] {"pk", "part", "value"}); + + Options options = new Options(); + options.set(CoreOptions.PATH, tablePath.toString()); + options.set(CoreOptions.BUCKET, 1); + + TableSchema schema = + SchemaUtils.forceCommit( + new SchemaManager(fileIO, tablePath), + new Schema( + rowType.getFields(), + Collections.singletonList("part"), + Arrays.asList("pk", "part"), + options.toMap(), + "")); + table = FileStoreTableFactory.create(fileIO, tablePath, schema); + + String commitUser = UUID.randomUUID().toString(); + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + incrementalIdentifier = 0; + } + + @AfterEach + public void tearDown() throws Exception { + if (write != null) { + write.close(); + } + if (commit != null) { + commit.close(); + } + } + + /** + * Reproduces the Frankfurt regression. + * + *

Setup: main branch has snapshots 1..10, branch "cluster" was created at snapshot 3 (so the + * branch directory contains snapshots 1..3). + * + *

A mock {@link SnapshotLoader} simulates a REST catalog that always returns the main + * table's latest snapshot (ID=10) regardless of the branch identifier. This is what happens + * when Bennett does not correctly resolve {@code table$branch_cluster} and falls back to the + * main table. + * + *

Expected (bug): {@code latestSnapshotId()} returns 10 (main's), but the branch directory + * only has snapshots up to 3. The 166944-snapshot gap from Frankfurt (main=166971, branch=27) + * is the same class of bug at production scale. + */ + @Test + public void testLatestSnapshotIdMismatchWithBuggyRestLoader() throws Exception { + // P0: Write 3 snapshots to main branch + for (int i = 0; i < 3; i++) { + writeRow(i, 0, "value_" + i); + } + SnapshotManager mainSnapshotManager = table.snapshotManager(); + long branchPointId = mainSnapshotManager.latestSnapshotId(); + assertThat(branchPointId).isGreaterThanOrEqualTo(3L); + + // P1: Create branch "cluster" at the current snapshot via tag + table.createTag("tag_for_branch", branchPointId); + table.createBranch("cluster", "tag_for_branch"); + + // P2: Write 7 more snapshots to main branch + for (int i = 3; i < 10; i++) { + writeRow(i, 0, "value_" + i); + } + long mainLatestId = mainSnapshotManager.latestSnapshotId(); + assertThat(mainLatestId).isGreaterThan(branchPointId); + + // P3: Verify branch filesystem has snapshots only up to branchPointId + SnapshotManager branchFsManager = + new SnapshotManager(fileIO, tablePath, "cluster", null, null); + Long branchLatestFromFs = branchFsManager.latestSnapshotIdFromFileSystem(); + assertThat(branchLatestFromFs).isEqualTo(branchPointId); + + List branchSnapshots = branchFsManager.safelyGetAllSnapshots(); + long maxBranchSnapshotId = + branchSnapshots.stream().mapToLong(Snapshot::id).max().orElse(-1); + assertThat(maxBranchSnapshotId).isEqualTo(branchPointId); + + // P4: Simulate the REST catalog bug — create a SnapshotLoader that returns + // the MAIN table's latest snapshot when asked for the branch's snapshot. + // This is exactly what happens when Bennett receives + // GET /databases/dwd/tables/dwd_flow_sdk_log_i_s%24branch_cluster/snapshot + // but resolves it to the main table instead of the branch entry. + Snapshot mainLatestSnapshot = mainSnapshotManager.latestSnapshot(); + assertThat(mainLatestSnapshot.id()).isEqualTo(mainLatestId); + + SnapshotLoader buggyLoader = new BuggyRestSnapshotLoader(mainLatestSnapshot); + SnapshotManager branchWithBuggyLoader = + new SnapshotManager(fileIO, tablePath, "cluster", buggyLoader, null); + + // P5: Demonstrate the bug — latestSnapshotId() returns main's ID instead of branch's + Long latestIdFromLoader = branchWithBuggyLoader.latestSnapshotId(); + assertThat(latestIdFromLoader) + .as( + "Bug: latestSnapshotId() via REST loader returns main table's snapshot ID " + + "instead of branch's. This is the root cause of Frankfurt's " + + "'safelyGetAllSnapshots did not include latest snapshot' error. " + + "main=%d, branch=%d", + mainLatestId, branchPointId) + .isEqualTo(mainLatestId); + + // P6: safelyGetAllSnapshots() reads from the correct branch directory (filesystem) + List branchSnapshotsFromBuggyManager = + branchWithBuggyLoader.safelyGetAllSnapshots(); + long maxIdFromBranchDir = + branchSnapshotsFromBuggyManager.stream().mapToLong(Snapshot::id).max().orElse(-1); + assertThat(maxIdFromBranchDir).isEqualTo(branchPointId); + + // P7: The mismatch — latestSnapshotId()=mainLatestId but + // max(safelyGetAllSnapshots)=branchPointId. + // Any code that validates "latestSnapshotId must be in safelyGetAllSnapshots" will fail. + // At Frankfurt scale this was main=166971 vs branch=27. + assertThat(latestIdFromLoader).isNotEqualTo(maxIdFromBranchDir); + + // P8: Verify the workaround — latestSnapshotIdFromFileSystem() returns the correct value + Long latestFromFs = branchWithBuggyLoader.latestSnapshotIdFromFileSystem(); + assertThat(latestFromFs) + .as("Workaround: latestSnapshotIdFromFileSystem() bypasses the REST loader") + .isEqualTo(branchPointId); + assertThat(latestFromFs).isEqualTo(maxIdFromBranchDir); + } + + /** + * Verifies that when the SnapshotLoader correctly returns the branch's snapshot, there is no + * mismatch. + */ + @Test + public void testLatestSnapshotIdCorrectWithProperBranchLoader() throws Exception { + // Write 3 snapshots to main, create branch, write 7 more to main + for (int i = 0; i < 3; i++) { + writeRow(i, 0, "value_" + i); + } + long branchPointId = table.snapshotManager().latestSnapshotId(); + table.createTag("tag_for_branch", branchPointId); + table.createBranch("cluster", "tag_for_branch"); + for (int i = 3; i < 10; i++) { + writeRow(i, 0, "value_" + i); + } + + // Correct loader: returns the branch's actual latest snapshot + SnapshotManager branchFsManager = + new SnapshotManager(fileIO, tablePath, "cluster", null, null); + Snapshot branchLatestSnapshot = branchFsManager.latestSnapshot(); + assertThat(branchLatestSnapshot.id()).isEqualTo(branchPointId); + + SnapshotLoader correctLoader = new BuggyRestSnapshotLoader(branchLatestSnapshot); + SnapshotManager branchWithCorrectLoader = + new SnapshotManager(fileIO, tablePath, "cluster", correctLoader, null); + + // No mismatch — latestSnapshotId() matches safelyGetAllSnapshots() + Long latestIdFromLoader = branchWithCorrectLoader.latestSnapshotId(); + assertThat(latestIdFromLoader).isEqualTo(branchPointId); + + List branchSnapshots = branchWithCorrectLoader.safelyGetAllSnapshots(); + long maxIdFromBranch = branchSnapshots.stream().mapToLong(Snapshot::id).max().orElse(-1); + assertThat(latestIdFromLoader).isEqualTo(maxIdFromBranch); + } + + /** + * Verifies that copyWithBranch propagates the SnapshotLoader correctly — the new + * SnapshotLoader's copyWithBranch is called with the target branch name. + */ + @Test + public void testCopyWithBranchPropagatesLoader() throws Exception { + for (int i = 0; i < 3; i++) { + writeRow(i, 0, "value_" + i); + } + + SnapshotManager mainManager = table.snapshotManager(); + Snapshot mainSnapshot = mainManager.latestSnapshot(); + + // Create a loader that tracks copyWithBranch calls + TrackingSnapshotLoader trackingLoader = new TrackingSnapshotLoader(mainSnapshot); + SnapshotManager withLoader = + new SnapshotManager(fileIO, tablePath, "main", trackingLoader, null); + + // copyWithBranch should create a new SnapshotManager with the loader's copyWithBranch + // result + SnapshotManager copied = withLoader.copyWithBranch("cluster"); + + // The copied manager should use the new loader (which is a TrackingSnapshotLoader + // that was created by copyWithBranch with branch="cluster") + assertThat(trackingLoader.lastCopyBranch).isEqualTo("cluster"); + } + + private void writeRow(int pk, int part, String value) throws Exception { + InternalRow row = + GenericRow.ofKind(RowKind.INSERT, pk, part, BinaryString.fromString(value)); + write.write(row); + List messages = write.prepareCommit(false, incrementalIdentifier); + commit.commit(incrementalIdentifier, messages); + incrementalIdentifier++; + } + + /** + * A SnapshotLoader that always returns a fixed snapshot, simulating a REST catalog (Bennett) + * that returns the main table's snapshot when queried for a branch's snapshot. + */ + private static class BuggyRestSnapshotLoader implements SnapshotLoader { + private static final long serialVersionUID = 1L; + private final Snapshot fixedSnapshot; + + BuggyRestSnapshotLoader(Snapshot fixedSnapshot) { + this.fixedSnapshot = fixedSnapshot; + } + + @Override + public Optional load() { + return Optional.of(fixedSnapshot); + } + + @Override + public void rollback(Instant instant) { + throw new UnsupportedOperationException(); + } + + @Override + public SnapshotLoader copyWithBranch(String branch) { + // In the buggy scenario, even after copyWithBranch, the REST catalog + // still returns the main table's snapshot. + return new BuggyRestSnapshotLoader(fixedSnapshot); + } + } + + /** A SnapshotLoader that tracks copyWithBranch calls for verification. */ + private static class TrackingSnapshotLoader implements SnapshotLoader { + private static final long serialVersionUID = 1L; + private final Snapshot snapshot; + String lastCopyBranch; + + TrackingSnapshotLoader(Snapshot snapshot) { + this.snapshot = snapshot; + } + + @Override + public Optional load() { + return Optional.of(snapshot); + } + + @Override + public void rollback(Instant instant) { + throw new UnsupportedOperationException(); + } + + @Override + public SnapshotLoader copyWithBranch(String branch) { + lastCopyBranch = branch; + return new TrackingSnapshotLoader(snapshot); + } + } +} From df2df2661543f70cc3bb40ac2da81207eec359a9 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 17 May 2026 12:10:47 +0800 Subject: [PATCH 2/4] [core] Fix FallbackReadFileStoreTable.switchToBranch using wrong CatalogEnvironment When a table has scan.fallback-branch or scan.primary-branch set, getTable() returns a FallbackReadFileStoreTable. Calling switchToBranch() on it delegates to switchWrappedToBranch(), which previously passed the main table's CatalogEnvironment to the branch table. This caused REST snapshot loading (via SnapshotLoader) to query the main table's endpoint instead of the branch's (e.g. tableName$branch_branchName), returning the wrong snapshot ID. Fix: create a branch-aware CatalogEnvironment by copying the wrapped table's environment with an Identifier that includes the branch name. Co-Authored-By: Claude Opus 4.6 --- .../table/FallbackReadFileStoreTable.java | 23 ++++++++--- .../table/FallbackReadFileStoreTableTest.java | 38 +++++++++++++++++++ 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 5c32135f764a..75353983bb2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; @@ -154,12 +155,24 @@ protected FileStoreTable switchWrappedToBranch(String branchName) { Options branchOptions = new Options(branchSchema.options()); branchOptions.set(CoreOptions.BRANCH, branchName); branchSchema = branchSchema.copy(branchOptions.toMap()); + + // Create branch-aware CatalogEnvironment so that REST snapshot loading + // targets the branch table (e.g. tableName$branch_branchName) instead of main. + CatalogEnvironment wrappedEnv = wrapped.catalogEnvironment(); + CatalogEnvironment branchEnv = wrappedEnv; + Identifier wrappedId = wrappedEnv.identifier(); + if (wrappedId != null) { + branchEnv = + wrappedEnv.copy( + new Identifier( + wrappedId.getDatabaseName(), + wrappedId.getTableName(), + branchName, + wrappedId.getSystemTableName())); + } + return FileStoreTableFactory.createWithoutFallbackBranch( - wrapped.fileIO(), - wrapped.location(), - branchSchema, - new Options(), - wrapped.catalogEnvironment()); + wrapped.fileIO(), wrapped.location(), branchSchema, new Options(), branchEnv); } protected Map rewriteOtherOptions(Map options) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java index 7f586875a193..bac3fa443c91 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileIOFinder; @@ -332,6 +333,43 @@ public org.apache.paimon.reader.RecordReader createReader(Split spl }; } + @Test + void testSwitchToBranch() throws Exception { + String branchName = "bc"; + + Identifier mainId = Identifier.create("mydb", "mytable"); + CatalogEnvironment env = + new CatalogEnvironment(mainId, "uuid-1", null, null, null, null, false, false); + + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), tablePath), + new Schema( + ROW_TYPE.getFields(), + Collections.singletonList("pt"), + Collections.emptyList(), + Collections.emptyMap(), + "")); + AppendOnlyFileStoreTable mainTable = + new AppendOnlyFileStoreTable(fileIO, tablePath, tableSchema, env); + + writeDataIntoTable(mainTable, 0, rowData(1, 10)); + mainTable.createBranch(branchName); + + FileStoreTable branchTable = createTableFromBranch(mainTable, branchName); + writeDataIntoTable(branchTable, 0, rowData(2, 20)); + + FallbackReadFileStoreTable fallbackTable = + new FallbackReadFileStoreTable(mainTable, branchTable, true); + + FileStoreTable switched = fallbackTable.switchToBranch(branchName); + Identifier switchedId = switched.catalogEnvironment().identifier(); + + assertThat(switchedId).isNotNull(); + assertThat(switchedId.getDatabaseName()).isEqualTo("mydb"); + assertThat(switchedId.getBranchName()).isEqualTo(branchName); + } + private void writeDataIntoTable( FileStoreTable table, long commitIdentifier, InternalRow... allData) throws Exception { StreamTableWrite write = table.newWrite(commitUser); From 5d84f8385012b928c620b85e0e581810920a050e Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 17 May 2026 12:15:33 +0800 Subject: [PATCH 3/4] [core] Remove obsolete SnapshotManagerBranchLoaderTest --- .../SnapshotManagerBranchLoaderTest.java | 346 ------------------ 1 file changed, 346 deletions(-) delete mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerBranchLoaderTest.java diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerBranchLoaderTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerBranchLoaderTest.java deleted file mode 100644 index 0f58a1d775de..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerBranchLoaderTest.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.utils; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.Snapshot; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.options.Options; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.schema.SchemaUtils; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; -import org.apache.paimon.table.Instant; -import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.TableCommitImpl; -import org.apache.paimon.table.sink.TableWriteImpl; -import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.RowKind; -import org.apache.paimon.types.RowType; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.UUID; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * Reproduces the Frankfurt eu-central-1 regression: when a Paimon table has branches and the REST - * catalog (Bennett) returns the main table's snapshot for a branch query, {@link - * SnapshotManager#latestSnapshotId()} returns a snapshot ID that does not exist in the branch's - * snapshot directory. Any caller that compares {@code latestSnapshotId()} against the actual - * snapshots on disk (e.g. Morax orphan-clean validation) will fail with "safelyGetAllSnapshots did - * not include latest snapshot". - * - *

Root cause: the REST server may not correctly resolve branch-qualified table identifiers - * (e.g., {@code table$branch_cluster}) and instead returns the main table's snapshot. Paimon's - * {@link SnapshotManager#latestSnapshotId()} trusts the {@link SnapshotLoader} result without - * cross-checking the filesystem. - * - *

Workaround: for non-main branches, use {@link - * SnapshotManager#latestSnapshotIdFromFileSystem()} to bypass the REST catalog entirely. - */ -public class SnapshotManagerBranchLoaderTest { - - @TempDir java.nio.file.Path tempDir; - - private FileIO fileIO; - private Path tablePath; - private FileStoreTable table; - private TableWriteImpl write; - private TableCommitImpl commit; - private long incrementalIdentifier; - - @BeforeEach - public void setUp() throws Exception { - fileIO = LocalFileIO.create(); - tablePath = new Path(tempDir.toString()); - - RowType rowType = - RowType.of( - new org.apache.paimon.types.DataType[] { - DataTypes.INT(), DataTypes.INT(), DataTypes.STRING() - }, - new String[] {"pk", "part", "value"}); - - Options options = new Options(); - options.set(CoreOptions.PATH, tablePath.toString()); - options.set(CoreOptions.BUCKET, 1); - - TableSchema schema = - SchemaUtils.forceCommit( - new SchemaManager(fileIO, tablePath), - new Schema( - rowType.getFields(), - Collections.singletonList("part"), - Arrays.asList("pk", "part"), - options.toMap(), - "")); - table = FileStoreTableFactory.create(fileIO, tablePath, schema); - - String commitUser = UUID.randomUUID().toString(); - write = table.newWrite(commitUser); - commit = table.newCommit(commitUser); - incrementalIdentifier = 0; - } - - @AfterEach - public void tearDown() throws Exception { - if (write != null) { - write.close(); - } - if (commit != null) { - commit.close(); - } - } - - /** - * Reproduces the Frankfurt regression. - * - *

Setup: main branch has snapshots 1..10, branch "cluster" was created at snapshot 3 (so the - * branch directory contains snapshots 1..3). - * - *

A mock {@link SnapshotLoader} simulates a REST catalog that always returns the main - * table's latest snapshot (ID=10) regardless of the branch identifier. This is what happens - * when Bennett does not correctly resolve {@code table$branch_cluster} and falls back to the - * main table. - * - *

Expected (bug): {@code latestSnapshotId()} returns 10 (main's), but the branch directory - * only has snapshots up to 3. The 166944-snapshot gap from Frankfurt (main=166971, branch=27) - * is the same class of bug at production scale. - */ - @Test - public void testLatestSnapshotIdMismatchWithBuggyRestLoader() throws Exception { - // P0: Write 3 snapshots to main branch - for (int i = 0; i < 3; i++) { - writeRow(i, 0, "value_" + i); - } - SnapshotManager mainSnapshotManager = table.snapshotManager(); - long branchPointId = mainSnapshotManager.latestSnapshotId(); - assertThat(branchPointId).isGreaterThanOrEqualTo(3L); - - // P1: Create branch "cluster" at the current snapshot via tag - table.createTag("tag_for_branch", branchPointId); - table.createBranch("cluster", "tag_for_branch"); - - // P2: Write 7 more snapshots to main branch - for (int i = 3; i < 10; i++) { - writeRow(i, 0, "value_" + i); - } - long mainLatestId = mainSnapshotManager.latestSnapshotId(); - assertThat(mainLatestId).isGreaterThan(branchPointId); - - // P3: Verify branch filesystem has snapshots only up to branchPointId - SnapshotManager branchFsManager = - new SnapshotManager(fileIO, tablePath, "cluster", null, null); - Long branchLatestFromFs = branchFsManager.latestSnapshotIdFromFileSystem(); - assertThat(branchLatestFromFs).isEqualTo(branchPointId); - - List branchSnapshots = branchFsManager.safelyGetAllSnapshots(); - long maxBranchSnapshotId = - branchSnapshots.stream().mapToLong(Snapshot::id).max().orElse(-1); - assertThat(maxBranchSnapshotId).isEqualTo(branchPointId); - - // P4: Simulate the REST catalog bug — create a SnapshotLoader that returns - // the MAIN table's latest snapshot when asked for the branch's snapshot. - // This is exactly what happens when Bennett receives - // GET /databases/dwd/tables/dwd_flow_sdk_log_i_s%24branch_cluster/snapshot - // but resolves it to the main table instead of the branch entry. - Snapshot mainLatestSnapshot = mainSnapshotManager.latestSnapshot(); - assertThat(mainLatestSnapshot.id()).isEqualTo(mainLatestId); - - SnapshotLoader buggyLoader = new BuggyRestSnapshotLoader(mainLatestSnapshot); - SnapshotManager branchWithBuggyLoader = - new SnapshotManager(fileIO, tablePath, "cluster", buggyLoader, null); - - // P5: Demonstrate the bug — latestSnapshotId() returns main's ID instead of branch's - Long latestIdFromLoader = branchWithBuggyLoader.latestSnapshotId(); - assertThat(latestIdFromLoader) - .as( - "Bug: latestSnapshotId() via REST loader returns main table's snapshot ID " - + "instead of branch's. This is the root cause of Frankfurt's " - + "'safelyGetAllSnapshots did not include latest snapshot' error. " - + "main=%d, branch=%d", - mainLatestId, branchPointId) - .isEqualTo(mainLatestId); - - // P6: safelyGetAllSnapshots() reads from the correct branch directory (filesystem) - List branchSnapshotsFromBuggyManager = - branchWithBuggyLoader.safelyGetAllSnapshots(); - long maxIdFromBranchDir = - branchSnapshotsFromBuggyManager.stream().mapToLong(Snapshot::id).max().orElse(-1); - assertThat(maxIdFromBranchDir).isEqualTo(branchPointId); - - // P7: The mismatch — latestSnapshotId()=mainLatestId but - // max(safelyGetAllSnapshots)=branchPointId. - // Any code that validates "latestSnapshotId must be in safelyGetAllSnapshots" will fail. - // At Frankfurt scale this was main=166971 vs branch=27. - assertThat(latestIdFromLoader).isNotEqualTo(maxIdFromBranchDir); - - // P8: Verify the workaround — latestSnapshotIdFromFileSystem() returns the correct value - Long latestFromFs = branchWithBuggyLoader.latestSnapshotIdFromFileSystem(); - assertThat(latestFromFs) - .as("Workaround: latestSnapshotIdFromFileSystem() bypasses the REST loader") - .isEqualTo(branchPointId); - assertThat(latestFromFs).isEqualTo(maxIdFromBranchDir); - } - - /** - * Verifies that when the SnapshotLoader correctly returns the branch's snapshot, there is no - * mismatch. - */ - @Test - public void testLatestSnapshotIdCorrectWithProperBranchLoader() throws Exception { - // Write 3 snapshots to main, create branch, write 7 more to main - for (int i = 0; i < 3; i++) { - writeRow(i, 0, "value_" + i); - } - long branchPointId = table.snapshotManager().latestSnapshotId(); - table.createTag("tag_for_branch", branchPointId); - table.createBranch("cluster", "tag_for_branch"); - for (int i = 3; i < 10; i++) { - writeRow(i, 0, "value_" + i); - } - - // Correct loader: returns the branch's actual latest snapshot - SnapshotManager branchFsManager = - new SnapshotManager(fileIO, tablePath, "cluster", null, null); - Snapshot branchLatestSnapshot = branchFsManager.latestSnapshot(); - assertThat(branchLatestSnapshot.id()).isEqualTo(branchPointId); - - SnapshotLoader correctLoader = new BuggyRestSnapshotLoader(branchLatestSnapshot); - SnapshotManager branchWithCorrectLoader = - new SnapshotManager(fileIO, tablePath, "cluster", correctLoader, null); - - // No mismatch — latestSnapshotId() matches safelyGetAllSnapshots() - Long latestIdFromLoader = branchWithCorrectLoader.latestSnapshotId(); - assertThat(latestIdFromLoader).isEqualTo(branchPointId); - - List branchSnapshots = branchWithCorrectLoader.safelyGetAllSnapshots(); - long maxIdFromBranch = branchSnapshots.stream().mapToLong(Snapshot::id).max().orElse(-1); - assertThat(latestIdFromLoader).isEqualTo(maxIdFromBranch); - } - - /** - * Verifies that copyWithBranch propagates the SnapshotLoader correctly — the new - * SnapshotLoader's copyWithBranch is called with the target branch name. - */ - @Test - public void testCopyWithBranchPropagatesLoader() throws Exception { - for (int i = 0; i < 3; i++) { - writeRow(i, 0, "value_" + i); - } - - SnapshotManager mainManager = table.snapshotManager(); - Snapshot mainSnapshot = mainManager.latestSnapshot(); - - // Create a loader that tracks copyWithBranch calls - TrackingSnapshotLoader trackingLoader = new TrackingSnapshotLoader(mainSnapshot); - SnapshotManager withLoader = - new SnapshotManager(fileIO, tablePath, "main", trackingLoader, null); - - // copyWithBranch should create a new SnapshotManager with the loader's copyWithBranch - // result - SnapshotManager copied = withLoader.copyWithBranch("cluster"); - - // The copied manager should use the new loader (which is a TrackingSnapshotLoader - // that was created by copyWithBranch with branch="cluster") - assertThat(trackingLoader.lastCopyBranch).isEqualTo("cluster"); - } - - private void writeRow(int pk, int part, String value) throws Exception { - InternalRow row = - GenericRow.ofKind(RowKind.INSERT, pk, part, BinaryString.fromString(value)); - write.write(row); - List messages = write.prepareCommit(false, incrementalIdentifier); - commit.commit(incrementalIdentifier, messages); - incrementalIdentifier++; - } - - /** - * A SnapshotLoader that always returns a fixed snapshot, simulating a REST catalog (Bennett) - * that returns the main table's snapshot when queried for a branch's snapshot. - */ - private static class BuggyRestSnapshotLoader implements SnapshotLoader { - private static final long serialVersionUID = 1L; - private final Snapshot fixedSnapshot; - - BuggyRestSnapshotLoader(Snapshot fixedSnapshot) { - this.fixedSnapshot = fixedSnapshot; - } - - @Override - public Optional load() { - return Optional.of(fixedSnapshot); - } - - @Override - public void rollback(Instant instant) { - throw new UnsupportedOperationException(); - } - - @Override - public SnapshotLoader copyWithBranch(String branch) { - // In the buggy scenario, even after copyWithBranch, the REST catalog - // still returns the main table's snapshot. - return new BuggyRestSnapshotLoader(fixedSnapshot); - } - } - - /** A SnapshotLoader that tracks copyWithBranch calls for verification. */ - private static class TrackingSnapshotLoader implements SnapshotLoader { - private static final long serialVersionUID = 1L; - private final Snapshot snapshot; - String lastCopyBranch; - - TrackingSnapshotLoader(Snapshot snapshot) { - this.snapshot = snapshot; - } - - @Override - public Optional load() { - return Optional.of(snapshot); - } - - @Override - public void rollback(Instant instant) { - throw new UnsupportedOperationException(); - } - - @Override - public SnapshotLoader copyWithBranch(String branch) { - lastCopyBranch = branch; - return new TrackingSnapshotLoader(snapshot); - } - } -} From 536ae71201ac0fbea56a1c835aa995348ccebfb5 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 17 May 2026 14:16:54 +0800 Subject: [PATCH 4/4] [core] Assert getObjectName in testSwitchToBranch to cover REST request path --- .../org/apache/paimon/table/FallbackReadFileStoreTableTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java index bac3fa443c91..483ef861eb20 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java @@ -368,6 +368,7 @@ void testSwitchToBranch() throws Exception { assertThat(switchedId).isNotNull(); assertThat(switchedId.getDatabaseName()).isEqualTo("mydb"); assertThat(switchedId.getBranchName()).isEqualTo(branchName); + assertThat(switchedId.getObjectName()).isEqualTo("mytable$branch_bc"); } private void writeDataIntoTable(