From de40b9d7109536feceed1aff219f1cb7a2da64a8 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 12 May 2015 15:38:57 -0700 Subject: [PATCH] More comments to try to explain metrics code --- .../apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index 9c288dc7e8f77..22791965dd962 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -258,8 +258,11 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException { logger.debug("Using slow merge"); partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec); } - // The final shuffle spill's write would have directly updated shuffleBytesWritten, so - // we need to decrement to avoid double-counting this write. + // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has + // in-memory records, we write out the in-memory records to a file but do not count that + // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs + // to be counted as shuffle write, but this will lead to double-counting of the final + // SpillInfo's bytes, so we decrement the metric by that file's length to cancel it out. writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length()); writeMetrics.incShuffleBytesWritten(outputFile.length()); return partitionLengths;