From 137203c299a208a49b3fc8aaf8ccb8325020ff27 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 16:10:08 +0800 Subject: [PATCH 01/10] [core] Do not limit pushdown with filter is present --- .../operation/AppendOnlyFileStoreScan.java | 6 +- .../operation/KeyValueFileStoreScan.java | 5 +- .../table/source/DataTableBatchScan.java | 4 +- .../operation/KeyValueFileStoreScanTest.java | 42 +++++++++++++ .../paimon/table/source/TableScanTest.java | 63 +++++++++++-------- 5 files changed, 91 insertions(+), 29 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 2714e773a2ca..7a0cdec31293 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -99,7 +99,11 @@ public FileStoreScan withCompleteFilter(Predicate predicate) { public Iterator readManifestEntries( List manifestFiles, boolean useSequential) { Iterator result = super.readManifestEntries(manifestFiles, useSequential); - if (limit == null || limit <= 0 || deletionVectorsEnabled || dataEvolutionEnabled) { + if (limit == null + || limit <= 0 + || deletionVectorsEnabled + || dataEvolutionEnabled + || inputFilter != null) { return result; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 8595753cf0ab..cdc7aa360330 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -225,7 +225,10 @@ public boolean limitPushdownEnabled() { return false; } - return mergeEngine != PARTIAL_UPDATE && mergeEngine != AGGREGATE && !deletionVectorsEnabled; + return mergeEngine != PARTIAL_UPDATE + && mergeEngine != AGGREGATE + && !deletionVectorsEnabled + && valueFilter == null; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java index 4e0416f86e24..329503d57010 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java @@ -50,6 +50,7 @@ public class DataTableBatchScan extends AbstractDataTableScan { private boolean hasNext; private Integer pushDownLimit; + private Predicate filter; private TopN topN; private final SchemaManager schemaManager; @@ -78,6 +79,7 @@ public DataTableBatchScan( @Override public InnerTableScan withFilter(Predicate predicate) { + this.filter = predicate; super.withFilter(predicate); return this; } @@ -126,7 +128,7 @@ public List listPartitionEntries() { } private Optional applyPushDownLimit() { - if (pushDownLimit == null) { + if (pushDownLimit == null || filter != null) { return Optional.empty(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 9a9c29030456..8c3bd1e0ca57 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -569,6 +569,48 @@ private Map getActualKvMap(FileStoreScan scan, Long expect return store.toKvMap(actualKvs); } + @Test + void testLimitPushdownWithFilter() throws Exception { + int numFiles = 10; + int rowsPerFile = 100; + + Snapshot snapshot = null; + for (int bucket = 0; bucket < numFiles; bucket++) { + List data = new ArrayList<>(); + for (int i = 0; i < rowsPerFile; i++) { + data.add(gen.nextInsert("", 0, (long) i, null, null)); + } + snapshot = writeData(data, bucket); + } + + KeyValueFileStoreScan scanAll = store.newScan(); + scanAll.withSnapshot(snapshot.id()); + List allFiles = scanAll.plan().files(); + assertThat(allFiles.size()).isEqualTo(numFiles); + + KeyValueFileStoreScan scanFilterOnly = store.newScan(); + scanFilterOnly.withSnapshot(snapshot.id()); + scanFilterOnly.withValueFilter( + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE).equal(4, 50L)); + List filteredFiles = scanFilterOnly.plan().files(); + assertThat(filteredFiles.size()).isEqualTo(numFiles); // no file eliminated by stats + + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()); + scanWithLimit.withValueFilter( + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE).equal(4, 50L)); + scanWithLimit.withLimit(5); + + assertThat(scanWithLimit.limitPushdownEnabled()).isFalse(); + + List limitedFiles = scanWithLimit.plan().files(); + + assertThat(limitedFiles.size()) + .as( + "When filter is present, limit pushdown should be disabled, returning all files") + .isEqualTo(numFiles); + } + private List generateData(int numRecords) { List data = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java index 32f604130715..64954062a812 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java @@ -114,42 +114,53 @@ public void testPushDownLimit() throws Exception { } @Test - public void testLimitPushdownWithFilter() throws Exception { + void testLimitPushdownWithFilter() throws Exception { createAppendOnlyTable(); StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); - // Write 50 files, each with 1 row. Rows 0-24 have 'a' = 10, rows 25-49 have 'a' = 20. - for (int i = 0; i < 25; i++) { - write.write(rowData(i, 10, (long) i * 100)); - commit.commit(i, write.prepareCommit(true, i)); - } - for (int i = 25; i < 50; i++) { - write.write(rowData(i, 20, (long) i * 100)); - commit.commit(i, write.prepareCommit(true, i)); + int filesCount = 10; + int rowsPerFile = 100; + int filterValue = 50; + + for (int fileIdx = 0; fileIdx < filesCount; fileIdx++) { + for (int i = 0; i < rowsPerFile; i++) { + write.write(rowData(fileIdx, i, (long) (fileIdx * rowsPerFile + i))); + } + commit.commit(fileIdx, write.prepareCommit(true, fileIdx)); } - // Without limit, should read all 50 files - TableScan.Plan planWithoutLimit = table.newScan().plan(); - int totalSplits = planWithoutLimit.splits().size(); - assertThat(totalSplits).isEqualTo(50); + TableScan.Plan planAll = table.newScan().plan(); + assertThat(planAll.splits().size()).isEqualTo(filesCount); - // With filter (a = 20) and limit (10) - // filterByStats has already been applied in baseIterator, so only files 25-49 will be - // returned - // To get 10 rows, it should read 10 files (from index 25 to 34) Predicate filter = - new PredicateBuilder(table.schema().logicalRowType()) - .equal(1, 20); // Filter on 'a' = 20 - TableScan.Plan planWithFilterAndLimit = - table.newScan().withFilter(filter).withLimit(10).plan(); - int splitsWithFilterAndLimit = planWithFilterAndLimit.splits().size(); + new PredicateBuilder(table.schema().logicalRowType()).equal(1, filterValue); + TableScan.Plan planFilterOnly = table.newScan().withFilter(filter).plan(); + assertThat(planFilterOnly.splits().size()).isEqualTo(filesCount); - // Should read exactly 10 files (from index 25 to 34) to get 10 rows - assertThat(splitsWithFilterAndLimit).isLessThanOrEqualTo(10); - assertThat(splitsWithFilterAndLimit).isGreaterThan(0); - assertThat(splitsWithFilterAndLimit).isLessThan(totalSplits); + List allRows = getResult(table.newRead(), planFilterOnly.splits()); + long totalMatchingRows = + allRows.stream().filter(r -> r.contains("|" + filterValue + "|")).count(); + assertThat(totalMatchingRows).isEqualTo(filesCount); + + int limit = 5; + TableScan.Plan planWithFilterAndLimit = + table.newScan().withFilter(filter).withLimit(limit).plan(); + + List limitedAllRows = getResult(table.newRead(), planWithFilterAndLimit.splits()); + long limitedMatchingRows = + limitedAllRows.stream().filter(r -> r.contains("|" + filterValue + "|")).count(); + + assertThat(limitedMatchingRows) + .as( + "Filter+limit bug: scan returned %d splits, but only %d rows match " + + "filter (expected >= %d). Total matching = %d", + planWithFilterAndLimit.splits().size(), + limitedMatchingRows, + limit, + totalMatchingRows) + .isGreaterThanOrEqualTo(limit); write.close(); commit.close(); From f74d63607fe20118d2584cb25aa3232a8d234fef Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 16:46:58 +0800 Subject: [PATCH 02/10] [core] fix limit pushdown not working in partition filter issue --- .../operation/AbstractFileStoreScan.java | 4 +++ .../operation/AppendOnlyFileStoreScan.java | 3 +- .../table/source/DataTableBatchScan.java | 10 +++++-- .../paimon/table/source/TableScanTest.java | 28 +++++++++++++++++++ 4 files changed, 41 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 30908cce3e43..c5180d5f9967 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -141,6 +141,10 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) { return this; } + protected boolean hasPartitionFilter() { + return manifestsReader.partitionFilter() != null; + } + @Override public FileStoreScan onlyReadRealBuckets() { manifestsReader.onlyReadRealBuckets(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 7a0cdec31293..760444209c73 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -103,7 +103,8 @@ public Iterator readManifestEntries( || limit <= 0 || deletionVectorsEnabled || dataEvolutionEnabled - || inputFilter != null) { + || inputFilter != null + || hasPartitionFilter()) { return result; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java index 329503d57010..37ae1849fbc2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateVisitor; import org.apache.paimon.predicate.SortValue; import org.apache.paimon.predicate.TopN; import org.apache.paimon.schema.SchemaManager; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -50,7 +52,7 @@ public class DataTableBatchScan extends AbstractDataTableScan { private boolean hasNext; private Integer pushDownLimit; - private Predicate filter; + private boolean hasNonPartitionFilter; private TopN topN; private final SchemaManager schemaManager; @@ -79,7 +81,9 @@ public DataTableBatchScan( @Override public InnerTableScan withFilter(Predicate predicate) { - this.filter = predicate; + this.hasNonPartitionFilter = + !new HashSet<>(schema.partitionKeys()) + .containsAll(PredicateVisitor.collectFieldNames(predicate)); super.withFilter(predicate); return this; } @@ -128,7 +132,7 @@ public List listPartitionEntries() { } private Optional applyPushDownLimit() { - if (pushDownLimit == null || filter != null) { + if (pushDownLimit == null || hasNonPartitionFilter) { return Optional.empty(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java index 64954062a812..9f8a0f1196c4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java @@ -166,6 +166,34 @@ void testLimitPushdownWithFilter() throws Exception { commit.close(); } + @Test + void testLimitPushdownWithPartitionFilter() throws Exception { + createAppendOnlyTable(); + + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + for (int i = 0; i < 10; i++) { + write.write(rowData(i, i, (long) i * 100)); + commit.commit(i, write.prepareCommit(true, i)); + } + + Predicate partitionFilter = + new PredicateBuilder(table.schema().logicalRowType()).lessOrEqual(0, 4); + + TableScan.Plan planNoLimit = table.newScan().withFilter(partitionFilter).plan(); + assertThat(planNoLimit.splits().size()).isEqualTo(5); + + TableScan.Plan plan = table.newScan().withFilter(partitionFilter).withLimit(2).plan(); + + assertThat(plan.splits().size()) + .as("Partition filter + limit: limit pushdown should not be disabled") + .isEqualTo(2); + + write.close(); + commit.close(); + } + @Test public void testLimitPushdownWhenDataLessThanLimit() throws Exception { createAppendOnlyTable(); From a43b8cf9dc04fa3a9017ed005a623a207345e36c Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 17:05:16 +0800 Subject: [PATCH 03/10] [core] remove hasPartitionFilter in AbstractFileStoreScan --- .../operation/AbstractFileStoreScan.java | 4 --- .../operation/AppendOnlyFileStoreScan.java | 30 ------------------- 2 files changed, 34 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index c5180d5f9967..30908cce3e43 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -141,10 +141,6 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) { return this; } - protected boolean hasPartitionFilter() { - return manifestsReader.partitionFilter() != null; - } - @Override public FileStoreScan onlyReadRealBuckets() { manifestsReader.onlyReadRealBuckets(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 760444209c73..609cbf83c0c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -22,7 +22,6 @@ import org.apache.paimon.fileindex.FileIndexPredicate; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; -import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -34,9 +33,6 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -95,32 +91,6 @@ public FileStoreScan withCompleteFilter(Predicate predicate) { return this; } - @Override - public Iterator readManifestEntries( - List manifestFiles, boolean useSequential) { - Iterator result = super.readManifestEntries(manifestFiles, useSequential); - if (limit == null - || limit <= 0 - || deletionVectorsEnabled - || dataEvolutionEnabled - || inputFilter != null - || hasPartitionFilter()) { - return result; - } - - List filtered = new ArrayList<>(); - long accumulatedRowCount = 0; - while (result.hasNext()) { - ManifestEntry next = result.next(); - filtered.add(next); - accumulatedRowCount += next.file().rowCount(); - if (accumulatedRowCount >= limit) { - break; - } - } - return filtered.iterator(); - } - /** Note: Keep this thread-safe. */ @Override protected boolean filterByStats(ManifestEntry entry) { From 8e591c7a3640ebe263681ea7356c98203f53fce3 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 17:12:58 +0800 Subject: [PATCH 04/10] [core] add readManifestEntries back --- .../operation/AppendOnlyFileStoreScan.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 609cbf83c0c9..e67e207cc28f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -22,6 +22,7 @@ import org.apache.paimon.fileindex.FileIndexPredicate; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -33,6 +34,9 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -91,6 +95,32 @@ public FileStoreScan withCompleteFilter(Predicate predicate) { return this; } + @Override + public Iterator readManifestEntries( + List manifestFiles, boolean useSequential) { + Iterator result = super.readManifestEntries(manifestFiles, useSequential); + if (limit == null + || limit <= 0 + || deletionVectorsEnabled + || dataEvolutionEnabled + || inputFilter != null + || manifestsReader().partitionFilter() != null) { + return result; + } + + List filtered = new ArrayList<>(); + long accumulatedRowCount = 0; + while (result.hasNext()) { + ManifestEntry next = result.next(); + filtered.add(next); + accumulatedRowCount += next.file().rowCount(); + if (accumulatedRowCount >= limit) { + break; + } + } + return filtered.iterator(); + } + /** Note: Keep this thread-safe. */ @Override protected boolean filterByStats(ManifestEntry entry) { From 2aa9099ec5fe935aed591efca04b6c45abf4a6a9 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 17:47:50 +0800 Subject: [PATCH 05/10] [core] add old testLimitPushdownWithFilter back --- .../operation/AppendOnlyFileStoreScan.java | 2 + .../paimon/table/source/TableScanTest.java | 43 ++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index e67e207cc28f..36907ef499e4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -104,6 +104,8 @@ public Iterator readManifestEntries( || deletionVectorsEnabled || dataEvolutionEnabled || inputFilter != null + // Partition filter runs after this method, so early truncation here + // may discard files belonging to the target partition. || manifestsReader().partitionFilter() != null) { return result; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java index 9f8a0f1196c4..90cfb7be1a01 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java @@ -114,7 +114,48 @@ public void testPushDownLimit() throws Exception { } @Test - void testLimitPushdownWithFilter() throws Exception { + public void testLimitPushdownWithFilter() throws Exception { + createAppendOnlyTable(); + + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + // Write 50 files, each with 1 row. Rows 0-24 have 'a' = 10, rows 25-49 have 'a' = 20. + for (int i = 0; i < 25; i++) { + write.write(rowData(i, 10, (long) i * 100)); + commit.commit(i, write.prepareCommit(true, i)); + } + for (int i = 25; i < 50; i++) { + write.write(rowData(i, 20, (long) i * 100)); + commit.commit(i, write.prepareCommit(true, i)); + } + + // Without limit, should read all 50 files + TableScan.Plan planWithoutLimit = table.newScan().plan(); + int totalSplits = planWithoutLimit.splits().size(); + assertThat(totalSplits).isEqualTo(50); + + // With filter (a = 20) and limit (10) + // filterByStats has already been applied in baseIterator, so only files 25-49 will be + // returned + // To get 10 rows, it should read 10 files (from index 25 to 34) + Predicate filter = + new PredicateBuilder(table.schema().logicalRowType()) + .equal(1, 20); // Filter on 'a' = 20 + TableScan.Plan planWithFilterAndLimit = + table.newScan().withFilter(filter).withLimit(10).plan(); + int splitsWithFilterAndLimit = planWithFilterAndLimit.splits().size(); + + // With non-partition filter, limit pushdown should be disabled to avoid + // returning insufficient rows. All 25 matching files should be returned. + assertThat(splitsWithFilterAndLimit).isEqualTo(25); + + write.close(); + commit.close(); + } + + @Test + void testLimitPushdownWithNonPartitionFilter() throws Exception { createAppendOnlyTable(); StreamTableWrite write = table.newWrite(commitUser); From 79451a56df05d1327606914103cf1b0b2f01dd77 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 18:03:46 +0800 Subject: [PATCH 06/10] remove partitionFilter check in readManifestEntries --- .../org/apache/paimon/operation/AppendOnlyFileStoreScan.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 36907ef499e4..7a0cdec31293 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -103,10 +103,7 @@ public Iterator readManifestEntries( || limit <= 0 || deletionVectorsEnabled || dataEvolutionEnabled - || inputFilter != null - // Partition filter runs after this method, so early truncation here - // may discard files belonging to the target partition. - || manifestsReader().partitionFilter() != null) { + || inputFilter != null) { return result; } From 9a650f3a46d476b0fea12aa690b1c8be590ebed0 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 18:43:48 +0800 Subject: [PATCH 07/10] do not pushdown limit if key filter is present --- .../paimon/operation/KeyValueFileStoreScan.java | 3 ++- .../operation/KeyValueFileStoreScanTest.java | 14 +++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index cdc7aa360330..a3623c1b903d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -228,7 +228,8 @@ public boolean limitPushdownEnabled() { return mergeEngine != PARTIAL_UPDATE && mergeEngine != AGGREGATE && !deletionVectorsEnabled - && valueFilter == null; + && valueFilter == null + && keyFilter == null; } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 8c3bd1e0ca57..3be62d026b2c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -331,20 +331,24 @@ public void testLimitPushdownWithValueFilter() throws Exception { @Test public void testLimitPushdownWithKeyFilter() throws Exception { - // Write data with different shop IDs List data = generateData(200); Snapshot snapshot = writeData(data); - // With keyFilter, limit pushdown should still work (keyFilter doesn't affect limit - // pushdown) + KeyValueFileStoreScan scanAll = store.newScan(); + scanAll.withSnapshot(snapshot.id()); + int totalFiles = scanAll.plan().files().size(); + assertThat(totalFiles).isGreaterThan(0); + + // keyFilter + limit: early-stop by rowCount() is unsafe, should be disabled KeyValueFileStoreScan scan = store.newScan(); scan.withSnapshot(snapshot.id()); scan.withKeyFilter( new PredicateBuilder(RowType.of(new IntType(false))) .equal(0, data.get(0).key().getInt(0))); scan.withLimit(5); - List files = scan.plan().files(); - assertThat(files.size()).isGreaterThan(0); + + assertThat(scan.limitPushdownEnabled()).isFalse(); + assertThat(scan.plan().files().size()).isEqualTo(totalFiles); } @Test From 1e1acaa4eb28fceec4ca5805bdcb3a48c5eabe21 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 18:51:38 +0800 Subject: [PATCH 08/10] fix test case failure --- .../operation/KeyValueFileStoreScanTest.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 3be62d026b2c..ac6825ed8cbe 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -28,6 +28,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; @@ -334,21 +335,25 @@ public void testLimitPushdownWithKeyFilter() throws Exception { List data = generateData(200); Snapshot snapshot = writeData(data); - KeyValueFileStoreScan scanAll = store.newScan(); - scanAll.withSnapshot(snapshot.id()); - int totalFiles = scanAll.plan().files().size(); - assertThat(totalFiles).isGreaterThan(0); + Predicate keyPredicate = + new PredicateBuilder(RowType.of(new IntType(false))) + .equal(0, data.get(0).key().getInt(0)); + + // baseline: keyFilter without limit + KeyValueFileStoreScan scanFilterOnly = store.newScan(); + scanFilterOnly.withSnapshot(snapshot.id()); + scanFilterOnly.withKeyFilter(keyPredicate); + int filteredFiles = scanFilterOnly.plan().files().size(); + assertThat(filteredFiles).isGreaterThan(0); // keyFilter + limit: early-stop by rowCount() is unsafe, should be disabled KeyValueFileStoreScan scan = store.newScan(); scan.withSnapshot(snapshot.id()); - scan.withKeyFilter( - new PredicateBuilder(RowType.of(new IntType(false))) - .equal(0, data.get(0).key().getInt(0))); + scan.withKeyFilter(keyPredicate); scan.withLimit(5); assertThat(scan.limitPushdownEnabled()).isFalse(); - assertThat(scan.plan().files().size()).isEqualTo(totalFiles); + assertThat(scan.plan().files().size()).isEqualTo(filteredFiles); } @Test From 0a5c7ac0a4ae6f3be43ee6457f3b8773964666e9 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 19:09:43 +0800 Subject: [PATCH 09/10] fix --- .../java/org/apache/paimon/table/source/DataTableBatchScan.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java index 37ae1849fbc2..3d24be336fbd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java @@ -81,7 +81,7 @@ public DataTableBatchScan( @Override public InnerTableScan withFilter(Predicate predicate) { - this.hasNonPartitionFilter = + this.hasNonPartitionFilter |= !new HashSet<>(schema.partitionKeys()) .containsAll(PredicateVisitor.collectFieldNames(predicate)); super.withFilter(predicate); From 616344670a0c669738c4046d0a2d1dab05a85dba Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 17 Apr 2026 20:20:59 +0800 Subject: [PATCH 10/10] [core] move non-partition filter detection to SnapshotReader --- .../apache/paimon/table/source/DataTableBatchScan.java | 8 +------- .../paimon/table/source/snapshot/SnapshotReader.java | 3 +++ .../paimon/table/source/snapshot/SnapshotReaderImpl.java | 7 +++++++ .../org/apache/paimon/table/system/AuditLogTable.java | 5 +++++ 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java index 3d24be336fbd..255ff5f672fb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java @@ -21,7 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateVisitor; import org.apache.paimon.predicate.SortValue; import org.apache.paimon.predicate.TopN; import org.apache.paimon.schema.SchemaManager; @@ -36,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -52,7 +50,6 @@ public class DataTableBatchScan extends AbstractDataTableScan { private boolean hasNext; private Integer pushDownLimit; - private boolean hasNonPartitionFilter; private TopN topN; private final SchemaManager schemaManager; @@ -81,9 +78,6 @@ public DataTableBatchScan( @Override public InnerTableScan withFilter(Predicate predicate) { - this.hasNonPartitionFilter |= - !new HashSet<>(schema.partitionKeys()) - .containsAll(PredicateVisitor.collectFieldNames(predicate)); super.withFilter(predicate); return this; } @@ -132,7 +126,7 @@ public List listPartitionEntries() { } private Optional applyPushDownLimit() { - if (pushDownLimit == null || hasNonPartitionFilter) { + if (pushDownLimit == null || snapshotReader.hasNonPartitionFilter()) { return Optional.empty(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index f837daaaec67..1ea0854eea5d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -121,6 +121,9 @@ public interface SnapshotReader { SnapshotReader withLimit(int limit); + /** Whether the pushed filter still contains non-partition predicates. */ + boolean hasNonPartitionFilter(); + /** Get splits plan from snapshot. */ Plan read(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 67d0b9a56633..a61f43b4eb11 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -102,6 +102,7 @@ public class SnapshotReaderImpl implements SnapshotReader { @Nullable private final DVMetaCache dvMetaCache; private ScanMode scanMode = ScanMode.ALL; + private boolean hasNonPartitionFilter; private RecordComparator lazyPartitionComparator; private CacheMetrics dvMetaCacheMetrics; @@ -239,6 +240,7 @@ public SnapshotReader withFilter(Predicate predicate) { scan.withPartitionFilter(pair.getLeft().get()); } if (!pair.getRight().isEmpty()) { + this.hasNonPartitionFilter = true; nonPartitionFilterConsumer.accept(scan, PredicateBuilder.and(pair.getRight())); } scan.withCompleteFilter(predicate); @@ -338,6 +340,11 @@ public SnapshotReader withLimit(int limit) { return this; } + @Override + public boolean hasNonPartitionFilter() { + return hasNonPartitionFilter; + } + @Override public SnapshotReader dropStats() { scan.dropStats(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 980183d8012d..1e912d406895 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -477,6 +477,11 @@ public SnapshotReader withLimit(int limit) { return this; } + @Override + public boolean hasNonPartitionFilter() { + return wrapped.hasNonPartitionFilter(); + } + @Override public Plan read() { return wrapped.read();