Commit e790ab9

fix.

gatorsmile committed Jan 21, 2018
1 parent 121dc96 commit e790ab9
Showing 5 changed files with 108 additions and 20 deletions.
org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala (new file)
@@ -0,0 +1,90 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.rules

import com.google.common.util.concurrent.AtomicLongMap
import scala.collection.JavaConverters._

case class QueryExecutionMetering() {
  private val timeMap = AtomicLongMap.create[String]()
  private val numRunsMap = AtomicLongMap.create[String]()
  private val numEffectiveRunsMap = AtomicLongMap.create[String]()
  private val timeEffectiveRunsMap = AtomicLongMap.create[String]()

  /** Resets statistics about time spent running specific rules */
  def resetMetrics(): Unit = {
    timeMap.clear()
    numRunsMap.clear()
    numEffectiveRunsMap.clear()
    timeEffectiveRunsMap.clear()
  }

  def totalTime: Long = {
    timeMap.sum()
  }

  def totalNumRuns: Long = {
    numRunsMap.sum()
  }

  def incExecutionTimeBy(ruleName: String, delta: Long): Unit = {
    timeMap.addAndGet(ruleName, delta)
  }

  def incTimeEffectiveExecutionBy(ruleName: String, delta: Long): Unit = {
    timeEffectiveRunsMap.addAndGet(ruleName, delta)
  }

  def incNumEffectiveExecution(ruleName: String): Unit = {
    numEffectiveRunsMap.incrementAndGet(ruleName)
  }

  def incNumExecution(ruleName: String): Unit = {
    numRunsMap.incrementAndGet(ruleName)
  }

  /** Dump statistics about time spent running specific rules. */
  def dumpTimeSpent(): String = {
    val map = timeMap.asMap().asScala
    val maxSize = map.keys.map(_.toString.length).max

    val colRuleName = "Rule".padTo(maxSize, " ").mkString
    val colRunTime = "Total Time".padTo(22, " ").mkString
    val colTimeEffectiveRuns = "Effective Time".padTo(22, " ").mkString
    val colNumRuns = "Total Runs".padTo(22, " ").mkString
    val colNumEffectiveRuns = "Effective Runs".padTo(22, " ").mkString

    val ruleMetrics = map.toSeq.sortBy(_._2).reverseMap { case (k, v) =>
      val ruleName = k.padTo(maxSize, " ").mkString
      val runtime = v.toString.padTo(len = 22, " ").mkString
      val numRuns = numRunsMap.get(k).toString.padTo(len = 22, " ").mkString
      val numEffectiveRuns = numEffectiveRunsMap.get(k).toString.padTo(len = 22, " ").mkString
      val timeEffectiveRuns = timeEffectiveRunsMap.get(k).toString.padTo(len = 22, " ").mkString
      s"$ruleName $runtime $timeEffectiveRuns $numRuns $numEffectiveRuns"
    }.mkString("\n", "\n", "")

    s"""
      |=== Metrics of Analyzer/Optimizer Rules ===
      |Total number of runs = $totalNumRuns
      |Total time: ${totalTime / 1000000000D} seconds
      |
      |$colRuleName $colRunTime $colTimeEffectiveRuns $colNumRuns $colNumEffectiveRuns
      |$ruleMetrics
    """.stripMargin
  }
}
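Note: as a quick orientation (not part of the diff), here is a minimal sketch of how the new QueryExecutionMetering API fits together; the rule name "SomeRule" and the timing scaffold are placeholders:

    val metering = QueryExecutionMetering()
    val startTime = System.nanoTime()
    // ... apply some analyzer/optimizer rule to a plan here ...
    val runTime = System.nanoTime() - startTime
    metering.incExecutionTimeBy("SomeRule", runTime)   // recorded for every run
    metering.incNumExecution("SomeRule")
    // recorded only when the rule actually changed the plan:
    metering.incTimeEffectiveExecutionBy("SomeRule", runTime)
    metering.incNumEffectiveExecution("SomeRule")
    println(metering.dumpTimeSpent())

Guava's AtomicLongMap makes each increment thread-safe, so a single instance can be shared across concurrent query compilations.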
org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -17,29 +17,23 @@

 package org.apache.spark.sql.catalyst.rules
 
-import scala.collection.JavaConverters._
-
-import com.google.common.util.concurrent.AtomicLongMap
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.util.Utils
 
 object RuleExecutor {
-  protected val timeMap = AtomicLongMap.create[String]()
-
-  /** Resets statistics about time spent running specific rules */
-  def resetTime(): Unit = timeMap.clear()
+  protected val queryExecutionMeter = QueryExecutionMetering()
 
   /** Dump statistics about time spent running specific rules. */
   def dumpTimeSpent(): String = {
-    val map = timeMap.asMap().asScala
-    val maxSize = map.keys.map(_.toString.length).max
-    map.toSeq.sortBy(_._2).reverseMap { case (k, v) =>
-      s"${k.padTo(maxSize, " ").mkString} $v"
-    }.mkString("\n", "\n", "")
+    queryExecutionMeter.dumpTimeSpent()
   }
+
+  /** Resets statistics about time spent running specific rules */
+  def resetMetrics(): Unit = {
+    queryExecutionMeter.resetMetrics()
+  }
 }

@@ -77,6 +71,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    */
   def execute(plan: TreeType): TreeType = {
     var curPlan = plan
+    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
 
     batches.foreach { batch =>
       val batchStartPlan = curPlan
@@ -91,15 +86,18 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             val startTime = System.nanoTime()
             val result = rule(plan)
             val runTime = System.nanoTime() - startTime
-            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)
 
             if (!result.fastEquals(plan)) {
+              queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
+              queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
               logTrace(
                 s"""
                   |=== Applying Rule ${rule.ruleName} ===
                   |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                 """.stripMargin)
             }
+            queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
+            queryExecutionMetrics.incNumExecution(rule.ruleName)
 
             // Run the structural integrity checker against the plan after each rule.
             if (!isPlanIntegral(result)) {
@@ -135,9 +133,9 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
       if (!batchStartPlan.fastEquals(curPlan)) {
         logDebug(
           s"""
-          |=== Result of Batch ${batch.name} ===
-          |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
-        """.stripMargin)
+            |=== Result of Batch ${batch.name} ===
+            |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
+          """.stripMargin)
       } else {
         logTrace(s"Batch ${batch.name} has no effect.")
       }
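Note: with this change, execute() meters every rule application (total time and run count always; effective time and count only when the rule actually rewrote the plan) instead of only accumulating time in the old RuleExecutor.timeMap. A hedged usage sketch, not in this commit, assuming a SparkSession named spark:

    RuleExecutor.resetMetrics()
    spark.range(1000).selectExpr("id % 10 AS k").groupBy("k").count().collect()
    // Prints the "=== Metrics of Analyzer/Optimizer Rules ===" table assembled above
    println(RuleExecutor.dumpTimeSpent())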
BenchmarkQueryTest.scala
@@ -46,7 +46,7 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSQLContext with B

   override def beforeAll() {
     super.beforeAll()
-    RuleExecutor.resetTime()
+    RuleExecutor.resetMetrics()
   }
 
   protected def checkGeneratedCode(plan: SparkPlan): Unit = {
SQLQueryTestSuite.scala
@@ -291,7 +291,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
     // Add Locale setting
     Locale.setDefault(Locale.US)
-    RuleExecutor.resetTime()
+    RuleExecutor.resetMetrics()
   }
 
   override def afterAll(): Unit = {
HiveCompatibilitySuite.scala
@@ -62,7 +62,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
     // (timestamp_*)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
-    RuleExecutor.resetTime()
+    RuleExecutor.resetMetrics()
   }
 
   override def afterAll() {
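Note: all three suites reset the now-shared meter during setup so each suite's numbers start from zero. A sketch of the same pattern for a hypothetical new suite (the suite name is a placeholder):

    import org.apache.spark.sql.catalyst.rules.RuleExecutor

    class MyQueryMetricsSuite extends QueryTest with SharedSQLContext {
      override def beforeAll(): Unit = {
        super.beforeAll()
        RuleExecutor.resetMetrics() // clear analyzer/optimizer rule metrics before this suite runs
      }
    }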
