
[branch-0.9] Cherry pick feat(datafusion): declare Hash partitioning for pure bucket-transform specs#19

Merged
toutane merged 5 commits into branch-0.9 from branch-0.9-cherry-pick-10 on May 19, 2026

toutane commented May 5, 2026

Part of QECO-1260

Cherry-picks commits d4f1170 through 09ae4df from "feat(datafusion): declare Hash partitioning for pure bucket-transform specs".

Which issue does this PR close?

  • Closes #.

What changes are included in this PR?

Expose a single-column Transform::Bucket(_) partition spec to DataFusion as Partitioning::Hash([source_col], n_partitions) instead of UnknownPartitioning. This lets the planner elide the redundant RepartitionExec that GROUP BY / joins on the bucket source column would otherwise insert.

Any spec that is not a single bucket field keeps falling back to UnknownPartitioning. This covers spec evolution, empty specs, multi-field specs (including multi-column pure-bucket), mixed transforms, and a bucket source column missing from the projection.

Correctness

DataFusion matches Partitioning::Hash against Distribution::HashPartitioned by expression equality only; it does not check the hash function. The real contract is "every row sharing a key value lands in the same DataFusion partition".

  • Writer side: Iceberg guarantees a file's partition slot equals bucket_N(V) for every row of source value V, and that value is identical for all rows in the file.
  • Reader side: tasks are distributed by bucket_index % n_partitions, so all files sharing a bucket index go to the same DataFusion partition.

Composing the two: every row with value V ends up in a single DataFusion partition.
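
The composition can be checked on a toy model. The sketch below is not the PR's code: `toy_bucket` is a hypothetical stand-in for Iceberg's `bucket[N]` (the real transform is hash-based; any deterministic function of the source value supports the same argument), and `partition_for_bucket` mirrors the `bucket_index % n_partitions` placement described above.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for Iceberg's `bucket[N]` transform.
/// Only determinism on the source value matters for co-location.
fn toy_bucket(value: i64, n_buckets: u32) -> u32 {
    value.rem_euclid(n_buckets as i64) as u32
}

/// Reader side: a file's bucket index decides its DataFusion partition.
fn partition_for_bucket(bucket_index: u32, n_partitions: u32) -> u32 {
    bucket_index % n_partitions
}

/// For each key, collect the set of partitions its rows end up in.
fn partitions_per_key(
    keys: &[i64],
    n_buckets: u32,
    n_partitions: u32,
) -> HashMap<i64, HashSet<u32>> {
    let mut seen: HashMap<i64, HashSet<u32>> = HashMap::new();
    for &key in keys {
        // Writer side: every row with this key is stored in a file of this bucket.
        let bucket = toy_bucket(key, n_buckets);
        // Reader side: that bucket maps to exactly one partition.
        let partition = partition_for_bucket(bucket, n_partitions);
        seen.entry(key).or_default().insert(partition);
    }
    seen
}
```

Calling `partitions_per_key(&[1, 9, 17, 1, 9], 8, 3)` yields a one-element partition set for every key, which is the co-location property the `Hash` declaration relies on. Distinct keys may still share a partition; that is allowed.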

Key changes

  • bucketing.rs: compute_bucket_cols detects the single-column pure-bucket case; PartitionKeys::{Identity, Bucket} drives both task placement (bucket_tasks) and the Hash declaration; bucket_index reads the Int32 slot and applies % n_partitions.
  • mod.rs: TableProvider::scan calls compute_partition_keys and uses its column_exprs() for the Hash. Identity / unpartitioned paths are unchanged.
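
As a rough illustration of the detection rule, here is a minimal sketch using simplified, hypothetical types (`Transform`, `PartitionField`, `single_bucket_source`); it is not the signature of `compute_bucket_cols`. It models only the bucket path: the identity path, which the PR tries first, and spec-evolution checks are left out.

```rust
/// Simplified stand-ins for Iceberg partition transforms (hypothetical types).
#[derive(Debug, Clone, PartialEq)]
enum Transform {
    Identity,
    Bucket(u32),
    Truncate(u32),
}

/// One field of a partition spec, reduced to what the rule needs.
#[derive(Debug, Clone)]
struct PartitionField {
    source_column: String,
    transform: Transform,
}

/// Returns the bucket source column when the spec is exactly one bucket
/// field whose source column is part of the projection. Returns `None`
/// otherwise, which corresponds to falling back to unknown partitioning.
fn single_bucket_source<'a>(
    spec: &'a [PartitionField],
    projection: &[&str],
) -> Option<&'a str> {
    match spec {
        // Exactly one field, and it is a bucket transform.
        [field] if matches!(field.transform, Transform::Bucket(_)) => {
            let source = field.source_column.as_str();
            // The Hash expression needs the source column in the output.
            projection.contains(&source).then_some(source)
        }
        // Empty, multi-field, or non-bucket specs are rejected.
        _ => None,
    }
}
```

With a spec of one `Bucket(16)` field on `id`, the function returns `Some("id")` when `id` is projected and `None` when it is not. A two-field spec returns `None` even if both fields are bucket transforms.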

Are these changes tested?

Yes. Unit tests in table/mod.rs cover the positive case (a single-column bucket spec outputs Hash), the rejection paths (multi-column bucket, mixed transforms, missing source, null slot), and two behavioural checks: identity-first dispatch and task distribution under varying target_partitions.

toutane force-pushed the branch-0.9-cherry-pick-9 branch from 6a07d9d to 92f0f2a on May 11, 2026 11:15
Base automatically changed from branch-0.9-cherry-pick-9 to branch-0.9 on May 11, 2026 11:16
… specs

Extend scan-time partition detection in IcebergTableProvider so that a
default partition spec whose every field is a `Transform::Bucket(_)` is
exposed to DataFusion as `Partitioning::Hash([source_cols], n)`. This lets
the planner skip a `RepartitionExec` for GROUP BY / joins on the bucket
source column, mirroring the existing identity-transform path.

Correctness: DataFusion's `EquivalenceProperties::is_partition_satisfied`
compares `Partitioning::Hash` against `Distribution::HashPartitioned` by
expression equality only, not by the underlying hash function. Iceberg
`bucket[N]` already co-locates same-source-value rows at the file level
(same value -> same bucket index -> same files); the task distributor
sends every unique bucket index to a single DataFusion partition, so
co-location is preserved at the row level.

- bucketing.rs: add `BucketCol`, `compute_bucket_cols` (pure-bucket only,
  rejects spec evolution / mixed transforms / missing source), and a
  `PartitionKeys::{Identity, Bucket}` wrapper used by `bucket_tasks`.
- `bucket_tasks` now hashes the `i32` bucket-index slot (always Int32 per
  spec) for the bucket variant, keeping the identity branch unchanged.
- `compute_partition_keys` tries identity first, so mixed identity+bucket
  specs keep the current identity-only Hash behaviour.
- table/mod.rs::scan(): use `compute_partition_keys` + `column_exprs()`
  instead of inlining the identity-only branch.
- Five new tests: pure-bucket Hash declaration, projection excluding the
  source, null partition slot fallback, mixed bucket+truncate fallback,
  and identity+bucket regression lock.

(cherry picked from commit d4f1170)
toutane force-pushed the branch-0.9-cherry-pick-10 branch from 3cb825c to 25af0c7 on May 12, 2026 09:33
…tribution

`bucket_tasks` used to hash the Int32 bucket-index slot through
REPARTITION_RANDOM_STATE before taking `% n_partitions`. The intent was
to keep the `Partitioning::Hash` annotation aligned with DataFusion's
hash convention, but composing ahash with Iceberg's `bucket[N]` already
produced a function distinct from `ahash(source_value)`, so the
annotation was never literally honest in either mode. DataFusion's
`Partitioning::satisfaction` (datafusion-physical-expr 53.1
partitioning.rs:219-272) only checks expression equality and explicitly
documents that the hash function uniformity is *assumed*, not verified.
The downstream operators that consume the annotation (AggregateExec
SinglePartitioned, HashJoinExec Partitioned) require row co-location by
key, which Iceberg's `bucket[N]` already guarantees by being
deterministic on the source value.

The rehash therefore added a uniform-spread layer without any
correctness benefit, but caused birthday-paradox collisions in the
`N_buckets ≈ n_partitions` regime: with 8 buckets re-hashed modulo 8,
~75% of runs produced at least one empty scan partition. Replacing the
rehash with a deterministic positional linearisation
`((idx_1 * N_2 * ... * N_k) + ... + idx_k) % n_partitions` keeps
co-location, makes the single-column case the natural Iceberg
distribution (`idx % n_partitions`), and bounds the multi-column skew
to ±1 task per partition.

- bucketing.rs: BucketCol carries `bucket_n`; new `bucket_linear_index`
  replaces `bucket_hash`; `bucket_tasks` dispatches Identity (unchanged
  rehash, which is strictly aligned with DataFusion) vs Bucket (linear
  modulo). Doc comments on `compute_partition_keys` and `bucket_tasks`
  rewritten to explain the two regimes.
- Three new unit tests: deterministic identity mapping when N == n,
  modulo grouping when N > n, and multi-column linearisation.

(cherry picked from commit 9f9a214)
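
The positional linearisation described in this commit message can be sketched as a mixed-radix fold. The function name `linear_partition` and its `(index, bucket_count)` input shape are assumptions for illustration, not the signature of `bucket_linear_index`. A later commit in this PR drops the multi-column path, so only the single-column form (`idx % n_partitions`) survives in the merged code.

```rust
/// Positional (mixed-radix) linearisation of a bucket-index tuple, reduced
/// modulo the number of scan partitions. `cols` holds `(index, bucket_count)`
/// pairs, one per bucket column, with `index < bucket_count`.
/// Hypothetical helper for illustration only.
fn linear_partition(cols: &[(u64, u64)], n_partitions: u64) -> u64 {
    // Horner-style fold: ((idx_1 * N_2 + idx_2) * N_3 + idx_3) ...
    // which expands to idx_1 * N_2 * ... * N_k + ... + idx_k.
    let linear = cols.iter().fold(0u64, |acc, &(idx, n)| acc * n + idx);
    linear % n_partitions
}
```

For a single column this reduces to `idx % n_partitions`, so 8 buckets over 8 partitions map one-to-one with no empty partition. For two columns with indices `(2, 3)` and bucket counts `(4, 8)`, the linear index is `2 * 8 + 3 = 19`.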
toutane marked this pull request as ready for review May 13, 2026 13:59
toutane added 2 commits May 13, 2026 17:41
compute_bucket_cols now retains bucket spec fields whose source column
survives the output projection instead of dropping the whole spec to
UnknownPartitioning. The positional linearisation in bucket_linear_index
already iterates only over the retained cols via spec_field_idx, so file-
level co-location on a bucket-dimension subset still implies row-level
co-location on that subset. Returns None only when no bucket source
survives the projection.

(cherry picked from commit b48f96d)
…umn specs

Multi-column pure-bucket specs no longer declare Partitioning::Hash;
they fall back to UnknownPartitioning. Drops the positional
linearisation of bucket-index tuples and the associated bucket_n field.
Tests covering the multi-column path are replaced with a single
negative test asserting the fallback.

(cherry picked from commit 1cf7d02)

notfilippo (Member) left a comment

Logical change looks very very good.

I've spent a bit too much time parsing the tests, maybe there is the opportunity to reorganize in following PRs but it's alright!

toutane merged commit 4b23b0b into branch-0.9 on May 19, 2026
2 checks passed
toutane deleted the branch-0.9-cherry-pick-10 branch on May 19, 2026 14:35