From 3490ad9dd9c9b906075bd94d2d1a6fa7a480876e Mon Sep 17 00:00:00 2001 From: yehe Date: Thu, 14 May 2026 10:57:12 +0800 Subject: [PATCH 1/2] [core] Fix BranchesTable returning empty result for non Equal/IN predicates BranchesRead#withFilter is currently a // TODO, and BranchesRead#branches() only handles two scan-side fast paths (Equal and OR-IN on branch_name) inside if (predicate != null) with no fallback. As a result, any caller that pushes a non-Equal/IN predicate into BranchesRead gets an empty result instead of "list all and let the post-filter refine": ReadBuilder rb = branchesTable.newReadBuilder() .withFilter(builder.notEqual(0, BinaryString.fromString("b2"))); // returns 0 rows on master, should return all branches except b2 Today this is a latent issue at the SQL layer: Flink does not push <>, LIKE, or cross-field OR into the paimon source for system tables, so the filter is applied above the source and users see correct results. But the InnerTableRead contract is still broken, and any future caller (other engines, custom readers, or expanded Flink push-down) that hands such a predicate to BranchesRead will silently lose data. Fix: * drop the implicit else: each fast path now returns as soon as it fully consumes the predicate, and the listing loop sits at the end of the method, reached by any predicate shape no fast path can answer (Equal on a different field, AND, NotEqual, LIKE, etc.); * tighten the OR fast path so it only short-circuits when InPredicateVisitor can actually flatten the OR back to an IN over branch_name; otherwise fall through to the listing fallback; * implement BranchesRead#withFilter as a read-side post-filter that evaluates the predicate on the materialized row before projection. Tests: * BranchesTableTest drives InnerTableRead directly via PredicateBuilder (the bug is reproducible there: testReadWithBranchNameNotEqualFilter fails on master, passes after the fix). * BranchSqlITCase#testBranchesTableFilter is extended with <>, LIKE, and "OR that is not an IN" cases. These also pass on master since Flink does not push these predicates down today; included as smoke tests to ensure the fix does not break the planner-level fallback path. --- .../paimon/table/system/BranchesTable.java | 40 ++++++----- .../table/system/BranchesTableTest.java | 69 +++++++++++++++++++ .../apache/paimon/flink/BranchSqlITCase.java | 17 +++++ 3 files changed, 107 insertions(+), 19 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index f6525497030e..c0ac17f21fa8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -65,6 +65,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.OptionalLong; import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER; @@ -191,13 +192,15 @@ private static class BranchesRead implements InnerTableRead { private final FileIO fileIO; private RowType readType; + @Nullable private Predicate postFilter; + public BranchesRead(FileIO fileIO) { this.fileIO = fileIO; } @Override public InnerTableRead withFilter(Predicate predicate) { - // TODO + this.postFilter = predicate; return this; } @@ -228,6 +231,10 @@ public RecordReader createReader(Split split) { throw new UncheckedIOException(e); } + if (postFilter != null) { + rows = Iterators.filter(rows, postFilter::test); + } + if (readType != null) { rows = Iterators.transform( @@ -261,7 +268,6 @@ private List branches(FileStoreTable table, Predicate predicate) BranchManager branchManager = table.branchManager(); Path tablePath = table.location(); List result = new ArrayList<>(); - // Handle predicate filtering for branch_name if (predicate != null) { // Handle Equal predicate if (predicate instanceof LeafPredicate @@ -273,32 +279,28 @@ private List branches(FileStoreTable table, Predicate predicate) if (branchManager.branchExists(equalValue)) { result.add(createBranchRow(equalValue, tablePath)); } + return result; } // Handle CompoundPredicate (OR case for IN filter) - if (predicate instanceof CompoundPredicate) { - CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; - if ((compoundPredicate.function()) instanceof Or) { - List branchNames = new ArrayList<>(); - InPredicateVisitor.extractInElements(predicate, BRANCH_NAME) - .ifPresent( - e -> - e.stream() - .map(Object::toString) - .forEach(branchNames::add)); - for (String branchName : branchNames) { + if (predicate instanceof CompoundPredicate + && ((CompoundPredicate) predicate).function() instanceof Or) { + Optional> inElements = + InPredicateVisitor.extractInElements(predicate, BRANCH_NAME); + if (inElements.isPresent()) { + for (Object element : inElements.get()) { + String branchName = element.toString(); if (branchManager.branchExists(branchName)) { result.add(createBranchRow(branchName, tablePath)); } } + return result; } } - } else { - // Fallback to original logic if no predicate - List branches = branchManager.branches(); - for (String branch : branches) { - result.add(createBranchRow(branch, tablePath)); - } + } + // Fallback: list all branches; the read-side post-filter refines the result. + for (String branch : branchManager.branches()) { + result.add(createBranchRow(branch, tablePath)); } return result; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java index f1fbbe0177c1..734989444bf1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java @@ -19,19 +19,27 @@ package org.apache.paimon.table.system; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.DataTypes; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -90,4 +98,65 @@ void testBranches() throws Exception { .collect(Collectors.toList())) .containsExactlyInAnyOrder("my_branch1", "my_branch2", "my_branch3"); } + + @Test + void testReadWithBranchNameEqualFilter() throws Exception { + table.createBranch("my_branch1", "2023-07-17"); + table.createBranch("my_branch2", "2023-07-18"); + table.createBranch("my_branch3", "2023-07-18"); + + PredicateBuilder builder = new PredicateBuilder(BranchesTable.TABLE_TYPE); + assertThat(readBranchNames(builder.equal(0, BinaryString.fromString("my_branch2")))) + .containsExactly("my_branch2"); + assertThat(readBranchNames(builder.equal(0, BinaryString.fromString("nope")))).isEmpty(); + } + + @Test + void testReadWithBranchNameInFilter() throws Exception { + table.createBranch("my_branch1", "2023-07-17"); + table.createBranch("my_branch2", "2023-07-18"); + table.createBranch("my_branch3", "2023-07-18"); + + PredicateBuilder builder = new PredicateBuilder(BranchesTable.TABLE_TYPE); + assertThat( + readBranchNames( + builder.in( + 0, + Arrays.asList( + (Object) BinaryString.fromString("my_branch1"), + BinaryString.fromString("my_branch3"))))) + .containsExactlyInAnyOrder("my_branch1", "my_branch3"); + } + + @Test + void testReadWithBranchNameNotEqualFilter() throws Exception { + table.createBranch("my_branch1", "2023-07-17"); + table.createBranch("my_branch2", "2023-07-18"); + table.createBranch("my_branch3", "2023-07-18"); + + PredicateBuilder builder = new PredicateBuilder(BranchesTable.TABLE_TYPE); + assertThat(readBranchNames(builder.notEqual(0, BinaryString.fromString("my_branch2")))) + .containsExactlyInAnyOrder("my_branch1", "my_branch3"); + } + + @Test + void testReadWithNullFilterReturnsAll() throws Exception { + table.createBranch("my_branch1", "2023-07-17"); + table.createBranch("my_branch2", "2023-07-18"); + + assertThat(readBranchNames(null)).containsExactlyInAnyOrder("my_branch1", "my_branch2"); + } + + private List readBranchNames(Predicate predicate) throws IOException { + ReadBuilder readBuilder = branchesTable.newReadBuilder(); + if (predicate != null) { + readBuilder = readBuilder.withFilter(predicate); + } + List names = new ArrayList<>(); + try (RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan())) { + reader.forEachRemaining(row -> names.add(row.getString(0).toString())); + } + return names; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 7c2b1a33b261..2f164ca28fc0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -866,6 +866,23 @@ public void testBranchesTableFilter() throws Exception { collectResult( "SELECT branch_name FROM `T$branches` WHERE branch_name = 'non_existent'")) .isEmpty(); + + // not equals + assertThat(collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name <> 'b2'")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b3]"); + + // like + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` WHERE branch_name LIKE 'b%'")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b2]", "+I[b3]"); + + // or that is not equivalent to an in list + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` " + + "WHERE branch_name = 'b1' OR branch_name <> 'b2'")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b3]"); } @Test From f1e16158578d0a7b9851ca8a9f759ee1b0ca2fc8 Mon Sep 17 00:00:00 2001 From: yehe Date: Thu, 14 May 2026 14:54:33 +0800 Subject: [PATCH 2/2] Trigger CI