
feat: Pinot-style colocated-join optimizer for hash-bucketed tables #1676

Draft
wirybeaver wants to merge 1 commit into apache:main from wirybeaver:feat/colocated-join-optimizer


@wirybeaver wirybeaver commented May 11, 2026

Closes #1677. Follow-up tracked in #1679.

Summary

Implements the colocated-join optimizer described in #1677: a small metadata trait in ballista-core lets any TableProvider declare on-disk hash bucketing, and two new PhysicalOptimizerRules slot into the existing AdaptivePlanner rule chain just before DistributedExchangeRule. Together they cover three cases — colocated joins (skip the shuffle), divisor sub-partitioning (16 vs 8 buckets, locally coalesce instead of shuffle), and small-side broadcast in the AQE code path.

DataFusion physical planner
  → DataFusion optimizer rules (incl. EnforceDistribution, JoinSelection)
  → ColocatedJoinRule           ← new: strip redundant RepartitionExec
                                       (also handles divisor sub-partitioning)
  → BroadcastSmallSideRule      ← new: replace partitioned join with CollectLeft
  → DistributedExchangeRule     ← existing: maps remaining repartitions to ExchangeExec
  → DefaultDistributedPlanner   ← existing: cuts stages at ExchangeExec

Metadata layer (ballista-core): BallistaPartitionMetadata is an optional contract any TableProvider can implement to declare on-disk hash bucketing (keys + hash function + bucket count). PartitionedTableProvider is a wrapper that attaches a HashDistribution to any existing provider without modifying it, and HashDistributedScanExec re-advertises the wrapped scan's output_partitioning() as Partitioning::Hash so optimizer rules can read it. BucketSubPartitionExec chains input partitions per output partition via stream::iter + flatten — pure local concat, no network — for the divisor case (relies on the identity (hash(k) % 16) % 8 == hash(k) % 8).
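The divisor case can be illustrated with a small standard-library sketch. It is not the code from this PR: `coarse_bucket`, `sub_partition_groups`, and `concat_group` are hypothetical names, and plain iterators stand in for the `stream::iter` + `flatten` chaining. The identity holds because when `coarse` divides `fine`, `hash % fine` differs from `hash` by a multiple of `fine`, which is also a multiple of `coarse`.

```rust
/// Hypothetical helper: bucket of `hash` in the coarser layout, derived from
/// its bucket in the finer layout. Returns None unless `coarse` divides `fine`,
/// because the identity (h % fine) % coarse == h % coarse needs that.
fn coarse_bucket(hash: u64, fine: u64, coarse: u64) -> Option<u64> {
    if fine == 0 || coarse == 0 || fine % coarse != 0 {
        return None;
    }
    Some((hash % fine) % coarse)
}

/// Hypothetical helper: which input partitions feed each output partition.
/// Input bucket `i` lands in output bucket `i % coarse`.
fn sub_partition_groups(fine: usize, coarse: usize) -> Option<Vec<Vec<usize>>> {
    if fine == 0 || coarse == 0 || fine % coarse != 0 {
        return None;
    }
    let mut groups = vec![Vec::new(); coarse];
    for i in 0..fine {
        groups[i % coarse].push(i);
    }
    Some(groups)
}

/// Local concatenation of the input partitions in one group, in order.
/// Stand-in for chaining streams; nothing is re-hashed or moved over the network.
fn concat_group<T: Clone>(inputs: &[Vec<T>], group: &[usize]) -> Vec<T> {
    group
        .iter()
        .flat_map(|&i| inputs[i].iter().cloned())
        .collect()
}
```

For the 8/4 layout, `sub_partition_groups(8, 4)` pairs input partitions `{0, 4}`, `{1, 5}`, `{2, 6}`, `{3, 7}`, which gives a factor of 2 per output partition. A non-divisor pair such as 16 and 6 is rejected: for `hash = 16`, `(16 % 16) % 6` is 0 while `16 % 6` is 4.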

Optimizer rules (ballista-scheduler): ColocatedJoinRule walks each HashJoinExec and, when both inputs declare matching keys + hash function and either equal or divisor-related bucket counts, strips the RepartitionExec above each input or wraps the larger side in BucketSubPartitionExec. BroadcastSmallSideRule promotes a Partitioned HashJoinExec to CollectLeft when one side's total_byte_size is below the configured threshold, using HashJoinExec::swap_inputs when the small side is on the right. It is restricted to JoinType::Inner pending #1055. default_optimizers now takes the SessionConfig so the broadcast threshold flows through, and ColocatedJoinRule runs before BroadcastSmallSideRule so colocation wins when both could apply.
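The compatibility check that `ColocatedJoinRule` is described as performing might look roughly like the sketch below. All types here (`Layout`, `Decision`, `Side`, and the `HashFn` variants) are simplified, hypothetical stand-ins rather than the PR's actual definitions. The sketch also assumes that "matching keys" means each side's bucketing keys equal its join keys in order, and that "larger side" means the side with more buckets.

```rust
/// Hypothetical hash-function tag; the real HashFn variants are not shown in this PR text.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HashFn {
    Murmur3,
    XxHash64,
}

/// Simplified stand-in for a declared hash layout (keys + hash function + bucket count).
#[derive(Debug, Clone, PartialEq, Eq)]
struct Layout {
    keys: Vec<String>,
    hash_fn: HashFn,
    buckets: usize,
}

#[derive(Debug, PartialEq, Eq)]
enum Side {
    Left,
    Right,
}

#[derive(Debug, PartialEq, Eq)]
enum Decision {
    /// Equal bucket counts: the repartition above each input is redundant.
    StripRepartition,
    /// Divisor-related counts: locally coalesce the side with more buckets.
    SubPartition { side: Side, out: usize, factor: usize },
    /// Anything else: leave the plan alone so the shuffle still happens.
    KeepShuffle,
}

/// Convenience constructor for the examples.
fn layout(keys: &[&str], hash_fn: HashFn, buckets: usize) -> Layout {
    Layout {
        keys: keys.iter().map(|k| k.to_string()).collect(),
        hash_fn,
        buckets,
    }
}

fn decide(left: &Layout, right: &Layout, left_on: &[String], right_on: &[String]) -> Decision {
    let keys_match = left.keys.as_slice() == left_on && right.keys.as_slice() == right_on;
    if !keys_match || left.hash_fn != right.hash_fn || left.buckets == 0 || right.buckets == 0 {
        return Decision::KeepShuffle;
    }
    if left.buckets == right.buckets {
        return Decision::StripRepartition;
    }
    let (side, big, small) = if left.buckets > right.buckets {
        (Side::Left, left.buckets, right.buckets)
    } else {
        (Side::Right, right.buckets, left.buckets)
    };
    if big % small == 0 {
        Decision::SubPartition { side, out: small, factor: big / small }
    } else {
        Decision::KeepShuffle
    }
}
```

With 8 buckets on the left and 4 on the right, the sketch returns `SubPartition { side: Left, out: 4, factor: 2 }`, which lines up with the `BucketSubPartitionExec(out=4, factor=2)` snapshot in the test plan. Falling back to `KeepShuffle` for every unrecognized combination keeps the rule conservative.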

Relationship to PR #1647: that PR added small-side broadcast lowering to the non-AQE DefaultDistributedPlanner::plan_query_stages_internal, and the implementation explicitly noted (TODO at state/aqe/mod.rs) that the same lowering does not fire in the AQE path. BroadcastSmallSideRule is the AQE-side counterpart that resolves that TODO; this PR removes the resolved comment. The two rules cover disjoint code paths (AQE is opt-in via ballista.planner.adaptive.enabled, default false), share the same BALLISTA_BROADCAST_JOIN_THRESHOLD_BYTES config key, and behave identically.

Both features are opt-in. Tables only get colocation behavior when the user wraps them with PartitionedTableProvider. BroadcastSmallSideRule::from_session_config returns a no-op rule (threshold = 0) when the BallistaConfig extension is not registered on the SessionConfig, so existing tests using plain SessionConfig::new() are unaffected.
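The opt-in behavior of the broadcast rule can be sketched as a pure decision function. This is an illustration under assumptions, not the rule's implementation: `JoinType` is a cut-down stand-in for DataFusion's enum, `Broadcast` and `decide_broadcast` are hypothetical names, byte sizes are modeled as `Option<usize>` (with `None` meaning unknown), and picking the smaller side when both fall below the threshold is a guess at a sensible tie-break.

```rust
/// Simplified stand-in for DataFusion's JoinType; only a few variants are modeled.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

#[derive(Debug, PartialEq, Eq)]
enum Broadcast {
    /// Leave the partitioned join untouched.
    No,
    /// The left side is small: switch the join to CollectLeft.
    CollectLeft,
    /// The right side is small: swap inputs first, then use CollectLeft.
    SwapThenCollectLeft,
}

fn decide_broadcast(
    join_type: JoinType,
    left_bytes: Option<usize>,
    right_bytes: Option<usize>,
    threshold: usize,
) -> Broadcast {
    // threshold == 0 models the no-op rule returned when the config extension is absent.
    if threshold == 0 || join_type != JoinType::Inner {
        return Broadcast::No;
    }
    // Keep a size only if it is known and strictly below the threshold.
    let small = |bytes: Option<usize>| bytes.filter(|&n| n < threshold);
    match (small(left_bytes), small(right_bytes)) {
        // Assumption: when both sides qualify, broadcast the smaller one.
        (Some(l), Some(r)) if r < l => Broadcast::SwapThenCollectLeft,
        (Some(_), _) => Broadcast::CollectLeft,
        (None, Some(_)) => Broadcast::SwapThenCollectLeft,
        (None, None) => Broadcast::No,
    }
}
```

Treating a zero threshold as "disabled" means a plain `SessionConfig::new()` never triggers the rewrite, which matches the claim that existing tests are unaffected.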

Test plan

  • cargo test -p ballista-core — 92 passed (trait, wrapper, scan adapter, BucketSubPartitionExec correctness + rejection of non-divisor inputs)
  • cargo test -p ballista-scheduler — 108 passed (5 ColocatedJoinRule cases, 4 divisor cases, 5 BroadcastSmallSideRule cases, 3 end-to-end plan-snapshot tests, plus all pre-existing)
  • cargo check -p ballista-scheduler --tests — clean
  • End-to-end plan snapshots cover three scenarios: matching bucket counts → no ExchangeExec; 8/4 divisor → BucketSubPartitionExec(out=4, factor=2), no shuffle; plain MemTable → ExchangeExec retained, optimizer is silent
  • TPC-H Q5 against bucketed customer/orders/lineitem — environment-dependent, planned as a follow-up benchmark validation

Caveats / risks

@wirybeaver wirybeaver force-pushed the feat/colocated-join-optimizer branch 3 times, most recently from 7aa326d to 590a8d8 Compare May 11, 2026 02:57
Brings three Pinot V2 Physical Optimizer ideas to Ballista so queries
against pre-bucketed tables can avoid the network shuffle on every join.

Architecture (all changes additive; no DataFusion fork required):

  DataFusion physical planner
    → DataFusion optimizer rules (incl. EnforceDistribution, JoinSelection)
    → ColocatedJoinRule           ← new: strip redundant RepartitionExec
                                          (also handles divisor sub-partitioning)
    → BroadcastSmallSideRule      ← new: replace partitioned join with CollectLeft
    → DistributedExchangeRule     ← existing: maps remaining repartitions to ExchangeExec
    → DefaultDistributedPlanner   ← existing: cuts stages at ExchangeExec

Metadata layer (ballista-core):
- BallistaPartitionMetadata trait — optional contract any TableProvider
  can implement to declare on-disk hash bucketing (keys + hash function +
  bucket count).
- HashDistribution / HashFn — the declared layout.
- PartitionedTableProvider wrapper — attaches a HashDistribution to any
  existing TableProvider without modifying it.
- HashDistributedScanExec adapter — re-advertises the wrapped scan's
  output_partitioning() as Partitioning::Hash so optimizer rules can
  read it.
- BucketSubPartitionExec — chains input partitions per output partition
  via stream::iter + flatten (pure local concat, no network) for the
  divisor case.

Optimizer rules (ballista-scheduler):
- ColocatedJoinRule — for each HashJoinExec, when both inputs declare
  matching keys + hash function and either equal or divisor-related
  bucket counts, strips the RepartitionExec above each input or wraps
  the larger side in BucketSubPartitionExec. Divisor case relies on
  (hash(k) % 16) % 8 == hash(k) % 8.
- BroadcastSmallSideRule — promotes Partitioned HashJoinExec to
  CollectLeft when one side's total_byte_size is below the configured
  threshold; uses HashJoinExec::swap_inputs when the small side is on
  the right. Restricted to JoinType::Inner pending issue apache#1055.
- default_optimizers now takes the SessionConfig so the broadcast
  threshold flows through. ColocatedJoinRule runs before
  BroadcastSmallSideRule so colocation wins when both could apply.

Both features are opt-in. Tables only get colocation behavior if the
user wraps them with PartitionedTableProvider; broadcast only fires when
ballista.optimizer.broadcast_threshold_bytes > 0 (default 0). All
existing snapshot tests are unchanged.

Tests:
- 86 ballista-core tests pass (trait, wrapper, scan adapter,
  BucketSubPartitionExec correctness + non-divisor rejection).
- 84 ballista-scheduler tests pass: 5 ColocatedJoinRule, 4 divisor,
  5 BroadcastSmallSideRule, 3 end-to-end plan-snapshot tests
  (matching → no exchange; 8/4 divisor → BucketSubPartitionExec; plain
  MemTable → ExchangeExec retained), plus all pre-existing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@metegenez
Contributor

Hi @wirybeaver, I am working on coalescing partitions into target binary sizes. Your work looks good. Could you verify it against SF10 or SF100, so we can check that the queries compile well and that there is no big regression? It would help reviewers assess the change.

Development

Successfully merging this pull request may close these issues.

Pinot-style colocated-join optimizer for hash-bucketed tables