Is your feature request related to a problem or challenge? Please describe what you are trying to do.
When reading a Parquet file with both a RowFilter chain and with_limit(N) (a typical TopK / LIMIT query), the limit today only trims the output after filter evaluation is complete. The filter chain still fully decodes the predicate columns of every batch in the row group and invokes every predicate over all of them, even though we only need N matching rows from the tail of the chain.
Concretely, ReadPlanBuilder::with_predicate (in parquet/src/arrow/arrow_reader/read_plan.rs) iterates a ParquetRecordBatchReader to completion regardless of how many matches have already been found, and RowGroupReaderBuilder (in parquet/src/arrow/push_decoder/reader_builder/mod.rs) enters Filters and fetches filter columns for the next row group even when the remaining limit is 0.
Describe the solution you'd like
Push the LIMIT (plus any OFFSET) down into the evaluation of the last predicate in the filter chain, and short-circuit in the row-group state machine when no more output rows are needed.
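To illustrate the idea, here is a minimal, self-contained sketch (not the actual parquet crate API; the function name, batch representation, and return type are made up for illustration) of limit-aware evaluation of the last predicate: once `limit` matching rows have been selected, subsequent batches are never decoded.

```rust
/// Hypothetical sketch: evaluate the last predicate in a RowFilter-style
/// chain batch-by-batch, stopping as soon as `limit` matches are found.
/// Returns the per-batch selection masks plus how many batches were decoded.
fn evaluate_last_predicate_with_limit(
    batches: &[Vec<i64>],            // stand-in for decoded predicate-column batches
    predicate: impl Fn(i64) -> bool, // stand-in for the last predicate in the chain
    limit: usize,                    // remaining limit from with_limit(N)
) -> (Vec<Vec<bool>>, usize) {
    let mut selections = Vec::new();
    let mut matched = 0;
    let mut batches_decoded = 0;
    for batch in batches {
        if matched >= limit {
            // Short-circuit: remaining batches need not be decoded or filtered.
            break;
        }
        batches_decoded += 1;
        let mask: Vec<bool> = batch
            .iter()
            .map(|v| {
                // Stop selecting rows once the limit is reached, even mid-batch.
                if matched < limit && predicate(*v) {
                    matched += 1;
                    true
                } else {
                    false
                }
            })
            .collect();
        selections.push(mask);
    }
    (selections, batches_decoded)
}
```

With `limit = 2` and a predicate matching even values over batches `[1,2,3]`, `[4,5,6]`, `[7,8,9]`, the limit is satisfied after the second batch, so the third is never touched; today's code would decode and filter all three.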
Describe alternatives you've considered
Additional context
I found this while testing the datafusion.execution.parquet.pushdown_filters feature in DataFusion.
My SQL looks like:
select * from table where xxx order by time desc limit 10
The file is already sorted by time desc, so the plan looks like this:
SortPreservingMergeExec
  FilterExec
    DataSourceExec
After enabling datafusion.execution.parquet.pushdown_filters, the plan becomes:
SortPreservingMergeExec
  DataSourceExec limit=10
However, with pushdown_filters enabled the query is 5x slower than with it disabled.