Skip to content

Commit

Permalink
Update Exchange operator's copying logic to account for new shuffle manager
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 10, 2015
1 parent 8f5061a commit 67d25ba
Showing 1 changed file with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.util.MutablePair

object Exchange {
Expand Down Expand Up @@ -85,29 +86,36 @@ case class Exchange(
// corner-cases where a partitioner constructed with `numPartitions` partitions may output
// fewer partitions (like RangePartitioner, for example).
val conf = child.sqlContext.sparkContext.conf
val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
val shuffleManager = SparkEnv.get.shuffleManager
val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] ||
shuffleManager.isInstanceOf[UnsafeShuffleManager]
val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
if (newOrdering.nonEmpty) {
// If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
// which requires a defensive copy.
true
} else if (sortBasedShuffleOn) {
// Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory.
// However, there are two special cases where we can avoid the copy, described below:
if (partitioner.numPartitions <= bypassMergeThreshold) {
// If the number of output partitions is sufficiently small, then Spark will fall back to
// the old hash-based shuffle write path which doesn't buffer deserialized records.
val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
// If we're using the original SortShuffleManager and the number of output partitions is
// sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
// doesn't buffer deserialized records.
// Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
false
} else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
// SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
// them. This optimization is guarded by a feature-flag and is only applied in cases where
// shuffle dependency does not specify an ordering and the record serializer has certain
// properties. If this optimization is enabled, we can safely avoid the copy.
//
// This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
false
} else {
// None of the special cases held, so we must copy.
// Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory. This code
// path is used both when SortShuffleManager is used and when UnsafeShuffleManager falls
// back to SortShuffleManager to perform a shuffle that the new fast path can't handle. In
// both cases, we must copy.
true
}
} else {
Expand Down

0 comments on commit 67d25ba

Please sign in to comment.