From 5525c2011d96929d6143e24cf1abdd9823a9fe26 Mon Sep 17 00:00:00 2001
From: Ilya Ganelin
Date: Wed, 7 Jan 2015 16:25:40 -0500
Subject: [PATCH] Completed refactoring to make vars in TaskMetrics class private
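
This replaces the public mutable vars on TaskMetrics with private fields plus
getters that keep the old names, a setHostname setter, and inc*/dec* helpers
for the numeric counters. A minimal usage sketch (illustrative only, not part
of the diff below, and assuming the code is compiled inside Spark, as the test
suites touched here are):

    import org.apache.spark.executor.TaskMetrics

    val m = new TaskMetrics
    m.setHostname("localhost")          // was: m.hostname = "localhost"
    m.incExecutorRunTime(40L)           // was: m.executorRunTime = 40L
    m.incExecutorRunTime(2L)            // repeated calls accumulate
    m.incResultSize(128L)               // was: m.resultSize = 128L
    assert(m.executorRunTime == 42L)    // getters keep the old public names
    assert(m.resultSize == 128L)

The dec* helpers mirror the inc* ones for callers that need to subtract from a
counter.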
---
 .../org/apache/spark/executor/Executor.scala       | 12 ++---
 .../apache/spark/executor/TaskMetrics.scala        | 47 ++++++++++++++-----
 .../org/apache/spark/scheduler/Task.scala          |  2 +-
 .../spark/scheduler/TaskResultGetter.scala         |  2 +-
 .../org/apache/spark/util/JsonProtocol.scala       | 16 +++----
 .../ui/jobs/JobProgressListenerSuite.scala         |  6 +--
 .../apache/spark/util/JsonProtocolSuite.scala      | 14 +++---
 7 files changed, 62 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 52de6980ecbf8..d21c63fa53f81 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -206,10 +206,10 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.executorDeserializeTime = taskStart - deserializeStartTime
-          m.executorRunTime = taskFinish - taskStart
-          m.jvmGCTime = gcTime - startGCTime
-          m.resultSerializationTime = afterSerialization - beforeSerialization
+          m.incExecutorDeserializeTime(taskStart - deserializeStartTime)
+          m.incExecutorRunTime(taskFinish - taskStart)
+          m.incJvmGCTime(gcTime - startGCTime)
+          m.incResultSerializationTime(afterSerialization - beforeSerialization)
         }
 
         val accumUpdates = Accumulators.values
@@ -260,8 +260,8 @@ private[spark] class Executor(
           val serviceTime = System.currentTimeMillis() - taskStart
           val metrics = attemptedTask.flatMap(t => t.metrics)
           for (m <- metrics) {
-            m.executorRunTime = serviceTime
-            m.jvmGCTime = gcTime - startGCTime
+            m.incExecutorRunTime(serviceTime)
+            m.incJvmGCTime(gcTime - startGCTime)
           }
           val reason = new ExceptionFailure(t, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 27a1313ad588b..5903c1091e795 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -39,42 +39,67 @@ class TaskMetrics extends Serializable {
   /**
    * Host's name the task runs on
    */
-  var hostname: String = _
-
+  private var _hostname: String = _
+  def hostname = _hostname
+  def setHostname(value : String) = _hostname = value
+
   /**
    * Time taken on the executor to deserialize this task
    */
-  var executorDeserializeTime: Long = _
-
+  private var _executorDeserializeTime: Long = _
+  def executorDeserializeTime = _executorDeserializeTime
+  def incExecutorDeserializeTime(value: Long) = _executorDeserializeTime += value
+  def decExecutorDeserializeTime(value: Long) = _executorDeserializeTime -= value
+
+
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
    */
-  var executorRunTime: Long = _
-
+  private var _executorRunTime: Long = _
+  def executorRunTime = _executorRunTime
+  def incExecutorRunTime(value: Long) = _executorRunTime += value
+  def decExecutorRunTime(value: Long) = _executorRunTime -= value
+
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult
   */
-  var resultSize: Long = _
+  private var _resultSize: Long = _
+  def resultSize = _resultSize
+  def incResultSize(value: Long) = _resultSize += value
+  def decResultSize(value: Long) = _resultSize -= value
+

   /**
    * Amount of time the JVM spent in garbage collection while executing this task
   */
-  var jvmGCTime: Long = _
+  private var _jvmGCTime: Long = _
+  def jvmGCTime = _jvmGCTime
+  def incJvmGCTime(value: Long) = _jvmGCTime += value
+  def decJvmGCTime(value: Long) = _jvmGCTime -= value

   /**
    * Amount of time spent serializing the task result
   */
-  var resultSerializationTime: Long = _
+  private var _resultSerializationTime: Long = _
+  def resultSerializationTime = _resultSerializationTime
+  def incResultSerializationTime(value: Long) = _resultSerializationTime += value
+  def decResultSerializationTime(value: Long) = _resultSerializationTime -= value

   /**
    * The number of in-memory bytes spilled by this task
   */
-  var memoryBytesSpilled: Long = _
+  private var _memoryBytesSpilled: Long = _
+  def memoryBytesSpilled = _memoryBytesSpilled
+  def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value
+  def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value

   /**
    * The number of on-disk bytes spilled by this task
   */
-  var diskBytesSpilled: Long = _
+  private var _diskBytesSpilled: Long = _
+  def diskBytesSpilled = _diskBytesSpilled
+  def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value
+  def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value

   /**
    * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 2552d03d18d06..1eb2739982523 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -47,7 +47,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   final def run(attemptId: Long): T = {
     context = new TaskContextImpl(stageId, partitionId, attemptId, false)
     TaskContextHelper.setTaskContext(context)
-    context.taskMetrics.hostname = Utils.localHostName()
+    context.taskMetrics.setHostname(Utils.localHostName())
     taskThread = Thread.currentThread()
     if (_killed) {
       kill(interruptThread = false)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 819b51e12ad8c..c4f16c783a40a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -76,7 +76,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
             (deserializedResult, size)
         }
 
-        result.metrics.resultSize = size
+        result.metrics.incResultSize(size)
         scheduler.handleSuccessfulTask(taskSetManager, tid, result)
       } catch {
         case cnf: ClassNotFoundException =>
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 27c4f9f3f46c3..7a7d4efb47aae 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -592,14 +592,14 @@ private[spark] object JsonProtocol {
       return TaskMetrics.empty
     }
     val metrics = new TaskMetrics
-    metrics.hostname = (json \ "Host Name").extract[String]
-    metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
-    metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
-    metrics.resultSize = (json \ "Result Size").extract[Long]
Size").extract[Long] - metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long] - metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long] - metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long] - metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long] + metrics.setHostname((json \ "Host Name").extract[String]) + metrics.incExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long]) + metrics.incExecutorRunTime((json \ "Executor Run Time").extract[Long]) + metrics.incResultSize((json \ "Result Size").extract[Long]) + metrics.incJvmGCTime((json \ "JVM GC Time").extract[Long]) + metrics.incResultSerializationTime((json \ "Result Serialization Time").extract[Long]) + metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long]) + metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long]) metrics.setShuffleReadMetrics( Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)) metrics.shuffleWriteMetrics = diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 22c2a65e51061..6389bd13cd85a 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -227,9 +227,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc shuffleReadMetrics.incRemoteBytesRead(base + 1) shuffleReadMetrics.incRemoteBlocksFetched(base + 2) shuffleWriteMetrics.incShuffleBytesWritten(base + 3) - taskMetrics.executorRunTime = base + 4 - taskMetrics.diskBytesSpilled = base + 5 - taskMetrics.memoryBytesSpilled = base + 6 + taskMetrics.incExecutorRunTime(base + 4) + taskMetrics.incDiskBytesSpilled(base + 5) + taskMetrics.incMemoryBytesSpilled(base + 6) val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.inputMetrics = Some(inputMetrics) inputMetrics.incBytesRead(base + 7) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 589de887cc1c8..c0fe9b80a8e44 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -623,13 +623,13 @@ class JsonProtocolSuite extends FunSuite { hasHadoopInput: Boolean, hasOutput: Boolean) = { val t = new TaskMetrics - t.hostname = "localhost" - t.executorDeserializeTime = a - t.executorRunTime = b - t.resultSize = c - t.jvmGCTime = d - t.resultSerializationTime = a + b - t.memoryBytesSpilled = a + c + t.setHostname("localhost") + t.incExecutorDeserializeTime(a) + t.incExecutorRunTime(b) + t.incResultSize(c) + t.incJvmGCTime(d) + t.incResultSerializationTime(a + b) + t.incMemoryBytesSpilled(a + c) if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)