From 5a599066b59c1b0ff0df5e2ded837a4d8764e1fe Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Thu, 12 Feb 2015 00:23:24 -0800 Subject: [PATCH 1/5] [SPARK-5762] Fix shuffle write time for sort-based shuffle --- .../org/apache/spark/util/collection/ExternalSorter.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index eaec5a71e6819..17d5196a72b5b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -723,6 +723,7 @@ private[spark] class ExternalSorter[K, V, C]( partitionWriters.foreach(_.commitAndClose()) var out: FileOutputStream = null var in: FileInputStream = null + val writeStartTime = System.currentTimeMillis try { out = new FileOutputStream(outputFile, true) for (i <- 0 until numPartitions) { @@ -740,6 +741,8 @@ private[spark] class ExternalSorter[K, V, C]( in.close() } } + context.taskMetrics.shuffleWriteMetrics.map( + _.incShuffleWriteTime(System.currentTimeMillis - writeStartTime)) } else { // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by // partition and just write everything directly. From d77327652ea702ad5771d6baab5f406504f53714 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Thu, 12 Feb 2015 00:40:22 -0800 Subject: [PATCH 2/5] Use nano time --- .../org/apache/spark/util/collection/ExternalSorter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 17d5196a72b5b..dbc5000a6de65 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -723,7 +723,7 @@ private[spark] class ExternalSorter[K, V, C]( partitionWriters.foreach(_.commitAndClose()) var out: FileOutputStream = null var in: FileInputStream = null - val writeStartTime = System.currentTimeMillis + val writeStartTime = System.nanoTime try { out = new FileOutputStream(outputFile, true) for (i <- 0 until numPartitions) { @@ -742,7 +742,7 @@ private[spark] class ExternalSorter[K, V, C]( } } context.taskMetrics.shuffleWriteMetrics.map( - _.incShuffleWriteTime(System.currentTimeMillis - writeStartTime)) + _.incShuffleWriteTime(System.nanoTime - writeStartTime)) } else { // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by // partition and just write everything directly. From ace156c3bac206e772879c012ad58571f0a0bde7 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Thu, 12 Feb 2015 10:59:45 -0800 Subject: [PATCH 3/5] Moved metrics to finally block --- .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 2 ++ .../org/apache/spark/util/collection/ExternalSorter.scala | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 27496c5a289cb..9d41ca71fcb38 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -63,7 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C]( sorter.insertAll(records) } + val fileOpenStartTime = System.nanoTime val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) + writeMetrics.incShuffleWriteTime(System.nanoTime - fileOpenStartTime) val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index dbc5000a6de65..d239b9f27a7ac 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -740,9 +740,9 @@ private[spark] class ExternalSorter[K, V, C]( if (in != null) { in.close() } + context.taskMetrics.shuffleWriteMetrics.map( + _.incShuffleWriteTime(System.nanoTime - writeStartTime)) } - context.taskMetrics.shuffleWriteMetrics.map( - _.incShuffleWriteTime(System.nanoTime - writeStartTime)) } else { // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by // partition and just write everything directly. From 94e4237fd122508382a95bf4d4309a449b6ac408 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Thu, 12 Feb 2015 11:31:02 -0800 Subject: [PATCH 4/5] Removed open time metrics added inadvertently --- .../scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 9d41ca71fcb38..27496c5a289cb 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -63,9 +63,7 @@ private[spark] class SortShuffleWriter[K, V, C]( sorter.insertAll(records) } - val fileOpenStartTime = System.nanoTime val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) - writeMetrics.incShuffleWriteTime(System.nanoTime - fileOpenStartTime) val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) From 5c6f3d9e284ae04939330b9864590a26b0a9fa6d Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Thu, 12 Feb 2015 12:53:52 -0800 Subject: [PATCH 5/5] Use foreach --- .../scala/org/apache/spark/util/collection/ExternalSorter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index d239b9f27a7ac..d69f2d9048055 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -740,7 +740,7 @@ private[spark] class ExternalSorter[K, V, C]( if (in != null) { in.close() } - context.taskMetrics.shuffleWriteMetrics.map( + context.taskMetrics.shuffleWriteMetrics.foreach( _.incShuffleWriteTime(System.nanoTime - writeStartTime)) } } else {