Skip to content
Permalink
Browse files

[SPARK-23989][SQL] exchange should copy data before non-serialized sh…

…uffle

## What changes were proposed in this pull request?

In Spark SQL, we usually reuse the `UnsafeRow` instance and need to copy the data when a place buffers non-serialized objects.

Shuffle may buffer objects if we don't make it to the bypass merge shuffle or unsafe shuffle.

`ShuffleExchangeExec.needToCopyObjectsBeforeShuffle` misses the case that, if `spark.sql.shuffle.partitions` is large enough, we could fail to run unsafe shuffle and go with the non-serialized shuffle.

This bug is very hard to hit since users wouldn't set such a large number of partitions(16 million) for Spark SQL exchange.

TODO: test

## How was this patch tested?

todo.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21101 from cloud-fan/shuffle.

(cherry picked from commit 6e19f76)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
  • Loading branch information...
cloud-fan authored and hvanhovell committed Apr 19, 2018
1 parent 7fb1117 commit fb968215ca014c5cf40097a3c4588bbee11e2c02
@@ -153,12 +153,9 @@ object ShuffleExchangeExec {
* See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
*
* @param partitioner the partitioner for the shuffle
* @param serializer the serializer that will be used to write rows
* @return true if rows should be copied before being shuffled, false otherwise
*/
private def needToCopyObjectsBeforeShuffle(
partitioner: Partitioner,
serializer: Serializer): Boolean = {
private def needToCopyObjectsBeforeShuffle(partitioner: Partitioner): Boolean = {
// Note: even though we only use the partitioner's `numPartitions` field, we require it to be
// passed instead of directly passing the number of partitions in order to guard against
// corner-cases where a partitioner constructed with `numPartitions` partitions may output
@@ -167,22 +164,24 @@ object ShuffleExchangeExec {
val shuffleManager = SparkEnv.get.shuffleManager
val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
val numParts = partitioner.numPartitions
if (sortBasedShuffleOn) {
val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
if (numParts <= bypassMergeThreshold) {
// If we're using the original SortShuffleManager and the number of output partitions is
// sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
// doesn't buffer deserialized records.
// Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
false
} else if (serializer.supportsRelocationOfSerializedObjects) {
} else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
// SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records
// prior to sorting them. This optimization is only applied in cases where shuffle
// dependency does not specify an aggregator or ordering and the record serializer has
// certain properties. If this optimization is enabled, we can safely avoid the copy.
// certain properties and the number of partitions doesn't exceed the limitation. If this
// optimization is enabled, we can safely avoid the copy.
//
// Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
// need to check whether the optimization is enabled and supported by our serializer.
// Exchange never configures its ShuffledRDDs with aggregators or key orderings, and the
// serializer in Spark SQL always satisfy the properties, so we only need to check whether
// the number of partitions exceeds the limitation.
false
} else {
// Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must
@@ -298,7 +297,7 @@ object ShuffleExchangeExec {
rdd
}

if (needToCopyObjectsBeforeShuffle(part, serializer)) {
if (needToCopyObjectsBeforeShuffle(part)) {
newRdd.mapPartitionsInternal { iter =>
val getPartitionKey = getPartitionKeyExtractor()
iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }

0 comments on commit fb96821

Please sign in to comment.
You can’t perform that action at this time.