[SPARK-47148][SQL] Avoid materializing AQE ExchangeQueryStageExec on cancellation

### What changes were proposed in this pull request?
AQE can materialize both `ShuffleQueryStage` and `BroadcastQueryStage` during cancellation, which submits unnecessary shuffle and broadcast jobs. If a stage has not been materialized yet (i.e., `ShuffleQueryStage.shuffleFuture` or `BroadcastQueryStage.broadcastFuture` is not initialized), cancellation should simply skip it instead of materializing it.

**Problematic stack trace:**
```
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:104)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:210)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:210)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.cancel(QueryStageExec.scala:223)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$cleanUpAndThrowException$1(AdaptiveSparkPlanExec.scala:905)
```
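
The trace shows the root cause: `ShuffleQueryStageExec.cancel` pattern-matches on `shuffleFuture`, and since `shuffleFuture` is a `lazy val`, merely referencing it runs its initializer and submits the shuffle job. Below is a minimal, self-contained Scala sketch of the pitfall (illustrative only; `Stage`, `cancelOld`, and the `Promise`-backed future are hypothetical stand-ins, not Spark code):

```scala
import scala.concurrent.{Future, Promise}

class Stage(name: String) {
  // The job is submitted as a side effect of first access, exactly like a
  // lazily initialized shuffle future.
  @transient lazy val shuffleFuture: Future[Unit] = {
    println(s"$name: submitting shuffle job")
    Promise[Unit]().future // stands in for the real asynchronous job
  }

  // Old-style cancel: pattern-matching on the future forces the lazy
  // initializer to run, submitting a job for a never-materialized stage.
  def cancelOld(): Unit = shuffleFuture match {
    case f if !f.isCompleted => () // would cancel the in-flight job here
    case _ =>
  }
}

object LazyCancelDemo extends App {
  new Stage("ShuffleQueryStage3").cancelOld() // prints "submitting shuffle job"
}
```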

A sample use case:
**1- Stage Materialization Steps:**
When stage materialization fails:
```
1.1- ShuffleQueryStage1 is materialized successfully.
1.2- ShuffleQueryStage2 materialization fails.
1.3- ShuffleQueryStage3 is not materialized yet, so ShuffleQueryStage3.shuffleFuture is not initialized.
```
**2- Stage Cancellation Steps:**
```
2.1- ShuffleQueryStage1 is canceled, since it was already materialized.
2.2- ShuffleQueryStage2 is the early-failed stage, so AQE already skips it by default because it could not be materialized.
2.3- ShuffleQueryStage3 is the problem: it is not materialized yet, but cancellation currently forces it to be materialized first (see the sketch below).
```
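
The fix, shown in simplified form below and in full in the diff that follows, records on the `Exchange` node whether materialization has started and makes cancellation a no-op otherwise, so cancel never touches the lazy future of a stage that was never started. A hedged sketch of the pattern (`FixedStage` and its members are illustrative names, not the actual Spark API):

```scala
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.{Future, Promise}

class FixedStage(name: String) {
  // Flipped exactly once, when the stage is first asked to materialize.
  private val materializationStarted = new AtomicBoolean()

  @transient private lazy val shuffleFuture: Future[Unit] = {
    materializationStarted.set(true)
    Promise[Unit]().future // stands in for the real shuffle job
  }

  def submitShuffleJob(): Future[Unit] = shuffleFuture

  // Cancellation consults the flag first; when materialization never
  // started, it returns without ever referencing the lazy future.
  def cancelShuffleJob(): Unit = {
    if (materializationStarted.get() && !shuffleFuture.isCompleted) {
      // cancel the in-flight job here
    }
  }
}

object CancelDemo extends App {
  val stage = new FixedStage("ShuffleQueryStage3")
  stage.cancelShuffleJob() // no-op: never materialized, no job submitted
}
```

This mirrors the `materializationStarted` flag added to `Exchange` and the new `cancelShuffleJob` / `cancelBroadcastJob` methods in the diff below.
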
**Reproduction steps:**
https://github.com/apache/spark/pull/45234/files#diff-f89f2fe78b324c6bc7190bef84220181f3616efc156ea99b3f15d375a22d7f88R900

### Why are the changes needed?
The current logic submits an unnecessary shuffle or broadcast job merely to cancel a `ShuffleQueryStage` or `BroadcastQueryStage`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45234 from erenavsarogullari/SPARK-47148.

Authored-by: erenavsarogullari <erenavsarogullari@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
erenavsarogullari authored and JacobZheng0927 committed May 11, 2024
1 parent d38b6af commit 4c1a166
Showing 5 changed files with 182 additions and 30 deletions.
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference

import scala.concurrent.Future

import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException}
import org.apache.spark.{MapOutputStatistics, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -51,13 +51,18 @@ abstract class QueryStageExec extends LeafExecNode {
*/
val plan: SparkPlan

/**
* Name of this query stage which is unique in the entire query plan.
*/
val name: String = s"${this.getClass.getSimpleName}-$id"

/**
* Materialize this query stage, to prepare for the execution, like submitting map stages,
* broadcasting data, etc. The caller side can use the returned [[Future]] to wait until this
* stage is ready.
*/
final def materialize(): Future[Any] = {
logDebug(s"Materialize query stage ${this.getClass.getSimpleName}: $id")
logDebug(s"Materialize query stage: $name")
doMaterialize()
}

@@ -151,7 +156,12 @@ abstract class ExchangeQueryStageExec extends QueryStageExec {
/**
* Cancel the stage materialization if in progress; otherwise do nothing.
*/
def cancel(): Unit
final def cancel(): Unit = {
logDebug(s"Cancel query stage: $name")
doCancel()
}

protected def doCancel(): Unit

/**
* The canonicalized plan before applying query stage optimizer rules.
@@ -184,9 +194,7 @@ case class ShuffleQueryStageExec(

def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize

@transient private lazy val shuffleFuture = shuffle.submitShuffleJob

override protected def doMaterialize(): Future[Any] = shuffleFuture
override protected def doMaterialize(): Future[Any] = shuffle.submitShuffleJob

override def newReuseInstance(
newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
@@ -198,18 +206,14 @@ case class ShuffleQueryStageExec(
reuse
}

override def cancel(): Unit = shuffleFuture match {
case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
action.cancel()
case _ =>
}
override protected def doCancel(): Unit = shuffle.cancelShuffleJob

/**
* Returns the Option[MapOutputStatistics]. If the shuffle map stage has no partition,
* this method returns None, as there is no map statistics.
*/
def mapStats: Option[MapOutputStatistics] = {
assert(resultOption.get().isDefined, s"${getClass.getSimpleName} should already be ready")
assert(resultOption.get().isDefined, s"$name should already be ready")
val stats = resultOption.get().get.asInstanceOf[MapOutputStatistics]
Option(stats)
}
@@ -236,9 +240,7 @@ case class BroadcastQueryStageExec(
throw SparkException.internalError(s"wrong plan for broadcast stage:\n ${plan.treeString}")
}

override protected def doMaterialize(): Future[Any] = {
broadcast.submitBroadcastJob
}
override protected def doMaterialize(): Future[Any] = broadcast.submitBroadcastJob

override def newReuseInstance(
newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
@@ -250,12 +252,7 @@
reuse
}

override def cancel(): Unit = {
if (!broadcast.relationFuture.isDone) {
sparkContext.cancelJobsWithTag(broadcast.jobTag)
broadcast.relationFuture.cancel(true)
}
}
override protected def doCancel(): Unit = broadcast.cancelBroadcastJob()

override def getRuntimeStatistics: Statistics = broadcast.runtimeStatistics
}
@@ -67,11 +67,22 @@ trait BroadcastExchangeLike extends Exchange {
* It also does the preparations work, such as waiting for the subqueries.
*/
final def submitBroadcastJob: scala.concurrent.Future[broadcast.Broadcast[Any]] = executeQuery {
materializationStarted.set(true)
completionFuture
}

protected def completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]]

/**
* Cancels broadcast job.
*/
final def cancelBroadcastJob(): Unit = {
if (isMaterializationStarted() && !this.relationFuture.isDone) {
sparkContext.cancelJobsWithTag(this.jobTag)
this.relationFuture.cancel(true)
}
}

/**
* Returns the runtime statistics after broadcast materialization.
*/
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.exchange

import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -34,6 +36,17 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
*/
abstract class Exchange extends UnaryExecNode {
/**
* This flag aims to detect if the stage materialization is started. This helps
* to avoid unnecessary AQE stage materialization when the stage is canceled.
*/
protected val materializationStarted = new AtomicBoolean()

/**
* Exposes status if the materialization is started
*/
def isMaterializationStarted(): Boolean = materializationStarted.get()

override def output: Seq[Attribute] = child.output
final override val nodePatterns: Seq[TreePattern] = Seq(EXCHANGE)

@@ -47,6 +47,15 @@ import org.apache.spark.util.random.XORShiftRandom
*/
trait ShuffleExchangeLike extends Exchange {

/**
* The asynchronous job that materializes the shuffle. It also does the preparations work,
* such as waiting for the subqueries.
*/
@transient private lazy val shuffleFuture: Future[MapOutputStatistics] = executeQuery {
materializationStarted.set(true)
mapOutputStatisticsFuture
}

/**
* Returns the number of mappers of this shuffle.
*/
@@ -68,15 +77,25 @@
def shuffleOrigin: ShuffleOrigin

/**
* The asynchronous job that materializes the shuffle. It also does the preparations work,
* such as waiting for the subqueries.
* Submits the shuffle job.
*/
final def submitShuffleJob: Future[MapOutputStatistics] = executeQuery {
mapOutputStatisticsFuture
}
final def submitShuffleJob: Future[MapOutputStatistics] = shuffleFuture

protected def mapOutputStatisticsFuture: Future[MapOutputStatistics]

/**
* Cancels the shuffle job.
*/
final def cancelShuffleJob: Unit = {
if (isMaterializationStarted()) {
shuffleFuture match {
case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
action.cancel()
case _ =>
}
}
}

/**
* Returns the shuffle RDD with specified partition specs.
*/
@@ -25,11 +25,14 @@ import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -897,6 +900,92 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") {
def createJoinedDF(): DataFrame = {
val df = spark.range(5).toDF("col")
val df2 = spark.range(10).toDF("col").coalesce(2)
val df3 = spark.range(15).toDF("col").filter(Symbol("col") >= 2)
df.join(df2, Seq("col")).join(df3, Seq("col"))
}

try {
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val joinedDF = createJoinedDF()

val error = intercept[SparkException] {
joinedDF.collect()
}
assert(error.getMessage() contains "ProblematicCoalesce execution is failed")

val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]

// All QueryStages should be based on ShuffleQueryStageExec
val shuffleQueryStageExecs = collect(adaptivePlan) {
case sqse: ShuffleQueryStageExec => sqse
}
assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " +
s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-")))
// First ShuffleQueryStage is materialized so it needs to be canceled.
assert(shuffleQueryStageExecs(0).shuffle.isMaterializationStarted(),
"Materialization should be started.")
// Second ShuffleQueryStage materialization is failed so
// it is excluded from the cancellation due to earlyFailedStage.
assert(shuffleQueryStageExecs(1).shuffle.isMaterializationStarted(),
"Materialization should be started but it is failed.")
// Last ShuffleQueryStage is not materialized yet so it does not require
// to be canceled and it is just skipped from the cancellation.
assert(!shuffleQueryStageExecs(2).shuffle.isMaterializationStarted(),
"Materialization should not be started.")
}
} finally {
spark.experimental.extraStrategies = Nil
}
}

test("SPARK-47148: Check if BroadcastQueryStage materialization is started") {
def createJoinedDF(): DataFrame = {
spark.range(10).toDF("col1").createTempView("t1")
spark.range(5).coalesce(2).toDF("col2").createTempView("t2")
spark.range(15).toDF("col3").filter(Symbol("col3") >= 2).createTempView("t3")
sql("SELECT * FROM (SELECT /*+ BROADCAST(t2) */ * FROM t1 " +
"INNER JOIN t2 ON t1.col1 = t2.col2) t JOIN t3 ON t.col1 = t3.col3;")
}
withTempView("t1", "t2", "t3") {
try {
spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val joinedDF = createJoinedDF()

val error = intercept[SparkException] {
joinedDF.collect()
}
assert(error.getMessage() contains "ProblematicCoalesce execution is failed")

val adaptivePlan =
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]

// All QueryStages should be based on BroadcastQueryStageExec
val broadcastQueryStageExecs = collect(adaptivePlan) {
case bqse: BroadcastQueryStageExec => bqse
}
assert(broadcastQueryStageExecs.length == 2, adaptivePlan)
broadcastQueryStageExecs.foreach { bqse =>
assert(bqse.name.contains("BroadcastQueryStageExec-"))
// Both BroadcastQueryStages are materialized at the beginning.
assert(bqse.broadcast.isMaterializationStarted(),
s"${bqse.name}' s materialization should be started.")
}
}
} finally {
spark.experimental.extraStrategies = Nil
}
}
}

test("SPARK-30403: AQE should handle InSubquery") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -1876,8 +1965,8 @@
.map(_.getMessage.getFormattedMessage)
.filter(_.startsWith("Materialize query stage"))
.toArray
assert(materializeLogs(0).startsWith("Materialize query stage BroadcastQueryStageExec"))
assert(materializeLogs(1).startsWith("Materialize query stage ShuffleQueryStageExec"))
assert(materializeLogs(0).startsWith("Materialize query stage: BroadcastQueryStageExec-1"))
assert(materializeLogs(1).startsWith("Materialize query stage: ShuffleQueryStageExec-0"))
}

test("SPARK-34899: Use origin plan if we can not coalesce shuffle partition") {
Expand Down Expand Up @@ -2900,3 +2989,26 @@ private case class SimpleShuffleSortCostEvaluator() extends CostEvaluator {
SimpleCost(cost)
}
}

/**
* Helps to simulate ExchangeQueryStageExec materialization failure.
*/
private object TestProblematicCoalesceStrategy extends Strategy {
private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan)
extends UnaryExecNode {
override protected def doExecute(): RDD[InternalRow] =
throw new SparkException("ProblematicCoalesce execution is failed")
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
copy(child = newChild)
}

override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case org.apache.spark.sql.catalyst.plans.logical.Repartition(
numPartitions, false, child) =>
TestProblematicCoalesceExec(numPartitions, planLater(child)) :: Nil
case _ => Nil
}
}
}
