Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ public ChainTableBatchScan withBucketFilter(Filter<Integer> bucketFilter) {
@Override
public Plan plan() {
List<Split> splits = new ArrayList<>();
PartitionPredicate partitionPredicate = getPartitionPredicate();
PredicateBuilder builder = new PredicateBuilder(tableSchema.logicalPartitionType());
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
Expand All @@ -256,11 +255,11 @@ public Plan plan() {

Set<BinaryRow> snapshotPartitions =
new HashSet<>(
newChainPartitionListingScan(true, partitionPredicate)
newChainPartitionListingScan(true, getMainPartitionPredicate())
.listPartitions());

DataTableScan deltaPartitionScan =
newChainPartitionListingScan(false, partitionPredicate);
newChainPartitionListingScan(false, getFallbackPartitionPredicate());
List<BinaryRow> deltaPartitions =
deltaPartitionScan.listPartitions().stream()
.filter(p -> !snapshotPartitions.contains(p))
Expand Down Expand Up @@ -433,9 +432,10 @@ public Plan plan() {

@Override
public List<PartitionEntry> listPartitionEntries() {
PartitionPredicate partitionPredicate = getPartitionPredicate();
DataTableScan snapshotScan = newChainPartitionListingScan(true, partitionPredicate);
DataTableScan deltaScan = newChainPartitionListingScan(false, partitionPredicate);
DataTableScan snapshotScan =
newChainPartitionListingScan(true, getMainPartitionPredicate());
DataTableScan deltaScan =
newChainPartitionListingScan(false, getFallbackPartitionPredicate());
List<PartitionEntry> partitionEntries =
new ArrayList<>(snapshotScan.listPartitionEntries());
Set<BinaryRow> partitions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ public static class FallbackReadScan implements DataTableScan {
protected final Function<FileStoreTable, DataTableScan> scanCreator;
protected final DataTableScan mainScan;
protected final DataTableScan fallbackScan;
private PartitionPredicate partitionPredicate;
private PartitionPredicate mainPartitionPredicate;
private PartitionPredicate fallbackPartitionPredicate;

public FallbackReadScan(
FileStoreTable wrappedTable,
Expand Down Expand Up @@ -475,6 +476,15 @@ public FallbackReadScan withPartitionFilter(Predicate partitionPredicate) {
return this;
}

/**
 * Applies separate partition predicates to the main scan and the fallback scan.
 *
 * <p>The main predicate is pushed to {@code mainScan}, the fallback predicate is pushed to
 * {@code fallbackScan}, and both are recorded through
 * {@link #setPartitionPredicate(PartitionPredicate, PartitionPredicate)}.
 *
 * <p>NOTE(review): both arguments are forwarded as-is, including {@code null}; how the
 * underlying scans treat a {@code null} predicate is not visible here - confirm it means "no
 * partition filter".
 *
 * @param mainPartitionPredicate predicate forwarded to the main scan
 * @param fallbackPartitionPredicate predicate forwarded to the fallback scan
 * @return this scan
 */
public InnerTableScan withPartitionFilter(
PartitionPredicate mainPartitionPredicate,
PartitionPredicate fallbackPartitionPredicate) {
mainScan.withPartitionFilter(mainPartitionPredicate);
fallbackScan.withPartitionFilter(fallbackPartitionPredicate);
// Remember both predicates so they can be read back through the getters later.
setPartitionPredicate(mainPartitionPredicate, fallbackPartitionPredicate);
return this;
}

@Override
public FallbackReadScan withBucketFilter(Filter<Integer> bucketFilter) {
mainScan.withBucketFilter(bucketFilter);
Expand Down Expand Up @@ -521,15 +531,14 @@ public InnerTableScan dropStats() {
public TableScan.Plan plan() {
List<Split> splits = new ArrayList<>();
Set<BinaryRow> completePartitions =
new HashSet<>(
newPartitionListingScan(true, partitionPredicate).listPartitions());
new HashSet<>(newPartitionListingScan(true).listPartitions());
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
splits.add(toFallbackSplit(dataSplit, false));
}

List<BinaryRow> remainingPartitions =
newPartitionListingScan(false, partitionPredicate).listPartitions().stream()
newPartitionListingScan(false).listPartitions().stream()
.filter(p -> !completePartitions.contains(p))
.collect(Collectors.toList());
if (!remainingPartitions.isEmpty()) {
Expand All @@ -543,8 +552,8 @@ public TableScan.Plan plan() {

@Override
public List<PartitionEntry> listPartitionEntries() {
DataTableScan mainListingScan = newPartitionListingScan(true, partitionPredicate);
DataTableScan fallbackListingScan = newPartitionListingScan(false, partitionPredicate);
DataTableScan mainListingScan = newPartitionListingScan(true);
DataTableScan fallbackListingScan = newPartitionListingScan(false);
List<PartitionEntry> partitionEntries =
new ArrayList<>(mainListingScan.listPartitionEntries());
Set<BinaryRow> partitions =
Expand All @@ -560,18 +569,31 @@ public List<PartitionEntry> listPartitionEntries() {
}

protected void setPartitionPredicate(PartitionPredicate predicate) {
this.partitionPredicate = predicate;
this.mainPartitionPredicate = predicate;
this.fallbackPartitionPredicate = predicate;
}

/**
 * Records distinct partition predicates for the main and the fallback side.
 *
 * <p>Only the two fields are assigned; no scan is touched by this method.
 *
 * @param mainPartitionPredicate value stored as the main partition predicate
 * @param fallbackPartitionPredicate value stored as the fallback partition predicate
 */
protected void setPartitionPredicate(
PartitionPredicate mainPartitionPredicate,
PartitionPredicate fallbackPartitionPredicate) {
this.mainPartitionPredicate = mainPartitionPredicate;
this.fallbackPartitionPredicate = fallbackPartitionPredicate;
}

/**
 * Returns the partition predicate currently recorded for the main side.
 *
 * @return the stored main partition predicate; NOTE(review): presumably {@code null} when none
 *     has been set - confirm against the field's initialisation
 */
protected PartitionPredicate getMainPartitionPredicate() {
return mainPartitionPredicate;
}

protected PartitionPredicate getPartitionPredicate() {
return partitionPredicate;
protected PartitionPredicate getFallbackPartitionPredicate() {
return fallbackPartitionPredicate;
}

private DataTableScan newPartitionListingScan(
boolean isMain, PartitionPredicate scanPartitionPredicate) {
private DataTableScan newPartitionListingScan(boolean isMain) {
DataTableScan scan = scanCreator.apply(isMain ? wrappedTable : fallbackTable);
if (scanPartitionPredicate != null) {
scan.withPartitionFilter(scanPartitionPredicate);
if (isMain && getMainPartitionPredicate() != null) {
scan.withPartitionFilter(getMainPartitionPredicate());
} else if (!isMain && getFallbackPartitionPredicate() != null) {
scan.withPartitionFilter(getFallbackPartitionPredicate());
}
return scan;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
Expand All @@ -53,9 +56,11 @@
import org.mockito.Mockito;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.paimon.table.SchemaEvolutionTableTestBase.rowData;
Expand Down Expand Up @@ -333,6 +338,70 @@ public org.apache.paimon.reader.RecordReader<InternalRow> createReader(Split spl
};
}

/**
 * Verifies that {@code FallbackReadScan} keeps independent partition predicates for the main and
 * the fallback branch.
 *
 * <p>After {@code withPartitionFilter(mainPredicate, fallbackPredicate)} the plan must contain
 * main-branch data restricted by the main predicate, plus fallback-branch data restricted by the
 * fallback predicate for partitions the main side did not already provide.
 */
@Test
public void testMainAndFallbackPartitionPredicates() throws Exception {
    // Main table: two rows.
    FileStoreTable mainTable = createTable();
    writeDataIntoTable(mainTable, 0, rowData(1, 10), rowData(2, 20));

    // Branch "bc": four rows, overlapping the main table on the first two keys.
    mainTable.createBranch("bc");
    FileStoreTable branchTable = createTableFromBranch(mainTable, "bc");
    writeDataIntoTable(
            branchTable, 0, rowData(1, 100), rowData(2, 200), rowData(3, 300), rowData(4, 400));

    FallbackReadFileStoreTable table =
            new FallbackReadFileStoreTable(mainTable, branchTable, true);

    // Single INT partition column named "pt"; one predicate selects 1, the other selects 3.
    RowType partitionType = RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"pt"});
    PartitionPredicate onlyPartitionOne =
            PartitionPredicate.fromMultiple(
                    partitionType, Collections.singletonList(BinaryRow.singleColumn(1)));
    PartitionPredicate onlyPartitionThree =
            PartitionPredicate.fromMultiple(
                    partitionType, Collections.singletonList(BinaryRow.singleColumn(3)));

    // Case 1: both predicates set, pt=1 from main, pt=3 from fallback
    List<Pair<Integer, Integer>> bothFiltered =
            readAndCollect(
                    table,
                    scan -> scan.withPartitionFilter(onlyPartitionOne, onlyPartitionThree));
    assertThat(bothFiltered).containsExactlyInAnyOrder(Pair.of(1, 10), Pair.of(3, 300));

    // Case 2: main predicate is null, fallback predicate set
    List<Pair<Integer, Integer>> fallbackFilteredOnly =
            readAndCollect(table, scan -> scan.withPartitionFilter(null, onlyPartitionThree));
    assertThat(fallbackFilteredOnly)
            .containsExactlyInAnyOrder(Pair.of(1, 10), Pair.of(2, 20), Pair.of(3, 300));

    // Case 3: main predicate set, fallback predicate is null
    List<Pair<Integer, Integer>> mainFilteredOnly =
            readAndCollect(table, scan -> scan.withPartitionFilter(onlyPartitionOne, null));
    assertThat(mainFilteredOnly)
            .containsExactlyInAnyOrder(
                    Pair.of(1, 10), Pair.of(2, 200), Pair.of(3, 300), Pair.of(4, 400));

    // Case 4: both null
    List<Pair<Integer, Integer>> unfiltered =
            readAndCollect(table, scan -> scan.withPartitionFilter(null, null));
    assertThat(unfiltered)
            .containsExactlyInAnyOrder(
                    Pair.of(1, 10), Pair.of(2, 20), Pair.of(3, 300), Pair.of(4, 400));
}

/**
 * Creates a new scan on {@code table}, lets {@code consumer} configure it, then reads every
 * split of the resulting plan and collects the first two INT fields of each row as a pair.
 *
 * @param table table to scan and read from
 * @param consumer callback that configures the freshly created scan (e.g. partition filters)
 * @return one {@code Pair(field0, field1)} per row, in the order the splits and rows were read
 * @throws Exception if planning, reader creation or reading fails
 */
private List<Pair<Integer, Integer>> readAndCollect(
        FallbackReadFileStoreTable table,
        Consumer<FallbackReadFileStoreTable.FallbackReadScan> consumer)
        throws Exception {
    FallbackReadFileStoreTable.FallbackReadScan scan =
            (FallbackReadFileStoreTable.FallbackReadScan) table.newScan();
    consumer.accept(scan);
    List<Pair<Integer, Integer>> result = new ArrayList<>();
    for (Split split : scan.plan().splits()) {
        // try-with-resources closes the reader even when reading a split throws, so a
        // failing read cannot leak the reader.
        try (RecordReader<InternalRow> reader = table.newRead().createReader(split)) {
            reader.forEachRemaining(r -> result.add(Pair.of(r.getInt(0), r.getInt(1))));
        }
    }
    return result;
}

@Test
void testSwitchToBranch() throws Exception {
String branchName = "bc";
Expand Down