Skip to content

Commit

Permalink
Remove ability to disable spilling in UnsafeShuffleExternalSorter.
Browse files Browse the repository at this point in the history
There's no obvious use-case for allowing users to disable spark.shuffle.spill
and run out of memory. Because this configuration isn't deprecated as of this
patch, I've added code to log a warning to let users know if their preference
will be ignored by the new shuffle manager.
  • Loading branch information
JoshRosen committed May 12, 2015
1 parent 57312c9 commit 6276168
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ final class UnsafeShuffleExternalSorter {
private final ShuffleMemoryManager shuffleMemoryManager;
private final BlockManager blockManager;
private final TaskContext taskContext;
private final boolean spillingEnabled;
private final ShuffleWriteMetrics writeMetrics;

/** The buffer size to use when writing spills using DiskBlockObjectWriter */
Expand Down Expand Up @@ -107,7 +106,6 @@ public UnsafeShuffleExternalSorter(
this.taskContext = taskContext;
this.initialSize = initialSize;
this.numPartitions = numPartitions;
this.spillingEnabled = conf.getBoolean("spark.shuffle.spill", true);
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;

Expand All @@ -121,12 +119,10 @@ public UnsafeShuffleExternalSorter(
private void initializeForWriting() throws IOException {
// TODO: move this sizing calculation logic into a static method of sorter:
final long memoryRequested = initialSize * 8L;
if (spillingEnabled) {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
if (memoryAcquired != memoryRequested) {
shuffleMemoryManager.release(memoryAcquired);
throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
}
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
if (memoryAcquired != memoryRequested) {
shuffleMemoryManager.release(memoryAcquired);
throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
}

this.sorter = new UnsafeShuffleInMemorySorter(initialSize);
Expand Down Expand Up @@ -291,7 +287,7 @@ public void cleanupAfterError() {
logger.error("Unable to delete spill file {}", spill.file.getPath());
}
}
if (spillingEnabled && sorter != null) {
if (sorter != null) {
shuffleMemoryManager.release(sorter.getMemoryUsage());
sorter = null;
}
Expand All @@ -307,24 +303,19 @@ public void cleanupAfterError() {
*/
private boolean haveSpaceForRecord(int requiredSpace) {
assert (requiredSpace > 0);
// The sort array will automatically expand when inserting a new record, so we only need to
// worry about it having free space when spilling is enabled.
final boolean sortBufferHasSpace = !spillingEnabled || sorter.hasSpaceForAnotherRecord();
final boolean dataPageHasSpace = requiredSpace <= freeSpaceInCurrentPage;
return (sortBufferHasSpace && dataPageHasSpace);
return (sorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
}

/**
* Allocates more memory in order to insert an additional record. If spilling is enabled, this
* will request additional memory from the {@link ShuffleMemoryManager} and spill if the requested
* memory can not be obtained. If spilling is disabled, then this will allocate memory without
* coordinating with the ShuffleMemoryManager.
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
* obtained.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
* the record size.
*/
private void allocateSpaceForRecord(int requiredSpace) throws IOException {
if (spillingEnabled && !sorter.hasSpaceForAnotherRecord()) {
if (!sorter.hasSpaceForAnotherRecord()) {
logger.debug("Attempting to expand sort buffer");
final long oldSortBufferMemoryUsage = sorter.getMemoryUsage();
final long memoryToGrowSortBuffer = oldSortBufferMemoryUsage * 2;
Expand All @@ -347,16 +338,14 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
PAGE_SIZE + ")");
} else {
if (spillingEnabled) {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquired < PAGE_SIZE) {
shuffleMemoryManager.release(memoryAcquired);
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
}
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquired < PAGE_SIZE) {
shuffleMemoryManager.release(memoryAcquired);
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
}
}
currentPage = memoryManager.allocatePage(PAGE_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,14 @@ private[spark] object UnsafeShuffleManager extends Logging {
*
* For more details on UnsafeShuffleManager's design, see SPARK-7081.
*/
private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManager {
private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {

if (!conf.getBoolean("spark.shuffle.spill", true)) {
logWarning(
"spark.shuffle.spill was set to false, but this is ignored by UnsafeShuffleManager; " +
"its optimized shuffles will continue to spill to disk when necessary.")
}


private[this] val sortShuffleManager: SortShuffleManager = new SortShuffleManager(conf)

Expand Down

0 comments on commit 6276168

Please sign in to comment.