diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index d142cf59d8085..1c875a15687c1 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -17,21 +17,18 @@ package org.apache.spark.shuffle.unsafe; -import org.apache.spark.*; -import org.apache.spark.unsafe.sort.ExternalSorterIterator; -import org.apache.spark.unsafe.sort.UnsafeExternalSorter; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; + import scala.Option; import scala.Product2; import scala.reflect.ClassTag; import scala.reflect.ClassTag$; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.LinkedList; - import com.esotericsoftware.kryo.io.ByteBufferOutputStream; +import org.apache.spark.*; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.scheduler.MapStatus; import org.apache.spark.scheduler.MapStatus$; @@ -44,10 +41,11 @@ import org.apache.spark.storage.BlockObjectWriter; import org.apache.spark.storage.ShuffleBlockId; import org.apache.spark.unsafe.PlatformDependent; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.memory.TaskMemoryManager; - -import static org.apache.spark.unsafe.sort.UnsafeSorter.*; +import org.apache.spark.unsafe.sort.ExternalSorterIterator; +import org.apache.spark.unsafe.sort.UnsafeExternalSorter; +import static org.apache.spark.unsafe.sort.UnsafeSorter.PrefixComparator; +import static org.apache.spark.unsafe.sort.UnsafeSorter.RecordComparator; // IntelliJ gets confused and claims that this class should be abstract, but this actually compiles public class UnsafeShuffleWriter implements ShuffleWriter { diff --git a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java index 917cbdb564a15..c69d486705f65 100644 --- a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java +++ b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java @@ -176,8 +176,7 @@ public void remove() { public UnsafeSorterSpillMerger.MergeableIterator getMergeableIterator() { sorter.sort(sortBuffer, 0, sortBufferInsertPosition / 2, sortComparator); - UnsafeSorterSpillMerger.MergeableIterator iter = - new UnsafeSorterSpillMerger.MergeableIterator() { + return new UnsafeSorterSpillMerger.MergeableIterator() { private int position = 0; private Object baseObject; @@ -213,6 +212,5 @@ public long getBaseOffset() { return baseOffset; } }; - return iter; } } diff --git a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillMerger.java index 93278d5a26473..bd3f4424724f6 100644 --- a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillMerger.java +++ b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillMerger.java @@ -18,12 +18,11 @@ package org.apache.spark.unsafe.sort; import java.util.Comparator; -import java.util.Iterator; import java.util.PriorityQueue; import static org.apache.spark.unsafe.sort.UnsafeSorter.*; -public final class UnsafeSorterSpillMerger { +final class UnsafeSorterSpillMerger { private final PriorityQueue priorityQueue; @@ -39,13 +38,6 @@ public static abstract class MergeableIterator { public abstract long getBaseOffset(); } - public static final class RecordAddressAndKeyPrefix { - public Object baseObject; - public long baseOffset; - public int recordLength; - public long keyPrefix; - } - public UnsafeSorterSpillMerger( final RecordComparator recordComparator, final UnsafeSorter.PrefixComparator prefixComparator) { @@ -74,11 +66,10 @@ public void addSpill(MergeableIterator spillReader) { priorityQueue.add(spillReader); } - public Iterator getSortedIterator() { - return new Iterator() { + public ExternalSorterIterator getSortedIterator() { + return new ExternalSorterIterator() { private MergeableIterator spillReader; - private final RecordAddressAndKeyPrefix record = new RecordAddressAndKeyPrefix(); @Override public boolean hasNext() { @@ -86,7 +77,7 @@ public boolean hasNext() { } @Override - public RecordAddressAndKeyPrefix next() { + public void loadNext() { if (spillReader != null) { if (spillReader.hasNext()) { spillReader.loadNextRecord(); @@ -94,17 +85,10 @@ public RecordAddressAndKeyPrefix next() { } } spillReader = priorityQueue.remove(); - record.baseObject = spillReader.getBaseObject(); - record.baseOffset = spillReader.getBaseOffset(); - record.keyPrefix = spillReader.getPrefix(); - return record; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); + baseObject = spillReader.getBaseObject(); + baseOffset = spillReader.getBaseOffset(); + keyPrefix = spillReader.getPrefix(); } }; } - } diff --git a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillReader.java index 894a593d41f3e..3102e5ab3b6f4 100644 --- a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillReader.java @@ -17,13 +17,14 @@ package org.apache.spark.unsafe.sort; +import java.io.*; + import com.google.common.io.ByteStreams; + import org.apache.spark.storage.BlockId; import org.apache.spark.storage.BlockManager; import org.apache.spark.unsafe.PlatformDependent; -import java.io.*; - final class UnsafeSorterSpillReader extends UnsafeSorterSpillMerger.MergeableIterator { private final File file; diff --git a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillWriter.java index 6085df67d2c2e..33356c3351967 100644 --- a/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillWriter.java +++ b/core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorterSpillWriter.java @@ -17,9 +17,14 @@ package org.apache.spark.unsafe.sort; +import java.io.*; +import java.nio.ByteBuffer; + +import scala.Tuple2; +import scala.reflect.ClassTag; + import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.serializer.DeserializationStream; -import org.apache.spark.serializer.JavaSerializerInstance; import org.apache.spark.serializer.SerializationStream; import org.apache.spark.serializer.SerializerInstance; import org.apache.spark.storage.BlockId; @@ -27,27 +32,23 @@ import org.apache.spark.storage.BlockObjectWriter; import org.apache.spark.storage.TempLocalBlockId; import org.apache.spark.unsafe.PlatformDependent; -import scala.Tuple2; -import scala.reflect.ClassTag; - -import java.io.*; -import java.nio.ByteBuffer; final class UnsafeSorterSpillWriter { private static final int SER_BUFFER_SIZE = 1024 * 1024; // TODO: tune this - public static final int EOF_MARKER = -1; - byte[] arr = new byte[SER_BUFFER_SIZE]; + static final int EOF_MARKER = -1; + + private byte[] arr = new byte[SER_BUFFER_SIZE]; private final File file; private final BlockId blockId; - BlockObjectWriter writer; - DataOutputStream dos; + private BlockObjectWriter writer; + private DataOutputStream dos; public UnsafeSorterSpillWriter( BlockManager blockManager, int fileBufferSize, - ShuffleWriteMetrics writeMetrics) throws IOException { + ShuffleWriteMetrics writeMetrics) { final Tuple2 spilledFileInfo = blockManager.diskBlockManager().createTempLocalBlock(); this.file = spilledFileInfo._2(); @@ -119,6 +120,8 @@ public void write( public void close() throws IOException { dos.writeInt(EOF_MARKER); writer.commitAndClose(); + writer = null; + dos = null; arr = null; }