
Commit ae538dc

Document UnsafeShuffleManager.
JoshRosen committed May 11, 2015
1 parent ec6d626 commit ae538dc
Showing 2 changed files with 185 additions and 0 deletions.
@@ -22,6 +22,9 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.sort.SortShuffleManager

/**
* Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the new shuffle.
*/
private class UnsafeShuffleHandle[K, V](
shuffleId: Int,
override val numMaps: Int,
@@ -30,6 +33,10 @@ private class UnsafeShuffleHandle[K, V](
}

private[spark] object UnsafeShuffleManager extends Logging {
/**
* Helper method for determining whether a shuffle should use the optimized unsafe shuffle
* path or whether it should fall back to the original sort-based shuffle.
*/
def canUseUnsafeShuffle[K, V, C](dependency: ShuffleDependency[K, V, C]): Boolean = {
val shufId = dependency.shuffleId
val serializer = Serializer.getSerializer(dependency.serializer)
@@ -43,13 +50,63 @@ private[spark] object UnsafeShuffleManager extends Logging {
} else if (dependency.keyOrdering.isDefined) {
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
false
} else if (dependency.partitioner.numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) {
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
s"${PackedRecordPointer.MAXIMUM_PARTITION_ID} partitions")
false
} else {
log.debug(s"Can use UnsafeShuffle for shuffle $shufId")
true
}
}
}
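
As a hedged illustration of how this check would be consumed (this sketch is not part of the commit; the committed registerShuffle body appears further down the diff), a ShuffleManager can branch on canUseUnsafeShuffle when a shuffle is registered:

override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (UnsafeShuffleManager.canUseUnsafeShuffle(dependency)) {
    // Optimized path: an UnsafeShuffleHandle marks this shuffle for the unsafe code path.
    // With no aggregation, the combiner type C is the value type V, hence the cast.
    new UnsafeShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Unsupported cases fall back to the wrapped sort-based shuffle manager.
    sortShuffleManager.registerShuffle(shuffleId, numMaps, dependency)
  }
}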

/**
* A shuffle implementation that uses directly-managed memory to implement several performance
* optimizations for certain types of shuffles. In cases where the new performance optimizations
* cannot be applied, this shuffle manager delegates to [[SortShuffleManager]] to handle those
* shuffles.
*
* UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold:
*
* - The shuffle dependency specifies no aggregation or output ordering.
* - The shuffle serializer supports relocation of serialized values (this is currently supported
* by KryoSerializer and Spark SQL's custom serializers).
* - The shuffle produces fewer than 16777216 output partitions.
* - No individual record is larger than 128 MB when serialized.
*
* In addition, extra spill-merging optimizations are automatically applied when the shuffle
* compression codec supports concatenation of compressed streams. This is currently supported by
* Spark's LZF compression codec.
*
* At a high level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager.
* In sort-based shuffle, incoming records are sorted according to their target partition ids, then
* written to a single map output file. Reducers fetch contiguous regions of this file in order to
* read their portion of the map output. In cases where the map output data is too large to fit in
* memory, sorted subsets of the output are spilled to disk and those on-disk files are merged
* to produce the final output file.
*
* UnsafeShuffleManager optimizes this process in several ways:
*
* - Its sort operates on serialized binary data rather than Java objects, which reduces memory
* consumption and GC overheads. This optimization requires the record serializer to have certain
* properties to allow serialized records to be re-ordered without requiring deserialization.
* See SPARK-4550, where this optimization was first proposed and implemented, for more details.
*
* - It uses a specialized cache-efficient sorter ([[UnsafeShuffleExternalSorter]]) that sorts
* arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
* record in the sorting array, this fits more of the array into cache.
*
* - The spill merging procedure operates on blocks of serialized records that belong to the same
* partition and does not need to deserialize records during the merge.
*
* - When the spill compression codec supports concatenation of compressed data, the spill merge
* simply concatenates the serialized and compressed spill partitions to produce the final output
* partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
* and avoids the need to allocate decompression or copying buffers during the merge.
*
* For more details on UnsafeShuffleManager's design, see SPARK-7081.
*/
private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManager {

private[this] val sortShuffleManager: SortShuffleManager = new SortShuffleManager(conf)
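The Scaladoc above notes that the sorter spends only 8 bytes per record by packing a partition id and a record pointer into one word, and that partition counts are capped below 16777216 (2^24, i.e. 24 bits). The following sketch shows one possible packing, with the partition id in the upper 24 bits and a 40-bit record address in the lower bits; the real bit layout is defined by PackedRecordPointer and may differ:

// Illustrative only: pack a 24-bit partition id and a 40-bit record address into
// a single Long so the sort array needs just 8 bytes per record.
object PackedPointerSketch {
  val MaxPartitionId: Int = (1 << 24) - 1  // 16777215, matching the limit in the doc above

  def pack(partitionId: Int, recordAddress: Long): Long = {
    require(partitionId <= MaxPartitionId, "too many partitions for the packed format")
    (partitionId.toLong << 40) | (recordAddress & ((1L << 40) - 1))
  }

  def partitionId(packed: Long): Int = (packed >>> 40).toInt

  def recordAddress(packed: Long): Long = packed & ((1L << 40) - 1)
}

The concatenation-based spill merge can be sketched in the same spirit: when the compression codec tolerates concatenation of compressed streams, each spill file's bytes can be appended to the final output file with NIO's transferTo, so nothing is decompressed or copied through user-space buffers. The helper below is hypothetical and not part of this commit:

import java.io.{File, FileInputStream, FileOutputStream}

// Hypothetical helper: append whole spill files (or per-partition regions of them)
// to an output file via FileChannel.transferTo.
def concatenateSpills(spills: Seq[File], output: File): Unit = {
  val outChannel = new FileOutputStream(output, true).getChannel
  try {
    for (spill <- spills) {
      val inChannel = new FileInputStream(spill).getChannel
      try {
        var position = 0L
        val size = inChannel.size()
        while (position < size) {
          position += inChannel.transferTo(position, size - position, outChannel)
        }
      } finally {
        inChannel.close()
      }
    }
  } finally {
    outChannel.close()
  }
}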
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe

import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{FunSuite, Matchers}

import org.apache.spark._
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}

/**
* Tests for the fallback logic in UnsafeShuffleManager. Actual tests of shuffling data are
* performed in other suites.
*/
class UnsafeShuffleManagerSuite extends FunSuite with Matchers {

import UnsafeShuffleManager.canUseUnsafeShuffle

private class RuntimeExceptionAnswer extends Answer[Object] {
override def answer(invocation: InvocationOnMock): Object = {
throw new RuntimeException("Called non-stubbed method, " + invocation.getMethod.getName)
}
}

private def shuffleDep(
partitioner: Partitioner,
serializer: Option[Serializer],
keyOrdering: Option[Ordering[Any]],
aggregator: Option[Aggregator[Any, Any, Any]],
mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = {
val dep = mock(classOf[ShuffleDependency[Any, Any, Any]], new RuntimeExceptionAnswer())
doReturn(0).when(dep).shuffleId
doReturn(partitioner).when(dep).partitioner
doReturn(serializer).when(dep).serializer
doReturn(keyOrdering).when(dep).keyOrdering
doReturn(aggregator).when(dep).aggregator
doReturn(mapSideCombine).when(dep).mapSideCombine
dep
}

test("supported shuffle dependencies") {
val kryo = Some(new KryoSerializer(new SparkConf()))

assert(canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
serializer = kryo,
keyOrdering = None,
aggregator = None,
mapSideCombine = false
)))

val rangePartitioner = mock(classOf[RangePartitioner[Any, Any]])
when(rangePartitioner.numPartitions).thenReturn(2)
assert(canUseUnsafeShuffle(shuffleDep(
partitioner = rangePartitioner,
serializer = kryo,
keyOrdering = None,
aggregator = None,
mapSideCombine = false
)))

}

test("unsupported shuffle dependencies") {
val kryo = Some(new KryoSerializer(new SparkConf()))
val java = Some(new JavaSerializer(new SparkConf()))

// We only support serializers that support object relocation
assert(!canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
serializer = java,
keyOrdering = None,
aggregator = None,
mapSideCombine = false
)))

// We do not support shuffles with more than 16 million output partitions
assert(!canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1),
serializer = kryo,
keyOrdering = None,
aggregator = None,
mapSideCombine = false
)))

// We do not support shuffles that perform any kind of aggregation or sorting of keys
assert(!canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
serializer = kryo,
keyOrdering = Some(mock(classOf[Ordering[Any]])),
aggregator = None,
mapSideCombine = false
)))
assert(!canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
serializer = kryo,
keyOrdering = None,
aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
mapSideCombine = false
)))
// Shuffles that combine aggregation, a key ordering, and map-side combine are also unsupported
assert(!canUseUnsafeShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
serializer = kryo,
keyOrdering = Some(mock(classOf[Ordering[Any]])),
aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
mapSideCombine = true
)))
}

}
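
The suite above only exercises the fallback decision; it does not shuffle any data. To route a real job through this manager one would point spark.shuffle.manager at the implementation. The snippet below is a sketch under the assumption that the fully-qualified class name is accepted as the config value and that Kryo is used so the serializer supports relocation, as the Scaladoc requires:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Sketch (not part of this commit): select the unsafe shuffle implementation for a job.
val conf = new SparkConf()
  .setAppName("unsafe-shuffle-demo")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
val sc = new SparkContext(conf)

// A pure repartitioning shuffle: no aggregation, no key ordering, 8 partitions,
// so canUseUnsafeShuffle should accept it rather than falling back to sort-based shuffle.
val repartitioned = sc.parallelize(1 to 1000).map(i => (i % 8, i)).partitionBy(new HashPartitioner(8))
repartitioned.count()
sc.stop()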
