Skip to content

Commit

Permalink
[SPARK-35552][SQL] Make the query stage materialization check more readable
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add a new method `isMaterialized` to `QueryStageExec` and use it in place of the inline `resultOption.get().isDefined` checks.

### Why are the changes needed?

Currently, we use `resultOption.get().isDefined` to check whether a query stage has been materialized. This is not readable at a glance, so it is better to wrap the check in a dedicated method, `isMaterialized`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass CI.

Closes #32689 from ulysses-you/SPARK-35552.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
  • Loading branch information
ulysses-you authored and gengliangwang committed May 28, 2021
1 parent 7eb7448 commit 3b94aad
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
Expand Up @@ -37,14 +37,13 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0)

private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined =>
case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized =>
stage.getRuntimeStatistics.rowCount
case _ => None
}

private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan match {
case LogicalQueryStage(_, stage: BroadcastQueryStageExec)
if stage.resultOption.get().isDefined =>
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.isMaterialized =>
stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys
case _ => false
}
Expand Down
Expand Up @@ -420,7 +420,7 @@ case class AdaptiveSparkPlanExec(
context.stageCache.get(e.canonicalized) match {
case Some(existingStage) if conf.exchangeReuseEnabled =>
val stage = reuseQueryStage(existingStage, e)
val isMaterialized = stage.resultOption.get().isDefined
val isMaterialized = stage.isMaterialized
CreateStageResult(
newPlan = stage,
allChildStagesMaterialized = isMaterialized,
Expand All @@ -442,7 +442,7 @@ case class AdaptiveSparkPlanExec(
newStage = reuseQueryStage(queryStage, e)
}
}
val isMaterialized = newStage.resultOption.get().isDefined
val isMaterialized = newStage.isMaterialized
CreateStageResult(
newPlan = newStage,
allChildStagesMaterialized = isMaterialized,
Expand All @@ -455,7 +455,7 @@ case class AdaptiveSparkPlanExec(

case q: QueryStageExec =>
CreateStageResult(newPlan = q,
allChildStagesMaterialized = q.resultOption.get().isDefined, newStages = Seq.empty)
allChildStagesMaterialized = q.isMaterialized, newStages = Seq.empty)

case _ =>
if (plan.children.isEmpty) {
Expand Down
Expand Up @@ -53,7 +53,7 @@ object DynamicJoinSelection extends Rule[LogicalPlan] {
}

private def selectJoinStrategy(plan: LogicalPlan): Option[JoinStrategyHint] = plan match {
case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined
case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.isMaterialized
&& stage.mapStats.isDefined =>
val demoteBroadcastHash = shouldDemoteBroadcastHashJoin(stage.mapStats.get)
val preferShuffleHash = preferShuffledHashJoin(stage.mapStats.get)
Expand Down
Expand Up @@ -95,18 +95,21 @@ abstract class QueryStageExec extends LeafExecNode {
/**
* Compute the statistics of the query stage if executed, otherwise None.
*/
def computeStats(): Option[Statistics] = resultOption.get().map { _ =>
def computeStats(): Option[Statistics] = if (isMaterialized) {
val runtimeStats = getRuntimeStatistics
val dataSize = runtimeStats.sizeInBytes.max(0)
val numOutputRows = runtimeStats.rowCount.map(_.max(0))
Statistics(dataSize, numOutputRows, isRuntime = true)
Some(Statistics(dataSize, numOutputRows, isRuntime = true))
} else {
None
}

@transient
@volatile
protected var _resultOption = new AtomicReference[Option[Any]](None)

private[adaptive] def resultOption: AtomicReference[Option[Any]] = _resultOption
def isMaterialized: Boolean = resultOption.get().isDefined

override def output: Seq[Attribute] = plan.output
override def outputPartitioning: Partitioning = plan.outputPartitioning
Expand Down

0 comments on commit 3b94aad

Please sign in to comment.