From 15629893c895dd816381a10d6ea520ed8ad695f6 Mon Sep 17 00:00:00 2001 From: Akanksha Kedia Date: Thu, 14 May 2026 19:52:56 +0530 Subject: [PATCH 1/2] Add broadcast_right join strategy hint to multi-stage query engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new joinOptions(join_strategy='broadcast_right') hint that broadcasts the entire right side of a join to every worker while hash/random-distributing the left side. This eliminates the right-side network shuffle for star-schema patterns where the right table is small enough to fit in memory but is not pre-replicated as a dimension table. Changes: - PinotHintOptions: add BROADCAST_RIGHT_JOIN_STRATEGY constant and useBroadcastRightJoinStrategy() helper - PinotJoinExchangeNodeInsertRule (V1): detect hint and force BROADCAST distribution on the right exchange - TraitAssignment (V2 physical planner): detect hint and assign BROADCAST_DISTRIBUTED trait to the right input - RelToPlanNodeConverter (V1): set JoinStrategy.BROADCAST_RIGHT on the JoinNode - PRelToPlanNodeConverter (V2): same - JoinNode: add BROADCAST_RIGHT to the JoinStrategy enum - QueryCompilationTest: add two tests — equi-join and non-equi-join — verifying correct distribution types and JoinStrategy assignment Closes #14518 --- .../calcite/rel/hint/PinotHintOptions.java | 7 +++ .../PinotJoinExchangeNodeInsertRule.java | 13 ++++++ .../calcite/rel/traits/TraitAssignment.java | 13 ++++++ .../logical/RelToPlanNodeConverter.java | 2 + .../physical/v2/PRelToPlanNodeConverter.java | 2 + .../query/planner/plannode/JoinNode.java | 2 +- .../pinot/query/QueryCompilationTest.java | 46 +++++++++++++++++++ 7 files changed, 84 insertions(+), 1 deletion(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java index 05258e3b0007..815117e72edc 100644 --- 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java @@ -99,6 +99,9 @@ public static class JoinHintOptions { public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast"; // "lookup" can be used when the right table is a dimension table replicated to all workers public static final String LOOKUP_JOIN_STRATEGY = "lookup"; + // "broadcast_right" broadcasts the right side to all join workers; left side is hash/random-distributed. + // Use when the right table is small enough to fit in memory but is not pre-replicated. + public static final String BROADCAST_RIGHT_JOIN_STRATEGY = "broadcast_right"; public static final String LEFT_DISTRIBUTION_TYPE = "left_distribution_type"; public static final String RIGHT_DISTRIBUTION_TYPE = "right_distribution_type"; @@ -140,6 +143,10 @@ public static boolean useLookupJoinStrategy(Join join) { return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(getJoinStrategyHint(join)); } + public static boolean useBroadcastRightJoinStrategy(Join join) { + return BROADCAST_RIGHT_JOIN_STRATEGY.equalsIgnoreCase(getJoinStrategyHint(join)); + } + @Nullable public static DistributionType getLeftDistributionType(Map joinHintOptions) { return DistributionType.fromHint(joinHintOptions.get(LEFT_DISTRIBUTION_TYPE)); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java index 3db12f095827..592b83a4bde3 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java @@ -87,6 +87,19 @@ public void onMatch(RelOptRuleCall call) { // worker, avoiding hotspots. 
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(Collections.emptyList())); newRight = PinotLogicalExchange.create(right, RelDistributions.hash(Collections.emptyList())); + } else if (PinotHintOptions.JoinHintOptions.useBroadcastRightJoinStrategy(join)) { + // Broadcast-right join: broadcast the entire right side to every worker; left side is hash/random-distributed. + // This eliminates the right-side network shuffle for star-schema patterns where the right table is small + // enough to fit in memory but is not pre-replicated as a dimension table. + if (leftDistributionType == null) { + leftDistributionType = joinInfo.leftKeys.isEmpty() + ? PinotHintOptions.DistributionType.RANDOM : PinotHintOptions.DistributionType.HASH; + } + if (rightDistributionType == null) { + rightDistributionType = PinotHintOptions.DistributionType.BROADCAST; + } + newLeft = createExchangeForHashJoin(leftDistributionType, joinInfo.leftKeys, left, null); + newRight = createExchangeForHashJoin(rightDistributionType, joinInfo.rightKeys, right, null); } else { // Hash join // Force pre-partitioned exchange when colocated join hint is provided diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java index eddcf7dd485d..ede509493966 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java @@ -137,6 +137,19 @@ RelNode assignJoin(Join join) { if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) { return join; } + // Case-1b: Broadcast-right join — broadcast the right input to all workers; left is hash/random-distributed. 
+ if (PinotHintOptions.JoinHintOptions.useBroadcastRightJoinStrategy(join)) { + JoinInfo broadcastJoinInfo = join.analyzeCondition(); + RelDistribution leftDistribution = broadcastJoinInfo.leftKeys.isEmpty() + ? RelDistributions.RANDOM_DISTRIBUTED : RelDistributions.hash(broadcastJoinInfo.leftKeys); + RelNode leftInput = join.getInput(0); + RelTraitSet leftTraitSet = leftInput.getTraitSet().plus(leftDistribution); + leftInput = leftInput.copy(leftTraitSet, leftInput.getInputs()); + RelNode rightInput = join.getInput(1); + RelTraitSet rightTraitSet = rightInput.getTraitSet().plus(RelDistributions.BROADCAST_DISTRIBUTED); + rightInput = rightInput.copy(rightTraitSet, rightInput.getInputs()); + return join.copy(join.getTraitSet(), List.of(leftInput, rightInput)); + } // Case-2: Handle dynamic filter for semi joins. JoinInfo joinInfo = join.analyzeCondition(); /* if (join.isSemiJoin() && joinInfo.nonEquiConditions.isEmpty() && joinInfo.leftKeys.size() == 1) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index c347e18a79d4..e1099eb7c776 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -774,6 +774,8 @@ private JoinNode convertLogicalJoin(LogicalJoin join) { Preconditions.checkState(projectInput instanceof TableScan, "Right input for lookup join must be a Project over TableScan, got Project over: %s", projectInput.getClass().getSimpleName()); + } else if (PinotHintOptions.JoinHintOptions.useBroadcastRightJoinStrategy(join)) { + joinStrategy = JoinNode.JoinStrategy.BROADCAST_RIGHT; } else { // TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy joinStrategy = JoinNode.JoinStrategy.HASH; diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java index 8704529bad97..3f6fcc36988f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelToPlanNodeConverter.java @@ -241,6 +241,8 @@ public static JoinNode convertJoin(PhysicalJoin join) { JoinNode.JoinStrategy joinStrategy; if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) { joinStrategy = JoinNode.JoinStrategy.LOOKUP; + } else if (PinotHintOptions.JoinHintOptions.useBroadcastRightJoinStrategy(join)) { + joinStrategy = JoinNode.JoinStrategy.BROADCAST_RIGHT; } else { joinStrategy = JoinNode.JoinStrategy.HASH; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java index 83fc50d37f94..abd04dfb2e51 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java @@ -118,6 +118,6 @@ public int hashCode() { } public enum JoinStrategy { - HASH, LOOKUP, ASOF + HASH, LOOKUP, ASOF, BROADCAST_RIGHT } } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java index 1bf31abd4314..3c387886543f 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java @@ -1068,6 +1068,52 @@ private DispatchablePlanFragment findJoinStage(DispatchableSubPlan dispatchableS return null; } + /** + * Tests that the 
broadcast_right join hint broadcasts the right side to all workers and hash/random distributes + * the left side. This is useful for star-schema patterns where the right table is small but not pre-replicated. + */ + @Test + public void testBroadcastRightJoinHintEquiJoin() { + String query = "SELECT /*+ joinOptions(join_strategy='broadcast_right') */ * FROM a JOIN b ON a.col1 = b.col1"; + DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query); + + JoinNode joinNode = findJoinNode(dispatchableSubPlan); + assertNotNull(joinNode, "Should have a join node"); + assertEquals(joinNode.getJoinStrategy(), JoinNode.JoinStrategy.BROADCAST_RIGHT, + "Join strategy should be BROADCAST_RIGHT"); + + MailboxReceiveNode leftInput = (MailboxReceiveNode) joinNode.getInputs().get(0); + MailboxReceiveNode rightInput = (MailboxReceiveNode) joinNode.getInputs().get(1); + assertEquals(leftInput.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED, + "LEFT side of broadcast_right join should be HASH distributed"); + assertEquals(rightInput.getDistributionType(), RelDistribution.Type.BROADCAST_DISTRIBUTED, + "RIGHT side of broadcast_right join should be BROADCAST distributed"); + } + + /** + * Tests that the broadcast_right hint also works for non-equi joins (no join keys): left is RANDOM, right is + * BROADCAST. The default non-equi join plan also broadcasts the right side, but only the hint surfaces an + * explicit BROADCAST_RIGHT strategy in the JoinNode. 
+ */ + @Test + public void testBroadcastRightJoinHintNonEquiJoin() { + String query = + "SELECT /*+ joinOptions(join_strategy='broadcast_right') */ * FROM a JOIN b ON a.col3 > b.col3"; + DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query); + + JoinNode joinNode = findJoinNode(dispatchableSubPlan); + assertNotNull(joinNode, "Should have a join node"); + assertEquals(joinNode.getJoinStrategy(), JoinNode.JoinStrategy.BROADCAST_RIGHT, + "Join strategy should be BROADCAST_RIGHT"); + + MailboxReceiveNode leftInput = (MailboxReceiveNode) joinNode.getInputs().get(0); + MailboxReceiveNode rightInput = (MailboxReceiveNode) joinNode.getInputs().get(1); + assertEquals(leftInput.getDistributionType(), RelDistribution.Type.RANDOM_DISTRIBUTED, + "LEFT side of broadcast_right non-equi join should be RANDOM"); + assertEquals(rightInput.getDistributionType(), RelDistribution.Type.BROADCAST_DISTRIBUTED, + "RIGHT side of broadcast_right non-equi join should be BROADCAST"); + } + /** * Finds the JoinNode in the plan. */ From c108a761917c5ce2d7935007bd129e29cfad4ebd Mon Sep 17 00:00:00 2001 From: Akanksha Kedia Date: Wed, 10 Jun 2026 16:31:07 +0530 Subject: [PATCH 2/2] Thread BROADCAST_RIGHT through wire format and add RIGHT/FULL outer join guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per xiangfu0's review: 1. Add BROADCAST_RIGHT = 3 to plan.proto JoinStrategy enum so plans survive serialization/deserialization across broker↔server boundary. 2. Add BROADCAST_RIGHT case to PlanNodeSerializer and PlanNodeDeserializer. 3. Add BROADCAST_RIGHT case to DefaultJoinOperatorFactory — at execution time it runs as HashJoinOperator (distribution was handled at planning time). 4. Fix InStageStatsTreeBuilder.visitJoin to handle BROADCAST_RIGHT and ASOF without a fragile assert (LOOKUP→LOOKUP_JOIN, everything else→HASH_JOIN). 5. 
Add guard in PinotJoinExchangeNodeInsertRule and TraitAssignment.assignJoin that rejects RIGHT/FULL OUTER joins with broadcast_right hint: broadcasting the right table means each worker independently emits unmatched right rows, which would produce duplicate null-extended rows. Co-Authored-By: Claude Sonnet 4.6 --- pinot-common/src/main/proto/plan.proto | 1 + .../rel/rules/PinotJoinExchangeNodeInsertRule.java | 8 ++++++++ .../apache/pinot/calcite/rel/traits/TraitAssignment.java | 4 ++++ .../pinot/query/planner/serde/PlanNodeDeserializer.java | 2 ++ .../pinot/query/planner/serde/PlanNodeSerializer.java | 2 ++ .../pinot/query/runtime/InStageStatsTreeBuilder.java | 8 ++++---- .../operator/factory/DefaultJoinOperatorFactory.java | 1 + 7 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto index 352cd92f4c19..caffc48542cd 100644 --- a/pinot-common/src/main/proto/plan.proto +++ b/pinot-common/src/main/proto/plan.proto @@ -94,6 +94,7 @@ enum JoinStrategy { HASH = 0; LOOKUP = 1; AS_OF = 2; + BROADCAST_RIGHT = 3; } message JoinNode { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java index 592b83a4bde3..d73cd26f3260 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java @@ -91,6 +91,14 @@ public void onMatch(RelOptRuleCall call) { // Broadcast-right join: broadcast the entire right side to every worker; left side is hash/random-distributed. // This eliminates the right-side network shuffle for star-schema patterns where the right table is small // enough to fit in memory but is not pre-replicated as a dimension table. 
+ // + // RIGHT and FULL outer joins are not supported: HashJoinOperator tracks matched-right rows locally, so + // broadcasting the right table to multiple workers would cause each worker to independently emit unmatched + // right rows, resulting in duplicate null-extended rows in the output. + JoinRelType joinType = join.getJoinType(); + Preconditions.checkArgument(joinType != JoinRelType.RIGHT && joinType != JoinRelType.FULL, + "broadcast_right join hint is not supported for RIGHT or FULL OUTER joins (would produce duplicate " + + "null-extended rows). Use the default hash join instead."); if (leftDistributionType == null) { leftDistributionType = joinInfo.leftKeys.isEmpty() ? PinotHintOptions.DistributionType.RANDOM : PinotHintOptions.DistributionType.HASH; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java index ede509493966..78eb2ffa6af4 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/traits/TraitAssignment.java @@ -138,7 +138,11 @@ RelNode assignJoin(Join join) { return join; } // Case-1b: Broadcast-right join — broadcast the right input to all workers; left is hash/random-distributed. + // Not supported for RIGHT/FULL outer joins: would produce duplicate null-extended rows. if (PinotHintOptions.JoinHintOptions.useBroadcastRightJoinStrategy(join)) { + JoinRelType joinType = join.getJoinType(); + Preconditions.checkArgument(joinType != JoinRelType.RIGHT && joinType != JoinRelType.FULL, + "broadcast_right join hint is not supported for RIGHT or FULL OUTER joins"); JoinInfo broadcastJoinInfo = join.analyzeCondition(); RelDistribution leftDistribution = broadcastJoinInfo.leftKeys.isEmpty() ? 
RelDistributions.RANDOM_DISTRIBUTED : RelDistributions.hash(broadcastJoinInfo.leftKeys); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java index 7f2144124d36..8fb03608bfb8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java @@ -378,6 +378,8 @@ private static JoinNode.JoinStrategy convertJoinStrategy(Plan.JoinStrategy joinS return JoinNode.JoinStrategy.LOOKUP; case AS_OF: return JoinNode.JoinStrategy.ASOF; + case BROADCAST_RIGHT: + return JoinNode.JoinStrategy.BROADCAST_RIGHT; default: throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java index 99665d5c29bc..9c24adcd4651 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java @@ -362,6 +362,8 @@ private static Plan.JoinStrategy convertJoinStrategy(JoinNode.JoinStrategy joinS return Plan.JoinStrategy.LOOKUP; case ASOF: return Plan.JoinStrategy.AS_OF; + case BROADCAST_RIGHT: + return Plan.JoinStrategy.BROADCAST_RIGHT; default: throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java index cbe9a870722f..3444eb102ed9 100644 --- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/InStageStatsTreeBuilder.java @@ -278,11 +278,11 @@ public ObjectNode visitFilter(FilterNode node, Context context) { @Override public ObjectNode visitJoin(JoinNode node, Context context) { - if (node.getJoinStrategy() == JoinNode.JoinStrategy.HASH) { - return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN, context); - } else { - assert node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP; + if (node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP) { return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN, context); + } else { + // Every non-LOOKUP strategy (HASH, BROADCAST_RIGHT, ASOF) is mapped to the HASH_JOIN operator type here. + return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN, context); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/factory/DefaultJoinOperatorFactory.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/factory/DefaultJoinOperatorFactory.java index 2109a48ba7d1..957d21ef30a2 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/factory/DefaultJoinOperatorFactory.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/factory/DefaultJoinOperatorFactory.java @@ -42,6 +42,7 @@ public MultiStageOperator createJoinOperator(OpChainExecutionContext context, Mu DataSchema leftSchema = leftPlanNode.getDataSchema(); switch (joinStrategy) { case HASH: + case BROADCAST_RIGHT: if (joinNode.getLeftKeys().isEmpty()) { // TODO: Consider adding non-equi as a separate join strategy. return new NonEquiJoinOperator(context, leftOperator, leftSchema, rightOperator, joinNode);