[SPARK-12885][MINOR] Rename 3 fields in ShuffleWriteMetrics
This is a small step toward SPARK-10620, which migrates TaskMetrics to accumulators. This patch is strictly a cleanup and introduces no change in functionality; it just renames 3 fields for consistency. Today we have:

```
inputMetrics.recordsRead
outputMetrics.bytesWritten
shuffleReadMetrics.localBlocksFetched
...
shuffleWriteMetrics.shuffleRecordsWritten
shuffleWriteMetrics.shuffleBytesWritten
shuffleWriteMetrics.shuffleWriteTime
```

The shuffle write names are redundant, so we can drop the `shuffle` prefix from the method names. I added backward-compatible (but deprecated) methods with the old names.
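A minimal sketch of the resulting API, abridged from the `ShuffleWriteMetrics` changes in this patch: the renamed accessors carry the data, and the old names simply forward to them behind `@deprecated`.

```scala
@DeveloperApi
class ShuffleWriteMetrics extends Serializable {

  // New, consistently named fields
  @volatile private var _bytesWritten: Long = _
  def bytesWritten: Long = _bytesWritten
  private[spark] def incBytesWritten(value: Long) = _bytesWritten += value

  @volatile private var _writeTime: Long = _
  def writeTime: Long = _writeTime
  private[spark] def incWriteTime(value: Long) = _writeTime += value

  @volatile private var _recordsWritten: Long = _
  def recordsWritten: Long = _recordsWritten
  private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value

  // Old names kept as deprecated aliases for backward compatibility
  @deprecated("use bytesWritten instead", "2.0.0")
  def shuffleBytesWritten: Long = bytesWritten
  @deprecated("use writeTime instead", "2.0.0")
  def shuffleWriteTime: Long = writeTime
  @deprecated("use recordsWritten instead", "2.0.0")
  def shuffleRecordsWritten: Long = recordsWritten
}
```

Callers migrate from `metrics.shuffleBytesWritten` to `metrics.bytesWritten`; existing code still compiles, with a deprecation warning.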

Parent PR: #10717

Author: Andrew Or <andrew@databricks.com>

Closes #10811 from andrewor14/rename-things.
Andrew Or authored and JoshRosen committed Jan 19, 2016
1 parent 323d51f commit 2b5d11f
Showing 24 changed files with 126 additions and 114 deletions.
@@ -143,7 +143,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

while (records.hasNext()) {
final Product2<K, V> record = records.next();
@@ -203,7 +203,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
threwException = false;
} finally {
Closeables.close(out, threwException);
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
@@ -233,8 +233,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
// Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
}
}

@@ -298,8 +298,8 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti
// final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
// to be counted as shuffle write, but this will lead to double-counting of the final
// SpillInfo's bytes.
writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
writeMetrics.incShuffleBytesWritten(outputFile.length());
writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
writeMetrics.incBytesWritten(outputFile.length());
return partitionLengths;
}
} catch (IOException e) {
@@ -411,7 +411,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
spillInputChannelPositions[i] += actualBytesTransferred;
bytesToTransfer -= actualBytesTransferred;
}
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
}
@@ -42,34 +42,34 @@ public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream o
public void write(int b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b, off, len);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void flush() throws IOException {
final long startTime = System.nanoTime();
outputStream.flush();
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void close() throws IOException {
final long startTime = System.nanoTime();
outputStream.close();
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
}
@@ -26,28 +26,39 @@ import org.apache.spark.annotation.DeveloperApi
*/
@DeveloperApi
class ShuffleWriteMetrics extends Serializable {

/**
* Number of bytes written for the shuffle by this task
*/
@volatile private var _shuffleBytesWritten: Long = _
def shuffleBytesWritten: Long = _shuffleBytesWritten
private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
@volatile private var _bytesWritten: Long = _
def bytesWritten: Long = _bytesWritten
private[spark] def incBytesWritten(value: Long) = _bytesWritten += value
private[spark] def decBytesWritten(value: Long) = _bytesWritten -= value

/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
@volatile private var _shuffleWriteTime: Long = _
def shuffleWriteTime: Long = _shuffleWriteTime
private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
@volatile private var _writeTime: Long = _
def writeTime: Long = _writeTime
private[spark] def incWriteTime(value: Long) = _writeTime += value
private[spark] def decWriteTime(value: Long) = _writeTime -= value

/**
* Total number of records written to the shuffle by this task
*/
@volatile private var _shuffleRecordsWritten: Long = _
def shuffleRecordsWritten: Long = _shuffleRecordsWritten
private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
@volatile private var _recordsWritten: Long = _
def recordsWritten: Long = _recordsWritten
private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value
private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value
private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value

// Legacy methods for backward compatibility.
// TODO: remove these once we make this class private.
@deprecated("use bytesWritten instead", "2.0.0")
def shuffleBytesWritten: Long = bytesWritten
@deprecated("use writeTime instead", "2.0.0")
def shuffleWriteTime: Long = writeTime
@deprecated("use recordsWritten instead", "2.0.0")
def shuffleRecordsWritten: Long = recordsWritten

}
@@ -271,7 +271,7 @@ class StatsReportListener extends SparkListener with Logging {

// Shuffle write
showBytesDistribution("shuffle bytes written:",
(_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten), taskInfoMetrics)
(_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)

// Fetch & I/O
showMillisDistribution("fetch wait time:",
@@ -90,7 +90,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, so should be included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
writeMetrics.incWriteTime(System.nanoTime - openStartTime)

override def releaseWriters(success: Boolean) {
shuffleState.completedMapTasks.add(mapId)
@@ -94,7 +94,7 @@ private[spark] class SortShuffleWriter[K, V, C](
val startTime = System.nanoTime()
sorter.stop()
context.taskMetrics.shuffleWriteMetrics.foreach(
_.incShuffleWriteTime(System.nanoTime - startTime))
_.incWriteTime(System.nanoTime - startTime))
sorter = null
}
}
@@ -214,9 +214,9 @@ private[v1] object AllStagesResource {
raw.shuffleWriteMetrics
}
def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
writeBytes = submetricQuantiles(_.shuffleBytesWritten),
writeRecords = submetricQuantiles(_.shuffleRecordsWritten),
writeTime = submetricQuantiles(_.shuffleWriteTime)
writeBytes = submetricQuantiles(_.bytesWritten),
writeRecords = submetricQuantiles(_.recordsWritten),
writeTime = submetricQuantiles(_.writeTime)
)
}.metricOption

@@ -283,9 +283,9 @@ private[v1] object AllStagesResource {

def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
new ShuffleWriteMetrics(
bytesWritten = internal.shuffleBytesWritten,
writeTime = internal.shuffleWriteTime,
recordsWritten = internal.shuffleRecordsWritten
bytesWritten = internal.bytesWritten,
writeTime = internal.writeTime,
recordsWritten = internal.recordsWritten
)
}
}
@@ -102,7 +102,7 @@ private[spark] class DiskBlockObjectWriter(
objOut.flush()
val start = System.nanoTime()
fos.getFD.sync()
writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
writeMetrics.incWriteTime(System.nanoTime() - start)
}
} {
objOut.close()
@@ -132,7 +132,7 @@ private[spark] class DiskBlockObjectWriter(
close()
finalPosition = file.length()
// In certain compression codecs, more bytes are written after close() is called
writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
writeMetrics.incBytesWritten(finalPosition - reportedPosition)
} else {
finalPosition = file.length()
}
@@ -152,8 +152,8 @@ private[spark] class DiskBlockObjectWriter(
// truncating the file to its initial position.
try {
if (initialized) {
writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
writeMetrics.decBytesWritten(reportedPosition - initialPosition)
writeMetrics.decRecordsWritten(numRecordsWritten)
objOut.flush()
bs.flush()
close()
@@ -201,7 +201,7 @@ private[spark] class DiskBlockObjectWriter(
*/
def recordWritten(): Unit = {
numRecordsWritten += 1
writeMetrics.incShuffleRecordsWritten(1)
writeMetrics.incRecordsWritten(1)

if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
@@ -226,7 +226,7 @@ private[spark] class DiskBlockObjectWriter(
*/
private def updateBytesWritten() {
val pos = channel.position()
writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
writeMetrics.incBytesWritten(pos - reportedPosition)
reportedPosition = pos
}

@@ -129,7 +129,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
}
metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
executorToShuffleWrite(eid) =
executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten
}
}
}
@@ -426,14 +426,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)

val shuffleWriteDelta =
(taskMetrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L))
(taskMetrics.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.bytesWritten).getOrElse(0L))
stageData.shuffleWriteBytes += shuffleWriteDelta
execSummary.shuffleWrite += shuffleWriteDelta

val shuffleWriteRecordsDelta =
(taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L))
(taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L))
stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta

14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -500,11 +500,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getFormattedSizeQuantiles(shuffleReadRemoteSizes)

val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}

val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble
metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}

val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
@@ -619,7 +619,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadTimeProportion = toProportion(shuffleReadTime)
val shuffleWriteTime =
(metricsOpt.flatMap(_.shuffleWriteMetrics
.map(_.shuffleWriteTime)).getOrElse(0L) / 1e6).toLong
.map(_.writeTime)).getOrElse(0L) / 1e6).toLong
val shuffleWriteTimeProportion = toProportion(shuffleWriteTime)

val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L)
@@ -930,13 +930,13 @@ private[ui] class TaskDataSource(
val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")

val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten).getOrElse(0L)
val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L)
val shuffleWriteReadable = maybeShuffleWrite
.map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
.map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("")
val shuffleWriteRecords = maybeShuffleWrite
.map(_.shuffleRecordsWritten.toString).getOrElse("")
.map(_.recordsWritten.toString).getOrElse("")

val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.writeTime)
val writeTimeSortable = maybeWriteTime.getOrElse(0L)
val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
if (ms == 0) "" else UIUtils.formatDuration(ms)
13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -331,10 +331,11 @@ private[spark] object JsonProtocol {
("Total Records Read" -> shuffleReadMetrics.recordsRead)
}

// TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = {
("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~
("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~
("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten)
("Shuffle Bytes Written" -> shuffleWriteMetrics.bytesWritten) ~
("Shuffle Write Time" -> shuffleWriteMetrics.writeTime) ~
("Shuffle Records Written" -> shuffleWriteMetrics.recordsWritten)
}

def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
@@ -752,9 +753,9 @@

def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = {
val metrics = new ShuffleWriteMetrics
metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long])
metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written")
metrics.incBytesWritten((json \ "Shuffle Bytes Written").extract[Long])
metrics.incWriteTime((json \ "Shuffle Write Time").extract[Long])
metrics.setRecordsWritten((json \ "Shuffle Records Written")
.extractOpt[Long].getOrElse(0))
metrics
}
@@ -193,8 +193,8 @@ class ExternalAppendOnlyMap[K, V, C](
val w = writer
writer = null
w.commitAndClose()
_diskBytesSpilled += curWriteMetrics.shuffleBytesWritten
batchSizes.append(curWriteMetrics.shuffleBytesWritten)
_diskBytesSpilled += curWriteMetrics.bytesWritten
batchSizes.append(curWriteMetrics.bytesWritten)
objectsWritten = 0
}

@@ -262,8 +262,8 @@ private[spark] class ExternalSorter[K, V, C](
val w = writer
writer = null
w.commitAndClose()
_diskBytesSpilled += spillMetrics.shuffleBytesWritten
batchSizes.append(spillMetrics.shuffleBytesWritten)
_diskBytesSpilled += spillMetrics.bytesWritten
batchSizes.append(spillMetrics.bytesWritten)
spillMetrics = null
objectsWritten = 0
}