From 1db845a097ae3882d9d84b3dabe9b9668212c2b3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 18 Jun 2015 16:18:16 -0700 Subject: [PATCH] Many more changes to harmonize with shuffle sorter --- .../unsafe/sort/UnsafeExternalSorter.java | 169 +++++++++++------- .../unsafe/sort/UnsafeInMemorySorter.java | 43 ++--- 2 files changed, 125 insertions(+), 87 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index a58a6bcb23bdd..5a63e72940357 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -17,7 +17,13 @@ package org.apache.spark.util.collection.unsafe.sort; +import java.io.IOException; +import java.util.LinkedList; + import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; @@ -27,12 +33,6 @@ import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.memory.TaskMemoryManager; import org.apache.spark.util.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedList; /** * External sorter based on {@link UnsafeInMemorySorter}. @@ -42,28 +42,36 @@ public final class UnsafeExternalSorter { private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class); private static final int PAGE_SIZE = 1 << 27; // 128 megabytes + @VisibleForTesting + static final int MAX_RECORD_SIZE = PAGE_SIZE - 4; private final PrefixComparator prefixComparator; private final RecordComparator recordComparator; private final int initialSize; - private int numSpills = 0; - private UnsafeInMemorySorter sorter; - private final TaskMemoryManager memoryManager; private final ShuffleMemoryManager shuffleMemoryManager; private final BlockManager blockManager; private final TaskContext taskContext; - private final LinkedList allocatedPages = new LinkedList(); - private final boolean spillingEnabled; - private final int fileBufferSize; private ShuffleWriteMetrics writeMetrics; + /** The buffer size to use when writing spills using DiskBlockObjectWriter */ + private final int fileBufferSizeBytes; + + /** + * Memory pages that hold the records being sorted. The pages in this list are freed when + * spilling, although in principle we could recycle these pages across spills (on the other hand, + * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager + * itself). + */ + private final LinkedList allocatedPages = new LinkedList(); + // These variables are reset after spilling: + private UnsafeInMemorySorter sorter; private MemoryBlock currentPage = null; private long currentPagePosition = -1; + private long freeSpaceInCurrentPage = 0; - private final LinkedList spillWriters = - new LinkedList(); + private final LinkedList spillWriters = new LinkedList<>(); public UnsafeExternalSorter( TaskMemoryManager memoryManager, @@ -81,41 +89,44 @@ public UnsafeExternalSorter( this.recordComparator = recordComparator; this.prefixComparator = prefixComparator; this.initialSize = initialSize; - this.spillingEnabled = conf.getBoolean("spark.shuffle.spill", true); // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units - this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; - openSorter(); + this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; + initializeForWriting(); } // TODO: metrics tracking + integration with shuffle write metrics + // need to connect the write metrics to task metrics so we count the spill IO somewhere. - private void openSorter() throws IOException { + /** + * Allocates new sort data structures. Called when creating the sorter and after each spill. + */ + private void initializeForWriting() throws IOException { this.writeMetrics = new ShuffleWriteMetrics(); - // TODO: connect write metrics to task metrics? // TODO: move this sizing calculation logic into a static method of sorter: final long memoryRequested = initialSize * 8L * 2; - if (spillingEnabled) { - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested); - if (memoryAcquired != memoryRequested) { - shuffleMemoryManager.release(memoryAcquired); - throw new IOException("Could not acquire memory!"); - } + final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested); + if (memoryAcquired != memoryRequested) { + shuffleMemoryManager.release(memoryAcquired); + throw new IOException("Could not acquire " + memoryRequested + " bytes of memory"); } this.sorter = new UnsafeInMemorySorter(memoryManager, recordComparator, prefixComparator, initialSize); } + /** + * Sort and spill the current records in response to memory pressure. + */ @VisibleForTesting - public void spill() throws IOException { + void spill() throws IOException { logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", Thread.currentThread().getId(), Utils.bytesToString(getMemoryUsage()), - numSpills, - numSpills > 1 ? " times" : " time"); + spillWriters.size(), + spillWriters.size() > 1 ? " times" : " time"); final UnsafeSorterSpillWriter spillWriter = - new UnsafeSorterSpillWriter(blockManager, fileBufferSize, writeMetrics); + new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics); spillWriters.add(spillWriter); final UnsafeSorterIterator sortedRecords = sorter.getSortedIterator(); while (sortedRecords.hasNext()) { @@ -134,9 +145,7 @@ public void spill() throws IOException { shuffleMemoryManager.release(sorterMemoryUsage); final long spillSize = freeMemory(); taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); - taskContext.taskMetrics().incDiskBytesSpilled(spillWriter.numberOfSpilledBytes()); - numSpills++; - openSorter(); + initializeForWriting(); } private long getMemoryUsage() { @@ -145,72 +154,98 @@ private long getMemoryUsage() { public long freeMemory() { long memoryFreed = 0; - final Iterator iter = allocatedPages.iterator(); - while (iter.hasNext()) { - memoryManager.freePage(iter.next()); - shuffleMemoryManager.release(PAGE_SIZE); - memoryFreed += PAGE_SIZE; - iter.remove(); + for (MemoryBlock block : allocatedPages) { + memoryManager.freePage(block); + shuffleMemoryManager.release(block.size()); + memoryFreed += block.size(); } + allocatedPages.clear(); currentPage = null; currentPagePosition = -1; + freeSpaceInCurrentPage = 0; return memoryFreed; } - private void ensureSpaceInDataPage(int requiredSpace) throws IOException { + /** + * Checks whether there is enough space to insert a new record into the sorter. + * + * @param requiredSpace the required space in the data page, in bytes, including space for storing + * the record size. + + * @return true if the record can be inserted without requiring more allocations, false otherwise. + */ + private boolean haveSpaceForRecord(int requiredSpace) { + assert (requiredSpace > 0); + return (sorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage)); + } + + /** + * Allocates more memory in order to insert an additional record. This will request additional + * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be + * obtained. + * + * @param requiredSpace the required space in the data page, in bytes, including space for storing + * the record size. + */ + private void allocateSpaceForRecord(int requiredSpace) throws IOException { // TODO: merge these steps to first calculate total memory requirements for this insert, // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the // data page. - if (!sorter.hasSpaceForAnotherRecord() && spillingEnabled) { - final long oldSortBufferMemoryUsage = sorter.getMemoryUsage(); - final long memoryToGrowSortBuffer = oldSortBufferMemoryUsage * 2; - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowSortBuffer); - if (memoryAcquired < memoryToGrowSortBuffer) { + if (!sorter.hasSpaceForAnotherRecord()) { + logger.debug("Attempting to expand sort pointer array"); + final long oldPointerArrayMemoryUsage = sorter.getMemoryUsage(); + final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2; + final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray); + if (memoryAcquired < memoryToGrowPointerArray) { shuffleMemoryManager.release(memoryAcquired); spill(); } else { - sorter.expandSortBuffer(); - shuffleMemoryManager.release(oldSortBufferMemoryUsage); + sorter.expandPointerArray(); + shuffleMemoryManager.release(oldPointerArrayMemoryUsage); } } - final long spaceInCurrentPage; - if (currentPage != null) { - spaceInCurrentPage = PAGE_SIZE - (currentPagePosition - currentPage.getBaseOffset()); - } else { - spaceInCurrentPage = 0; - } - if (requiredSpace > PAGE_SIZE) { - // TODO: throw a more specific exception? - throw new IOException("Required space " + requiredSpace + " is greater than page size (" + - PAGE_SIZE + ")"); - } else if (requiredSpace > spaceInCurrentPage) { - if (spillingEnabled) { + if (requiredSpace > freeSpaceInCurrentPage) { + logger.trace("Required space {} is less than free space in current page ({})", requiredSpace, + freeSpaceInCurrentPage); + // TODO: we should track metrics on the amount of space wasted when we roll over to a new page + // without using the free space at the end of the current page. We should also do this for + // BytesToBytesMap. + if (requiredSpace > PAGE_SIZE) { + throw new IOException("Required space " + requiredSpace + " is greater than page size (" + + PAGE_SIZE + ")"); + } else { final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE); if (memoryAcquired < PAGE_SIZE) { shuffleMemoryManager.release(memoryAcquired); spill(); - final long memoryAcquiredAfterSpill = shuffleMemoryManager.tryToAcquire(PAGE_SIZE); - if (memoryAcquiredAfterSpill != PAGE_SIZE) { - shuffleMemoryManager.release(memoryAcquiredAfterSpill); - throw new IOException("Can't allocate memory!"); + final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE); + if (memoryAcquiredAfterSpilling != PAGE_SIZE) { + shuffleMemoryManager.release(memoryAcquiredAfterSpilling); + throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory"); } } + currentPage = memoryManager.allocatePage(PAGE_SIZE); + currentPagePosition = currentPage.getBaseOffset(); + freeSpaceInCurrentPage = PAGE_SIZE; + allocatedPages.add(currentPage); } - currentPage = memoryManager.allocatePage(PAGE_SIZE); - currentPagePosition = currentPage.getBaseOffset(); - allocatedPages.add(currentPage); - logger.info("Acquired new page! " + allocatedPages.size() * PAGE_SIZE); } } + /** + * Write a record to the sorter. + */ public void insertRecord( Object recordBaseObject, long recordBaseOffset, int lengthInBytes, long prefix) throws IOException { // Need 4 bytes to store the record length. - ensureSpaceInDataPage(lengthInBytes + 4); + final int totalSpaceRequired = lengthInBytes + 4; + if (!haveSpaceForRecord(totalSpaceRequired)) { + allocateSpaceForRecord(totalSpaceRequired); + } final long recordAddress = memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 5ddabb63e649d..c084290ba6117 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -70,12 +70,12 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) { * Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix. */ - private long[] sortBuffer; + private long[] pointerArray; /** * The position in the sort buffer where new records can be inserted. */ - private int sortBufferInsertPosition = 0; + private int pointerArrayInsertPosition = 0; public UnsafeInMemorySorter( final TaskMemoryManager memoryManager, @@ -83,39 +83,42 @@ public UnsafeInMemorySorter( final PrefixComparator prefixComparator, int initialSize) { assert (initialSize > 0); - this.sortBuffer = new long[initialSize * 2]; + this.pointerArray = new long[initialSize * 2]; this.memoryManager = memoryManager; - this.sorter = new Sorter(UnsafeSortDataFormat.INSTANCE); + this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); } public long getMemoryUsage() { - return sortBuffer.length * 8L; + return pointerArray.length * 8L; } public boolean hasSpaceForAnotherRecord() { - return sortBufferInsertPosition + 2 < sortBuffer.length; + return pointerArrayInsertPosition + 2 < pointerArray.length; } - public void expandSortBuffer() { - final long[] oldBuffer = sortBuffer; - sortBuffer = new long[oldBuffer.length * 2]; - System.arraycopy(oldBuffer, 0, sortBuffer, 0, oldBuffer.length); + public void expandPointerArray() { + final long[] oldArray = pointerArray; + // Guard against overflow: + final int newLength = oldArray.length * 2 > 0 ? (oldArray.length * 2) : Integer.MAX_VALUE; + pointerArray = new long[newLength]; + System.arraycopy(oldArray, 0, pointerArray, 0, oldArray.length); } /** - * Insert a record into the sort buffer. + * Inserts a record to be sorted. * - * @param objectAddress pointer to a record in a data page, encoded by {@link TaskMemoryManager}. + * @param recordPointer pointer to a record in a data page, encoded by {@link TaskMemoryManager}. + * @param keyPrefix a user-defined key prefix */ - public void insertRecord(long objectAddress, long keyPrefix) { + public void insertRecord(long recordPointer, long keyPrefix) { if (!hasSpaceForAnotherRecord()) { - expandSortBuffer(); + expandPointerArray(); } - sortBuffer[sortBufferInsertPosition] = objectAddress; - sortBufferInsertPosition++; - sortBuffer[sortBufferInsertPosition] = keyPrefix; - sortBufferInsertPosition++; + pointerArray[pointerArrayInsertPosition] = recordPointer; + pointerArrayInsertPosition++; + pointerArray[pointerArrayInsertPosition] = keyPrefix; + pointerArrayInsertPosition++; } private static final class SortedIterator extends UnsafeSorterIterator { @@ -171,7 +174,7 @@ public void loadNext() { * {@code next()} will return the same mutable object. */ public UnsafeSorterIterator getSortedIterator() { - sorter.sort(sortBuffer, 0, sortBufferInsertPosition / 2, sortComparator); - return new SortedIterator(memoryManager, sortBufferInsertPosition, sortBuffer); + sorter.sort(pointerArray, 0, pointerArrayInsertPosition / 2, sortComparator); + return new SortedIterator(memoryManager, pointerArrayInsertPosition, pointerArray); } }