Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ public FileStoreScan withCompleteFilter(Predicate predicate) {
public Iterator<ManifestEntry> readManifestEntries(
List<ManifestFileMeta> manifestFiles, boolean useSequential) {
Iterator<ManifestEntry> result = super.readManifestEntries(manifestFiles, useSequential);
if (limit == null || limit <= 0 || deletionVectorsEnabled || dataEvolutionEnabled) {
if (limit == null
|| limit <= 0
|| deletionVectorsEnabled
|| dataEvolutionEnabled
|| inputFilter != null) {
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,11 @@ public boolean limitPushdownEnabled() {
return false;
}

return mergeEngine != PARTIAL_UPDATE && mergeEngine != AGGREGATE && !deletionVectorsEnabled;
return mergeEngine != PARTIAL_UPDATE
&& mergeEngine != AGGREGATE
&& !deletionVectorsEnabled
&& valueFilter == null
&& keyFilter == null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public List<PartitionEntry> listPartitionEntries() {
}

private Optional<StartingScanner.Result> applyPushDownLimit() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be in SnapshotReaderImpl; withFilter already knows whether there is a non-partition filter. (!pair.getRight().isEmpty())

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be in SnapshotReaderImpl; withFilter already knows whether there is a non-partition filter. (!pair.getRight().isEmpty())

Updated

if (pushDownLimit == null) {
if (pushDownLimit == null || snapshotReader.hasNonPartitionFilter()) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public interface SnapshotReader {

SnapshotReader withLimit(int limit);

/**
 * Returns whether a filter pushed via {@code withFilter} contained predicates on
 * non-partition fields (i.e. predicates that cannot be fully evaluated by partition
 * pruning alone). Callers use this to decide whether limit pushdown is safe, since
 * a non-partition filter may reject rows inside files and make a row-count based
 * early stop return fewer rows than the limit.
 */
boolean hasNonPartitionFilter();

/** Get splits plan from snapshot. */
Plan read();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
@Nullable private final DVMetaCache dvMetaCache;

private ScanMode scanMode = ScanMode.ALL;
private boolean hasNonPartitionFilter;
private RecordComparator lazyPartitionComparator;
private CacheMetrics dvMetaCacheMetrics;

Expand Down Expand Up @@ -239,6 +240,7 @@ public SnapshotReader withFilter(Predicate predicate) {
scan.withPartitionFilter(pair.getLeft().get());
}
if (!pair.getRight().isEmpty()) {
this.hasNonPartitionFilter = true;
nonPartitionFilterConsumer.accept(scan, PredicateBuilder.and(pair.getRight()));
}
scan.withCompleteFilter(predicate);
Expand Down Expand Up @@ -338,6 +340,11 @@ public SnapshotReader withLimit(int limit) {
return this;
}

@Override
public boolean hasNonPartitionFilter() {
    // Flag is set in withFilter() when the split predicate has a non-partition part.
    return this.hasNonPartitionFilter;
}

@Override
public SnapshotReader dropStats() {
scan.dropStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,11 @@ public SnapshotReader withLimit(int limit) {
return this;
}

@Override
public boolean hasNonPartitionFilter() {
    // Pure delegation: forward the query to the wrapped SnapshotReader.
    return this.wrapped.hasNonPartitionFilter();
}

@Override
public Plan read() {
return wrapped.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -331,20 +332,28 @@ public void testLimitPushdownWithValueFilter() throws Exception {

@Test
public void testLimitPushdownWithKeyFilter() throws Exception {
// Write a batch of records and commit them as one snapshot.
List<KeyValue> data = generateData(200);
Snapshot snapshot = writeData(data);

// With a keyFilter present, limit pushdown must be DISABLED: an early stop
// based on file rowCount() could return fewer filter-matching rows than the
// limit, because the key filter rejects rows inside files.
Predicate keyPredicate =
new PredicateBuilder(RowType.of(new IntType(false)))
.equal(0, data.get(0).key().getInt(0));

// baseline: keyFilter without limit
KeyValueFileStoreScan scanFilterOnly = store.newScan();
scanFilterOnly.withSnapshot(snapshot.id());
scanFilterOnly.withKeyFilter(keyPredicate);
int filteredFiles = scanFilterOnly.plan().files().size();
assertThat(filteredFiles).isGreaterThan(0);

// keyFilter + limit: early-stop by rowCount() is unsafe, should be disabled
KeyValueFileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id());
scan.withKeyFilter(
new PredicateBuilder(RowType.of(new IntType(false)))
.equal(0, data.get(0).key().getInt(0)));
scan.withKeyFilter(keyPredicate);
scan.withLimit(5);
List<ManifestEntry> files = scan.plan().files();
assertThat(files.size()).isGreaterThan(0);

// Pushdown is reported disabled, so the limited scan must see exactly the
// same file set as the filter-only baseline (no early stop happened).
assertThat(scan.limitPushdownEnabled()).isFalse();
assertThat(scan.plan().files().size()).isEqualTo(filteredFiles);
}

@Test
Expand Down Expand Up @@ -569,6 +578,48 @@ private Map<BinaryRow, BinaryRow> getActualKvMap(FileStoreScan scan, Long expect
return store.toKvMap(actualKvs);
}

@Test
void testLimitPushdownWithFilter() throws Exception {
// Verifies that a non-partition value filter disables limit pushdown:
// the limited scan must return the same file set as the unlimited one.
int numFiles = 10;
int rowsPerFile = 100;

// Write one file per bucket; keep the snapshot of the last commit.
Snapshot snapshot = null;
for (int bucket = 0; bucket < numFiles; bucket++) {
List<KeyValue> data = new ArrayList<>();
for (int i = 0; i < rowsPerFile; i++) {
data.add(gen.nextInsert("", 0, (long) i, null, null));
}
snapshot = writeData(data, bucket);
}

// Sanity check: a plain scan sees all files.
KeyValueFileStoreScan scanAll = store.newScan();
scanAll.withSnapshot(snapshot.id());
List<ManifestEntry> allFiles = scanAll.plan().files();
assertThat(allFiles.size()).isEqualTo(numFiles);

// Value filter alone cannot prune any file via stats in this setup.
KeyValueFileStoreScan scanFilterOnly = store.newScan();
scanFilterOnly.withSnapshot(snapshot.id());
scanFilterOnly.withValueFilter(
new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE).equal(4, 50L));
List<ManifestEntry> filteredFiles = scanFilterOnly.plan().files();
assertThat(filteredFiles.size()).isEqualTo(numFiles); // no file eliminated by stats

// Same filter plus a limit: pushdown must be reported disabled, because an
// early stop by rowCount() could return fewer matching rows than the limit.
KeyValueFileStoreScan scanWithLimit = store.newScan();
scanWithLimit.withSnapshot(snapshot.id());
scanWithLimit.withValueFilter(
new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE).equal(4, 50L));
scanWithLimit.withLimit(5);

assertThat(scanWithLimit.limitPushdownEnabled()).isFalse();

List<ManifestEntry> limitedFiles = scanWithLimit.plan().files();

assertThat(limitedFiles.size())
.as(
"When filter is present, limit pushdown should be disabled, returning all files")
.isEqualTo(numFiles);
}

private List<KeyValue> generateData(int numRecords) {
List<KeyValue> data = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,90 @@ public void testLimitPushdownWithFilter() throws Exception {
table.newScan().withFilter(filter).withLimit(10).plan();
int splitsWithFilterAndLimit = planWithFilterAndLimit.splits().size();

// Should read exactly 10 files (from index 25 to 34) to get 10 rows
assertThat(splitsWithFilterAndLimit).isLessThanOrEqualTo(10);
assertThat(splitsWithFilterAndLimit).isGreaterThan(0);
assertThat(splitsWithFilterAndLimit).isLessThan(totalSplits);
// With non-partition filter, limit pushdown should be disabled to avoid
// returning insufficient rows. All 25 matching files should be returned.
assertThat(splitsWithFilterAndLimit).isEqualTo(25);

write.close();
commit.close();
}

@Test
void testLimitPushdownWithNonPartitionFilter() throws Exception {
// End-to-end check: with a non-partition filter, a limited scan must still
// yield at least `limit` rows that MATCH the filter (the original bug was
// early-stopping on raw row counts before the filter was applied).
createAppendOnlyTable();

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

int filesCount = 10;
int rowsPerFile = 100;
int filterValue = 50;

// Each file contains exactly one row whose column 1 equals filterValue.
for (int fileIdx = 0; fileIdx < filesCount; fileIdx++) {
for (int i = 0; i < rowsPerFile; i++) {
write.write(rowData(fileIdx, i, (long) (fileIdx * rowsPerFile + i)));
}
commit.commit(fileIdx, write.prepareCommit(true, fileIdx));
}

// Sanity: one split per file.
TableScan.Plan planAll = table.newScan().plan();
assertThat(planAll.splits().size()).isEqualTo(filesCount);

// The filter alone prunes nothing (every file has one matching row).
Predicate filter =
new PredicateBuilder(table.schema().logicalRowType()).equal(1, filterValue);
TableScan.Plan planFilterOnly = table.newScan().withFilter(filter).plan();
assertThat(planFilterOnly.splits().size()).isEqualTo(filesCount);

// Exactly one matching row per file overall.
List<String> allRows = getResult(table.newRead(), planFilterOnly.splits());
long totalMatchingRows =
allRows.stream().filter(r -> r.contains("|" + filterValue + "|")).count();
assertThat(totalMatchingRows).isEqualTo(filesCount);

// Filter + limit: the plan must still contain enough files to produce at
// least `limit` matching rows — i.e. limit pushdown was not applied.
int limit = 5;
TableScan.Plan planWithFilterAndLimit =
table.newScan().withFilter(filter).withLimit(limit).plan();

List<String> limitedAllRows = getResult(table.newRead(), planWithFilterAndLimit.splits());
long limitedMatchingRows =
limitedAllRows.stream().filter(r -> r.contains("|" + filterValue + "|")).count();

assertThat(limitedMatchingRows)
.as(
"Filter+limit bug: scan returned %d splits, but only %d rows match "
+ "filter (expected >= %d). Total matching = %d",
planWithFilterAndLimit.splits().size(),
limitedMatchingRows,
limit,
totalMatchingRows)
.isGreaterThanOrEqualTo(limit);

write.close();
commit.close();
}

@Test
void testLimitPushdownWithPartitionFilter() throws Exception {
createAppendOnlyTable();

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);

for (int i = 0; i < 10; i++) {
write.write(rowData(i, i, (long) i * 100));
commit.commit(i, write.prepareCommit(true, i));
}

Predicate partitionFilter =
new PredicateBuilder(table.schema().logicalRowType()).lessOrEqual(0, 4);

TableScan.Plan planNoLimit = table.newScan().withFilter(partitionFilter).plan();
assertThat(planNoLimit.splits().size()).isEqualTo(5);

TableScan.Plan plan = table.newScan().withFilter(partitionFilter).withLimit(2).plan();

assertThat(plan.splits().size())
.as("Partition filter + limit: limit pushdown should not be disabled")
.isEqualTo(2);

write.close();
commit.close();
Expand Down
Loading