Adaptive filter pushdown for the parquet scan#22237
Draft
adriangb wants to merge 4 commits into
Draft
Conversation
Introduce `OptionalFilterPhysicalExpr`, a transparent `PhysicalExpr` wrapper that marks a filter as *optional* — droppable without affecting query correctness. It delegates every `PhysicalExpr` method to the inner expression, so it is behavior-neutral until a consumer explicitly checks for the marker. This is the foundation for adaptive filter scheduling: a scan can detect the wrapper and drop a performance-hint filter (e.g. a hash-join dynamic filter) when it is not cost-effective, knowing correctness is enforced elsewhere. Also adds proto serialization (`PhysicalOptionalFilterNode`) so physical plans containing the wrapper round-trip faithfully. No caller wraps anything yet — that arrives with the adaptive parquet scan later in the stack. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add an opt-in way to learn, per individual conjunct, how effective each predicate was during pruning — without running any extra pruning passes. - `PruningPredicate::try_new_tagged_conjuncts` builds a predicate from AND-conjuncts, each carrying a caller-supplied tag. - `PruningPredicate::prune_per_conjunct` returns the usual prune mask plus per-conjunct `PerConjunctPruneStats` (rows/containers seen vs. skipped) as a side effect of the pruning iteration that already runs. - `RowGroupAccessPlanFilter::prune_by_statistics_with_per_conjunct_stats` and `PagePruningAccessPlanFilter::prune_plan_with_per_conjunct_stats` surface those stats for row-group and page-index pruning respectively. The existing untagged `prune` / `prune_by_statistics` / `prune_plan_with_page_index` paths are preserved and unchanged; the new methods return empty stats on the untagged path. No in-tree caller uses the tagged path yet — the adaptive parquet scan consumes it later in the stack as a selectivity prior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Introduce `SelectivityTracker`, the cross-file cost model behind adaptive filter pushdown. It accumulates per-filter selectivity and throughput statistics and, given a confidence interval, decides whether each filter conjunct should be evaluated at row level, deferred to post-scan, or dropped entirely (for optional filters). This commit adds the module, its `total_compressed_bytes` helper in `row_filter`, a criterion benchmark, and ~45 unit tests covering the partitioning / promote / demote / drop logic. Nothing wires it into the parquet scan yet — that integration is the final commit in this stack. A couple of `pub(crate)` items (`count_skippable_bytes`, `skip_flag`, `is_filter_skipped`) are only exercised by that integration and carry a temporary `#[expect(dead_code)]` until then. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wire the SelectivityTracker into the parquet scan so filter placement adapts to measured selectivity and throughput. - `ParquetMorselizer` carries tagged predicate conjuncts and a shared `SelectivityTracker`; at file open the tracker partitions conjuncts into row-level vs post-scan buckets, seeded by the per-conjunct row-group / page-index pruning rates collected for free during pruning. - `AdaptiveParquetStream` drives the push decoder one row group at a time, re-partitioning at row-group boundaries and swapping the decoder strategy (row filter + projection mask) when placement changes. - Integrates with upstream's fully-matched run splitting: fully-matched runs get a no-filter decoder; needs-filter runs get the adaptive setup. - `HashJoinExec` wraps its pushed-down dynamic filter in `OptionalFilterPhysicalExpr` so the tracker may drop it when it is not cost-effective; join correctness is unaffected. - Adds the `filter_pushdown_min_bytes_per_sec`, `filter_collecting_byte_ratio_threshold` and `filter_confidence_z` config knobs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Rationale for this change
With the wrapper type (#22234), per-conjunct pruning stats (#22235), and the cost model (#22236) in place, this PR wires them into the parquet scan so filter placement adapts to measured selectivity and throughput instead of a fixed pushdown decision.
What changes are included in this PR?
ParquetMorselizercarries tagged predicate conjuncts and a sharedSelectivityTracker; at file open the tracker partitions conjuncts into row-level vs post-scan buckets, seeded by the per-conjunct row-group / page-index pruning rates collected for free during pruning.AdaptiveParquetStreamdrives the push decoder one row group at a time, re-partitioning at row-group boundaries and swapping the decoder strategy (row filter + projection mask) when placement changes.HashJoinExecwraps its pushed-down dynamic filter inOptionalFilterPhysicalExprso the tracker may drop it when it is not cost-effective; join correctness is unaffected.filter_pushdown_min_bytes_per_sec,filter_collecting_byte_ratio_threshold,filter_confidence_z.Are these changes tested?
Yes — parquet filter-pushdown integration tests, physical-optimizer filter-pushdown tests, proto round-trip, and sqllogictest coverage.
Are there any user-facing changes?
New parquet read config knobs (documented in
configs.md). Behavior change to the parquet scan's filter placement. Note: this PR pins a customarrow-rsbranch for the push-decoderStrategySwapAPIs; landing upstream requires those APIs in a releasedarrow-rsfirst.Stacked PR — diff is cumulative against
main. Review the top commit "feat: adaptive filter pushdown for the parquet scan"; the commits below it are PRs #22234, #22235, #22236.Stack (review/merge in order):