diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 7db9271aee0c4..71e138e6152b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.concurrent.Future
 
-import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException}
+import org.apache.spark.{MapOutputStatistics, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -51,13 +51,18 @@ abstract class QueryStageExec extends LeafExecNode {
    */
   val plan: SparkPlan
 
+  /**
+   * Name of this query stage which is unique in the entire query plan.
+   */
+  val name: String = s"${this.getClass.getSimpleName}-$id"
+
   /**
    * Materialize this query stage, to prepare for the execution, like submitting map stages,
    * broadcasting data, etc. The caller side can use the returned [[Future]] to wait until this
    * stage is ready.
    */
   final def materialize(): Future[Any] = {
-    logDebug(s"Materialize query stage ${this.getClass.getSimpleName}: $id")
+    logDebug(s"Materialize query stage: $name")
     doMaterialize()
   }
 
@@ -151,7 +156,12 @@ abstract class ExchangeQueryStageExec extends QueryStageExec {
   /**
    * Cancel the stage materialization if in progress; otherwise do nothing.
    */
-  def cancel(): Unit
+  final def cancel(): Unit = {
+    logDebug(s"Cancel query stage: $name")
+    doCancel()
+  }
+
+  protected def doCancel(): Unit
 
   /**
    * The canonicalized plan before applying query stage optimizer rules.
@@ -184,9 +194,7 @@ case class ShuffleQueryStageExec(
   def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
 
-  @transient private lazy val shuffleFuture = shuffle.submitShuffleJob
-
-  override protected def doMaterialize(): Future[Any] = shuffleFuture
+  override protected def doMaterialize(): Future[Any] = shuffle.submitShuffleJob
 
   override def newReuseInstance(
       newStageId: Int,
       newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
@@ -198,18 +206,14 @@ case class ShuffleQueryStageExec(
     reuse
   }
 
-  override def cancel(): Unit = shuffleFuture match {
-    case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
-      action.cancel()
-    case _ =>
-  }
+  override protected def doCancel(): Unit = shuffle.cancelShuffleJob
 
   /**
    * Returns the Option[MapOutputStatistics]. If the shuffle map stage has no partition,
    * this method returns None, as there is no map statistics.
    */
   def mapStats: Option[MapOutputStatistics] = {
-    assert(resultOption.get().isDefined, s"${getClass.getSimpleName} should already be ready")
+    assert(resultOption.get().isDefined, s"$name should already be ready")
     val stats = resultOption.get().get.asInstanceOf[MapOutputStatistics]
     Option(stats)
   }
@@ -236,9 +240,7 @@ case class BroadcastQueryStageExec(
       throw SparkException.internalError(s"wrong plan for broadcast stage:\n ${plan.treeString}")
   }
 
-  override protected def doMaterialize(): Future[Any] = {
-    broadcast.submitBroadcastJob
-  }
+  override protected def doMaterialize(): Future[Any] = broadcast.submitBroadcastJob
 
   override def newReuseInstance(
       newStageId: Int,
       newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
@@ -250,12 +252,7 @@ case class BroadcastQueryStageExec(
     reuse
   }
 
-  override def cancel(): Unit = {
-    if (!broadcast.relationFuture.isDone) {
-      sparkContext.cancelJobsWithTag(broadcast.jobTag)
-      broadcast.relationFuture.cancel(true)
-    }
-  }
+  override protected def doCancel(): Unit = broadcast.cancelBroadcastJob()
 
   override def getRuntimeStatistics: Statistics = broadcast.runtimeStatistics
 }
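The QueryStageExec change above is a template-method split: materialize() and cancel() become final entry points that log through the new name field and delegate to the doMaterialize()/doCancel() hooks each stage implements. A minimal standalone sketch of that shape, with illustrative names only (DemoStage, println standing in for logDebug):

```scala
import scala.concurrent.Future

// The base class owns the lifecycle logging; subclasses only fill in the do* hooks.
abstract class DemoStage(val id: Int) {
  val name: String = s"${this.getClass.getSimpleName}-$id"

  final def materialize(): Future[Any] = {
    println(s"Materialize query stage: $name")   // QueryStageExec uses logDebug here
    doMaterialize()
  }

  final def cancel(): Unit = {
    println(s"Cancel query stage: $name")
    doCancel()
  }

  protected def doMaterialize(): Future[Any]
  protected def doCancel(): Unit
}
```

Making the public methods final keeps the log lines uniform across shuffle and broadcast stages, which is what the updated materializeLogs assertions later in the test suite rely on.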
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 60a98c67163ea..7844f470b0ef0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -67,11 +67,22 @@ trait BroadcastExchangeLike extends Exchange {
    * It also does the preparations work, such as waiting for the subqueries.
    */
   final def submitBroadcastJob: scala.concurrent.Future[broadcast.Broadcast[Any]] = executeQuery {
+    materializationStarted.set(true)
     completionFuture
   }
 
   protected def completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]]
 
+  /**
+   * Cancels the broadcast job.
+   */
+  final def cancelBroadcastJob(): Unit = {
+    if (isMaterializationStarted() && !this.relationFuture.isDone) {
+      sparkContext.cancelJobsWithTag(this.jobTag)
+      this.relationFuture.cancel(true)
+    }
+  }
+
   /**
    * Returns the runtime statistics after broadcast materialization.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index c02beea4f879c..154070a954f3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.exchange
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -34,6 +36,17 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  * "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
  */
 abstract class Exchange extends UnaryExecNode {
+  /**
+   * This flag records whether the stage materialization has started. It helps to
+   * avoid unnecessary AQE stage materialization when the stage is canceled.
+   */
+  protected val materializationStarted = new AtomicBoolean()
+
+  /**
+   * Exposes whether the materialization has started.
+   */
+  def isMaterializationStarted(): Boolean = materializationStarted.get()
+
   override def output: Seq[Attribute] = child.output
 
   final override val nodePatterns: Seq[TreePattern] = Seq(EXCHANGE)
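Exchange now carries a materializationStarted flag: submitBroadcastJob flips it before handing back its future, and cancelBroadcastJob consults it first, so cancelling a stage whose job was never submitted does nothing. A rough illustration of that guard with stand-in types (DemoExchange and a plain Promise, not Spark's broadcast machinery):

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Future, Promise}

class DemoExchange {
  protected val materializationStarted = new AtomicBoolean()
  def isMaterializationStarted(): Boolean = materializationStarted.get()

  // Stand-in for the real broadcast/shuffle job.
  private val job = Promise[Unit]()

  def submitJob(): Future[Unit] = {
    materializationStarted.set(true)
    job.future
  }

  def cancelJob(): Unit = {
    // Only touch the job if it was actually submitted; otherwise this is a no-op.
    if (isMaterializationStarted() && !job.isCompleted) {
      job.tryFailure(new InterruptedException("query stage cancelled"))
    }
  }
}
```

An AtomicBoolean is enough here because the flag only ever moves from false to true; the interesting case is a cancel arriving before any submit, which the check turns into a no-op.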
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 6f9402e1c9e4e..70c08edfd8678 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -47,6 +47,15 @@
  */
 trait ShuffleExchangeLike extends Exchange {
 
+  /**
+   * The asynchronous job that materializes the shuffle. It also does the preparation work,
+   * such as waiting for the subqueries.
+   */
+  @transient private lazy val shuffleFuture: Future[MapOutputStatistics] = executeQuery {
+    materializationStarted.set(true)
+    mapOutputStatisticsFuture
+  }
+
   /**
    * Returns the number of mappers of this shuffle.
    */
@@ -68,15 +77,25 @@
   def shuffleOrigin: ShuffleOrigin
 
   /**
-   * The asynchronous job that materializes the shuffle. It also does the preparations work,
-   * such as waiting for the subqueries.
+   * Submits the shuffle job.
    */
-  final def submitShuffleJob: Future[MapOutputStatistics] = executeQuery {
-    mapOutputStatisticsFuture
-  }
+  final def submitShuffleJob: Future[MapOutputStatistics] = shuffleFuture
 
   protected def mapOutputStatisticsFuture: Future[MapOutputStatistics]
 
+  /**
+   * Cancels the shuffle job.
+   */
+  final def cancelShuffleJob: Unit = {
+    if (isMaterializationStarted()) {
+      shuffleFuture match {
+        case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
+          action.cancel()
+        case _ =>
+      }
+    }
+  }
+
   /**
    * Returns the shuffle RDD with specified partition specs.
   */
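Moving the lazy shuffleFuture from ShuffleQueryStageExec into ShuffleExchangeLike, paired with the started flag, closes the SPARK-47148 gap: previously a cancel() on a not-yet-materialized stage could force the lazy future and submit the shuffle job only to cancel it. A small sketch of that pitfall and the guarded behaviour, using plain Scala futures rather than Spark's FutureAction:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

class DemoShuffle {
  private val started = new AtomicBoolean()

  // Forcing this lazy val is what actually submits the (pretend) job.
  private lazy val shuffleFuture: Future[Int] = {
    started.set(true)
    println("submitting shuffle job")
    Future(42)
  }

  def submit(): Future[Int] = shuffleFuture

  def cancel(): Unit = {
    if (started.get()) {
      // Safe: we only look at a job that was really submitted.
      println(s"cancelling, completed = ${shuffleFuture.isCompleted}")
    }
    // Without the guard, referencing shuffleFuture here would submit the job
    // as a side effect of cancellation, which is the behaviour SPARK-47148 removes.
  }
}
```

With the guard, new DemoShuffle().cancel() does nothing at all, mirroring what the first new test checks for the third, never-materialized ShuffleQueryStage.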
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 39f6aa8505b32..f6ca7ff3cdcca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -25,11 +25,14 @@ import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
-import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
+import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -897,6 +900,92 @@
     }
   }
 
+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") {
+    def createJoinedDF(): DataFrame = {
+      val df = spark.range(5).toDF("col")
+      val df2 = spark.range(10).toDF("col").coalesce(2)
+      val df3 = spark.range(15).toDF("col").filter(Symbol("col") >= 2)
+      df.join(df2, Seq("col")).join(df3, Seq("col"))
+    }
+
+    try {
+      spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        val joinedDF = createJoinedDF()
+
+        val error = intercept[SparkException] {
+          joinedDF.collect()
+        }
+        assert(error.getMessage() contains "ProblematicCoalesce execution is failed")
+
+        val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+        // All QueryStages should be based on ShuffleQueryStageExec
+        val shuffleQueryStageExecs = collect(adaptivePlan) {
+          case sqse: ShuffleQueryStageExec => sqse
+        }
+        assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " +
+          s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
+        shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-")))
+        // The first ShuffleQueryStage is materialized, so it needs to be canceled.
+        assert(shuffleQueryStageExecs(0).shuffle.isMaterializationStarted(),
+          "Materialization should be started.")
+        // The second ShuffleQueryStage's materialization fails, so it is excluded
+        // from the cancellation due to earlyFailedStage.
+        assert(shuffleQueryStageExecs(1).shuffle.isMaterializationStarted(),
+          "Materialization should be started even though it fails.")
+        // The last ShuffleQueryStage is not materialized yet, so it does not need
+        // to be canceled and is simply skipped during cancellation.
+        assert(!shuffleQueryStageExecs(2).shuffle.isMaterializationStarted(),
+          "Materialization should not be started.")
+      }
+    } finally {
+      spark.experimental.extraStrategies = Nil
+    }
+  }
+
+  test("SPARK-47148: Check if BroadcastQueryStage materialization is started") {
+    def createJoinedDF(): DataFrame = {
+      spark.range(10).toDF("col1").createTempView("t1")
+      spark.range(5).coalesce(2).toDF("col2").createTempView("t2")
+      spark.range(15).toDF("col3").filter(Symbol("col3") >= 2).createTempView("t3")
+      sql("SELECT * FROM (SELECT /*+ BROADCAST(t2) */ * FROM t1 " +
+        "INNER JOIN t2 ON t1.col1 = t2.col2) t JOIN t3 ON t.col1 = t3.col3;")
+    }
+
+    withTempView("t1", "t2", "t3") {
+      try {
+        spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+          val joinedDF = createJoinedDF()
+
+          val error = intercept[SparkException] {
+            joinedDF.collect()
+          }
+          assert(error.getMessage() contains "ProblematicCoalesce execution is failed")
+
+          val adaptivePlan =
+            joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+          // All QueryStages should be based on BroadcastQueryStageExec
+          val broadcastQueryStageExecs = collect(adaptivePlan) {
+            case bqse: BroadcastQueryStageExec => bqse
+          }
+          assert(broadcastQueryStageExecs.length == 2, adaptivePlan)
+          broadcastQueryStageExecs.foreach { bqse =>
+            assert(bqse.name.contains("BroadcastQueryStageExec-"))
+            // Both BroadcastQueryStages are materialized at the beginning.
+            assert(bqse.broadcast.isMaterializationStarted(),
+              s"${bqse.name}'s materialization should be started.")
+          }
+        }
+      } finally {
+        spark.experimental.extraStrategies = Nil
+      }
+    }
+  }
+
   test("SPARK-30403: AQE should handle InSubquery") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -1876,8 +1965,8 @@
       .map(_.getMessage.getFormattedMessage)
       .filter(_.startsWith("Materialize query stage"))
       .toArray
-    assert(materializeLogs(0).startsWith("Materialize query stage BroadcastQueryStageExec"))
-    assert(materializeLogs(1).startsWith("Materialize query stage ShuffleQueryStageExec"))
+    assert(materializeLogs(0).startsWith("Materialize query stage: BroadcastQueryStageExec-1"))
+    assert(materializeLogs(1).startsWith("Materialize query stage: ShuffleQueryStageExec-0"))
   }
 
   test("SPARK-34899: Use origin plan if we can not coalesce shuffle partition") {
@@ -2900,3 +2989,26 @@
     SimpleCost(cost)
   }
 }
+
+/**
+ * Helps simulate an ExchangeQueryStageExec materialization failure.
+ */
+private object TestProblematicCoalesceStrategy extends Strategy {
+  private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan)
+    extends UnaryExecNode {
+    override protected def doExecute(): RDD[InternalRow] =
+      throw new SparkException("ProblematicCoalesce execution is failed")
+    override def output: Seq[Attribute] = child.output
+    override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
+      copy(child = newChild)
+  }
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    plan match {
+      case org.apache.spark.sql.catalyst.plans.logical.Repartition(
+        numPartitions, false, child) =>
+        TestProblematicCoalesceExec(numPartitions, planLater(child)) :: Nil
+      case _ => Nil
+    }
+  }
+}
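The helper strategy above swaps logical Repartition (coalesce) nodes for an operator whose doExecute() throws, so exactly one AQE stage fails while its siblings stop in whatever materialization state they had reached; the tests then read isMaterializationStarted() to decide which stages needed cancelling. A hedged sketch of wiring such a strategy into a session outside the suite, assuming TestProblematicCoalesceStrategy (package-private here) is visible to the caller:

```scala
import org.apache.spark.sql.SparkSession

object FailingCoalesceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
    try {
      // Injected strategies run before Spark's built-in ones during planning.
      spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
      spark.conf.set("spark.sql.adaptive.enabled", "true")

      val df = spark.range(5).toDF("col")
        .join(spark.range(10).toDF("col").coalesce(2), Seq("col"))

      df.collect()   // expected to fail inside the planted coalesce stage
    } finally {
      spark.experimental.extraStrategies = Nil
      spark.stop()
    }
  }
}
```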