
Commit

Completed refactoring to make vars in TaskMetrics class private
Ilya Ganelin committed Jan 7, 2015
1 parent c64da4f commit 5525c20
Showing 7 changed files with 62 additions and 37 deletions.
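
Every change below follows one pattern: each publicly mutable var on TaskMetrics becomes a private field exposed through a read-only getter plus named mutators (setHostname for the one string field, inc*/dec* pairs for the numeric counters). A minimal sketch of the pattern, using a hypothetical ExampleMetrics class rather than the actual TaskMetrics source:

// Sketch of the accessor pattern this commit applies throughout TaskMetrics.
// `ExampleMetrics` is an illustrative stand-in, not Spark code.
class ExampleMetrics extends Serializable {
  // Before: `var executorRunTime: Long = _` was assignable from anywhere.
  // After: the field is private and all mutation goes through named methods.
  private var _executorRunTime: Long = _
  def executorRunTime: Long = _executorRunTime
  def incExecutorRunTime(value: Long): Unit = _executorRunTime += value
  def decExecutorRunTime(value: Long): Unit = _executorRunTime -= value
}

Funneling mutation through inc/dec methods keeps the external API unchanged for readers while leaving a single place to add validation or cross-phase accumulation later.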
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -206,10 +206,10 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.executorDeserializeTime = taskStart - deserializeStartTime
-          m.executorRunTime = taskFinish - taskStart
-          m.jvmGCTime = gcTime - startGCTime
-          m.resultSerializationTime = afterSerialization - beforeSerialization
+          m.incExecutorDeserializeTime(taskStart - deserializeStartTime)
+          m.incExecutorRunTime(taskFinish - taskStart)
+          m.incJvmGCTime(gcTime - startGCTime)
+          m.incResultSerializationTime(afterSerialization - beforeSerialization)
         }
 
         val accumUpdates = Accumulators.values
@@ -260,8 +260,8 @@ private[spark] class Executor(
           val serviceTime = System.currentTimeMillis() - taskStart
           val metrics = attemptedTask.flatMap(t => t.metrics)
           for (m <- metrics) {
-            m.executorRunTime = serviceTime
-            m.jvmGCTime = gcTime - startGCTime
+            m.incExecutorRunTime(serviceTime)
+            m.incJvmGCTime(gcTime - startGCTime)
           }
           val reason = new ExceptionFailure(t, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
47 changes: 36 additions & 11 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -39,42 +39,67 @@ class TaskMetrics extends Serializable {
   /**
    * Host's name the task runs on
    */
-  var hostname: String = _
+  private var _hostname: String = _
+  def hostname = _hostname
+  def setHostname(value : String) = _hostname = value
 
   /**
    * Time taken on the executor to deserialize this task
    */
-  var executorDeserializeTime: Long = _
+  private var _executorDeserializeTime: Long = _
+  def executorDeserializeTime = _executorDeserializeTime
+  def incExecutorDeserializeTime(value: Long) = _executorDeserializeTime += value
+  def decExecutorDeserializeTime(value: Long) = _executorDeserializeTime -= value
 
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
    */
-  var executorRunTime: Long = _
+  private var _executorRunTime: Long = _
+  def executorRunTime = _executorRunTime
+  def incExecutorRunTime(value: Long) = _executorRunTime += value
+  def decExecutorRunTime(value: Long) = _executorRunTime -= value
 
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult
    */
-  var resultSize: Long = _
+  private var _resultSize: Long = _
+  def resultSize = _resultSize
+  def incResultSize(value: Long) = _resultSize += value
+  def decResultSize(value: Long) = _resultSize -= value
 
   /**
    * Amount of time the JVM spent in garbage collection while executing this task
    */
-  var jvmGCTime: Long = _
+  private var _jvmGCTime: Long = _
+  def jvmGCTime = _jvmGCTime
+  def incJvmGCTime(value: Long) = _jvmGCTime += value
+  def decJvmGCTime(value: Long) = _jvmGCTime -= value
 
   /**
    * Amount of time spent serializing the task result
    */
-  var resultSerializationTime: Long = _
+  private var _resultSerializationTime: Long = _
+  def resultSerializationTime = _resultSerializationTime
+  def incResultSerializationTime(value: Long) = _resultSerializationTime += value
+  def decResultSerializationTime(value: Long) = _resultSerializationTime -= value
 
   /**
    * The number of in-memory bytes spilled by this task
    */
-  var memoryBytesSpilled: Long = _
+  private var _memoryBytesSpilled: Long = _
+  def memoryBytesSpilled = _memoryBytesSpilled
+  def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value
+  def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value
 
   /**
    * The number of on-disk bytes spilled by this task
    */
-  var diskBytesSpilled: Long = _
+  private var _diskBytesSpilled: Long = _
+  def diskBytesSpilled = _diskBytesSpilled
+  def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value
+  def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value
 
   /**
    * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
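Call sites migrate mechanically from direct assignment to a single mutator call. A hedged before/after sketch (the value is a placeholder, not a real measurement):

import org.apache.spark.executor.TaskMetrics

val metrics = new TaskMetrics
val serviceTime: Long = 42L  // placeholder for an elapsed-time measurement

// Before this commit: metrics.executorRunTime = serviceTime
metrics.incExecutorRunTime(serviceTime)
// A fresh TaskMetrics starts every counter at 0, so a single
// increment is equivalent to the old assignment.
assert(metrics.executorRunTime == serviceTime)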
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -47,7 +47,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   final def run(attemptId: Long): T = {
     context = new TaskContextImpl(stageId, partitionId, attemptId, false)
     TaskContextHelper.setTaskContext(context)
-    context.taskMetrics.hostname = Utils.localHostName()
+    context.taskMetrics.setHostname(Utils.localHostName())
     taskThread = Thread.currentThread()
     if (_killed) {
       kill(interruptThread = false)
core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -76,7 +76,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
               (deserializedResult, size)
           }
 
-          result.metrics.resultSize = size
+          result.metrics.incResultSize(size)
           scheduler.handleSuccessfulTask(taskSetManager, tid, result)
         } catch {
           case cnf: ClassNotFoundException =>
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -592,14 +592,14 @@ private[spark] object JsonProtocol {
       return TaskMetrics.empty
     }
     val metrics = new TaskMetrics
-    metrics.hostname = (json \ "Host Name").extract[String]
-    metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
-    metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
-    metrics.resultSize = (json \ "Result Size").extract[Long]
-    metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long]
-    metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
-    metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
-    metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
+    metrics.setHostname((json \ "Host Name").extract[String])
+    metrics.incExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
+    metrics.incExecutorRunTime((json \ "Executor Run Time").extract[Long])
+    metrics.incResultSize((json \ "Result Size").extract[Long])
+    metrics.incJvmGCTime((json \ "JVM GC Time").extract[Long])
+    metrics.incResultSerializationTime((json \ "Result Serialization Time").extract[Long])
+    metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long])
+    metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
     metrics.setShuffleReadMetrics(
       Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
     metrics.shuffleWriteMetrics =
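JsonProtocol builds a fresh TaskMetrics before applying the inc* calls, so incrementing each zero-initialized field reproduces the old direct assignments exactly. A small json4s sketch of the same extract-then-increment step (the JSON literal is illustrative; JsonProtocol brings an implicit DefaultFormats into scope the same way):

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.executor.TaskMetrics

implicit val formats: Formats = DefaultFormats

val json = parse("""{"Executor Run Time": 350}""")
val metrics = new TaskMetrics
metrics.incExecutorRunTime((json \ "Executor Run Time").extract[Long])
// metrics.executorRunTime is now 350, matching the serialized value.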
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -227,9 +227,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     shuffleReadMetrics.incRemoteBytesRead(base + 1)
     shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
     shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
-    taskMetrics.executorRunTime = base + 4
-    taskMetrics.diskBytesSpilled = base + 5
-    taskMetrics.memoryBytesSpilled = base + 6
+    taskMetrics.incExecutorRunTime(base + 4)
+    taskMetrics.incDiskBytesSpilled(base + 5)
+    taskMetrics.incMemoryBytesSpilled(base + 6)
     val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
     taskMetrics.inputMetrics = Some(inputMetrics)
     inputMetrics.incBytesRead(base + 7)
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -623,13 +623,13 @@ class JsonProtocolSuite extends FunSuite {
       hasHadoopInput: Boolean,
       hasOutput: Boolean) = {
     val t = new TaskMetrics
-    t.hostname = "localhost"
-    t.executorDeserializeTime = a
-    t.executorRunTime = b
-    t.resultSize = c
-    t.jvmGCTime = d
-    t.resultSerializationTime = a + b
-    t.memoryBytesSpilled = a + c
+    t.setHostname("localhost")
+    t.incExecutorDeserializeTime(a)
+    t.incExecutorRunTime(b)
+    t.incResultSize(c)
+    t.incJvmGCTime(d)
+    t.incResultSerializationTime(a + b)
+    t.incMemoryBytesSpilled(a + c)
 
     if (hasHadoopInput) {
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
