From 76ca65e0c36fca292e32cb948d5507a4270c24e0 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 21 Oct 2014 16:51:17 -0700
Subject: [PATCH 1/5] Add regression test for SPARK-3426.

---
 .../scala/org/apache/spark/ShuffleSuite.scala | 22 +++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 15aa4d83800fa..40e0f4b95b8ae 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -242,6 +242,28 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     assert(thrown.getClass === classOf[SparkException])
     assert(thrown.getMessage.toLowerCase.contains("serializable"))
   }
+
+  test("shuffle with shuffle.spill.compress=true, shuffle.compress=false (SPARK-3426)") {
+    val conf = new SparkConf()
+      .setAppName("test")
+      .setMaster("local")
+      .set("spark.shuffle.spill.compress", "true")
+      .set("spark.shuffle.compress", "false")
+      .set("spark.shuffle.memoryFraction", "0.001")
+    sc = new SparkContext(conf)
+    sc.parallelize(0 until 100000).map(i => (i/4, i)).groupByKey().collect()
+  }
+
+  test("shuffle with shuffle.spill.compress=false, shuffle.compress=true (SPARK-3426)") {
+    val conf = new SparkConf()
+      .setAppName("test")
+      .setMaster("local")
+      .set("spark.shuffle.spill.compress", "false")
+      .set("spark.shuffle.compress", "true")
+      .set("spark.shuffle.memoryFraction", "0.001")
+    sc = new SparkContext(conf)
+    sc.parallelize(0 until 100000).map(i => (i/4, i)).groupByKey().collect()
+  }
 }
 
 object ShuffleSuite {
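Note on the failure mode these tests exercise: with spark.shuffle.memoryFraction
set to 0.001 the shuffle spills almost immediately, and before the fix a spill
file could be written under one codec decision and read back under another. A
minimal standalone sketch of that mechanism, using the JDK's GZIP streams as a
stand-in for Spark's CompressionCodec (illustrative only, not Spark code):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.util.zip.GZIPOutputStream

    // Writer side: the spill is compressed, as with spark.shuffle.spill.compress=true.
    val buf = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(buf)
    out.write("shuffle data".getBytes("UTF-8"))
    out.close()

    // Reader side: the bytes are consumed as-is, as with spark.shuffle.compress=false.
    // The reader sees gzip framing instead of the payload -- the same class of
    // corruption reported in SPARK-3426 for spilled shuffle files.
    val in = new ByteArrayInputStream(buf.toByteArray)
    val raw = new Array[Byte](buf.size())
    in.read(raw)
    assert(new String(raw, "UTF-8") != "shuffle data")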
From 91e7e40cc74529bba42f29cd59168eb3d9a3348f Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Oct 2014 10:23:17 -0700
Subject: [PATCH 2/5] Combine tests into single test of all combinations

---
 .../scala/org/apache/spark/ShuffleSuite.scala | 42 ++++++++++---------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 40e0f4b95b8ae..2bdd84ce69ab8 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -243,26 +243,28 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     assert(thrown.getMessage.toLowerCase.contains("serializable"))
   }
 
-  test("shuffle with shuffle.spill.compress=true, shuffle.compress=false (SPARK-3426)") {
-    val conf = new SparkConf()
-      .setAppName("test")
-      .setMaster("local")
-      .set("spark.shuffle.spill.compress", "true")
-      .set("spark.shuffle.compress", "false")
-      .set("spark.shuffle.memoryFraction", "0.001")
-    sc = new SparkContext(conf)
-    sc.parallelize(0 until 100000).map(i => (i/4, i)).groupByKey().collect()
-  }
-
-  test("shuffle with shuffle.spill.compress=false, shuffle.compress=true (SPARK-3426)") {
-    val conf = new SparkConf()
-      .setAppName("test")
-      .setMaster("local")
-      .set("spark.shuffle.spill.compress", "false")
-      .set("spark.shuffle.compress", "true")
-      .set("spark.shuffle.memoryFraction", "0.001")
-    sc = new SparkContext(conf)
-    sc.parallelize(0 until 100000).map(i => (i/4, i)).groupByKey().collect()
+  test("shuffle with different compression settings (SPARK-3426)") {
+    for (
+      shuffleSpillCompress <- Set(true, false);
+      shuffleCompress <- Set(true, false)
+    ) {
+      val conf = new SparkConf()
+        .setAppName("test")
+        .setMaster("local")
+        .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
+        .set("spark.shuffle.compress", shuffleCompress.toString)
+        .set("spark.shuffle.memoryFraction", "0.001")
+      resetSparkContext()
+      sc = new SparkContext(conf)
+      try {
+        sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
+      } catch {
+        case e: Exception =>
+          val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," +
+            s" spark.shuffle.compress=$shuffleCompress"
+          throw new Exception(errMsg, e)
+      }
+    }
   }
 }
 
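The combined test calls resetSparkContext() between iterations so that each
configuration runs against a fresh context, and wraps the job in a try/catch so
a failure reports which combination broke. For readers unfamiliar with the test
harness, a paraphrased sketch of the kind of helper the LocalSparkContext trait
provides (the real trait lives in Spark's test support and may differ in detail):

    import org.apache.spark.SparkContext

    // Paraphrased sketch, not the actual Spark test-support code.
    trait LocalSparkContextSketch {
      @transient var sc: SparkContext = _

      def resetSparkContext(): Unit = {
        if (sc != null) {
          sc.stop()  // tear down the previous context
          sc = null
        }
        // Clear the driver port so the next context in this JVM can bind cleanly.
        System.clearProperty("spark.driver.port")
      }
    }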
From 2c687b94a9f2e3cb8b38e3ed1bc55eefd9780133 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 21 Oct 2014 23:56:24 -0700
Subject: [PATCH 3/5] Fix SPARK-3426.

---
 .../org/apache/spark/storage/BlockId.scala         | 11 ++++++++---
 .../org/apache/spark/storage/BlockManager.scala    |  3 ++-
 .../apache/spark/storage/DiskBlockManager.scala    | 17 +++++++++++++----
 .../util/collection/ExternalAppendOnlyMap.scala    |  2 +-
 .../spark/util/collection/ExternalSorter.scala     |  9 +++++++--
 5 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index a83a3f468ae5f..8df5ec6bde184 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -83,9 +83,14 @@ case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
   def name = "input-" + streamId + "-" + uniqueId
 }
 
-/** Id associated with temporary data managed as blocks. Not serializable. */
-private[spark] case class TempBlockId(id: UUID) extends BlockId {
-  def name = "temp_" + id
+/** Id associated with temporary local data managed as blocks. Not serializable. */
+private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
+  def name = "temp_local_" + id
+}
+
+/** Id associated with temporary shuffle data managed as blocks. Not serializable. */
+private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
+  def name = "temp_shuffle_" + id
 }
 
 // Intended only for testing purposes
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0ce2a3f631b15..4cc97923658bc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1071,7 +1071,8 @@ private[spark] class BlockManager(
       case _: ShuffleBlockId => compressShuffle
       case _: BroadcastBlockId => compressBroadcast
       case _: RDDBlockId => compressRdds
-      case _: TempBlockId => compressShuffleSpill
+      case _: TempLocalBlockId => compressShuffleSpill
+      case _: TempShuffleBlockId => compressShuffle
       case _ => false
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index a715594f198c2..6633a1db57e59 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -98,11 +98,20 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     getAllFiles().map(f => BlockId(f.getName))
   }
 
-  /** Produces a unique block id and File suitable for intermediate results. */
-  def createTempBlock(): (TempBlockId, File) = {
-    var blockId = new TempBlockId(UUID.randomUUID())
+  /** Produces a unique block id and File suitable for storing local intermediate results. */
+  def createTempLocalBlock(): (TempLocalBlockId, File) = {
+    var blockId = new TempLocalBlockId(UUID.randomUUID())
     while (getFile(blockId).exists()) {
-      blockId = new TempBlockId(UUID.randomUUID())
+      blockId = new TempLocalBlockId(UUID.randomUUID())
+    }
+    (blockId, getFile(blockId))
+  }
+
+  /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
+  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
+    var blockId = new TempShuffleBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
     (blockId, getFile(blockId))
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 0c088da46aa5e..26fa0cb6d7bde 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -153,7 +153,7 @@ class ExternalAppendOnlyMap[K, V, C](
    * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
    */
   override protected[this] def spill(collection: SizeTracker): Unit = {
-    val (blockId, file) = diskBlockManager.createTempBlock()
+    val (blockId, file) = diskBlockManager.createTempLocalBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
       curWriteMetrics)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index d1b06d14acbd2..6621bf8772abd 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -38,6 +38,11 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId}
  *
  * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
  *
+ * Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied
+ * to its use in sort-based shuffle (for example, its block compression is controlled by
+ * `spark.shuffle.compress`). We may need to revisit this if ExternalSorter is used in other
+ * non-shuffle contexts where we might want to use different configuration settings.
+ *
  * @param aggregator optional Aggregator with combine functions to use for merging data
  * @param partitioner optional Partitioner; if given, sort by partition ID and then key
  * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
@@ -259,7 +264,7 @@ private[spark] class ExternalSorter[K, V, C](
 
   private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
     assert(!bypassMergeSort)
-    val (blockId, file) = diskBlockManager.createTempBlock()
+    val (blockId, file) = diskBlockManager.createTempShuffleBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
     var objectsWritten = 0  // Objects written since the last flush
@@ -338,7 +343,7 @@ private[spark] class ExternalSorter[K, V, C](
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
      partitionWriters = Array.fill(numPartitions) {
-        val (blockId, file) = diskBlockManager.createTempBlock()
+        val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
     }
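With TempShuffleBlockId in place, the compression decision becomes a simple
per-block-type dispatch. A condensed paraphrase of the BlockManager wiring this
patch relies on (the real shouldCompress and wrapForCompression live in
BlockManager.scala and cover the other block types shown in the hunk above):

    import java.io.OutputStream
    import org.apache.spark.io.CompressionCodec

    // Condensed sketch: route each temp block to the config flag that governs it.
    // Assumes access to the private[spark] block id types defined in the patch.
    class CompressionRoutingSketch(
        codec: CompressionCodec,
        compressShuffle: Boolean,        // spark.shuffle.compress
        compressShuffleSpill: Boolean) { // spark.shuffle.spill.compress

      private def shouldCompress(blockId: BlockId): Boolean = blockId match {
        case _: TempShuffleBlockId => compressShuffle       // read by shuffle fetchers
        case _: TempLocalBlockId   => compressShuffleSpill  // read back only locally
        case _                     => false                 // other cases elided
      }

      def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream =
        if (shouldCompress(blockId)) codec.compressedOutputStream(s) else s
    }

The key point: ExternalSorter's mergeable spill files are later read through the
shuffle path, so they must sit on the spark.shuffle.compress branch, while
ExternalAppendOnlyMap's spills are read back only by the map itself and stay on
spark.shuffle.spill.compress.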
From c8dd8f28be4382d02594607b50d694448027f8f9 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Oct 2014 10:31:10 -0700
Subject: [PATCH 4/5] Add comment explaining use of createTempShuffleBlock().

---
 .../org/apache/spark/util/collection/ExternalSorter.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 6621bf8772abd..dae88b6365a8a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -264,6 +264,9 @@ private[spark] class ExternalSorter[K, V, C](
 
   private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
     assert(!bypassMergeSort)
+    // Because these files may be read during shuffle, they must be compressed using
+    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+    // createTempShuffleBlock here; see SPARK-3426 for more context.
     val (blockId, file) = diskBlockManager.createTempShuffleBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -343,6 +346,9 @@ private[spark] class ExternalSorter[K, V, C](
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
       partitionWriters = Array.fill(numPartitions) {
+        // Because these files may be read during shuffle, they must be compressed using
+        // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+        // createTempShuffleBlock here; see SPARK-3426 for more context.
         val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }

From 1921cf683b6460a1c31d1694eb5b26f46a90125f Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Oct 2014 10:33:28 -0700
Subject: [PATCH 5/5] Minor edit for clarity.

--- .../org/apache/spark/util/collection/ExternalSorter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index dae88b6365a8a..c1ce13683b569 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -264,7 +264,7 @@ private[spark] class ExternalSorter[K, V, C]( private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = { assert(!bypassMergeSort) - // Because these files may be read during shuffle, they must be compressed using + // Because these files may be read during shuffle, their compression must be controlled by // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. val (blockId, file) = diskBlockManager.createTempShuffleBlock() @@ -346,7 +346,7 @@ private[spark] class ExternalSorter[K, V, C]( if (partitionWriters == null) { curWriteMetrics = new ShuffleWriteMetrics() partitionWriters = Array.fill(numPartitions) { - // Because these files may be read during shuffle, they must be compressed using + // Because these files may be read during shuffle, their compression must be controlled by // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. val (blockId, file) = diskBlockManager.createTempShuffleBlock()
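A closing note for operators: on clusters running without this fix, the two
shuffle compression settings should not diverge. A sketch of the conservative
configuration (either both true or both false) until the fix is deployed:

    import org.apache.spark.SparkConf

    // Keep the codec decisions in agreement pre-fix; once the fix is in,
    // temp shuffle files follow spark.shuffle.compress automatically.
    val conf = new SparkConf()
      .set("spark.shuffle.compress", "true")
      .set("spark.shuffle.spill.compress", "true")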