
[SPARK-33783][SS] Unload State Store Provider after configured keep alive time #30770

Closed
wants to merge 4 commits

Conversation

@viirya (Member) commented Dec 15, 2020

What changes were proposed in this pull request?

This patch proposes to give users finer-grained control over state store providers. A configuration is added for keeping inactive state store providers alive on executors.
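
For context, a minimal sketch of what such an internal config entry could look like inside `SQLConf`; the key name, default, version, and wording here are assumptions for illustration, not necessarily what this patch uses:

```scala
import java.util.concurrent.TimeUnit

// Hypothetical entry, written in the style of org.apache.spark.sql.internal.SQLConf.
val STATE_STORE_KEEP_ALIVE_TIME =
  buildConf("spark.sql.streaming.stateStore.keepAliveTime")
    .internal()
    .doc("Minimum time an inactive state store provider is kept loaded on an " +
      "executor before the maintenance task may unload it.")
    .version("3.2.0")
    .timeConf(TimeUnit.SECONDS)
    .createWithDefault(60L)
```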

Why are the changes needed?

Currently Spark unloads an inactive state store provider in a maintenance task that runs periodically. That is to say, one state store provider might be unloaded immediately after it becomes inactive, because the maintenance task is asynchronous.

So it is possible that some stores are unloaded earlier and others later. This inconsistent unloading behavior makes it harder to estimate query behavior.

It is also inefficient, as the state store provider could be reused in subsequent batches, and we should have more control over the unloading behavior.

Does this PR introduce any user-facing change?

Yes, users can use the added configuration to change how long an inactive state store provider is kept alive.

How was this patch tested?

Unit test.

@SparkQA commented Dec 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37442/

@SparkQA commented Dec 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37444/

@SparkQA commented Dec 15, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37444/

@SparkQA commented Dec 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37442/

@SparkQA commented Dec 15, 2020

Test build #132841 has finished for PR 30770 at commit 417e5c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member Author) commented Dec 15, 2020

@SparkQA commented Dec 15, 2020

Test build #132843 has finished for PR 30770 at commit 417e5c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor) left a comment

Could you please elaborate on the change? I see you're adding lastAliveTime and setting it, but never checking it. stateStoreKeepAliveTime isn't referenced anywhere. Did you miss adding some commits, and did you check that the test fails on the latest master branch?

@viirya (Member Author) commented Dec 16, 2020

Could you please elaborate on the change? I see you're adding lastAliveTime and setting it, but never checking it. stateStoreKeepAliveTime isn't referenced anywhere. Did you miss adding some commits, and did you check that the test fails on the latest master branch?

Oops, it seems some commits are missing. I accidentally removed the change in the last commit.

BTW, the test does not fail on the latest master branch. I'm also not sure how we can test whether the state store provider is still loaded after a certain time. Currently the test checks that the provider is loaded via eventually(time duration), but once it sees the provider is loaded, it reports success and exits the eventually block. What I want is to check after the duration has passed.
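
To illustrate the difficulty, a minimal ScalaTest sketch; the isLoaded helper and the keep-alive value are hypothetical stand-ins for whatever the real suite uses:

```scala
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

// Hypothetical helper: asks the executor whether the provider is still resident.
def isLoaded(providerId: String): Boolean = ???

// eventually() succeeds as soon as the condition first holds, so by itself it
// cannot distinguish "loaded now" from "still loaded after the keep-alive time".
eventually(Timeout(10.seconds)) {
  assert(isLoaded("provider-1"))
}

// One way to check the latter: wait out the keep-alive window, then assert again.
val keepAliveMs = 5000L // assumed keep-alive time under test
Thread.sleep(keepAliveMs + 1000L)
assert(isLoaded("provider-1"))
```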

@SparkQA commented Dec 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37507/

@SparkQA commented Dec 17, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37507/

@HeartSaVioR (Contributor) left a comment

I'm sorry, but I'm not 100% sure how much benefit this would give.

Once the state gets huge, the cost of loading it from the state store is also huge, and this PR helps remedy that situation, but the cost of maintaining the same store across different executors isn't small either. So it's not a clear benefit but rather a trade-off.

Next, Spark doesn't know that another executor, other than the active one, is holding the state, so that isn't taken into account during the planning phase.

If I understand correctly, this PR would only help when

  1. Spark fails to assign the task to executor A, which holds the state and is reported as the active one
  2. Spark assigns the task to executor B due to 1), and both executors A and B hold the state
  3. Spark again fails to assign the task to executor B, which holds the state and is reported as the active one
  4. Spark assigns the task to executor A among all available executors, which relies on luck

If the TTL is set to a considerable amount of time, we don't just allow two copies of the state but multiple copies. To be fair, that is an existing problem, since we only query and unload in the maintenance phase, but the TTL ensures the duration is longer than the maintenance interval, exposing more chances of hitting the problem.

Looking at other streaming engines, they simply keep running the task unless it crashes or the query terminates (i.e., the slot is reserved), so they don't have this issue. We should either enforce physical location with a non-trivial timeout (I see your doc PR, which remedies the issue; that's great), or think about more sophisticated alternatives, like allowing replicas with status (the batch IDs they have loaded) so that Spark can pick the best one but still fall back to another. For sure, end users should be aware of the overhead of state replication.

@viirya (Member Author) commented Dec 17, 2020

Once the state gets huge, the cost of loading it from the state store is also huge, and this PR helps remedy that situation, but the cost of maintaining the same store across different executors isn't small either. So it's not a clear benefit but rather a trade-off.

Yes, it's a trade-off. We don't offer users such flexibility for now. Normally we hope for a good trade-off between loading checkpointed state and keeping the same store across different executors. I think this is why we don't remove it immediately.

Currently we can face cases where a store is loaded on a new executor B right before the next maintenance task, so the task removes the previous store on executor A, while another store might only be removed after the full maintenance interval.

This makes the loading/unloading behavior inconsistent and hard to reason about. Generally I think it is better to keep the behavior consistent here.

This is not a trivial configuration, as a wrong value could lead to bad cases. That is also why it is just an internal configuration, like the maintenance interval (note: I realized I forgot to mark it as internal).

@HeartSaVioR (Contributor)

The problem is that this relies completely on luck; it doesn't help the physical plan at all. Again, the problem exists even without this PR, but then shouldn't we fix the root cause instead of extending the reliance on luck? At the very least Spark should know there are other executors still keeping the state and take that into account while planning.

@viirya (Member Author) commented Dec 17, 2020

The problem is that this relies completely on luck; it doesn't help the physical plan at all. Again, the problem exists even without this PR, but then shouldn't we fix the root cause instead of extending the reliance on luck? At the very least Spark should know there are other executors still keeping the state and take that into account while planning.

We already have preferred locations for stateful operations; that is how Spark takes this into account when planning physical stateful operations. I think users can adjust the locality wait to force Spark to honor it.
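
For example, a minimal sketch of the knobs involved; the values are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("stateful-streaming")
  // Wait longer for a node-local slot before falling back, so a state store
  // partition tends to be scheduled on the executor that already holds the
  // provider (its preferred location).
  .config("spark.locality.wait", "10s")
  // How often the executor-side maintenance task runs (snapshotting and
  // unloading inactive providers).
  .config("spark.sql.streaming.stateStore.maintenanceInterval", "60s")
  .getOrCreate()
```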

The proposal here is to stabilize the unloading behavior, not just to increase the chance of getting lucky: to avoid unloading some stores earlier and others later, which makes it harder to estimate query behavior. It is possible that a query works because it unloads stores earlier, and sometimes it doesn't because it unloads them later.

If you think we should not make it a configurable item, I can remove the configuration and only check whether the alive time exceeds the maintenance interval. That also helps stabilize this behavior.

@viirya (Member Author) commented Dec 17, 2020

The problem is that this relies completely on luck; it doesn't help the physical plan at all. Again, the problem exists even without this PR, but then shouldn't we fix the root cause instead of extending the reliance on luck? At the very least Spark should know there are other executors still keeping the state and take that into account while planning.

BTW, I think it is not entirely relying on luck. We can make Spark Structured Streaming reuse previous stores reliably; I'm working on it.

@HeartSaVioR (Contributor) commented Dec 17, 2020

Let's be clear: it relies on luck "as it is"; it requires a non-trivial change to not rely on luck.
E.g. you could make the state store coordinator also track executors that are inactive but not yet evicted, and take them all into account (with preference for the batches those executors have loaded) when calculating preferred locations.

That's the prerequisite for the replication I mentioned before, but it can be used for this PR as well (until the TTL kicks in). Do my previous comments make sense to you now?

@viirya (Member Author) commented Dec 17, 2020

Let's be clear: it relies on luck "as it is"; it requires a non-trivial change to not rely on luck.
E.g. you could make the state store coordinator also track executors that are inactive but not yet evicted, and take them all into account when calculating preferred locations. That's the prerequisite for the replication I mentioned before, but it can be used for this PR as well (until the TTL kicks in). Do my previous comments make sense to you now?

Yeah, thanks for the comment. I agree with you. I created SPARK-33816; do you think it is a valid direction?

To clarify: for reusing previous stores, the TTL doesn't help much, as you said. But I don't think this PR is only about increasing the luck of reusing previous stores; I mentioned that the TTL helps make store unloading consistent.

@SparkQA commented Dec 17, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37512/

@SparkQA commented Dec 17, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37512/

@HeartSaVioR (Contributor)

If you think we should not make it a configurable item, I can remove the configuration and only check whether the alive time exceeds the maintenance interval. That also helps stabilize this behavior.

Could you please elaborate on "stabilization" by describing a situation in which things could break? The executor checks with the driver that another executor is serving the state before unloading, so I'm not sure what could break.

@viirya (Member Author) commented Dec 17, 2020

Could you please elaborate on "stabilization" by describing a situation in which things could break? The executor checks with the driver that another executor is serving the state before unloading, so I'm not sure what could break.

For example, an inactive store could sometimes be unloaded after a few seconds and sometimes only after the maintenance interval. So in some cases a query works, just because a store is unloaded immediately and frees memory, but in other cases it hits an OOM because the store is unloaded later.

I'm not sure we are okay with inconsistent behavior like that, although it is a rare situation.

@HeartSaVioR (Contributor) commented Dec 17, 2020

The problem explanation sounds to me like we should unload ASAP whenever possible instead of delaying, right?

Providing a TTL would delay the unload beyond the current behavior, giving even less luck in avoiding the problem. We've called it "inconsistent" because the unload happens somewhere between 0 and the maintenance interval, but the TTL doesn't ensure the state is evicted exactly at that time either, so I'm not sure about the difference. It only draws a line to set a lower bound, whereas to address the problem the lower bound should be minimized. The upper bound becomes somewhere between TTL and (TTL + maintenance interval), which is higher than the current one.
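
For instance, with an illustrative 60s maintenance interval, today a store is unloaded somewhere between 0s and 60s after becoming inactive; with a 120s TTL it would only be unloaded somewhere between 120s and 180s.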

To resolve the problem being described, the driver should also be able to tell an executor that another executor is registered as active for the state, so the executor knows it is safe to unload, preferably immediately. To do that, bi-directional communication would be required.

@viirya (Member Author) commented Dec 17, 2020

The problem explanation sounds to me like we should unload ASAP whenever possible instead of delaying, right?

Providing a TTL would delay the unload beyond the current behavior, giving even less luck in avoiding the problem. We've called it "inconsistent" because the unload happens somewhere between 0 and the maintenance interval, but the TTL doesn't ensure the state is evicted exactly at that time either, so I'm not sure about the difference. It only draws a line to set a lower bound, whereas to address the problem the lower bound should be minimized. The upper bound becomes somewhere between TTL and (TTL + maintenance interval), which is higher than the current one.

I'm not sure of the context for why we don't unload ASAP. The TTL is the feasible approach I can think of to address it under the current maintenance mechanism.

Even if it just sets a lower bound, it is still valuable if we consider the further change to reuse previous stores more reliably. The 0 ~ maintenance interval window is not useful because we cannot be sure whether a previous store is still there; a TTL at least lets us know it is there until the TTL expires.

To resolve the problem being described, the driver should also be able to tell an executor that another executor is registered as active for the state, so the executor knows it is safe to unload, preferably immediately. To do that, bi-directional communication would be required.

Ideally, yes, if we can make unloading happen exactly as configured by the TTL, that is best. I'm not sure how much change is required; I will try to look at this too.

Anyway, considering the further change I will make to reuse previous stores, I think it makes this TTL more useful. WDYT?

@SparkQA commented Dec 17, 2020

Test build #132905 has finished for PR 30770 at commit a9ffcc6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor)

why we don't unload ASAP

Because the conversation is one-way: the executor registers with the driver and queries the driver, but the driver doesn't notify executors. If we could unload inactive state ASAP that would be ideal, but I haven't investigated the side effects.

The possible problem (which you've mentioned) comes from having multiple copies of the state loaded across executors. We provide the estimated memory usage of the state store as a metric, and if I'm not mistaken inactive state is not counted. That is a huge miss for a heap-memory-based state store provider. Again, that's an existing problem, but once we know about it I don't want to expand its possibility.

So IMHO the right direction would be either trying our best to unload inactive state ASAP, or considering replication as the further improvement, not somewhere in between. Even the latter wouldn't be an improvement if we could make Spark respect the active executor of the state.

@HeartSaVioR (Contributor)

Let's hear some third voices here before making progress.

@SparkQA commented Dec 17, 2020

Test build #132910 has finished for PR 30770 at commit f8b774c.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member Author) commented Dec 17, 2020

So IMHO the right direction would be either trying our best to unload inactive state ASAP, or considering replication as the further improvement, not somewhere in between. Even the latter wouldn't be an improvement if we could make Spark respect the active executor of the state.

Regarding "the replication", I read through the previous comments. I'm not pretty sure if I understand your point correctly. Is it any different than reusing the stores of previous batch? Because seems to me, you are against to have previous stores kept in TTL and reuse them, or I misread your comments? But the replication sounds similar to me. Can you elaborate it?

@HeartSaVioR (Contributor)

My preference is unloading state as soon as possible if it's not being used. So if that is achievable, I don't think we need to investigate alternatives.

The difference between TTL and replication is the eviction condition. Assume something bad happens (the original proposal only helps in an edge case, so it's not crazy to assume the bad thing) and Spark somehow assigns the task for the same state partition to executor A for batch 1, B for batch 2, C for batch 3, etc. TTL-based eviction will keep all of those states loaded until they reach the TTL. That said, the maximum number of copies of the state on the fly is indeterministic.

Instead, we could restrict the maximum number of loaded copies of the same state store. The coordinator can maintain the latest N active executors for the state store, and an executor can evict the state during maintenance if it isn't included in that list. Assuming the maintenance interval isn't too long, Spark will be able to keep roughly the configured maximum number of copies of the state store. (We could even add a separate maintenance interval only for unloading state, which could be reduced heavily; the interval is set high because of the cost of snapshotting the state.)
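
A rough sketch of that bookkeeping, with hypothetical names and a plain in-memory map rather than the real coordinator's RPC machinery:

```scala
import scala.collection.mutable

// Hypothetical key type; the real coordinator keys on operator/partition/store name.
case class StoreKey(operatorId: Long, partitionId: Int, storeName: String)

class ReplicationAwareCoordinator(maxCopies: Int) {
  // For each store, the most recent executors that reported it active, newest first.
  private val recentExecutors =
    mutable.Map.empty[StoreKey, List[String]].withDefaultValue(Nil)

  def reportActive(store: StoreKey, executorId: String): Unit = synchronized {
    val updated = executorId :: recentExecutors(store).filterNot(_ == executorId)
    recentExecutors(store) = updated.take(maxCopies)
  }

  // Asked from the executor's maintenance task: if this executor is no longer
  // among the latest N holders, it should evict its copy.
  def shouldUnload(store: StoreKey, executorId: String): Boolean = synchronized {
    !recentExecutors(store).contains(executorId)
  }

  // All known holders can feed preferred locations during planning.
  def preferredExecutors(store: StoreKey): Seq[String] = synchronized {
    recentExecutors(store)
  }
}
```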

The state store coordinator (in the driver) has full control over the eviction condition, which would help when preferredLocations is called. (That's easily achievable in the TTL case as well once the executor reports to the coordinator, though.)

This only works if end users can tolerate multiple copies (the number should be configurable) of the state store among executors. Given that the problematic case assumes large state, neither TTL nor replication will work for that case; we just have to reduce the unnecessary inactive state.

@viirya (Member Author) commented Dec 17, 2020

I see. It is much clearer now.

This is already beyond the original scope of this PR. The purpose of this change is pretty simple: making store unloading more consistent. The problematic point I saw is that we unload stores arbitrarily; it could be as early as immediately, or as late as the maintenance interval. As a complement to the current maintenance task, it is not proposed as a total solution for managing inactive stores.

If we are considering a better solution for inactive store management than the current maintenance task, then yes, the TTL approach is far from ideal.

I originally thought the maintenance task was designed to intentionally keep inactive stores and make it possible to reuse them in a later batch if the same provider is scheduled on the same executor, but from the discussion above, I see that is wrong.

Anyway, I agree that reducing inactive stores is what we should achieve. I will be happy to look into that next if it is achievable.

Thanks for the discussion.

@viirya viirya closed this Dec 17, 2020
@viirya (Member Author) commented Dec 17, 2020

@HeartSaVioR We can discuss inactive state store management at https://issues.apache.org/jira/browse/SPARK-33827.

HeartSaVioR pushed a commit that referenced this pull request Dec 28, 2020
### What changes were proposed in this pull request?

This patch proposes to unload inactive state stores as soon as possible. The unloading happens when an executor loads an active state store provider: at that time, the state store coordinator returns the list of providers loaded on this executor that are already loaded by other executors in the new batch, and each provider in that list is unloaded.

### Why are the changes needed?

Per the discussion at #30770, it makes sense that we should unload inactive state stores ASAP. Currently we run a maintenance task periodically to unload inactive state stores, so there is some delay between a state store becoming inactive and it being unloaded.

However, we can force Spark to always allocate a state store to the same executor by using the task locality configuration. This reduces the chance of having inactive state stores.

Normally, with the locality configuration, we should rarely see inactive state stores at all. There is still a chance an executor fails and is reallocated, but in that case the inactive state store is lost too, so it is not an issue.

Making driver-executor communication bi-directional for unloading inactive state stores looks non-trivial, and it doesn't seem worth it considering what we can already do with locality.

This proposes a simpler but effective approach: when reporting an active state store to the coordinator, we check whether a loaded state store is already loaded on another executor. If so, the loaded store is now inactive and would otherwise be unloaded by the next maintenance task, so we unload it immediately.

How do we make sure the state store loaded in the previous batch is loaded on another executor in this batch before this executor reports? With task locality and preferred locations, once an executor is ready to be scheduled, Spark should assign it the state store provider it previously loaded. So when this executor gets an assignment other than its previously loaded state store, the previously loaded one must already be assigned to another executor.

There is still a delay between the state store being loaded on the other executor and it being unloaded when this executor reports its active state store, but it is now minimized. And there won't be multiple state stores belonging to the same operator loaded at the same time on a single executor, because once the executor reports any active store, it unloads all inactive stores. This should not be an issue IMHO.

This is a minimal change to unload inactive state stores ASAP without significant refactoring.
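
A minimal sketch of the reporting-time unload, with hypothetical names; the actual change lives in the executor-side state store management and the coordinator, and differs in detail:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the executor-side provider registry and the
// coordinator RPC used when a provider is (re)loaded on this executor.
case class StoreKey(operatorId: Long, partitionId: Int)
trait Provider { def close(): Unit }

object ExecutorStateStores {
  private val loadedProviders = mutable.Map.empty[StoreKey, Provider]

  /**
   * Report `active` to the coordinator. The coordinator is assumed to reply
   * with the subset of `otherKeys` that are now active on some other executor;
   * those providers are unloaded right away instead of waiting for the next
   * maintenance run.
   */
  def reportAndUnloadInactive(
      active: StoreKey,
      askCoordinator: (StoreKey, Seq[StoreKey]) => Seq[StoreKey]): Unit = synchronized {
    val otherKeys = loadedProviders.keys.filterNot(_ == active).toSeq
    val activeElsewhere = askCoordinator(active, otherKeys)
    activeElsewhere.foreach { key =>
      loadedProviders.remove(key).foreach(_.close())
    }
  }
}
```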

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Closes #30827 from viirya/SPARK-33827.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@viirya viirya deleted the statestore-keepalive branch December 27, 2023 18:28