Skip to content

Commit

Permalink
[SPARK-37370][SQL] Add SQL configs to control newly added join code-g…
Browse files Browse the repository at this point in the history
…en in 3.3

### What changes were proposed in this pull request?

During Spark 3.3, we added code-gen for FULL OUTER shuffled hash join, FULL OUTER sort merge join, and Existence sort merge join. Given the join test coverage is not high, and we would love to avoid any upcoming release regression due to it. So here we introduce three internal configs to allow users and developers to disable code-gen in case we found any bug after release.

### Why are the changes needed?

Allow users for quick mitigation in case any bug found. Avoid release regression.

### Does this PR introduce _any_ user-facing change?

Yes, three internal configs to control each join. By default they are true, so code-gen is enabled.

### How was this patch tested?

Only added configs here, so rely on existing tests.

Closes #34643 from c21/join-config.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
c21 authored and cloud-fan committed Nov 19, 2021
1 parent 0398b5b commit 43b05e7
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 0 deletions.
Expand Up @@ -1765,6 +1765,30 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN =
buildConf("spark.sql.codegen.join.fullOuterShuffledHashJoin.enabled")
.internal()
.doc("When true, enable code-gen for FULL OUTER shuffled hash join.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)

val ENABLE_FULL_OUTER_SORT_MERGE_JOIN_CODEGEN =
buildConf("spark.sql.codegen.join.fullOuterSortMergeJoin.enabled")
.internal()
.doc("When true, enable code-gen for FULL OUTER sort merge join.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)

val ENABLE_EXISTENCE_SORT_MERGE_JOIN_CODEGEN =
buildConf("spark.sql.codegen.join.existenceSortMergeJoin.enabled")
.internal()
.doc("When true, enable code-gen for Existence sort merge join.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)

val MAX_NESTED_VIEW_DEPTH =
buildConf("spark.sql.view.maxNestedViewDepth")
.internal()
Expand Down
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{RowIterator, SparkPlan}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.collection.{BitSet, OpenHashSet}

/**
Expand Down Expand Up @@ -311,6 +312,11 @@ case class ShuffledHashJoinExec(
streamResultIter ++ buildResultIter
}

override def supportCodegen: Boolean = joinType match {
case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN)
case _ => true
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
streamedPlan.execute() :: buildPlan.execute() :: Nil
}
Expand Down
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.util.collection.BitSet

Expand Down Expand Up @@ -376,6 +377,12 @@ case class SortMergeJoinExec(
private lazy val streamedOutput = streamedPlan.output
private lazy val bufferedOutput = bufferedPlan.output

override def supportCodegen: Boolean = joinType match {
case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SORT_MERGE_JOIN_CODEGEN)
case _: ExistenceJoin => conf.getConf(SQLConf.ENABLE_EXISTENCE_SORT_MERGE_JOIN_CODEGEN)
case _ => true
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
streamedPlan.execute() :: bufferedPlan.execute() :: Nil
}
Expand Down

0 comments on commit 43b05e7

Please sign in to comment.