Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ enum JoinStrategy {
HASH = 0;
LOOKUP = 1;
AS_OF = 2;
BROADCAST_RIGHT = 3;
}

message JoinNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public static class JoinHintOptions {
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
// "lookup" can be used when the right table is a dimension table replicated to all workers
public static final String LOOKUP_JOIN_STRATEGY = "lookup";
// "broadcast_right" broadcasts the right side to all join workers. By default the left side is
// hash-distributed on the join keys, or random-distributed when the join has no equi-join keys.
// Use when the right table is small enough to fit in memory but is not pre-replicated.
// The planner rejects this hint for RIGHT and FULL OUTER joins.
public static final String BROADCAST_RIGHT_JOIN_STRATEGY = "broadcast_right";

public static final String LEFT_DISTRIBUTION_TYPE = "left_distribution_type";
public static final String RIGHT_DISTRIBUTION_TYPE = "right_distribution_type";
Expand Down Expand Up @@ -140,6 +143,10 @@ public static boolean useLookupJoinStrategy(Join join) {
return LOOKUP_JOIN_STRATEGY.equalsIgnoreCase(getJoinStrategyHint(join));
}

/**
 * Returns whether the join strategy hint on the given join is "broadcast_right" (compared case-insensitively).
 */
public static boolean useBroadcastRightJoinStrategy(Join join) {
  String strategyHint = getJoinStrategyHint(join);
  return BROADCAST_RIGHT_JOIN_STRATEGY.equalsIgnoreCase(strategyHint);
}

@Nullable
public static DistributionType getLeftDistributionType(Map<String, String> joinHintOptions) {
return DistributionType.fromHint(joinHintOptions.get(LEFT_DISTRIBUTION_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,27 @@ public void onMatch(RelOptRuleCall call) {
// worker, avoiding hotspots.
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(Collections.emptyList()));
newRight = PinotLogicalExchange.create(right, RelDistributions.hash(Collections.emptyList()));
} else if (PinotHintOptions.JoinHintOptions.useBroadcastRightJoinStrategy(join)) {
// Broadcast-right join: broadcast the entire right side to every worker; left side is hash/random-distributed.
// This eliminates the right-side network shuffle for star-schema patterns where the right table is small
// enough to fit in memory but is not pre-replicated as a dimension table.
//
// RIGHT and FULL outer joins are not supported: HashJoinOperator tracks matched-right rows locally, so
// broadcasting the right table to multiple workers would cause each worker to independently emit unmatched
// right rows, resulting in duplicate null-extended rows in the output.
JoinRelType joinType = join.getJoinType();
Preconditions.checkArgument(joinType != JoinRelType.RIGHT && joinType != JoinRelType.FULL,
"broadcast_right join hint is not supported for RIGHT or FULL OUTER joins (would produce duplicate "
+ "null-extended rows). Use the default hash join instead.");
if (leftDistributionType == null) {
leftDistributionType = joinInfo.leftKeys.isEmpty()
? PinotHintOptions.DistributionType.RANDOM : PinotHintOptions.DistributionType.HASH;
}
if (rightDistributionType == null) {
rightDistributionType = PinotHintOptions.DistributionType.BROADCAST;
}
newLeft = createExchangeForHashJoin(leftDistributionType, joinInfo.leftKeys, left, null);
newRight = createExchangeForHashJoin(rightDistributionType, joinInfo.rightKeys, right, null);
} else {
// Hash join
// Force pre-partitioned exchange when colocated join hint is provided
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ RelNode assignJoin(Join join) {
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) {
return join;
}
// Case-1b: Broadcast-right join — broadcast the right input to all workers; left is hash/random-distributed.
// Not supported for RIGHT/FULL outer joins: would produce duplicate null-extended rows.
if (PinotHintOptions.JoinHintOptions.useBroadcastRightJoinStrategy(join)) {
JoinRelType joinType = join.getJoinType();
Preconditions.checkArgument(joinType != JoinRelType.RIGHT && joinType != JoinRelType.FULL,
"broadcast_right join hint is not supported for RIGHT or FULL OUTER joins");
JoinInfo broadcastJoinInfo = join.analyzeCondition();
RelDistribution leftDistribution = broadcastJoinInfo.leftKeys.isEmpty()
? RelDistributions.RANDOM_DISTRIBUTED : RelDistributions.hash(broadcastJoinInfo.leftKeys);
RelNode leftInput = join.getInput(0);
RelTraitSet leftTraitSet = leftInput.getTraitSet().plus(leftDistribution);
leftInput = leftInput.copy(leftTraitSet, leftInput.getInputs());
RelNode rightInput = join.getInput(1);
RelTraitSet rightTraitSet = rightInput.getTraitSet().plus(RelDistributions.BROADCAST_DISTRIBUTED);
rightInput = rightInput.copy(rightTraitSet, rightInput.getInputs());
return join.copy(join.getTraitSet(), List.of(leftInput, rightInput));
}
// Case-2: Handle dynamic filter for semi joins.
JoinInfo joinInfo = join.analyzeCondition();
/* if (join.isSemiJoin() && joinInfo.nonEquiConditions.isEmpty() && joinInfo.leftKeys.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,8 @@ private JoinNode convertLogicalJoin(LogicalJoin join) {
Preconditions.checkState(projectInput instanceof TableScan,
"Right input for lookup join must be a Project over TableScan, got Project over: %s",
projectInput.getClass().getSimpleName());
} else if (PinotHintOptions.JoinHintOptions.useBroadcastRightJoinStrategy(join)) {
joinStrategy = JoinNode.JoinStrategy.BROADCAST_RIGHT;
} else {
// TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy
joinStrategy = JoinNode.JoinStrategy.HASH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ public static JoinNode convertJoin(PhysicalJoin join) {
JoinNode.JoinStrategy joinStrategy;
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) {
joinStrategy = JoinNode.JoinStrategy.LOOKUP;
} else if (PinotHintOptions.JoinHintOptions.useBroadcastRightJoinStrategy(join)) {
joinStrategy = JoinNode.JoinStrategy.BROADCAST_RIGHT;
} else {
joinStrategy = JoinNode.JoinStrategy.HASH;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,6 @@ public int hashCode() {
}

public enum JoinStrategy {
HASH, LOOKUP, ASOF
HASH, LOOKUP, ASOF, BROADCAST_RIGHT
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ private static JoinNode.JoinStrategy convertJoinStrategy(Plan.JoinStrategy joinS
return JoinNode.JoinStrategy.LOOKUP;
case AS_OF:
return JoinNode.JoinStrategy.ASOF;
case BROADCAST_RIGHT:
return JoinNode.JoinStrategy.BROADCAST_RIGHT;
default:
throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ private static Plan.JoinStrategy convertJoinStrategy(JoinNode.JoinStrategy joinS
return Plan.JoinStrategy.LOOKUP;
case ASOF:
return Plan.JoinStrategy.AS_OF;
case BROADCAST_RIGHT:
return Plan.JoinStrategy.BROADCAST_RIGHT;
default:
throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,52 @@ private DispatchablePlanFragment findJoinStage(DispatchableSubPlan dispatchableS
return null;
}

/**
 * Verifies the plan produced for an equi-join carrying the broadcast_right hint: the join node must report the
 * BROADCAST_RIGHT strategy, its left input must be HASH distributed and its right input BROADCAST distributed.
 */
@Test
public void testBroadcastRightJoinHintEquiJoin() {
  DispatchableSubPlan plan = _queryEnvironment.planQuery(
      "SELECT /*+ joinOptions(join_strategy='broadcast_right') */ * FROM a JOIN b ON a.col1 = b.col1");

  JoinNode join = findJoinNode(plan);
  assertNotNull(join, "Should have a join node");
  JoinNode.JoinStrategy strategy = join.getJoinStrategy();
  assertEquals(strategy, JoinNode.JoinStrategy.BROADCAST_RIGHT, "Join strategy should be BROADCAST_RIGHT");

  // Both join inputs are expected to be mailbox receives; resolve them before checking their distributions.
  MailboxReceiveNode leftReceive = (MailboxReceiveNode) join.getInputs().get(0);
  MailboxReceiveNode rightReceive = (MailboxReceiveNode) join.getInputs().get(1);
  assertEquals(leftReceive.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED,
      "LEFT side of broadcast_right join should be HASH distributed");
  assertEquals(rightReceive.getDistributionType(), RelDistribution.Type.BROADCAST_DISTRIBUTED,
      "RIGHT side of broadcast_right join should be BROADCAST distributed");
}

/**
 * Verifies the broadcast_right hint on a non-equi join (no join keys): the left input must be RANDOM distributed
 * and the right input BROADCAST distributed. A non-equi join without the hint would also broadcast the right side,
 * but its JoinNode would not carry the explicit BROADCAST_RIGHT strategy that is asserted here.
 */
@Test
public void testBroadcastRightJoinHintNonEquiJoin() {
  DispatchableSubPlan plan = _queryEnvironment.planQuery(
      "SELECT /*+ joinOptions(join_strategy='broadcast_right') */ * FROM a JOIN b ON a.col3 > b.col3");

  JoinNode join = findJoinNode(plan);
  assertNotNull(join, "Should have a join node");
  JoinNode.JoinStrategy strategy = join.getJoinStrategy();
  assertEquals(strategy, JoinNode.JoinStrategy.BROADCAST_RIGHT, "Join strategy should be BROADCAST_RIGHT");

  // Both join inputs are expected to be mailbox receives; resolve them before checking their distributions.
  MailboxReceiveNode leftReceive = (MailboxReceiveNode) join.getInputs().get(0);
  MailboxReceiveNode rightReceive = (MailboxReceiveNode) join.getInputs().get(1);
  assertEquals(leftReceive.getDistributionType(), RelDistribution.Type.RANDOM_DISTRIBUTED,
      "LEFT side of broadcast_right non-equi join should be RANDOM");
  assertEquals(rightReceive.getDistributionType(), RelDistribution.Type.BROADCAST_DISTRIBUTED,
      "RIGHT side of broadcast_right non-equi join should be BROADCAST");
}

/**
* Finds the JoinNode in the plan.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,11 @@ public ObjectNode visitFilter(FilterNode node, Context context) {

@Override
public ObjectNode visitJoin(JoinNode node, Context context) {
if (node.getJoinStrategy() == JoinNode.JoinStrategy.HASH) {
return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN, context);
} else {
assert node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP;
if (node.getJoinStrategy() == JoinNode.JoinStrategy.LOOKUP) {
return recursiveCase(node, MultiStageOperator.Type.LOOKUP_JOIN, context);
} else {
// HASH, BROADCAST_RIGHT, and ASOF all execute via HashJoinOperator / AsofJoinOperator at runtime.
return recursiveCase(node, MultiStageOperator.Type.HASH_JOIN, context);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public MultiStageOperator createJoinOperator(OpChainExecutionContext context, Mu
DataSchema leftSchema = leftPlanNode.getDataSchema();
switch (joinStrategy) {
case HASH:
case BROADCAST_RIGHT:
if (joinNode.getLeftKeys().isEmpty()) {
// TODO: Consider adding non-equi as a separate join strategy.
return new NonEquiJoinOperator(context, leftOperator, leftSchema, rightOperator, joinNode);
Expand Down
Loading