[GLUTEN-1985] avoid forceShuffledHashJoin when the join condition is not supported by the backend
zheniantoushipashi committed Jun 19, 2023
1 parent f3f7872 commit 7fe4e4c
Showing 8 changed files with 59 additions and 3 deletions.
@@ -116,6 +116,10 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
GlutenConfig.getConf.enableColumnarSort
}

override def supportSortMergeJoinExec(): Boolean = {
false
}

override def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = {
var allSupported = true
breakable {
@@ -20,6 +20,7 @@ import io.glutenproject.backendsapi.SparkPlanExecApi
import io.glutenproject.execution._
import io.glutenproject.expression.{AggregateFunctionsBuilder, AliasTransformerBase, CHSha1Transformer, CHSha2Transformer, ConverterUtils, ExpressionConverter, ExpressionMappings, ExpressionTransformer, WindowFunctionsBuilder}
import io.glutenproject.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import io.glutenproject.utils.CHJoinValidateUtil
import io.glutenproject.vectorized.{CHBlockWriterJniWrapper, CHColumnarBatchSerializer}

import org.apache.spark.{ShuffleDependency, SparkException}
@@ -380,6 +381,14 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
new CHSha1Transformer(substraitExprName, child, original)
}

/**
* Define whether the join operator falls back because its join condition is not supported by
* the backend.
*/
override def joinFallback(condition: Option[Expression]): Boolean = {
CHJoinValidateUtil.doValidate(condition)
}
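
CHJoinValidateUtil itself is outside this diff. As a rough sketch of the shape such a validator might take, assuming it walks the join condition looking for constructs the ClickHouse backend cannot execute inside a join (the object below is illustrative; only doValidate's signature comes from the hunk above):

import org.apache.spark.sql.catalyst.expressions.{Expression, Or}

// Illustrative stand-in for CHJoinValidateUtil: report "needs fallback" when
// the condition contains a construct the backend cannot evaluate inside the
// join, e.g. a disjunction mixing predicates from both sides.
object JoinConditionSketch {
  def doValidate(condition: Option[Expression]): Boolean =
    condition.exists(_.find(_.isInstanceOf[Or]).isDefined)
}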

/**
* Generate a BasicScanExecTransformer to transform hive table scan. Currently only for CH
* backend.
@@ -22,7 +22,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Not}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}

class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSuite {

@@ -153,7 +153,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSuite
test("test fallbackutils") {
val testSql =
"""
|SELECT i_brand_id AS brand_id, i_brand AS brand, i_manufact_id, i_manufact,
| sum(ss_ext_sales_price) AS ext_price
| FROM date_dim
| LEFT JOIN store_sales ON d_date_sk = ss_sold_date_sk
@@ -172,6 +172,36 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSuite
assert(FallbackUtil.isFallback(df.queryExecution.executedPlan))
}

test(
"Test avoid forceShuffledHashJoin when the join condition" +
" is not supported by the backend") {

withSQLConf(("spark.gluten.sql.columnar.sortMergeJoin", "false")) {
val testSql =
"""
|SELECT /*+ merge(date_dim)*/ i_brand_id AS brand_id, i_brand AS brand, i_manufact_id, i_manufact,
| sum(ss_ext_sales_price) AS ext_price
| FROM date_dim
| LEFT JOIN store_sales ON d_date_sk == ss_sold_date_sk AND (d_date_sk = 213232 OR ss_sold_date_sk = 3232)
| LEFT JOIN item ON ss_item_sk = i_item_sk AND i_manager_id = 7
| LEFT JOIN customer ON ss_customer_sk = c_customer_sk
| LEFT JOIN customer_address ON c_current_addr_sk = ca_address_sk
| LEFT JOIN store ON ss_store_sk = s_store_sk AND substr(ca_zip,1,5) <> substr(s_zip,1,5)
| WHERE d_moy = 11
| AND d_year = 1999
| GROUP BY i_brand_id, i_brand, i_manufact_id, i_manufact
| ORDER BY ext_price DESC, i_brand, i_brand_id, i_manufact_id, i_manufact
| LIMIT 100;
|""".stripMargin

val df = spark.sql(testSql)
val sortMergeJoinExec = df.queryExecution.executedPlan.collect {
case s: SortMergeJoinExec => s
}
assert(sortMergeJoinExec.nonEmpty)
}
}
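
For context, /*+ merge(date_dim) */ is Spark's join-strategy hint requesting a sort-merge join. Before this change, forceShuffledHashJoin could override the hint even when the backend could not execute the join condition. A minimal standalone illustration of the plan check the test performs (tables a and b are hypothetical, not from the suite):

import org.apache.spark.sql.execution.joins.SortMergeJoinExec

// Hypothetical tables a and b; the point is only the inspection pattern.
val hinted = spark.sql(
  """SELECT /*+ MERGE(b) */ a.id, b.v
    |FROM a JOIN b ON a.id = b.id AND (a.id = 1 OR b.id = 2)
    |""".stripMargin)
// With the fix, the unsupported condition keeps the vanilla SortMergeJoinExec.
assert(hinted.queryExecution.executedPlan.collect { case s: SortMergeJoinExec => s }.nonEmpty)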

test("Gluten-1235: Fix missing reading from the broadcasted value when executing DPP") {
val testSql =
"""
@@ -81,6 +81,10 @@ object BackendSettings extends BackendSettingsApi {

override def supportSortExec(): Boolean = true

override def supportSortMergeJoinExec(): Boolean = {
GlutenConfig.getConf.enableColumnarSortMergeJoin
}

override def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = {
var allSupported = true
breakable {
@@ -32,6 +32,7 @@ trait BackendSettingsApi {
paths: Seq[String]): Boolean = false
def supportExpandExec(): Boolean = false
def supportSortExec(): Boolean = false
def supportSortMergeJoinExec(): Boolean = true
def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = {
false
}
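
Taken with the ClickHouse and Velox hunks above, these three definitions form a simple capability pattern: the trait supplies an optimistic default and each backend narrows it. A condensed juxtaposition (assembled from this commit's hunks, not a single real file; the Velox object is plain BackendSettings in its own package):

trait BackendSettingsApi {
  def supportSortMergeJoinExec(): Boolean = true // optimistic default
}

object CHBackendSettings extends BackendSettingsApi {
  override def supportSortMergeJoinExec(): Boolean = false // CH: no columnar SMJ
}

object VeloxBackendSettings extends BackendSettingsApi {
  override def supportSortMergeJoinExec(): Boolean = // Velox: config-driven
    GlutenConfig.getConf.enableColumnarSortMergeJoin
}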
@@ -270,6 +270,12 @@ trait SparkPlanExecApi {
*/
def extraExpressionMappings: Seq[Sig] = Seq.empty

/**
* Define whether the join operator falls back because
* its join condition is not supported by the backend.
*/
def joinFallback(condition: Option[Expression]): Boolean = false

/**
* Default function to generate a window function node.
*/
@@ -85,6 +85,7 @@ case class JoinSelectionOverrides(session: SparkSession) extends Strategy with
}

if (forceShuffledHashJoin &&
!BackendsApiManager.getSparkPlanExecApiInstance.joinFallback(condition) &&
!left.getTagValue(TAG).isDefined &&
!right.getTagValue(TAG).isDefined) {
// Force use of ShuffledHashJoin in preference to SortMergeJoin. With no respect to
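
Reading the hunk in context, the new joinFallback call is one of three guards on the rewrite. A condensed sketch of the decision (identifiers taken from the hunk; the ShuffledHashJoin plan construction that follows is elided):

// SortMergeJoin is only forced into ShuffledHashJoin when (1) the rewrite is
// enabled, (2) the backend can actually execute the join condition, and
// (3) neither child is already tagged.
val forceToShuffledHashJoin = forceShuffledHashJoin &&
  !BackendsApiManager.getSparkPlanExecApiInstance.joinFallback(condition) &&
  left.getTagValue(TAG).isEmpty &&
  right.getTagValue(TAG).isEmpty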
@@ -263,7 +263,8 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
!scanOnly && BackendsApiManager.getSettings.supportColumnarShuffleExec()
val enableColumnarSort: Boolean = !scanOnly && columnarConf.enableColumnarSort
val enableColumnarWindow: Boolean = !scanOnly && columnarConf.enableColumnarWindow
val enableColumnarSortMergeJoin: Boolean = !scanOnly && columnarConf.enableColumnarSortMergeJoin
val enableColumnarSortMergeJoin: Boolean = !scanOnly &&
BackendsApiManager.getSettings.supportSortMergeJoinExec()
val enableColumnarBatchScan: Boolean = columnarConf.enableColumnarBatchScan
val enableColumnarFileScan: Boolean = columnarConf.enableColumnarFileScan
val enableColumnarProject: Boolean = !scanOnly && columnarConf.enableColumnarProject
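
Downstream of these flags (outside this hunk), the rule tags each operator as transformable or not. An illustrative sketch of how the tightened flag keeps SortMergeJoinExec on vanilla Spark; TransformHints.tagNotTransformable is assumed here and is not shown in this diff:

// Illustrative only: when the backend reports no sort-merge-join support, tag
// the node so the columnar transition leaves it as vanilla SortMergeJoinExec.
plan match {
  case smj: SortMergeJoinExec if !enableColumnarSortMergeJoin =>
    TransformHints.tagNotTransformable(smj)
  case _ =>
}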
