diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java index aaa4945eca9b2..e674195b67d4f 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java @@ -70,7 +70,6 @@ final class UnsafeShuffleExternalSorter { private final ShuffleMemoryManager shuffleMemoryManager; private final BlockManager blockManager; private final TaskContext taskContext; - private final boolean spillingEnabled; private final ShuffleWriteMetrics writeMetrics; /** The buffer size to use when writing spills using DiskBlockObjectWriter */ @@ -107,7 +106,6 @@ public UnsafeShuffleExternalSorter( this.taskContext = taskContext; this.initialSize = initialSize; this.numPartitions = numPartitions; - this.spillingEnabled = conf.getBoolean("spark.shuffle.spill", true); // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; @@ -121,12 +119,10 @@ public UnsafeShuffleExternalSorter( private void initializeForWriting() throws IOException { // TODO: move this sizing calculation logic into a static method of sorter: final long memoryRequested = initialSize * 8L; - if (spillingEnabled) { - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested); - if (memoryAcquired != memoryRequested) { - shuffleMemoryManager.release(memoryAcquired); - throw new IOException("Could not acquire " + memoryRequested + " bytes of memory"); - } + final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested); + if (memoryAcquired != memoryRequested) { + shuffleMemoryManager.release(memoryAcquired); + throw new IOException("Could not acquire " + memoryRequested + " bytes of memory"); } this.sorter = new UnsafeShuffleInMemorySorter(initialSize); @@ -291,7 +287,7 @@ public void cleanupAfterError() { logger.error("Unable to delete spill file {}", spill.file.getPath()); } } - if (spillingEnabled && sorter != null) { + if (sorter != null) { shuffleMemoryManager.release(sorter.getMemoryUsage()); sorter = null; } @@ -307,24 +303,19 @@ public void cleanupAfterError() { */ private boolean haveSpaceForRecord(int requiredSpace) { assert (requiredSpace > 0); - // The sort array will automatically expand when inserting a new record, so we only need to - // worry about it having free space when spilling is enabled. - final boolean sortBufferHasSpace = !spillingEnabled || sorter.hasSpaceForAnotherRecord(); - final boolean dataPageHasSpace = requiredSpace <= freeSpaceInCurrentPage; - return (sortBufferHasSpace && dataPageHasSpace); + return (sorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage)); } /** - * Allocates more memory in order to insert an additional record. If spilling is enabled, this - * will request additional memory from the {@link ShuffleMemoryManager} and spill if the requested - * memory can not be obtained. If spilling is disabled, then this will allocate memory without - * coordinating with the ShuffleMemoryManager. + * Allocates more memory in order to insert an additional record. This will request additional + * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be + * obtained. * * @param requiredSpace the required space in the data page, in bytes, including space for storing * the record size. */ private void allocateSpaceForRecord(int requiredSpace) throws IOException { - if (spillingEnabled && !sorter.hasSpaceForAnotherRecord()) { + if (!sorter.hasSpaceForAnotherRecord()) { logger.debug("Attempting to expand sort buffer"); final long oldSortBufferMemoryUsage = sorter.getMemoryUsage(); final long memoryToGrowSortBuffer = oldSortBufferMemoryUsage * 2; @@ -347,16 +338,14 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException { throw new IOException("Required space " + requiredSpace + " is greater than page size (" + PAGE_SIZE + ")"); } else { - if (spillingEnabled) { - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE); - if (memoryAcquired < PAGE_SIZE) { - shuffleMemoryManager.release(memoryAcquired); - spill(); - final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE); - if (memoryAcquiredAfterSpilling != PAGE_SIZE) { - shuffleMemoryManager.release(memoryAcquiredAfterSpilling); - throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory"); - } + final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE); + if (memoryAcquired < PAGE_SIZE) { + shuffleMemoryManager.release(memoryAcquired); + spill(); + final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE); + if (memoryAcquiredAfterSpilling != PAGE_SIZE) { + shuffleMemoryManager.release(memoryAcquiredAfterSpilling); + throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory"); } } currentPage = memoryManager.allocatePage(PAGE_SIZE); diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala index 4785e8c0f91a3..2da7691f5af8f 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala @@ -113,7 +113,14 @@ private[spark] object UnsafeShuffleManager extends Logging { * * For more details on UnsafeShuffleManager's design, see SPARK-7081. */ -private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManager { +private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { + + if (!conf.getBoolean("spark.shuffle.spill", true)) { + logWarning( + "spark.shuffle.spill was set to false, but this is ignored by UnsafeShuffleManager; " + + "its optimized shuffles will continue to spill to disk when necessary.") + } + private[this] val sortShuffleManager: SortShuffleManager = new SortShuffleManager(conf)