Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
maryannxue committed Aug 19, 2019
1 parent 8ec528b commit 4c3f551
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 11 deletions.
Expand Up @@ -93,7 +93,8 @@ case class AdaptiveSparkPlanExec(
CollapseCodegenStages(conf)
)

@volatile private var currentPhysicalPlan = initialPlan
@volatile private var currentPhysicalPlan =
applyPhysicalRules(initialPlan, queryStagePreparationRules)

private var isFinalPlan = false

Expand Down Expand Up @@ -122,7 +123,6 @@ case class AdaptiveSparkPlanExec(
if (isFinalPlan) {
currentPhysicalPlan.execute()
} else {
currentPhysicalPlan = applyPhysicalRules(currentPhysicalPlan, queryStagePreparationRules)
// Make sure we only update Spark UI if this plan's `QueryExecution` object matches the one
// retrieved by the `sparkContext`'s current execution ID. Note that sub-queries do not have
// their own execution IDs and therefore rely on the main query to update UI.
Expand Down
Expand Up @@ -92,19 +92,19 @@ case class InsertAdaptiveSparkPlan(
* rule, or reuse the execution plan from another sub-query of the same semantics if possible.
*/
private def buildSubqueryMap(plan: SparkPlan): mutable.HashMap[Long, ExecSubqueryExpression] = {
val subqueryMapBuilder = mutable.HashMap.empty[Long, ExecSubqueryExpression]
val subqueryMap = mutable.HashMap.empty[Long, ExecSubqueryExpression]
plan.foreach(_.expressions.foreach(_.foreach {
case expressions.ScalarSubquery(p, _, exprId)
if !subqueryMapBuilder.contains(exprId.id) =>
if !subqueryMap.contains(exprId.id) =>
val executedPlan = compileSubquery(p)
verifyAdaptivePlan(executedPlan, p)
val scalarSubquery = execution.ScalarSubquery(
SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId)
subqueryMapBuilder.put(exprId.id, scalarSubquery)
subqueryMap.put(exprId.id, scalarSubquery)
case _ =>
}))

subqueryMapBuilder
subqueryMap
}

def compileSubquery(plan: LogicalPlan): SparkPlan = {
Expand Down
Expand Up @@ -17,15 +17,13 @@

package org.apache.spark.sql.execution.adaptive

import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.ListQuery
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ExecSubqueryExpression, SparkPlan}

case class PlanAdaptiveSubqueries(
subqueryMap: mutable.HashMap[Long, ExecSubqueryExpression]) extends Rule[SparkPlan] {
subqueryMap: scala.collection.Map[Long, ExecSubqueryExpression]) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {
plan.transformAllExpressions {
Expand Down
Expand Up @@ -24,8 +24,8 @@ import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression,
import org.apache.spark.sql.internal.SQLConf

case class ReuseAdaptiveSubquery(
conf: SQLConf,
reuseMap: TrieMap[SparkPlan, BaseSubqueryExec]) extends Rule[SparkPlan] {
conf: SQLConf,
reuseMap: TrieMap[SparkPlan, BaseSubqueryExec]) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {
if (!conf.subqueryReuseEnabled) {
Expand Down

0 comments on commit 4c3f551

Please sign in to comment.