Commit ec6d626

Add notes on maximum # of supported shuffle partitions.
JoshRosen committed May 11, 2015
1 parent 0d4d199 commit ec6d626
Showing 3 changed files with 22 additions and 2 deletions.

PackedRecordPointer.java
@@ -37,6 +37,8 @@ final class PackedRecordPointer {

static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes

static final int MAXIMUM_PARTITION_ID = 1 << 24; // 16777216

/** Bit mask for the lower 40 bits of a long. */
private static final long MASK_LONG_LOWER_40_BITS = 0xFFFFFFFFFFL;

@@ -62,6 +64,7 @@ final class PackedRecordPointer {
* @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
*/
public static long packPointer(long recordPointer, int partitionId) {
assert (partitionId <= MAXIMUM_PARTITION_ID);
// Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
// Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
final int pageNumber = (int) ((recordPointer & MASK_LONG_UPPER_13_BITS) >>> 51);
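
For reference, the two constants above imply a 64-bit layout: 24 bits for the partition id and 40 bits for a compressed record address (a 13-bit page number plus a 27-bit in-page offset, which is where the 128-megabyte page limit comes from). The following standalone sketch illustrates that packing arithmetic; it is not the actual PackedRecordPointer implementation, and the helper names and the exact split of the lower 40 bits are assumptions based on the masks and comments in the diff.

    // Hypothetical sketch of the bit layout: [24-bit partition id][13-bit page][27-bit offset].
    final class PointerPackingSketch {

      /** Packs a partition id, page number, and in-page offset into one long. */
      static long pack(int partitionId, int pageNumber, long offsetInPage) {
        assert (partitionId >>> 24) == 0 : "partition id must fit in 24 bits";
        assert (pageNumber >>> 13) == 0 : "page number must fit in 13 bits";
        assert (offsetInPage >>> 27) == 0 : "offset must fit in 27 bits (pages <= 128 MB)";
        final long compressedAddress = (((long) pageNumber) << 27) | offsetInPage;
        return (((long) partitionId) << 40) | compressedAddress;
      }

      /** Recovers the partition id from the upper 24 bits. */
      static int partitionId(long packed) {
        return (int) (packed >>> 40);
      }

      /** Recovers the compressed address from the lower 40 bits. */
      static long compressedAddress(long packed) {
        return packed & 0xFFFFFFFFFFL;
      }

      public static void main(String[] args) {
        final long packed = pack(12345, 7, 1024);
        System.out.println(partitionId(packed));        // 12345
        System.out.println(compressedAddress(packed));  // (7 << 27) | 1024 = 939525120
      }
    }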

UnsafeShuffleSorter.java
@@ -17,8 +17,10 @@

package org.apache.spark.shuffle.unsafe;

import java.io.IOException;
import java.util.Comparator;

import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.Sorter;

final class UnsafeShuffleSorter {
@@ -59,8 +61,17 @@ public long getMemoryUsage() {
return sortBuffer.length * 8L;
}

// TODO: clarify assumption that pointer points to record length.
public void insertRecord(long recordPointer, int partitionId) {
/**
* Inserts a record to be sorted.
*
* @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
* certain pointer compression techniques used by the sorter, the sort can
* only operate on pointers that point to locations in the first
* {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
* @param partitionId the partition id, which must be less than or equal to
* {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
*/
public void insertRecord(long recordPointer, int partitionId) throws IOException {
if (!hasSpaceForAnotherRecord()) {
expandSortBuffer();
}
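
To see why the partition id occupies the most significant bits, here is a minimal, self-contained sketch of a sorter that stores one packed long per record in a growable array; sorting the raw longs then groups records by partition. This is not the actual UnsafeShuffleSorter: the growth policy is simplified, and it truncates the record pointer to 40 bits instead of re-encoding the page number and offset the way packPointer does.

    import java.util.Arrays;

    // Hypothetical in-memory sorter over packed pointers (simplified sketch).
    final class PackedPointerSorterSketch {
      private long[] sortBuffer = new long[1024];
      private int insertPosition = 0;

      /** Appends one record; the partition id must fit in 24 bits. */
      void insertRecord(long recordPointer, int partitionId) {
        if (insertPosition == sortBuffer.length) {
          // The real sorter has its own expansion (and, later, spilling) logic.
          sortBuffer = Arrays.copyOf(sortBuffer, sortBuffer.length * 2);
        }
        // Simplified packing: partition id in the upper 24 bits, a 40-bit address below it.
        final long packed = (((long) partitionId) << 40) | (recordPointer & 0xFFFFFFFFFFL);
        sortBuffer[insertPosition++] = packed;
      }

      /** Sorting the packed longs orders records by partition id (the high-order bits). */
      long[] sortedPointers() {
        final long[] copy = Arrays.copyOf(sortBuffer, insertPosition);
        Arrays.sort(copy);
        return copy;
      }
    }

Because partition ids below 2^24 leave the sign bit clear, an ordinary signed sort of the packed longs is equivalent to sorting by partition id.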

UnsafeShuffleWriter.java
@@ -100,6 +100,12 @@ public UnsafeShuffleWriter(
int mapId,
TaskContext taskContext,
SparkConf sparkConf) {
final int numPartitions = handle.dependency().partitioner().numPartitions();
if (numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) {
throw new IllegalArgumentException(
"UnsafeShuffleWriter can only be used for shuffles with at most " +
PackedRecordPointer.MAXIMUM_PARTITION_ID + " reduce partitions");
}
this.blockManager = blockManager;
this.shuffleBlockResolver = shuffleBlockResolver;
this.memoryManager = memoryManager;
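
The constructor change above makes the limit fail fast: a shuffle whose partitioner declares more reduce partitions than PackedRecordPointer.MAXIMUM_PARTITION_ID is rejected before any records are written. A hypothetical standalone version of that guard is sketched below; the numPartitions parameter stands in for handle.dependency().partitioner().numPartitions().

    // Hypothetical isolated version of the partition-count guard.
    final class PartitionCountCheckSketch {
      static final int MAXIMUM_PARTITION_ID = 1 << 24;  // 16777216

      static void requireSupportedPartitionCount(int numPartitions) {
        if (numPartitions > MAXIMUM_PARTITION_ID) {
          throw new IllegalArgumentException(
            "UnsafeShuffleWriter can only be used for shuffles with at most " +
            MAXIMUM_PARTITION_ID + " reduce partitions");
        }
      }

      public static void main(String[] args) {
        requireSupportedPartitionCount(200);              // typical shuffle: fine
        requireSupportedPartitionCount(1 << 24);          // exactly at the limit: fine
        requireSupportedPartitionCount((1 << 24) + 1);    // throws IllegalArgumentException
      }
    }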