From b6a3d02f2c2b0eff71f92c3ede854edc3b5bf9f8 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 20 Nov 2018 17:22:35 +0100
Subject: [PATCH 1/6] [SPARK-26129][SQL] Instrumentation for query planning time

---
 .../sql/catalyst/QueryPlanningTracker.scala   | 118 ++++++++++++++++++
 .../sql/catalyst/analysis/Analyzer.scala      |  41 +++---
 .../sql/catalyst/rules/RuleExecutor.scala     |  11 +-
 .../catalyst/QueryPlanningTrackerSuite.scala  |  78 ++++++++++++
 .../sql/catalyst/analysis/AnalysisTest.scala  |   2 +-
 .../ResolveGroupingAnalyticsSuite.scala       |   2 +-
 .../ResolvedUuidExpressionsSuite.scala        |   8 +-
 .../spark/sql/execution/QueryExecution.scala  |  18 ++-
 .../QueryPlanningTrackerEndToEndSuite.scala   |  37 ++++++
 .../apache/spark/sql/hive/test/TestHive.scala |   2 +-
 10 files changed, 284 insertions(+), 33 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
new file mode 100644
index 0000000000000..1135f015cb446
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.util.BoundedPriorityQueue
+
+
+/**
+ * A simple utility for tracking runtime and associated stats in query planning.
+ *
+ * There are two separate concepts we track:
+ *
+ * 1. Phases: These are broad scope phases in query planning, as listed below, i.e. analysis,
+ *    optimization and physical planning (just planning).
+ *
+ * 2. Rules: These are the individual Catalyst rules that we track. In addition to time, we also
+ *    track the number of invocations and effective invocations.
+ */
+object QueryPlanningTracker {
+
+  // Define a list of common phases here.
+  // TODO: Implement parsing time.
+ val PARSING = "parsing" + val ANALYSIS = "analysis" + val OPTIMIZATION = "optimization" + val PLANNING = "planning" + + class RuleSummary( + var totalTimeNs: Long, var numInvocations: Long, var numEffectiveInvocations: Long) { + + def this() = this(totalTimeNs = 0, numInvocations = 0, numEffectiveInvocations = 0) + + def merge(other: RuleSummary): RuleSummary = { + new RuleSummary( + this.totalTimeNs + other.totalTimeNs, + this.numInvocations + other.numInvocations, + this.numEffectiveInvocations + other.numEffectiveInvocations + ) + } + + override def toString: String = { + s"RuleSummary($totalTimeNs, $numInvocations, $numEffectiveInvocations)" + } + } +} + + +class QueryPlanningTracker { + + import QueryPlanningTracker._ + + // Mapping from the name of a rule to a rule's summary. + // Use a Java HashMap for less overhead. + private val rulesMap = new java.util.HashMap[String, RuleSummary] + + // From a phase to time in ns. + private val phaseToTimeNs = new java.util.HashMap[String, Long] + + /** Measure the runtime of function f, and add it to the time for the specified phase. */ + def measureTime[T](phase: String)(f: => T): T = { + val startTime = System.nanoTime() + val ret = f + val timeTaken = System.nanoTime() - startTime + phaseToTimeNs.put(phase, phaseToTimeNs.getOrDefault(phase, 0) + timeTaken) + ret + } + + /** + * Reecord a specific invocation of a rule. + * + * @param rule name of the rule + * @param timeNs time taken to run this invocation + * @param effective whether the invocation has resulted in a plan change + */ + def recordRuleInvocation(rule: String, timeNs: Long, effective: Boolean): Unit = { + var s = rulesMap.get(rule) + if (s eq null) { + s = new RuleSummary + rulesMap.put(rule, s) + } + + s.totalTimeNs += timeNs + s.numInvocations += 1 + s.numEffectiveInvocations += (if (effective) 1 else 0) + } + + // ------------ reporting functions below ------------ + + def rules: Map[String, RuleSummary] = rulesMap.asScala.toMap + + def phases: Map[String, Long] = phaseToTimeNs.asScala.toMap + + /** Returns the top k most expensive rules (as measured by time). 
*/ + def topRulesByTime(k: Int): Seq[(String, RuleSummary)] = { + val orderingByTime: Ordering[(String, RuleSummary)] = Ordering.by(e => e._2.totalTimeNs) + val q = new BoundedPriorityQueue(k)(orderingByTime) + rulesMap.asScala.foreach(q.+=) + q.toSeq.sortBy(r => -r._2.totalTimeNs) + } + +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ab2312fdcdeef..e313d357b72f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -102,29 +102,34 @@ class Analyzer( this(catalog, conf, conf.optimizerMaxIterations) } - def executeAndCheck(plan: LogicalPlan): LogicalPlan = AnalysisHelper.markInAnalyzer { - val analyzed = execute(plan) - try { - checkAnalysis(analyzed) - analyzed - } catch { - case e: AnalysisException => - val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed)) - ae.setStackTrace(e.getStackTrace) - throw ae + def executeAndCheck(plan: LogicalPlan, tracker: Option[QueryPlanningTracker]): LogicalPlan = { + AnalysisHelper.markInAnalyzer { + val analyzed = execute(plan, tracker) + try { + checkAnalysis(analyzed) + analyzed + } catch { + case e: AnalysisException => + val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed)) + ae.setStackTrace(e.getStackTrace) + throw ae + } } } - override def execute(plan: LogicalPlan): LogicalPlan = { + override def execute(plan: LogicalPlan, tracker: Option[QueryPlanningTracker]): LogicalPlan = { AnalysisContext.reset() try { - executeSameContext(plan) + executeSameContext(plan, tracker) } finally { AnalysisContext.reset() } } - private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan) + private def executeSameContext( + plan: LogicalPlan, tracker: Option[QueryPlanningTracker]): LogicalPlan = { + super.execute(plan, tracker) + } def resolver: Resolver = conf.resolver @@ -211,7 +216,7 @@ class Analyzer( case With(child, relations) => substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) { case (resolved, (name, relation)) => - resolved :+ name -> executeSameContext(substituteCTE(relation, resolved)) + resolved :+ name -> executeSameContext(substituteCTE(relation, resolved), None) }) case other => other } @@ -696,7 +701,7 @@ class Analyzer( s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " + "around this.") } - executeSameContext(child) + executeSameContext(child, None) } view.copy(child = newChild) case p @ SubqueryAlias(_, view: View) => @@ -1405,7 +1410,7 @@ class Analyzer( do { // Try to resolve the subquery plan using the regular analyzer. previous = current - current = executeSameContext(current) + current = executeSameContext(current, None) // Use the outer references to resolve the subquery plan if it isn't resolved yet. 
val i = plans.iterator @@ -1527,7 +1532,7 @@ class Analyzer( grouping, Alias(cond, "havingCondition")() :: Nil, child) - val resolvedOperator = executeSameContext(aggregatedCondition) + val resolvedOperator = executeSameContext(aggregatedCondition, None) def resolvedAggregateFilter = resolvedOperator .asInstanceOf[Aggregate] @@ -1588,7 +1593,7 @@ class Analyzer( unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")()) val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering) val resolvedAggregate: Aggregate = - executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate] + executeSameContext(aggregatedOrdering, None).asInstanceOf[Aggregate] val resolvedAliasedOrdering: Seq[Alias] = resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index e991a2dc7462f..d4b5f9304e862 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.rules import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.sideBySide @@ -66,11 +67,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { */ protected def isPlanIntegral(plan: TreeType): Boolean = true + /** A special overload (as opposed to default parameter values) to allow execute in a closure. */ + def execute(plan: TreeType): TreeType = execute(plan, None) + /** * Executes the batches of rules defined by the subclass. The batches are executed serially * using the defined execution strategy. Within each batch, rules are also executed serially. */ - def execute(plan: TreeType): TreeType = { + def execute(plan: TreeType, tracker: Option[QueryPlanningTracker]): TreeType = { var curPlan = plan val queryExecutionMetrics = RuleExecutor.queryExecutionMeter val planChangeLogger = new PlanChangeLogger() @@ -88,8 +92,9 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { val startTime = System.nanoTime() val result = rule(plan) val runTime = System.nanoTime() - startTime + val effective = !result.fastEquals(plan) - if (!result.fastEquals(plan)) { + if (effective) { queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName) queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime) planChangeLogger.log(rule.ruleName, plan, result) @@ -97,6 +102,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime) queryExecutionMetrics.incNumExecution(rule.ruleName) + tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective)) + // Run the structural integrity checker against the plan after each rule. 
if (!isPlanIntegral(result)) { val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala new file mode 100644 index 0000000000000..f42c262dfbdd8 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/QueryPlanningTrackerSuite.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst + +import org.apache.spark.SparkFunSuite + +class QueryPlanningTrackerSuite extends SparkFunSuite { + + test("phases") { + val t = new QueryPlanningTracker + t.measureTime("p1") { + Thread.sleep(1) + } + + assert(t.phases("p1") > 0) + assert(!t.phases.contains("p2")) + + val old = t.phases("p1") + + t.measureTime("p1") { + Thread.sleep(1) + } + assert(t.phases("p1") > old) + } + + test("rules") { + val t = new QueryPlanningTracker + t.recordRuleInvocation("r1", 1, effective = false) + t.recordRuleInvocation("r2", 2, effective = true) + t.recordRuleInvocation("r3", 1, effective = false) + t.recordRuleInvocation("r3", 2, effective = true) + + val rules = t.rules + + assert(rules("r1").totalTimeNs == 1) + assert(rules("r1").numInvocations == 1) + assert(rules("r1").numEffectiveInvocations == 0) + + assert(rules("r2").totalTimeNs == 2) + assert(rules("r2").numInvocations == 1) + assert(rules("r2").numEffectiveInvocations == 1) + + assert(rules("r3").totalTimeNs == 3) + assert(rules("r3").numInvocations == 2) + assert(rules("r3").numEffectiveInvocations == 1) + } + + test("topRulesByTime") { + val t = new QueryPlanningTracker + t.recordRuleInvocation("r2", 2, effective = true) + t.recordRuleInvocation("r4", 4, effective = true) + t.recordRuleInvocation("r1", 1, effective = false) + t.recordRuleInvocation("r3", 3, effective = false) + + val top = t.topRulesByTime(2) + assert(top.size == 2) + assert(top(0)._1 == "r4") + assert(top(1)._1 == "r3") + + // Don't crash when k > total size + assert(t.topRulesByTime(10).size == 4) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 3d7c91870133b..ded571c6195cc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -54,7 +54,7 @@ trait AnalysisTest extends PlanTest { expectedPlan: LogicalPlan, caseSensitive: Boolean = true): Unit = { val analyzer = getAnalyzer(caseSensitive) - val actualPlan = analyzer.executeAndCheck(inputPlan) + val actualPlan = 
analyzer.executeAndCheck(inputPlan, None) comparePlans(actualPlan, expectedPlan) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala index 8da4d7e3aa372..f827b50e229a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala @@ -109,7 +109,7 @@ class ResolveGroupingAnalyticsSuite extends AnalysisTest { Seq(UnresolvedAlias(Multiply(unresolved_a, Literal(2))), unresolved_b, UnresolvedAlias(count(unresolved_c)))) - val resultPlan = getAnalyzer(true).executeAndCheck(originalPlan2) + val resultPlan = getAnalyzer(true).executeAndCheck(originalPlan2, None) val gExpressions = resultPlan.asInstanceOf[Aggregate].groupingExpressions assert(gExpressions.size == 3) val firstGroupingExprAttrName = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala index fe57c199b8744..9d3451153b80a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala @@ -47,7 +47,7 @@ class ResolvedUuidExpressionsSuite extends AnalysisTest { test("analyzed plan sets random seed for Uuid expression") { val plan = r.select(a, uuid1) - val resolvedPlan = analyzer.executeAndCheck(plan) + val resolvedPlan = analyzer.executeAndCheck(plan, None) getUuidExpressions(resolvedPlan).foreach { u => assert(u.resolved) assert(u.randomSeed.isDefined) @@ -56,14 +56,14 @@ class ResolvedUuidExpressionsSuite extends AnalysisTest { test("Uuid expressions should have different random seeds") { val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3) - val resolvedPlan = analyzer.executeAndCheck(plan) + val resolvedPlan = analyzer.executeAndCheck(plan, None) assert(getUuidExpressions(resolvedPlan).map(_.randomSeed.get).distinct.length == 3) } test("Different analyzed plans should have different random seeds in Uuids") { val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3) - val resolvedPlan1 = analyzer.executeAndCheck(plan) - val resolvedPlan2 = analyzer.executeAndCheck(plan) + val resolvedPlan1 = analyzer.executeAndCheck(plan, None) + val resolvedPlan2 = analyzer.executeAndCheck(plan, None) val uuids1 = getUuidExpressions(resolvedPlan1) val uuids2 = getUuidExpressions(resolvedPlan2) assert(uuids1.distinct.length == 3) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 905d035b64275..3e69997893211 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import 
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule @@ -56,9 +56,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { } } - lazy val analyzed: LogicalPlan = { + val tracker: QueryPlanningTracker = new QueryPlanningTracker + + lazy val analyzed: LogicalPlan = tracker.measureTime(QueryPlanningTracker.ANALYSIS) { SparkSession.setActiveSession(sparkSession) - sparkSession.sessionState.analyzer.executeAndCheck(logical) + sparkSession.sessionState.analyzer.executeAndCheck(logical, Option(tracker)) } lazy val withCachedData: LogicalPlan = { @@ -67,9 +69,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { sparkSession.sharedState.cacheManager.useCachedData(analyzed) } - lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData) + lazy val optimizedPlan: LogicalPlan = tracker.measureTime(QueryPlanningTracker.OPTIMIZATION) { + sparkSession.sessionState.optimizer.execute(withCachedData, Option(tracker)) + } - lazy val sparkPlan: SparkPlan = { + lazy val sparkPlan: SparkPlan = tracker.measureTime(QueryPlanningTracker.PLANNING) { SparkSession.setActiveSession(sparkSession) // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, // but we will implement to choose the best plan. @@ -78,7 +82,9 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. - lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) + lazy val executedPlan: SparkPlan = tracker.measureTime(QueryPlanningTracker.PLANNING) { + prepareForExecution(sparkPlan) + } /** Internal version of the RDD. Avoids copies and has no schema */ lazy val toRdd: RDD[InternalRow] = executedPlan.execute() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala new file mode 100644 index 0000000000000..1578a831da86c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class QueryPlanningTrackerEndToEndSuite extends SharedSQLContext { + + test("basic measurement") { + val df = spark.range(1000).selectExpr("count(*)") + df.collect() + val tracker = df.queryExecution.tracker + + assert(tracker.phases.size == 3) + assert(tracker.phases("analysis") > 0) + assert(tracker.phases("optimization") > 0) + assert(tracker.phases("planning") > 0) + + assert(tracker.rules.nonEmpty) + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 634b3db19ec27..a034c3c8db6f3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -586,7 +586,7 @@ private[hive] class TestHiveQueryExecution( logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}") referencedTestTables.foreach(sparkSession.loadTestTable) // Proceed with analysis. - sparkSession.sessionState.analyzer.executeAndCheck(logical) + sparkSession.sessionState.analyzer.executeAndCheck(logical, None) } } From b2a2a01e0607fe79e8ff6fa039555f8f1ccb3248 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 20 Nov 2018 17:43:55 +0100 Subject: [PATCH 2/6] Add parsing --- .../sql/catalyst/QueryPlanningTracker.scala | 1 - .../scala/org/apache/spark/sql/Dataset.scala | 9 +++++++++ .../org/apache/spark/sql/SparkSession.scala | 6 +++++- .../spark/sql/execution/QueryExecution.scala | 7 ++++--- .../QueryPlanningTrackerEndToEndSuite.scala | 17 ++++++++++++++++- 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index 1135f015cb446..d24c2f2b29e17 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -36,7 +36,6 @@ import org.apache.spark.util.BoundedPriorityQueue object QueryPlanningTracker { // Define a list of common phases here. - // TODO: Implement parsing time. val PARSING = "parsing" val ANALYSIS = "analysis" val OPTIMIZATION = "optimization" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 0e77ec0406257..e757921b485df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -32,6 +32,7 @@ import org.apache.spark.api.java.function._ import org.apache.spark.api.python.{PythonRDD, SerDeUtil} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.encoders._ @@ -76,6 +77,14 @@ private[sql] object Dataset { qe.assertAnalyzed() new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)) } + + /** A variant of ofRows that allows passing in a tracker so we can track query parsing time. 
*/ + def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker) + : DataFrame = { + val qe = new QueryExecution(sparkSession, logicalPlan, tracker) + qe.assertAnalyzed() + new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 725db97df4ed1..739c6b54b4cb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -648,7 +648,11 @@ class SparkSession private( * @since 2.0.0 */ def sql(sqlText: String): DataFrame = { - Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) + val tracker = new QueryPlanningTracker + val plan = tracker.measureTime(QueryPlanningTracker.PARSING) { + sessionState.sqlParser.parsePlan(sqlText) + } + Dataset.ofRows(self, plan, tracker) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 3e69997893211..240f1c9d8a76a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -43,7 +43,10 @@ import org.apache.spark.util.Utils * While this is not a public class, we should avoid changing the function names for the sake of * changing them, because a lot of developers use the feature for debugging. */ -class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { +class QueryExecution( + val sparkSession: SparkSession, + val logical: LogicalPlan, + val tracker: QueryPlanningTracker = new QueryPlanningTracker) { // TODO: Move the planner an optimizer into here from SessionState. 
protected def planner = sparkSession.sessionState.planner @@ -56,8 +59,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { } } - val tracker: QueryPlanningTracker = new QueryPlanningTracker - lazy val analyzed: LogicalPlan = tracker.measureTime(QueryPlanningTracker.ANALYSIS) { SparkSession.setActiveSession(sparkSession) sparkSession.sessionState.analyzer.executeAndCheck(logical, Option(tracker)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala index 1578a831da86c..0af4c85400e9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.test.SharedSQLContext class QueryPlanningTrackerEndToEndSuite extends SharedSQLContext { - test("basic measurement") { + test("programmatic API") { val df = spark.range(1000).selectExpr("count(*)") df.collect() val tracker = df.queryExecution.tracker @@ -34,4 +34,19 @@ class QueryPlanningTrackerEndToEndSuite extends SharedSQLContext { assert(tracker.rules.nonEmpty) } + test("sql") { + val df = spark.sql("select * from range(1)") + df.collect() + + val tracker = df.queryExecution.tracker + + assert(tracker.phases.size == 4) + assert(tracker.phases("parsing") > 0) + assert(tracker.phases("analysis") > 0) + assert(tracker.phases("optimization") > 0) + assert(tracker.phases("planning") > 0) + + assert(tracker.rules.nonEmpty) + } + } From dd61273024bf322bfe03f3cd5e06803eaae62d8f Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 20 Nov 2018 17:45:01 +0100 Subject: [PATCH 3/6] remove merge --- .../apache/spark/sql/catalyst/QueryPlanningTracker.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index d24c2f2b29e17..c4e7e8e122413 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -46,14 +46,6 @@ object QueryPlanningTracker { def this() = this(totalTimeNs = 0, numInvocations = 0, numEffectiveInvocations = 0) - def merge(other: RuleSummary): RuleSummary = { - new RuleSummary( - this.totalTimeNs + other.totalTimeNs, - this.numInvocations + other.numInvocations, - this.numEffectiveInvocations + other.numEffectiveInvocations - ) - } - override def toString: String = { s"RuleSummary($totalTimeNs, $numInvocations, $numEffectiveInvocations)" } From f36a23170f5cc93724908e42041dad4672564353 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 20 Nov 2018 22:29:02 +0100 Subject: [PATCH 4/6] thread local --- .../sql/catalyst/QueryPlanningTracker.scala | 20 +++++++++++++++- .../sql/catalyst/analysis/Analyzer.scala | 23 +++++++++---------- .../sql/catalyst/rules/RuleExecutor.scala | 19 +++++++++++---- .../sql/catalyst/analysis/AnalysisTest.scala | 3 ++- .../ResolveGroupingAnalyticsSuite.scala | 3 ++- .../ResolvedUuidExpressionsSuite.scala | 10 ++++---- .../spark/sql/execution/QueryExecution.scala | 4 ++-- 7 files changed, 57 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index c4e7e8e122413..1f016ddf35e49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -50,6 +50,24 @@ object QueryPlanningTracker { s"RuleSummary($totalTimeNs, $numInvocations, $numEffectiveInvocations)" } } + + /** + * A thread local variable to implicitly pass the tracker around. This assumes the query planner + * is single-threaded, and avoids passing the same tracker context in every function call. + */ + private val localTracker = new ThreadLocal[QueryPlanningTracker]() { + override def initialValue: QueryPlanningTracker = null + } + + /** Returns the current tracker in scope, based on the thread local variable. */ + def get: QueryPlanningTracker = localTracker.get() + + /** Sets the current tracker for the execution of function f. We assume f is single-threaded. */ + def withTracker[T](tracker: QueryPlanningTracker)(f: => T): T = { + val originalTracker = localTracker.get() + localTracker.set(tracker) + try f finally { localTracker.set(originalTracker) } + } } @@ -74,7 +92,7 @@ class QueryPlanningTracker { } /** - * Reecord a specific invocation of a rule. + * Record a specific invocation of a rule. * * @param rule name of the rule * @param timeNs time taken to run this invocation diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e313d357b72f8..05a3535fe7f1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -102,9 +102,9 @@ class Analyzer( this(catalog, conf, conf.optimizerMaxIterations) } - def executeAndCheck(plan: LogicalPlan, tracker: Option[QueryPlanningTracker]): LogicalPlan = { + def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { AnalysisHelper.markInAnalyzer { - val analyzed = execute(plan, tracker) + val analyzed = executeAndTrack(plan, tracker) try { checkAnalysis(analyzed) analyzed @@ -117,18 +117,17 @@ class Analyzer( } } - override def execute(plan: LogicalPlan, tracker: Option[QueryPlanningTracker]): LogicalPlan = { + override def execute(plan: LogicalPlan): LogicalPlan = { AnalysisContext.reset() try { - executeSameContext(plan, tracker) + executeSameContext(plan) } finally { AnalysisContext.reset() } } - private def executeSameContext( - plan: LogicalPlan, tracker: Option[QueryPlanningTracker]): LogicalPlan = { - super.execute(plan, tracker) + private def executeSameContext(plan: LogicalPlan): LogicalPlan = { + super.execute(plan) } def resolver: Resolver = conf.resolver @@ -216,7 +215,7 @@ class Analyzer( case With(child, relations) => substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) { case (resolved, (name, relation)) => - resolved :+ name -> executeSameContext(substituteCTE(relation, resolved), None) + resolved :+ name -> executeSameContext(substituteCTE(relation, resolved)) }) case other => other } @@ -701,7 +700,7 @@ class Analyzer( s"avoid errors. 
Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " + "around this.") } - executeSameContext(child, None) + executeSameContext(child) } view.copy(child = newChild) case p @ SubqueryAlias(_, view: View) => @@ -1410,7 +1409,7 @@ class Analyzer( do { // Try to resolve the subquery plan using the regular analyzer. previous = current - current = executeSameContext(current, None) + current = executeSameContext(current) // Use the outer references to resolve the subquery plan if it isn't resolved yet. val i = plans.iterator @@ -1532,7 +1531,7 @@ class Analyzer( grouping, Alias(cond, "havingCondition")() :: Nil, child) - val resolvedOperator = executeSameContext(aggregatedCondition, None) + val resolvedOperator = executeSameContext(aggregatedCondition) def resolvedAggregateFilter = resolvedOperator .asInstanceOf[Aggregate] @@ -1593,7 +1592,7 @@ class Analyzer( unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")()) val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering) val resolvedAggregate: Aggregate = - executeSameContext(aggregatedOrdering, None).asInstanceOf[Aggregate] + executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate] val resolvedAliasedOrdering: Seq[Alias] = resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index d4b5f9304e862..54cd00fbb4ebe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -67,17 +67,26 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { */ protected def isPlanIntegral(plan: TreeType): Boolean = true - /** A special overload (as opposed to default parameter values) to allow execute in a closure. */ - def execute(plan: TreeType): TreeType = execute(plan, None) + /** + * Executes the batches of rules defined by the subclass, and also tracks timing info for each + * rule using the provided tracker. + * @see [[execute]] + */ + def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = { + QueryPlanningTracker.withTracker(tracker) { + execute(plan) + } + } /** * Executes the batches of rules defined by the subclass. The batches are executed serially * using the defined execution strategy. Within each batch, rules are also executed serially. */ - def execute(plan: TreeType, tracker: Option[QueryPlanningTracker]): TreeType = { + def execute(plan: TreeType): TreeType = { var curPlan = plan val queryExecutionMetrics = RuleExecutor.queryExecutionMeter val planChangeLogger = new PlanChangeLogger() + val tracker = QueryPlanningTracker.get batches.foreach { batch => val batchStartPlan = curPlan @@ -102,7 +111,9 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime) queryExecutionMetrics.incNumExecution(rule.ruleName) - tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective)) + if (tracker ne null) { + tracker.recordRuleInvocation(rule.ruleName, runTime, effective) + } // Run the structural integrity checker against the plan after each rule. 
if (!isPlanIntegral(result)) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index ded571c6195cc..fab1b776a3c72 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -21,6 +21,7 @@ import java.net.URI import java.util.Locale import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ @@ -54,7 +55,7 @@ trait AnalysisTest extends PlanTest { expectedPlan: LogicalPlan, caseSensitive: Boolean = true): Unit = { val analyzer = getAnalyzer(caseSensitive) - val actualPlan = analyzer.executeAndCheck(inputPlan, None) + val actualPlan = analyzer.executeAndCheck(inputPlan, new QueryPlanningTracker) comparePlans(actualPlan, expectedPlan) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala index f827b50e229a1..aa5eda8e5ba87 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupingAnalyticsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import java.util.TimeZone +import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -109,7 +110,7 @@ class ResolveGroupingAnalyticsSuite extends AnalysisTest { Seq(UnresolvedAlias(Multiply(unresolved_a, Literal(2))), unresolved_b, UnresolvedAlias(count(unresolved_c)))) - val resultPlan = getAnalyzer(true).executeAndCheck(originalPlan2, None) + val resultPlan = getAnalyzer(true).executeAndCheck(originalPlan2, new QueryPlanningTracker) val gExpressions = resultPlan.asInstanceOf[Aggregate].groupingExpressions assert(gExpressions.size == 3) val firstGroupingExprAttrName = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala index 9d3451153b80a..64bd07534b19b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -34,6 +35,7 @@ class ResolvedUuidExpressionsSuite extends AnalysisTest { private lazy val uuid3 = Uuid().as('_uuid3) private lazy val uuid1Ref = uuid1.toAttribute + private val tracker = new QueryPlanningTracker private val analyzer = getAnalyzer(caseSensitive = true) private def getUuidExpressions(plan: LogicalPlan): Seq[Uuid] = { @@ -47,7 +49,7 
@@ class ResolvedUuidExpressionsSuite extends AnalysisTest { test("analyzed plan sets random seed for Uuid expression") { val plan = r.select(a, uuid1) - val resolvedPlan = analyzer.executeAndCheck(plan, None) + val resolvedPlan = analyzer.executeAndCheck(plan, tracker) getUuidExpressions(resolvedPlan).foreach { u => assert(u.resolved) assert(u.randomSeed.isDefined) @@ -56,14 +58,14 @@ class ResolvedUuidExpressionsSuite extends AnalysisTest { test("Uuid expressions should have different random seeds") { val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3) - val resolvedPlan = analyzer.executeAndCheck(plan, None) + val resolvedPlan = analyzer.executeAndCheck(plan, tracker) assert(getUuidExpressions(resolvedPlan).map(_.randomSeed.get).distinct.length == 3) } test("Different analyzed plans should have different random seeds in Uuids") { val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3) - val resolvedPlan1 = analyzer.executeAndCheck(plan, None) - val resolvedPlan2 = analyzer.executeAndCheck(plan, None) + val resolvedPlan1 = analyzer.executeAndCheck(plan, tracker) + val resolvedPlan2 = analyzer.executeAndCheck(plan, tracker) val uuids1 = getUuidExpressions(resolvedPlan1) val uuids2 = getUuidExpressions(resolvedPlan2) assert(uuids1.distinct.length == 3) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 240f1c9d8a76a..87a4ceb91aae6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -61,7 +61,7 @@ class QueryExecution( lazy val analyzed: LogicalPlan = tracker.measureTime(QueryPlanningTracker.ANALYSIS) { SparkSession.setActiveSession(sparkSession) - sparkSession.sessionState.analyzer.executeAndCheck(logical, Option(tracker)) + sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker) } lazy val withCachedData: LogicalPlan = { @@ -71,7 +71,7 @@ class QueryExecution( } lazy val optimizedPlan: LogicalPlan = tracker.measureTime(QueryPlanningTracker.OPTIMIZATION) { - sparkSession.sessionState.optimizer.execute(withCachedData, Option(tracker)) + sparkSession.sessionState.optimizer.executeAndTrack(withCachedData, tracker) } lazy val sparkPlan: SparkPlan = tracker.measureTime(QueryPlanningTracker.PLANNING) { From 2cd069c21808420c138ef8444dcb407454a447fd Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 20 Nov 2018 22:31:42 +0100 Subject: [PATCH 5/6] fix compile --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 +--- .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 05a3535fe7f1a..b977fa07db5c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -126,9 +126,7 @@ class Analyzer( } } - private def executeSameContext(plan: LogicalPlan): LogicalPlan = { - super.execute(plan) - } + private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan) def resolver: Resolver = conf.resolver diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index a034c3c8db6f3..7aac2b8828094 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener} import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation @@ -586,7 +587,7 @@ private[hive] class TestHiveQueryExecution( logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}") referencedTestTables.foreach(sparkSession.loadTestTable) // Proceed with analysis. - sparkSession.sessionState.analyzer.executeAndCheck(logical, None) + sparkSession.sessionState.analyzer.executeAndCheck(logical, new QueryPlanningTracker) } } From 34f8bfe69b70ff702324ec7f38d78ae920410ef7 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 21 Nov 2018 12:30:45 +0100 Subject: [PATCH 6/6] fix Hive test cases --- .../sql/catalyst/QueryPlanningTracker.scala | 2 +- .../spark/sql/catalyst/rules/RuleExecutor.scala | 7 +++---- .../apache/spark/sql/hive/test/TestHive.scala | 17 +++++++++++++---- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index 1f016ddf35e49..420f2a1f20997 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -60,7 +60,7 @@ object QueryPlanningTracker { } /** Returns the current tracker in scope, based on the thread local variable. */ - def get: QueryPlanningTracker = localTracker.get() + def get: Option[QueryPlanningTracker] = Option(localTracker.get()) /** Sets the current tracker for the execution of function f. We assume f is single-threaded. 
*/ def withTracker[T](tracker: QueryPlanningTracker)(f: => T): T = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 54cd00fbb4ebe..cf6ff4f986399 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -86,7 +86,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { var curPlan = plan val queryExecutionMetrics = RuleExecutor.queryExecutionMeter val planChangeLogger = new PlanChangeLogger() - val tracker = QueryPlanningTracker.get + val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get batches.foreach { batch => val batchStartPlan = curPlan @@ -111,9 +111,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime) queryExecutionMetrics.incNumExecution(rule.ruleName) - if (tracker ne null) { - tracker.recordRuleInvocation(rule.ruleName, runTime, effective) - } + // Record timing information using QueryPlanningTracker + tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective)) // Run the structural integrity checker against the plan after each rule. if (!isPlanIntegral(result)) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 7aac2b8828094..3508affda241a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -33,10 +33,9 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.catalyst.QueryPlanningTracker +import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener} +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} @@ -220,6 +219,16 @@ private[hive] class TestHiveSparkSession( sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.newSession() } + /** + * This is a temporary hack to override SparkSession.sql so we can still use the version of + * Dataset.ofRows that creates a TestHiveQueryExecution (rather than a normal QueryExecution + * which wouldn't load all the test tables). + */ + override def sql(sqlText: String): DataFrame = { + val plan = sessionState.sqlParser.parsePlan(sqlText) + Dataset.ofRows(self, plan) + } + override def newSession(): TestHiveSparkSession = { new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables) } @@ -587,7 +596,7 @@ private[hive] class TestHiveQueryExecution( logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}") referencedTestTables.foreach(sparkSession.loadTestTable) // Proceed with analysis. 
- sparkSession.sessionState.analyzer.executeAndCheck(logical, new QueryPlanningTracker) + sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker) } }
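
Usage note (illustrative only, not part of the patch series): a minimal sketch of how the
instrumentation added above can be exercised once all six patches are applied. The session
setup and query text are placeholders; the tracker API (phases, rules, topRulesByTime) is
the one introduced in patch 1, and the parsing phase is only populated on the
SparkSession.sql path added in patch 2.

    import org.apache.spark.sql.SparkSession

    object TrackerDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("tracker-demo").getOrCreate()

        // SparkSession.sql measures parsing time and passes the tracker into QueryExecution.
        val df = spark.sql("select count(*) from range(1000)")
        // Analysis runs eagerly when the Dataset is created, but optimization and
        // physical planning are lazy; run the query so all phases are recorded.
        df.collect()

        val tracker = df.queryExecution.tracker

        // Phase times are in nanoseconds, keyed by the constants defined on
        // QueryPlanningTracker (parsing, analysis, optimization, planning).
        tracker.phases.foreach { case (phase, ns) =>
          println(s"$phase: ${ns / 1e6} ms")
        }

        // The five most expensive rules by accumulated time. RuleSummary.toString prints
        // total time, invocation count, and effective invocation count.
        tracker.topRulesByTime(5).foreach { case (name, summary) =>
          println(s"$name: $summary")
        }

        spark.stop()
      }
    }

Design note: patch 4 replaces the explicit Option[QueryPlanningTracker] parameter threaded
through execute() with a thread-local set by executeAndTrack(). This keeps the widely used
RuleExecutor.execute signature unchanged and avoids touching every recursive call site in
the Analyzer, at the cost of assuming single-threaded planning, which the thread-local's
scaladoc states explicitly.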