Debugging tests
Ilya Ganelin committed Jan 10, 2015
1 parent 17146c2 commit 26b312b
Showing 9 changed files with 28 additions and 31 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -61,8 +61,8 @@ case class Aggregator[K, V, C] (
       // Update task metrics if context is not null
       // TODO: Make context non optional in a future release
       Option(context).foreach { c =>
-        c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
-        c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+        c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
+        c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
       }
       combiners.iterator
     }
@@ -95,8 +95,8 @@ case class Aggregator[K, V, C] (
       // Update task metrics if context is not null
       // TODO: Make context non-optional in a future release
       Option(context).foreach { c =>
-        c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
-        c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
+        c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
+        c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
       }
       combiners.iterator
     }
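The updated call sites above assume increment-style accessors on TaskMetrics in place of directly mutable public fields. A minimal sketch of that pattern, using the names from the calls above; the real definitions live in core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala and may differ in visibility and detail:

class TaskMetricsSketch extends Serializable {
  // Hypothetical backing fields; the actual class keeps these private and exposes getters.
  private var _memoryBytesSpilled: Long = 0L
  def memoryBytesSpilled: Long = _memoryBytesSpilled
  def incMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled += value

  private var _diskBytesSpilled: Long = 0L
  def diskBytesSpilled: Long = _diskBytesSpilled
  def incDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled += value
}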
@@ -125,8 +125,8 @@ private[spark] class PythonRDD(
                init, finish))
              val memoryBytesSpilled = stream.readLong()
              val diskBytesSpilled = stream.readLong()
-             context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
-             context.taskMetrics.diskBytesSpilled += diskBytesSpilled
+             context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
+             context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
              read()
            case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
              // Signals that an exception has been thrown in python
@@ -370,7 +370,7 @@ private[spark] class Executor(
           if (!taskRunner.attemptedTask.isEmpty) {
             Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
               metrics.updateShuffleReadMetrics
-              metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
+              metrics.incJvmGCTime(curGCTime - taskRunner.startGCTime)
               if (isLocal) {
                 // JobProgressListener will hold an reference of it during
                 // onExecutorMetricsUpdate(), then JobProgressListener can not see
13 changes: 5 additions & 8 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -164,10 +164,10 @@ class TaskMetrics extends Serializable {
   private[spark] def updateShuffleReadMetrics() = synchronized {
     val merged = new ShuffleReadMetrics()
     for (depMetrics <- depsShuffleReadMetrics) {
-      merged.fetchWaitTime += depMetrics.fetchWaitTime
-      merged.localBlocksFetched += depMetrics.localBlocksFetched
-      merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
-      merged.remoteBytesRead += depMetrics.remoteBytesRead
+      merged.incFetchWaitTime(depMetrics.fetchWaitTime)
+      merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
+      merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
+      merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
     }
     _shuffleReadMetrics = Some(merged)
   }
@@ -272,10 +272,7 @@ class ShuffleReadMetrics extends Serializable {
   /**
    * Number of blocks fetched in this shuffle by this task (remote or local)
    */
-  private var _totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
-  def totalBlocksFetched = _totalBlocksFetched
-  def incTotalBlocksFetched(value: Int) = _totalBlocksFetched += value
-  def decTotalBlocksFetched(value: Int) = _totalBlocksFetched -= value
+  def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
 }

 /**
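The net effect of this hunk is that totalBlocksFetched is no longer a separately incremented counter; it is derived on demand from the remote and local counters, so it cannot drift out of sync with them. A sketch of the resulting shape, assuming Int-backed private fields as suggested by the removed declaration (class name is hypothetical):

class ShuffleReadMetricsSketch extends Serializable {
  private var _remoteBlocksFetched: Int = 0
  def remoteBlocksFetched: Int = _remoteBlocksFetched
  def incRemoteBlocksFetched(value: Int): Unit = _remoteBlocksFetched += value

  private var _localBlocksFetched: Int = 0
  def localBlocksFetched: Int = _localBlocksFetched
  def incLocalBlocksFetched(value: Int): Unit = _localBlocksFetched += value

  // Computed from the two counters above instead of being maintained separately.
  def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
}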
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -159,8 +159,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     for ((it, depNum) <- rddIterators) {
       map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
     }
-    context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled
-    context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled
+    context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
+    context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
     new InterruptibleIterator(context,
       map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
   }
@@ -59,8 +59,8 @@ private[spark] class HashShuffleReader[K, C](
         // the ExternalSorter won't spill to disk.
         val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
         sorter.insertAll(aggregatedIter)
-        context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
-        context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
+        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
         sorter.iterator
       case None =>
         aggregatedIter
@@ -160,14 +160,14 @@ private[spark] class DiskBlockObjectWriter(
     }
     finalPosition = file.length()
     // In certain compression codecs, more bytes are written after close() is called
-    writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition)
+    writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
   }

   // Discard current writes. We do this by flushing the outstanding writes and then
   // truncating the file to its initial position.
   override def revertPartialWritesAndClose() {
     try {
-      writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition)
+      writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)

       if (initialized) {
         objOut.flush()
@@ -212,14 +212,14 @@ private[spark] class DiskBlockObjectWriter(
    */
   private def updateBytesWritten() {
     val pos = channel.position()
-    writeMetrics.shuffleBytesWritten += (pos - reportedPosition)
+    writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
     reportedPosition = pos
   }

   private def callWithTiming(f: => Unit) = {
     val start = System.nanoTime()
     f
-    writeMetrics.shuffleWriteTime += (System.nanoTime() - start)
+    writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
   }

   // For testing
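revertPartialWritesAndClose() subtracts bytes rather than adding them, which is why the write metrics need a dec variant alongside inc. A sketch of the accessor pair these call sites assume (class name is hypothetical; the actual accessors belong to the shuffle write metrics class):

class ShuffleWriteMetricsSketch extends Serializable {
  private var _shuffleBytesWritten: Long = 0L
  def shuffleBytesWritten: Long = _shuffleBytesWritten
  def incShuffleBytesWritten(value: Long): Unit = _shuffleBytesWritten += value
  def decShuffleBytesWritten(value: Long): Unit = _shuffleBytesWritten -= value

  private var _shuffleWriteTime: Long = 0L
  def shuffleWriteTime: Long = _shuffleWriteTime
  def incShuffleWriteTime(value: Long): Unit = _shuffleWriteTime += value
}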
@@ -156,8 +156,8 @@ final class ShuffleBlockFetcherIterator(
             // This needs to be released after use.
             buf.retain()
             results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
-            shuffleMetrics.remoteBytesRead += buf.size
-            shuffleMetrics.remoteBlocksFetched += 1
+            shuffleMetrics.incRemoteBytesRead(buf.size)
+            shuffleMetrics.incRemoteBlocksFetched(1)
           }
           logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
         }
@@ -233,7 +233,7 @@ final class ShuffleBlockFetcherIterator(
       val blockId = iter.next()
       try {
         val buf = blockManager.getBlockData(blockId)
-        shuffleMetrics.localBlocksFetched += 1
+        shuffleMetrics.incLocalBlocksFetched(1)
         buf.retain()
         results.put(new SuccessFetchResult(blockId, 0, buf))
       } catch {
@@ -277,7 +277,7 @@ final class ShuffleBlockFetcherIterator(
     currentResult = results.take()
     val result = currentResult
     val stopFetchWait = System.currentTimeMillis()
-    shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
+    shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)

     result match {
       case SuccessFetchResult(_, size, _) => bytesInFlight -= size
@@ -757,12 +757,12 @@ private[spark] class ExternalSorter[K, V, C](
       }
     }

-    context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
-    context.taskMetrics.diskBytesSpilled += diskBytesSpilled
+    context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
+    context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
     context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
       if (curWriteMetrics != null) {
-        m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten
-        m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime
+        m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
+        m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
       }
     }

