Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;

import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
Expand Down Expand Up @@ -191,13 +192,15 @@ private static class BranchesRead implements InnerTableRead {
private final FileIO fileIO;
private RowType readType;

@Nullable private Predicate postFilter;

public BranchesRead(FileIO fileIO) {
this.fileIO = fileIO;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
// TODO
this.postFilter = predicate;
return this;
}

Expand Down Expand Up @@ -228,6 +231,10 @@ public RecordReader<InternalRow> createReader(Split split) {
throw new UncheckedIOException(e);
}

if (postFilter != null) {
rows = Iterators.filter(rows, postFilter::test);
}

if (readType != null) {
rows =
Iterators.transform(
Expand Down Expand Up @@ -261,7 +268,6 @@ private List<InternalRow> branches(FileStoreTable table, Predicate predicate)
BranchManager branchManager = table.branchManager();
Path tablePath = table.location();
List<InternalRow> result = new ArrayList<>();
// Handle predicate filtering for branch_name
if (predicate != null) {
// Handle Equal predicate
if (predicate instanceof LeafPredicate
Expand All @@ -273,32 +279,28 @@ private List<InternalRow> branches(FileStoreTable table, Predicate predicate)
if (branchManager.branchExists(equalValue)) {
result.add(createBranchRow(equalValue, tablePath));
}
return result;
}

// Handle CompoundPredicate (OR case for IN filter)
if (predicate instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
if ((compoundPredicate.function()) instanceof Or) {
List<String> branchNames = new ArrayList<>();
InPredicateVisitor.extractInElements(predicate, BRANCH_NAME)
.ifPresent(
e ->
e.stream()
.map(Object::toString)
.forEach(branchNames::add));
for (String branchName : branchNames) {
if (predicate instanceof CompoundPredicate
&& ((CompoundPredicate) predicate).function() instanceof Or) {
Optional<List<Object>> inElements =
InPredicateVisitor.extractInElements(predicate, BRANCH_NAME);
if (inElements.isPresent()) {
for (Object element : inElements.get()) {
String branchName = element.toString();
if (branchManager.branchExists(branchName)) {
result.add(createBranchRow(branchName, tablePath));
}
}
return result;
}
}
} else {
// Fallback to original logic if no predicate
List<String> branches = branchManager.branches();
for (String branch : branches) {
result.add(createBranchRow(branch, tablePath));
}
}
// Fallback: list all branches; the read-side post-filter refines the result.
for (String branch : branchManager.branches()) {
result.add(createBranchRow(branch, tablePath));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,27 @@
package org.apache.paimon.table.system;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -90,4 +98,65 @@ void testBranches() throws Exception {
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("my_branch1", "my_branch2", "my_branch3");
}

@Test
void testReadWithBranchNameEqualFilter() throws Exception {
table.createBranch("my_branch1", "2023-07-17");
table.createBranch("my_branch2", "2023-07-18");
table.createBranch("my_branch3", "2023-07-18");

PredicateBuilder builder = new PredicateBuilder(BranchesTable.TABLE_TYPE);
assertThat(readBranchNames(builder.equal(0, BinaryString.fromString("my_branch2"))))
.containsExactly("my_branch2");
assertThat(readBranchNames(builder.equal(0, BinaryString.fromString("nope")))).isEmpty();
}

@Test
void testReadWithBranchNameInFilter() throws Exception {
table.createBranch("my_branch1", "2023-07-17");
table.createBranch("my_branch2", "2023-07-18");
table.createBranch("my_branch3", "2023-07-18");

PredicateBuilder builder = new PredicateBuilder(BranchesTable.TABLE_TYPE);
assertThat(
readBranchNames(
builder.in(
0,
Arrays.asList(
(Object) BinaryString.fromString("my_branch1"),
BinaryString.fromString("my_branch3")))))
.containsExactlyInAnyOrder("my_branch1", "my_branch3");
}

@Test
void testReadWithBranchNameNotEqualFilter() throws Exception {
table.createBranch("my_branch1", "2023-07-17");
table.createBranch("my_branch2", "2023-07-18");
table.createBranch("my_branch3", "2023-07-18");

PredicateBuilder builder = new PredicateBuilder(BranchesTable.TABLE_TYPE);
assertThat(readBranchNames(builder.notEqual(0, BinaryString.fromString("my_branch2"))))
.containsExactlyInAnyOrder("my_branch1", "my_branch3");
}

@Test
void testReadWithNullFilterReturnsAll() throws Exception {
table.createBranch("my_branch1", "2023-07-17");
table.createBranch("my_branch2", "2023-07-18");

assertThat(readBranchNames(null)).containsExactlyInAnyOrder("my_branch1", "my_branch2");
}

private List<String> readBranchNames(Predicate predicate) throws IOException {
ReadBuilder readBuilder = branchesTable.newReadBuilder();
if (predicate != null) {
readBuilder = readBuilder.withFilter(predicate);
}
List<String> names = new ArrayList<>();
try (RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
reader.forEachRemaining(row -> names.add(row.getString(0).toString()));
}
return names;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,23 @@ public void testBranchesTableFilter() throws Exception {
collectResult(
"SELECT branch_name FROM `T$branches` WHERE branch_name = 'non_existent'"))
.isEmpty();

// not equals
assertThat(collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name <> 'b2'"))
.containsExactlyInAnyOrder("+I[b1]", "+I[b3]");

// like
assertThat(
collectResult(
"SELECT branch_name FROM `T$branches` WHERE branch_name LIKE 'b%'"))
.containsExactlyInAnyOrder("+I[b1]", "+I[b2]", "+I[b3]");

// or that is not equivalent to an in list
assertThat(
collectResult(
"SELECT branch_name FROM `T$branches` "
+ "WHERE branch_name = 'b1' OR branch_name <> 'b2'"))
.containsExactlyInAnyOrder("+I[b1]", "+I[b3]");
}

@Test
Expand Down