From 3b94aad5e72a6b96e4a8f517ac60e0a2fed2590b Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 28 May 2021 20:42:11 +0800 Subject: [PATCH] [SPARK-35552][SQL] Make query stage materialized more readable ### What changes were proposed in this pull request? Add a new method `isMaterialized` in `QueryStageExec`. ### Why are the changes needed? Currently, we use `resultOption().get.isDefined` to check if a query stage has materialized. The code is not readable at a glance. It's better to use a new method like `isMaterialized` to define it. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass CI. Closes #32689 from ulysses-you/SPARK-35552. Authored-by: ulysses-you Signed-off-by: Gengliang Wang --- .../sql/execution/adaptive/AQEPropagateEmptyRelation.scala | 5 ++--- .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 6 +++--- .../sql/execution/adaptive/DynamicJoinSelection.scala | 2 +- .../spark/sql/execution/adaptive/QueryStageExec.scala | 7 +++++-- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 614fc78477c1d..648d2e7117aa4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -37,14 +37,13 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0) private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match { - case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => + case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized => stage.getRuntimeStatistics.rowCount case _ => None } private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan match { - case LogicalQueryStage(_, stage: BroadcastQueryStageExec) - if stage.resultOption.get().isDefined => + case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.isMaterialized => stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 556c0360d1fc1..ebff79060fc75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -420,7 +420,7 @@ case class AdaptiveSparkPlanExec( context.stageCache.get(e.canonicalized) match { case Some(existingStage) if conf.exchangeReuseEnabled => val stage = reuseQueryStage(existingStage, e) - val isMaterialized = stage.resultOption.get().isDefined + val isMaterialized = stage.isMaterialized CreateStageResult( newPlan = stage, allChildStagesMaterialized = isMaterialized, @@ -442,7 +442,7 @@ case class AdaptiveSparkPlanExec( newStage = reuseQueryStage(queryStage, e) } } - val isMaterialized = newStage.resultOption.get().isDefined + val isMaterialized = newStage.isMaterialized CreateStageResult( newPlan = newStage, allChildStagesMaterialized = isMaterialized, @@ -455,7 +455,7 @@ case class AdaptiveSparkPlanExec( case q: QueryStageExec => CreateStageResult(newPlan = q, - allChildStagesMaterialized = q.resultOption.get().isDefined, newStages = Seq.empty) + allChildStagesMaterialized = q.isMaterialized, newStages = Seq.empty) case _ => if (plan.children.isEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala index 61124f049d464..a8c74b5aa4a44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala @@ -53,7 +53,7 @@ object DynamicJoinSelection extends Rule[LogicalPlan] { } private def selectJoinStrategy(plan: LogicalPlan): Option[JoinStrategyHint] = plan match { - case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined + case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.isMaterialized && stage.mapStats.isDefined => val demoteBroadcastHash = shouldDemoteBroadcastHashJoin(stage.mapStats.get) val preferShuffleHash = preferShuffledHashJoin(stage.mapStats.get) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index a4ec4f1c22727..6451d0bd5f59f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -95,11 +95,13 @@ abstract class QueryStageExec extends LeafExecNode { /** * Compute the statistics of the query stage if executed, otherwise None. */ - def computeStats(): Option[Statistics] = resultOption.get().map { _ => + def computeStats(): Option[Statistics] = if (isMaterialized) { val runtimeStats = getRuntimeStatistics val dataSize = runtimeStats.sizeInBytes.max(0) val numOutputRows = runtimeStats.rowCount.map(_.max(0)) - Statistics(dataSize, numOutputRows, isRuntime = true) + Some(Statistics(dataSize, numOutputRows, isRuntime = true)) + } else { + None } @transient @@ -107,6 +109,7 @@ abstract class QueryStageExec extends LeafExecNode { protected var _resultOption = new AtomicReference[Option[Any]](None) private[adaptive] def resultOption: AtomicReference[Option[Any]] = _resultOption + def isMaterialized: Boolean = resultOption.get().isDefined override def output: Seq[Attribute] = plan.output override def outputPartitioning: Partitioning = plan.outputPartitioning