Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SQL][SPARK-2212]Hash Outer Join #1147

Closed

Conversation

chenghao-intel
Copy link
Contributor

This patch is to support the hash based outer join. Currently, outer join for big relations are resort to BoradcastNestedLoopJoin, which is super slow. This PR will create 2 hash tables for both relations in the same partition, which greatly reduce the table scans.

Here is the testing code that I used:

package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

case class Record(key: String, value: String)

object JoinTablePrepare extends App {
  import TestHive2._

  val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"${i % 828193}", s"val_$i")))

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists b")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key STRING, value STRING)
                 | ROW FORMAT SERDE 
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' 
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE b (key STRING, value STRING)
                 | ROW FORMAT SERDE 
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' 
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE result (key STRING, value STRING)
                 | ROW FORMAT SERDE 
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' 
                 | STORED AS RCFILE
               """.stripMargin)

  hql(s"""from records 
             | insert into table a
             | select key, value
           """.stripMargin)
  hql(s"""from records 
             | insert into table b select key + 100000, value
           """.stripMargin)
}

object JoinTablePerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=20")

  val leftOuterJoin = "insert overwrite table result select a.key, b.value from a left outer join b on a.key=b.key"
  val rightOuterJoin = "insert overwrite table result select a.key, b.value from a right outer join b on a.key=b.key"
  val fullOuterJoin = "insert overwrite table result select a.key, b.value from a full outer join b on a.key=b.key"

  val results = ("LeftOuterJoin", benchmark(leftOuterJoin)) :: ("LeftOuterJoin", benchmark(leftOuterJoin)) :: 
                ("RightOuterJoin", benchmark(rightOuterJoin)) :: ("RightOuterJoin", benchmark(rightOuterJoin)) :: 
                ("FullOuterJoin", benchmark(fullOuterJoin)) :: ("FullOuterJoin", benchmark(fullOuterJoin)) :: Nil
  val explains = hql(s"explain $leftOuterJoin").collect ++ hql(s"explain $rightOuterJoin").collect ++ hql(s"explain $fullOuterJoin").collect
  println(explains.mkString(",\n"))
  results.foreach { case (prompt, result) => {
      println(s"$prompt: took ${result._1} ms (${result._2} records)")
    }
  }

  def benchmark(cmd: String) = {
    val begin = System.currentTimeMillis()
    val result = hql(cmd)
    val end = System.currentTimeMillis()
    val count = hql("select count(1) from result").collect.mkString("")
    ((end - begin), count)
  }
}

And the result as shown below:

[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#95,value#98]],
[  HashOuterJoin [key#95], [key#97], LeftOuter, None],
[   Exchange (HashPartitioning [key#95], 20)],
[    HiveTableScan [key#95], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#97], 20)],
[    HiveTableScan [key#97,value#98], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#102,value#105]],
[  HashOuterJoin [key#102], [key#104], RightOuter, None],
[   Exchange (HashPartitioning [key#102], 20)],
[    HiveTableScan [key#102], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#104], 20)],
[    HiveTableScan [key#104,value#105], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#109,value#112]],
[  HashOuterJoin [key#109], [key#111], FullOuter, None],
[   Exchange (HashPartitioning [key#109], 20)],
[    HiveTableScan [key#109], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#111], 20)],
[    HiveTableScan [key#111,value#112], (MetastoreRelation default, b, None), None]
LeftOuterJoin: took 16072 ms ([3000000] records)
LeftOuterJoin: took 14394 ms ([3000000] records)
RightOuterJoin: took 14802 ms ([3000000] records)
RightOuterJoin: took 14747 ms ([3000000] records)
FullOuterJoin: took 17715 ms ([6000000] records)
FullOuterJoin: took 17629 ms ([6000000] records)

Without this PR, the benchmark will run seems never end.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15942/

@@ -114,48 +94,27 @@ object HashFilteredJoin extends Logging with PredicateHelper {
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the join filter push down stuff, as we have the PushPredicateThroughJoin in Optimizer(Catalyst),

@marmbrus
Copy link
Contributor

I think it would be much better if this PR just added support for LeftOuter (and maybe RightOuter too?) to HashJoin instead of rewriting all of our join planning code.

@chenghao-intel
Copy link
Contributor Author

Thank you all for the comments, I will changed some of the code accordingly.
This PR actually contains 2 relevant parts:

  • Code Re-factor for Join
    • Removed FilteredOperation from the patterns.scala, cause the filters(WHERE CONDITION & JOIN CONDITION) has been pushed down via the PushPredicateThroughJoin in logical.Optimizer.scala already. Discard the combination of filters(where and join condition) seems make the join pattern match more clean and simple.
    • Pattern matching order is actually very critical for the Join Operator Selection in SparkStrategies.scala, hence I merged the 3 Join Strategies into 1.
    • The trait BinaryJoinNode, which can be utilized by HashJoin / SortMergeJoin(will implement soon) / CartesionProduct(InnerJoin) / MapSide Join (Left/Inner/LeftSemi, assume the right table is the build table) for all of the join types; and if we want to add code gen for join condition, only we need to modify is the trait BinaryJoinNode.
  • Add Outer Join Support for HashJoin
    • With BinaryJoinNode, add hash based outer join support is easy.

Sorry, this PR changes lots of code, and make the whole logic not easy to understand.

@chenghao-intel
Copy link
Contributor Author

I am planning to split the PR into 3 smaller ones, 2 of those (#1187 & #1190) are done , and I will update this PR as the 3rd one once the previous have been merged.

@chenghao-intel
Copy link
Contributor Author

BTW, it will be cool if #1163 is merged before I start working on this updating.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@chenghao-intel chenghao-intel changed the title [SQL][SPARK-2212]HashJoin(Shuffled) [SQL][SPARK-2212]Hash Outer Join Jun 27, 2014
@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16189/

@chenghao-intel
Copy link
Contributor Author

Thanks @marmbrus merged some of the dependent PRs. I've also updated both code and description for this PR accordingly, some of the issues were listed in the PR description, probably we can discuss those during the code review.

@SparkQA
Copy link

SparkQA commented Jul 31, 2014

QA tests have started for PR 1147. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17551/consoleFull

@chenghao-intel
Copy link
Contributor Author

I've updated the code by rebasing the latest master, this PR will greatly improve the outer join performance for big tables. See the local benchmark in the description.

var left: UnresolvedRelation = _
var right: UnresolvedRelation = _

override def beforeEach() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove left, right, beforeEach, afterEach.

@marmbrus
Copy link
Contributor

marmbrus commented Aug 1, 2014

Thanks a lot for working on this! I agree that it would be great to merge this in before 1.1. I'm a little worried about how much memory this is going to require. However, the current implementation is so bad and since I think what you have done here is strictly better, we can probably just merge this in and then revisit.

@marmbrus
Copy link
Contributor

marmbrus commented Aug 1, 2014

I also really like how isolated this change is and the inclusion of the benchmark :)

@chenghao-intel
Copy link
Contributor Author

Thank you @marmbrus I've updated the code as suggested.

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA tests have started for PR 1147. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17663/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA results for PR 1147:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class HashOuterJoin(

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17663/consoleFull

*/
object HashOuterJoin {
val DUMMY_LIST = Seq[Row](null)
val EMPTY_LIST = Seq[Row]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use Seq.empty[Row] inline instead of this variable.

@marmbrus
Copy link
Contributor

marmbrus commented Aug 1, 2014

I'm going to go ahead and merge this so we can have it in 1.1. It would be great if you could address the final readability concerns in a follow up PR.

Thanks again for implementing this :) We have had a few questions about it on the mailing list!

@asfgit asfgit closed this in 4415722 Aug 1, 2014
@chenghao-intel
Copy link
Contributor Author

Thank you very much @marmbrus , I will create follow up for the improvement. :)

@chenghao-intel chenghao-intel deleted the hash_based_outer_join branch August 4, 2014 03:11
asfgit pushed a commit that referenced this pull request Aug 12, 2014
…for HashOuterJoin

This is a follow up for #1147 , this PR will improve the performance about 10% - 15% in my local tests.
```
Before:
LeftOuterJoin: took 16750 ms ([3000000] records)
LeftOuterJoin: took 15179 ms ([3000000] records)
RightOuterJoin: took 15515 ms ([3000000] records)
RightOuterJoin: took 15276 ms ([3000000] records)
FullOuterJoin: took 19150 ms ([6000000] records)
FullOuterJoin: took 18935 ms ([6000000] records)

After:
LeftOuterJoin: took 15218 ms ([3000000] records)
LeftOuterJoin: took 13503 ms ([3000000] records)
RightOuterJoin: took 13663 ms ([3000000] records)
RightOuterJoin: took 14025 ms ([3000000] records)
FullOuterJoin: took 16624 ms ([6000000] records)
FullOuterJoin: took 16578 ms ([6000000] records)
```

Besides the performance improvement, I also do some clean up as suggested in #1147

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1765 from chenghao-intel/hash_outer_join_fixing and squashes the following commits:

ab1f9e0 [Cheng Hao] Reduce the memory copy while building the hashmap

(cherry picked from commit 5d54d71)
Signed-off-by: Michael Armbrust <michael@databricks.com>
asfgit pushed a commit that referenced this pull request Aug 12, 2014
…for HashOuterJoin

This is a follow up for #1147 , this PR will improve the performance about 10% - 15% in my local tests.
```
Before:
LeftOuterJoin: took 16750 ms ([3000000] records)
LeftOuterJoin: took 15179 ms ([3000000] records)
RightOuterJoin: took 15515 ms ([3000000] records)
RightOuterJoin: took 15276 ms ([3000000] records)
FullOuterJoin: took 19150 ms ([6000000] records)
FullOuterJoin: took 18935 ms ([6000000] records)

After:
LeftOuterJoin: took 15218 ms ([3000000] records)
LeftOuterJoin: took 13503 ms ([3000000] records)
RightOuterJoin: took 13663 ms ([3000000] records)
RightOuterJoin: took 14025 ms ([3000000] records)
FullOuterJoin: took 16624 ms ([6000000] records)
FullOuterJoin: took 16578 ms ([6000000] records)
```

Besides the performance improvement, I also do some clean up as suggested in #1147

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1765 from chenghao-intel/hash_outer_join_fixing and squashes the following commits:

ab1f9e0 [Cheng Hao] Reduce the memory copy while building the hashmap
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
This patch is to support the hash based outer join. Currently, outer join for big relations are resort to `BoradcastNestedLoopJoin`, which is super slow. This PR will create 2 hash tables for both relations in the same partition, which greatly reduce the table scans.

Here is the testing code that I used:
```
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

case class Record(key: String, value: String)

object JoinTablePrepare extends App {
  import TestHive2._

  val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"${i % 828193}", s"val_$i")))

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists b")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE b (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE result (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)

  hql(s"""from records
             | insert into table a
             | select key, value
           """.stripMargin)
  hql(s"""from records
             | insert into table b select key + 100000, value
           """.stripMargin)
}

object JoinTablePerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=20")

  val leftOuterJoin = "insert overwrite table result select a.key, b.value from a left outer join b on a.key=b.key"
  val rightOuterJoin = "insert overwrite table result select a.key, b.value from a right outer join b on a.key=b.key"
  val fullOuterJoin = "insert overwrite table result select a.key, b.value from a full outer join b on a.key=b.key"

  val results = ("LeftOuterJoin", benchmark(leftOuterJoin)) :: ("LeftOuterJoin", benchmark(leftOuterJoin)) ::
                ("RightOuterJoin", benchmark(rightOuterJoin)) :: ("RightOuterJoin", benchmark(rightOuterJoin)) ::
                ("FullOuterJoin", benchmark(fullOuterJoin)) :: ("FullOuterJoin", benchmark(fullOuterJoin)) :: Nil
  val explains = hql(s"explain $leftOuterJoin").collect ++ hql(s"explain $rightOuterJoin").collect ++ hql(s"explain $fullOuterJoin").collect
  println(explains.mkString(",\n"))
  results.foreach { case (prompt, result) => {
      println(s"$prompt: took ${result._1} ms (${result._2} records)")
    }
  }

  def benchmark(cmd: String) = {
    val begin = System.currentTimeMillis()
    val result = hql(cmd)
    val end = System.currentTimeMillis()
    val count = hql("select count(1) from result").collect.mkString("")
    ((end - begin), count)
  }
}
```
And the result as shown below:
```
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#95,value#98]],
[  HashOuterJoin [key#95], [key#97], LeftOuter, None],
[   Exchange (HashPartitioning [key#95], 20)],
[    HiveTableScan [key#95], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#97], 20)],
[    HiveTableScan [key#97,value#98], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#102,value#105]],
[  HashOuterJoin [key#102], [key#104], RightOuter, None],
[   Exchange (HashPartitioning [key#102], 20)],
[    HiveTableScan [key#102], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#104], 20)],
[    HiveTableScan [key#104,value#105], (MetastoreRelation default, b, None), None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#109,value#112]],
[  HashOuterJoin [key#109], [key#111], FullOuter, None],
[   Exchange (HashPartitioning [key#109], 20)],
[    HiveTableScan [key#109], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#111], 20)],
[    HiveTableScan [key#111,value#112], (MetastoreRelation default, b, None), None]
LeftOuterJoin: took 16072 ms ([3000000] records)
LeftOuterJoin: took 14394 ms ([3000000] records)
RightOuterJoin: took 14802 ms ([3000000] records)
RightOuterJoin: took 14747 ms ([3000000] records)
FullOuterJoin: took 17715 ms ([6000000] records)
FullOuterJoin: took 17629 ms ([6000000] records)
```

Without this PR, the benchmark will run seems never end.

Author: Cheng Hao <hao.cheng@intel.com>

Closes apache#1147 from chenghao-intel/hash_based_outer_join and squashes the following commits:

65c599e [Cheng Hao] Fix issues with the community comments
72b1394 [Cheng Hao] Fix bug of stale value in joinedRow
55baef7 [Cheng Hao] Add HashOuterJoin
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
…for HashOuterJoin

This is a follow up for apache#1147 , this PR will improve the performance about 10% - 15% in my local tests.
```
Before:
LeftOuterJoin: took 16750 ms ([3000000] records)
LeftOuterJoin: took 15179 ms ([3000000] records)
RightOuterJoin: took 15515 ms ([3000000] records)
RightOuterJoin: took 15276 ms ([3000000] records)
FullOuterJoin: took 19150 ms ([6000000] records)
FullOuterJoin: took 18935 ms ([6000000] records)

After:
LeftOuterJoin: took 15218 ms ([3000000] records)
LeftOuterJoin: took 13503 ms ([3000000] records)
RightOuterJoin: took 13663 ms ([3000000] records)
RightOuterJoin: took 14025 ms ([3000000] records)
FullOuterJoin: took 16624 ms ([6000000] records)
FullOuterJoin: took 16578 ms ([6000000] records)
```

Besides the performance improvement, I also do some clean up as suggested in apache#1147

Author: Cheng Hao <hao.cheng@intel.com>

Closes apache#1765 from chenghao-intel/hash_outer_join_fixing and squashes the following commits:

ab1f9e0 [Cheng Hao] Reduce the memory copy while building the hashmap
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
6 participants