Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 43 additions & 34 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -859,40 +859,49 @@ private[spark] object LiveEntityHelpers {
}

createMetrics(
updateMetricValue(m.executorDeserializeTime),
updateMetricValue(m.executorDeserializeCpuTime),
updateMetricValue(m.executorRunTime),
updateMetricValue(m.executorCpuTime),
updateMetricValue(m.resultSize),
updateMetricValue(m.jvmGcTime),
updateMetricValue(m.resultSerializationTime),
updateMetricValue(m.memoryBytesSpilled),
updateMetricValue(m.diskBytesSpilled),
updateMetricValue(m.peakExecutionMemory),
updateMetricValue(m.inputMetrics.bytesRead),
updateMetricValue(m.inputMetrics.recordsRead),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead),
updateMetricValue(m.shuffleReadMetrics.remoteReqsDuration),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration),
updateMetricValue(m.outputMetrics.bytesWritten),
updateMetricValue(m.outputMetrics.recordsWritten),
updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
updateMetricValue(m.shuffleReadMetrics.localBytesRead),
updateMetricValue(m.shuffleReadMetrics.recordsRead),
updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
updateMetricValue(m.shuffleWriteMetrics.writeTime),
updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
executorDeserializeTime = updateMetricValue(m.executorDeserializeTime),
executorDeserializeCpuTime = updateMetricValue(m.executorDeserializeCpuTime),
executorRunTime = updateMetricValue(m.executorRunTime),
executorCpuTime = updateMetricValue(m.executorCpuTime),
resultSize = updateMetricValue(m.resultSize),
jvmGcTime = updateMetricValue(m.jvmGcTime),
resultSerializationTime = updateMetricValue(m.resultSerializationTime),
memoryBytesSpilled = updateMetricValue(m.memoryBytesSpilled),
diskBytesSpilled = updateMetricValue(m.diskBytesSpilled),
peakExecutionMemory = updateMetricValue(m.peakExecutionMemory),
inputBytesRead = updateMetricValue(m.inputMetrics.bytesRead),
inputRecordsRead = updateMetricValue(m.inputMetrics.recordsRead),
outputBytesWritten = updateMetricValue(m.outputMetrics.bytesWritten),
outputRecordsWritten = updateMetricValue(m.outputMetrics.recordsWritten),
shuffleRemoteBlocksFetched = updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
shuffleLocalBlocksFetched = updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
shuffleFetchWaitTime = updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
shuffleRemoteBytesRead = updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
shuffleRemoteBytesReadToDisk = updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
shuffleLocalBytesRead = updateMetricValue(m.shuffleReadMetrics.localBytesRead),
shuffleRecordsRead = updateMetricValue(m.shuffleReadMetrics.recordsRead),
shuffleCorruptMergedBlockChunks =
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks),
shuffleMergedFetchFallbackCount =
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount),
shuffleMergedRemoteBlocksFetched =
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched),
shuffleMergedLocalBlocksFetched =
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched),
shuffleMergedRemoteChunksFetched =
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched),
shuffleMergedLocalChunksFetched =
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched),
shuffleMergedRemoteBytesRead =
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead),
shuffleMergedLocalBytesRead =
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead),
shuffleRemoteReqsDuration = updateMetricValue(m.shuffleReadMetrics.remoteReqsDuration),
shuffleMergedRemoteReqsDuration =
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration),
shuffleBytesWritten = updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
shuffleWriteTime = updateMetricValue(m.shuffleWriteMetrics.writeTime),
shuffleRecordsWritten = updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
}

