[SPARK-14628][CORE][FOLLOW-UP] Always track read/write metrics
## What changes were proposed in this pull request?

This PR is a follow-up to #12417: we now always track input/output/shuffle metrics in the Spark JSON protocol and status API.

Most of the line changes come from re-generating the golden answer files for `HistoryServerSuite`, which now contain many 0 values for read/write metrics.
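
For illustration, here is a hypothetical consumer-side sketch (not code from this patch) of what removing the `Option` wrappers means for users of the `org.apache.spark.status.api.v1` classes:

```scala
import org.apache.spark.status.api.v1.TaskMetrics

// Before this patch, inputMetrics was Option[InputMetrics] and absent until updated:
//   val bytesRead = metrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
// After this patch, the metrics object is always present and zero-valued when unused:
def bytesRead(metrics: TaskMetrics): Long = metrics.inputMetrics.bytesRead
```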

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12462 from cloud-fan/follow.
cloud-fan authored and rxin committed Apr 18, 2016
1 parent 6ff0435 commit 6027340
Showing 40 changed files with 2,590 additions and 391 deletions.
@@ -58,11 +58,6 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul
    */
   def recordsRead: Long = _recordsRead.localValue
 
-  /**
-   * Returns true if this metrics has been updated before.
-   */
-  def isUpdated: Boolean = (bytesRead | recordsRead) != 0
-
   private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
   private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
@@ -57,11 +57,6 @@ class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten:
    */
   def recordsWritten: Long = _recordsWritten.localValue
 
-  /**
-   * Returns true if this metrics has been updated before.
-   */
-  def isUpdated: Boolean = (bytesWritten | recordsWritten) != 0
-
   private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
   private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)
 }
@@ -102,11 +102,6 @@ class ShuffleReadMetrics private (
    */
   def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
 
-  /**
-   * Returns true if this metrics has been updated before.
-   */
-  def isUpdated: Boolean = (totalBytesRead | totalBlocksFetched | recordsRead | fetchWaitTime) != 0
-
   private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
   private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
   private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
@@ -68,11 +68,6 @@ class ShuffleWriteMetrics private (
    */
   def writeTime: Long = _writeTime.localValue
 
-  /**
-   * Returns true if this metrics has been updated before.
-   */
-  def isUpdated: Boolean = (writeTime | recordsWritten | bytesWritten) != 0
-
   private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
   private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
   private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
@@ -167,47 +167,32 @@ private[v1] object AllStagesResource {
     // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just
     // implement one "build" method, which just builds the quantiles for each field.
 
-    val inputMetrics: Option[InputMetricDistributions] =
+    val inputMetrics: InputMetricDistributions =
       new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): Option[InternalInputMetrics] = {
-          if (raw.inputMetrics.isUpdated) {
-            Some(raw.inputMetrics)
-          } else {
-            None
-          }
-        }
+        def getSubmetrics(raw: InternalTaskMetrics): InternalInputMetrics = raw.inputMetrics
 
         def build: InputMetricDistributions = new InputMetricDistributions(
           bytesRead = submetricQuantiles(_.bytesRead),
           recordsRead = submetricQuantiles(_.recordsRead)
         )
-      }.metricOption
+      }.build
 
-    val outputMetrics: Option[OutputMetricDistributions] =
+    val outputMetrics: OutputMetricDistributions =
      new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): Option[InternalOutputMetrics] = {
-          if (raw.outputMetrics.isUpdated) {
-            Some(raw.outputMetrics)
-          } else {
-            None
-          }
-        }
+        def getSubmetrics(raw: InternalTaskMetrics): InternalOutputMetrics = raw.outputMetrics
 
         def build: OutputMetricDistributions = new OutputMetricDistributions(
           bytesWritten = submetricQuantiles(_.bytesWritten),
           recordsWritten = submetricQuantiles(_.recordsWritten)
         )
-      }.metricOption
+      }.build
 
-    val shuffleReadMetrics: Option[ShuffleReadMetricDistributions] =
+    val shuffleReadMetrics: ShuffleReadMetricDistributions =
       new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics,
         quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleReadMetrics] = {
-          if (raw.shuffleReadMetrics.isUpdated) {
-            Some(raw.shuffleReadMetrics)
-          } else {
-            None
-          }
-        }
+        def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleReadMetrics =
+          raw.shuffleReadMetrics
 
         def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
           readBytes = submetricQuantiles(_.totalBytesRead),
           readRecords = submetricQuantiles(_.recordsRead),
@@ -217,24 +202,20 @@ private[v1] object AllStagesResource {
           totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
           fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
         )
-      }.metricOption
+      }.build
 
-    val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions] =
+    val shuffleWriteMetrics: ShuffleWriteMetricDistributions =
       new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics,
         quantiles) {
-        def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleWriteMetrics] = {
-          if (raw.shuffleWriteMetrics.isUpdated) {
-            Some(raw.shuffleWriteMetrics)
-          } else {
-            None
-          }
-        }
+        def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleWriteMetrics =
+          raw.shuffleWriteMetrics
 
         def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
           writeBytes = submetricQuantiles(_.bytesWritten),
           writeRecords = submetricQuantiles(_.recordsWritten),
           writeTime = submetricQuantiles(_.writeTime)
         )
-      }.metricOption
+      }.build
 
     new TaskMetricDistributions(
       quantiles = quantiles,
@@ -273,84 +254,55 @@ private[v1] object AllStagesResource {
     )
   }
 
-  def convertInputMetrics(internal: InternalInputMetrics): Option[InputMetrics] = {
-    if (internal.isUpdated) {
-      Some(new InputMetrics(
-        bytesRead = internal.bytesRead,
-        recordsRead = internal.recordsRead
-      ))
-    } else {
-      None
-    }
+  def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
+    new InputMetrics(
+      bytesRead = internal.bytesRead,
+      recordsRead = internal.recordsRead
+    )
   }
 
-  def convertOutputMetrics(internal: InternalOutputMetrics): Option[OutputMetrics] = {
-    if (internal.isUpdated) {
-      Some(new OutputMetrics(
-        bytesWritten = internal.bytesWritten,
-        recordsWritten = internal.recordsWritten
-      ))
-    } else {
-      None
-    }
+  def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
+    new OutputMetrics(
+      bytesWritten = internal.bytesWritten,
+      recordsWritten = internal.recordsWritten
+    )
   }
 
-  def convertShuffleReadMetrics(
-      internal: InternalShuffleReadMetrics): Option[ShuffleReadMetrics] = {
-    if (internal.isUpdated) {
-      Some(new ShuffleReadMetrics(
-        remoteBlocksFetched = internal.remoteBlocksFetched,
-        localBlocksFetched = internal.localBlocksFetched,
-        fetchWaitTime = internal.fetchWaitTime,
-        remoteBytesRead = internal.remoteBytesRead,
-        totalBlocksFetched = internal.totalBlocksFetched,
-        recordsRead = internal.recordsRead
-      ))
-    } else {
-      None
-    }
+  def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = {
+    new ShuffleReadMetrics(
+      remoteBlocksFetched = internal.remoteBlocksFetched,
+      localBlocksFetched = internal.localBlocksFetched,
+      fetchWaitTime = internal.fetchWaitTime,
+      remoteBytesRead = internal.remoteBytesRead,
+      localBytesRead = internal.localBytesRead,
+      recordsRead = internal.recordsRead
+    )
   }
 
-  def convertShuffleWriteMetrics(
-      internal: InternalShuffleWriteMetrics): Option[ShuffleWriteMetrics] = {
-    if ((internal.bytesWritten | internal.writeTime | internal.recordsWritten) == 0) {
-      None
-    } else {
-      Some(new ShuffleWriteMetrics(
-        bytesWritten = internal.bytesWritten,
-        writeTime = internal.writeTime,
-        recordsWritten = internal.recordsWritten
-      ))
-    }
+  def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
+    new ShuffleWriteMetrics(
+      bytesWritten = internal.bytesWritten,
+      writeTime = internal.writeTime,
+      recordsWritten = internal.recordsWritten
+    )
   }
 }
 
 /**
- * Helper for getting distributions from nested metric types. Many of the metrics we want are
- * contained in options inside TaskMetrics (eg., ShuffleWriteMetrics). This makes it easy to handle
- * the options (returning None if the metrics are all empty), and extract the quantiles for each
- * metric. After creating an instance, call metricOption to get the result type.
+ * Helper for getting distributions from nested metric types.
  */
 private[v1] abstract class MetricHelper[I, O](
     rawMetrics: Seq[InternalTaskMetrics],
     quantiles: Array[Double]) {
 
-  def getSubmetrics(raw: InternalTaskMetrics): Option[I]
+  def getSubmetrics(raw: InternalTaskMetrics): I
 
   def build: O
 
-  val data: Seq[I] = rawMetrics.flatMap(getSubmetrics)
+  val data: Seq[I] = rawMetrics.map(getSubmetrics)
 
   /** applies the given function to all input metrics, and returns the quantiles */
   def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
     Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
   }
 
-  def metricOption: Option[O] = {
-    if (data.isEmpty) {
-      None
-    } else {
-      Some(build)
-    }
-  }
 }
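
As context for the `MetricHelper` simplification above, a minimal self-contained sketch of the post-patch pattern — stub types and a naive nearest-rank quantile stand in for Spark's internal classes and `Distribution`, so the names here are illustrative, not Spark's:

```scala
object MetricHelperSketchDemo {
  // Stub stand-ins for Spark's internal metrics types.
  case class InputMetricsStub(bytesRead: Long)
  case class TaskMetricsStub(inputMetrics: InputMetricsStub)

  // Post-patch shape: getSubmetrics always returns a value, so callers end
  // with `.build` instead of the removed `.metricOption`.
  abstract class MetricHelperSketch[I, O](
      rawMetrics: Seq[TaskMetricsStub],
      quantiles: Array[Double]) {
    def getSubmetrics(raw: TaskMetricsStub): I
    def build: O

    val data: Seq[I] = rawMetrics.map(getSubmetrics)

    // Naive nearest-rank quantiles; assumes rawMetrics is non-empty.
    def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
      val sorted = data.map(f).sorted
      quantiles.map(q => sorted(((sorted.size - 1) * q).toInt)).toIndexedSeq
    }
  }

  def bytesReadQuantiles(tasks: Seq[TaskMetricsStub], qs: Array[Double]): IndexedSeq[Double] =
    new MetricHelperSketch[InputMetricsStub, IndexedSeq[Double]](tasks, qs) {
      def getSubmetrics(raw: TaskMetricsStub): InputMetricsStub = raw.inputMetrics
      def build: IndexedSeq[Double] = submetricQuantiles(_.bytesRead.toDouble)
    }.build

  def main(args: Array[String]): Unit = {
    val tasks = Seq(TaskMetricsStub(InputMetricsStub(10L)), TaskMetricsStub(InputMetricsStub(30L)))
    println(bytesReadQuantiles(tasks, Array(0.0, 0.5, 1.0)))  // 10.0, 10.0, 30.0
  }
}
```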
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -172,10 +172,10 @@ class TaskMetrics private[spark](
     val resultSerializationTime: Long,
     val memoryBytesSpilled: Long,
     val diskBytesSpilled: Long,
-    val inputMetrics: Option[InputMetrics],
-    val outputMetrics: Option[OutputMetrics],
-    val shuffleReadMetrics: Option[ShuffleReadMetrics],
-    val shuffleWriteMetrics: Option[ShuffleWriteMetrics])
+    val inputMetrics: InputMetrics,
+    val outputMetrics: OutputMetrics,
+    val shuffleReadMetrics: ShuffleReadMetrics,
+    val shuffleWriteMetrics: ShuffleWriteMetrics)
 
 class InputMetrics private[spark](
     val bytesRead: Long,
@@ -190,7 +190,7 @@ class ShuffleReadMetrics private[spark](
     val localBlocksFetched: Int,
     val fetchWaitTime: Long,
     val remoteBytesRead: Long,
-    val totalBlocksFetched: Int,
+    val localBytesRead: Long,
     val recordsRead: Long)
 
 class ShuffleWriteMetrics private[spark](
@@ -209,10 +209,10 @@ class TaskMetricDistributions private[spark](
     val memoryBytesSpilled: IndexedSeq[Double],
     val diskBytesSpilled: IndexedSeq[Double],
 
-    val inputMetrics: Option[InputMetricDistributions],
-    val outputMetrics: Option[OutputMetricDistributions],
-    val shuffleReadMetrics: Option[ShuffleReadMetricDistributions],
-    val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions])
+    val inputMetrics: InputMetricDistributions,
+    val outputMetrics: OutputMetricDistributions,
+    val shuffleReadMetrics: ShuffleReadMetricDistributions,
+    val shuffleWriteMetrics: ShuffleWriteMetricDistributions)
 
 class InputMetricDistributions private[spark](
     val bytesRead: IndexedSeq[Double],
22 changes: 5 additions & 17 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -326,39 +326,27 @@ private[spark] object JsonProtocol {
   }
 
   def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
-    val shuffleReadMetrics: JValue = if (taskMetrics.shuffleReadMetrics.isUpdated) {
+    val shuffleReadMetrics: JValue =
       ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
       ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
      ("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
      ("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
      ("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
      ("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
-    } else {
-      JNothing
-    }
-    val shuffleWriteMetrics: JValue = if (taskMetrics.shuffleWriteMetrics.isUpdated) {
+    val shuffleWriteMetrics: JValue =
       ("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~
       ("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~
       ("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten)
-    } else {
-      JNothing
-    }
-    val inputMetrics: JValue = if (taskMetrics.inputMetrics.isUpdated) {
+    val inputMetrics: JValue =
      ("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~
      ("Records Read" -> taskMetrics.inputMetrics.recordsRead)
-    } else {
-      JNothing
-    }
-    val outputMetrics: JValue = if (taskMetrics.outputMetrics.isUpdated) {
+    val outputMetrics: JValue =
       ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
       ("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
-    } else {
-      JNothing
-    }
     val updatedBlocks =
       JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
         ("Block ID" -> id.toString) ~
-          ("Status" -> blockStatusToJson(status))
+        ("Status" -> blockStatusToJson(status))
       })
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
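
To see the effect on event logs, a hedged sketch using the same json4s DSL that `JsonProtocol` builds on — after this patch the metric keys are always emitted (zero-valued for tasks that did no I/O) instead of being dropped via `JNothing`:

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object MetricsJsonShape {
  def main(args: Array[String]): Unit = {
    // Zero values are illustrative defaults; the key names match JsonProtocol's.
    val inputMetrics =
      ("Bytes Read" -> 0L) ~
      ("Records Read" -> 0L)
    println(compact(render(inputMetrics)))  // {"Bytes Read":0,"Records Read":0}
  }
}
```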
