Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10887] [SQL] Build HashedRelation outside of HashJoinNode. #8953

Closed
wants to merge 6 commits into from
Closed

[SPARK-10887] [SQL] Build HashedRelation outside of HashJoinNode. #8953

wants to merge 6 commits into from

Conversation

yhuai
Copy link
Contributor

@yhuai yhuai commented Sep 30, 2015

This PR refactors HashJoinNode to take a existing HashedRelation. So, we can reuse this node for both ShuffledHashJoin and BroadcastHashJoin.

https://issues.apache.org/jira/browse/SPARK-10887

@SparkQA
Copy link

SparkQA commented Oct 1, 2015

Test build #43140 has finished for PR 8953 at commit 0cf8d44.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai yhuai changed the title [SPARK-10887] [sql] Build HashedRelation outside of HashJoinNode. [SPARK-10887] [SQL] [WIP] Build HashedRelation outside of HashJoinNode. Oct 2, 2015
@yhuai
Copy link
Contributor Author

yhuai commented Oct 2, 2015

I think we still need to have a 2-way hash join node that takes two LocalNodes as children. It will internally use the refactored hash join node.

@yhuai yhuai changed the title [SPARK-10887] [SQL] [WIP] Build HashedRelation outside of HashJoinNode. [SPARK-10887] [SQL] Build HashedRelation outside of HashJoinNode. Oct 2, 2015
@SparkQA
Copy link

SparkQA commented Oct 2, 2015

Test build #43187 has finished for PR 8953 at commit f262b36.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BinarydHashJoinNode(
    • case class BroadcastHashJoinNode(

@yhuai
Copy link
Contributor Author

yhuai commented Oct 2, 2015

test this please

@SparkQA
Copy link

SparkQA commented Oct 2, 2015

Test build #43189 has started for PR 8953 at commit f262b36.

@yhuai
Copy link
Contributor Author

yhuai commented Oct 2, 2015

test this please

@SparkQA
Copy link

SparkQA commented Oct 3, 2015

Test build #43201 has finished for PR 8953 at commit f262b36.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BinarydHashJoinNode(
    • case class BroadcastHashJoinNode(

@SparkQA
Copy link

SparkQA commented Oct 4, 2015

Test build #43223 has finished for PR 8953 at commit b9d5bc9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BinaryHashJoinNode(
    • case class BroadcastHashJoinNode(

@SparkQA
Copy link

SparkQA commented Oct 6, 2015

Test build #43282 has finished for PR 8953 at commit e8a3d17.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BinaryHashJoinNode(
    • case class BroadcastHashJoinNode(

left: LocalNode,
right: LocalNode) extends BinaryLocalNode(conf) {

private[this] lazy val (buildNode, buildKeys, streamedNode, streamedKeys) = buildSide match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a lazy val?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, doesn't need to be

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed lazy.

@SparkQA
Copy link

SparkQA commented Oct 6, 2015

Test build #43283 has finished for PR 8953 at commit 65c6a50.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BinaryHashJoinNode(
    • case class BroadcastHashJoinNode(

* input [[InternalRow]] for a fixed set of [[Expression Expressions]]. It exposes a `target`
* method. This method is used to set the row that will be updated. So, when `target` is used, the
* [[MutableRow]] object created internally will not be used. If `target` is not used, the
* [[MutableRow]] object created internally will be used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this comment. Where is target? I can't find it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See public ${classOf[BaseMutableProjection].getName} target($mutableRowType row) {.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh got it

@andrewor14
Copy link
Contributor

As discussed offline, we should explore alternatives where we don't have to wrap local nodes to avoid the wrapping logic. Right now we need to worry about the initialization / prepare / close order and it's easy to miss something there.

val resolvedBuildNode = resolveExpressions(buildNode)
val resolvedBuildKeys = resolveExpressions(buildKeys, resolvedBuildNode)
val hashedRelation = buildHashedRelation(conf, resolvedBuildKeys, resolvedBuildNode)
val broadcastHashedRelation = sqlContext.sparkContext.broadcast(hashedRelation)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewor14 This is the place where I need sqlcontext.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't you just create a broadcast variable directly? You might have to do some mocking for that, e.g.

import org.mockito.Mockito.{mock, when}

val hashedRelation = ...
val broadcastHashedRelation = mock(classOf[TorrentBroadcast[HashedRelation]])
when(broadcastHashedRelation.value).thenReturn(hashedRelation)

would that work? Then we won't have to rely on SparkContext.

@yhuai
Copy link
Contributor Author

yhuai commented Oct 7, 2015

@andrewor14 I have made HashJoinNode a trait.

@SparkQA
Copy link

SparkQA commented Oct 7, 2015

Test build #43340 has finished for PR 8953 at commit ecbd1d0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* input [[InternalRow]] for a fixed set of [[Expression Expressions]].
* input [[InternalRow]] for a fixed set of [[Expression Expressions]]. It exposes a `target`
* method. This method is used to set the row that will be updated. So, when `target` is used, the
* [[MutableRow]] object created internally will not be used. If `target` is not used, the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is quite verbose. I think you can just say

It exposes a `target` method, which is used to set the row that will be updated.
The internal [[MutableRow]] object created internally is used only when `target` is not used.


/**
* A node for inner hash equi-join. [[BinaryHashJoinNode]] and [[BroadcastHashJoinNode]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An abstract node for sharing common functionality among different implementations of
inner hash equi-join, notably [[BinaryHashJoinNode]] and [[BroadcastHashJoinNode]].

@andrewor14
Copy link
Contributor

@yhuai thanks, this looks much simpler. My remaining comments are mainly concerned with comments and tests.

@andrewor14
Copy link
Contributor

LGTM will merge pending tasks. Thanks for addressing the comments so quickly.

@SparkQA
Copy link

SparkQA commented Oct 8, 2015

Test build #43352 has finished for PR 8953 at commit 081e331.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BinaryHashJoinNode(
    • case class BroadcastHashJoinNode(
    • trait HashJoinNode

@andrewor14
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Oct 8, 2015

Test build #43366 has finished for PR 8953 at commit 081e331.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BinaryHashJoinNode(
    • case class BroadcastHashJoinNode(
    • trait HashJoinNode

@asfgit asfgit closed this in 82d275f Oct 8, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants