From e995d1a14565bfed400ba19a92d6a3035c3cca5a Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 12 May 2015 11:51:01 -0700
Subject: [PATCH] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS.

---
 .../spark/shuffle/unsafe/PackedRecordPointer.java    |  3 +++
 .../spark/shuffle/unsafe/UnsafeShuffleManager.scala  | 10 ++++++++--
 .../shuffle/unsafe/UnsafeShuffleManagerSuite.scala   |  2 +-
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
index 35d7b7b6651d0..6d61b1b9e34da 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
@@ -37,6 +37,9 @@ final class PackedRecordPointer {
 
   static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes
 
+  /**
+   * The maximum partition identifier that can be encoded. Note that partition ids start from 0.
+   */
   static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1; // 16777215
 
   /** Bit mask for the lower 40 bits of a long. */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
index 5db64e2144abe..4785e8c0f91a3 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -33,6 +33,12 @@ private class UnsafeShuffleHandle[K, V](
 }
 
 private[spark] object UnsafeShuffleManager extends Logging {
+
+  /**
+   * The maximum number of shuffle output partitions that UnsafeShuffleManager supports.
+   */
+  val MAX_SHUFFLE_OUTPUT_PARTITIONS = PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
+
   /**
    * Helper method for determining whether a shuffle should use the optimized unsafe shuffle
    * path or whether it should fall back to the original sort-based shuffle.
@@ -50,9 +56,9 @@ private[spark] object UnsafeShuffleManager extends Logging {
     } else if (dependency.keyOrdering.isDefined) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
       false
-    } else if (dependency.partitioner.numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) {
+    } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
-        s"${PackedRecordPointer.MAXIMUM_PARTITION_ID} partitions")
+        s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")
       false
     } else {
       log.debug(s"Can use UnsafeShuffle for shuffle $shufId")
diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
index 9c91948bdc1e4..49a04a2a45280 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
@@ -93,7 +93,7 @@ class UnsafeShuffleManagerSuite extends FunSuite with Matchers {
 
     // We do not support shuffles with more than 16 million output partitions
     assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1),
+      partitioner = new HashPartitioner(UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS + 1),
       serializer = kryo,
       keyOrdering = None,
       aggregator = None,