-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-28607][CORE][SHUFFLE] Don't store partition lengths twice. #25341
[SPARK-28607][CORE][SHUFFLE] Don't store partition lengths twice. #25341
Conversation
The shuffle writer API introduced in SPARK-28209 has a flaw that leads to a memory usage regression - we ended up tracking the partition lengths in two places. Here, we modify the API slightly to avoid redundant tracking. The implementation of the shuffle writer plugin is now responsible for tracking the lengths of partitions, and propagating this back up to the higher shuffle writer as part of the commitAllPartitions API.
* stream might compress or encrypt the bytes before persisting the data to the backing | ||
* data store. | ||
*/ | ||
long getNumBytesWritten(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we _might- still need this - appears to be used by the sort shuffle writer per https://github.com/apache/spark/pull/25342/files#diff-fe378a929dd1f5c5ac8bff90dab743b1R87... hmm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you update the metrics after the commit (by adding up all the partition lengths)?
Otherwise it doesn't seem horrible to keep this in the API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's just keep the API for now.
Test build #108588 has finished for PR 25341 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good job~
Addressed comments |
Test build #109244 has finished for PR 25341 at commit
|
@vanzin is this ok to merge? |
Don't know. Need time to review and I've been busy. |
Merging to master. |
} | ||
} | ||
lengths[i] = writer.getNumBytesWritten(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a quick question here. So after this change, there's no place to call ShufflePartitionWriter.getNumBytesWritten()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That method is still there.
The shuffle writer API introduced in SPARK-28209 has a flaw that leads to a memory usage regression - we ended up tracking the partition lengths in two places. Here, we modify the API slightly to avoid redundant tracking. The implementation of the shuffle writer plugin is now responsible for tracking the lengths of partitions, and propagating this back up to the higher shuffle writer as part of the commitAllPartitions API. Existing unit tests. Closes apache#25341 from mccheah/dont-redundantly-store-part-lengths. Authored-by: mcheah <mcheah@palantir.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
* Bring implementation into closer alignment with upstream. Step to ease merge conflict resolution and build failure problems when we pull in changes from upstream. * Cherry-pick BypassMergeSortShuffleWriter changes and shuffle writer API changes * [SPARK-28607][CORE][SHUFFLE] Don't store partition lengths twice The shuffle writer API introduced in SPARK-28209 has a flaw that leads to a memory usage regression - we ended up tracking the partition lengths in two places. Here, we modify the API slightly to avoid redundant tracking. The implementation of the shuffle writer plugin is now responsible for tracking the lengths of partitions, and propagating this back up to the higher shuffle writer as part of the commitAllPartitions API. Existing unit tests. Closes apache#25341 from mccheah/dont-redundantly-store-part-lengths. Authored-by: mcheah <mcheah@palantir.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer. Existing unit tests were changed to use the plugin instead, and they used the local disk version to ensure that there were no regressions. Closes apache#25342 from mccheah/shuffle-writer-refactor-sort-shuffle-writer. Lead-authored-by: mcheah <mcheah@palantir.com> Co-authored-by: mccheah <mcheah@palantir.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API. * Resolve build issues and remaining semantic conflicts * More build fixes * More build fixes * Attempt to fix build * More build fixes * [SPARK-29072] Put back usage of TimeTrackingOutputStream for UnsafeShuffleWriter and ShufflePartitionPairsWriter. * Address comments * Import ordering * Fix stream reference
What changes were proposed in this pull request?
The shuffle writer API introduced in SPARK-28209 has a flaw that leads to a memory usage regression - we ended up tracking the partition lengths in two places. Here, we modify the API slightly to avoid redundant tracking. The implementation of the shuffle writer plugin is now responsible for tracking the lengths of partitions, and propagating this back up to the higher shuffle writer as part of the commitAllPartitions API.
How was this patch tested?
Existing unit tests.