feat(parquet): row-group and row-range sampling on ParquetSource#22024
Open
adriangb wants to merge 2 commits intoapache:mainfrom
Open
feat(parquet): row-group and row-range sampling on ParquetSource#22024adriangb wants to merge 2 commits intoapache:mainfrom
adriangb wants to merge 2 commits intoapache:mainfrom
Conversation
Adds two opt-in sampling primitives to parquet scans, both built on the existing `ParquetAccessPlan` infrastructure: * `ParquetSource::with_row_group_sampling(fraction)` — keep `fraction` of row groups in each scanned file. Selection is deferred until the opener has loaded the parquet footer (so we sample by real row-group index, not guess) and is deterministic per `(file_name, row_group_count, fraction)` via a seeded `SmallRng`. * `ParquetSource::with_row_fraction(fraction)` — within each kept row group, keep `fraction` of rows by translating to a `RowSelection` of K small contiguous windows (size controlled by `with_row_cluster_size`, default 32 768 rows). The parquet reader uses the page index to read only the data pages covering the selected rows, so this gives "page-level" IO savings without requiring per-column page alignment. Falls back gracefully (no IO win, still correct) when the page index is missing. The two layers compose: scanning with both `row_group_fraction=0.1` and `row_fraction=0.1` reads ~1% of the rows in ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group. Selection within a row group is deterministic-but-random per `(file_name, row_group_index, fraction, cluster_size)` — same inputs yield the same windows, so re-runs are repeatable. ## Why this lives on `ParquetSource` The natural entry-point for "I want a sample" is at config time, before any metadata IO. The actual *which* row groups / *which* rows selection still has to be deferred to the opener (after the footer is parsed) — that's why `ParquetSampling` carries fractions plus a cluster size, and the opener pulls them through to its lazy decision points. This is intentionally orthogonal to file-level sampling: `ParquetSource` doesn't own the file list (`FileScanConfig.file_groups` does), so a file-fraction setter here would have been a confusing no-op. Callers that want to drop files should rebuild the `FileScanConfig` directly. ## Use cases * `TABLESAMPLE` SQL syntax (any future implementation can lower to these primitives). * Ad-hoc data exploration / `EXPLAIN ANALYZE` against a sample. * Mini-query-style stats sampling (a layered helper can call these to bound the cost of computing approximate min/max/NDV/histograms for the optimizer — out of scope here, see the linked POC in the PR description). * `EXPLAIN ANALYZE`-driven debug runs against a representative slice. ## Tests 5 unit tests on `apply_row_group_sampling` (target count, determinism, file-name dependence, no-op at fraction=1.0, target floor of 1) plus 2 end-to-end tests that build a real parquet file in `InMemory` object store and confirm the row counts emitted are what the sampling implies. `cargo build --workspace`, `cargo fmt --all`, and `cargo clippy -p datafusion-datasource-parquet --all-targets -- -D warnings` are clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two changes responding to review on the parent commit: 1. Key sampling on a stable `file_index` instead of `file_name` (apache#22000 (comment)). Both `apply_row_group_sampling` and `apply_row_fraction_sampling` now take `file_index: usize` rather than `file_name: &str`. The parquet opener passes the execution `partition_index`. This makes sampling reproducible across environments (no dependency on the on-disk path), while still decorrelating files assigned to different partitions. 2. Extract the row-window selection into `build_row_window_selectors` and add fuzz coverage (apache#22000 (comment)). The previous inline arithmetic could produce overlapping windows when `target_rows` was close to `total_rows`: `window_size = ceil(target / n_windows)` could exceed `stride = total / n_windows`, so adjacent strides' windows would intersect. The extracted function caps `window_size` at `stride` (the construction that guarantees disjointness) and is covered by: * `row_window_selection_basic_layout` — hand-checked anchor case. * `row_window_selection_returns_none_on_invalid_input` — degenerate inputs return `None` cleanly. * `row_window_selection_full_target_no_overlap` — the previously buggy `target_rows == total_rows` case. * `row_window_selection_fuzz_invariants` — 5 000 randomized `(total_rows, target_rows, cluster_size, seed)` configurations, asserting full coverage, in-bounds positions, and no overlap. * `row_window_selection_fuzz_determinism` — 1 000 iterations verifying identical seeds produce identical layouts. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
datafusion.execution.collect_statisticson wide tables #21624 (datafusion.execution.collect_statisticson wide tables).This PR extracts the first layer of #22000 — the opt-in
ParquetSourcesampling primitives — as a self-contained change. The TABLESAMPLE SQL surface,Samplelogical/physical nodes, andSamplePushdownrule are deliberately not included; they will land in follow-ups.Rationale for this change
DataFusion has the machinery for fine-grained parquet sampling (
ParquetAccessPlanwithSkip/Scan/Selection(RowSelection)) but no public way to ask for a sample without constructing the access plan by hand and stuffing it intoPartitionedFile.extensions. That works for one-off code but is awkward for ad-hoc data exploration, layered helpers that want to compute approximate stats over a bounded slice, andEXPLAIN ANALYZE-driven debug runs against a representative slice.This PR adds the lowest layer: opt-in builders on
ParquetSourcethat translate fractions into aParquetAccessPlanlazily inside the opener (after the footer is loaded, so we sample by real row-group index). It is additive and has no behavior change for existing scans. The SQL surface in #22000 is built on top of these primitives.What changes are included in this PR?
with_row_group_sampling(fraction):(file_index, row_group_count, fraction)— re-runs match. The opener passes the executionpartition_indexas the stablefile_index, so sampling is reproducible across environments without depending on object-store paths.max(1, ceil(N * fraction))).fraction >= 1.0.with_row_fraction(fraction):ceil(target / cluster_size).RowSelectionper kept row group; the parquet reader uses the page index to read only the data pages covering the selected rows. This gives "page-level" IO savings without requiring per-column page alignment (which doesn't exist in parquet).(file_index, row_group_index, fraction, cluster_size).build_row_window_selectorsfunction and fuzz-tested across thousands of configurations to guarantee no overlap, in-bounds positions, and full coverage.The two layers compose:
row_group_fraction = 0.1×row_fraction = 0.1reads ~1% of the rows from ~10% of the row groups, with windows spread out so the sample isn't clustered at one end of each row group.Internals
ParquetSamplingstruct re-exported at the crate root.ParquetMorselizer→PreparedParquetOpen.prune_row_groupsright aftercreate_initial_plan.randwith thesmall_rngfeature (already in workspaceCargo.toml).Differences vs. the original commit in #22000
Two pieces of review feedback on the parent PR are folded in here:
apply_*_samplingkeys on a stablefile_index: usizerather thanfile_name: &str, addressing feat: TABLESAMPLE SYSTEM end-to-end + row-group / row sampling on ParquetSource #22000 (comment). The opener passes the executionpartition_index. This removes the on-disk-path dependency from the seed inputs while still decorrelating files in different partitions.build_row_window_selectorsand fuzz-tested (feat: TABLESAMPLE SYSTEM end-to-end + row-group / row sampling on ParquetSource #22000 (comment)). Fuzzing surfaced an overlap bug attarget_rows ≈ total_rowswherewindow_size = ceil(target / n_windows)could exceedstride = total_rows / n_windows; the fix capswindow_sizeatstride.Are these changes tested?
12 tests in
datafusion-datasource-parquet:Row-group sampling (
sampling::tests):row_group_sampling_keeps_target_count—ceil(N * fraction)math.row_group_sampling_is_deterministic— same inputs → same selection.row_group_sampling_differs_per_file_index— differentfile_index→ different sample.row_group_sampling_no_op_when_fraction_is_one— fraction ≥ 1.0 keeps everything.row_group_sampling_target_at_least_one—fraction = 0.001over 100 row groups still keeps 1.row_group_sampling_no_op_when_unset—Noneis a no-op.Row-window selection (
sampling::tests):row_window_selection_basic_layout— hand-checked anchor case.row_window_selection_returns_none_on_invalid_input— degenerate inputs (zero row group, zero target, zero cluster) returnNone.row_window_selection_full_target_no_overlap— the previously-buggytarget_rows == total_rowscase.row_window_selection_fuzz_invariants— 5 000 randomized(total_rows, target_rows, cluster_size, seed)configurations, asserting full coverage, in-bounds positions, and no overlap.row_window_selection_fuzz_determinism— 1 000 iterations verifying identical seeds produce identical layouts.End-to-end (
opener::test):row_group_sampling_end_to_end— writes a 4-row-group parquet toInMemory, scans withfraction = 0.5, asserts exactly 6 rows out (2 row groups × 3 rows).row_fraction_end_to_end— writes a 100-row single-row-group parquet, scans withrow_fraction = 0.1andcluster_size = 4, asserts the result is in the expected range.cargo build,cargo fmt --all, andcargo clippy -p datafusion-datasource-parquet --all-targets --all-features -- -D warningsare clean.Are there any user-facing changes?
ParquetSource:with_row_group_sampling,with_row_fraction,with_row_cluster_size,sampling(), plus theParquetSamplingstruct.🤖 Generated with Claude Code