Attempt to implement proper shuffle write metrics.
JoshRosen committed May 12, 2015
1 parent d4e6d89 commit 4f0b770
Showing 6 changed files with 214 additions and 52 deletions.
10 changes: 10 additions & 0 deletions core/pom.xml
@@ -361,6 +361,16 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
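The Hamcrest artifacts above are test-scoped; presumably they were added so the new shuffle-write tests can use matcher-style assertions. A minimal hypothetical usage sketch (the test class and assertion below are illustrative, not part of this commit):

  import static org.hamcrest.MatcherAssert.assertThat;
  import static org.hamcrest.Matchers.greaterThan;

  import org.junit.Test;

  import org.apache.spark.executor.ShuffleWriteMetrics;

  public class ShuffleWriteMetricsAssertionExample {
    @Test
    public void bytesWrittenShouldBePositiveAfterAWrite() {
      ShuffleWriteMetrics metrics = new ShuffleWriteMetrics();
      metrics.incShuffleBytesWritten(1024L);
      // Matcher-style assertion enabled by the new hamcrest-core / hamcrest-library dependencies.
      assertThat(metrics.shuffleBytesWritten(), greaterThan(0L));
    }
  }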
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe;

import org.apache.spark.executor.ShuffleWriteMetrics;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
* Intercepts write calls and tracks total time spent writing.
*/
final class TimeTrackingFileOutputStream extends OutputStream {

private final ShuffleWriteMetrics writeMetrics;
private final FileOutputStream outputStream;

public TimeTrackingFileOutputStream(
ShuffleWriteMetrics writeMetrics,
FileOutputStream outputStream) {
this.writeMetrics = writeMetrics;
this.outputStream = outputStream;
}

@Override
public void write(int b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b, off, len);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
}

@Override
public void flush() throws IOException {
outputStream.flush();

@kayousterhout (Contributor), May 12, 2015:
Why don't flush and close have tracking?

@JoshRosen (Author), May 12, 2015:
I was modeling this after DiskBlockObjectWriter's TimeTrackingOutputStream, which doesn't track time there:

  /** Intercepts write calls and tracks total time spent writing. Not thread safe. */
  private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
    override def write(i: Int): Unit = callWithTiming(out.write(i))
    override def write(b: Array[Byte]): Unit = callWithTiming(out.write(b))
    override def write(b: Array[Byte], off: Int, len: Int): Unit = {
      callWithTiming(out.write(b, off, len))
    }
    override def close(): Unit = out.close()
    override def flush(): Unit = out.flush()
  }

@kayousterhout (Contributor), May 12, 2015:
Ah ok good point -- I was mis-remembering and thinking those close / flush calls got timed at some point. I wonder if we should just time these, to be safe? It seems like it won't add any significant overhead, because they each only get called once. FileOutputStream.flush() does nothing, but close() does manipulate the file descriptor.

@JoshRosen (Author), May 12, 2015:
Sounds reasonable. I'll also update TimeTrackingOutputStream to do the same thing.

@JoshRosen (Author), May 12, 2015:
Fixed this in 5e189c6.

}

@Override
public void close() throws IOException {
outputStream.close();
}
}
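For reference, a minimal sketch of what timed flush() and close() overrides could look like, assuming the same System.nanoTime() bracketing used by the write() methods above (the actual change was made separately in 5e189c6 and may differ):

  @Override
  public void flush() throws IOException {
    final long startTime = System.nanoTime();
    outputStream.flush();
    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
  }

  @Override
  public void close() throws IOException {
    final long startTime = System.nanoTime();
    outputStream.close();
    writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
  }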
@@ -70,6 +70,7 @@ final class UnsafeShuffleExternalSorter {
private final BlockManager blockManager;
private final TaskContext taskContext;
private final boolean spillingEnabled;
private final ShuffleWriteMetrics writeMetrics;

/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSize;
@@ -97,7 +98,8 @@ public UnsafeShuffleExternalSorter(
TaskContext taskContext,
int initialSize,
int numPartitions,
SparkConf conf) throws IOException {
SparkConf conf,
ShuffleWriteMetrics writeMetrics) throws IOException {
this.memoryManager = memoryManager;
this.shuffleMemoryManager = shuffleMemoryManager;
this.blockManager = blockManager;
@@ -107,6 +109,7 @@ public UnsafeShuffleExternalSorter(
this.spillingEnabled = conf.getBoolean("spark.shuffle.spill", true);
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.writeMetrics = writeMetrics;
openSorter();
}

@@ -131,8 +134,24 @@ private void openSorter() throws IOException {
/**
* Sorts the in-memory records and writes the sorted records to a spill file.
* This method does not free the sort data structures.
*
* @param isSpill if true, this indicates that we're writing a spill and that bytes written should
* be counted towards shuffle spill metrics rather than shuffle write metrics.
*/
private SpillInfo writeSpillFile() throws IOException {
private void writeSpillFile(boolean isSpill) throws IOException {

final ShuffleWriteMetrics writeMetricsToUse;

if (isSpill) {
// We're spilling, so bytes written should be counted towards spill rather than write.
// Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
// them towards shuffle bytes written.
writeMetricsToUse = new ShuffleWriteMetrics();
} else {
// We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
writeMetricsToUse = writeMetrics;
}

// This call performs the actual sort.
final UnsafeShuffleSorter.UnsafeShuffleSorterIterator sortedRecords =
sorter.getSortedIterator();
@@ -161,17 +180,8 @@ private SpillInfo writeSpillFile() throws IOException {
// OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
// around this, we pass a dummy no-op serializer.
final SerializerInstance ser = DummySerializerInstance.INSTANCE;
// TODO: audit the metrics-related code and ensure proper metrics integration:
// It's not clear how we should handle shuffle write metrics for spill files; currently, Spark
// doesn't report IO time spent writing spill files (see SPARK-7413). This method,
// writeSpillFile(), is called both when writing spill files and when writing the single output
// file in cases where we didn't spill. As a result, we don't necessarily know whether this
// should be reported as bytes spilled or as shuffle bytes written. We could defer the updating
// of these metrics until the end of the shuffle write, but that would mean that users
// wouldn't get useful metrics updates in the UI from long-running tasks. Given this complexity,
// I'm deferring these decisions to a separate follow-up commit or patch.
writer =
blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, new ShuffleWriteMetrics());

writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetricsToUse);

int currentPartition = -1;
while (sortedRecords.hasNext()) {
@@ -185,8 +195,7 @@ private SpillInfo writeSpillFile() throws IOException {
spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
}
currentPartition = partition;
writer =
blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, new ShuffleWriteMetrics());
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetricsToUse);
}

final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
@@ -220,7 +229,14 @@ private SpillInfo writeSpillFile() throws IOException {
spills.add(spillInfo);
}
}
return spillInfo;

if (isSpill) {
writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());

@kayousterhout (Contributor), May 12, 2015:
Why do you increase the shuffle records written for spilled data?

@JoshRosen (Author), May 12, 2015:
The original rationale had something to do with metrics reporting for in-progress tasks, but I think there's a cleaner way to handle this that avoids this confusing line; let me go ahead and refactor this. I was trying to avoid some edge-cases in DiskBlockObjectWriter's own incrementing of this counter for certain writes, but I don't think that this will actually happen here (I have a more detailed explanation of the problem I was trying to avoid, but I think it's moot to go into the details if I can just fix it more cleanly elsewhere).

@kayousterhout (Contributor), May 12, 2015:
Cool sounds good!

@JoshRosen (Author), May 12, 2015:
Ah, I finally remember why I did this: DiskBlockObjectWriter won't update its bytesWritten metrics during writes unless we also increment its records written counter. It appears that the current semantics of recordWritten() are that it's set once the record hits disk, not when the record enters the sort code.

// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
// writeMetrics.incShuffleWriteTime(writeMetricsToUse.shuffleWriteTime());

@kayousterhout (Contributor), May 12, 2015:
Kill this last line?

taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
}
}
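To make the accounting above concrete, here is a short walk-through with made-up numbers, using only the metric calls that already appear in this diff (the figures are purely illustrative):

  // Hypothetical spill: 100,000 records totalling 64 MB, written with isSpill == true.
  // The throwaway ShuffleWriteMetrics absorbs the DiskBlockObjectWriter updates, so
  // afterwards writeMetricsToUse reports 100,000 records and 64 MB written. Folding back:
  writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());       // +100,000 records
  taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten()); // +64 MB spilled
  // Shuffle bytes written and shuffle write time are intentionally left untouched here;
  // the 64 MB count as spill, not as shuffle output.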

/**
@@ -233,13 +249,12 @@ void spill() throws IOException {
org.apache.spark.util.Utils.bytesToString(getMemoryUsage()) + " to disk (" +
(spills.size() + (spills.size() > 1 ? " times" : " time")) + " so far)");

final SpillInfo spillInfo = writeSpillFile();
writeSpillFile(true);
final long sorterMemoryUsage = sorter.getMemoryUsage();
sorter = null;
shuffleMemoryManager.release(sorterMemoryUsage);
final long spillSize = freeMemory();
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
taskContext.taskMetrics().incDiskBytesSpilled(spillInfo.file.length());

openSorter();
}
@@ -389,7 +404,8 @@ public void insertRecord(
public SpillInfo[] closeAndGetSpills() throws IOException {
try {
if (sorter != null) {
writeSpillFile();
// Do not count the final file towards the spill count.
writeSpillFile(false);
freeMemory();
}
return spills.toArray(new SpillInfo[spills.size()]);
@@ -157,7 +157,8 @@ private void open() throws IOException {
taskContext,
INITIAL_SORT_BUFFER_SIZE,
partitioner.numPartitions(),
sparkConf);
sparkConf,
writeMetrics);
serBuffer = new MyByteArrayOutputStream(1024 * 1024);
serOutputStream = serializer.serializeStream(serBuffer);
}
@@ -210,6 +211,12 @@ void forceSorterToSpill() throws IOException {
sorter.spill();
}

/**
* Merge zero or more spill files together, choosing the fastest merging strategy based on the
* number of spills and the IO compression codec.
*
* @return the partition lengths in the merged file.
*/
private long[] mergeSpills(SpillInfo[] spills) throws IOException {
final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
@@ -223,30 +230,42 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
new FileOutputStream(outputFile).close(); // Create an empty file
return new long[partitioner.numPartitions()];
} else if (spills.length == 1) {
// Note: we'll have to watch out for corner-cases in this code path when working on shuffle
// metrics integration, since any metrics updates that are performed during the merge will
// also have to be done here. In this branch, the shuffle technically didn't need to spill
// because we're only trying to merge one file, so we may need to ensure that metrics that
// would otherwise be counted as spill metrics are actually counted as regular write
// metrics.
// Here, we don't need to perform any metrics updates because the bytes written to this
// output file would have already been counted as shuffle bytes written.
Files.move(spills[0].file, outputFile);
return spills[0].partitionLengths;
} else {
final long[] partitionLengths;
// There are multiple spills to merge, so none of these spill files' lengths were counted
// towards our shuffle write count or shuffle write time. If we use the slow merge path,
// then the final output file's size won't necessarily be equal to the sum of the spill
// files' sizes. To guard against this case, we look at the output file's actual size when
// computing shuffle bytes written.
//
// We allow the individual merge methods to report their own IO times since different merge
// strategies use different IO techniques. We count IO during merge towards the shuffle
// write time, which appears to be consistent with the "not bypassing merge-sort"
// branch in ExternalSorter.
if (fastMergeEnabled && fastMergeIsSupported) {
// Compression is disabled or we are using an IO compression codec that supports
// decompression of concatenated compressed streams, so we can perform a fast spill merge
// that doesn't need to interpret the spilled bytes.
if (transferToEnabled) {
logger.debug("Using transferTo-based fast merge");
return mergeSpillsWithTransferTo(spills, outputFile);
partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
} else {
logger.debug("Using fileStream-based fast merge");
return mergeSpillsWithFileStream(spills, outputFile, null);
partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
}
} else {
logger.debug("Using slow merge");
return mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
}
// The final shuffle spill's write would have directly updated shuffleBytesWritten, so

@kayousterhout (Contributor), May 12, 2015:
Why does the spill's write update shuffleBytesWritten? Is this for consistency with current behavior? (this seems hacky)

@JoshRosen (Author), May 12, 2015:
This is a little hacky. When closing an UnsafeShuffleExternalSorter that has already spilled once but has buffered in-memory records, we write out the in-memory records to an on-disk file but do not count that final write as bytes spilled (instead, it's accounted as shuffle write). The final merge needs to be counted as shuffle write, but this will lead to double-counting of the final SpillInfo's bytes (I'm in the process of renaming SpillInfo to clarify that it is also used for non-spill cases (maybe ChunkInfo or PartitionedFileInfo or something)).

In the case where we've never spilled, we go through the non-merge if branch above, which doesn't perform any metrics updates because those writes would have already been recorded in UnsafeExternalShuffleSorter.close().

This is a little convoluted since I've been trying to balance the competing goals of having the metrics update logic consolidated in one place vs. getting good incremental metrics updates for in-progress tasks. I'll see if I can find a good way to clean this up or add better comments.

// we need to decrement to avoid double-counting this write.
writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
writeMetrics.incShuffleBytesWritten(outputFile.length());
return partitionLengths;
}
} catch (IOException e) {
if (outputFile.exists() && !outputFile.delete()) {
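A worked example of the adjustment above, with assumed sizes and assuming a fast merge where the spill lengths add up exactly:

  // Suppose two earlier spills of 40 MB each plus a final 20 MB file written by
  // closeAndGetSpills(). The final file's 20 MB were already counted as shuffle bytes
  // written, while the earlier spills were counted as disk bytes spilled. After the
  // merge produces a 100 MB output file:
  writeMetrics.decShuffleBytesWritten(20L * 1024 * 1024);   // undo the final file's earlier count
  writeMetrics.incShuffleBytesWritten(100L * 1024 * 1024);  // count the merged output once
  // Net result: shuffle bytes written equals outputFile.length(), with no double-counting.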
@@ -271,7 +290,8 @@ private long[] mergeSpillsWithFileStream(
}
for (int partition = 0; partition < numPartitions; partition++) {
final long initialFileLength = outputFile.length();
mergedFileOutputStream = new FileOutputStream(outputFile, true);
mergedFileOutputStream =
new TimeTrackingFileOutputStream(writeMetrics, new FileOutputStream(outputFile, true));

@kayousterhout (Contributor), May 12, 2015:
Does it make sense for TimeTrackingFileOutputStream to just take writeMetrics and outputFile as constructor parameters, and construct the FileOutputStream itself?

@JoshRosen (Author), May 12, 2015:
From a dependency-injection standpoint, I don't think that it makes sense to pass outputFile into TimeTrackingOutputStream's constructor when it really only needs a FileOutputStream to accomplish its work (the only use of outputFile would be to pass it downstream into another constructor, which seems a little odd).

@kayousterhout (Contributor), May 12, 2015:
ok

if (compressionCodec != null) {
mergedFileOutputStream = compressionCodec.compressedOutputStream(mergedFileOutputStream);
}
@@ -321,6 +341,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
long bytesToTransfer = partitionLengthInSpill;
final FileChannel spillInputChannel = spillInputChannels[i];
final long writeStartTime = System.nanoTime();
while (bytesToTransfer > 0) {
final long actualBytesTransferred = spillInputChannel.transferTo(
spillInputChannelPositions[i],
Expand All @@ -329,6 +350,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
spillInputChannelPositions[i] += actualBytesTransferred;
bytesToTransfer -= actualBytesTransferred;
}
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
}
