[spark] Support filter pushdown for log tables#3116
Conversation
@Yohahaha @YannByron @luoyuxia PTAL 🙏
Yohahaha
left a comment
thank you! left some comments.
Force-pushed from ee3d458 to 71a79c5
@Yohahaha @YannByron Ty for the review 👍 Redesigned to use the newer API, PTAL 🙏
LGTM. thanks @fresh-borzoni
@luoyuxia Can you take a look, pls?
Pull request overview
This PR adds Spark-side filter pushdown for Fluss log (append) tables by converting Spark V2 predicates into Fluss predicates and applying them as server-side record-batch filters, while still letting Spark re-apply predicates for row-exact correctness.
Changes:
- Introduce `SparkPredicateConverter` to translate Spark V2 `Predicate` expressions into Fluss `Predicate`s.
- Wire Spark V2 filter pushdown into `FlussAppendScanBuilder`/`FlussAppendScan` and apply the pushed predicate in `FlussAppendPartitionReader` via `table.newScan().filter(...)`.
- Add unit/integration tests validating predicate conversion and verifying that pushdown shows up in Spark scan plans.
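The conversion step above can be sketched with a simplified model. The type names below are hypothetical stand-ins for illustration only, not the actual Spark V2 or Fluss predicate classes; the real converter works on `org.apache.spark.sql.connector.expressions.filter.Predicate` and Fluss's own `Predicate` type:

```scala
// Hypothetical stand-ins for Spark V2 predicates (not the real Spark API).
sealed trait SparkPred
case class EqualTo(field: String, value: Any) extends SparkPred
case class GreaterThan(field: String, value: Any) extends SparkPred
case class And(left: SparkPred, right: SparkPred) extends SparkPred
case class Unsupported(desc: String) extends SparkPred

// Hypothetical stand-ins for Fluss predicates.
sealed trait FlussPred
case class FEqual(field: String, value: Any) extends FlussPred
case class FGreater(field: String, value: Any) extends FlussPred
case class FAnd(left: FlussPred, right: FlussPred) extends FlussPred

object Converter {
  // Returns None for predicates that cannot be pushed down; since Spark
  // re-applies every predicate anyway, a None simply means no pruning help.
  def convert(p: SparkPred): Option[FlussPred] = p match {
    case EqualTo(f, v)     => Some(FEqual(f, v))
    case GreaterThan(f, v) => Some(FGreater(f, v))
    case And(l, r) =>
      // An AND is only pushable if both sides convert.
      for { cl <- convert(l); cr <- convert(r) } yield FAnd(cl, cr)
    case Unsupported(_)    => None
  }
}
```

The partial-conversion behavior on `And` mirrors the usual converter design: an unconvertible child poisons the whole conjunction rather than silently dropping a condition.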
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala | New converter from Spark predicates to Fluss predicates for pushdown. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala | Add SupportsPushDownV2Filters mixin and pushdown plumbing for append scans. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala | Extend FlussAppendScan to carry pushed predicates and include them in scan description. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala | Thread pushed predicate into append batch reader factory creation. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala | Extend append reader factory to accept an optional pushed predicate. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala | Apply server-side batch filter via TableScan.filter(...) when reading. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala | Update append micro-batch reader factory call signature (currently still passes None). |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakeAppendBatch.scala | Update factory construction signature for fallback path. |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/lake/FlussLakePartitionReaderFactory.scala | Update append partition reader construction signature for log splits. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/utils/SparkPredicateConverterTest.scala | New unit tests covering predicate conversion semantics. |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala | New tests asserting Spark plans show pushed predicates for log-table reads. |
luoyuxia
left a comment
@fresh-borzoni Thanks for the PR. Only one minor comment. Also, will the pushdown for the lake reader be done in a following PR?
@luoyuxia Ty for the review, addressed the comments, PTAL 🙏 Yes, pushdown for the lake reader is planned as a follow-up; this PR is primarily the converter introduction for further use cases.
closes #3117
Adds SupportsPushDownFilters to FlussAppendScanBuilder and a SparkPredicateConverter mirroring Flink's PredicateConverter.
Record-batch pushdown uses the server-side filter from #2951; Spark re-applies every filter as a safety net, making pushdown a pure optimization.
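Why re-applying every filter makes pushdown a pure optimization: the server-side filter prunes at record-batch granularity, so a surviving batch may still contain rows that do not match, and exactness is restored on the Spark side. A minimal sketch of that contract, using a hypothetical batch model rather than the Fluss API:

```scala
object LogScanModel {
  case class Row(values: Map[String, Int])

  case class Batch(rows: Seq[Row]) {
    // Coarse batch-level filter: keep the batch if ANY row might match.
    // This is the granularity at which the server-side filter operates.
    def mightMatch(pred: Row => Boolean): Boolean = rows.exists(pred)
  }

  def scan(batches: Seq[Batch], pred: Row => Boolean): Seq[Row] = {
    val pruned = batches.filter(_.mightMatch(pred)) // server side, coarse
    pruned.flatMap(_.rows).filter(pred)             // Spark side, row-exact
  }
}
```

Dropping a batch never loses a matching row, and the final row-level pass removes the false positives, so the result is identical with or without pruning.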