From c0a4ad4d8619e9606ade6b55d7aa4872e77d4c5e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 28 Sep 2015 15:40:42 -0700 Subject: [PATCH 1/3] report memory usage for tungsten sql operator --- .../execution/aggregate/TungstenAggregate.scala | 9 +++++++-- .../aggregate/TungstenAggregationIterator.scala | 4 +++- .../spark/sql/execution/metric/SQLMetrics.scala | 15 +++++++++------ .../org/apache/spark/sql/execution/sort.scala | 8 ++++++++ .../spark/sql/execution/ui/SQLListener.scala | 3 ++- .../spark/sql/execution/ui/SparkPlanGraph.scala | 6 ++++-- .../TungstenAggregationIteratorSuite.scala | 2 +- 7 files changed, 34 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 7b3d072b2e067..cf420c4430353 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.aggregate +import org.apache.spark.util.Utils.bytesToString import org.apache.spark.TaskContext import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow @@ -40,7 +41,9 @@ case class TungstenAggregate( override private[sql] lazy val metrics = Map( "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"), - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), + "numBytesUsed" -> + SQLMetrics.createLongMetric(sparkContext, "number of bytes used", bytesToString)) override def outputsUnsafeRows: Boolean = true @@ -70,6 +73,7 @@ case class TungstenAggregate( protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { val numInputRows = longMetric("numInputRows") val numOutputRows = longMetric("numOutputRows") + val numBytesUsed = longMetric("numBytesUsed") /** * Set up the underlying unsafe data structures used before computing the parent partition. @@ -87,7 +91,8 @@ case class TungstenAggregate( child.output, testFallbackStartsAt, numInputRows, - numOutputRows) + numOutputRows, + numBytesUsed) } /** Compute a partition using the iterator already set up previously. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index 4bb95c9eb7f3e..181913eea460e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -84,7 +84,8 @@ class TungstenAggregationIterator( originalInputAttributes: Seq[Attribute], testFallbackStartsAt: Option[Int], numInputRows: LongSQLMetric, - numOutputRows: LongSQLMetric) + numOutputRows: LongSQLMetric, + numBytesUsed: LongSQLMetric) extends Iterator[UnsafeRow] with Logging { // The parent partition iterator, to be initialized later in `start` @@ -694,6 +695,7 @@ class TungstenAggregationIterator( val mapMemory = hashMap.getPeakMemoryUsedBytes val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L) val peakMemory = Math.max(mapMemory, sorterMemory) + numBytesUsed += peakMemory TaskContext.get().internalMetricsToAccumulators( InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 7a2a98ec18cb8..d917ac32de22e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -26,7 +26,7 @@ import org.apache.spark.{Accumulable, AccumulableParam, SparkContext} * An implementation of SQLMetric should override `+=` and `add` to avoid boxing. */ private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T]( - name: String, val param: SQLMetricParam[R, T]) + name: String, val param: SQLMetricParam[R, T], val stringValue: T => String) extends Accumulable[R, T](param.zero, param, Some(name), true) /** @@ -81,8 +81,8 @@ private[sql] class IntSQLMetricValue(private var _value: Int) extends SQLMetricV * A specialized long Accumulable to avoid boxing and unboxing when using Accumulator's * `+=` and `add`. */ -private[sql] class LongSQLMetric private[metric](name: String) - extends SQLMetric[LongSQLMetricValue, Long](name, LongSQLMetricParam) { +private[sql] class LongSQLMetric private[metric](name: String, stringValue: Long => String) + extends SQLMetric[LongSQLMetricValue, Long](name, LongSQLMetricParam, stringValue) { override def +=(term: Long): Unit = { localValue.add(term) @@ -107,8 +107,11 @@ private object LongSQLMetricParam extends SQLMetricParam[LongSQLMetricValue, Lon private[sql] object SQLMetrics { - def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = { - val acc = new LongSQLMetric(name) + def createLongMetric( + sc: SparkContext, + name: String, + stringValue: Long => String = _.toString): LongSQLMetric = { + val acc = new LongSQLMetric(name, stringValue) sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc } @@ -117,5 +120,5 @@ private[sql] object SQLMetrics { * A metric that its value will be ignored. Use this one when we need a metric parameter but don't * care about the value. 
*/ - val nullLongMetric = new LongSQLMetric("null") + val nullLongMetric = new LongSQLMetric("null", _.toString) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala index 27f26245a5ef0..8cadac4f6c824 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.execution +import org.apache.spark.util.Utils.bytesToString import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution} +import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.StructType import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.ExternalSorter @@ -93,9 +95,14 @@ case class TungstenSort( override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil + override private[sql] lazy val metrics = Map( + "numBytesUsed" -> + SQLMetrics.createLongMetric(sparkContext, "number of bytes used", bytesToString)) + protected override def doExecute(): RDD[InternalRow] = { val schema = child.schema val childOutput = child.output + val numBytesUsed = longMetric("numBytesUsed") /** * Set up the sorter in each partition before computing the parent partition. @@ -132,6 +139,7 @@ case class TungstenSort( sorter: UnsafeExternalRowSorter, parentIterator: Iterator[InternalRow]): Iterator[InternalRow] = { val sortedIterator = sorter.sort(parentIterator.asInstanceOf[Iterator[UnsafeRow]]) + numBytesUsed += sorter.getPeakMemoryUsage taskContext.internalMetricsToAccumulators( InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage) sortedIterator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index d6472400a6a21..4a24d2070e13e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -330,7 +330,8 @@ private[ui] class SQLExecutionUIData( private[ui] case class SQLPlanMetric( name: String, accumulatorId: Long, - metricParam: SQLMetricParam[SQLMetricValue[Any], Any]) + metricParam: SQLMetricParam[SQLMetricValue[Any], Any], + stringValue: Any => String) /** * Store all accumulatorUpdates for all tasks in a Spark stage. 
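A note on the design in patch 1, since it changes again in patch 2: the accumulator itself keeps adding raw longs, and the optional stringValue formatter (Utils.bytesToString here, defaulting to _.toString) is applied only when the UI renders the label. A minimal standalone sketch of that rendering step; the simplified bytesToString stand-in and the sample value below are illustrative, not taken from Spark:

    object FormatterSketch {
      // Simplified stand-in for org.apache.spark.util.Utils.bytesToString; the real
      // formatter picks finer-grained units, but the idea is the same.
      def bytesToString(size: Long): String =
        if (size >= 1L << 30) f"${size.toDouble / (1L << 30)}%.1f GB"
        else if (size >= 1L << 20) f"${size.toDouble / (1L << 20)}%.1f MB"
        else if (size >= 1L << 10) f"${size.toDouble / (1L << 10)}%.1f KB"
        else s"$size B"

      def main(args: Array[String]): Unit = {
        // The accumulator stores the raw long; only the rendered label is formatted.
        val numBytesUsed = 67108864L
        println("number of bytes used: " + bytesToString(numBytesUsed)) // number of bytes used: 64.0 MB
      }
    }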
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index ae3d752dde348..35670f0609154 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable +import org.apache.spark.util.Utils import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue} @@ -63,7 +64,8 @@ private[sql] object SparkPlanGraph { edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = { val metrics = plan.metrics.toSeq.map { case (key, metric) => SQLPlanMetric(metric.name.getOrElse(key), metric.id, - metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]) + metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]], + metric.stringValue.asInstanceOf[Any => String]) } val node = SparkPlanGraphNode( nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics) @@ -91,7 +93,7 @@ private[ui] case class SparkPlanGraphNode( val values = { for (metric <- metrics; value <- metricsValue.get(metric.accumulatorId)) yield { - metric.name + ": " + value + metric.name + ": " + metric.stringValue(value) } } val label = if (values.isEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala index ed974b3a53d41..51d0dd1620a5d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala @@ -39,7 +39,7 @@ class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLConte } val dummyAccum = SQLMetrics.createLongMetric(sparkContext, "dummy") iter = new TungstenAggregationIterator(Seq.empty, Seq.empty, Seq.empty, Seq.empty, Seq.empty, - Seq.empty, newMutableProjection, Seq.empty, None, dummyAccum, dummyAccum) + Seq.empty, newMutableProjection, Seq.empty, None, dummyAccum, dummyAccum, dummyAccum) val numPages = iter.getHashMap.getNumDataPages assert(numPages === 1) } finally { From fdae1827564a0535f22a19b432442d66e56f12a6 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 29 Sep 2015 12:01:50 -0700 Subject: [PATCH 2/3] enhance metric info --- .../aggregate/TungstenAggregate.scala | 11 +-- .../TungstenAggregationIterator.scala | 10 ++- .../sql/execution/metric/SQLMetrics.scala | 68 +++++++++++++------ .../org/apache/spark/sql/execution/sort.scala | 18 +++-- .../sql/execution/ui/ExecutionPage.scala | 2 +- .../spark/sql/execution/ui/SQLListener.scala | 12 ++-- .../sql/execution/ui/SparkPlanGraph.scala | 10 ++- .../TungstenAggregationIteratorSuite.scala | 3 +- .../execution/metric/SQLMetricsSuite.scala | 13 +++- .../sql/execution/ui/SQLListenerSuite.scala | 20 +++--- 10 files changed, 108 insertions(+), 59 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index cf420c4430353..4e8fea47e1e53 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.aggregate -import org.apache.spark.util.Utils.bytesToString import org.apache.spark.TaskContext import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow @@ -42,8 +41,8 @@ case class TungstenAggregate( override private[sql] lazy val metrics = Map( "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), - "numBytesUsed" -> - SQLMetrics.createLongMetric(sparkContext, "number of bytes used", bytesToString)) + "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), + "spilledSize" -> SQLMetrics.createSizeMetric(sparkContext, "spilled size")) override def outputsUnsafeRows: Boolean = true @@ -73,7 +72,8 @@ case class TungstenAggregate( protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { val numInputRows = longMetric("numInputRows") val numOutputRows = longMetric("numOutputRows") - val numBytesUsed = longMetric("numBytesUsed") + val dataSize = longMetric("dataSize") + val spilledSize = longMetric("spilledSize") /** * Set up the underlying unsafe data structures used before computing the parent partition. @@ -92,7 +92,8 @@ case class TungstenAggregate( testFallbackStartsAt, numInputRows, numOutputRows, - numBytesUsed) + dataSize, + spilledSize) } /** Compute a partition using the iterator already set up previously. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index 181913eea460e..8a1db1eddf6b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -85,7 +85,8 @@ class TungstenAggregationIterator( testFallbackStartsAt: Option[Int], numInputRows: LongSQLMetric, numOutputRows: LongSQLMetric, - numBytesUsed: LongSQLMetric) + dataSize: LongSQLMetric, + spilledSize: LongSQLMetric) extends Iterator[UnsafeRow] with Logging { // The parent partition iterator, to be initialized later in `start` @@ -108,6 +109,10 @@ class TungstenAggregationIterator( s"$allAggregateExpressions should have no more than 2 kinds of modes.") } + // Remember spilled data size of this task before execute this operator so that we can + // figure out how many bytes we spilled for this operator. + private val spilledSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled + // // The modes of AggregateExpressions. 
Right now, we can handle the following mode: // - Partial-only: @@ -695,7 +700,8 @@ class TungstenAggregationIterator( val mapMemory = hashMap.getPeakMemoryUsedBytes val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L) val peakMemory = Math.max(mapMemory, sorterMemory) - numBytesUsed += peakMemory + dataSize += peakMemory + spilledSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spilledSizeBefore TaskContext.get().internalMetricsToAccumulators( InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index d917ac32de22e..b5a58f0bc5850 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.metric +import org.apache.spark.util.Utils import org.apache.spark.{Accumulable, AccumulableParam, SparkContext} /** @@ -26,7 +27,7 @@ import org.apache.spark.{Accumulable, AccumulableParam, SparkContext} * An implementation of SQLMetric should override `+=` and `add` to avoid boxing. */ private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T]( - name: String, val param: SQLMetricParam[R, T], val stringValue: T => String) + name: String, val param: SQLMetricParam[R, T]) extends Accumulable[R, T](param.zero, param, Some(name), true) /** @@ -35,6 +36,12 @@ private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T]( */ private[sql] trait SQLMetricParam[R <: SQLMetricValue[T], T] extends AccumulableParam[R, T] { + /** + * A function that defines how we aggregate the final accumulator results among all tasks, + * and represent it in string for a SQL physical operator. + */ + val stringValue: Seq[T] => String + def zero: R } @@ -63,26 +70,12 @@ private[sql] class LongSQLMetricValue(private var _value : Long) extends SQLMetr override def value: Long = _value } -/** - * A wrapper of Int to avoid boxing and unboxing when using Accumulator - */ -private[sql] class IntSQLMetricValue(private var _value: Int) extends SQLMetricValue[Int] { - - def add(term: Int): IntSQLMetricValue = { - _value += term - this - } - - // Although there is a boxing here, it's fine because it's only called in SQLListener - override def value: Int = _value -} - /** * A specialized long Accumulable to avoid boxing and unboxing when using Accumulator's * `+=` and `add`. 
*/ -private[sql] class LongSQLMetric private[metric](name: String, stringValue: Long => String) - extends SQLMetric[LongSQLMetricValue, Long](name, LongSQLMetricParam, stringValue) { +private[sql] class LongSQLMetric private[metric](name: String, param: LongSQLMetricParam) + extends SQLMetric[LongSQLMetricValue, Long](name, param) { override def +=(term: Long): Unit = { localValue.add(term) @@ -93,7 +86,8 @@ private[sql] class LongSQLMetric private[metric](name: String, stringValue: Long } } -private object LongSQLMetricParam extends SQLMetricParam[LongSQLMetricValue, Long] { +private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialValue: Long) + extends SQLMetricParam[LongSQLMetricValue, Long] { override def addAccumulator(r: LongSQLMetricValue, t: Long): LongSQLMetricValue = r.add(t) @@ -102,7 +96,7 @@ private object LongSQLMetricParam extends SQLMetricParam[LongSQLMetricValue, Lon override def zero(initialValue: LongSQLMetricValue): LongSQLMetricValue = zero - override def zero: LongSQLMetricValue = new LongSQLMetricValue(0L) + override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue) } private[sql] object SQLMetrics { @@ -110,15 +104,45 @@ private[sql] object SQLMetrics { def createLongMetric( sc: SparkContext, name: String, - stringValue: Long => String = _.toString): LongSQLMetric = { - val acc = new LongSQLMetric(name, stringValue) + stringValue: Seq[Long] => String, + initialValue: Long): LongSQLMetric = { + val param = new LongSQLMetricParam(stringValue, initialValue) + val acc = new LongSQLMetric(name, param) sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc } + def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = { + createLongMetric(sc, name, _.sum.toString, 0L) + } + + /** + * Create a metric to report the size information(including total, min, med, max) like data size, + * spilled size, etc. + */ + def createSizeMetric(sc: SparkContext, name: String): LongSQLMetric = { + val stringValue = (values: Seq[Long]) => { + // This is a work around for https://issues.apache.org/jira/browse/SPARK-11013 + // We use -1 as initial value of the accumulator, if the accumulator is valid, we will update + // it at the end of task and the value will be at least 0. + val validValues = values.filter(_ >= 0) + val Seq(sum, min, med, max) = { + val metric = if (validValues.length == 0) { + Seq.fill(4)(0L) + } else { + val sorted = validValues.sorted + Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1)) + } + metric.map(Utils.bytesToString) + } + s"\n$sum ($min, $med, $max)" + } + createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L) + } + /** * A metric that its value will be ignored. Use this one when we need a metric parameter but don't * care about the value. 
*/ - val nullLongMetric = new LongSQLMetric("null", _.toString) + val nullLongMetric = new LongSQLMetric("null", new LongSQLMetricParam(_.sum.toString, 0L)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala index 8cadac4f6c824..d56a585de074a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution -import org.apache.spark.util.Utils.bytesToString import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ @@ -96,13 +95,15 @@ case class TungstenSort( if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil override private[sql] lazy val metrics = Map( - "numBytesUsed" -> - SQLMetrics.createLongMetric(sparkContext, "number of bytes used", bytesToString)) + "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), + "spilledSize" -> SQLMetrics.createSizeMetric(sparkContext, "spilled size")) protected override def doExecute(): RDD[InternalRow] = { val schema = child.schema val childOutput = child.output - val numBytesUsed = longMetric("numBytesUsed") + + val dataSize = longMetric("dataSize") + val spilledSize = longMetric("spilledSize") /** * Set up the sorter in each partition before computing the parent partition. @@ -138,8 +139,15 @@ case class TungstenSort( partitionIndex: Int, sorter: UnsafeExternalRowSorter, parentIterator: Iterator[InternalRow]): Iterator[InternalRow] = { + // Remember spilled data size of this task before execute this operator so that we can + // figure out how many bytes we spilled for this operator. + val spilledSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled + val sortedIterator = sorter.sort(parentIterator.asInstanceOf[Iterator[UnsafeRow]]) - numBytesUsed += sorter.getPeakMemoryUsage + + dataSize += sorter.getPeakMemoryUsage + spilledSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spilledSizeBefore + taskContext.internalMetricsToAccumulators( InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage) sortedIterator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala index a4dbd2e1978d0..e74d6fb396e1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala @@ -100,7 +100,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") // scalastyle:on } - private def planVisualization(metrics: Map[Long, Any], graph: SparkPlanGraph): Seq[Node] = { + private def planVisualization(metrics: Map[Long, String], graph: SparkPlanGraph): Seq[Node] = { val metadata = graph.nodes.flatMap { node => val nodeId = s"plan-meta-data-${node.id}"
<div id={nodeId}>{node.desc}</div>
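To make the size-metric formatting introduced above concrete: createSizeMetric starts the accumulator at -1 so that tasks which never update it can be filtered out, and its stringValue reduces the surviving per-task values to total/min/median/max. The following self-contained sketch mirrors that closure; the simplified bytesToString and the sample values are illustrative, not part of the patch:

    object SizeMetricSketch {
      // Simplified stand-in for Utils.bytesToString, just for the demo output.
      def bytesToString(size: Long): String =
        if (size >= 1L << 20) f"${size.toDouble / (1L << 20)}%.1f MB" else s"$size B"

      // Mirrors the stringValue closure in createSizeMetric: drop the -1 sentinel
      // (tasks that never touched the metric), then report sum/min/med/max.
      def sizeString(values: Seq[Long]): String = {
        val valid = values.filter(_ >= 0)
        val stats =
          if (valid.isEmpty) Seq.fill(4)(0L)
          else {
            val sorted = valid.sorted
            Seq(sorted.sum, sorted.head, sorted(valid.length / 2), sorted.last)
          }
        val Seq(sum, min, med, max) = stats.map(bytesToString)
        s"$sum ($min, $med, $max)"
      }

      def main(args: Array[String]): Unit = {
        // Three tasks updated the metric; one never ran the operator, so it stays at -1.
        val perTaskDataSize = Seq(4L << 20, -1L, 16L << 20, 8L << 20)
        println("data size total (min, med, max): " + sizeString(perTaskDataSize))
        // data size total (min, med, max): 28.0 MB (4.0 MB, 8.0 MB, 16.0 MB)
      }
    }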
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 4a24d2070e13e..b302b519998ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -252,7 +252,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi /** * Get all accumulator updates from all tasks which belong to this execution and merge them. */ - def getExecutionMetrics(executionId: Long): Map[Long, Any] = synchronized { + def getExecutionMetrics(executionId: Long): Map[Long, String] = synchronized { _executionIdToData.get(executionId) match { case Some(executionUIData) => val accumulatorUpdates = { @@ -264,8 +264,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi } }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) } mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId => - executionUIData.accumulatorMetrics(accumulatorId).metricParam). - mapValues(_.asInstanceOf[SQLMetricValue[_]].value) + executionUIData.accumulatorMetrics(accumulatorId).metricParam) case None => // This execution has been dropped Map.empty @@ -274,11 +273,11 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi private def mergeAccumulatorUpdates( accumulatorUpdates: Seq[(Long, Any)], - paramFunc: Long => SQLMetricParam[SQLMetricValue[Any], Any]): Map[Long, Any] = { + paramFunc: Long => SQLMetricParam[SQLMetricValue[Any], Any]): Map[Long, String] = { accumulatorUpdates.groupBy(_._1).map { case (accumulatorId, values) => val param = paramFunc(accumulatorId) (accumulatorId, - values.map(_._2.asInstanceOf[SQLMetricValue[Any]]).foldLeft(param.zero)(param.addInPlace)) + param.stringValue(values.map(_._2.asInstanceOf[SQLMetricValue[Any]].value))) } } @@ -330,8 +329,7 @@ private[ui] class SQLExecutionUIData( private[ui] case class SQLPlanMetric( name: String, accumulatorId: Long, - metricParam: SQLMetricParam[SQLMetricValue[Any], Any], - stringValue: Any => String) + metricParam: SQLMetricParam[SQLMetricValue[Any], Any]) /** * Store all accumulatorUpdates for all tasks in a Spark stage. 
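The SQLListener change above is where the per-task values meet the formatter: getExecutionMetrics now returns Map[Long, String] because mergeAccumulatorUpdates groups the raw updates by accumulator id and applies the param's stringValue, instead of folding them with addInPlace. A rough sketch of that shape, using plain tuples and the default _.sum.toString formatter (both simplifications, not the Spark types):

    object MergeSketch {
      def main(args: Array[String]): Unit = {
        // (accumulator id, value reported by one task): two tasks updated metric 1,
        // one task updated metric 2.
        val updates: Seq[(Long, Long)] = Seq((1L, 5L), (1L, 7L), (2L, 3L))

        // Default long metric: stringValue is _.sum.toString, as in createLongMetric.
        val stringValue: Seq[Long] => String = _.sum.toString

        val merged: Map[Long, String] = updates.groupBy(_._1).map {
          case (accumulatorId, values) => accumulatorId -> stringValue(values.map(_._2))
        }
        println(merged) // e.g. Map(1 -> 12, 2 -> 3)
      }
    }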
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 35670f0609154..f1fce5478a3fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import org.apache.spark.util.Utils import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue} @@ -34,7 +33,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue} private[ui] case class SparkPlanGraph( nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) { - def makeDotFile(metrics: Map[Long, Any]): String = { + def makeDotFile(metrics: Map[Long, String]): String = { val dotFile = new StringBuilder dotFile.append("digraph G {\n") nodes.foreach(node => dotFile.append(node.makeDotNode(metrics) + "\n")) @@ -64,8 +63,7 @@ private[sql] object SparkPlanGraph { edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = { val metrics = plan.metrics.toSeq.map { case (key, metric) => SQLPlanMetric(metric.name.getOrElse(key), metric.id, - metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]], - metric.stringValue.asInstanceOf[Any => String]) + metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]) } val node = SparkPlanGraphNode( nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics) @@ -89,11 +87,11 @@ private[sql] object SparkPlanGraph { private[ui] case class SparkPlanGraphNode( id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) { - def makeDotNode(metricsValue: Map[Long, Any]): String = { + def makeDotNode(metricsValue: Map[Long, String]): String = { val values = { for (metric <- metrics; value <- metricsValue.get(metric.accumulatorId)) yield { - metric.name + ": " + metric.stringValue(value) + metric.name + ": " + value } } val label = if (values.isEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala index 51d0dd1620a5d..5a07e8eac8d33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIteratorSuite.scala @@ -39,7 +39,8 @@ class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLConte } val dummyAccum = SQLMetrics.createLongMetric(sparkContext, "dummy") iter = new TungstenAggregationIterator(Seq.empty, Seq.empty, Seq.empty, Seq.empty, Seq.empty, - Seq.empty, newMutableProjection, Seq.empty, None, dummyAccum, dummyAccum, dummyAccum) + Seq.empty, newMutableProjection, Seq.empty, None, + dummyAccum, dummyAccum, dummyAccum, dummyAccum) val numPages = iter.getHashMap.getNumDataPages assert(numPages === 1) } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 6afffae161ef6..cdd885ba14203 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -93,7 +93,16 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { }.toMap (node.id, node.name -> nodeMetrics) }.toMap - assert(expectedMetrics === actualMetrics) + + assert(expectedMetrics.keySet === actualMetrics.keySet) + for (nodeId <- expectedMetrics.keySet) { + val (expectedNodeName, expectedMetricsMap) = expectedMetrics(nodeId) + val (actualNodeName, actualMetricsMap) = actualMetrics(nodeId) + assert(expectedNodeName === actualNodeName) + for (metricName <- expectedMetricsMap.keySet) { + assert(expectedMetricsMap(metricName).toString === actualMetricsMap(metricName)) + } + } } else { // TODO Remove this "else" once we fix the race condition that missing the JobStarted event. // Since we cannot track all jobs, the metric values could be wrong and we should not check @@ -489,7 +498,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { val metricValues = sqlContext.listener.getExecutionMetrics(executionId) // Because "save" will create a new DataFrame internally, we cannot get the real metric id. // However, we still can check the value. - assert(metricValues.values.toSeq === Seq(2L)) + assert(metricValues.values.toSeq === Seq("2")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 727cf3665a871..cc1c1e10e98c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -74,6 +74,10 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { } test("basic") { + def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): Unit = { + assert(actual === expected.mapValues(_.toString)) + } + val listener = new SQLListener(sqlContext.sparkContext.conf) val executionId = 0 val df = createTestDataFrame @@ -114,7 +118,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { (1L, 0, 0, createTaskMetrics(accumulatorUpdates)) ))) - assert(listener.getExecutionMetrics(0) === accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, metrics) @@ -122,7 +126,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { (1L, 0, 0, createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))) ))) - assert(listener.getExecutionMetrics(0) === accumulatorUpdates.mapValues(_ * 3)) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) @@ -133,7 +137,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { (1L, 0, 1, createTaskMetrics(accumulatorUpdates)) ))) - assert(listener.getExecutionMetrics(0) === accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt listener.onTaskEnd(SparkListenerTaskEnd( @@ -144,7 +148,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { createTaskInfo(0, 0), createTaskMetrics(accumulatorUpdates.mapValues(_ * 100)))) - assert(listener.getExecutionMetrics(0) === 
accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Finish two tasks listener.onTaskEnd(SparkListenerTaskEnd( @@ -162,7 +166,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { createTaskInfo(1, 0), createTaskMetrics(accumulatorUpdates.mapValues(_ * 3)))) - assert(listener.getExecutionMetrics(0) === accumulatorUpdates.mapValues(_ * 5)) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5)) // Summit a new stage listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0))) @@ -173,7 +177,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { (1L, 1, 0, createTaskMetrics(accumulatorUpdates)) ))) - assert(listener.getExecutionMetrics(0) === accumulatorUpdates.mapValues(_ * 7)) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7)) // Finish two tasks listener.onTaskEnd(SparkListenerTaskEnd( @@ -191,7 +195,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { createTaskInfo(1, 0), createTaskMetrics(accumulatorUpdates.mapValues(_ * 3)))) - assert(listener.getExecutionMetrics(0) === accumulatorUpdates.mapValues(_ * 11)) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11)) assert(executionUIData.runningJobs === Seq(0)) assert(executionUIData.succeededJobs.isEmpty) @@ -208,7 +212,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { assert(executionUIData.succeededJobs === Seq(0)) assert(executionUIData.failedJobs.isEmpty) - assert(listener.getExecutionMetrics(0) === accumulatorUpdates.mapValues(_ * 11)) + checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11)) } test("onExecutionEnd happens before onJobEnd(JobSucceeded)") { From 7e1b68d9aa9745f96164ed230f3156189b2086a1 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 15 Oct 2015 11:36:27 -0700 Subject: [PATCH 3/3] address comments --- .../aggregate/TungstenAggregate.scala | 6 +++--- .../TungstenAggregationIterator.scala | 8 ++++---- .../sql/execution/metric/SQLMetrics.scala | 19 +++++++++++-------- .../org/apache/spark/sql/execution/sort.scala | 10 +++++----- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 340de7c12c6df..0d3a4b36c161b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -51,7 +51,7 @@ case class TungstenAggregate( "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"), "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "spilledSize" -> SQLMetrics.createSizeMetric(sparkContext, "spilled size")) + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) override def outputsUnsafeRows: Boolean = true @@ -82,7 +82,7 @@ case class TungstenAggregate( val numInputRows = longMetric("numInputRows") val numOutputRows = longMetric("numOutputRows") val dataSize = longMetric("dataSize") - val spilledSize = longMetric("spilledSize") + val spillSize = longMetric("spillSize") /** * Set up the underlying unsafe data structures used before computing the parent partition. 
@@ -103,7 +103,7 @@ case class TungstenAggregate( numInputRows, numOutputRows, dataSize, - spilledSize) + spillSize) } /** Compute a partition using the iterator already set up previously. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index bc4753fd3aa80..7cd0f7b81e46c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -89,7 +89,7 @@ class TungstenAggregationIterator( numInputRows: LongSQLMetric, numOutputRows: LongSQLMetric, dataSize: LongSQLMetric, - spilledSize: LongSQLMetric) + spillSize: LongSQLMetric) extends Iterator[UnsafeRow] with Logging { // The parent partition iterator, to be initialized later in `start` @@ -112,9 +112,9 @@ class TungstenAggregationIterator( s"$allAggregateExpressions should have no more than 2 kinds of modes.") } - // Remember spilled data size of this task before execute this operator so that we can + // Remember spill data size of this task before execute this operator so that we can // figure out how many bytes we spilled for this operator. - private val spilledSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled + private val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled // // The modes of AggregateExpressions. Right now, we can handle the following mode: @@ -849,7 +849,7 @@ class TungstenAggregationIterator( val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L) val peakMemory = Math.max(mapMemory, sorterMemory) dataSize += peakMemory - spilledSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spilledSizeBefore + spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore TaskContext.get().internalMetricsToAccumulators( InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index b5a58f0bc5850..075b7ad881112 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -101,11 +101,11 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa private[sql] object SQLMetrics { - def createLongMetric( - sc: SparkContext, - name: String, - stringValue: Seq[Long] => String, - initialValue: Long): LongSQLMetric = { + private def createLongMetric( + sc: SparkContext, + name: String, + stringValue: Seq[Long] => String, + initialValue: Long): LongSQLMetric = { val param = new LongSQLMetricParam(stringValue, initialValue) val acc = new LongSQLMetric(name, param) sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc)) @@ -117,12 +117,12 @@ private[sql] object SQLMetrics { } /** - * Create a metric to report the size information(including total, min, med, max) like data size, - * spilled size, etc. + * Create a metric to report the size information (including total, min, med, max) like data size, + * spill size, etc. 
*/ def createSizeMetric(sc: SparkContext, name: String): LongSQLMetric = { val stringValue = (values: Seq[Long]) => { - // This is a work around for https://issues.apache.org/jira/browse/SPARK-11013 + // This is a workaround for SPARK-11013. // We use -1 as initial value of the accumulator, if the accumulator is valid, we will update // it at the end of task and the value will be at least 0. val validValues = values.filter(_ >= 0) @@ -137,6 +137,9 @@ private[sql] object SQLMetrics { } s"\n$sum ($min, $med, $max)" } + // The final result of this metric in physical operator UI may looks like: + // data size total (min, med, max): + // 100GB (100MB, 1GB, 10GB) createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala index d56a585de074a..9385e5734db5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala @@ -96,14 +96,14 @@ case class TungstenSort( override private[sql] lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "spilledSize" -> SQLMetrics.createSizeMetric(sparkContext, "spilled size")) + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) protected override def doExecute(): RDD[InternalRow] = { val schema = child.schema val childOutput = child.output val dataSize = longMetric("dataSize") - val spilledSize = longMetric("spilledSize") + val spillSize = longMetric("spillSize") /** * Set up the sorter in each partition before computing the parent partition. @@ -139,14 +139,14 @@ case class TungstenSort( partitionIndex: Int, sorter: UnsafeExternalRowSorter, parentIterator: Iterator[InternalRow]): Iterator[InternalRow] = { - // Remember spilled data size of this task before execute this operator so that we can + // Remember spill data size of this task before execute this operator so that we can // figure out how many bytes we spilled for this operator. - val spilledSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled + val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled val sortedIterator = sorter.sort(parentIterator.asInstanceOf[Iterator[UnsafeRow]]) dataSize += sorter.getPeakMemoryUsage - spilledSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spilledSizeBefore + spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore taskContext.internalMetricsToAccumulators( InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage)
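Finally, one pattern shared by TungstenAggregationIterator and TungstenSort in these patches: memoryBytesSpilled on TaskMetrics is cumulative for the whole task, so each operator snapshots it before doing its work and adds only the delta to its spillSize metric. A minimal sketch of that bookkeeping; TaskMetricsStub is a placeholder standing in for TaskContext.get().taskMetrics(), not Spark API:

    object SpillDeltaSketch {
      // Placeholder for TaskMetrics: the counter is shared by every operator in the task.
      final class TaskMetricsStub(var memoryBytesSpilled: Long)

      def main(args: Array[String]): Unit = {
        val taskMetrics = new TaskMetricsStub(memoryBytesSpilled = 512L) // spills from upstream operators
        var spillSize = 0L                                               // this operator's metric

        // Snapshot before running the sort / aggregation for this partition...
        val spillSizeBefore = taskMetrics.memoryBytesSpilled

        // ...the operator runs and (in this example) spills another 2048 bytes...
        taskMetrics.memoryBytesSpilled += 2048L

        // ...and only the delta is attributed to this operator.
        spillSize += taskMetrics.memoryBytesSpilled - spillSizeBefore
        println(spillSize) // 2048, not 2560
      }
    }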