[SPARK-19937] Collect metrics for remote bytes read to disk during shuffle.

In the current code (#16989), big blocks are shuffled to disk.
This PR proposes to collect metrics for remote bytes fetched to disk.

Author: jinxing <jinxing6042@126.com>

Closes #18249 from jinxing64/SPARK-19937.
jinxing authored and Marcelo Vanzin committed Jun 22, 2017
1 parent e55a105 commit 58434ac
Showing 24 changed files with 234 additions and 13 deletions.
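
The new counter flows through TaskMetrics like the existing shuffle-read metrics, so once this change is in place it can be observed from a SparkListener. A minimal sketch (the listener name and the println are illustrative, not part of this commit):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener: logs the new counter next to the existing remote-bytes metric.
class RemoteBytesToDiskListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for tasks that did not complete successfully, so guard it.
    Option(taskEnd.taskMetrics).foreach { m =>
      val sr = m.shuffleReadMetrics
      println(s"task ${taskEnd.taskInfo.taskId}: remoteBytesRead=${sr.remoteBytesRead}, " +
        s"remoteBytesReadToDisk=${sr.remoteBytesReadToDisk}")
    }
  }
}

// Register on an existing SparkContext: sc.addSparkListener(new RemoteBytesToDiskListener)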
@@ -50,6 +50,7 @@ private[spark] object InternalAccumulator {
val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "remoteBlocksFetched"
val LOCAL_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "localBlocksFetched"
val REMOTE_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesRead"
val REMOTE_BYTES_READ_TO_DISK = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesReadToDisk"
val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
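
With the prefixes already defined in this object, the new entry resolves to the accumulator name internal.metrics.shuffleRead.remoteBytesReadToDisk (assuming the existing METRICS_PREFIX and SHUFFLE_READ_METRICS_PREFIX values are unchanged), which is the name the metric is registered and reported under.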
@@ -31,6 +31,7 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
private[executor] val _remoteBlocksFetched = new LongAccumulator
private[executor] val _localBlocksFetched = new LongAccumulator
private[executor] val _remoteBytesRead = new LongAccumulator
private[executor] val _remoteBytesReadToDisk = new LongAccumulator
private[executor] val _localBytesRead = new LongAccumulator
private[executor] val _fetchWaitTime = new LongAccumulator
private[executor] val _recordsRead = new LongAccumulator
@@ -50,6 +51,11 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
*/
def remoteBytesRead: Long = _remoteBytesRead.sum

/**
* Total number of remote bytes read to disk from the shuffle by this task.
*/
def remoteBytesReadToDisk: Long = _remoteBytesReadToDisk.sum

/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
@@ -80,13 +86,15 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
private[spark] def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched.add(v)
private[spark] def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched.add(v)
private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
private[spark] def incRemoteBytesReadToDisk(v: Long): Unit = _remoteBytesReadToDisk.add(v)
private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v)
private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)

private[spark] def setRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.setValue(v)
private[spark] def setLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.setValue(v)
private[spark] def setRemoteBytesRead(v: Long): Unit = _remoteBytesRead.setValue(v)
private[spark] def setRemoteBytesReadToDisk(v: Long): Unit = _remoteBytesReadToDisk.setValue(v)
private[spark] def setLocalBytesRead(v: Long): Unit = _localBytesRead.setValue(v)
private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
@@ -99,13 +107,15 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
_remoteBlocksFetched.setValue(0)
_localBlocksFetched.setValue(0)
_remoteBytesRead.setValue(0)
_remoteBytesReadToDisk.setValue(0)
_localBytesRead.setValue(0)
_fetchWaitTime.setValue(0)
_recordsRead.setValue(0)
metrics.foreach { metric =>
_remoteBlocksFetched.add(metric.remoteBlocksFetched)
_localBlocksFetched.add(metric.localBlocksFetched)
_remoteBytesRead.add(metric.remoteBytesRead)
_remoteBytesReadToDisk.add(metric.remoteBytesReadToDisk)
_localBytesRead.add(metric.localBytesRead)
_fetchWaitTime.add(metric.fetchWaitTime)
_recordsRead.add(metric.recordsRead)
@@ -122,20 +132,23 @@ private[spark] class TempShuffleReadMetrics {
private[this] var _remoteBlocksFetched = 0L
private[this] var _localBlocksFetched = 0L
private[this] var _remoteBytesRead = 0L
private[this] var _remoteBytesReadToDisk = 0L
private[this] var _localBytesRead = 0L
private[this] var _fetchWaitTime = 0L
private[this] var _recordsRead = 0L

def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched += v
def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched += v
def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
def incRemoteBytesReadToDisk(v: Long): Unit = _remoteBytesReadToDisk += v
def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
def incRecordsRead(v: Long): Unit = _recordsRead += v

def remoteBlocksFetched: Long = _remoteBlocksFetched
def localBlocksFetched: Long = _localBlocksFetched
def remoteBytesRead: Long = _remoteBytesRead
def remoteBytesReadToDisk: Long = _remoteBytesReadToDisk
def localBytesRead: Long = _localBytesRead
def fetchWaitTime: Long = _fetchWaitTime
def recordsRead: Long = _recordsRead
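
The two classes above follow the existing split: each shuffle dependency read by a task accumulates into a lightweight TempShuffleReadMetrics (plain Longs, no accumulator overhead on the fetch path), and the task-level ShuffleReadMetrics is rebuilt from those temp objects when they are merged, which is why the new field has to be threaded through both. A stripped-down sketch of that shape, reduced to two fields (these simplified classes are illustrative, not the Spark source):

// Cheap per-dependency counters, updated on the fetch hot path.
class TempReadMetrics {
  var remoteBytesRead = 0L
  var remoteBytesReadToDisk = 0L
}

// Task-level view: reset, then sum every temp object recorded for the task.
class ReadMetrics {
  var remoteBytesRead = 0L
  var remoteBytesReadToDisk = 0L

  def setMergeValues(temps: Seq[TempReadMetrics]): Unit = {
    remoteBytesRead = 0L
    remoteBytesReadToDisk = 0L
    temps.foreach { t =>
      remoteBytesRead += t.remoteBytesRead
      remoteBytesReadToDisk += t.remoteBytesReadToDisk
    }
  }
}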
@@ -215,6 +215,7 @@ class TaskMetrics private[spark] () extends Serializable {
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
shuffleRead.REMOTE_BYTES_READ_TO_DISK -> shuffleReadMetrics._remoteBytesReadToDisk,
shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
@@ -200,6 +200,7 @@ private[v1] object AllStagesResource {
readBytes = submetricQuantiles(_.totalBytesRead),
readRecords = submetricQuantiles(_.recordsRead),
remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk),
remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
@@ -281,6 +282,7 @@ private[v1] object AllStagesResource {
localBlocksFetched = internal.localBlocksFetched,
fetchWaitTime = internal.fetchWaitTime,
remoteBytesRead = internal.remoteBytesRead,
remoteBytesReadToDisk = internal.remoteBytesReadToDisk,
localBytesRead = internal.localBytesRead,
recordsRead = internal.recordsRead
)
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -208,6 +208,7 @@ class ShuffleReadMetrics private[spark](
val localBlocksFetched: Long,
val fetchWaitTime: Long,
val remoteBytesRead: Long,
val remoteBytesReadToDisk: Long,
val localBytesRead: Long,
val recordsRead: Long)

@@ -249,6 +250,7 @@ class ShuffleReadMetricDistributions private[spark](
val localBlocksFetched: IndexedSeq[Double],
val fetchWaitTime: IndexedSeq[Double],
val remoteBytesRead: IndexedSeq[Double],
val remoteBytesReadToDisk: IndexedSeq[Double],
val totalBlocksFetched: IndexedSeq[Double])

class ShuffleWriteMetricDistributions private[spark](
@@ -165,6 +165,9 @@ final class ShuffleBlockFetcherIterator(
case SuccessFetchResult(_, address, _, buf, _) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
shuffleMetrics.incRemoteBytesReadToDisk(buf.size)
}
shuffleMetrics.incRemoteBlocksFetched(1)
}
buf.release()
@@ -363,6 +366,9 @@ final class ShuffleBlockFetcherIterator(
case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
shuffleMetrics.incRemoteBytesReadToDisk(buf.size)
}
shuffleMetrics.incRemoteBlocksFetched(1)
}
bytesInFlight -= size
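
Both hunks gate the new counter on the buffer type: blocks fetched into memory arrive as in-memory ManagedBuffers, while big blocks that were streamed to a temporary file (the shuffle-to-disk path from #16989) arrive as FileSegmentManagedBuffer, so the instanceOf check is what marks a remote read as read to disk. A self-contained sketch of that rule, with a minimal stand-in for the metrics sink because TempShuffleReadMetrics is private[spark] (the trait and helper names are made up):

import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}

// Minimal stand-in for the metrics sink used inside ShuffleBlockFetcherIterator.
trait ReadMetricsSink {
  def incRemoteBlocksFetched(v: Long): Unit
  def incRemoteBytesRead(v: Long): Unit
  def incRemoteBytesReadToDisk(v: Long): Unit
}

// Mirrors the counting rule above for a block fetched from another executor.
def recordRemoteFetch(buf: ManagedBuffer, metrics: ReadMetricsSink): Unit = {
  metrics.incRemoteBytesRead(buf.size)
  if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
    // Only blocks that landed in a local temp file count toward the new metric.
    metrics.incRemoteBytesReadToDisk(buf.size)
  }
  metrics.incRemoteBlocksFetched(1)
}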
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -251,6 +251,7 @@ private[spark] object UIData {
remoteBlocksFetched: Long,
localBlocksFetched: Long,
remoteBytesRead: Long,
remoteBytesReadToDisk: Long,
localBytesRead: Long,
fetchWaitTime: Long,
recordsRead: Long,
@@ -274,6 +275,7 @@ private[spark] object UIData {
remoteBlocksFetched = metrics.remoteBlocksFetched,
localBlocksFetched = metrics.localBlocksFetched,
remoteBytesRead = metrics.remoteBytesRead,
remoteBytesReadToDisk = metrics.remoteBytesReadToDisk,
localBytesRead = metrics.localBytesRead,
fetchWaitTime = metrics.fetchWaitTime,
recordsRead = metrics.recordsRead,
@@ -282,7 +284,7 @@
)
}
}
private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0)
private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0, 0)
}

case class ShuffleWriteMetricsUIData(
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -339,6 +339,7 @@ private[spark] object JsonProtocol {
("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
("Remote Bytes Read To Disk" -> taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk) ~
("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
val shuffleWriteMetrics: JValue =
@@ -804,6 +805,8 @@ private[spark] object JsonProtocol {
readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
Utils.jsonOption(readJson \ "Remote Bytes Read To Disk")
.foreach { v => readMetrics.incRemoteBytesReadToDisk(v.extract[Long])}
readMetrics.incLocalBytesRead(
Utils.jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
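
The optional extraction above keeps older event logs readable: logs written before this change simply lack the "Remote Bytes Read To Disk" field, and the metric stays at zero. A self-contained json4s sketch of the same pattern (the sample JSON strings are made up for illustration):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

// Missing field => 0, matching an accumulator that was never incremented.
def remoteBytesReadToDisk(readJson: JValue): Long =
  (readJson \ "Remote Bytes Read To Disk").toOption.map(_.extract[Long]).getOrElse(0L)

remoteBytesReadToDisk(parse("""{"Remote Bytes Read": 2048}"""))                                     // 0
remoteBytesReadToDisk(parse("""{"Remote Bytes Read": 2048, "Remote Bytes Read To Disk": 1024}"""))  // 1024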
@@ -60,6 +60,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -105,6 +106,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -150,6 +152,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -195,6 +198,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -240,6 +244,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -285,6 +290,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -330,6 +336,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -375,6 +382,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -60,6 +60,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -105,6 +106,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -150,6 +152,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -195,6 +198,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -240,6 +244,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -285,6 +290,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -330,6 +336,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -375,6 +382,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
