diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index efc670440bc6..3c4efd8a5ead 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -859,40 +859,49 @@ private[spark] object LiveEntityHelpers { } createMetrics( - updateMetricValue(m.executorDeserializeTime), - updateMetricValue(m.executorDeserializeCpuTime), - updateMetricValue(m.executorRunTime), - updateMetricValue(m.executorCpuTime), - updateMetricValue(m.resultSize), - updateMetricValue(m.jvmGcTime), - updateMetricValue(m.resultSerializationTime), - updateMetricValue(m.memoryBytesSpilled), - updateMetricValue(m.diskBytesSpilled), - updateMetricValue(m.peakExecutionMemory), - updateMetricValue(m.inputMetrics.bytesRead), - updateMetricValue(m.inputMetrics.recordsRead), - updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks), - updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount), - updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched), - updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched), - updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched), - updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched), - updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead), - updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead), - updateMetricValue(m.shuffleReadMetrics.remoteReqsDuration), - updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration), - updateMetricValue(m.outputMetrics.bytesWritten), - updateMetricValue(m.outputMetrics.recordsWritten), - updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched), - updateMetricValue(m.shuffleReadMetrics.localBlocksFetched), - updateMetricValue(m.shuffleReadMetrics.fetchWaitTime), - updateMetricValue(m.shuffleReadMetrics.remoteBytesRead), - updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk), - updateMetricValue(m.shuffleReadMetrics.localBytesRead), - updateMetricValue(m.shuffleReadMetrics.recordsRead), - updateMetricValue(m.shuffleWriteMetrics.bytesWritten), - updateMetricValue(m.shuffleWriteMetrics.writeTime), - updateMetricValue(m.shuffleWriteMetrics.recordsWritten)) + executorDeserializeTime = updateMetricValue(m.executorDeserializeTime), + executorDeserializeCpuTime = updateMetricValue(m.executorDeserializeCpuTime), + executorRunTime = updateMetricValue(m.executorRunTime), + executorCpuTime = updateMetricValue(m.executorCpuTime), + resultSize = updateMetricValue(m.resultSize), + jvmGcTime = updateMetricValue(m.jvmGcTime), + resultSerializationTime = updateMetricValue(m.resultSerializationTime), + memoryBytesSpilled = updateMetricValue(m.memoryBytesSpilled), + diskBytesSpilled = updateMetricValue(m.diskBytesSpilled), + peakExecutionMemory = updateMetricValue(m.peakExecutionMemory), + inputBytesRead = updateMetricValue(m.inputMetrics.bytesRead), + inputRecordsRead = updateMetricValue(m.inputMetrics.recordsRead), + outputBytesWritten = updateMetricValue(m.outputMetrics.bytesWritten), + outputRecordsWritten = updateMetricValue(m.outputMetrics.recordsWritten), + shuffleRemoteBlocksFetched = updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched), + shuffleLocalBlocksFetched = updateMetricValue(m.shuffleReadMetrics.localBlocksFetched), + shuffleFetchWaitTime = updateMetricValue(m.shuffleReadMetrics.fetchWaitTime), + shuffleRemoteBytesRead = updateMetricValue(m.shuffleReadMetrics.remoteBytesRead), + shuffleRemoteBytesReadToDisk = updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk), + shuffleLocalBytesRead = updateMetricValue(m.shuffleReadMetrics.localBytesRead), + shuffleRecordsRead = updateMetricValue(m.shuffleReadMetrics.recordsRead), + shuffleCorruptMergedBlockChunks = + updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks), + shuffleMergedFetchFallbackCount = + updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount), + shuffleMergedRemoteBlocksFetched = + updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched), + shuffleMergedLocalBlocksFetched = + updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched), + shuffleMergedRemoteChunksFetched = + updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched), + shuffleMergedLocalChunksFetched = + updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched), + shuffleMergedRemoteBytesRead = + updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead), + shuffleMergedLocalBytesRead = + updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead), + shuffleRemoteReqsDuration = updateMetricValue(m.shuffleReadMetrics.remoteReqsDuration), + shuffleMergedRemoteReqsDuration = + updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration), + shuffleBytesWritten = updateMetricValue(m.shuffleWriteMetrics.bytesWritten), + shuffleWriteTime = updateMetricValue(m.shuffleWriteMetrics.writeTime), + shuffleRecordsWritten = updateMetricValue(m.shuffleWriteMetrics.recordsWritten)) } private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = { diff --git a/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala b/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala index 35e8a62c93c9..bed822f0b457 100644 --- a/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala +++ b/core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala @@ -66,6 +66,135 @@ class LiveEntitySuite extends SparkFunSuite { assert(accuInfo.value == "[1,2,3,4,5,... 5 more items]") } + test("makeNegative correctly negates all metrics with proper argument order") { + import LiveEntityHelpers._ + + val originalMetrics = createMetrics( + executorDeserializeTime = 1L, + executorDeserializeCpuTime = 2L, + executorRunTime = 3L, + executorCpuTime = 4L, + resultSize = 5L, + jvmGcTime = 6L, + resultSerializationTime = 7L, + memoryBytesSpilled = 8L, + diskBytesSpilled = 9L, + peakExecutionMemory = 10L, + inputBytesRead = 11L, + inputRecordsRead = 12L, + outputBytesWritten = 13L, + outputRecordsWritten = 14L, + shuffleRemoteBlocksFetched = 15L, + shuffleLocalBlocksFetched = 16L, + shuffleFetchWaitTime = 17L, + shuffleRemoteBytesRead = 18L, + shuffleRemoteBytesReadToDisk = 19L, + shuffleLocalBytesRead = 20L, + shuffleRecordsRead = 21L, + shuffleCorruptMergedBlockChunks = 22L, + shuffleMergedFetchFallbackCount = 23L, + shuffleMergedRemoteBlocksFetched = 24L, + shuffleMergedLocalBlocksFetched = 25L, + shuffleMergedRemoteChunksFetched = 26L, + shuffleMergedLocalChunksFetched = 27L, + shuffleMergedRemoteBytesRead = 28L, + shuffleMergedLocalBytesRead = 29L, + shuffleRemoteReqsDuration = 30L, + shuffleMergedRemoteReqsDuration = 31L, + shuffleBytesWritten = 32L, + shuffleWriteTime = 33L, + shuffleRecordsWritten = 34L + ) + + val negatedMetrics = makeNegative(originalMetrics) + + def expectedNegated(v: Long): Long = v * -1L - 1L + + // Verify all fields are correctly negated + assert(negatedMetrics.executorDeserializeTime === expectedNegated(1L)) + assert(negatedMetrics.executorDeserializeCpuTime === expectedNegated(2L)) + assert(negatedMetrics.executorRunTime === expectedNegated(3L)) + assert(negatedMetrics.executorCpuTime === expectedNegated(4L)) + assert(negatedMetrics.resultSize === expectedNegated(5L)) + assert(negatedMetrics.jvmGcTime === expectedNegated(6L)) + assert(negatedMetrics.resultSerializationTime === expectedNegated(7L)) + assert(negatedMetrics.memoryBytesSpilled === expectedNegated(8L)) + assert(negatedMetrics.diskBytesSpilled === expectedNegated(9L)) + assert(negatedMetrics.peakExecutionMemory === expectedNegated(10L)) + + // Verify input metrics + assert(negatedMetrics.inputMetrics.bytesRead === expectedNegated(11L)) + assert(negatedMetrics.inputMetrics.recordsRead === expectedNegated(12L)) + + // Verify output metrics (these were in wrong position in current master) + assert(negatedMetrics.outputMetrics.bytesWritten === expectedNegated(13L), + "outputMetrics.bytesWritten should be correctly negated") + assert(negatedMetrics.outputMetrics.recordsWritten === expectedNegated(14L), + "outputMetrics.recordsWritten should be correctly negated") + + // Verify shuffle read metrics (these were in wrong position in current master) + assert(negatedMetrics.shuffleReadMetrics.remoteBlocksFetched === expectedNegated(15L), + "shuffleReadMetrics.remoteBlocksFetched should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.localBlocksFetched === expectedNegated(16L), + "shuffleReadMetrics.localBlocksFetched should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.fetchWaitTime === expectedNegated(17L), + "shuffleReadMetrics.fetchWaitTime should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.remoteBytesRead === expectedNegated(18L), + "shuffleReadMetrics.remoteBytesRead should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.remoteBytesReadToDisk === expectedNegated(19L), + "shuffleReadMetrics.remoteBytesReadToDisk should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.localBytesRead === expectedNegated(20L), + "shuffleReadMetrics.localBytesRead should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.recordsRead === expectedNegated(21L), + "shuffleReadMetrics.recordsRead should be correctly negated") + + // Verify shuffle push read metrics (these were in wrong position in current master) + assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks === + expectedNegated(22L), + "shufflePushReadMetrics.corruptMergedBlockChunks should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.mergedFetchFallbackCount === + expectedNegated(23L), + "shufflePushReadMetrics.mergedFetchFallbackCount should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBlocksFetched === + expectedNegated(24L), + "shufflePushReadMetrics.remoteMergedBlocksFetched should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.localMergedBlocksFetched === + expectedNegated(25L), + "shufflePushReadMetrics.localMergedBlocksFetched should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedChunksFetched === + expectedNegated(26L), + "shufflePushReadMetrics.remoteMergedChunksFetched should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.localMergedChunksFetched === + expectedNegated(27L), + "shufflePushReadMetrics.localMergedChunksFetched should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedBytesRead === + expectedNegated(28L), + "shufflePushReadMetrics.remoteMergedBytesRead should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.localMergedBytesRead === + expectedNegated(29L), + "shufflePushReadMetrics.localMergedBytesRead should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.remoteReqsDuration === expectedNegated(30L), + "shuffleReadMetrics.remoteReqsDuration should be correctly negated") + assert(negatedMetrics.shuffleReadMetrics.shufflePushReadMetrics.remoteMergedReqsDuration === + expectedNegated(31L), + "shufflePushReadMetrics.remoteMergedReqsDuration should be correctly negated") + + // Verify shuffle write metrics + assert(negatedMetrics.shuffleWriteMetrics.bytesWritten === expectedNegated(32L)) + assert(negatedMetrics.shuffleWriteMetrics.writeTime === expectedNegated(33L)) + assert(negatedMetrics.shuffleWriteMetrics.recordsWritten === expectedNegated(34L)) + + // Verify zero handling: 0 should become -1 + val zeroMetrics = createMetrics(default = 0L) + val negatedZeroMetrics = makeNegative(zeroMetrics) + assert(negatedZeroMetrics.executorDeserializeTime === -1L, + "Zero value should be converted to -1") + assert(negatedZeroMetrics.inputMetrics.bytesRead === -1L, + "Zero input metric should be converted to -1") + assert(negatedZeroMetrics.outputMetrics.bytesWritten === -1L, + "Zero output metric should be converted to -1") + } + private def checkSize(seq: Seq[_], expected: Int): Unit = { assert(seq.length === expected) var count = 0