Skip to content

[MSE] Add broadcast_right join strategy hint for star-schema joins#18829

Closed
Akanksha-kedia wants to merge 2 commits into
apache:masterfrom
Akanksha-kedia:feature/broadcast-right-join
Closed

[MSE] Add broadcast_right join strategy hint for star-schema joins#18829
Akanksha-kedia wants to merge 2 commits into
apache:masterfrom
Akanksha-kedia:feature/broadcast-right-join

Conversation

@Akanksha-kedia

Copy link
Copy Markdown
Contributor

Summary

Adds a joinOptions(join_strategy='broadcast_right') query hint to the multi-stage query engine (MSE). When set, the entire right-side table is broadcast to every join worker while the left side is hash/random-distributed. This eliminates the right-side network shuffle for star-schema patterns where the right table is small enough to fit in memory but is not pre-replicated as a dimension table.

SELECT /*+ joinOptions(join_strategy='broadcast_right') */
    o.order_id, p.product_name
FROM orders o
JOIN products p ON o.product_id = p.id

Why this is useful

Strategy When to use
lookup Right table is a dimension table replicated to all workers
broadcast_right (new) Right table is small but not replicated — avoids right-side shuffle
default hash General case

Changes

File Change
PinotHintOptions Add BROADCAST_RIGHT_JOIN_STRATEGY constant + useBroadcastRightJoinStrategy() helper
PinotJoinExchangeNodeInsertRule (V1 planner) Detect hint → BROADCAST exchange on right, hash/random on left
TraitAssignment (V2 planner) Detect hint → assign BROADCAST_DISTRIBUTED trait to right input
RelToPlanNodeConverter (V1) Set JoinStrategy.BROADCAST_RIGHT on the JoinNode
PRelToPlanNodeConverter (V2) Same
JoinNode Add BROADCAST_RIGHT to JoinStrategy enum
plan.proto Add BROADCAST_RIGHT = 3 to JoinStrategy proto enum
PlanNodeSerializer / PlanNodeDeserializer Round-trip BROADCAST_RIGHT across broker↔server boundary
DefaultJoinOperatorFactory Execute BROADCAST_RIGHT as HashJoinOperator (distribution handled at plan time)
InStageStatsTreeBuilder Replace fragile assert LOOKUP with explicit switch — handles BROADCAST_RIGHT and ASOF cleanly

Why RIGHT/FULL OUTER joins are blocked

HashJoinOperator tracks unmatched right rows locally per worker. Broadcasting the right table to N workers means each worker independently emits null-extended rows for the same unmatched right row → N duplicate rows in the output. The planner rejects RIGHT/FULL OUTER with a clear error message at query compile time.

Tests

  • testBroadcastRightJoinHintEquiJoin — equi-join: verifies BROADCAST distribution on right, HASH on left, BROADCAST_RIGHT strategy on the JoinNode
  • testBroadcastRightJoinHintNonEquiJoin — non-equi-join: verifies BROADCAST on right, RANDOM on left

mvn test -pl pinot-query-planner -Dtest=QueryCompilationTest201 tests, 0 failures

Closes #14518

Akanksha-kedia and others added 2 commits June 22, 2026 19:29
Adds a new joinOptions(join_strategy='broadcast_right') hint that broadcasts
the entire right side of a join to every worker while hash/random-distributing
the left side. This eliminates the right-side network shuffle for star-schema
patterns where the right table is small enough to fit in memory but is not
pre-replicated as a dimension table.

Changes:
- PinotHintOptions: add BROADCAST_RIGHT_JOIN_STRATEGY constant and
  useBroadcastRightJoinStrategy() helper
- PinotJoinExchangeNodeInsertRule (V1): detect hint and force BROADCAST
  distribution on the right exchange
- TraitAssignment (V2 physical planner): detect hint and assign
  BROADCAST_DISTRIBUTED trait to the right input
- RelToPlanNodeConverter (V1): set JoinStrategy.BROADCAST_RIGHT on the JoinNode
- PRelToPlanNodeConverter (V2): same
- JoinNode: add BROADCAST_RIGHT to the JoinStrategy enum
- QueryCompilationTest: add two tests — equi-join and non-equi-join — verifying
  correct distribution types and JoinStrategy assignment

Closes apache#14518
…oin guard

Per xiangfu0's review:
1. Add BROADCAST_RIGHT = 3 to plan.proto JoinStrategy enum so plans survive
   serialization/deserialization across broker↔server boundary.
2. Add BROADCAST_RIGHT case to PlanNodeSerializer and PlanNodeDeserializer.
3. Add BROADCAST_RIGHT case to DefaultJoinOperatorFactory — at execution time
   it runs as HashJoinOperator (distribution was handled at planning time).
4. Fix InStageStatsTreeBuilder.visitJoin to handle BROADCAST_RIGHT and ASOF
   without a fragile assert (LOOKUP→LOOKUP_JOIN, everything else→HASH_JOIN).
5. Add guard in PinotJoinExchangeNodeInsertRule and TraitAssignment.assignJoin
   that rejects RIGHT/FULL OUTER joins with broadcast_right hint: broadcasting
   the right table means each worker independently emits unmatched right rows,
   which would produce duplicate null-extended rows.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@Akanksha-kedia

Copy link
Copy Markdown
Contributor Author

CC @Jackie-Jiang @xiangfu0 — this adds a broadcast_right join strategy hint to MSE (follow-up to earlier review feedback on the wire format). Would appreciate a review when you get a chance!

@codecov-commenter

codecov-commenter commented Jun 22, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 36.36364% with 21 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.77%. Comparing base (5526d5b) to head (c108a76).
⚠️ Report is 13 commits behind head on master.

Files with missing lines Patch % Lines
...ache/pinot/calcite/rel/traits/TraitAssignment.java 0.00% 12 Missing and 1 partial ⚠️
...ite/rel/rules/PinotJoinExchangeNodeInsertRule.java 70.00% 0 Missing and 3 partials ⚠️
...y/planner/physical/v2/PRelToPlanNodeConverter.java 0.00% 1 Missing and 1 partial ⚠️
...inot/query/planner/serde/PlanNodeDeserializer.java 0.00% 1 Missing ⚠️
.../pinot/query/planner/serde/PlanNodeSerializer.java 0.00% 1 Missing ⚠️
...e/pinot/query/runtime/InStageStatsTreeBuilder.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18829      +/-   ##
============================================
- Coverage     64.82%   64.77%   -0.05%     
  Complexity     1319     1319              
============================================
  Files          3388     3392       +4     
  Lines        210228   210978     +750     
  Branches      32948    33128     +180     
============================================
+ Hits         136282   136667     +385     
- Misses        62978    63296     +318     
- Partials      10968    11015      +47     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.77% <36.36%> (-0.05%) ⬇️
temurin 64.77% <36.36%> (-0.05%) ⬇️
unittests 64.77% <36.36%> (-0.05%) ⬇️
unittests1 56.97% <36.36%> (-0.04%) ⬇️
unittests2 37.19% <6.06%> (-0.08%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@Jackie-Jiang

Copy link
Copy Markdown
Contributor

Duplicate of #18514, which is not needed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add more JOIN strategies

3 participants