From ebf9eeafb0ae8a1cbf69acff20fbc77e4bcbc695 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 18 Jun 2015 15:39:05 -0700 Subject: [PATCH] Harmonization with shuffle's unsafe sorter --- .../unsafe/sort/UnsafeExternalSorter.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index d0ede69e9e66c..a58a6bcb23bdd 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -26,6 +26,7 @@ import org.apache.spark.unsafe.PlatformDependent; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.memory.TaskMemoryManager; +import org.apache.spark.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public final class UnsafeExternalSorter { private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class); - private static final int PAGE_SIZE = 1024 * 1024; // TODO: tune this + private static final int PAGE_SIZE = 1 << 27; // 128 megabytes private final PrefixComparator prefixComparator; private final RecordComparator recordComparator; @@ -107,6 +108,12 @@ private void openSorter() throws IOException { @VisibleForTesting public void spill() throws IOException { + logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", + Thread.currentThread().getId(), + Utils.bytesToString(getMemoryUsage()), + numSpills, + numSpills > 1 ? " times" : " time"); + final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(blockManager, fileBufferSize, writeMetrics); spillWriters.add(spillWriter); @@ -129,14 +136,13 @@ public void spill() throws IOException { taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); taskContext.taskMetrics().incDiskBytesSpilled(spillWriter.numberOfSpilledBytes()); numSpills++; - final long threadId = Thread.currentThread().getId(); - // TODO: messy; log _before_ spill - logger.info("Thread " + threadId + " spilling in-memory map of " + - org.apache.spark.util.Utils.bytesToString(spillSize) + " to disk (" + - (numSpills + ((numSpills > 1) ? " times" : " time")) + " so far)"); openSorter(); } + private long getMemoryUsage() { + return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE); + } + public long freeMemory() { long memoryFreed = 0; final Iterator iter = allocatedPages.iterator();