Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
maryannxue committed Jun 5, 2019
1 parent e2fa8e3 commit 55450e9
Showing 1 changed file with 62 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ case class AdaptiveSparkPlanExec(

@volatile private var isFinalPlan = false

@volatile private var fallback = false

/**
* Return type for `createQueryStages`
* @param newPlan the new plan with created query stages.
Expand All @@ -117,12 +119,14 @@ case class AdaptiveSparkPlanExec(
} else {
lock.synchronized {
var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
var result = createQueryStages(currentPhysicalPlan)
val (r, p) = createQueryStagesOrFallback(currentPhysicalPlan, currentLogicalPlan)
var result = r
var logicalPlan = p
val events = new LinkedBlockingQueue[StageMaterializationEvent]()
val errors = new mutable.ArrayBuffer[SparkException]()
while (!result.allChildStagesMaterialized) {
currentPhysicalPlan = result.newPlan
currentLogicalPlan = updateLogicalPlan(currentLogicalPlan, result.newStages)
currentLogicalPlan = logicalPlan
currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, currentLogicalPlan)
onUpdatePlan()

Expand Down Expand Up @@ -167,13 +171,17 @@ case class AdaptiveSparkPlanExec(
}

// Do re-planning and try creating new stages on the new physical plan.
reOptimize(currentLogicalPlan)
result = createQueryStages(currentPhysicalPlan)
val (newPhysicalPlan, newLogicalPlan) = reOptimize(currentLogicalPlan)
currentPhysicalPlan = newPhysicalPlan
currentLogicalPlan = newLogicalPlan
val (r, p) = createQueryStagesOrFallback(currentPhysicalPlan, currentLogicalPlan)
result = r
logicalPlan = p
}

// Run the final plan when there's no more unfinished stages.
currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, currentLogicalPlan)
currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, logicalPlan)
logDebug(s"Final plan: $currentPhysicalPlan")
onUpdatePlan()
isFinalPlan = true
Expand All @@ -193,6 +201,30 @@ case class AdaptiveSparkPlanExec(
depth, lastChildren, append, verbose, "", addSuffix = false, maxFields)
}

/**
 * Try creating new query stages and updating the logical plan accordingly. Return the
 * `CreateStageResult` along with the updated logical plan if successful; otherwise, turn on
 * the "fallback" mode, which means no new stages will be created and we just wait for all the
 * existing stages to complete and execute the rest of the plan.
 *
 * @param physicalPlan the current physical plan to create query stages for.
 * @param logicalPlan the logical plan assumed to correspond to `physicalPlan`.
 * @return a pair of the stage-creation result and the (possibly updated) logical plan.
 */
private def createQueryStagesOrFallback(
physicalPlan: SparkPlan,
logicalPlan: LogicalPlan): (CreateStageResult, LogicalPlan) = {
var result = createQueryStages(physicalPlan)
var newLogicalPlan = logicalPlan
try {
// May throw when a newly created stage maps to more than one logical sub-tree.
newLogicalPlan = updateLogicalPlan(logicalPlan, result.newPlan, result.newStages)
} catch {
case e: AmbiguousLogicalMappingException =>
logWarning("Fall back to non-adaptive mode for the rest of the plan due to ambiguous " +
s"logical plan mapping for node ${e.plan}.")
fallback = true
// With `fallback` on, `createQueryStages` skips its Exchange case, so re-running it
// must yield no new stages; the assert guards that invariant.
result = createQueryStages(physicalPlan)
assert(result.newStages.isEmpty, "Fallback mode should not create new stages.")
}
(result, newLogicalPlan)
}

/**
* This method is called recursively to traverse the plan tree bottom-up and create a new query
* stage or try reusing an existing stage if the current node is an [[Exchange]] node and all of
Expand All @@ -204,7 +236,7 @@ case class AdaptiveSparkPlanExec(
* 3) A list of the new query stages that have been created.
*/
private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match {
case e: Exchange =>
case e: Exchange if !fallback =>
// First have a quick check in the `stageCache` without having to traverse down the node.
stageCache.get(e.canonicalized) match {
case Some(existingStage) if conf.exchangeReuseEnabled =>
Expand Down Expand Up @@ -305,6 +337,7 @@ case class AdaptiveSparkPlanExec(
*/
private def updateLogicalPlan(
logicalPlan: LogicalPlan,
physicalPlan: SparkPlan,
newStages: Seq[(Exchange, QueryStageExec)]): LogicalPlan = {
var currentLogicalPlan = logicalPlan
newStages.foreach { case (exhange: Exchange, stage: QueryStageExec) =>
Expand All @@ -316,27 +349,25 @@ case class AdaptiveSparkPlanExec(
})
assert(logicalNodeOpt.isDefined)
val logicalNode = logicalNodeOpt.get
val physicalNode = currentPhysicalPlan.collectFirst {
val physicalNode = physicalPlan.collectFirst {
case p if p.eq(stage) || p.logicalLink.exists(logicalNode.eq) => p
}
assert(physicalNode.isDefined)
// Replace the corresponding logical node with LogicalQueryStage
val newLogicalNode = LogicalQueryStage(logicalNode, physicalNode.get)
// The logical plan may contain multiple identical subtree instances, because, e.g., rules
// like `PushDownPredicate` can push the same logical plan subtree instance into different
// branches of a Union. This hack is based on the fact that one physical stage should
// correspond to exactly one logical node and the Seq `newStages` is in the same order as
// that of the tree traversal by `transformDown`.
var transformed = false
var cnt = 0
val newLogicalPlan = currentLogicalPlan.transformDown {
case p if !transformed && p.eq(logicalNode) =>
transformed = true
case p if p.eq(logicalNode) =>
cnt += 1
newLogicalNode
}
assert(newLogicalPlan != currentLogicalPlan,
if (cnt > 1) {
throw AmbiguousLogicalMappingException(logicalNode)
}
assert(cnt == 1,
s"logicalNode: $logicalNode; " +
s"logicalPlan: $currentLogicalPlan " +
s"physicalPlan: $currentPhysicalPlan" +
s"physicalPlan: $physicalPlan" +
s"stage: $stage")
currentLogicalPlan = newLogicalPlan
}
Expand All @@ -346,24 +377,27 @@ case class AdaptiveSparkPlanExec(
/**
 * Re-optimize and run physical planning on the current logical plan based on the latest stats.
 *
 * @param logicalPlan the logical plan to re-optimize (its cached stats are invalidated first).
 * @return the newly planned physical plan together with the re-optimized logical plan; the
 *         caller is responsible for installing them — this method does not mutate
 *         `currentPhysicalPlan` itself.
 */
private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
// Stats may have changed now that some stages have materialized; force re-computation.
logicalPlan.invalidateStatsCache()
val optimized = optimizer.execute(logicalPlan)
SparkSession.setActiveSession(session)
val sparkPlan = session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
val newPlan = applyPhysicalRules(sparkPlan, queryStagePreparationRules)
(newPlan, optimized)
}

/**
 * Notify the listeners of the physical plan change.
 *
 * Posts a `SparkListenerSQLAdaptiveExecutionUpdate` for the tracked execution id, if any.
 */
private def onUpdatePlan(): Unit = {
executionId.foreach { id =>
val exec = SQLExecution.getQueryExecution(id)
// NOTE(review): `getQueryExecution` can evidently return null (e.g. if the execution has
// already been cleaned up) — guard before posting to avoid an NPE on `exec.toString`.
if (exec != null) {
session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
id,
exec.toString,
SparkPlanInfo.fromSparkPlan(currentPhysicalPlan)))
}
}
}
}
Expand Down Expand Up @@ -405,3 +439,8 @@ case class StageSuccess(stage: QueryStageExec, result: Any) extends StageMateria
 * The materialization of a query stage hit an error and failed.
 *
 * @param stage the query stage whose materialization failed.
 * @param error the error thrown while materializing the stage.
 */
case class StageFailure(stage: QueryStageExec, error: Throwable) extends StageMaterializationEvent

/**
 * Thrown when a single query stage corresponds to more than one sub-tree of the logical plan,
 * so the physical-to-logical mapping cannot be resolved unambiguously.
 *
 * @param plan the logical plan node whose mapping is ambiguous.
 */
private case class AmbiguousLogicalMappingException(plan: LogicalPlan) extends Exception

0 comments on commit 55450e9

Please sign in to comment.