feat(parquet): runtime row-group early stop via TopK dynamic filter #22450
zhuqi-lucas wants to merge 5 commits
Closes apache#22407.

## What

Adds runtime row-group pruning between push-decoder runs, driven by the dynamic predicate a TopK `SortExec` pushes down via `DynamicFilterPhysicalExpr`. As the heap fills, the threshold tightens, and subsequent row groups whose statistics prove they cannot contribute are skipped without ever invoking their decoder: zero IO, zero decode.

## Why

DataFusion already prunes parquet at three granularities: file (`EarlyStoppingStream`), row group at scan-startup (`PruningPredicate`), and row (`RowFilter`). There is a gap: once `Layer 1` selects a file's row groups, that decision is **frozen** at scan startup, when the dynamic filter is still `lit(true)`. As `TopK` tightens at runtime, subsequent RGs in the already-opened file keep being decoded even when stats prove they can't beat the threshold. This is the dominant cost for `ORDER BY ... LIMIT` queries on multi-RG files. See apache#22407 for the full architectural trace.

## How

Two coordinated pieces:

1. **`RowGroupPruner`** (in `push_decoder.rs`). Mirrors `FilePruner`'s pattern at row-group granularity: tracks `snapshot_generation` so the cached `PruningPredicate` is rebuilt only when the dynamic filter has actually moved; evaluates against the next pending run's row-group stats via the existing `RowGroupPruningStatistics` adapter from `row_group_filter.rs`. Errors fall back to "don't prune", so a flaky pruning path never silently drops data.
2. **Per-RG decoder splitting when the predicate is dynamic**. `RowGroupAccessPlan::split_runs` previously coalesced consecutive same-`fully_matched` RGs into a single run. For ORDER BY + LIMIT the initial dynamic filter is `lit(true)`, the static fully-matched analysis marks nothing, and `split_runs` collapsed every RG into one run, leaving no inter-run hook for runtime pruning. A new `force_per_row_group` flag (set by `is_dynamic_physical_expr`) disables coalescing for dynamic predicates only, so static-WHERE queries pay nothing.

Plumbing: `PendingDecoderRun` wraps each queued decoder with its row group indices. `PushDecoderStreamState::transition` consults the pruner at every run boundary and skips runs whose row groups are proved unwinnable.

## Observability

- New `Count` metric `row_groups_pruned_dynamic_filter` on `ParquetFileMetrics` surfaces the runtime saving.
- New `dynamic_rg_pruning=eligible` marker on `ParquetSource`'s `EXPLAIN` (`fmt_extra` Default + Verbose) signals plan-time eligibility. It reads *eligible* rather than *true* because the static plan can't predict the runtime outcome.

## Benchmarks (`benchmarks/sort_pushdown_inexact`, 5 iters)

| Query | main | this PR | Δ |
|---|---|---|---|
| Q1 `ORDER BY l_orderkey DESC LIMIT 100` | 6.99 ms | 3.80 ms | **−46%** |
| Q2 `ORDER BY l_orderkey DESC LIMIT 1000` | 3.29 ms | 1.33 ms | **−60%** |
| Q3 `SELECT * ... DESC LIMIT 100` | 11.17 ms | 9.91 ms | −11% |
| Q4 `SELECT * ... DESC LIMIT 1000` | 9.28 ms | 7.95 ms | −14% |

Narrow-projection queries gain the most: their per-RG cost is dominated by metadata + sort-column read, which this PR eliminates for unwinnable RGs. Wide-projection queries gain less because the *kept* RG's all-column decode dominates total time, but still see meaningful savings.

## Test coverage

- 6 new unit tests: 3 on `RowGroupPruner` (basic pruning, generation-tracked dynamic updates, fallback when predicate has no analyzable bounds) + 3 on `fmt_extra` marker (present on dynamic predicate, absent on static, absent on no-predicate).
- 2 new integration tests in `datafusion/core/tests/parquet/dynamic_row_group_pruning.rs`: asserts `row_groups_pruned_dynamic_filter >= 1` end-to-end on a 5-RG TopK query, and asserts the metric stays at 0 when no TopK is present (no spurious firing).
- New SLT `datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt` asserts both `EXPLAIN` surfaces: plain EXPLAIN shows `dynamic_rg_pruning=eligible`, and EXPLAIN ANALYZE pins `row_groups_pruned_dynamic_filter=4` (five RGs, four pruned).

129 parquet unit + 204 parquet integration + SLT all pass. `cargo clippy -D warnings` clean.
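The generation-tracking idea described for `RowGroupPruner` can be sketched in isolation. This is a minimal, std-only illustration and not the PR's code: `DynamicThreshold`, `RowGroupStats` and `RowGroupPrunerSketch` are hypothetical stand-ins, and the filter is assumed to have the shape `col > threshold`, as it would for `ORDER BY col DESC LIMIT k`.

```rust
/// Min/max statistics of the sort column for one row group (simplified, hypothetical).
#[derive(Debug, Clone, Copy)]
pub struct RowGroupStats {
    pub min: i64,
    pub max: i64,
}

/// Stand-in for a dynamic filter of the form `col > threshold`.
/// `None` plays the role of the initial `lit(true)` predicate.
#[derive(Debug, Default)]
pub struct DynamicThreshold {
    threshold: Option<i64>,
    generation: u64,
}

impl DynamicThreshold {
    /// Called when TopK tightens the threshold; bumps the generation.
    pub fn update(&mut self, threshold: i64) {
        self.threshold = Some(threshold);
        self.generation += 1;
    }
}

/// Caches the last seen filter state and rebuilds only when the generation moved.
#[derive(Debug, Default)]
pub struct RowGroupPrunerSketch {
    seen_generation: Option<u64>,
    cached_threshold: Option<i64>,
    /// Number of times the cached predicate was rebuilt (for illustration).
    pub rebuilds: usize,
}

impl RowGroupPrunerSketch {
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns true only when statistics prove the row group cannot contribute.
    /// Missing statistics or a missing threshold mean "don't prune".
    pub fn should_prune(
        &mut self,
        filter: &DynamicThreshold,
        stats: Option<RowGroupStats>,
    ) -> bool {
        if self.seen_generation != Some(filter.generation) {
            self.cached_threshold = filter.threshold;
            self.seen_generation = Some(filter.generation);
            self.rebuilds += 1;
        }
        match (self.cached_threshold, stats) {
            // Every value in the row group is <= threshold, so `col > threshold`
            // can never be satisfied by any of its rows.
            (Some(threshold), Some(s)) => s.max <= threshold,
            _ => false,
        }
    }
}
```

In this sketch, repeated calls with an unchanged filter reuse the cached state, which mirrors the stated goal of rebuilding the `PruningPredicate` only when the dynamic filter has moved.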
Two CI failures on PR apache#22450:

1. **cargo doc**: broken intra-doc link in `ParquetFileMetrics::row_groups_pruned_dynamic_filter`. Switch from ``[`row_groups_pruned_statistics`]`` to ``[`Self::row_groups_pruned_statistics`]`` so rustdoc can resolve it.
2. **sqllogictest substrait round-trip**: adding `dynamic_rg_pruning=eligible` to ParquetSource's `fmt_extra` output shifted every `EXPLAIN` line that already showed a `DynamicFilter` predicate. Add the marker to 13 SLT expectations:
   - clickbench, explain_analyze, limit, limit_pruning, dynamic_filter_pushdown_config, preserve_file_partitioning, projection_pushdown, push_down_filter_parquet, push_down_filter_regression, repartition_subset_satisfaction, sort_pushdown, statistics_registry, topk
   - 134 marker insertions total, all on `DataSourceExec:` lines whose predicate contains `DynamicFilter [`.

Two summary-level analyze tests also need the new `row_groups_pruned_dynamic_filter=0` counter in their metrics block (`limit_pruning.slt`, `dynamic_filter_pushdown_config.slt`). Dev-level analyze output elides zero-valued counters so the other files don't need it.

No behavior change beyond what was already in the previous commit.
CI runs `cargo doc --document-private-items` which catches links on private items (the previous fix only covered public items). The `row_groups_pruned_dynamic` field's doc comment referenced ``[`row_group_pruner`]``, a same-struct field that needs `Self::` to resolve.
run benchmarks

🤖 Benchmark runs (GKE) comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base): clickbench_partitioned, tpcds and tpch, each reported as completed.

run benchmark sort_pushdown_inexact

🤖 Benchmark running (GKE) comparing feat/topk-rg-level-dynamic-pruning (691926f) to 077f08a (merge-base) using sort_pushdown_inexact.

Pull request overview
This PR adds runtime row-group pruning for Parquet scans driven by TopK’s dynamic filter, closing the gap where row groups selected at file open couldn’t be re-pruned after the TopK threshold tightens during execution.
Changes:
- Introduces a runtime `RowGroupPruner` that re-evaluates a dynamic predicate at decoder-run boundaries and skips row groups proven unreachable.
- Forces per-row-group decoder splitting when the predicate is dynamic so the runtime pruner has a boundary at every RG.
- Adds observability: `dynamic_rg_pruning=eligible` in `EXPLAIN` and a new metric `row_groups_pruned_dynamic_filter` in `EXPLAIN ANALYZE`, plus tests/SLTs updated accordingly.

Reviewed changes

Copilot reviewed 22 out of 22 changed files in this pull request and generated 2 comments.

| File | Description |
|---|---|
| datafusion/datasource-parquet/src/push_decoder.rs | Adds RowGroupPruner, tracks row-group indices per decoder run, and skips prunable runs at runtime. |
| datafusion/datasource-parquet/src/opener/mod.rs | Forces per-RG runs for dynamic predicates; wires pending runs + runtime pruner into PushDecoderStreamState. |
| datafusion/datasource-parquet/src/access_plan.rs | Extends split_runs with force_per_row_group to avoid coalescing runs for dynamic predicates. |
| datafusion/datasource-parquet/src/source.rs | Adds dynamic_rg_pruning=eligible marker in fmt_extra and unit tests for the marker. |
| datafusion/datasource-parquet/src/row_group_filter.rs | Exposes RowGroupPruningStatistics to reuse stats adapter for runtime pruning. |
| datafusion/datasource-parquet/src/metrics.rs | Adds row_groups_pruned_dynamic_filter metric to ParquetFileMetrics. |
| datafusion/core/tests/parquet/mod.rs | Adds helper to read row_groups_pruned_dynamic_filter from metrics. |
| datafusion/core/tests/parquet/dynamic_row_group_pruning.rs | New integration tests validating metric fires for TopK and stays quiet otherwise. |
| datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt | New SLT covering both EXPLAIN marker and EXPLAIN ANALYZE metric value. |
| datafusion/sqllogictest/test_files/topk.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/statistics_registry.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/push_down_filter_regression.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/push_down_filter_parquet.slt | Updates expected plans/metrics to include dynamic_rg_pruning=eligible and (where relevant) the new counter. |
| datafusion/sqllogictest/test_files/projection_pushdown.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/preserve_file_partitioning.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/limit.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/limit_pruning.slt | Updates expected metrics to include row_groups_pruned_dynamic_filter=0 plus eligibility marker. |
| datafusion/sqllogictest/test_files/explain_analyze.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
| datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt | Updates expected plans/metrics to include eligibility marker and row_groups_pruned_dynamic_filter=0 where applicable. |
| datafusion/sqllogictest/test_files/clickbench.slt | Updates expected plans to include dynamic_rg_pruning=eligible. |
Comments suppressed due to low confidence (1)
datafusion/datasource-parquet/src/access_plan.rs:458
`split_runs` computes `row_group_needs_filter` as `!fully_matched` without considering the `needs_filter` argument. When `force_per_row_group=true` and the scan has no row filter (`needs_filter=false`), this will still mark all runs as `needs_filter=true`, causing the opener to treat them as filtered runs (e.g. attempting to fetch row filters / applying predicate-cache settings) even though no row-level filter exists. `row_group_needs_filter` should be derived as `needs_filter && !fully_matched` so the run metadata stays consistent with the caller’s capabilities.

    for (idx, (access, fully_matched)) in
        row_groups.into_iter().zip(fully_matched).enumerate()
    {
        if !access.should_scan() {
            continue;
        }
        let row_group_needs_filter = !fully_matched;
        // Coalesce consecutive RGs into a run only when (a) they share
        // the same filter requirement and (b) we're not forcing per-RG
        // splitting for runtime pruning.
        let can_coalesce = !force_per_row_group;
        if can_coalesce
            && let Some(run) = runs
                .last_mut()
                .filter(|run| run.needs_filter == row_group_needs_filter)
        {
            run.access_plan.set(idx, access);
            if fully_matched {
                run.access_plan.mark_fully_matched(idx);
            }
        } else {
            let mut run_plan = ParquetAccessPlan::new_none(num_row_groups);
            run_plan.set(idx, access);
            if fully_matched {
                run_plan.mark_fully_matched(idx);
            }
            runs.push(RowGroupRun::new(row_group_needs_filter, run_plan));
        }
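To make the suggestion in this review comment concrete, here is a simplified, std-only sketch of run splitting with both the `force_per_row_group` switch and the proposed `needs_filter && !fully_matched` derivation. `Run` and `split_runs_sketch` are hypothetical names, the access plan is reduced to a list of row-group indices, and whether the PR adopted this exact change is not stated in the thread.

```rust
/// A decoder run reduced to the row-group indices it covers (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub struct Run {
    pub needs_filter: bool,
    pub row_groups: Vec<usize>,
}

/// Splits row groups into runs.
///
/// * `scan[i]`: whether row group `i` is scanned at all.
/// * `fully_matched[i]`: whether static analysis proved every row matches.
/// * `needs_filter`: whether the scan has a row-level filter in the first place.
/// * `force_per_row_group`: one run per row group, so a runtime pruner gets a
///   boundary before every row group.
pub fn split_runs_sketch(
    scan: &[bool],
    fully_matched: &[bool],
    needs_filter: bool,
    force_per_row_group: bool,
) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for (idx, (&should_scan, &matched)) in scan.iter().zip(fully_matched).enumerate() {
        if !should_scan {
            continue;
        }
        // Derivation proposed in the review comment: a run can only need a
        // filter when the scan actually has one.
        let rg_needs_filter = needs_filter && !matched;
        // Coalesce only when not forcing per-RG runs and the previous run has
        // the same filter requirement.
        let coalesce = !force_per_row_group
            && runs.last().map_or(false, |run| run.needs_filter == rg_needs_filter);
        if coalesce {
            if let Some(run) = runs.last_mut() {
                run.row_groups.push(idx);
            }
        } else {
            runs.push(Run { needs_filter: rg_needs_filter, row_groups: vec![idx] });
        }
    }
    runs
}
```

With `force_per_row_group = true` and `needs_filter = false`, every row group gets its own run and none of them is marked as needing a filter, which is the consistency the comment asks for.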
🤖 Benchmark completed (GKE): sort_pushdown_inexact, base (merge-base) vs. branch.

Per Copilot review on apache#22450: `RowGroupPruner` was using a single `predicate_creation_errors` counter for both predicate construction (`build_pruning_predicate`) AND predicate evaluation (`PruningPredicate::prune`) failures. The log message also said "Ignoring error building..." when the failure was during evaluation. This misattributed evaluation failures and made the metric semantics inconsistent with the static row-group pruning path in `RowGroupAccessPlanFilter::prune_by_statistics`, which already separates the two.

`RowGroupPruner::new` now takes both counters:

- `predicate_creation_errors`: bumped on `build_pruning_predicate` failures. Wired to `prepared.predicate_creation_errors` from the opener (the same field the static path uses).
- `predicate_evaluation_errors`: bumped on `PruningPredicate::prune` failures. Wired to `prepared.file_metrics.predicate_evaluation_errors` (the same field the static `prune_by_statistics` path uses), so the two paths accumulate into a shared counter.

The error log message is updated to say "evaluating" so the metric and the log agree.
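The split between the two counters can be illustrated with a small, std-only sketch. `Counters` and `try_prune` are hypothetical names and plain atomics stand in for the metrics types; the point is only that a build failure and an evaluation failure increment different counters, and that both fall back to "don't prune".

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Two separate error counters (plain atomics as stand-ins for metrics).
#[derive(Debug, Default)]
pub struct Counters {
    pub predicate_creation_errors: AtomicUsize,
    pub predicate_evaluation_errors: AtomicUsize,
}

/// Builds a predicate, then evaluates it. Any failure returns `false`
/// ("don't prune") and is attributed to the stage that failed.
pub fn try_prune<P>(
    counters: &Counters,
    build: impl FnOnce() -> Result<P, String>,
    evaluate: impl FnOnce(&P) -> Result<bool, String>,
) -> bool {
    let predicate = match build() {
        Ok(p) => p,
        Err(_) => {
            // Construction failed: count it as a creation error.
            counters.predicate_creation_errors.fetch_add(1, Ordering::Relaxed);
            return false;
        }
    };
    match evaluate(&predicate) {
        Ok(prune) => prune,
        Err(_) => {
            // Evaluation failed: count it separately and keep the row group.
            counters.predicate_evaluation_errors.fetch_add(1, Ordering::Relaxed);
            false
        }
    }
}
```

Keeping the fallback identical for both stages means attribution changes only what is reported, not which row groups are read.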
run benchmark sort_pushdown_inexact

🤖 Benchmark completed (GKE): feat/topk-rg-level-dynamic-pruning (0828f1b) compared to a8f03fd (merge-base) using sort_pushdown_inexact.

run benchmark topk_tpch

🤖 Benchmark completed (GKE): feat/topk-rg-level-dynamic-pruning (0828f1b) compared to a8f03fd (merge-base) using topk_tpch.

┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ HEAD ┃ feat_topk-rg-level-dynamic-pruning ┃ Change ┃
┡━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2.14 / 2.74 ±0.76 / 4.10 ms │ 2.12 / 2.79 ±0.68 / 4.02 ms │ no change │
│ Q2 │ 10.66 / 11.36 ±0.68 / 12.23 ms │ 2.81 / 3.61 ±0.87 / 4.72 ms │ +3.15x faster │
│ Q3 │ 31.77 / 32.15 ±0.43 / 32.83 ms │ 31.71 / 31.92 ±0.16 / 32.18 ms │ no change │
│ Q4 │ 11.83 / 12.29 ±0.77 / 13.82 ms │ 3.13 / 3.25 ±0.13 / 3.48 ms │ +3.78x faster │
│ Q5 │ 9.94 / 10.14 ±0.18 / 10.46 ms │ 9.95 / 10.02 ±0.05 / 10.09 ms │ no change │
│ Q6 │ 17.19 / 17.39 ±0.15 / 17.56 ms │ 17.11 / 17.36 ±0.37 / 18.09 ms │ no change │
│ Q7 │ 37.07 / 38.08 ±1.17 / 40.08 ms │ 37.00 / 37.41 ±0.37 / 38.07 ms │ no change │
│ Q8 │ 28.13 / 28.59 ±0.60 / 29.71 ms │ 6.86 / 7.16 ±0.42 / 7.98 ms │ +3.99x faster │
│ Q9 │ 35.34 / 36.86 ±1.54 / 38.77 ms │ 8.36 / 8.50 ±0.08 / 8.60 ms │ +4.34x faster │
│ Q10 │ 54.13 / 55.29 ±1.83 / 58.93 ms │ 12.77 / 13.00 ±0.45 / 13.89 ms │ +4.25x faster │
│ Q11 │ 3.75 / 3.91 ±0.11 / 4.05 ms │ 3.82 / 4.08 ±0.31 / 4.68 ms │ no change │
└───────┴────────────────────────────────┴────────────────────────────────────┴───────────────┘

cc @alamb @adriangb @Dandandan

Nice, impressive 🚀🚀🚀

## Which issue does this PR close?

Closes #22407.

## Rationale for this change

DataFusion already prunes parquet at three granularities: file (`EarlyStoppingStream` + `FilePruner`), row group at scan-startup (`PruningPredicate` → `RowGroupAccessPlanFilter`), and row inside an open RG (`RowFilter`).

There's a gap in the middle: once Layer 1 (RG-static) picks the row groups at file open, that decision is frozen because the dynamic filter is still `lit(true)` then. As `TopK` tightens its threshold at runtime, subsequent RGs in the already-opened file keep getting decoded even when their stats already prove they can't beat the threshold. This is the dominant cost for `ORDER BY ... LIMIT` queries on multi-RG files where file-level pruning can't help (single large file, or scrambled-RG multi-file).

See the issue for a full architectural diagram and a concrete trace showing where the wasted I/O / decompression / decode lives.

## What changes are included in this PR?

Two coordinated pieces that close the gap:

1. `RowGroupPruner` (in `datafusion/datasource-parquet/src/push_decoder.rs`) mirrors `FilePruner`'s pattern at row-group granularity. Tracks `snapshot_generation(&predicate)` so the cached `PruningPredicate` is rebuilt only when the dynamic filter has actually moved, then evaluates against the next pending decoder run's row-group stats via the existing `RowGroupPruningStatistics` adapter. Errors fall back to "don't prune", so a flaky pruning path never silently drops data.
2. Per-row-group decoder splitting when the predicate is dynamic. `ParquetAccessPlan::split_runs` previously coalesced consecutive same-`fully_matched` RGs into a single run. For ORDER BY + LIMIT the initial dynamic filter is `lit(true)`, so the static fully-matched analysis marks nothing and `split_runs` collapsed every RG into one run, leaving no inter-run hook. A new `force_per_row_group` flag (set by `is_dynamic_physical_expr`) disables coalescing for dynamic predicates only, so static WHERE queries pay nothing.

`PendingDecoderRun` wraps each queued decoder with its row group indices. `PushDecoderStreamState::transition` consults the pruner at every run boundary and skips runs whose row groups are proved unwinnable.
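The run-boundary check can be pictured as a loop over a queue of pending runs. The sketch below is a std-only simplification under stated assumptions: `PendingRun` and `next_run_to_decode` are hypothetical names, each row group is described only by the maximum of its sort column, and the dynamic filter is assumed to be `col > threshold` (the `ORDER BY ... DESC LIMIT` case).

```rust
use std::collections::VecDeque;

/// A queued decoder run, reduced to the row-group indices it would decode.
#[derive(Debug)]
pub struct PendingRun {
    pub row_groups: Vec<usize>,
}

/// Pops runs until one is found that may still contribute rows.
///
/// * `rg_max[i]`: maximum sort-column value of row group `i` (from statistics).
/// * `threshold`: current TopK threshold, or `None` while the filter is still
///   the initial always-true predicate.
/// * `pruned`: counter of row groups skipped at runtime.
pub fn next_run_to_decode(
    pending: &mut VecDeque<PendingRun>,
    rg_max: &[i64],
    threshold: Option<i64>,
    pruned: &mut usize,
) -> Option<PendingRun> {
    while let Some(run) = pending.pop_front() {
        // A run is unwinnable only if every row group in it is provably
        // below the threshold; without a threshold nothing is skipped.
        let unwinnable = match threshold {
            Some(t) => run.row_groups.iter().all(|&rg| rg_max[rg] <= t),
            None => false,
        };
        if unwinnable {
            *pruned += run.row_groups.len();
            continue; // never hand this run to a decoder
        }
        return Some(run);
    }
    None
}
```

With five single-row-group runs whose maxima decrease, the first run is decoded while the threshold is still unset; once the threshold exceeds the remaining maxima, the other four runs are skipped, which matches the "five RGs, four pruned" shape of the SLT described in this PR.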
## Observability

- `Count` metric `row_groups_pruned_dynamic_filter` on `ParquetFileMetrics` surfaces the runtime saving.
- `dynamic_rg_pruning=eligible` marker on `ParquetSource`'s `EXPLAIN` (`fmt_extra` Default + Verbose) signals plan-time eligibility. Eligible rather than true because the static plan can't predict the runtime outcome.

## Benchmarks (`benchmarks/sort_pushdown_inexact`, 5 iterations)

| Query | main | this PR | Δ |
|---|---|---|---|
| Q1 `ORDER BY l_orderkey DESC LIMIT 100` | 6.99 ms | 3.80 ms | **−46%** |
| Q2 `ORDER BY l_orderkey DESC LIMIT 1000` | 3.29 ms | 1.33 ms | **−60%** |
| Q3 `SELECT * ... DESC LIMIT 100` | 11.17 ms | 9.91 ms | −11% |
| Q4 `SELECT * ... DESC LIMIT 1000` | 9.28 ms | 7.95 ms | −14% |

Narrow-projection queries gain the most: their per-RG cost is dominated by metadata + sort-column read, which this PR eliminates for unwinnable RGs. Wide-projection queries gain less because the kept RG's all-column decode dominates total time, but still see meaningful savings.
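The Δ figures reported for this benchmark (for example 6.99 ms on main versus 3.80 ms on this PR for Q1) read as relative changes in runtime. A small helper makes the arithmetic explicit; `percent_change` is a hypothetical name, and rounding to the nearest whole percent is an assumption about how the figures were presented.

```rust
/// Relative change of `new_ms` versus `base_ms`, in percent.
/// Negative values mean the new measurement is faster.
pub fn percent_change(base_ms: f64, new_ms: f64) -> f64 {
    (new_ms - base_ms) / base_ms * 100.0
}
```

For Q1 this gives (3.80 − 6.99) / 6.99 × 100 ≈ −45.6, which rounds to the −46% quoted for that query.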

## Are these changes tested?

Yes. Three layers:

- Unit tests:
  - `push_decoder.rs::tests`: `RowGroupPruner` basic pruning, generation-tracked dynamic-filter updates, fallback when the predicate has no analyzable bounds.
  - `source.rs::tests`: `dynamic_rg_pruning=eligible` marker present on dynamic predicate, absent on static predicate, absent when there is no predicate at all.
- `datafusion/core/tests/parquet/dynamic_row_group_pruning.rs`: asserts `row_groups_pruned_dynamic_filter >= 1` end-to-end on a 5-RG `ORDER BY DESC LIMIT 5` scan, and asserts the metric stays at 0 when there is no TopK (no spurious firing).
- `datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt`: asserts both `EXPLAIN` surfaces. Plain `EXPLAIN` shows `dynamic_rg_pruning=eligible`, and `EXPLAIN ANALYZE` pins `row_groups_pruned_dynamic_filter=4` (five RGs, four pruned at runtime).

129 parquet unit + 204 parquet integration + SLT all pass. `cargo clippy --all-targets --all-features -- -D warnings` clean.

## Are there any user-facing changes?

Two visible additions, both opt-in via existing dynamic-filter infrastructure:

- `row_groups_pruned_dynamic_filter` counter visible in `EXPLAIN ANALYZE` for queries whose plan carries a `DynamicFilterPhysicalExpr` (today: only TopK with `enable_topk_dynamic_filter_pushdown=true`, which is the default).
- `dynamic_rg_pruning=eligible` marker visible in `EXPLAIN` output for the same queries.

No config changes, no API breakage, no behavior change for queries without a dynamic predicate.