Replace set with register
JsonProtocol remains the only place where we still call set
on each of the *Metrics classes.
Andrew Or committed Jan 18, 2016
1 parent b9d7fbf commit 0785984
Showing 8 changed files with 24 additions and 32 deletions.
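The pattern being replaced is the same in every file below: callers used to construct a *Metrics object themselves and then assign it into TaskMetrics, whereas now TaskMetrics creates, records, and returns the object through a single register call. A minimal Scala sketch of the two styles, using only names that appear in the hunks below (context stands for the surrounding TaskContext, and numBytes is a placeholder value):

// Old set-style wiring: build the metrics object, then push it into
// TaskMetrics as an Option; every call site repeats both steps.
val writeMetrics = new ShuffleWriteMetrics()
context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)

// New register-style wiring: TaskMetrics creates and records the object,
// then hands it back for the caller to update.
val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics()
writeMetrics.incShuffleBytesWritten(numBytes)  // numBytes: hypothetical byte count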
@@ -114,8 +114,7 @@ public BypassMergeSortShuffleWriter(
this.shuffleId = dep.shuffleId();
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
- this.writeMetrics = new ShuffleWriteMetrics();
- taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+ this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.serializer = Serializer.getSerializer(dep.serializer());
this.shuffleBlockResolver = shuffleBlockResolver;
}
@@ -119,8 +119,7 @@ public UnsafeShuffleWriter(
this.shuffleId = dep.shuffleId();
this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
this.partitioner = dep.partitioner();
- this.writeMetrics = new ShuffleWriteMetrics();
- taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+ this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
@@ -42,8 +42,7 @@ private[spark] class HashShuffleWriter[K, V](
// we don't try deleting files, etc twice.
private var stopping = false

- private val writeMetrics = new ShuffleWriteMetrics()
- metrics.shuffleWriteMetrics = Some(writeMetrics)
+ private val writeMetrics = metrics.registerShuffleWriteMetrics()

private val blockManager = SparkEnv.get.blockManager
private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
@@ -45,8 +45,7 @@ private[spark] class SortShuffleWriter[K, V, C](

private var mapStatus: MapStatus = null

- private val writeMetrics = new ShuffleWriteMetrics()
- context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)
+ private val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics()

/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
@@ -93,8 +92,7 @@ private[spark] class SortShuffleWriter[K, V, C](
if (sorter != null) {
val startTime = System.nanoTime()
sorter.stop()
- context.taskMetrics.shuffleWriteMetrics.foreach(
-   _.incShuffleWriteTime(System.nanoTime - startTime))
+ writeMetrics.incShuffleWriteTime(System.nanoTime - startTime)
sorter = null
}
}
@@ -644,6 +644,8 @@ private[spark] class ExternalSorter[K, V, C](
blockId: BlockId,
outputFile: File): Array[Long] = {

+ val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics()
+
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)

@@ -652,8 +654,8 @@
val collection = if (aggregator.isDefined) map else buffer
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
- val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
-   context.taskMetrics.shuffleWriteMetrics.get)
+ val writer = blockManager.getDiskWriter(
+   blockId, outputFile, serInstance, fileBufferSize, writeMetrics)
val partitionId = it.nextPartition()
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
@@ -666,8 +668,8 @@
// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
- val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
-   context.taskMetrics.shuffleWriteMetrics.get)
+ val writer = blockManager.getDiskWriter(
+   blockId, outputFile, serInstance, fileBufferSize, writeMetrics)
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
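The ExternalSorter change above means writePartitionedFile no longer depends on the caller having installed shuffle write metrics beforehand: the sorter registers its own writeMetrics and hands the same object to every disk writer. A before/after sketch of the calling side, based on the UnsafeRowSerializerSuite hunk at the end of this commit (construction of sorter, outputFile, and taskContext is elided):

// Before: the caller had to set the Option first, otherwise the
// context.taskMetrics.shuffleWriteMetrics.get inside the sorter would throw on None.
taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)

// After: the sorter registers its own write metrics, so the set-up line goes away.
sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)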
@@ -184,12 +184,12 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
val taskMetrics = new TaskMetrics()
- val shuffleReadMetrics = new ShuffleReadMetrics()
+ val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
assert(listener.stageIdToData.size === 0)

// finish this task, should get updated shuffleRead
shuffleReadMetrics.incRemoteBytesRead(1000)
- taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
+ taskMetrics.mergeShuffleReadMetrics()
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0)
@@ -270,19 +270,17 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with

def makeTaskMetrics(base: Int): TaskMetrics = {
val taskMetrics = new TaskMetrics()
- val shuffleReadMetrics = new ShuffleReadMetrics()
- val shuffleWriteMetrics = new ShuffleWriteMetrics()
- taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
- taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
- taskMetrics.setExecutorRunTime(base + 4)
- taskMetrics.incDiskBytesSpilled(base + 5)
- taskMetrics.incMemoryBytesSpilled(base + 6)
+ val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
+ taskMetrics.mergeShuffleReadMetrics()
+ val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics()
shuffleWriteMetrics.incShuffleBytesWritten(base + 3)
+ taskMetrics.setExecutorRunTime(base + 4)
+ taskMetrics.incDiskBytesSpilled(base + 5)
+ taskMetrics.incMemoryBytesSpilled(base + 6)
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
- taskMetrics.setInputMetrics(Some(inputMetrics))
+ val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
inputMetrics.incBytesRead(base + 7)
val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop)
outputMetrics.setBytesWritten(base + 8)
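The test now exercises the intended lifecycle for shuffle read metrics: register a temporary ShuffleReadMetrics, record into it, then merge it into the task-level aggregate. A minimal sketch of that flow, assuming mergeShuffleReadMetrics() folds every temporary object registered so far into taskMetrics (the byte counts are arbitrary example values):

val taskMetrics = new TaskMetrics()

// One temporary read-metrics object per reader; several may be registered.
val readMetrics = taskMetrics.registerTempShuffleReadMetrics()
readMetrics.incRemoteBytesRead(1000)  // arbitrary example values
readMetrics.incLocalBytesRead(500)

// Assumption: mergeShuffleReadMetrics() aggregates the temporary objects
// into the task's shuffle read metrics.
taskMetrics.mergeShuffleReadMetrics()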
@@ -773,31 +773,29 @@ class JsonProtocolSuite extends SparkFunSuite {
t.incMemoryBytesSpilled(a + c)

if (hasHadoopInput) {
- val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
inputMetrics.incBytesRead(d + e + f)
inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
- t.setInputMetrics(Some(inputMetrics))
} else {
- val sr = new ShuffleReadMetrics
+ val sr = t.registerTempShuffleReadMetrics()
sr.incRemoteBytesRead(b + d)
sr.incLocalBlocksFetched(e)
sr.incFetchWaitTime(a + d)
sr.incRemoteBlocksFetched(f)
sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
sr.incLocalBytesRead(a + f)
- t.setShuffleReadMetrics(Some(sr))
+ t.mergeShuffleReadMetrics()
}
if (hasOutput) {
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
outputMetrics.setBytesWritten(a + b + c)
outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1)
t.outputMetrics = Some(outputMetrics)
} else {
- val sw = new ShuffleWriteMetrics
+ val sw = t.registerShuffleWriteMetrics()
sw.incShuffleBytesWritten(a + b + c)
sw.incShuffleWriteTime(b + c + d)
sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
- t.shuffleWriteMetrics = Some(sw)
}
// Make at most 6 blocks
t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
@@ -127,7 +127,6 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
assert(sorter.numSpills > 0)

// Merging spilled files should not throw assertion error
- taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics)
sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile)
} {
// Clean up
