Attempt to clarify confusing metrics update code
JoshRosen committed May 12, 2015
1 parent 5e189c6 commit df07699
Showing 1 changed file with 15 additions and 4 deletions.
@@ -214,7 +214,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
recordReadPosition += toTransfer;
dataRemaining -= toTransfer;
}
- // TODO: add a test that detects whether we leave this call out:
writer.recordWritten();
}

@@ -229,11 +228,23 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
}
}

- if (!isLastFile) {
- writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
+ if (!isLastFile) { // i.e. this is a spill file
+ // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
+ // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
+ // relies on its `recordWritten()` method being called in order to trigger periodic updates to
+ // `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that
+ // counter at a higher-level, then the in-progress metrics for records written and bytes
+ // written would get out of sync.
+ //
+ // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
+ // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
+ // metrics to the true write metrics here. The reason for performing this copying is so that
+ // we can avoid reporting spilled bytes as shuffle write bytes.
+ //
+ // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
// writeMetrics.incShuffleWriteTime(writeMetricsToUse.shuffleWriteTime());
+ writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
}
}
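
The metrics copying described in the comment above can be illustrated with a minimal, self-contained Java sketch. Everything in it is an assumption made for illustration: `WriteMetrics`, `TaskMetrics`, and `writeFile` are hypothetical stand-ins, not Spark's actual `ShuffleWriteMetrics`, `TaskMetrics`, or `DiskBlockObjectWriter`. In particular, the stand-in writer updates the byte counter on every record, whereas the comment says the real writer updates it periodically.

```java
final class SpillMetricsSketch {

    /** Hypothetical stand-in for shuffle write metrics. */
    static final class WriteMetrics {
        long bytesWritten;
        long recordsWritten;
        long writeTimeNanos;
    }

    /** Hypothetical stand-in for task-level metrics. */
    static final class TaskMetrics {
        long diskBytesSpilled;
    }

    /**
     * Hypothetical stand-in for the disk writer: it updates whichever
     * metrics object it is handed, once per record.
     */
    static void writeFile(WriteMetrics sink, int records, int bytesPerRecord) {
        for (int i = 0; i < records; i++) {
            sink.bytesWritten += bytesPerRecord;
            sink.recordsWritten++;
        }
        sink.writeTimeNanos += records; // placeholder value, not a real timing
    }

    static void writeSortedFile(boolean isLastFile, int records, int bytesPerRecord,
                                WriteMetrics writeMetrics, TaskMetrics taskMetrics) {
        // Last file: the writer updates the true metrics directly.
        // Spill file: the writer updates a throwaway metrics object instead.
        WriteMetrics metricsToUse = isLastFile ? writeMetrics : new WriteMetrics();
        writeFile(metricsToUse, records, bytesPerRecord);

        if (!isLastFile) {
            // Records count as written, but the bytes are reported as spilled,
            // not as shuffle write bytes. Write time is intentionally dropped.
            writeMetrics.recordsWritten += metricsToUse.recordsWritten;
            taskMetrics.diskBytesSpilled += metricsToUse.bytesWritten;
        }
    }

    public static void main(String[] args) {
        WriteMetrics writeMetrics = new WriteMetrics();
        TaskMetrics taskMetrics = new TaskMetrics();

        writeSortedFile(false, 100, 8, writeMetrics, taskMetrics); // spill
        writeSortedFile(false, 100, 8, writeMetrics, taskMetrics); // spill
        writeSortedFile(true, 50, 8, writeMetrics, taskMetrics);   // final file

        System.out.println("records written:     " + writeMetrics.recordsWritten);   // 250
        System.out.println("shuffle bytes:       " + writeMetrics.bytesWritten);     // 400
        System.out.println("disk bytes spilled:  " + taskMetrics.diskBytesSpilled);  // 1600
    }
}
```

In this sketch the two spill files contribute their 200 records to the record count and their 1600 bytes to the spill counter, while only the final file's 400 bytes show up as shuffle write bytes.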

