Skip to content

Commit

Permalink
Harmonization with shuffle's unsafe sorter
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 6, 2015
1 parent 206bfa2 commit ebf9eea
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,7 +41,7 @@ public final class UnsafeExternalSorter {

private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);

private static final int PAGE_SIZE = 1024 * 1024; // TODO: tune this
private static final int PAGE_SIZE = 1 << 27; // 128 megabytes

private final PrefixComparator prefixComparator;
private final RecordComparator recordComparator;
Expand Down Expand Up @@ -107,6 +108,12 @@ private void openSorter() throws IOException {

@VisibleForTesting
public void spill() throws IOException {
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
numSpills,
numSpills > 1 ? " times" : " time");

final UnsafeSorterSpillWriter spillWriter =
new UnsafeSorterSpillWriter(blockManager, fileBufferSize, writeMetrics);
spillWriters.add(spillWriter);
Expand All @@ -129,14 +136,13 @@ public void spill() throws IOException {
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
taskContext.taskMetrics().incDiskBytesSpilled(spillWriter.numberOfSpilledBytes());
numSpills++;
final long threadId = Thread.currentThread().getId();
// TODO: messy; log _before_ spill
logger.info("Thread " + threadId + " spilling in-memory map of " +
org.apache.spark.util.Utils.bytesToString(spillSize) + " to disk (" +
(numSpills + ((numSpills > 1) ? " times" : " time")) + " so far)");
openSorter();
}

private long getMemoryUsage() {
return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
}

public long freeMemory() {
long memoryFreed = 0;
final Iterator<MemoryBlock> iter = allocatedPages.iterator();
Expand Down

0 comments on commit ebf9eea

Please sign in to comment.