Skip to content

Commit

Permalink
[SPARK-11675][SQL] Remove shuffle hash joins.
Browse files Browse the repository at this point in the history
Author: Reynold Xin <rxin@databricks.com>

Closes #9645 from rxin/SPARK-11675.

(cherry picked from commit e49e723)
Signed-off-by: Reynold Xin <rxin@databricks.com>
  • Loading branch information
rxin committed Nov 12, 2015
1 parent 2823504 commit 990f8ce
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 717 deletions.
9 changes: 1 addition & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,6 @@ private[spark] object SQLConf {
defaultValue = Some(5 * 60),
doc = "Timeout in seconds for the broadcast wait time in broadcast joins.")

// Options that control which operators can be chosen by the query planner. These should be
// considered hints and may be ignored by future versions of Spark SQL.
val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
defaultValue = Some(true),
doc = "When true, use sort merge join (as opposed to hash join) by default for large joins.")

// This is only used for the thriftserver
val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
doc = "Set a Fair Scheduler pool for a JDBC client session")
Expand Down Expand Up @@ -469,6 +463,7 @@ private[spark] object SQLConf {
val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled"
val CODEGEN_ENABLED = "spark.sql.codegen"
val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
}
}

Expand Down Expand Up @@ -533,8 +528,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def nativeView: Boolean = getConf(NATIVE_VIEW)

private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)

def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

private[spark] def subexpressionEliminationEnabled: Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side
* of the join will be broadcasted and the other side will be streamed, with no shuffling
* performed. If both sides of the join are eligible to be broadcasted then the
* - Sort merge: if the matching join keys are sortable and
* [[org.apache.spark.sql.SQLConf.SORTMERGE_JOIN]] is enabled (default), then sort merge join
* will be used.
* - Hash: will be chosen if neither of the above optimizations apply to this join.
* - Sort merge: if the matching join keys are sortable.
*/
object EquiJoinSelection extends Strategy with PredicateHelper {

Expand All @@ -103,22 +100,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
if RowOrdering.isOrderable(leftKeys) =>
val mergeJoin =
joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val buildSide =
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
joins.BuildRight
} else {
joins.BuildLeft
}
val hashJoin = joins.ShuffledHashJoin(
leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil

// --- Outer joins --------------------------------------------------------------------------

case ExtractEquiJoinKeys(
Expand All @@ -132,14 +118,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
leftKeys, rightKeys, RightOuter, condition, planLater(left), planLater(right)) :: Nil

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeOuterJoin(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
joins.ShuffledHashOuterJoin(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

// --- Cases where this strategy does not apply ---------------------------------------------

case _ => Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)

(keyValueOutput, runFunc)

case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
logWarning(
s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " +
s"will be ignored. Sort merge join will continue to be used.")
Seq(Row(SQLConf.Deprecated.SORTMERGE_JOIN, "true"))
}
(keyValueOutput, runFunc)

// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 990f8ce

Please sign in to comment.