[SPARK-4570][SQL]add BroadcastLeftSemiJoinHash #3442

Closed

wangxiaojing
Contributor

JIRA issue: SPARK-4570
This PR adds BroadcastLeftSemiJoinHash, which implements a broadcast join for left semi joins.
In a left semi join:
If the size of the data on the right side is smaller than the user-settable threshold AUTO_BROADCASTJOIN_THRESHOLD, the planner marks it as the broadcast relation and marks the other relation as the stream side. The broadcast table is sent to all of the executors involved in the join as an org.apache.spark.broadcast.Broadcast object, and the planner uses joins.BroadcastLeftSemiJoinHash. Otherwise it uses joins.LeftSemiJoinHash.
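
To make the idea concrete, here is a minimal sketch of a broadcast-style left semi join on plain Scala collections. It is not the code in this patch: the object name, the `Row` alias, `chooseStrategy`, and the exact threshold comparison are assumptions for illustration, and a local `Set` stands in for the Broadcast object.

```scala
// Illustrative sketch only; none of these names come from the patch.
object BroadcastLeftSemiJoinSketch {
  // A row is modelled as a (key, value) pair.
  type Row = (Int, String)

  /** Picks a join operator by name. Assumption: a non-positive threshold
    * disables broadcasting, and a right side at or below the threshold
    * is small enough to broadcast. */
  def chooseStrategy(rightSizeInBytes: Long, threshold: Long): String =
    if (threshold > 0 && rightSizeInBytes <= threshold) "BroadcastLeftSemiJoinHash"
    else "LeftSemiJoinHash"

  /** Keeps each streamed (left) row whose key occurs at least once on the
    * build (right) side. A left row is emitted at most once, however many
    * right rows share its key. */
  def leftSemiJoin(streamed: Iterator[Row], build: Seq[Row]): Iterator[Row] = {
    // In Spark this key set would be broadcast once to every executor;
    // here it is just a local immutable Set.
    val buildKeys: Set[Int] = build.map(_._1).toSet
    streamed.filter { case (key, _) => buildKeys.contains(key) }
  }

  def main(args: Array[String]): Unit = {
    val left = Seq((1, "a"), (2, "b"), (2, "c"), (3, "d"))
    val right = Seq((2, "x"), (2, "y"), (4, "z"))
    // Prints (2,b) and (2,c): only left rows with a matching key survive.
    leftSemiJoin(left.iterator, right).foreach(println)
  }
}
```

Because only the right-side keys are needed to decide whether a left row survives, the streamed side never has to be shuffled, which is where the speedup is expected to come from.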

The micro benchmark below suggests that the optimized version is more than 4x faster for left semi joins:


Original:
left semi join : 9288 ms 
Optimized:
left semi join : 1963 ms 

The micro benchmark loads data1/kv3.txt into a normal Hive table.
Benchmark code:


  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext

  // Runs f once and returns the elapsed wall-clock time in milliseconds.
  def benchmark(f: => Unit): Long = {
    val begin = System.currentTimeMillis()
    f
    val end = System.currentTimeMillis()
    end - begin
  }

  val sc = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))
  val hiveContext = new HiveContext(sc)
  import hiveContext._

  // Recreate both tables so that every run starts from the same state.
  sql("drop table if exists left_table")
  sql("drop table if exists right_table")
  sql("create table left_table (key int, value string)")
  sql("""load data local inpath "/data1/kv3.txt" into table left_table""")
  sql("create table right_table (key int, value string)")
  // Copy left_table into right_table, so every left key has a match.
  sql(
    """
      |from left_table
      |insert overwrite table right_table
      |select left_table.key, left_table.value
    """.stripMargin)

  val leftSemiJoin = sql(
    """select a.key from left_table a
      |left semi join right_table b on a.key = b.key""".stripMargin)
  // count() forces the query to run; only that call is timed.
  val leftSemiJoinDuration = benchmark(leftSemiJoin.count())
  println(s"left semi join : $leftSemiJoinDuration ms ")

@AmplabJenkins

Can one of the admins verify this patch?

@wangxiaojing
Contributor Author

@liancheng

@liancheng
Contributor

ok to test

@SparkQA

SparkQA commented Dec 1, 2014

Test build #23972 has started for PR 3442 at commit 3a63ecb.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 1, 2014

Test build #23972 has finished for PR 3442 at commit 3a63ecb.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastLeftSemiJoinHash(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23972/
Test FAILed.

@wangxiaojing
Contributor Author

@liancheng

@liancheng
Contributor

retest this please

@SparkQA

SparkQA commented Dec 1, 2014

Test build #23983 has started for PR 3442 at commit 3a63ecb.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 1, 2014

Test build #23983 has finished for PR 3442 at commit 3a63ecb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastLeftSemiJoinHash(

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23983/
Test PASSed.


override def execute() = {

val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator

Remove blank line

@marmbrus
Copy link
Contributor

marmbrus commented Dec 1, 2014

Thanks for working on this!

case (query, joinClass) => assertJoin(query, joinClass)
}

sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-$tmp""")

-$tmp: typo? And we can just use setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp.toString) here.
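
A minimal sketch of the save-and-restore pattern this suggestion points at, assuming `tmp` holds the previously saved threshold. The `Conf` class below is a hypothetical stand-in that models only string-keyed `setConf`/`getConf`, and `withThreshold` is an illustrative helper, not something in the patch.

```scala
import scala.collection.mutable

// Illustrative sketch only; Conf and withThreshold are hypothetical names.
object ThresholdToggleSketch {
  // Stand-in for the SQL configuration: string keys, string values.
  final class Conf {
    private val settings = mutable.Map[String, String]()
    def setConf(key: String, value: String): Unit = settings.update(key, value)
    def getConf(key: String, default: String): String = settings.getOrElse(key, default)
  }

  // Key behind SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.
  val ThresholdKey = "spark.sql.autoBroadcastJoinThreshold"

  /** Runs `body` with the threshold overridden, then writes the previous
    * value back, even if `body` throws. If the key was unset before, the
    * supplied default is what gets written back. */
  def withThreshold[T](conf: Conf, value: Long, default: Long)(body: => T): T = {
    val saved = conf.getConf(ThresholdKey, default.toString)
    conf.setConf(ThresholdKey, value.toString)
    try body
    finally conf.setConf(ThresholdKey, saved)
  }

  def main(args: Array[String]): Unit = {
    val conf = new Conf
    conf.setConf(ThresholdKey, "1000") // arbitrary starting value
    // A threshold of -1 stands for "broadcast joins disabled" in this sketch.
    val inside = withThreshold(conf, -1L, 0L) { conf.getConf(ThresholdKey, "") }
    println(s"inside: $inside, after: ${conf.getConf(ThresholdKey, "")}")
  }
}
```

Restoring the value with a plain setter instead of an interpolated SET statement avoids sign slips like `-$tmp`, because the saved string is written back unchanged.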

@SparkQA

SparkQA commented Dec 2, 2014

Test build #24025 has started for PR 3442 at commit d410f67.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 2, 2014

Test build #24025 has finished for PR 3442 at commit d410f67.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastLeftSemiJoinHash(

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24025/
Test PASSed.

@SparkQA

SparkQA commented Dec 4, 2014

Test build #24122 has started for PR 3442 at commit 4fdcfe7.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 4, 2014

Test build #24122 has finished for PR 3442 at commit 4fdcfe7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastLeftSemiJoinHash(

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24122/
Test PASSed.

@SparkQA

SparkQA commented Dec 17, 2014

Test build #24533 has started for PR 3442 at commit 3a58191.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 17, 2014

Test build #24533 has finished for PR 3442 at commit 3a58191.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BroadcastLeftSemiJoinHash(

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24533/
Test FAILed.

@SparkQA

SparkQA commented Dec 24, 2014

Test build #24753 has started for PR 3442 at commit a4a43c9.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 24, 2014

Test build #24753 has finished for PR 3442 at commit a4a43c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24753/
Test PASSed.

@marmbrus
Contributor

Thanks, merged to master.

@asfgit asfgit closed this in 07fa191 Dec 30, 2014