diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 2714e773a2ca..7a0cdec31293 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -99,7 +99,11 @@ public FileStoreScan withCompleteFilter(Predicate predicate) { public Iterator readManifestEntries( List manifestFiles, boolean useSequential) { Iterator result = super.readManifestEntries(manifestFiles, useSequential); - if (limit == null || limit <= 0 || deletionVectorsEnabled || dataEvolutionEnabled) { + if (limit == null + || limit <= 0 + || deletionVectorsEnabled + || dataEvolutionEnabled + || inputFilter != null) { return result; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 8595753cf0ab..a3623c1b903d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -225,7 +225,11 @@ public boolean limitPushdownEnabled() { return false; } - return mergeEngine != PARTIAL_UPDATE && mergeEngine != AGGREGATE && !deletionVectorsEnabled; + return mergeEngine != PARTIAL_UPDATE + && mergeEngine != AGGREGATE + && !deletionVectorsEnabled + && valueFilter == null + && keyFilter == null; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java index 4e0416f86e24..255ff5f672fb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java @@ -126,7 +126,7 @@ public List listPartitionEntries() { } private Optional applyPushDownLimit() { - if (pushDownLimit == null) { + if (pushDownLimit == null || snapshotReader.hasNonPartitionFilter()) { return Optional.empty(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index f837daaaec67..1ea0854eea5d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -121,6 +121,9 @@ public interface SnapshotReader { SnapshotReader withLimit(int limit); + /** Whether the pushed filter still contains non-partition predicates. */ + boolean hasNonPartitionFilter(); + /** Get splits plan from snapshot. */ Plan read(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 67d0b9a56633..a61f43b4eb11 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -102,6 +102,7 @@ public class SnapshotReaderImpl implements SnapshotReader { @Nullable private final DVMetaCache dvMetaCache; private ScanMode scanMode = ScanMode.ALL; + private boolean hasNonPartitionFilter; private RecordComparator lazyPartitionComparator; private CacheMetrics dvMetaCacheMetrics; @@ -239,6 +240,7 @@ public SnapshotReader withFilter(Predicate predicate) { scan.withPartitionFilter(pair.getLeft().get()); } if (!pair.getRight().isEmpty()) { + this.hasNonPartitionFilter = true; nonPartitionFilterConsumer.accept(scan, PredicateBuilder.and(pair.getRight())); } scan.withCompleteFilter(predicate); @@ -338,6 +340,11 @@ public SnapshotReader withLimit(int limit) { return this; } + @Override + public boolean hasNonPartitionFilter() { + return hasNonPartitionFilter; + } + @Override public SnapshotReader dropStats() { scan.dropStats(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 980183d8012d..1e912d406895 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -477,6 +477,11 @@ public SnapshotReader withLimit(int limit) { return this; } + @Override + public boolean hasNonPartitionFilter() { + return wrapped.hasNonPartitionFilter(); + } + @Override public Plan read() { return wrapped.read(); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 9a9c29030456..ac6825ed8cbe 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -28,6 +28,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; @@ -331,20 +332,28 @@ public void testLimitPushdownWithValueFilter() throws Exception { @Test public void testLimitPushdownWithKeyFilter() throws Exception { - // Write data with different shop IDs List data = generateData(200); Snapshot snapshot = writeData(data); - // With keyFilter, limit pushdown should still work (keyFilter doesn't affect limit - // pushdown) + Predicate keyPredicate = + new PredicateBuilder(RowType.of(new IntType(false))) + .equal(0, data.get(0).key().getInt(0)); + + // baseline: keyFilter without limit + KeyValueFileStoreScan scanFilterOnly = store.newScan(); + scanFilterOnly.withSnapshot(snapshot.id()); + scanFilterOnly.withKeyFilter(keyPredicate); + int filteredFiles = scanFilterOnly.plan().files().size(); + assertThat(filteredFiles).isGreaterThan(0); + + // keyFilter + limit: early-stop by rowCount() is unsafe, should be disabled KeyValueFileStoreScan scan = store.newScan(); scan.withSnapshot(snapshot.id()); - scan.withKeyFilter( - new PredicateBuilder(RowType.of(new IntType(false))) - .equal(0, data.get(0).key().getInt(0))); + scan.withKeyFilter(keyPredicate); scan.withLimit(5); - List files = scan.plan().files(); - assertThat(files.size()).isGreaterThan(0); + + assertThat(scan.limitPushdownEnabled()).isFalse(); + assertThat(scan.plan().files().size()).isEqualTo(filteredFiles); } @Test @@ -569,6 +578,48 @@ private Map getActualKvMap(FileStoreScan scan, Long expect return store.toKvMap(actualKvs); } + @Test + void testLimitPushdownWithFilter() throws Exception { + int numFiles = 10; + int rowsPerFile = 100; + + Snapshot snapshot = null; + for (int bucket = 0; bucket < numFiles; bucket++) { + List data = new ArrayList<>(); + for (int i = 0; i < rowsPerFile; i++) { + data.add(gen.nextInsert("", 0, (long) i, null, null)); + } + snapshot = writeData(data, bucket); + } + + KeyValueFileStoreScan scanAll = store.newScan(); + scanAll.withSnapshot(snapshot.id()); + List allFiles = scanAll.plan().files(); + assertThat(allFiles.size()).isEqualTo(numFiles); + + KeyValueFileStoreScan scanFilterOnly = store.newScan(); + scanFilterOnly.withSnapshot(snapshot.id()); + scanFilterOnly.withValueFilter( + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE).equal(4, 50L)); + List filteredFiles = scanFilterOnly.plan().files(); + assertThat(filteredFiles.size()).isEqualTo(numFiles); // no file eliminated by stats + + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()); + scanWithLimit.withValueFilter( + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE).equal(4, 50L)); + scanWithLimit.withLimit(5); + + assertThat(scanWithLimit.limitPushdownEnabled()).isFalse(); + + List limitedFiles = scanWithLimit.plan().files(); + + assertThat(limitedFiles.size()) + .as( + "When filter is present, limit pushdown should be disabled, returning all files") + .isEqualTo(numFiles); + } + private List generateData(int numRecords) { List data = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java index 32f604130715..90cfb7be1a01 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java @@ -146,10 +146,90 @@ public void testLimitPushdownWithFilter() throws Exception { table.newScan().withFilter(filter).withLimit(10).plan(); int splitsWithFilterAndLimit = planWithFilterAndLimit.splits().size(); - // Should read exactly 10 files (from index 25 to 34) to get 10 rows - assertThat(splitsWithFilterAndLimit).isLessThanOrEqualTo(10); - assertThat(splitsWithFilterAndLimit).isGreaterThan(0); - assertThat(splitsWithFilterAndLimit).isLessThan(totalSplits); + // With non-partition filter, limit pushdown should be disabled to avoid + // returning insufficient rows. All 25 matching files should be returned. + assertThat(splitsWithFilterAndLimit).isEqualTo(25); + + write.close(); + commit.close(); + } + + @Test + void testLimitPushdownWithNonPartitionFilter() throws Exception { + createAppendOnlyTable(); + + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + int filesCount = 10; + int rowsPerFile = 100; + int filterValue = 50; + + for (int fileIdx = 0; fileIdx < filesCount; fileIdx++) { + for (int i = 0; i < rowsPerFile; i++) { + write.write(rowData(fileIdx, i, (long) (fileIdx * rowsPerFile + i))); + } + commit.commit(fileIdx, write.prepareCommit(true, fileIdx)); + } + + TableScan.Plan planAll = table.newScan().plan(); + assertThat(planAll.splits().size()).isEqualTo(filesCount); + + Predicate filter = + new PredicateBuilder(table.schema().logicalRowType()).equal(1, filterValue); + TableScan.Plan planFilterOnly = table.newScan().withFilter(filter).plan(); + assertThat(planFilterOnly.splits().size()).isEqualTo(filesCount); + + List allRows = getResult(table.newRead(), planFilterOnly.splits()); + long totalMatchingRows = + allRows.stream().filter(r -> r.contains("|" + filterValue + "|")).count(); + assertThat(totalMatchingRows).isEqualTo(filesCount); + + int limit = 5; + TableScan.Plan planWithFilterAndLimit = + table.newScan().withFilter(filter).withLimit(limit).plan(); + + List limitedAllRows = getResult(table.newRead(), planWithFilterAndLimit.splits()); + long limitedMatchingRows = + limitedAllRows.stream().filter(r -> r.contains("|" + filterValue + "|")).count(); + + assertThat(limitedMatchingRows) + .as( + "Filter+limit bug: scan returned %d splits, but only %d rows match " + + "filter (expected >= %d). Total matching = %d", + planWithFilterAndLimit.splits().size(), + limitedMatchingRows, + limit, + totalMatchingRows) + .isGreaterThanOrEqualTo(limit); + + write.close(); + commit.close(); + } + + @Test + void testLimitPushdownWithPartitionFilter() throws Exception { + createAppendOnlyTable(); + + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + for (int i = 0; i < 10; i++) { + write.write(rowData(i, i, (long) i * 100)); + commit.commit(i, write.prepareCommit(true, i)); + } + + Predicate partitionFilter = + new PredicateBuilder(table.schema().logicalRowType()).lessOrEqual(0, 4); + + TableScan.Plan planNoLimit = table.newScan().withFilter(partitionFilter).plan(); + assertThat(planNoLimit.splits().size()).isEqualTo(5); + + TableScan.Plan plan = table.newScan().withFilter(partitionFilter).withLimit(2).plan(); + + assertThat(plan.splits().size()) + .as("Partition filter + limit: limit pushdown should not be disabled") + .isEqualTo(2); write.close(); commit.close();