Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation #45234

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference

import scala.concurrent.Future

import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException}
import org.apache.spark.{MapOutputStatistics, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -51,13 +51,18 @@ abstract class QueryStageExec extends LeafExecNode {
*/
val plan: SparkPlan

/**
* Name of this query stage which is unique in the entire query plan.
*/
val name: String = s"${this.getClass.getSimpleName}-$id"

/**
* Materialize this query stage, to prepare for the execution, like submitting map stages,
* broadcasting data, etc. The caller side can use the returned [[Future]] to wait until this
* stage is ready.
*/
final def materialize(): Future[Any] = {
logDebug(s"Materialize query stage ${this.getClass.getSimpleName}: $id")
logDebug(s"Materialize query stage: $name")
doMaterialize()
}

Expand Down Expand Up @@ -151,7 +156,12 @@ abstract class ExchangeQueryStageExec extends QueryStageExec {
/**
* Cancel the stage materialization if in progress; otherwise do nothing.
*/
def cancel(): Unit
// Template method: writes a debug log line carrying this stage's `name`, then delegates the
// actual work to the subclass-provided doCancel().
final def cancel(): Unit = {
logDebug(s"Cancel query stage: $name")
doCancel()
}

/**
 * Subclass hook invoked by `cancel()` right after the debug log line is written.
 * Implementations perform the actual cancellation for their kind of exchange.
 */
protected def doCancel(): Unit

/**
* The canonicalized plan before applying query stage optimizer rules.
Expand Down Expand Up @@ -184,9 +194,7 @@ case class ShuffleQueryStageExec(

def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize

@transient private lazy val shuffleFuture = shuffle.submitShuffleJob

override protected def doMaterialize(): Future[Any] = shuffleFuture
override protected def doMaterialize(): Future[Any] = shuffle.submitShuffleJob

override def newReuseInstance(
newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
Expand All @@ -198,18 +206,14 @@ case class ShuffleQueryStageExec(
reuse
}

override def cancel(): Unit = shuffleFuture match {
case action: FutureAction[MapOutputStatistics] if !action.isCompleted =>
action.cancel()
case _ =>
}
override protected def doCancel(): Unit = shuffle.cancelShuffleJob

/**
* Returns the Option[MapOutputStatistics]. If the shuffle map stage has no partition,
* this method returns None, as there is no map statistics.
*/
def mapStats: Option[MapOutputStatistics] = {
assert(resultOption.get().isDefined, s"${getClass.getSimpleName} should already be ready")
assert(resultOption.get().isDefined, s"$name should already be ready")
val stats = resultOption.get().get.asInstanceOf[MapOutputStatistics]
Option(stats)
}
Expand All @@ -236,9 +240,7 @@ case class BroadcastQueryStageExec(
throw SparkException.internalError(s"wrong plan for broadcast stage:\n ${plan.treeString}")
}

override protected def doMaterialize(): Future[Any] = {
broadcast.submitBroadcastJob
}
override protected def doMaterialize(): Future[Any] = broadcast.submitBroadcastJob

override def newReuseInstance(
newStageId: Int, newOutput: Seq[Attribute]): ExchangeQueryStageExec = {
Expand All @@ -250,12 +252,7 @@ case class BroadcastQueryStageExec(
reuse
}

override def cancel(): Unit = {
if (!broadcast.relationFuture.isDone) {
sparkContext.cancelJobsWithTag(broadcast.jobTag)
broadcast.relationFuture.cancel(true)
}
}
override protected def doCancel(): Unit = broadcast.cancelBroadcastJob()

override def getRuntimeStatistics: Statistics = broadcast.runtimeStatistics
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,22 @@ trait BroadcastExchangeLike extends Exchange {
* It also does the preparations work, such as waiting for the subqueries.
*/
final def submitBroadcastJob: scala.concurrent.Future[broadcast.Broadcast[Any]] = executeQuery {
// Record that materialization has begun before handing out the future: cancelBroadcastJob()
// checks this flag and does nothing while it is still unset.
materializationStarted.set(true)
completionFuture
}

protected def completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]]

/**
 * Cancels the broadcast job, but only when there is something to cancel: materialization
 * must have been started and `relationFuture` must not be done yet. Otherwise this is a no-op.
 */
final def cancelBroadcastJob(): Unit = {
  val stillRunning = isMaterializationStarted() && !relationFuture.isDone
  if (stillRunning) {
    // First cancel the jobs registered under this exchange's tag, then the future itself.
    sparkContext.cancelJobsWithTag(jobTag)
    relationFuture.cancel(true)
  }
}

/**
* Returns the runtime statistics after broadcast materialization.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.exchange

import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -34,6 +36,17 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
*/
abstract class Exchange extends UnaryExecNode {
/**
 * Set to true once materialization of this exchange has been started; it starts out false.
 * Cancellation code checks it so that cancelling a stage that was never submitted does not
 * trigger an unnecessary AQE stage materialization.
 */
protected val materializationStarted = new AtomicBoolean()

/**
 * Reports whether materialization of this exchange has been started.
 *
 * @return the current value of the `materializationStarted` flag
 */
def isMaterializationStarted(): Boolean = materializationStarted.get()

override def output: Seq[Attribute] = child.output
final override val nodePatterns: Seq[TreePattern] = Seq(EXCHANGE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ import org.apache.spark.util.random.XORShiftRandom
*/
trait ShuffleExchangeLike extends Exchange {

/**
 * The asynchronous job that materializes the shuffle. It also does the preparation work,
 * such as waiting for the subqueries.
 *
 * Declared as a `lazy val`: the block below is evaluated on first access and the resulting
 * future is then cached, so `submitShuffleJob` and `cancelShuffleJob` see the same value.
 */
@transient private lazy val shuffleFuture: Future[MapOutputStatistics] = executeQuery {
// Flip the flag before obtaining the future: cancelShuffleJob consults it to decide whether
// it may read this lazy val without being the one that triggers the submission.
materializationStarted.set(true)
mapOutputStatisticsFuture
}

/**
* Returns the number of mappers of this shuffle.
*/
Expand All @@ -68,15 +77,25 @@ trait ShuffleExchangeLike extends Exchange {
def shuffleOrigin: ShuffleOrigin

/**
* The asynchronous job that materializes the shuffle. It also does the preparations work,
* such as waiting for the subqueries.
* Submits the shuffle job.
*/
final def submitShuffleJob: Future[MapOutputStatistics] = executeQuery {
mapOutputStatisticsFuture
}
final def submitShuffleJob: Future[MapOutputStatistics] = shuffleFuture

protected def mapOutputStatisticsFuture: Future[MapOutputStatistics]

/**
 * Cancels the shuffle job if one is in flight.
 *
 * Nothing happens unless materialization has been started. When it has, the shuffle future
 * is cancelled only if it is a [[FutureAction]] that has not completed yet.
 */
final def cancelShuffleJob: Unit = {
  // Look at the future only after the started-check passes, so this method never becomes
  // the first reader of the lazy `shuffleFuture`.
  val submitted = if (isMaterializationStarted()) Some(shuffleFuture) else None
  submitted
    .collect { case action: FutureAction[MapOutputStatistics] if !action.isCompleted => action }
    .foreach(_.cancel())
}

/**
* Returns the shuffle RDD with specified partition specs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
import org.apache.spark.sql.execution.command.DataWritingCommandExec
Expand Down Expand Up @@ -897,6 +900,92 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") {
  // Builds a three-way join on "col". The middle input goes through coalesce(2); with
  // TestProblematicCoalesceStrategy installed the query is expected to fail with the
  // "ProblematicCoalesce execution is failed" error asserted below.
  def createJoinedDF(): DataFrame = {
    val df = spark.range(5).toDF("col")
    val df2 = spark.range(10).toDF("col").coalesce(2)
    val df3 = spark.range(15).toDF("col").filter(Symbol("col") >= 2)
    df.join(df2, Seq("col")).join(df3, Seq("col"))
  }

  // Remember the strategies that were installed before this test so that the finally
  // block restores them instead of wiping shared session state.
  val previousStrategies = spark.experimental.extraStrategies
  try {
    spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      // Broadcast joins are disabled so that every query stage is a shuffle stage.
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
      val joinedDF = createJoinedDF()

      val error = intercept[SparkException] {
        joinedDF.collect()
      }
      assert(error.getMessage.contains("ProblematicCoalesce execution is failed"))

      val adaptivePlan = joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]

      // All QueryStages should be based on ShuffleQueryStageExec
      val shuffleQueryStageExecs = collect(adaptivePlan) {
        case sqse: ShuffleQueryStageExec => sqse
      }
      assert(shuffleQueryStageExecs.length == 3, s"Physical Plan should include " +
        s"3 ShuffleQueryStages. Physical Plan: $adaptivePlan")
      shuffleQueryStageExecs.foreach(sqse => assert(sqse.name.contains("ShuffleQueryStageExec-")))
      // The first ShuffleQueryStage has started materializing, so it needs to be canceled.
      assert(shuffleQueryStageExecs(0).shuffle.isMaterializationStarted(),
        "Materialization should be started.")
      // The second ShuffleQueryStage started materializing but failed, so it is excluded
      // from the cancellation due to earlyFailedStage.
      assert(shuffleQueryStageExecs(1).shuffle.isMaterializationStarted(),
        "Materialization should be started but it is failed.")
      // The last ShuffleQueryStage has not started materializing, so there is nothing to
      // cancel and it is simply skipped.
      assert(!shuffleQueryStageExecs(2).shuffle.isMaterializationStarted(),
        "Materialization should not be started.")
    }
  } finally {
    spark.experimental.extraStrategies = previousStrategies
  }
}

test("SPARK-47148: Check if BroadcastQueryStage materialization is started") {
  // Registers the temp views t1, t2 and t3 and joins them. t2 goes through coalesce(2)
  // and carries a BROADCAST hint.
  def createJoinedDF(): DataFrame = {
    spark.range(10).toDF("col1").createTempView("t1")
    spark.range(5).coalesce(2).toDF("col2").createTempView("t2")
    spark.range(15).toDF("col3").filter(Symbol("col3") >= 2).createTempView("t3")
    sql("SELECT * FROM (SELECT /*+ BROADCAST(t2) */ * FROM t1 " +
      "INNER JOIN t2 ON t1.col1 = t2.col2) t JOIN t3 ON t.col1 = t3.col3;")
  }
  withTempView("t1", "t2", "t3") {
    // Remember the strategies that were installed before this test so that the finally
    // block restores them instead of wiping shared session state.
    val previousStrategies = spark.experimental.extraStrategies
    try {
      spark.experimental.extraStrategies = TestProblematicCoalesceStrategy :: Nil
      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
        val joinedDF = createJoinedDF()

        val error = intercept[SparkException] {
          joinedDF.collect()
        }
        assert(error.getMessage.contains("ProblematicCoalesce execution is failed"))

        val adaptivePlan =
          joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]

        // All QueryStages should be based on BroadcastQueryStageExec
        val broadcastQueryStageExecs = collect(adaptivePlan) {
          case bqse: BroadcastQueryStageExec => bqse
        }
        assert(broadcastQueryStageExecs.length == 2, adaptivePlan)
        broadcastQueryStageExecs.foreach { bqse =>
          assert(bqse.name.contains("BroadcastQueryStageExec-"))
          // Both BroadcastQueryStages are materialized at the beginning.
          assert(bqse.broadcast.isMaterializationStarted(),
            s"${bqse.name}'s materialization should be started.")
        }
      }
    } finally {
      spark.experimental.extraStrategies = previousStrategies
    }
  }
}

test("SPARK-30403: AQE should handle InSubquery") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
Expand Down Expand Up @@ -1876,8 +1965,8 @@ class AdaptiveQueryExecSuite
.map(_.getMessage.getFormattedMessage)
.filter(_.startsWith("Materialize query stage"))
.toArray
assert(materializeLogs(0).startsWith("Materialize query stage BroadcastQueryStageExec"))
assert(materializeLogs(1).startsWith("Materialize query stage ShuffleQueryStageExec"))
assert(materializeLogs(0).startsWith("Materialize query stage: BroadcastQueryStageExec-1"))
assert(materializeLogs(1).startsWith("Materialize query stage: ShuffleQueryStageExec-0"))
}

test("SPARK-34899: Use origin plan if we can not coalesce shuffle partition") {
Expand Down Expand Up @@ -2900,3 +2989,26 @@ private case class SimpleShuffleSortCostEvaluator() extends CostEvaluator {
SimpleCost(cost)
}
}

/**
 * Test-only planner strategy used to simulate an ExchangeQueryStageExec materialization
 * failure. It plans a logical `Repartition` whose second field is `false` (presumably the
 * non-shuffle coalesce case; the tests trigger it via `coalesce(2)`) into a physical node
 * that throws as soon as it is executed. Every other plan is left to the other strategies.
 */
private object TestProblematicCoalesceStrategy extends Strategy {
  /** Physical node that mirrors its child's output but always fails in `doExecute`. */
  private case class TestProblematicCoalesceExec(numPartitions: Int, child: SparkPlan)
    extends UnaryExecNode {
    override def output: Seq[Attribute] = child.output

    override protected def doExecute(): RDD[InternalRow] = {
      throw new SparkException("ProblematicCoalesce execution is failed")
    }

    override protected def withNewChildInternal(newChild: SparkPlan): TestProblematicCoalesceExec =
      copy(child = newChild)
  }

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case org.apache.spark.sql.catalyst.plans.logical.Repartition(partitions, false, input) =>
      TestProblematicCoalesceExec(partitions, planLater(input)) :: Nil
    case _ =>
      // Not handled here.
      Nil
  }
}