diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index f6525497030e..c0ac17f21fa8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -65,6 +65,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.OptionalLong; import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER; @@ -191,13 +192,15 @@ private static class BranchesRead implements InnerTableRead { private final FileIO fileIO; private RowType readType; + @Nullable private Predicate postFilter; + public BranchesRead(FileIO fileIO) { this.fileIO = fileIO; } @Override public InnerTableRead withFilter(Predicate predicate) { - // TODO + this.postFilter = predicate; return this; } @@ -228,6 +231,10 @@ public RecordReader createReader(Split split) { throw new UncheckedIOException(e); } + if (postFilter != null) { + rows = Iterators.filter(rows, postFilter::test); + } + if (readType != null) { rows = Iterators.transform( @@ -261,7 +268,6 @@ private List branches(FileStoreTable table, Predicate predicate) BranchManager branchManager = table.branchManager(); Path tablePath = table.location(); List result = new ArrayList<>(); - // Handle predicate filtering for branch_name if (predicate != null) { // Handle Equal predicate if (predicate instanceof LeafPredicate @@ -273,32 +279,28 @@ private List branches(FileStoreTable table, Predicate predicate) if (branchManager.branchExists(equalValue)) { result.add(createBranchRow(equalValue, tablePath)); } + return result; } // Handle CompoundPredicate (OR case for IN filter) - if (predicate instanceof CompoundPredicate) { - CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; - if ((compoundPredicate.function()) instanceof Or) { - List branchNames = new ArrayList<>(); - InPredicateVisitor.extractInElements(predicate, BRANCH_NAME) - .ifPresent( - e -> - e.stream() - .map(Object::toString) - .forEach(branchNames::add)); - for (String branchName : branchNames) { + if (predicate instanceof CompoundPredicate + && ((CompoundPredicate) predicate).function() instanceof Or) { + Optional> inElements = + InPredicateVisitor.extractInElements(predicate, BRANCH_NAME); + if (inElements.isPresent()) { + for (Object element : inElements.get()) { + String branchName = element.toString(); if (branchManager.branchExists(branchName)) { result.add(createBranchRow(branchName, tablePath)); } } + return result; } } - } else { - // Fallback to original logic if no predicate - List branches = branchManager.branches(); - for (String branch : branches) { - result.add(createBranchRow(branch, tablePath)); - } + } + // Fallback: list all branches; the read-side post-filter refines the result. + for (String branch : branchManager.branches()) { + result.add(createBranchRow(branch, tablePath)); } return result; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java index f1fbbe0177c1..734989444bf1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java @@ -19,19 +19,27 @@ package org.apache.paimon.table.system; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.DataTypes; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -90,4 +98,65 @@ void testBranches() throws Exception { .collect(Collectors.toList())) .containsExactlyInAnyOrder("my_branch1", "my_branch2", "my_branch3"); } + + @Test + void testReadWithBranchNameEqualFilter() throws Exception { + table.createBranch("my_branch1", "2023-07-17"); + table.createBranch("my_branch2", "2023-07-18"); + table.createBranch("my_branch3", "2023-07-18"); + + PredicateBuilder builder = new PredicateBuilder(BranchesTable.TABLE_TYPE); + assertThat(readBranchNames(builder.equal(0, BinaryString.fromString("my_branch2")))) + .containsExactly("my_branch2"); + assertThat(readBranchNames(builder.equal(0, BinaryString.fromString("nope")))).isEmpty(); + } + + @Test + void testReadWithBranchNameInFilter() throws Exception { + table.createBranch("my_branch1", "2023-07-17"); + table.createBranch("my_branch2", "2023-07-18"); + table.createBranch("my_branch3", "2023-07-18"); + + PredicateBuilder builder = new PredicateBuilder(BranchesTable.TABLE_TYPE); + assertThat( + readBranchNames( + builder.in( + 0, + Arrays.asList( + (Object) BinaryString.fromString("my_branch1"), + BinaryString.fromString("my_branch3"))))) + .containsExactlyInAnyOrder("my_branch1", "my_branch3"); + } + + @Test + void testReadWithBranchNameNotEqualFilter() throws Exception { + table.createBranch("my_branch1", "2023-07-17"); + table.createBranch("my_branch2", "2023-07-18"); + table.createBranch("my_branch3", "2023-07-18"); + + PredicateBuilder builder = new PredicateBuilder(BranchesTable.TABLE_TYPE); + assertThat(readBranchNames(builder.notEqual(0, BinaryString.fromString("my_branch2")))) + .containsExactlyInAnyOrder("my_branch1", "my_branch3"); + } + + @Test + void testReadWithNullFilterReturnsAll() throws Exception { + table.createBranch("my_branch1", "2023-07-17"); + table.createBranch("my_branch2", "2023-07-18"); + + assertThat(readBranchNames(null)).containsExactlyInAnyOrder("my_branch1", "my_branch2"); + } + + private List readBranchNames(Predicate predicate) throws IOException { + ReadBuilder readBuilder = branchesTable.newReadBuilder(); + if (predicate != null) { + readBuilder = readBuilder.withFilter(predicate); + } + List names = new ArrayList<>(); + try (RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan())) { + reader.forEachRemaining(row -> names.add(row.getString(0).toString())); + } + return names; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 7c2b1a33b261..2f164ca28fc0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -866,6 +866,23 @@ public void testBranchesTableFilter() throws Exception { collectResult( "SELECT branch_name FROM `T$branches` WHERE branch_name = 'non_existent'")) .isEmpty(); + + // not equals + assertThat(collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name <> 'b2'")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b3]"); + + // like + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` WHERE branch_name LIKE 'b%'")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b2]", "+I[b3]"); + + // or that is not equivalent to an in list + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` " + + "WHERE branch_name = 'b1' OR branch_name <> 'b2'")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b3]"); } @Test