[SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled #28911
Conversation
@attilapiros @tgravescs @jiangxb1987 Please take a look, thanks!
Test build #124426 has finished for PR 28911 at commit
Thanks for working on this; I was interested in this as well. Can you update the description to include details on your overall approach (where you get the directories from, etc.)?
@tgravescs updated the description, thanks!
Jenkins, retest this please.
Test build #124454 has finished for PR 28911 at commit
Test build #124662 has finished for PR 28911 at commit
I've updated the PR. Could you take another look?
Test build #124905 has finished for PR 28911 at commit
Jenkins, retest this please.
Test build #125437 has finished for PR 28911 at commit
@Ngone51 I am still catching up on changes; as part of #25299 or subsequently (or here), are we updating preferred locality for shuffle tasks to account for the ability to do node-local reads?
Took a quick look and had a few comments; the overall approach seems fine. I need to do a more in-depth review.
@mridulm We don't, and there's no need to. See the current implementation in core/src/main/scala/org/apache/spark/MapOutputTracker.scala (lines 610 to 624 at commit 6fcb70e).
@tgravescs Thanks for the review. I'll try to address them tomorrow.
The output is host, but size computation does not aggregate by host.
Ah, I get your point, and I can imagine how it may affect the current locality preference. Let's take an example to see if we're on the same page. Say we have executor1 and executor2 on node1, and executor3 and executor4 on node2. executor1 and executor2 each hold 10 bytes of shuffle data (from task1 and task2 respectively), while executor3 and executor4 each hold 40 bytes (from task3 and task4 respectively), all for the same reduce partition. With the current implementation, only executor3 and executor4 qualify as preferred locations, since each holds 40 / (10 + 10 + 40 + 40) = 0.4 >= 0.2 of the total, while executor1 and executor2 each hold only 0.1. However, node1 could also be a preferred location if we aggregated the shuffle data sizes by host, since (10 + 10) / (10 + 10 + 40 + 40) = 0.2 >= 0.2. That looks reasonable to me. cc @attilapiros @tgravescs @jiangxb1987 @holdenk Any ideas?
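A minimal, self-contained sketch of the two checks compared in this example (illustrative Scala only, not Spark's actual MapOutputTracker code; the 0.2 threshold and the byte counts come from the example above):

```scala
// Illustrative only: compares the per-executor check with the proposed
// per-host aggregation, using the sizes from the example in this thread.
object LocalityFractionSketch {
  // Shuffle bytes per (host, executor).
  val sizes: Map[(String, String), Long] = Map(
    ("node1", "executor1") -> 10L,
    ("node1", "executor2") -> 10L,
    ("node2", "executor3") -> 40L,
    ("node2", "executor4") -> 40L)

  val threshold = 0.2
  val total = sizes.values.sum.toDouble

  // Current behavior: each executor's bytes are compared to the threshold on their own.
  def preferredExecutors: Set[String] =
    sizes.collect { case ((_, exec), bytes) if bytes / total >= threshold => exec }.toSet

  // Proposed behavior: aggregate bytes by host before applying the threshold.
  def preferredHosts: Set[String] =
    sizes.groupBy { case ((host, _), _) => host }
      .map { case (host, m) => host -> m.values.sum }
      .collect { case (host, bytes) if bytes / total >= threshold => host }
      .toSet
}
```

With these numbers, `preferredExecutors` contains only executor3 and executor4, while `preferredHosts` also includes node1, since (10 + 10) / 100 = 0.2 meets the threshold.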
The fix for this need not necessarily come in this PR, but can be a feature addition. The solution is fairly straightforward, given the existing implementation.
Test build #125589 has finished for PR 28911 at commit
Personally, I'd save locality changes for a follow-up PR. Making changes in core is pretty hard; as long as we have a JIRA and it's a good incremental chunk of work, keeping it smaller for review (and potential revert if something goes wrong) is better. (Of course there are situations where that isn't possible, but I think changing the locality calculations would be strictly additive.)
* (when this is a NettyBlockTransferService). Note there's only one executor when this is a
* NettyBlockTransferService because we ask one specific executor at a time.
Can you clarify the last sentence here?
Oh, I got it.
When the external shuffle service is the target of this request, we can collect the local dirs of multiple executors at once (all the host-local dirs are available in the external shuffle service running on the host, as it is the central component in this sense on that host). But here we can request the local dirs for only one executor: the one which handles the request itself.
@Ngone51 what about adding an assert here, at core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala, line 119 (commit 1126341):
`val execId = getLocalDirs.execIds.head`
checking that the array contains only one executor ID and that it is equal to the `executorId` of the `blockManager`?
I added the check to ensure there's only one executor ID, but didn't check its equality with the `blockManager`'s executor ID, because we only have the `BlockDataManager` in `NettyBlockRpcServer`, and it does not expose the executor ID. I am still wondering whether it's worthwhile to expose it just for this sanity check.
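A hedged sketch of the guard discussed here. `GetLocalDirs` and `LocalDirsHandler` are illustrative stand-ins rather than Spark's actual classes; the point is only the exactly-one-executor invariant when the executor itself answers the request:

```scala
// Illustrative stand-ins for the request and handler; not Spark's real API.
final case class GetLocalDirs(execIds: Array[String])

class LocalDirsHandler(knownDirs: Map[String, Array[String]]) {
  // When the executor itself (via NettyBlockTransferService) answers the
  // request, it can only report its own dirs, so exactly one id is allowed.
  def handle(req: GetLocalDirs): Map[String, Array[String]] = {
    require(req.execIds.length == 1,
      s"Expected exactly one executor id, got ${req.execIds.length}")
    val execId = req.execIds.head
    Map(execId -> knownDirs.getOrElse(execId, Array.empty[String]))
  }
}
```

An external shuffle service, by contrast, can serve all executors on its host, which is why the same request message carries a list of executor IDs.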
Thank you for the review. I'll try to address them tomorrow!
Test build #126227 has finished for PR 28911 at commit
Jenkins, retest this please.
Test build #126234 has finished for PR 28911 at commit
If you want, sure! @holdenk.
Test build #128022 has finished for PR 28911 at commit
@Ngone51 The PR description mentions disabled dynamic allocation as a requirement, but this was changed as a result of a review finding. Could you please update it?
Thank you @dongjoon-hyun for the detailed review. It helps a lot to improve the PR.
@holdenk Sure, please feel free to add any comments!
@attilapiros Updated, thanks for the reminder!
Test build #128093 has finished for PR 28911 at commit
Two minor points of clarification, but no blocking concerns from me after this review.
Test build #128139 has finished for PR 28911 at commit
Retest this please. |
+1, LGTM. Merged to master for Apache Spark 3.1.0 in December 2020.
Thank you, @Ngone51 and all.
Thank you all!!
[SPARK-32077][CORE] Support host-local shuffle data reading when external shuffle service is disabled

Closes #28911 from Ngone51/support_node_local_shuffle.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
This PR adds support to read host-local shuffle data from disk directly when external shuffle service is disabled.
Similar to #25299, we first try to get the local disk directories for the shuffle data, which are located on the same host as the current executor. The only difference is that in #25299 the directories are obtained from the external shuffle service, while in this PR they are obtained from the executors themselves.
To implement the feature, this PR extends the `HostLocalDirManager` for both `ExternalBlockStoreClient` and `NettyBlockTransferService`. Also, this PR adds `getHostLocalDirs` for `NettyBlockTransferService` as `ExternalBlockStoreClient` does, in order to send the get-dirs request to the corresponding executor. And this PR reuses the request message `GetLocalDirsForExecutors` for simplicity.

Why are the changes needed?
After SPARK-27651 / #25299, Spark can read host-local shuffle data directly from disk when the external shuffle service is enabled. To extend the feature, we can also support it when the external shuffle service is disabled.
Does this PR introduce any user-facing change?
Yes. Before this PR, to use the host-local shuffle reading feature, users had to enable not only `spark.shuffle.readHostLocalDisk` but also `spark.shuffle.service.enabled`. After this PR, enabling `spark.shuffle.readHostLocalDisk` is enough, and the external shuffle service is no longer a prerequisite.

How was this patch tested?
Added test and tested manually.
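The user-facing change described above can be summarized as two configuration sets. This is a sketch with plain maps for illustration, not Spark's `SparkConf` API:

```scala
// Illustrative only: the configurations discussed in the description.
object HostLocalReadConfs {
  // Before this PR: both flags were required for host-local reads.
  val before: Map[String, String] = Map(
    "spark.shuffle.readHostLocalDisk" -> "true",
    "spark.shuffle.service.enabled" -> "true")

  // After this PR: the external shuffle service is no longer a prerequisite.
  val after: Map[String, String] = Map(
    "spark.shuffle.readHostLocalDisk" -> "true")
}
```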