[branch-0.9] Cherry pick feat!(datafusion): enable parallel file scanning with eager task bucketing (#2298) #18
Draft
toutane wants to merge 16 commits into branch-0.9-cherry-pick-8 from
Conversation
…itionedScan for parallel file scanning (cherry picked from commit b9819b4)
(cherry picked from commit c362b0f)
Co-authored-by: Tim Saucer <timsaucer@gmail.com> (cherry picked from commit ec1bd37)
…:with_new_children (cherry picked from commit 5e53cb8)
(cherry picked from commit cc6a833)
…identity-hash partitioning

Replace the one-task-per-partition layout in IcebergPartitionedScan with N buckets sized from the session's target_partitions. When the table's default spec exposes identity-transform columns and every task carries the corresponding partition values, tasks are bucketed by hashing those values via DataFusion's REPARTITION_RANDOM_STATE, so the resulting partitioning matches what RepartitionExec would produce. The scan then declares Partitioning::Hash(exprs, N), letting downstream joins and aggregates skip an extra repartition.

The Hash declaration is conservative and only stands when:
- the table has a single partition spec (no spec evolution)
- every identity source column is present in the output projection
- every column type is supported by literal_to_array
- every task supplied a full identity key

Any miss collapses to UnknownPartitioning(N), while bucketing falls back to a hash of data_file_path so partitions still distribute. IcebergPartitionedScan now stores Vec<Vec<FileScanTask>>, and execute(i) streams every task in buckets[i] through to_arrow_with_tasks. The bucket count is capped at min(target_partitions, num_files), and an empty table still yields zero partitions to avoid out-of-bounds execute calls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit 9edd54a)
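The bucketing described above can be sketched in plain Rust. This is a minimal illustration, not the crate's code: `FileScanTask` is reduced to a toy struct, and std's `DefaultHasher` stands in for DataFusion's `REPARTITION_RANDOM_STATE`/`create_hashes` (so the bucket assignments here would not actually match RepartitionExec's). It shows the shape of the logic: hash the identity key when every task carries one, fall back to hashing `data_file_path`, and cap the bucket count at the file count so an empty table yields zero partitions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy stand-in for iceberg's FileScanTask: just the file path and an
// optional identity-partition key (None when a task lacks a full key).
struct FileScanTask {
    data_file_path: String,
    identity_key: Option<Vec<String>>,
}

fn hash_of<T: Hash>(value: &T) -> u64 {
    // DefaultHasher is a stand-in for DataFusion's REPARTITION_RANDOM_STATE.
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

/// Distribute tasks into at most `target_partitions` buckets, hashing the
/// identity-partition key when present and falling back to the file path.
/// Assumes target_partitions >= 1; an empty table yields zero buckets so
/// execute(i) can never go out of bounds.
fn bucket_tasks(tasks: Vec<FileScanTask>, target_partitions: usize) -> Vec<Vec<FileScanTask>> {
    let n = target_partitions.min(tasks.len());
    if n == 0 {
        return Vec::new();
    }
    let mut buckets: Vec<Vec<FileScanTask>> = (0..n).map(|_| Vec::new()).collect();
    for task in tasks {
        let h = match &task.identity_key {
            Some(key) => hash_of(key),          // identity-hash path
            None => hash_of(&task.data_file_path), // fallback path
        };
        buckets[(h % n as u64) as usize].push(task);
    }
    buckets
}
```

The fallback keeps partitions distributed even when the Hash declaration itself has collapsed to UnknownPartitioning.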
`IcebergPartitionedTableProvider::supports_filters_pushdown` previously
returned `Inexact` for every filter, forcing DataFusion to re-evaluate
even filters that Iceberg's manifest-level pruning has fully resolved.
Per filter, the provider now returns `Exact` when both hold:
- the iceberg conversion can represent the filter, so manifest pruning
will remove every row that fails it, and
- every leaf is a comparison or null check against an identity-partition
column with a literal RHS.
Identity-partitioned column names are cached at `try_new` from the
table's default spec; tables with spec evolution (>1 historical specs)
fall back to an empty set so all filters stay `Inexact`. Supported
shapes: =, !=, <, <=, >, >=, IS NULL, IS NOT NULL, IN/NOT IN, plus
AND/OR/NOT compositions of the above. Every other shape is `Inexact`.
`convert_filter_to_predicate` is promoted to `pub(crate)` so the
provider can probe convertibility per filter without rebuilding the
whole AND-collapsed predicate.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit add5e35)
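The leaf-shape half of that check can be sketched as a recursive walk. This is an illustrative model only: the `Filter` enum below is a hypothetical stand-in for DataFusion's `Expr`, and the convertibility probe via `convert_filter_to_predicate` is assumed to have happened separately. Any unsupported leaf collapses the whole filter to `Inexact` (here, `false`).

```rust
use std::collections::HashSet;

// Toy filter AST standing in for DataFusion's Expr; only the shapes the
// commit message names are modeled.
#[allow(dead_code)]
enum Filter {
    Cmp { col: String },       // =, !=, <, <=, >, >= against a literal RHS
    NullCheck { col: String }, // IS NULL / IS NOT NULL
    InList { col: String },    // IN / NOT IN with a literal list
    And(Box<Filter>, Box<Filter>),
    Or(Box<Filter>, Box<Filter>),
    Not(Box<Filter>),
    Other, // any shape the conversion cannot prune exactly
}

/// Exact iff every leaf references an identity-partitioned column; any
/// unsupported shape anywhere in the tree forces Inexact.
fn is_exact(f: &Filter, identity_cols: &HashSet<String>) -> bool {
    match f {
        Filter::Cmp { col } | Filter::NullCheck { col } | Filter::InList { col } => {
            identity_cols.contains(col)
        }
        Filter::And(a, b) | Filter::Or(a, b) => {
            is_exact(a, identity_cols) && is_exact(b, identity_cols)
        }
        Filter::Not(inner) => is_exact(inner, identity_cols),
        Filter::Other => false,
    }
}
```

Returning `false` (i.e. `Inexact`) is always safe: DataFusion then re-applies the filter row by row, exactly as before this change.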
…column intersection

Previously identity_partition_col_names returned an empty set whenever the table had more than one historical partition spec, forcing every filter back to Inexact under spec evolution. This was overly conservative: Iceberg evaluates partition predicates against each manifest's own spec, so a column that is identity-partitioned in every spec is fully prunable across the entire table regardless of which spec a given file was written under.

Replace the multi-spec gate with an intersection across every spec's identity-source set. A column survives only if every spec includes it with Transform::Identity; columns that appear with non-identity transforms in some spec, or are missing from a spec entirely, are dropped. The result remains an honest set of columns for which Exact pushdown is provably safe across all surviving files.

Hash bucketing (compute_identity_cols) keeps its single-spec gate because slot-order alignment with the table's default spec depends on each task carrying its own spec id, which the native plan flow does not yet do.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit e8771e4)
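The intersection itself is a small fold. As a sketch (not the crate's implementation), model each historical spec as the set of column names it partitions with an identity transform; the survivors are exactly the columns present in every such set:

```rust
use std::collections::HashSet;

/// Each spec is modeled as the set of columns it identity-partitions;
/// columns carried with non-identity transforms are excluded upstream.
/// Returns the columns that every historical spec identity-partitions,
/// i.e. the ones for which Exact pushdown is safe across all files.
fn identity_partition_col_names(specs: &[HashSet<String>]) -> HashSet<String> {
    let mut iter = specs.iter();
    let Some(first) = iter.next() else {
        // No specs at all: nothing is provably prunable.
        return HashSet::new();
    };
    iter.fold(first.clone(), |acc, spec| {
        acc.intersection(spec).cloned().collect()
    })
}
```

With a single spec this degenerates to that spec's identity set, so the pre-change single-spec behavior is preserved; the gain is that spec evolution no longer zeroes the set out wholesale.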
…ergTableProvider

IcebergPartitionedTableProvider and IcebergPartitionedScan were introduced to enable parallel file scanning by bucketing FileScanTasks across DataFusion partitions. However, maintaining two TableProvider implementations is redundant: the new provider is strictly more capable, and its degenerate case (target_partitions=1) reproduces the old single-partition behavior exactly. This commit folds the partitioned provider into IcebergTableProvider and the partitioned scan into IcebergTableScan, eliminating the parallel types.

Changes:
- IcebergTableProvider::scan() now eagerly calls plan_files() and distributes FileScanTasks into buckets using the same identity-hash strategy (REPARTITION_RANDOM_STATE + create_hashes) that was in IcebergPartitionedTableProvider, enabling Partitioning::Hash declarations that align with DataFusion's RepartitionExec.
- IcebergTableScan gains a new_with_tasks() constructor that accepts pre-planned buckets and a caller-supplied Partitioning. execute(i) streams the tasks in buckets[i] via TableScan::to_arrow_with_tasks, rebuilding the TableScan per partition to avoid serializing PlanContext Arc-shared caches across workers.
- The original new() constructor and the to_arrow() lazy path are kept unchanged for IcebergStaticTableProvider, which does not pre-plan tasks.
- Limit slicing (try_filter_map truncation) from the old IcebergTableScan is preserved in both execution paths.
- Bucketing helpers (IdentityCol, compute_identity_cols, bucket_tasks, identity_hash, fallback_hash, literal_to_array, is_supported_dtype) are moved verbatim into a new private table/bucketing.rs module.
- Unit tests from partitioned.rs are migrated to table/mod.rs and updated to use IcebergTableProvider and IcebergTableScan.
- integration_datafusion_test.rs: fix test_provider_plan_stream_schema to call execute(0) instead of execute(1). The old call worked only because the previous IcebergTableScan silently ignored the partition index.
(cherry picked from commit d2e5e04) (cherry picked from commit 23f3d8f)
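The limit slicing that both execution paths preserve amounts to: stream batches until the limit is reached, truncating the batch that crosses it. A minimal sketch, with `Vec<i32>` standing in for Arrow RecordBatches and a plain loop standing in for the async `try_filter_map` over the stream:

```rust
/// Apply a global row limit across a sequence of batches: pass batches
/// through until the limit is reached, truncate the batch that crosses
/// it, and drop everything after. Batches are modeled as Vec<i32>.
fn apply_limit(batches: Vec<Vec<i32>>, limit: usize) -> Vec<Vec<i32>> {
    let mut remaining = limit;
    let mut out = Vec::new();
    for mut batch in batches {
        if remaining == 0 {
            break; // limit already satisfied; stop consuming the stream
        }
        if batch.len() > remaining {
            batch.truncate(remaining); // slice the crossing batch
        }
        remaining -= batch.len();
        out.push(batch);
    }
    out
}
```

Keeping this per execution path (lazy `to_arrow()` and eager per-bucket streaming) means a `LIMIT` query stops reading files early in either mode.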
Review pass over the partitioned-scan branch ahead of upstream contribution.

- Rename `TableScan::to_arrow_with_tasks` to `to_arrow_from_tasks` — `from` better signals that the tasks are the input source rather than a builder-style modifier.
- Restructure the doc with a `# Correctness` section that calls out the projection/filter contract while clarifying that reader-side configuration (concurrency, batch size, row-group filtering, row selection) is taken from `self`.
- Make `IcebergTableScan::new` and `new_with_tasks` `pub` (were `pub(crate)`) so external users can construct the node directly, matching the public visibility of the struct itself.
- Drop the `convert_filters_to_predicate` re-export from `physical_plan/mod.rs`: it was unused outside the module.
- Extract a private `new_inner` constructor on `IcebergTableScan` so `new` and `new_with_tasks` share a single source of truth for the `PlanProperties` / projection / predicate setup.
- Split `IcebergTableScan::execute` into a linear pipeline backed by three helpers: `build_table_scan` (synchronous scan-builder plumbing), `build_record_batch_stream` (async stream construction for the lazy/eager modes), and `apply_limit`.
- Trim the `IcebergTableScan` struct doc and field comments to match the rest of the file's style; drop the verbose `to_arrow_with_tasks` rationale (the `# Correctness` doc carries the load-bearing info).
- Tighten `DisplayAs::fmt_as`: remove the file-path enumeration (file count alone is enough for `EXPLAIN`) and factor out the common prefix.
- Trim several narrating comments in `table/mod.rs` and the module doc that duplicated information already evident from the code.
- Add `test_identity_partitioned_declares_hash`: verifies the happy path where an identity-partitioned table with the partition column in the projection produces `Partitioning::Hash` referencing that column. This was the main missing coverage for the bucketing logic.
- Add `test_projection_without_partition_col_falls_back_to_unknown`: verifies the `compute_identity_cols → None` branch when the projection omits the partition source column.
- Add helpers (`make_partitioned_catalog_and_table_for_bucketing`, `append_partitioned_fake_data_files`) to build identity-partitioned fixtures without writing real Parquet files.

(cherry picked from commit b1f2d66)
(cherry picked from commit 616dcdc)
IcebergTableProvider::scan now plans files eagerly and buckets them across DataFusion partitions before returning the ExecutionPlan. As a result, IcebergTableScan's DisplayAs output always includes `buckets:[N] file_count:[M]`, even for unpartitioned tables where N = 1. Update the four .slt files whose EXPLAIN snapshots were missing this suffix, and fix the like_predicate_pushdown snapshots that also had a stale input_partitions count on RepartitionExec (the table now has multiple files across multiple buckets).

(cherry picked from commit 6ae4a71)
(cherry picked from commit 581dde7)
(cherry picked from commit 7ad9dcc)
(cherry picked from commit 7ff1f6d)
Merge after [branch-0.9] Cherry pick feat!(runtime): Support custom Runtime in Catalog (#2308)
Cherry pick commits from b9819b4 to 7ff1f6d from PR feat!(datafusion): enable parallel file scanning with eager task bucketing