Skip to content

Commit

Permalink
Use withHashedRelation to set the HashedRelation used by a HashJoinNo…
Browse files Browse the repository at this point in the history
…de at runtime.
  • Loading branch information
yhuai committed Oct 4, 2015
1 parent f262b36 commit b9d5bc9
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.joins.{HashedRelation, BuildLeft, BuildRig
* `buildSide`. The actual work of this node will be delegated to the [[HashJoinNode]]
* that is created in `open`.
*/
case class BinarydHashJoinNode(
case class BinaryHashJoinNode(
conf: SQLConf,
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
Expand All @@ -40,8 +40,15 @@ case class BinarydHashJoinNode(
case BuildRight => (right, rightKeys, left, leftKeys)
}

private[this] var hashJoinNode: HashJoinNode = _

// The wrapped HashJoinNode, built once when this node is constructed. No HashedRelation
// is passed here; one is handed over separately through withHashedRelation.
private[this] val hashJoinNode: HashJoinNode =
  HashJoinNode(
    conf = conf, streamedKeys = streamedKeys, streamedNode = streamedNode,
    buildSide = buildSide, buildOutput = buildNode.output, isWrapped = true)
override def output: Seq[Attribute] = left.output ++ right.output

private[this] def isUnsafeMode: Boolean = {
Expand All @@ -65,16 +72,8 @@ case class BinarydHashJoinNode(

// Call the open of streamedNode.
streamedNode.open()
// Create the HashJoinNode based on the streamedNode and HashedRelation.
hashJoinNode =
HashJoinNode(
conf = conf,
streamedKeys = streamedKeys,
streamedNode = streamedNode,
buildSide = buildSide,
buildOutput = buildNode.output,
hashedRelation = hashedRelation,
isWrapped = true)
// Set the HashedRelation used by the HashJoinNode.
hashJoinNode.withHashedRelation(hashedRelation)
// Set up this HashJoinNode. We still call these in case there is any setup work
// that needs to be done in this HashJoinNode. Because isWrapped is true,
// prepare and open will not propagate to its child, streamedNode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ case class BroadcastHashJoinNode(

override val child = streamedNode

private[this] var hashJoinNode: HashJoinNode = _
// The wrapped HashJoinNode, built once when this node is constructed. No HashedRelation
// is passed here; one is handed over separately through withHashedRelation.
private[this] val hashJoinNode: HashJoinNode =
  HashJoinNode(
    conf = conf, streamedKeys = streamedKeys, streamedNode = streamedNode,
    buildSide = buildSide, buildOutput = buildOutput, isWrapped = true)

// Because we do not pass in the buildNode, we take the output of buildNode to
// create the inputSet properly.
Expand All @@ -51,16 +59,8 @@ case class BroadcastHashJoinNode(
override def open(): Unit = {
// Call the open of streamedNode.
streamedNode.open()
// Create the HashJoinNode based on the streamedNode and HashedRelation.
hashJoinNode =
HashJoinNode(
conf = conf,
streamedKeys = streamedKeys,
streamedNode = streamedNode,
buildSide = buildSide,
buildOutput = buildOutput,
hashedRelation = hashedRelation.value,
isWrapped = true)
// Set the HashedRelation used by the HashJoinNode.
hashJoinNode.withHashedRelation(hashedRelation.value)
// Set up this HashJoinNode. We still call these in case there is any setup work
// that needs to be done in this HashJoinNode. Because isWrapped is true,
// prepare and open will not propagate to its child, streamedNode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics

/**
* A node for inner hash equi-join. It can be used individually or wrapped by other
* inner hash equi-join nodes such as [[BinarydHashJoinNode]]. This node takes a already
* inner hash equi-join nodes such as [[BinaryHashJoinNode]]. This node takes an already
* built [[HashedRelation]] and a [[LocalNode]] representing the streamed side.
* If this node is used individually, `isWrapped` should be set to false.
* If this node is wrapped in another node, `isWrapped` should be set to true
Expand All @@ -40,7 +40,6 @@ case class HashJoinNode(
streamedNode: LocalNode,
buildSide: BuildSide,
buildOutput: Seq[Attribute],
hashedRelation: HashedRelation,
isWrapped: Boolean) extends UnaryLocalNode(conf) {

override val child = streamedNode
Expand All @@ -61,7 +60,7 @@ case class HashJoinNode(
private[this] var joinRow: JoinedRow = _
private[this] var resultProjection: (InternalRow) => InternalRow = _

private[this] val hashed: HashedRelation = hashedRelation
private[this] var hashed: HashedRelation = _
private[this] var joinKeys: Projection = _

private[this] def isUnsafeMode: Boolean = {
Expand All @@ -76,6 +75,11 @@ case class HashJoinNode(
}
}

/**
 * Sets the [[HashedRelation]] used by this node.
 *
 * This mutates the node in place and returns nothing; it does not create a copy.
 * Each call overwrites the previously stored relation.
 *
 * @param hashedRelation the already built relation to store for this node
 */
def withHashedRelation(hashedRelation: HashedRelation): Unit = {
  this.hashed = hashedRelation
}

override def prepare(): Unit = {
if (!isWrapped) {
// This node is used individually, we should propagate prepare call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class HashJoinNodeSuite extends LocalNodeTest with SharedSQLContext {
val rightNode = new DummyNode(joinNicknameAttributes, rightInput)
val makeBinaryHashJoinNode = (node1: LocalNode, node2: LocalNode) => {
val binaryHashJoinNode =
BinarydHashJoinNode(conf, Seq('id1), Seq('id2), buildSide, node1, node2)
BinaryHashJoinNode(conf, Seq('id1), Seq('id2), buildSide, node1, node2)
resolveExpressions(binaryHashJoinNode)
}
val makeBroadcastJoinNode = (node1: LocalNode, node2: LocalNode) => {
Expand Down

0 comments on commit b9d5bc9

Please sign in to comment.