Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.And;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.InPredicateVisitor;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -47,8 +54,10 @@
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -171,16 +180,49 @@ private class ConsumersRead implements InnerTableRead {

private final FileIO fileIO;
private RowType readType;
private final List<String> consumerIds = new ArrayList<>();

public ConsumersRead(FileIO fileIO) {
this.fileIO = fileIO;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
if (predicate == null) {
return this;
}

String leafName = "consumer_id";
if (predicate instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
if ((compoundPredicate.function()) instanceof Or) {
// optimize for IN filter
InPredicateVisitor.extractInElements(predicate, leafName)
.ifPresent(
leafs ->
leafs.forEach(
leaf -> consumerIds.add(leaf.toString())));
} else if ((compoundPredicate.function()) instanceof And) {
List<Predicate> children = compoundPredicate.children();
for (Predicate leaf : children) {
handleLeafPredicate(leaf, leafName);
}
}
} else {
handleLeafPredicate(predicate, leafName);
}

return this;
}

public void handleLeafPredicate(Predicate predicate, String leafName) {
LeafPredicate consumerPred =
predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName);
if (consumerPred != null && consumerPred.function() instanceof Equal) {
consumerIds.add(consumerPred.literals().get(0).toString());
}
}

@Override
public InnerTableRead withReadType(RowType readType) {
this.readType = readType;
Expand All @@ -198,7 +240,19 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Path location = ((ConsumersTable.ConsumersSplit) split).location;
Map<String, Long> consumers = new ConsumerManager(fileIO, location, branch).consumers();
Map<String, Long> consumers;
if (!consumerIds.isEmpty()) {
consumers = new HashMap<>();
ConsumerManager consumerManager = new ConsumerManager(fileIO, location, branch);
for (String consumerId : consumerIds) {
consumerManager
.consumer(consumerId)
.ifPresent(
consumer -> consumers.put(consumerId, consumer.nextSnapshot()));
}
} else {
consumers = new ConsumerManager(fileIO, location, branch).consumers();
}
Iterator<InternalRow> rows =
Iterators.transform(consumers.entrySet().iterator(), this::toRow);
if (readType != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,23 @@
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -75,6 +83,40 @@ public void testPartitionRecordCount() throws Exception {
assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
}

@Test
public void testFilterByConsumerIdEqual() throws Exception {
Predicate predicate = consumerIdEqual("id1");
List<InternalRow> expectedRow =
Arrays.asList(GenericRow.of(BinaryString.fromString("id1"), 5L));
List<InternalRow> result = readWithFilter(consumersTable, predicate);
assertThat(result).containsExactlyElementsOf(expectedRow);
}

@Test
public void testFilterByConsumerIdEqualNoMatch() throws Exception {
Predicate predicate = consumerIdEqual("id999");
List<InternalRow> result = readWithFilter(consumersTable, predicate);
assertThat(result).isEmpty();
}

@Test
public void testFilterByConsumerIdIn() throws Exception {
PredicateBuilder builder = new PredicateBuilder(ConsumersTable.TABLE_TYPE);
Predicate predicate = builder.in(0, Arrays.asList("id1", "id999"));
List<InternalRow> expectedRow =
Arrays.asList(GenericRow.of(BinaryString.fromString("id1"), 5L));
List<InternalRow> result = readWithFilter(consumersTable, predicate);
assertThat(result).containsExactlyElementsOf(expectedRow);
}

@Test
public void testFilterByConsumerIdInNoMatch() throws Exception {
PredicateBuilder builder = new PredicateBuilder(ConsumersTable.TABLE_TYPE);
Predicate predicate = builder.in(0, Arrays.asList("id998", "id999"));
List<InternalRow> result = readWithFilter(consumersTable, predicate);
assertThat(result).isEmpty();
}

private List<InternalRow> getExpectedResult() throws IOException {
Map<String, Long> consumers = manager.consumers();
return consumers.entrySet().stream()
Expand All @@ -84,4 +126,19 @@ private List<InternalRow> getExpectedResult() throws IOException {
BinaryString.fromString(entry.getKey()), entry.getValue()))
.collect(Collectors.toList());
}

private Predicate consumerIdEqual(String consumerId) {
PredicateBuilder builder = new PredicateBuilder(ConsumersTable.TABLE_TYPE);
return builder.equal(0, BinaryString.fromString(consumerId));
}

private List<InternalRow> readWithFilter(Table table, Predicate predicate) throws Exception {
ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan());
InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
List<InternalRow> rows = new ArrayList<>();
reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
return rows;
}
}