
[SPARK-23989][SQL] exchange should copy data before non-serialized shuffle #21101

Closed
cloud-fan wants to merge 1 commit into master from cloud-fan:shuffle

Conversation

@cloud-fan (Contributor) commented Apr 18, 2018

What changes were proposed in this pull request?

In Spark SQL we usually reuse the same UnsafeRow instance, so the data must be copied whenever something downstream buffers the rows as non-serialized objects.

Shuffle may buffer such objects if we end up on neither the bypass-merge shuffle path nor the unsafe (serialized) shuffle path.

ShuffleExchangeExec.needToCopyObjectsBeforeShuffle misses the case where spark.sql.shuffle.partitions is so large that the unsafe shuffle cannot be used and Spark falls back to the non-serialized shuffle.

This bug is very hard to hit in practice, since users would not normally set such a large number of partitions (more than about 16 million) for a Spark SQL exchange.
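For reference, here is a simplified, self-contained sketch (not the actual Spark source; the threshold constants are inlined for illustration) of the decision needToCopyObjectsBeforeShuffle is supposed to make after this fix:

```scala
object NeedToCopySketch {
  // Default value of spark.shuffle.sort.bypassMergeThreshold.
  val BypassMergeThreshold = 200

  // Mirrors SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE:
  // 2^24 = 16,777,216 partitions, the "16 million" mentioned above.
  val MaxSerializedModePartitions = 1 << 24

  // Returns true when the shuffle write path buffers deserialized rows, in which case the
  // reused UnsafeRow instance must be copied before it is handed to the shuffle.
  def needToCopyObjectsBeforeShuffle(numParts: Int): Boolean = {
    if (numParts <= BypassMergeThreshold) {
      // Bypass-merge path: records go straight to per-partition files, nothing is buffered.
      false
    } else if (numParts <= MaxSerializedModePartitions) {
      // Serialized ("unsafe") shuffle: rows are serialized immediately, so no copy is needed.
      false
    } else {
      // Too many partitions for the serialized shuffle: Spark falls back to the sort-based
      // path that buffers deserialized rows, so the row must be copied. This is the branch
      // the old check missed.
      true
    }
  }
}
```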

TODO: test

How was this patch tested?

todo.

SparkQA commented Apr 18, 2018

Test build #89513 has finished for PR 21101 at commit 40b2c5c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  // If we're using the original SortShuffleManager and the number of output partitions is
  // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
  // doesn't buffer deserialized records.
  // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
  false
- } else if (serializer.supportsRelocationOfSerializedObjects) {
+ } else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
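For context (not part of the diff): the no-copy branch is now gated on the partition count fitting within the serialized shuffle's limit instead of on the serializer's relocation support. The limit itself comes from the serialized shuffle packing the partition id into 24 bits of its record pointer, roughly:

```scala
// The serialized (unsafe) shuffle stores the partition id in 24 bits, so it supports at most
// 2^24 partitions; beyond that Spark falls back to the buffering, non-serialized sort path.
val maxSerializedModePartitions: Int = 1 << 24 // 16,777,216
```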
@JoshRosen (Contributor) commented Apr 19, 2018

I was almost going to suggest that we should check for both conditions with an && here, just as future-proofing in case the serializer was ever changed, but I can now see why that isn't a huge risk in the current codebase: we always use an UnsafeRowSerializer here now. It was only in the pre-Tungsten era that we could use either UnsafeRowSerializer or SparkSqlSerializer here.
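For illustration, the future-proofed variant discussed here (hypothetical, not what was merged) would combine both checks, roughly:

```scala
// Hypothetical combined condition: skip the defensive copy only if the serializer supports
// relocation of serialized objects AND the partition count fits serialized-mode limits.
// The merged change keeps only the partition-count check because this code path always uses
// UnsafeRowSerializer, which supports relocation.
def canSkipCopy(serializerSupportsRelocation: Boolean, numParts: Int): Boolean =
  serializerSupportsRelocation && numParts <= (1 << 24)
```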

@hvanhovell (Contributor) left a comment

LGTM

@hvanhovell commented Apr 19, 2018

Merging to master and 2.3. Let me know if further backports are needed.

asfgit pushed a commit that referenced this pull request Apr 19, 2018
[SPARK-23989][SQL] exchange should copy data before non-serialized shuffle

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21101 from cloud-fan/shuffle.

(cherry picked from commit 6e19f76)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
asfgit closed this in 6e19f76 on Apr 19, 2018