Partially updated task metrics to make some vars private
Ilya Ganelin committed Jan 7, 2015
1 parent b004150 commit c64da4f
Showing 8 changed files with 76 additions and 48 deletions.
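
Every file in this commit applies the same refactor: a public `var` on a metrics class becomes a `private var` with a public getter plus `inc`/`dec` mutators, so callers can adjust a counter but can no longer overwrite it outright. A minimal sketch of the pattern on a hypothetical `Metrics` class (illustrative, not code from this commit):

```scala
// Hypothetical sketch of the accessor pattern this commit applies.
class Metrics extends Serializable {
  // Before: `var recordsRead: Long = 0L` let any caller assign any value.
  // After: the backing field is private and callers go through the mutators.
  private var _recordsRead: Long = 0L
  def recordsRead: Long = _recordsRead
  def incRecordsRead(value: Long): Unit = _recordsRead += value
  def decRecordsRead(value: Long): Unit = _recordsRead -= value
}

object MetricsExample extends App {
  val m = new Metrics
  m.incRecordsRead(100L)
  m.decRecordsRead(30L)
  println(m.recordsRead)  // prints 70
}
```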
60 changes: 44 additions & 16 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -182,7 +182,10 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
/**
* Total bytes read.
*/
var bytesRead: Long = 0L
private var _bytesRead: Long = _
def bytesRead = _bytesRead
def incBytesRead(value: Long) = _bytesRead += value
def decBytesRead(value: Long) = _bytesRead -= value
}

/**
@@ -194,7 +197,10 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
/**
* Total bytes written
*/
var bytesWritten: Long = 0L
private var _bytesWritten: Long = _
def bytesWritten = _bytesWritten
def incBytesWritten(value: Long) = _bytesWritten += value
def decBytesWritten(value: Long) = _bytesWritten -= value
}

/**
@@ -203,32 +209,48 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value)
*/
@DeveloperApi
class ShuffleReadMetrics extends Serializable {
/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched

/**
* Number of remote blocks fetched in this shuffle by this task
*/
var remoteBlocksFetched: Int = _

private var _remoteBlocksFetched: Int = _
def remoteBlocksFetched = _remoteBlocksFetched
def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value

/**
* Number of local blocks fetched in this shuffle by this task
*/
var localBlocksFetched: Int = _
private var _localBlocksFetched: Int = _
def localBlocksFetched = _localBlocksFetched
def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value


/**
* Time the task spent waiting for remote shuffle blocks. This only includes the time
* blocking on shuffle input data. For instance if block B is being fetched while the task is
* still not finished processing block A, it is not considered to be blocking on block B.
*/
var fetchWaitTime: Long = _

private var _fetchWaitTime: Long = _
def fetchWaitTime = _fetchWaitTime
def incFetchWaitTime(value: Long) = _fetchWaitTime += value
def decFetchWaitTime(value: Long) = _fetchWaitTime -= value

/**
* Total number of remote bytes read from the shuffle by this task
*/
var remoteBytesRead: Long = _
private var _remoteBytesRead: Long = _
def remoteBytesRead = _remoteBytesRead
def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value

/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched = remoteBlocksFetched + localBlocksFetched
}

/**
@@ -240,10 +262,16 @@ class ShuffleWriteMetrics extends Serializable {
/**
* Number of bytes written for the shuffle by this task
*/
@volatile var shuffleBytesWritten: Long = _

@volatile private var _shuffleBytesWritten: Long = _
def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
def shuffleBytesWritten = _shuffleBytesWritten
/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
@volatile var shuffleWriteTime: Long = _
@volatile private var _shuffleWriteTime: Long = _
def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
def shuffleWriteTime = _shuffleWriteTime

}
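
At the call sites in the RDD and storage files below, direct assignments such as `inputMetrics.bytesRead = bytesReadFn()` become `inputMetrics.incBytesRead(bytesReadFn())`. The two are equivalent only while the counter is still zero and is updated once; `inc` adds a delta rather than setting an absolute value. A REPL-style sketch of the distinction (illustrative values; assumes the accessors defined above):

```scala
import org.apache.spark.executor.{DataReadMethod, InputMetrics}

val metrics = new InputMetrics(DataReadMethod.Hadoop)
metrics.incBytesRead(100L)  // counter starts at 0, so this matches the old `bytesRead = 100L`
metrics.incBytesRead(100L)  // a repeated call accumulates to 200; assignment would have kept 100
println(metrics.bytesRead)  // prints 200
```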
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -252,7 +252,7 @@ class HadoopRDD[K, V](
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.incBytesRead(bytesReadFn())
} else {
recordsSinceMetricsUpdate += 1
}
@@ -264,12 +264,12 @@
reader.close()
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.incBytesRead(bytesReadFn())
} else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.inputSplit.value.getLength
inputMetrics.incBytesRead(split.inputSplit.value.getLength)
context.taskMetrics.inputMetrics = Some(inputMetrics)
} catch {
case e: java.io.IOException =>
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -159,7 +159,7 @@ class NewHadoopRDD[K, V](
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.incBytesRead(bytesReadFn())
} else {
recordsSinceMetricsUpdate += 1
}
@@ -174,12 +174,12 @@
// Update metrics with final amount
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.incBytesRead(bytesReadFn())
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
context.taskMetrics.inputMetrics = Some(inputMetrics)
} catch {
case e: java.io.IOException =>
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1000,7 +1000,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
1
} : Int

@@ -1072,7 +1072,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.close()
}
writer.commit()
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
}

self.context.runJob(self, writeToFile)
@@ -1095,7 +1095,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
&& bytesWrittenCallback.isDefined) {
bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
bytesWrittenCallback.foreach { fn => outputMetrics.incBytesWritten(fn()) }
}
}

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -54,7 +54,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
inputMetrics.bytesRead = bytes
inputMetrics.incBytesRead(bytes)
}

/**
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -621,31 +621,31 @@ private[spark] object JsonProtocol {

def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
val metrics = new ShuffleReadMetrics
metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
metrics.remoteBytesRead = (json \ "Remote Bytes Read").extract[Long]
metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int])
metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
metrics
}

def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
val metrics = new ShuffleWriteMetrics
metrics.shuffleBytesWritten = (json \ "Shuffle Bytes Written").extract[Long]
metrics.shuffleWriteTime = (json \ "Shuffle Write Time").extract[Long]
metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
metrics
}

def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
metrics.bytesRead = (json \ "Bytes Read").extract[Long]
metrics.incBytesRead((json \ "Bytes Read").extract[Long])
metrics
}

def outputMetricsFromJson(json: JValue): OutputMetrics = {
val metrics = new OutputMetrics(
DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
metrics.bytesWritten = (json \ "Bytes Written").extract[Long]
metrics.incBytesWritten((json \ "Bytes Written").extract[Long])
metrics
}

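
A REPL-style sketch of exercising one of these deserializers (the JSON literal is illustrative; `JsonProtocol` is `private[spark]`, so code like this would live inside Spark's own tests):

```scala
import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.util.JsonProtocol

val json = parse("""{"Shuffle Bytes Written": 1024, "Shuffle Write Time": 5000000}""")
val sw = JsonProtocol.shuffleWriteMetricsFromJson(json)
assert(sw.shuffleBytesWritten == 1024L)
assert(sw.shuffleWriteTime == 5000000L)
```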
12 changes: 6 additions & 6 deletions core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -138,7 +138,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
assert(listener.stageIdToData.size === 0)

// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
shuffleReadMetrics.incRemoteBytesRead(1000)
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
@@ -224,18 +224,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val shuffleWriteMetrics = new ShuffleWriteMetrics()
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
shuffleReadMetrics.remoteBytesRead = base + 1
shuffleReadMetrics.remoteBlocksFetched = base + 2
shuffleWriteMetrics.shuffleBytesWritten = base + 3
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
taskMetrics.executorRunTime = base + 4
taskMetrics.diskBytesSpilled = base + 5
taskMetrics.memoryBytesSpilled = base + 6
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.bytesRead = base + 7
inputMetrics.incBytesRead(base + 7)
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
taskMetrics.outputMetrics = Some(outputMetrics)
outputMetrics.bytesWritten = base + 8
outputMetrics.incBytesWritten(base + 8)
taskMetrics
}

16 changes: 8 additions & 8 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -633,24 +633,24 @@ class JsonProtocolSuite extends FunSuite {

if (hasHadoopInput) {
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
inputMetrics.bytesRead = d + e + f
inputMetrics.incBytesRead(d + e + f)
t.inputMetrics = Some(inputMetrics)
} else {
val sr = new ShuffleReadMetrics
sr.remoteBytesRead = b + d
sr.localBlocksFetched = e
sr.fetchWaitTime = a + d
sr.remoteBlocksFetched = f
sr.incRemoteBytesRead(b + d)
sr.incLocalBlocksFetched(e)
sr.incFetchWaitTime(a + d)
sr.incRemoteBlocksFetched(f)
t.setShuffleReadMetrics(Some(sr))
}
if (hasOutput) {
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
outputMetrics.bytesWritten = a + b + c
outputMetrics.incBytesWritten(a + b + c)
t.outputMetrics = Some(outputMetrics)
} else {
val sw = new ShuffleWriteMetrics
sw.shuffleBytesWritten = a + b + c
sw.shuffleWriteTime = b + c + d
sw.incShuffleBytesWritten(a + b + c)
sw.incShuffleWriteTime(b + c + d)
t.shuffleWriteMetrics = Some(sw)
}
// Make at most 6 blocks
