From ab3f0b54a889c3ace6080dc472c79c75406499af Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Fri, 17 Jun 2016 21:02:10 +0900
Subject: [PATCH 1/2] Rework based on #10116

---
 .../spark/sql/execution/QueryExecution.scala |  3 +
 .../apache/spark/sql/execution/SortExec.scala |  7 +-
 .../spark/sql/execution/SparkPlan.scala | 12 +++
 .../aggregate/HashAggregateExec.scala |  7 +-
 .../sql/execution/metric/SQLMetrics.scala | 74 ++++++++++++++++++-
 5 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index b3ef29f6e34c4..6fcbf48b487d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -83,6 +83,9 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   // only used for execution.
   lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

+  // Set the start time for the entire plan.
+  executedPlan.setStartTimeMs(System.currentTimeMillis())
+
   /** Internal version of the RDD. Avoids copies and has no schema */
   lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index cc576bbc4c802..b329c143660f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -57,7 +57,9 @@ case class SortExec(
   override lazy val metrics = Map(
     "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
-    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
+    "blockPhaseFinishTime" ->
+      SQLMetrics.createBlockingTimeMetric(sparkContext, "blocking phase finish time", startTimeMs))

   def createSorter(): UnsafeExternalRowSorter = {
     val ordering = newOrdering(sortOrder, output)
@@ -157,6 +159,7 @@ case class SortExec(
     val spillSize = metricTerm(ctx, "spillSize")
     val spillSizeBefore = ctx.freshName("spillSizeBefore")
     val sortTime = metricTerm(ctx, "sortTime")
+    val blockPhaseFinishTime = metricTerm(ctx, "blockPhaseFinishTime")
     s"""
        | if ($needToSort) {
        |   long $spillSizeBefore = $metrics.memoryBytesSpilled();
@@ -174,6 +177,8 @@ case class SortExec(
        | ${consume(ctx, null, outputRow)}
        | if (shouldStop()) return;
        | }
+       |
+       | $blockPhaseFinishTime.add(System.currentTimeMillis());
      """.stripMargin.trim
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index cadab37a449aa..5ef632028a30c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -90,6 +90,18 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   def longMetric(name: String): SQLMetric = metrics(name)

+  /**
+   * Called in the driver to set the start time for the entire plan. Recursively set for the
+   * children. The executors use this to have an approximate timestamp for when the query starts.
+   * This should be used only for metrics as the clock cannot be synchronized across the cluster.
+   */
+  private[sql] var startTimeMs: Long = 0
+
+  private[sql] def setStartTimeMs(v: Long): Unit = {
+    startTimeMs = v
+    children.foreach(_.setStartTimeMs(v))
+  }
+
   // TODO: Move to `DistributedPlan`
   /** Specifies how data is partitioned across different nodes in the cluster. */
   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 4529ed067e565..0b101f091d630 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -59,7 +59,9 @@ case class HashAggregateExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
-    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"),
+    "blockPhaseFinishTime" ->
+      SQLMetrics.createBlockingTimeMetric(sparkContext, "blocking phase finish time", startTimeMs))

   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

@@ -665,6 +667,7 @@ case class HashAggregateExec(
     val aggTime = metricTerm(ctx, "aggTime")
+    val blockPhaseFinishTime = metricTerm(ctx, "blockPhaseFinishTime")
     val beforeAgg = ctx.freshName("beforeAgg")
     s"""
      if (!$initAgg) {
@@ -690,6 +693,8 @@ case class HashAggregateExec(
       if ($sorterTerm == null) {
         $hashMapTerm.free();
       }
+
+      $blockPhaseFinishTime.add(System.currentTimeMillis());
     """
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index dbc27d8b237f3..30f7289182368 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -30,8 +30,8 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
   // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
   // update it at the end of task and the value will be at least 0. Then we can filter out the -1
   // values before calculate max, min, etc.
-  private[this] var _value = initValue
-  private var _zeroValue = initValue
+  protected[this] var _value = initValue
+  protected var _zeroValue = initValue

   override def copy(): SQLMetric = {
     val newAcc = new SQLMetric(metricType, _value)
@@ -62,11 +62,43 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
   }
 }

+class SQLDiffMetric(metricType: String, startTime: Long = 0L, initValue: Long = 0L)
+  extends SQLMetric(metricType, initValue) {
+
+  private[this] val _baseValue = startTime
+
+  override def copy(): SQLMetric = {
+    val newAcc = new SQLDiffMetric(metricType, baseValue, super.value)
+    newAcc._zeroValue = initValue
+    newAcc
+  }
+
+  override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
+    case o: SQLDiffMetric =>
+      if (_baseValue == o.baseValue) {
+        if (value < o.value) {
+          _value = o.value + _baseValue
+        }
+      } else {
+        throw new UnsupportedOperationException(
+          s"Cannot merge ${this.getClass.getName} with different baseValue: " +
+            s"${_baseValue} != ${o.baseValue}")
+      }
+    case _ =>
+      throw new UnsupportedOperationException(
+        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def value: Long = super.value - baseValue
+
+  def baseValue: Long = _baseValue
+}

 object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val BLOCKING_TIME_METRIC = "blocking-time"

   def createMetric(sc: SparkContext, name: String): SQLMetric = {
     val acc = new SQLMetric(SUM_METRIC)
@@ -96,15 +128,52 @@ object SQLMetrics {
     acc
   }

+  /**
+   * Create a blocking time metric that reports duration in millis relative to startTime.
+   *
+   * The expected usage pattern is:
+   *
+   * On the driver:
+   *   metric = createBlockingTimeMetric(..., System.currentTimeMillis)
+   *
+   * On each executor:
+   *   < Do some work >
+   *   metric += System.currentTimeMillis
+   *
+   * The metric will then output the latest value across all the executors. This is a proxy for
+   * wall clock latency as it measures when the last executor finished this stage.
+   */
+  def createBlockingTimeMetric(sc: SparkContext, name: String, startTime: Long): SQLMetric = {
+    val acc = new SQLDiffMetric(BLOCKING_TIME_METRIC, startTime)
+    acc.register(sc, name = Some(name), countFailedValues = false)
+    acc
+  }
+
   /**
    * A function that defines how we aggregate the final accumulator results among all tasks,
    * and represent it in string for a SQL physical operator.
    */
+<<<<<<< b482c09fa22c5762a355f95820e4ba3e2517fb77
   def stringValue(metricsType: String, values: Seq[Long]): String = {
     if (metricsType == SUM_METRIC) {
       val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
       numberFormat.format(values.sum)
     } else {
+=======
+  def stringValue(metricsType: String, values: Seq[Long]): String = metricsType match {
+    case SUM_METRIC =>
+      NumberFormat.getInstance().format(values.sum)
+
+    case BLOCKING_TIME_METRIC =>
+      val validValues = values.filter(_ >= 0)
+      if (validValues.nonEmpty) {
+        Utils.msDurationToString(validValues.max)
+      } else {
+        "0"
+      }
+
+    case _ =>
+>>>>>>> Rework based on #10116
       val strFormat: Long => String = if (metricsType == SIZE_METRIC) {
         Utils.bytesToString
       } else if (metricsType == TIMING_METRIC) {
@@ -124,6 +193,5 @@ object SQLMetrics {
       metric.map(strFormat)
     }
     s"\n$sum ($min, $med, $max)"
-    }
   }
 }

From 10dca0e30711277343d1031bdb949b2ddab827c9 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Sun, 19 Jun 2016 14:19:05 +0900
Subject: [PATCH 2/2] Fix bug

---
 .../spark/sql/execution/QueryExecution.scala | 10 +++--
 .../sql/execution/metric/SQLMetrics.scala | 41 ++++++++-----------
 2 files changed, 23 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 6fcbf48b487d6..e5ce436bdc024 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -81,10 +81,12 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
-  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
-
-  // Set the start time for the entire plan.
-  executedPlan.setStartTimeMs(System.currentTimeMillis())
+  lazy val executedPlan: SparkPlan = {
+    // Set the start time for the entire plan.
+    val plan = prepareForExecution(sparkPlan)
+    plan.setStartTimeMs(System.currentTimeMillis())
+    plan
+  }

   /** Internal version of the RDD. Avoids copies and has no schema */
   lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 30f7289182368..48cfc2f60de67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

 package org.apache.spark.sql.execution.metric

@@ -153,16 +153,10 @@ object SQLMetrics {
    * A function that defines how we aggregate the final accumulator results among all tasks,
    * and represent it in string for a SQL physical operator.
    */
-<<<<<<< b482c09fa22c5762a355f95820e4ba3e2517fb77
-  def stringValue(metricsType: String, values: Seq[Long]): String = {
-    if (metricsType == SUM_METRIC) {
-      val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
-      numberFormat.format(values.sum)
-    } else {
-=======
   def stringValue(metricsType: String, values: Seq[Long]): String = metricsType match {
     case SUM_METRIC =>
-      NumberFormat.getInstance().format(values.sum)
+      val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
+      numberFormat.format(values.sum)

     case BLOCKING_TIME_METRIC =>
       val validValues = values.filter(_ >= 0)
@@ -173,7 +167,6 @@ object SQLMetrics {
       }

     case _ =>
->>>>>>> Rework based on #10116
       val strFormat: Long => String = if (metricsType == SIZE_METRIC) {
         Utils.bytesToString
       } else if (metricsType == TIMING_METRIC) {
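
The accumulator semantics that createBlockingTimeMetric documents above can be summarized as follows: the driver fixes a base timestamp when the plan is prepared, each task records the wall-clock time at which its blocking phase (sort, aggregation) finished, merging keeps the latest of those timestamps, and the reported value is that latest timestamp minus the base. The sketch below is a minimal standalone approximation of that behaviour with illustrative, hypothetical names (DiffMetric, raw); it is not the SQLDiffMetric / AccumulatorV2 code in the patch and uses no Spark APIs.

// Standalone sketch (not Spark code): a metric tracking the latest "blocking phase
// finished" timestamp reported by any task, relative to a driver-side start time.
object BlockingTimeMetricSketch {

  // Illustrative stand-in for the patch's accumulator; class and field names are hypothetical.
  final class DiffMetric(val baseValue: Long) {
    private var raw: Long = 0L // latest raw timestamp reported so far (0 = nothing reported yet)

    // Called where a task finishes its blocking phase.
    def add(timestampMs: Long): Unit = {
      if (timestampMs > raw) raw = timestampMs
    }

    // Driver-side merge: keep the latest finish time across all tasks.
    def merge(other: DiffMetric): Unit = {
      require(baseValue == other.baseValue, "cannot merge metrics with different baseValue")
      if (other.raw > raw) raw = other.raw
    }

    // Reported value: elapsed millis between the plan's start and the last task to finish.
    def value: Long = if (raw == 0L) 0L else raw - baseValue
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis() // corresponds to setStartTimeMs() on the driver
    val perTask = Seq.fill(3)(new DiffMetric(start))

    // Simulate three tasks finishing their blocking phase at slightly different times.
    perTask.foreach { m =>
      Thread.sleep(10)
      m.add(System.currentTimeMillis())
    }

    // Merge the per-task metrics; the result reflects the slowest task.
    val merged = new DiffMetric(start)
    perTask.foreach(merged.merge)
    println(s"blocking phase finish time: ${merged.value} ms after query start")
  }
}

Because the driver and executor clocks are not synchronized (as the SparkPlan comment notes), the resulting number is only a proxy for wall-clock latency rather than an exact duration.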