[SPARK-33870][CORE] Enable spark.storage.replication.proactive by default #30876
Conversation
Kubernetes integration test starting
cc @cloud-fan
Kubernetes integration test status success
Test build #133174 has finished for PR 30876 at commit
Could you review this, @HyukjinKwon?
```diff
@@ -384,7 +384,7 @@ package object config {
       "get the replication level of the block to the initial number")
     .version("2.2.0")
     .booleanConf
-    .createWithDefault(false)
+    .createWithDefault(true)
```
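For reference, a minimal sketch of how an individual application could still pin this flag regardless of the new cluster-wide default (the app name is hypothetical; this is not part of the PR itself):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: pin spark.storage.replication.proactive per application so the job
// does not depend on the default flipped by this PR. Set "false" to keep the
// pre-3.2.0 behavior.
val spark = SparkSession
  .builder()
  .appName("proactive-replication-demo") // hypothetical application name
  .config("spark.storage.replication.proactive", "true")
  .getOrCreate()
```

Submitting with `--conf spark.storage.replication.proactive=false` would achieve the same per-job override.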
Should we maybe just enable this by default when we're in Kubernetes? I am okay with enabling it by default too if other people are fine. cc @tgravescs and @Ngone51 too FYI
For the other resource managers, this will be helpful because this is a kind of self-healing code. And this code has been here for a long time.
I am not sure how widely this is used, particularly as it is not enabled by default.
Especially in the context of dynamic resource allocation, it can become very chatty when executors start getting dropped.
Given this, I am not very keen on enabling it, at least for YARN. Thoughts @tgravescs ?
Hi, @mridulm. Actually, we are using it now and it's a good time to test it by default, isn't it?

> I am not sure how widely this is used, particularly as it is not enabled by default.

For the following, Apache Spark usually drops only empty executors. If you are referring to a storage timeout configuration, I believe that what we need is to improve the storage timeout configuration behavior after enabling this. Of course, the storage timeout had better not cause any chatty situation.

> Especially in the context of dynamic resource allocation, it can become very chatty
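The "storage timeout" here presumably refers to the dynamic-allocation idle timeout for executors holding cached blocks; a hedged sketch of the interacting settings (values are purely illustrative):

```scala
import org.apache.spark.SparkConf

// Illustrative only: with dynamic allocation, executors holding cached blocks are
// released after the cached-executor idle timeout. With proactive replication on,
// each such release can trigger re-replication of the blocks that executor held,
// which is the "chatty" behavior discussed above.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "30min")
  .set("spark.storage.replication.proactive", "true")
```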
Thanks for the ping. I think I'm OK with the change. And shall we document the behaviour change in core-migration-guide.md?
Sure, I'll update the PR, @Ngone51 .
In the past, I found this to be noisy for the cases where replication was enabled - but this was a while back, and I would like to understand better what the 'cost' of enabling this for nontrivial use cases is for master: disabled by default means only developers who specifically test for it pay the price, not everyone.
It is quite common for an application to have references to a persisted RDD even after its use - with the loss of the RDD blocks having little to no functional impact.
This is similar to loss of blocks for an unreplicated persisted RDD - we do not proactively recompute the lost blocks, but do so on demand.
If the idea is that we enable this for master, evaluate the impact over the next 6 months, and revisit at the end, I am fine with that: but an evaluation would need to be done before this goes out - else anyone using replicated storage will get hit with the impact of proactive replication as well, and will need to disable this for their applications.
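For context, "replicated storage" above means persisting with a storage level whose replication factor is greater than one; a minimal sketch (the data is a placeholder, and `spark` is assumed to be an existing SparkSession):

```scala
import org.apache.spark.storage.StorageLevel

// Sketch: only blocks persisted at replicated levels such as MEMORY_AND_DISK_2
// are candidates for proactive re-replication when an executor is lost.
val rdd = spark.sparkContext.parallelize(1 to 1000000) // placeholder data
val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
cached.count() // materialize and replicate the blocks
```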
> but an evaluation would need to be done before this goes out

or perhaps identify the subset of conditions where it makes sense to enable it by default.
Also, cc @mridulm.
Hi, @mridulm.
I'm trying to understand the risk you mentioned for the YARN environment.
Could you give me more hints about your concerns on this in the YARN dynamic allocation situation? We can fix the behavior and move forward if that's valid.
Hi, all. The core migration guide is updated.
Test build #133259 has finished for PR 30876 at commit
Kubernetes integration test starting
LGTM if tests pass.
Kubernetes integration test status success
Thank you, @Ngone51 . The one failure is the following and it's fixed on the master branch at the very latest commit.
Thank you all! Merged to master for Apache Spark 3.2.0.
Would be nice if we hold off merging when there is ongoing discussion, unless there is an immediate need to push changes (like a hotfix).
Oh, sorry, @mridulm . Do you want me to revert this?
If you are reluctant about enabling this, we can revert this and I'll make another PR, @mridulm .
Also, I hope we can address your concerns as new official JIRA issues instead of abandoning the existing Spark features. The features behind the
There are use cases for which this is an invaluable feature - particularly for applications with aggressive DRA or a flaky cluster env.
Let us continue with the discussion - this is a trivial enough PR to revert if we decide to do so.
Thank you, @mridulm . Sure! In terms of resource management, currently K8s environment is more aggressive than the other existing resource managers. Your recent series of external shuffle service patches are helpful of course. In addition to that, there are more helpful options like this. For example, there is the following block-migration code:

```scala
private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = {
val replicatedSuccessfully = bm.replicateBlock(
blockToReplicate.blockId,
blockToReplicate.replicas.toSet,
blockToReplicate.maxReplicas,
maxReplicationFailures = Some(maxReplicationFailuresForDecommission))
if (replicatedSuccessfully) {
logInfo(s"Block ${blockToReplicate.blockId} offloaded successfully, Removing block now")
bm.removeBlock(blockToReplicate.blockId)
logInfo(s"Block ${blockToReplicate.blockId} removed")
} else {
logWarning(s"Failed to offload block ${blockToReplicate.blockId}")
}
replicatedSuccessfully
}
```

However, sometimes K8s doesn't wait long enough and kills executors during migration processing within its predefined grace period. This is my focused use case and I'd love to hear your concerns. You have all my ears. 😄
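For completeness, a hedged sketch of how the decommission path above and proactive replication might be combined for a K8s application (these are the config names I believe exist in Spark 3.1+; exact availability depends on the version):

```scala
import org.apache.spark.SparkConf

// Sketch: graceful decommission migrates blocks off a dying executor; if the pod is
// killed before migration finishes (grace period too short), proactive replication
// can still restore the replication level from surviving replicas.
val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.rddBlocks.enabled", "true")
  .set("spark.storage.replication.proactive", "true")
```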
Thanks for the details, that definitely sounds like a good rationale to enable it by default for k8s: for other resource managers, this does not necessarily apply.
Thank you for your advice. I added my replies.
Yes, it does.
Sure, I'll make sure to identify the desirable and problematic conditions and try to document it officially in the Apache Spark 3.2.0 timeframe. This feature was added in 2.2.0 and seems to need more community love due to the recent environment changes, especially in cloud environments. BTW, for the following comments (@mridulm and @HyukjinKwon ),
The following questions come to me.
This is a kind of self-healing feature. So, I believe it can help (1) ~ (4). For the following, it's unclear to me.
Where is the location of the
In this PR, although Apache Spark RDD maintains the lineage, the number of replicas and proactive recovery is a matter of
Merry Christmas and Happy New Year, @mridulm , @HyukjinKwon , @Ngone51 . 😄
@dongjoon-hyun proactive replication only applies to persisted RDD blocks, not shuffle blocks - not sure if I am missing something here. Even for persisted RDD blocks, it specifically applies when the RDD is persisted with storage levels where replication > 1.
Please let me know if I am missing something in my understanding.
[1] ESS serving disk backed blocks might have some corner cases to this flow which I have not thought through.
@mridulm . Your comments are true and nothing is wrong there. I agree with you on every bit, and we can disable this back for the problematic cases before the Apache Spark 3.2.0 vote. To do that, I believe we can agree that we need to identify the problematic corner cases in this thread. At least, we need to provide a better document to the community about this option if some PMCs already have implicit knowledge about the reasons why this option should be prohibited in the YARN environment. It's invaluable knowledge for the community to share. Besides, we are continuing this discussion because your initial concerns are crucial to the community. AFAIK, nobody else has shared these concerns explicitly before.
Could you elaborate on your concern more specifically?
Again, I'm not aiming to protect the default value of the configuration. It's just a configuration and the decision is up to us (you and me and all the community members). It's easy to disable this or to abandon this, while it's difficult to improve this for Apache Spark. I'm trying to understand why this should be prohibited in some resource managers or in a normal Spark operation environment, and I'm trying to make Apache Spark better for those cases. That's the reason why I tried to go deeper into that part by proposing the potential points and asking you similar questions repeatedly in this thread. We will have many choices in Apache Spark 3.2.0 if the implicit knowledge is shared more.
So far, I haven't gotten your answers explicitly. Please let me know if I missed something there.
(Sigh, GitHub prematurely posted my previous comment - fleshing it out here.)
As I mentioned above, the flag helps applications which are fine with paying the overhead for proactive replication. I have sketched some cases where proactive replication does not help, and others where it could be useful - these are examples, of course: but in the end, it is specific to the application.
Making it the default will impact all applications which have replication > 1: given this PR is proposing to make it the default, I would like to know if there was any motivating reason to make this change? If the cost of proactive replication is close to zero now (my experiments were from a while back), of course the discussion is moot - did we have any results for this?
Note that the above is orthogonal to DRA evicting an executor via the storage timeout configuration.
Specifically for this use case, we don't need to make it a Spark default, right? If true, then sure, for those applications/cluster envs, making proactive replication an application default might make sense. In the scenario above though, how do we handle everything else?
Thank you, @mridulm . I really appreciate your replies. Let me follow your thoughts.
I know that this is a holiday season and I'm really grateful for your opinions. If you don't mind, can we have a Zoom meeting when you are available, @mridulm ? I think we have different ideas about open source development and the scope of this work. I want to make progress in this area in Apache Spark 3.2.0 by completing a document or a better implementation or anything more. Please let me know if you can have a Zoom meeting. Thanks!
Before answering specific queries below, I want to set the context. I was trying to understand what the impact would be, what the tradeoffs involved are, when we enable by default:
As I mentioned earlier, I am fine with collecting data by enabling this flag by default.
Spark is self-healing via lineage :-)
I am not proposing to change the default behavior, you are ... hence my query :-)
Couple of points here:
If we can do better on this, I am definitely very keen on it!
This was in response to the scenario described.
I am not sure where we got this from my comments ("because you insisted that YARN doesn't need this feature from the beginning. I'm still not sure that YARN environment is so invincible like that")? I clearly miscommunicated something here! My comment on YARN was in agreement with @HyukjinKwon's suggestion. The other was in response to the specific k8s scenario you presented - "currently K8s environment is more aggressive than the other existing resource managers".
Sure!
Thanks for this and the email, @mridulm . I replied to your email with my Zoom link.
What changes were proposed in this pull request?

This PR aims to enable `spark.storage.replication.proactive` by default for Apache Spark 3.2.0.

Why are the changes needed?

`spark.storage.replication.proactive` was added by SPARK-15355 in Apache Spark 2.2.0 and has been helpful when block manager loss occurs frequently, as in the K8s environment.

Does this PR introduce any user-facing change?

Yes, this will make Spark jobs more robust.

How was this patch tested?

Pass the existing UTs.