From ec6d62613144b7c7cbcc08fd9eb6fecd341b303d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 11 May 2015 11:20:07 -0700 Subject: [PATCH] Add notes on maximum # of supported shuffle partitions. --- .../spark/shuffle/unsafe/PackedRecordPointer.java | 3 +++ .../spark/shuffle/unsafe/UnsafeShuffleSorter.java | 15 +++++++++++++-- .../spark/shuffle/unsafe/UnsafeShuffleWriter.java | 6 ++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java index 8c0940d23420b..ee991ee26f7a0 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java @@ -37,6 +37,8 @@ final class PackedRecordPointer { static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes + static final int MAXIMUM_PARTITION_ID = 1 << 24; // 16777216 + /** Bit mask for the lower 40 bits of a long. */ private static final long MASK_LONG_LOWER_40_BITS = 0xFFFFFFFFFFL; @@ -62,6 +64,7 @@ final class PackedRecordPointer { * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class. */ public static long packPointer(long recordPointer, int partitionId) { + assert (partitionId <= MAXIMUM_PARTITION_ID); // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page. // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses. final int pageNumber = (int) ((recordPointer & MASK_LONG_UPPER_13_BITS) >>> 51); diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java index 5acbc6c1c4f2f..8e66fbaf4c645 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java @@ -17,8 +17,10 @@ package org.apache.spark.shuffle.unsafe; +import java.io.IOException; import java.util.Comparator; +import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.Sorter; final class UnsafeShuffleSorter { @@ -59,8 +61,17 @@ public long getMemoryUsage() { return sortBuffer.length * 8L; } - // TODO: clairify assumption that pointer points to record length. - public void insertRecord(long recordPointer, int partitionId) { + /** + * Inserts a record to be sorted. + * + * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to + * certain pointer compression techniques used by the sorter, the sort can + * only operate on pointers that point to locations in the first + * {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page. + * @param partitionId the partition id, which must be less than or equal to + * {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}. + */ + public void insertRecord(long recordPointer, int partitionId) throws IOException { if (!hasSpaceForAnotherRecord()) { expandSortBuffer(); } diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index df05a95506f4b..438852cd1408c 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -100,6 +100,12 @@ public UnsafeShuffleWriter( int mapId, TaskContext taskContext, SparkConf sparkConf) { + final int numPartitions = handle.dependency().partitioner().numPartitions(); + if (numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) { + throw new IllegalArgumentException( + "UnsafeShuffleWriter can only be used for shuffles with at most " + + PackedRecordPointer.MAXIMUM_PARTITION_ID + " reduce partitions"); + } this.blockManager = blockManager; this.shuffleBlockResolver = shuffleBlockResolver; this.memoryManager = memoryManager;