From 79502089b484034ec4359f8bc8e535d4c7f0d0d7 Mon Sep 17 00:00:00 2001
From: erenavsarogullari
Date: Fri, 23 Feb 2024 10:07:58 -0800
Subject: [PATCH 1/2] SPARK-47148 - Avoid materializing AQE
 ExchangeQueryStageExec on cancellation

---
 .../execution/adaptive/QueryStageExec.scala   |  39 ++++--
 .../adaptive/AdaptiveQueryExecSuite.scala     | 120 +++++++++++++++++-
 2 files changed, 147 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 7db9271aee0c4..d55a777b6d049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.adaptive
 
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.concurrent.Future
 
@@ -51,13 +51,30 @@ abstract class QueryStageExec extends LeafExecNode {
    */
   val plan: SparkPlan
 
+  /**
+   * Name of this query stage, which is unique in the entire query plan.
+   */
+  val name: String = s"${this.getClass.getSimpleName}-$id"
+
+  /**
+   * This flag indicates whether stage materialization has started. It helps
+   * avoid unnecessary stage materialization when the stage is canceled.
+   */
+  private val materializationStarted = new AtomicBoolean()
+
+  /**
+   * Exposes whether the materialization has started.
+   */
+  def isMaterializationStarted(): Boolean = materializationStarted.get()
+
   /**
    * Materialize this query stage, to prepare for the execution, like submitting map stages,
    * broadcasting data, etc. The caller side can use the returned [[Future]] to wait until this
    * stage is ready.
    */
   final def materialize(): Future[Any] = {
-    logDebug(s"Materialize query stage ${this.getClass.getSimpleName}: $id")
+    logDebug(s"Materialize query stage: $name")
+    materializationStarted.set(true)
     doMaterialize()
   }
 
@@ -198,10 +215,15 @@ case class ShuffleQueryStageExec(
     reuse
   }
 
-  override def cancel(): Unit = shuffleFuture match {
-    case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
-      action.cancel()
-    case _ =>
+  override def cancel(): Unit = {
+    if (isMaterializationStarted()) {
+      shuffleFuture match {
+        case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
+          action.cancel()
+          logInfo(s"$name is cancelled.")
+        case _ =>
+      }
+    }
   }
 
   /**
@@ -209,7 +231,7 @@ case class ShuffleQueryStageExec(
    * this method returns None, as there is no map statistics.
   */
  def mapStats: Option[MapOutputStatistics] = {
-    assert(resultOption.get().isDefined, s"${getClass.getSimpleName} should already be ready")
+    assert(resultOption.get().isDefined, s"$name should already be ready")
     val stats = resultOption.get().get.asInstanceOf[MapOutputStatistics]
     Option(stats)
   }
@@ -251,9 +273,10 @@ case class BroadcastQueryStageExec(
   }
 
   override def cancel(): Unit = {
-    if (!broadcast.relationFuture.isDone) {
+    if (isMaterializationStarted() && !broadcast.relationFuture.isDone) {
       sparkContext.cancelJobsWithTag(broadcast.jobTag)
       broadcast.relationFuture.cancel(true)
+      logInfo(s"$name is cancelled.")
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 39f6aa8505b32..1adf14e72cca8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -25,11 +25,14 @@ import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
-import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
+import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -897,6 +900,92 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-47148: AQE should avoid materializing ShuffleQueryStage on cancellation") {
+    def createJoinedDF(): DataFrame = {
+      val df = spark.range(5).toDF("col")
+      val df2 = spark.range(10).toDF("col").coalesce(2)
+      val df3 = spark.range(15).toDF("col").filter(Symbol("col") >= 2)
+      df.join(df2, Seq("col")).join(df3, Seq("col"))
+    }
+
+    try {
+      spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        val joinedDF = createJoinedDF()
+
+        val error = intercept[SparkException] {
+          joinedDF.collect()
+        }
+        assert(error.getMessage() contains "ProblematicCoalesce execution failed")
+
+        val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+        // All QueryStages should be ShuffleQueryStageExec instances.
+        val shuffleQueryStageExecs = collect(adaptivePlan) {
+          case sqse: ShuffleQueryStageExec => sqse
+        }
+        assert(shuffleQueryStageExecs.length == 3,
s"Physical Plan should include " + + s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan") + shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-"))) + // First ShuffleQueryStage is materialized so it needs to be canceled. + assert(shuffleQueryStageExecs(0).isMaterializationStarted(), + "Materialization should be started.") + // Second ShuffleQueryStage materialization is failed so + // it is excluded from the cancellation due to earlyFailedStage. + assert(shuffleQueryStageExecs(1).isMaterializationStarted(), + "Materialization should be started but it is failed.") + // Last ShuffleQueryStage is not materialized yet so it does not require + // to be canceled and it is just skipped from the cancellation. + assert(!shuffleQueryStageExecs(2).isMaterializationStarted(), + "Materialization should not be started.") + } + } finally { + spark.experimental.extraStrategies = Nil + } + } + + test("SPARK-47148: Check if BroadcastQueryStage materialization is started") { + def createJoinedDF(): DataFrame = { + spark.range(10).toDF("col1").createTempView("t1") + spark.range(5).coalesce(2).toDF("col2").createTempView("t2") + spark.range(15).toDF("col3").filter(Symbol("col3") >= 2).createTempView("t3") + sql("SELECT * FROM (SELECT /*+ BROADCAST(t2) */ * FROM t1 " + + "INNER JOIN t2 ON t1.col1 = t2.col2) t JOIN t3 ON t.col1 = t3.col3;") + } + withTempView("t1", "t2", "t3") { + try { + spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val joinedDF = createJoinedDF() + + val error = intercept[SparkException] { + joinedDF.collect() + } + assert(error.getMessage() contains "ProblematicCoalesce execution is failed") + + val adaptivePlan = + joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec] + + // All QueryStages should be based on BroadcastQueryStageExec + val broadcastQueryStageExecs = collect(adaptivePlan) { + case bqse: BroadcastQueryStageExec => bqse + } + assert(broadcastQueryStageExecs.length == 2, adaptivePlan) + broadcastQueryStageExecs.foreach { bqse => + assert(bqse.name.contains("BroadcastQueryStageExec-")) + // Both BroadcastQueryStages are materialized at the beginning. + assert(bqse.isMaterializationStarted(), + s"${bqse.name}' s materialization should be started.") + } + } + } finally { + spark.experimental.extraStrategies = Nil + } + } + } + test("SPARK-30403: AQE should handle InSubquery") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", @@ -1876,8 +1965,8 @@ class AdaptiveQueryExecSuite .map(_.getMessage.getFormattedMessage) .filter(_.startsWith("Materialize query stage")) .toArray - assert(materializeLogs(0).startsWith("Materialize query stage BroadcastQueryStageExec")) - assert(materializeLogs(1).startsWith("Materialize query stage ShuffleQueryStageExec")) + assert(materializeLogs(0).startsWith("Materialize query stage: BroadcastQueryStageExec-1")) + assert(materializeLogs(1).startsWith("Materialize query stage: ShuffleQueryStageExec-0")) } test("SPARK-34899: Use origin plan if we can not coalesce shuffle partition") { @@ -2900,3 +2989,26 @@ private case class SimpleShuffleSortCostEvaluator() extends CostEvaluator { SimpleCost(cost) } } + +/** + * Helps to simulate ExchangeQueryStageExec materialization failure. 
+ */ +private object TestProblematicCoalesceStrategy extends Strategy { + private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan) + extends UnaryExecNode { + override protected def doExecute(): RDD[InternalRow] = + throw new SparkException("ProblematicCoalesce execution is failed") + override def output: Seq[Attribute] = child.output + override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec = + copy(child = newChild) + } + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + plan match { + case org.apache.spark.sql.catalyst.plans.logical.Repartition( + numPartitions, false, child) => + TestProblematicCoalesceExec(numPartitions, planLater(child)) :: Nil + case _ => Nil + } + } +} From 11052824a8391f8421d4065625890d7a2c0dfa96 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Wed, 24 Apr 2024 22:36:24 -0700 Subject: [PATCH 2/2] SPARK-47148 - Refactor AQE APIs --- .../execution/adaptive/QueryStageExec.scala | 50 +++++-------------- .../exchange/BroadcastExchangeExec.scala | 11 ++++ .../sql/execution/exchange/Exchange.scala | 13 +++++ .../exchange/ShuffleExchangeExec.scala | 29 +++++++++-- .../adaptive/AdaptiveQueryExecSuite.scala | 8 +-- 5 files changed, 64 insertions(+), 47 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index d55a777b6d049..71e138e6152b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.execution.adaptive -import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +import java.util.concurrent.atomic.AtomicReference import scala.concurrent.Future -import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException} +import org.apache.spark.{MapOutputStatistics, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -56,17 +56,6 @@ abstract class QueryStageExec extends LeafExecNode { */ val name: String = s"${this.getClass.getSimpleName}-$id" - /** - * This flag aims to detect if the stage materialization is started. This helps - * to avoid unnecessary stage materialization when the stage is canceled. - */ - private val materializationStarted = new AtomicBoolean() - - /** - * Exposes status if the materialization is started - */ - def isMaterializationStarted(): Boolean = materializationStarted.get() - /** * Materialize this query stage, to prepare for the execution, like submitting map stages, * broadcasting data, etc. The caller side can use the returned [[Future]] to wait until this @@ -74,7 +63,6 @@ abstract class QueryStageExec extends LeafExecNode { */ final def materialize(): Future[Any] = { logDebug(s"Materialize query stage: $name") - materializationStarted.set(true) doMaterialize() } @@ -168,7 +156,12 @@ abstract class ExchangeQueryStageExec extends QueryStageExec { /** * Cancel the stage materialization if in progress; otherwise do nothing. */ - def cancel(): Unit + final def cancel(): Unit = { + logDebug(s"Cancel query stage: $name") + doCancel() + } + + protected def doCancel(): Unit /** * The canonicalized plan before applying query stage optimizer rules. 
@@ -201,9 +194,7 @@ case class ShuffleQueryStageExec(
 
   def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
 
-  @transient private lazy val shuffleFuture = shuffle.submitShuffleJob
-
-  override protected def doMaterialize(): Future[Any] = shuffleFuture
+  override protected def doMaterialize(): Future[Any] = shuffle.submitShuffleJob
 
   override def newReuseInstance(
       newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
@@ -215,16 +206,7 @@ case class ShuffleQueryStageExec(
     reuse
   }
 
-  override def cancel(): Unit = {
-    if (isMaterializationStarted()) {
-      shuffleFuture match {
-        case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
-          action.cancel()
-          logInfo(s"$name is cancelled.")
-        case _ =>
-      }
-    }
-  }
+  override protected def doCancel(): Unit = shuffle.cancelShuffleJob()
 
   /**
    * Returns the Option[MapOutputStatistics]. If the shuffle map stage has no partition,
@@ -258,9 +240,7 @@ case class BroadcastQueryStageExec(
     throw SparkException.internalError(s"wrong plan for broadcast stage:\n ${plan.treeString}")
   }
 
-  override protected def doMaterialize(): Future[Any] = {
-    broadcast.submitBroadcastJob
-  }
+  override protected def doMaterialize(): Future[Any] = broadcast.submitBroadcastJob
 
   override def newReuseInstance(
       newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
@@ -272,13 +252,7 @@ case class BroadcastQueryStageExec(
     reuse
   }
 
-  override def cancel(): Unit = {
-    if (isMaterializationStarted() && !broadcast.relationFuture.isDone) {
-      sparkContext.cancelJobsWithTag(broadcast.jobTag)
-      broadcast.relationFuture.cancel(true)
-      logInfo(s"$name is cancelled.")
-    }
-  }
+  override protected def doCancel(): Unit = broadcast.cancelBroadcastJob()
 
   override def getRuntimeStatistics: Statistics = broadcast.runtimeStatistics
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 60a98c67163ea..7844f470b0ef0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -67,11 +67,22 @@ trait BroadcastExchangeLike extends Exchange {
    * It also does the preparations work, such as waiting for the subqueries.
    */
   final def submitBroadcastJob: scala.concurrent.Future[broadcast.Broadcast[Any]] = executeQuery {
+    materializationStarted.set(true)
     completionFuture
   }
 
   protected def completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]]
 
+  /**
+   * Cancels the broadcast job.
+   */
+  final def cancelBroadcastJob(): Unit = {
+    if (isMaterializationStarted() && !this.relationFuture.isDone) {
+      sparkContext.cancelJobsWithTag(this.jobTag)
+      this.relationFuture.cancel(true)
+    }
+  }
+
   /**
    * Returns the runtime statistics after broadcast materialization.
   */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index c02beea4f879c..154070a954f3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.exchange
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -34,6 +36,17 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  * "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
  */
 abstract class Exchange extends UnaryExecNode {
+  /**
+   * This flag indicates whether stage materialization has started. It helps
+   * avoid unnecessary AQE stage materialization when the stage is canceled.
+   */
+  protected val materializationStarted = new AtomicBoolean()
+
+  /**
+   * Exposes whether the materialization has started.
+   */
+  def isMaterializationStarted(): Boolean = materializationStarted.get()
+
   override def output: Seq[Attribute] = child.output
 
   final override val nodePatterns: Seq[TreePattern] = Seq(EXCHANGE)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 6f9402e1c9e4e..70c08edfd8678 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -47,6 +47,15 @@ import org.apache.spark.util.random.XORShiftRandom
  */
 trait ShuffleExchangeLike extends Exchange {
 
+  /**
+   * The asynchronous job that materializes the shuffle. It also does the preparation work,
+   * such as waiting for the subqueries.
+   */
+  @transient private lazy val shuffleFuture: Future[MapOutputStatistics] = executeQuery {
+    materializationStarted.set(true)
+    mapOutputStatisticsFuture
+  }
+
   /**
    * Returns the number of mappers of this shuffle.
    */
@@ -68,15 +77,25 @@ trait ShuffleExchangeLike extends Exchange {
   def shuffleOrigin: ShuffleOrigin
 
   /**
-   * The asynchronous job that materializes the shuffle. It also does the preparation work,
-   * such as waiting for the subqueries.
+   * Submits the shuffle job.
    */
-  final def submitShuffleJob: Future[MapOutputStatistics] = executeQuery {
-    mapOutputStatisticsFuture
-  }
+  final def submitShuffleJob: Future[MapOutputStatistics] = shuffleFuture
 
   protected def mapOutputStatisticsFuture: Future[MapOutputStatistics]
 
+  /**
+   * Cancels the shuffle job.
+   */
+  final def cancelShuffleJob(): Unit = {
+    if (isMaterializationStarted()) {
+      shuffleFuture match {
+        case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
+          action.cancel()
+        case _ =>
+      }
+    }
+  }
+
   /**
    * Returns the shuffle RDD with specified partition specs.
   */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 1adf14e72cca8..f6ca7ff3cdcca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -930,15 +930,15 @@ class AdaptiveQueryExecSuite
           s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
         shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-")))
         // First ShuffleQueryStage is materialized, so it needs to be canceled.
-        assert(shuffleQueryStageExecs(0).isMaterializationStarted(),
+        assert(shuffleQueryStageExecs(0).shuffle.isMaterializationStarted(),
           "Materialization should have started.")
         // Second ShuffleQueryStage materialization failed, so
         // it is excluded from the cancellation due to earlyFailedStage.
-        assert(shuffleQueryStageExecs(1).isMaterializationStarted(),
+        assert(shuffleQueryStageExecs(1).shuffle.isMaterializationStarted(),
           "Materialization should have started but failed.")
         // Last ShuffleQueryStage is not materialized yet, so it does not need
         // to be canceled and is simply skipped during cancellation.
-        assert(!shuffleQueryStageExecs(2).isMaterializationStarted(),
+        assert(!shuffleQueryStageExecs(2).shuffle.isMaterializationStarted(),
           "Materialization should not have started.")
       }
     } finally {
@@ -976,7 +976,7 @@ class AdaptiveQueryExecSuite
         broadcastQueryStageExecs.foreach { bqse =>
           assert(bqse.name.contains("BroadcastQueryStageExec-"))
           // Both BroadcastQueryStages are materialized at the beginning.
-          assert(bqse.isMaterializationStarted(),
+          assert(bqse.broadcast.isMaterializationStarted(),
             s"${bqse.name}'s materialization should have started.")
         }
       }
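
Note: for readers skimming the patch series, the pattern both patches converge on is a guard flag that is set before the asynchronous job is submitted, so that cancellation becomes a no-op unless materialization actually started. In particular, a cancel request can no longer force a lazy future (like shuffleFuture above) to initialize and submit the very job it is trying to cancel. Below is a minimal, self-contained sketch of that pattern using plain scala.concurrent primitives; CancellableStage and its members are illustrative names, not Spark APIs.

import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future, Promise}

// Illustrative stand-in for an Exchange-like node; not part of Spark.
class CancellableStage(name: String)(implicit ec: ExecutionContext) {
  private val materializationStarted = new AtomicBoolean()
  private val result = Promise[Unit]()

  def isMaterializationStarted: Boolean = materializationStarted.get()

  // Mirrors submitShuffleJob/submitBroadcastJob: the flag is flipped before
  // the asynchronous work is kicked off, so cancel() can observe it.
  def materialize(): Future[Unit] = {
    materializationStarted.set(true)
    Future { result.trySuccess(()) } // stand-in for submitting map stages etc.
    result.future
  }

  // Mirrors cancelShuffleJob()/cancelBroadcastJob(): a no-op when the stage
  // was never materialized, which avoids triggering materialization just to
  // cancel it.
  def cancel(): Unit = {
    if (isMaterializationStarted && !result.isCompleted) {
      result.tryFailure(new InterruptedException(s"$name is cancelled."))
    }
  }
}

With this sketch, cancel() before materialize() does nothing, while cancel() after materialize() fails the pending future if it has not completed - the same observable behavior the two SPARK-47148 tests assert via isMaterializationStarted().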