diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 9807ebe04b..317303eb7b 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -289,13 +289,6 @@ object CometConf extends ShimCometConf { .checkValues(Set("native", "jvm", "auto")) .createWithDefault("auto") - val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] = - conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar") - .doc("Whether to try falling back to columnar shuffle when native shuffle is not supported") - .internal() - .booleanConf - .createWithDefault(false) - val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled") .doc( diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 18e91d9da6..aeb1472c67 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -363,6 +363,44 @@ index a9f69ab28a1..5d9d4f2cb83 100644 withTable("tbl") { sql( """ +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +index 433b4741979..07148eee480 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +@@ -23,8 +23,9 @@ import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled} + import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral} + import org.apache.spark.sql.catalyst.optimizer.TransposeWindow + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning ++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.window.WindowExec + import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window} + import org.apache.spark.sql.functions._ +@@ -1186,10 +1187,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest + } + + def isShuffleExecByRequirement( +- plan: ShuffleExchangeExec, ++ plan: ShuffleExchangeLike, + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns ++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) => ++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns + case _ => false + } + +@@ -1212,7 +1215,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest + val shuffleByRequirement = windowed.queryExecution.executedPlan.exists { + case w: WindowExec => + w.child.exists { +- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2")) ++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2")) + case _ => false + } + case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index daef11ae4d6..9f3cc9181f2 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -386,7 +424,7 @@ index daef11ae4d6..9f3cc9181f2 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..cc5224af735 100644 +index f33432ddb6f..1925aac8d97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -417,17 +455,37 @@ index f33432ddb6f..cc5224af735 100644 Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("avoid reordering broadcast join keys to match input hash partitioning") { ++ test("avoid reordering broadcast join keys to match input hash partitioning", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTable("large", "dimTwo", "dimThree") { +@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse") { + "canonicalization and exchange reuse", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1729,6 +1735,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-34637: DPP side broadcast query stage is created firstly") { ++ test("SPARK-34637: DPP side broadcast query stage is created firstly", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + val df = sql( + """ WITH v as ( +@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -611,7 +669,7 @@ index 1792b4c32eb..1616e6f39bd 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index 7f062bfb899..b347ef905d2 100644 +index 7f062bfb899..0ed85486e80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier @@ -707,7 +765,7 @@ index 7f062bfb899..b347ef905d2 100644 // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) } -@@ -1282,18 +1292,25 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1282,18 +1292,26 
@@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } // Test shuffled hash join @@ -722,6 +780,7 @@ index 7f062bfb899..b347ef905d2 100644 + true + case WholeStageCodegenExec(ColumnarToRowExec( + InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true ++ case _: CometHashJoinExec => true }.size === 1) checkAnswer(shjCodegenDF, Seq.empty) @@ -735,7 +794,7 @@ index 7f062bfb899..b347ef905d2 100644 checkAnswer(shjNonCodegenDF, Seq.empty) } } -@@ -1341,7 +1358,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1341,7 +1359,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -745,7 +804,7 @@ index 7f062bfb899..b347ef905d2 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1370,9 +1388,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1370,9 +1389,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -760,7 +819,7 @@ index 7f062bfb899..b347ef905d2 100644 } // Test output ordering is not preserved -@@ -1381,9 +1402,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1381,9 +1403,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -775,7 +834,7 @@ index 7f062bfb899..b347ef905d2 100644 } // Test singe partition -@@ -1393,7 +1417,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1393,7 +1418,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -785,7 +844,7 @@ index 7f062bfb899..b347ef905d2 100644 checkAnswer(fullJoinDF, Row(100)) } } -@@ -1438,6 +1463,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1438,6 +1464,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan Seq(semiJoinDF, antiJoinDF).foreach { df => assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -795,7 +854,7 @@ index 7f062bfb899..b347ef905d2 100644 }.size == 1) } } -@@ -1482,14 +1510,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1482,14 +1511,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { def check(plan: SparkPlan): Unit = { @@ -818,7 +877,7 @@ index 7f062bfb899..b347ef905d2 100644 } dupStreamSideColTest("SHUFFLE_HASH", check) } -@@ -1605,7 +1639,8 @@ class ThreadLeakInSortMergeJoinSuite +@@ -1605,7 +1640,8 @@ class ThreadLeakInSortMergeJoinSuite sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) } @@ 
-1280,6 +1339,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +index eec396b2e39..bf3f1c769d6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.execution + + import org.apache.spark.TestUtils.assertSpilled +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row} + import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} + import org.apache.spark.sql.test.SharedSparkSession + +@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { + Row(1, 3, null) :: Row(2, null, 4) :: Nil) + } + +- test("test with low buffer spill threshold") { ++ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index b14f4a405f6..ab7baf434a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala diff --git a/dev/diffs/3.5.4.diff b/dev/diffs/3.5.4.diff index 32d4d617e8..68e0b8e362 100644 --- a/dev/diffs/3.5.4.diff +++ b/dev/diffs/3.5.4.diff @@ -342,6 +342,44 @@ index 7ee18df3756..64f01a68048 100644 withTable("tbl") { sql( """ +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +index 47a311c71d5..342e71cfdd4 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression + import org.apache.spark.sql.catalyst.optimizer.TransposeWindow + import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow} + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning ++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.window.WindowExec + import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window} + import org.apache.spark.sql.functions._ +@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest + } + + def isShuffleExecByRequirement( +- plan: ShuffleExchangeExec, ++ plan: ShuffleExchangeLike, + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, 
_, ENSURE_REQUIREMENTS, _) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns ++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) => ++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns + case _ => false + } + +@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest + val shuffleByRequirement = windowed.queryExecution.executedPlan.exists { + case w: WindowExec => + w.child.exists { +- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2")) ++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2")) + case _ => false + } + case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index f32b32ffc5a..447d7c6416e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -365,7 +403,7 @@ index f32b32ffc5a..447d7c6416e 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..19ce507e82b 100644 +index f33432ddb6f..0fa49fb3f0b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -422,7 +460,7 @@ index f33432ddb6f..19ce507e82b 100644 - test("avoid reordering broadcast join keys to match input hash partitioning") { + test("avoid reordering broadcast join keys to match input hash partitioning", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { @@ -442,7 +480,7 @@ index f33432ddb6f..19ce507e82b 100644 test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse") { + "canonicalization and exchange reuse", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( @@ -482,7 +520,7 @@ index f33432ddb6f..19ce507e82b 100644 - test("SPARK-34637: DPP side broadcast query stage is created firstly") { + test("SPARK-34637: DPP side broadcast query stage is created firstly", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { val df = sql( """ WITH v as ( @@ -746,7 +784,7 @@ index 7af826583bd..3c3def1eb67 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index 4d256154c85..43f0bebb00c 100644 +index 4d256154c85..66a5473852d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -31,7 +31,8 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -872,7 +910,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size === 1) // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) -@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan assert(shjCodegenDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true @@ -880,6 +918,7 @@ index 4d256154c85..43f0bebb00c 100644 + true + case WholeStageCodegenExec(ColumnarToRowExec( + InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true ++ case _: CometHashJoinExec => true }.size === 1) checkAnswer(shjCodegenDF, Seq.empty) @@ -893,7 +932,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(shjNonCodegenDF, Seq.empty) } } -@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -903,7 +942,7 @@ index 4d256154c85..43f0bebb00c 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -918,7 +957,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test output ordering is not preserved -@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -933,7 +972,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test singe partition -@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -943,7 +982,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(fullJoinDF, Row(100)) } } -@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan Seq(semiJoinDF, antiJoinDF).foreach { df => assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -953,7 +992,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size == 1) } } -@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1627,14 +1659,20 @@ 
class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { def check(plan: SparkPlan): Unit = { @@ -976,6 +1015,16 @@ index 4d256154c85..43f0bebb00c 100644 } dupStreamSideColTest("SHUFFLE_HASH", check) } +@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite + sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) + } + +- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") { ++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)", ++ IgnoreComet("Comet does not support spilling")) { + + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index c26757c9cff..d55775f09d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -1451,6 +1500,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +index eec396b2e39..bf3f1c769d6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.execution + + import org.apache.spark.TestUtils.assertSpilled +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row} + import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} + import org.apache.spark.sql.test.SharedSparkSession + +@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { + Row(1, 3, null) :: Row(2, null, 4) :: Nil) + } + +- test("test with low buffer spill threshold") { ++ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index b14f4a405f6..ab7baf434a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff index 9ca5310876..1fa55686d9 100644 --- a/dev/diffs/3.5.5.diff +++ b/dev/diffs/3.5.5.diff @@ -342,6 +342,44 @@ index 7ee18df3756..64f01a68048 100644 withTable("tbl") { sql( """ +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +index 47a311c71d5..342e71cfdd4 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression + import org.apache.spark.sql.catalyst.optimizer.TransposeWindow + import 
org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow} + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning ++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.window.WindowExec + import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window} + import org.apache.spark.sql.functions._ +@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest + } + + def isShuffleExecByRequirement( +- plan: ShuffleExchangeExec, ++ plan: ShuffleExchangeLike, + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns ++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) => ++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns + case _ => false + } + +@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest + val shuffleByRequirement = windowed.queryExecution.executedPlan.exists { + case w: WindowExec => + w.child.exists { +- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2")) ++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2")) + case _ => false + } + case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index f32b32ffc5a..447d7c6416e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -365,7 +403,7 @@ index f32b32ffc5a..447d7c6416e 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index f33432ddb6f..fe9f74ff8f1 100644 +index f33432ddb6f..0e1499a24ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -386,7 +424,37 @@ index f33432ddb6f..fe9f74ff8f1 100644 case _ => Nil } } -@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1027,7 +1031,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("avoid reordering broadcast join keys to match input hash partitioning") { ++ test("avoid reordering broadcast join keys to match input hash partitioning", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTable("large", "dimTwo", "dimThree") { +@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + + test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + +- "canonicalization and exchange reuse") { ++ "canonicalization and exchange reuse", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + 
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = sql( +@@ -1423,7 +1429,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-34637: DPP side broadcast query stage is created firstly") { ++ test("SPARK-34637: DPP side broadcast query stage is created firstly", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + val df = sql( + """ WITH v as ( +@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -563,7 +631,7 @@ index 7af826583bd..3c3def1eb67 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index 4d256154c85..43f0bebb00c 100644 +index 4d256154c85..66a5473852d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -689,7 +757,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size === 1) // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) -@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan assert(shjCodegenDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true @@ -697,6 +765,7 @@ index 4d256154c85..43f0bebb00c 100644 + true + case WholeStageCodegenExec(ColumnarToRowExec( + InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true ++ case _: CometHashJoinExec => true }.size === 1) checkAnswer(shjCodegenDF, Seq.empty) @@ -710,7 +779,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(shjNonCodegenDF, Seq.empty) } } -@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -720,7 +789,7 @@ index 4d256154c85..43f0bebb00c 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -735,7 +804,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test output ordering is not preserved -@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val 
selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -750,7 +819,7 @@ index 4d256154c85..43f0bebb00c 100644 } // Test singe partition -@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -760,7 +829,7 @@ index 4d256154c85..43f0bebb00c 100644 checkAnswer(fullJoinDF, Row(100)) } } -@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan Seq(semiJoinDF, antiJoinDF).foreach { df => assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -770,7 +839,7 @@ index 4d256154c85..43f0bebb00c 100644 }.size == 1) } } -@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { def check(plan: SparkPlan): Unit = { @@ -793,6 +862,16 @@ index 4d256154c85..43f0bebb00c 100644 } dupStreamSideColTest("SHUFFLE_HASH", check) } +@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite + sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) + } + +- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") { ++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)", ++ IgnoreComet("Comet does not support spilling")) { + + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index c26757c9cff..d55775f09d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -1256,6 +1335,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +index eec396b2e39..bf3f1c769d6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.execution + + import org.apache.spark.TestUtils.assertSpilled +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row} + import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} + import org.apache.spark.sql.test.SharedSparkSession + +@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { + Row(1, 3, null) :: Row(2, null, 4) :: Nil) + } + +- test("test with low buffer spill threshold") { ++ 
test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index b14f4a405f6..ab7baf434a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 2fec4297d8..3fb9b2c025 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -374,6 +374,44 @@ index 760ee802608..b77133ffd37 100644 } assert(exchanges.size == 2) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +index e3aff9b36ae..06196517935 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression + import org.apache.spark.sql.catalyst.optimizer.TransposeWindow + import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow} + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning ++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.execution.window.WindowExec + import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window} + import org.apache.spark.sql.functions._ +@@ -1142,10 +1143,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest + } + + def isShuffleExecByRequirement( +- plan: ShuffleExchangeExec, ++ plan: ShuffleExchangeLike, + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns ++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) => ++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns + case _ => false + } + +@@ -1168,7 +1171,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest + val shuffleByRequirement = windowed.queryExecution.executedPlan.exists { + case w: WindowExec => + w.child.exists { +- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2")) ++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2")) + case _ => false + } + case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 16a493b5290..3f0b70e2d59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -397,7 +435,7 @@ index 16a493b5290..3f0b70e2d59 100644 assert(exchanges.size == 2) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala -index 2c24cc7d570..3e6a8632fa6 100644 +index 2c24cc7d570..21d36ebc6f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen @@ -428,17 +466,37 @@ index 2c24cc7d570..3e6a8632fa6 100644 Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", -@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("avoid reordering broadcast join keys to match input hash partitioning") { ++ test("avoid reordering broadcast join keys to match input hash partitioning", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withTable("large", "dimTwo", "dimThree") { +@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse") { + "canonicalization and exchange reuse", -+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) { ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( -@@ -1455,7 +1461,8 @@ abstract class DynamicPartitionPruningSuiteBase +@@ -1424,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase + } + } + +- test("SPARK-34637: DPP side broadcast query stage is created firstly") { ++ test("SPARK-34637: DPP side broadcast query stage is created firstly", ++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + val df = sql( + """ WITH v as ( +@@ -1455,7 +1463,8 @@ abstract class DynamicPartitionPruningSuiteBase } } @@ -448,7 +506,7 @@ index 2c24cc7d570..3e6a8632fa6 100644 val df = sql( """ |SELECT s.store_id, f.product_id -@@ -1730,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat +@@ -1730,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat case s: BatchScanExec => // we use f1 col for v2 tables due to schema pruning s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1"))) @@ -649,7 +707,7 @@ index 53e47f428c3..a55d8f0c161 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index fcb937d82ba..fafe8e8d08b 100644 +index fcb937d82ba..fc208087a69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -775,7 +833,7 @@ index fcb937d82ba..fafe8e8d08b 100644 }.size === 1) // Same result between shuffled hash join and sort merge join checkAnswer(shjDF, smjResult) -@@ -1435,13 +1449,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with 
AdaptiveSparkPlan +@@ -1435,13 +1449,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan assert(shjCodegenDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true @@ -783,6 +841,7 @@ index fcb937d82ba..fafe8e8d08b 100644 + true + case WholeStageCodegenExec(ColumnarToRowExec( + InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true ++ case _: CometHashJoinExec => true }.size === 1) checkAnswer(shjCodegenDF, Seq.empty) @@ -796,7 +855,7 @@ index fcb937d82ba..fafe8e8d08b 100644 checkAnswer(shjNonCodegenDF, Seq.empty) } } -@@ -1489,7 +1509,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1489,7 +1510,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) // Have shuffle before aggregation @@ -806,7 +865,7 @@ index fcb937d82ba..fafe8e8d08b 100644 } def getJoinQuery(selectExpr: String, joinType: String): String = { -@@ -1518,9 +1539,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1518,9 +1540,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -821,7 +880,7 @@ index fcb937d82ba..fafe8e8d08b 100644 } // Test output ordering is not preserved -@@ -1529,9 +1553,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1529,9 +1554,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0" val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1) @@ -836,7 +895,7 @@ index fcb937d82ba..fafe8e8d08b 100644 } // Test singe partition -@@ -1541,7 +1568,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1541,7 +1569,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2 |""".stripMargin) val plan = fullJoinDF.queryExecution.executedPlan @@ -846,7 +905,7 @@ index fcb937d82ba..fafe8e8d08b 100644 checkAnswer(fullJoinDF, Row(100)) } } -@@ -1586,6 +1614,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1586,6 +1615,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan Seq(semiJoinDF, antiJoinDF).foreach { df => assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -856,7 +915,7 @@ index fcb937d82ba..fafe8e8d08b 100644 }.size == 1) } } -@@ -1630,14 +1661,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan +@@ -1630,14 +1662,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { def check(plan: SparkPlan): Unit = { @@ -879,7 +938,7 @@ index fcb937d82ba..fafe8e8d08b 100644 } dupStreamSideColTest("SHUFFLE_HASH", 
check) } -@@ -1773,7 +1810,8 @@ class ThreadLeakInSortMergeJoinSuite +@@ -1773,7 +1811,8 @@ class ThreadLeakInSortMergeJoinSuite sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) } @@ -1451,6 +1510,28 @@ index 47679ed7865..9ffbaecb98e 100644 }.length == hashAggCount) assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +index eec396b2e39..bf3f1c769d6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.execution + + import org.apache.spark.TestUtils.assertSpilled +-import org.apache.spark.sql.{AnalysisException, QueryTest, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row} + import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD} + import org.apache.spark.sql.test.SharedSparkSession + +@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession { + Row(1, 3, null) :: Row(2, null, 4) :: Nil) + } + +- test("test with low buffer spill threshold") { ++ test("test with low buffer spill threshold", IgnoreComet("Comet does not support spilling")) { + val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") + nums.createOrReplaceTempView("nums") + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 966f4e74712..8017e22d7f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index c9dd4f17b7..54e6e63649 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.types.{DoubleType, FloatType} import org.apache.comet.{CometConf, ExtendedExplainInfo} -import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR} +import org.apache.comet.CometConf.COMET_ANSI_MODE_ENABLED import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde @@ -491,16 +491,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { None } - // this is a temporary workaround because some Spark SQL tests fail - // when we enable COMET_SHUFFLE_FALLBACK_TO_COLUMNAR due to valid bugs - // that we had not previously seen - val tryColumnarNext = - !nativePrecondition || (nativePrecondition && nativeShuffle.isEmpty && - COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.get(conf)) - val nativeOrColumnarShuffle = if (nativeShuffle.isDefined) { nativeShuffle 
- } else if (tryColumnarNext) { + } else { // Columnar shuffle for regular Spark operators (not Comet) and Comet operators // (if configured). // If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not @@ -526,8 +519,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { } else { None } - } else { - None } if (nativeOrColumnarShuffle.isDefined) { diff --git a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala index ecc0823d60..a1a96d321b 100644 --- a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala +++ b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec} import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.QueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.comet.CometConf @@ -56,7 +57,8 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa override def apply(plan: SparkPlan): SparkPlan = { val newPlan = _apply(plan) if (showTransformations) { - logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan") + // scalastyle:off println + System.err.println(s"EliminateRedundantTransitions:\nINPUT: $plan\nOUTPUT: $newPlan") } newPlan } @@ -64,7 +66,7 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa private def _apply(plan: SparkPlan): SparkPlan = { val eliminatedPlan = plan transformUp { case ColumnarToRowExec(shuffleExchangeExec: CometShuffleExchangeExec) - if (plan.conf.adaptiveExecutionEnabled) => + if plan.conf.adaptiveExecutionEnabled => shuffleExchangeExec case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) => if (sparkToColumnar.child.supportsColumnar) { @@ -112,6 +114,7 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa private def hasCometNativeChild(op: SparkPlan): Boolean = { op match { case c: QueryStageExec => hasCometNativeChild(c.plan) + case c: ReusedExchangeExec => hasCometNativeChild(c.child) case _ => op.exists(_.isInstanceOf[CometPlan]) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala index 0391a1c3b3..95e03ca69c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.comet.util.{Utils => CometUtils} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{CodegenSupport, ColumnarToRowTransition, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, WritableColumnVector} import org.apache.spark.sql.types._ @@ -172,6 +173,7 @@ case class CometColumnarToRowExec(child: SparkPlan) op match { case b: CometBroadcastExchangeExec => Some(b) case b: BroadcastQueryStageExec => 
findCometBroadcastExchange(b.plan) + case b: ReusedExchangeExec => findCometBroadcastExchange(b.child) case _ => op.children.collectFirst(Function.unlift(findCometBroadcastExchange)) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala index 8d084fd75d..0e582ddaba 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala @@ -188,6 +188,7 @@ class CometTPCDSQuerySuite conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g") + conf.set(CometConf.COMET_EXPLAIN_TRANSFORMATIONS.key, "true") conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") conf.set(MEMORY_OFFHEAP_SIZE.key, "15g") diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 0b15def98b..2a7983f0b5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -78,7 +78,6 @@ abstract class CometTestBase conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") - conf.set(CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key, "true") conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true") conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true") diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala index 9f611612f6..cf887a1013 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala @@ -272,7 +272,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false", SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> dppEnabled.toString, CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key -> "true", CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64 SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
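
Note: the recurring change in the three patched copies of DataFrameWindowFunctionsSuite is to widen the plan match from ShuffleExchangeExec to the ShuffleExchangeLike trait so the same requirement check also recognizes Comet's exchange node. A minimal standalone sketch of that check, assuming the Spark 3.5 constructor shape shown in these hunks and re-stubbing the suite-private column-extraction helper (named partitionColumns here purely for illustration), looks like this:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike}

object ShuffleRequirementCheck {
  // Stand-in for the suite-private helper: collect the column names referenced
  // directly by the hash-partitioning expressions.
  private def partitionColumns(expressions: Seq[Expression]): Seq[String] =
    expressions.collect { case a: AttributeReference => a.name }

  // Accept any ShuffleExchangeLike so the check matches both the vanilla Spark
  // exchange and Comet's CometShuffleExchangeExec replacement.
  def isShuffleExecByRequirement(
      plan: ShuffleExchangeLike,
      desiredClusterColumns: Seq[String]): Boolean = plan match {
    // Spark 3.5.x extractor shape (Spark 3.4 has one field fewer, as in the 3.4.3 hunk)
    case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) =>
      partitionColumns(op.expressions) == desiredClusterColumns
    // Comet's six-field extractor shape, matched on the same partitioning and shuffle origin
    case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) =>
      partitionColumns(op.expressions) == desiredClusterColumns
    case _ => false
  }
}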