From 5e189c6f9347dab9aa8fada32117b6c74780dc06 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 12 May 2015 15:17:11 -0700 Subject: [PATCH] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file. --- .../shuffle/unsafe/UnsafeShuffleWriter.java | 3 ++- .../TimeTrackingOutputStream.java | 25 +++++++++++-------- .../spark/storage/BlockObjectWriter.scala | 24 +++--------------- 3 files changed, 20 insertions(+), 32 deletions(-) rename core/src/main/java/org/apache/spark/{shuffle/unsafe => storage}/TimeTrackingOutputStream.java (75%) diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index f097af35d87f8..9c288dc7e8f77 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -50,6 +50,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager; import org.apache.spark.shuffle.ShuffleWriter; import org.apache.spark.storage.BlockManager; +import org.apache.spark.storage.TimeTrackingOutputStream; import org.apache.spark.unsafe.PlatformDependent; import org.apache.spark.unsafe.memory.TaskMemoryManager; @@ -301,7 +302,7 @@ private long[] mergeSpillsWithFileStream( for (int partition = 0; partition < numPartitions; partition++) { final long initialFileLength = outputFile.length(); mergedFileOutputStream = - new TimeTrackingFileOutputStream(writeMetrics, new FileOutputStream(outputFile, true)); + new TimeTrackingOutputStream(writeMetrics, new FileOutputStream(outputFile, true)); if (compressionCodec != null) { mergedFileOutputStream = compressionCodec.compressedOutputStream(mergedFileOutputStream); } diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/TimeTrackingOutputStream.java b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java similarity index 75% rename from core/src/main/java/org/apache/spark/shuffle/unsafe/TimeTrackingOutputStream.java rename to core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java index 8b5ba49e67204..0cd3c7d242660 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/TimeTrackingOutputStream.java +++ b/core/src/main/java/org/apache/spark/storage/TimeTrackingOutputStream.java @@ -15,25 +15,23 @@ * limitations under the License. */ -package org.apache.spark.shuffle.unsafe; +package org.apache.spark.storage; -import org.apache.spark.executor.ShuffleWriteMetrics; - -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import org.apache.spark.executor.ShuffleWriteMetrics; + /** - * Intercepts write calls and tracks total time spent writing. + * Intercepts write calls and tracks total time spent writing in order to update shuffle write + * metrics. Not thread safe. */ -final class TimeTrackingFileOutputStream extends OutputStream { +public final class TimeTrackingOutputStream extends OutputStream { private final ShuffleWriteMetrics writeMetrics; - private final FileOutputStream outputStream; + private final OutputStream outputStream; - public TimeTrackingFileOutputStream( - ShuffleWriteMetrics writeMetrics, - FileOutputStream outputStream) { + public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream outputStream) { this.writeMetrics = writeMetrics; this.outputStream = outputStream; } @@ -49,7 +47,8 @@ public void write(int b) throws IOException { public void write(byte[] b) throws IOException { final long startTime = System.nanoTime(); outputStream.write(b); - writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); } + writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); + } @Override public void write(byte[] b, int off, int len) throws IOException { @@ -60,11 +59,15 @@ public void write(byte[] b, int off, int len) throws IOException { @Override public void flush() throws IOException { + final long startTime = System.nanoTime(); outputStream.flush(); + writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); } @Override public void close() throws IOException { + final long startTime = System.nanoTime(); outputStream.close(); + writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 8bc4e205bc3c6..a33f22ef52687 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -86,16 +86,6 @@ private[spark] class DiskBlockObjectWriter( extends BlockObjectWriter(blockId) with Logging { - /** Intercepts write calls and tracks total time spent writing. Not thread safe. */ - private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream { - override def write(i: Int): Unit = callWithTiming(out.write(i)) - override def write(b: Array[Byte]): Unit = callWithTiming(out.write(b)) - override def write(b: Array[Byte], off: Int, len: Int): Unit = { - callWithTiming(out.write(b, off, len)) - } - override def close(): Unit = out.close() - override def flush(): Unit = out.flush() - } /** The file channel, used for repositioning / truncating the file. */ private var channel: FileChannel = null @@ -136,7 +126,7 @@ private[spark] class DiskBlockObjectWriter( throw new IllegalStateException("Writer already closed. Cannot be reopened.") } fos = new FileOutputStream(file, true) - ts = new TimeTrackingOutputStream(fos) + ts = new TimeTrackingOutputStream(writeMetrics, fos) channel = fos.getChannel() bs = compressStream(new BufferedOutputStream(ts, bufferSize)) objOut = serializerInstance.serializeStream(bs) @@ -150,9 +140,9 @@ private[spark] class DiskBlockObjectWriter( if (syncWrites) { // Force outstanding writes to disk and track how long it takes objOut.flush() - callWithTiming { - fos.getFD.sync() - } + val start = System.nanoTime() + fos.getFD.sync() + writeMetrics.incShuffleWriteTime(System.nanoTime() - start) } } { objOut.close() @@ -251,12 +241,6 @@ private[spark] class DiskBlockObjectWriter( reportedPosition = pos } - private def callWithTiming(f: => Unit) = { - val start = System.nanoTime() - f - writeMetrics.incShuffleWriteTime(System.nanoTime() - start) - } - // For testing private[spark] override def flush() { objOut.flush()