perf(core): prune partition subtrees during descent and parallelize listing #597
Open
sushiljacksparrow wants to merge 2 commits into apache:main from
Conversation
`FileLister::list_relevant_partition_paths` previously walked every partition subtree to leaf depth, then filtered the results; the recursive descent in `get_leaf_dirs` ran sequentially, paying full LIST latency at every level. On remote object stores with selective partition filters this is the dominant cost. Two changes per the re-scoped plan in issue apache#205:

1. `get_leaf_dirs` (`crates/core/src/storage/mod.rs`) now takes an optional per-child predicate and a parallelism bound. Subtrees the predicate rejects are skipped without listing. Sibling subdirs at each level are walked concurrently via `buffer_unordered`.
2. `PartitionPruner::should_include_prefix` evaluates filters whose field is already present in a partial partition path. Filters for deeper fields are deferred. The single `_hoodie_partition_path` field used by timestamp-based key generators is conservatively admitted at intermediate levels; `should_include` still runs at the leaf.

`FileLister::list_relevant_partition_paths` plumbs both: the predicate filters out lake-format metadata dirs (`.hoodie`, `_delta_log`, `metadata`) and calls `should_include_prefix` for partition pruning. Parallelism uses the existing `hoodie.plan.listing.parallelism` config (default 10). Scope is the no-MDT listing path only; the metadata-table FILES path is unchanged.

Closes apache#205.

Tests:
- `get_leaf_dirs_descends_in_parallel_and_returns_all_leaves`
- `get_leaf_dirs_predicate_prunes_subtree`
- `test_partition_pruner_should_include_prefix_*` (6 cases: empty prefix, deferred filter, evaluable filter prunes, leaf parity with `should_include`, single `_hoodie_partition_path` admits, and too-many-parts fails open)
- `list_partition_paths_prunes_subtrees_when_top_level_filter_rejects`
- `list_partition_paths_prunes_at_deeper_level_when_only_inner_field_filtered`
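The prefix-evaluation idea can be sketched in a few lines of std-only Rust. This is a hypothetical illustration of the technique, not hudi-rs's actual `PartitionPruner` API: the `Filter` struct, field names, and equality-only semantics are simplifications.

```rust
// Hypothetical sketch of prefix-level pruning: evaluate only filters
// whose field already appears in the partial hive-style path, and
// defer filters on fields that live deeper in the tree.
use std::collections::HashMap;

/// A simplified equality filter on one partition field (illustrative).
struct Filter {
    field: &'static str,
    value: &'static str,
}

fn should_include_prefix(prefix: &str, filters: &[Filter]) -> bool {
    // Parse the "k=v" segments seen so far, e.g. "country=us/year=2024".
    let seen: HashMap<&str, &str> = prefix
        .split('/')
        .filter_map(|seg| seg.split_once('='))
        .collect();
    filters.iter().all(|f| match seen.get(f.field) {
        Some(v) => *v == f.value, // field present: evaluable now
        None => true,             // deeper field: defer to the leaf check
    })
}

fn main() {
    let filters = [Filter { field: "year", value: "2024" }];
    // Top-level prefix doesn't mention `year` yet: admit conservatively.
    assert!(should_include_prefix("country=us", &filters));
    // Once `year` appears, the filter is evaluable and can prune.
    assert!(!should_include_prefix("country=us/year=2023", &filters));
    assert!(should_include_prefix("country=us/year=2024", &filters));
    println!("prefix pruning sketch ok");
}
```

The key property is that the predicate is monotone: it only ever says "no" when a filter is provably violated by the path seen so far, so deferring unknown fields keeps it safe to apply at every intermediate level.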
`buffer_unordered(parallelism)` only bounds fan-out per recursion level. A recursive descent over a D-level partition tree could therefore reach up to P^D concurrent `list_dirs` calls: for the default `parallelism=10` and a 3-level hive-style table, that is 1000 simultaneous LIST requests; for a date/hour/minute/second layout, 10000.

Replace the per-level cap with a `tokio::sync::Semaphore` shared across the whole descent. Each recursive call acquires a permit before issuing `list_dirs` and releases it before fanning out, so total in-flight requests stay bounded by `parallelism` regardless of depth. `buffer_unordered` is kept on the recursive stream; it now bounds memory of pending child futures, while the semaphore bounds actual I/O. `hoodie.plan.listing.parallelism` becomes a meaningful global knob: the default 10 is conservative for shallow trees but can be raised for wider or deeper layouts without risking exponential concurrency growth.
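The acquire-before-list, release-before-fan-out pattern can be demonstrated with a std-only, thread-based analogue of the async design. This is a sketch, not the PR's code: the real implementation uses `tokio::sync::Semaphore` and async `list_dirs`; the `Semaphore`, `descend`, and tree shape below are illustrative.

```rust
// Thread-based analogue: one counting semaphore shared across the whole
// recursive descent caps in-flight "list" calls, independent of depth.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Semaphore { permits: Mutex::new(n), cv: Condvar::new() }
    }
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

/// Walk a synthetic tree of `depth` levels with fan-out `width`,
/// recording the peak number of concurrent simulated LIST calls.
fn descend(depth: usize, width: usize, sem: Arc<Semaphore>,
           inflight: Arc<AtomicUsize>, peak: Arc<AtomicUsize>) {
    if depth == 0 { return; }
    sem.acquire(); // permit held only around the simulated LIST
    let now = inflight.fetch_add(1, Ordering::SeqCst) + 1;
    peak.fetch_max(now, Ordering::SeqCst);
    thread::sleep(Duration::from_millis(5)); // fake LIST latency
    inflight.fetch_sub(1, Ordering::SeqCst);
    sem.release(); // released before fanning out, as in the PR

    let handles: Vec<_> = (0..width).map(|_| {
        let (s, i, p) = (sem.clone(), inflight.clone(), peak.clone());
        thread::spawn(move || descend(depth - 1, width, s, i, p))
    }).collect();
    for h in handles { h.join().unwrap(); }
}

fn main() {
    let sem = Arc::new(Semaphore::new(4));
    let inflight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    descend(3, 5, sem, inflight, peak.clone());
    // A 3-level, 5-wide tree has 31 internal lists, but the shared
    // semaphore keeps the peak at or below the permit count.
    assert!(peak.load(Ordering::SeqCst) <= 4);
    println!("peak concurrent lists: {}", peak.load(Ordering::SeqCst));
}
```

Releasing the permit before spawning children is what prevents deadlock here: a parent never holds a permit while waiting on descendants that also need one.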
Change Logs
Implements the re-scoped plan from #205 (xushiyan's 2026-04-28 comment): parallelize `get_leaf_dirs` and add an optional per-level predicate so partition pruning short-circuits subtrees during descent.

- `get_leaf_dirs` (`crates/core/src/storage/mod.rs`) gains an optional predicate and a global concurrency cap. Subtrees the predicate rejects are skipped without listing; sibling subdirs at each level are walked concurrently. Total in-flight `list_dirs` calls are bounded globally by `parallelism` via a `tokio::sync::Semaphore` shared across the whole descent; recursion does not multiply concurrency. `buffer_unordered` is kept on the recursive stream to bound memory of pending child futures, while the semaphore bounds the actual I/O. Parallelism reads from `hoodie.plan.listing.parallelism` (default 10).
- `PartitionPruner::should_include_prefix` evaluates filters whose field is already present in a partial path. Filters for deeper fields are deferred. The single `_hoodie_partition_path` field used by timestamp-based key generators is conservatively admitted at intermediate levels; `should_include` runs at the leaf as before.
- `FileLister::list_relevant_partition_paths` plumbs both: the predicate filters lake-format metadata dirs (`.hoodie`, `_delta_log`, `metadata`) and calls `should_include_prefix` for partition pruning.

Scope is the no-MDT listing path only. The metadata-table FILES path is unchanged.
Note on the concurrency cap
A first cut of this PR used `buffer_unordered(parallelism)` per level, which only bounds fan-out at each recursion step. A D-level partition tree could therefore reach up to P^D concurrent `list_dirs` calls: for the default `parallelism=10` on a 3-level hive-style table, 1000 simultaneous LIST requests; on a 4-level (e.g. date/hour/minute/second) layout, 10000. The semaphore fix makes `parallelism` a true global cap, at the cost of slowing down un-pruned full-table descents at the default; the config is now a meaningful knob, since raising it scales linearly without exponential blow-up.

Closes #205.
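The worst-case arithmetic is easy to check directly. A minimal sketch (the function name is illustrative):

```rust
// Per-level buffer_unordered(P) can multiply across D recursion levels,
// for a worst case of P^D in-flight LIST calls; the shared semaphore
// replaces that with a flat global bound of P.
fn per_level_worst_case(p: u64, d: u32) -> u64 {
    p.pow(d)
}

fn main() {
    assert_eq!(per_level_worst_case(10, 3), 1_000);  // 3-level hive-style table
    assert_eq!(per_level_worst_case(10, 4), 10_000); // date/hour/minute/second
    // With the global semaphore, the bound is simply P = 10 at any depth.
    println!("worst cases check out");
}
```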
Impact
Performance. On remote object stores with selective partition filters, every LIST for a pruned subtree is eliminated. Even un-filtered listings benefit from parallel descent under a predictable concurrency budget. No behavior change to the listed set on either path.
Risk level
low
Tested
- hudi-core: 11 new tests (`get_leaf_dirs` parallel + predicate, `should_include_prefix` correctness, end-to-end pruning via `FileLister`); the existing 675 lib tests and 89 integration tests all pass.
- `cargo clippy --workspace --all-targets -- -D warnings` and `cargo fmt --check` are clean.
- `maturin develop` + `pytest python/tests/test_table_read.py`: 10/10 pass; the new code path is exercised through PyO3.
- Benchmark: a `level1=*/level2=*/level3=*` tree of 10×10×10 = 1000 leaves on a local filesystem (median of 3 runs, release build, post semaphore fix), measured across `par=4, no predicate`; `par=10, no predicate`; `par=10, accept-all predicate`; `par=10, predicate keeps 10% subtree`; `par=10, predicate keeps 1% subtree`. Local FS saturates around `par=4–10`. Remote object stores with per-`LIST` RTT in the tens of ms should see parallel-descent speedup scale roughly linearly with `parallelism` on un-pruned paths, up to the underlying request-rate ceiling.
- Remote run (us-west-2, queried from outside the VPC over the internet, post-fix): selective queries are unaffected by the parallelism setting (work is dominated by the small number of leaves the predicate admits); un-filtered or weakly-filtered descents scale with the parallelism knob, which is now safe to raise because it is a global bound.
Documentation Update
None required.