-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-30033][core] Manage shuffle IO plugins using Spark's plugin system. #26670
Conversation
…stem. SPARK-25299 is introducing a new plugin interface for shuffle IO; currently, parts of that API provide lifecycle methods that are already covered by the plugin API that was added in SPARK-29396. This change makes some modifications so that: - The driver and executor components of the shuffle plugin extend their respective counterparts in the generic plugin API. - The shuffle IO plugin is managed by the same code that manages other generic plugins. This simplifies and reuses similar code that exists in both implementations, and also provides more functionality to shuffle plugins: not only do they have more contextual information (without having to query APIs like SparkEnv) but they also have access to other functionality in the plugin API that would otherwise require touching internal Spark APIs. There is a small change to the generic plugin API to avoid registering an RPC endpoint and starting threads when not needed; plugins now must explicitly say they want to handle RPC messages for the endpoint to be created. This is done because the default shuffle plugin is now loaded by the plugin system, and does not need the RPC functionality. (This API hasn't been released yet so it's ok to make the change.) The only downside is that initialization of the SortShuffleManager in executors is a bit weird, because of the order in which things are initialized: the shuffle manager is initialized by SparkEnv, and plugin initialization happens after that. In any case, all initialization is done before any tasks are allowed to run.. Currently, the shuffle plugin is always loaded, regardless of whether the sort shuffle manager is being used; this was already the case in the driver, but now is also the case in the executors. It shouldn't be hard to fix that if needed. Tested with existing and updated unit tests.
Test build #114434 has finished for PR 26670 at commit
|
core/src/main/java/org/apache/spark/api/plugin/PluginRpcEndpoint.java
Outdated
Show resolved
Hide resolved
It's surprising to me that the PR Builder ignores |
Test build #114475 has finished for PR 26670 at commit
|
Test build #116100 has finished for PR 26670 at commit
|
I haven't looked at the code yet, what exactly do you mean initialized by SparkEnv? Do you just mean that sparkEnv instantiates the shuffle manager class so its created before you possibly would want to initialize something in the plugin? within the executor plugin it seems like having other plugins initialize first would make sense in case shuffle relied on another plugin initialization, but I supposed the reverse could be true as well. |
Test build #117361 has finished for PR 26670 at commit
|
The shuffle manager is initialized by SparkEnv (look for Some more verbose initialization where the plugin context has multiple "init" methods (for pre- and post- SparkEnv, for example) could solve it, but then the internal state of the plugin context needs to be mutable. Anyway, didn't look too much into making that initialization cleaner. |
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
SPARK-25299 is introducing a new plugin interface for shuffle IO; currently,
parts of that API provide lifecycle methods that are already covered by the
plugin API that was added in SPARK-29396.
This change makes some modifications so that:
respective counterparts in the generic plugin API.
generic plugins.
This simplifies and reuses similar code that exists in both implementations,
and also provides more functionality to shuffle plugins: not only do they have
more contextual information (without having to query APIs like SparkEnv) but
they also have access to other functionality in the plugin API that would
otherwise require touching internal Spark APIs.
There is a small change to the generic plugin API to avoid registering an
RPC endpoint and starting threads when not needed; plugins now must explicitly
say they want to handle RPC messages for the endpoint to be created. This is
done because the default shuffle plugin is now loaded by the plugin system,
and does not need the RPC functionality. (This API hasn't been released yet
so it's ok to make the change.)
The only downside is that initialization of the SortShuffleManager in executors
is a bit weird, because of the order in which things are initialized: the
shuffle manager is initialized by SparkEnv, and plugin initialization happens
after that. In any case, all initialization is done before any tasks are
allowed to run..
Currently, the shuffle plugin is always loaded, regardless of whether the sort
shuffle manager is being used; this was already the case in the driver, but
now is also the case in the executors. It shouldn't be hard to fix that if
needed.
Tested with existing and updated unit tests.