[core][flink] supports count star pushdown for all-global-indexed filters.#7680
[core][flink] supports count star pushdown for all-global-indexed filters.#7680steFaiz wants to merge 4 commits into
Conversation
|
I'm not sure whether it's safe. The result may be inaccurate. This PR assumes that: If the whole data predicates can be covered by global index, those predicates can be fully consumed by paimon. But actually this is not right, because the global index might be not covering all data. Moreover, current Index is not exact, for example for unsupported queries e.g. LIKE, NOT_LIKE, the index reader will return ALL_NON_NULL_ROWS. |
|
I think an alternative approach is to introduce a new procedure or a special option. |
JingsongLi
left a comment
There was a problem hiding this comment.
Review of count-star pushdown for globally-indexed filters
Thanks for the contribution. The idea of leveraging global btree indexes for count-star optimization is interesting. I have several concerns ranging from a likely crash to the broader correctness issue you already raised yourself.
- Potential IllegalArgumentException in allFieldsIndexed (Bug)
In FlinkTableSource.applyFilters, for a bounded source with only partition predicates (e.g., WHERE dt = dt1), the flow is: the partition predicate is convertible so hasUnconvertedFilter stays false; it visits onlyPartFieldsVisitor returning true so it goes to consumedFilters (not unConsumedFilters); but it is still added to converted, so predicate is non-null.
When the new code block is reached, all guards pass (!isUnbounded(), !hasUnconvertedFilter), and allFieldsIndexed(table, predicate, ...) is called. Inside, splitPartitionPredicatesAndDataPredicates splits out the partition predicate, leaving getRight() as an empty list. Then PredicateBuilder.and(emptyList) throws: Preconditions.checkArgument(!predicates.isEmpty(), "There must be at least 1 inner predicate...")
Fix: Add a guard !unConsumedFilters.isEmpty() before calling allFieldsIndexed. If there are no unconsumed filters, there is nothing to optimize.
- Correctness: partial index coverage
You raised this yourself in the comments, but to be explicit about the failure mode: if create_global_index was called with partitions => dt=dt1, only dt1 is indexed. For a query WHERE id = 1 (no partition filter), allFieldsIndexed scans index entries with a null partition predicate, finds that id is indexed (from dt1 entries), and returns true marking the filter as consumed. But dt2 data is NOT indexed.
If the source then only counts rows from the index for the pushdown, rows in dt2 matching id = 1 would be missed. The test passes because global-index likely enforces cross-partition uniqueness (so only one row exists), but this assumption should be documented or validated explicitly in the code.
- I/O during query planning
allFieldsIndexed calls newIndexFileHandler().scan(...) which reads index manifest files. This happens inside applyFilters, which is invoked during Flink plan optimization phase. For tables with many index entries, this could noticeably slow down query compilation. Consider whether this metadata can be cached or whether the check should be deferred.
- Minor: redundant split inside allFieldsIndexed
The predicate passed from FlinkTableSource is built from converted, which (for bounded sources) already includes partition predicates in the mix. Inside allFieldsIndexed, you split them out again. This works but the caller partitionPredicate and the one extracted inside allFieldsIndexed could diverge. The effectivePartPredicate logic handles this, but it would be clearer to pass only the data predicates from the caller.
- Test: testCountStarBTreeIndexPartitionedTable - WHERE id = 1 without partition filter
This test asserts pushDown = true and expectedCount = 1 for a query that spans all partitions on a table where only dt1 is indexed. The correctness depends on global-index uniqueness semantics. A comment in the test explaining this would help future readers understand why the count is 1, not 2.
- Suggestion: predicate allowlist
The visitor conservatively allows only Equal/In/Between/IsNull. Btree indexes naturally support range predicates (GreaterThan, LessThan, etc.) with exact results. If the index implementation can precisely answer these, the allowlist could be expanded in a follow-up.
Summary: The crash in point (1) should be fixed before merge. The correctness concern in point (2) is the deeper design question - the optimization is only safe if the global index is guaranteed to cover all data in the query scope. If partial indexing is supported, additional validation (e.g., verifying all partitions in scope have index coverage) would be needed.
Purpose
This introduce a PredicateVisitor to decide whether a Predicate is all covered by scalar index (e.g. btree now).
Then we can accelerate
count(*)queries on indexed columns without touching data files. This is useful in some AI situations. For example, before filtering high quality data, we might want to check the portion to decide the threshold.Tests
org.apache.paimon.globalindex.ScalarIndexedFieldsVisitorTestfor UnitTestorg.apache.paimon.flink.BatchFileStoreITCasefor ITCase