private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = {
Expand Down
129 changes: 129 additions & 0 deletions core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,135 @@ class LiveEntitySuite extends SparkFunSuite {
assert(accuInfo.value == "[1,2,3,4,5,... 5 more items]")
}

test("makeNegative correctly negates all metrics with proper argument order") {
  import LiveEntityHelpers._

  // makeNegative maps each metric value v to (v * -1 - 1) so that a legitimate
  // zero is stored as -1 and remains distinguishable from an untouched field.
  def neg(v: Long): Long = v * -1L - 1L

  // Give every field its own distinct value: if any argument were wired to the
  // wrong createMetrics parameter, the corresponding assertion below would fail.
  val metrics = createMetrics(
    executorDeserializeTime = 1L,
    executorDeserializeCpuTime = 2L,
    executorRunTime = 3L,
    executorCpuTime = 4L,
    resultSize = 5L,
    jvmGcTime = 6L,
    resultSerializationTime = 7L,
    memoryBytesSpilled = 8L,
    diskBytesSpilled = 9L,
    peakExecutionMemory = 10L,
    inputBytesRead = 11L,
    inputRecordsRead = 12L,
    outputBytesWritten = 13L,
    outputRecordsWritten = 14L,
    shuffleRemoteBlocksFetched = 15L,
    shuffleLocalBlocksFetched = 16L,
    shuffleFetchWaitTime = 17L,
    shuffleRemoteBytesRead = 18L,
    shuffleRemoteBytesReadToDisk = 19L,
    shuffleLocalBytesRead = 20L,
    shuffleRecordsRead = 21L,
    shuffleCorruptMergedBlockChunks = 22L,
    shuffleMergedFetchFallbackCount = 23L,
    shuffleMergedRemoteBlocksFetched = 24L,
    shuffleMergedLocalBlocksFetched = 25L,
    shuffleMergedRemoteChunksFetched = 26L,
    shuffleMergedLocalChunksFetched = 27L,
    shuffleMergedRemoteBytesRead = 28L,
    shuffleMergedLocalBytesRead = 29L,
    shuffleRemoteReqsDuration = 30L,
    shuffleMergedRemoteReqsDuration = 31L,
    shuffleBytesWritten = 32L,
    shuffleWriteTime = 33L,
    shuffleRecordsWritten = 34L
  )

  val negated = makeNegative(metrics)

  // Top-level scalar metrics.
  assert(negated.executorDeserializeTime === neg(1L))
  assert(negated.executorDeserializeCpuTime === neg(2L))
  assert(negated.executorRunTime === neg(3L))
  assert(negated.executorCpuTime === neg(4L))
  assert(negated.resultSize === neg(5L))
  assert(negated.jvmGcTime === neg(6L))
  assert(negated.resultSerializationTime === neg(7L))
  assert(negated.memoryBytesSpilled === neg(8L))
  assert(negated.diskBytesSpilled === neg(9L))
  assert(negated.peakExecutionMemory === neg(10L))

  // Input metrics.
  assert(negated.inputMetrics.bytesRead === neg(11L))
  assert(negated.inputMetrics.recordsRead === neg(12L))

  // Output metrics (these were in wrong position in current master).
  assert(negated.outputMetrics.bytesWritten === neg(13L),
    "outputMetrics.bytesWritten should be correctly negated")
  assert(negated.outputMetrics.recordsWritten === neg(14L),
    "outputMetrics.recordsWritten should be correctly negated")

  // Shuffle read metrics (these were in wrong position in current master).
  assert(negated.shuffleReadMetrics.remoteBlocksFetched === neg(15L),
    "shuffleReadMetrics.remoteBlocksFetched should be correctly negated")
  assert(negated.shuffleReadMetrics.localBlocksFetched === neg(16L),
    "shuffleReadMetrics.localBlocksFetched should be correctly negated")
  assert(negated.shuffleReadMetrics.fetchWaitTime === neg(17L),
    "shuffleReadMetrics.fetchWaitTime should be correctly negated")
  assert(negated.shuffleReadMetrics.remoteBytesRead === neg(18L),
    "shuffleReadMetrics.remoteBytesRead should be correctly negated")
  assert(negated.shuffleReadMetrics.remoteBytesReadToDisk === neg(19L),
    "shuffleReadMetrics.remoteBytesReadToDisk should be correctly negated")
  assert(negated.shuffleReadMetrics.localBytesRead === neg(20L),
    "shuffleReadMetrics.localBytesRead should be correctly negated")
  assert(negated.shuffleReadMetrics.recordsRead === neg(21L),
    "shuffleReadMetrics.recordsRead should be correctly negated")

  // Push-based shuffle read metrics (these were in wrong position in current master).
  val pushRead = negated.shuffleReadMetrics.shufflePushReadMetrics
  assert(pushRead.corruptMergedBlockChunks === neg(22L),
    "shufflePushReadMetrics.corruptMergedBlockChunks should be correctly negated")
  assert(pushRead.mergedFetchFallbackCount === neg(23L),
    "shufflePushReadMetrics.mergedFetchFallbackCount should be correctly negated")
  assert(pushRead.remoteMergedBlocksFetched === neg(24L),
    "shufflePushReadMetrics.remoteMergedBlocksFetched should be correctly negated")
  assert(pushRead.localMergedBlocksFetched === neg(25L),
    "shufflePushReadMetrics.localMergedBlocksFetched should be correctly negated")
  assert(pushRead.remoteMergedChunksFetched === neg(26L),
    "shufflePushReadMetrics.remoteMergedChunksFetched should be correctly negated")
  assert(pushRead.localMergedChunksFetched === neg(27L),
    "shufflePushReadMetrics.localMergedChunksFetched should be correctly negated")
  assert(pushRead.remoteMergedBytesRead === neg(28L),
    "shufflePushReadMetrics.remoteMergedBytesRead should be correctly negated")
  assert(pushRead.localMergedBytesRead === neg(29L),
    "shufflePushReadMetrics.localMergedBytesRead should be correctly negated")
  assert(negated.shuffleReadMetrics.remoteReqsDuration === neg(30L),
    "shuffleReadMetrics.remoteReqsDuration should be correctly negated")
  assert(pushRead.remoteMergedReqsDuration === neg(31L),
    "shufflePushReadMetrics.remoteMergedReqsDuration should be correctly negated")

  // Shuffle write metrics.
  assert(negated.shuffleWriteMetrics.bytesWritten === neg(32L))
  assert(negated.shuffleWriteMetrics.writeTime === neg(33L))
  assert(negated.shuffleWriteMetrics.recordsWritten === neg(34L))

  // Zero handling: a recorded 0 must come back as -1, not 0.
  val negatedZero = makeNegative(createMetrics(default = 0L))
  assert(negatedZero.executorDeserializeTime === -1L,
    "Zero value should be converted to -1")
  assert(negatedZero.inputMetrics.bytesRead === -1L,
    "Zero input metric should be converted to -1")
  assert(negatedZero.outputMetrics.bytesWritten === -1L,
    "Zero output metric should be converted to -1")
}

private def checkSize(seq: Seq[_], expected: Int): Unit = {
assert(seq.length === expected)
var count = 0
Expand Down