[SPARK-31069][CORE] high cpu caused by chunksBeingTransferred in external shuffle service #27831
chrysan wants to merge 1 commit into apache:master
Conversation
… when iterating many streams
ok to test.
Test build #119457 has finished for PR 27831 at commit
for (StreamState streamState: streams.values()) {
  sum += streamState.chunksBeingTransferred.get();
}
Does this become a blocker? Or how much performance improvement can we gain with the new approach?
It makes no big difference if the shuffle server handles only a small number of chunks. But in our production environment, we found that when the number of chunks reaches 100,000 or more, most of the CPU is sometimes occupied by iterating over the streams and computing the total, leaving no CPU to handle requests and response data, which makes everything stuck.
Have you measured how long the thread is stuck on this calculation? And how frequently is this method called?
It would be ideal to craft a simple benchmark and experiment with some variations: number of chunks, contention (no contention, one thread calling chunkSent, N threads doing the same).
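For reference, a rough standalone sketch of such a benchmark might look like the following. This is not the real OneForOneStreamManager; the class, fields, and argument handling are made up purely for the experiment, and the per-stream counters are simplified to bare AtomicLongs:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical micro-benchmark: how long does summing per-stream counters take
// as the number of streams grows, while other threads keep mutating them?
public class ChunksBeingTransferredBench {
  // Stand-in for OneForOneStreamManager.streams: streamId -> per-stream counter.
  static final ConcurrentHashMap<Long, AtomicLong> streams = new ConcurrentHashMap<>();

  // The loop under discussion: iterate every stream and sum its counter.
  static long chunksBeingTransferred() {
    long sum = 0L;
    for (AtomicLong c : streams.values()) {
      sum += c.get();
    }
    return sum;
  }

  public static void main(String[] args) throws Exception {
    int numStreams = args.length > 0 ? Integer.parseInt(args[0]) : 100_000;
    int writers = args.length > 1 ? Integer.parseInt(args[1]) : 4;
    for (long i = 0; i < numStreams; i++) {
      streams.put(i, new AtomicLong(1));
    }
    // Writer threads simulate chunkBeingSent/chunkSent updating counters concurrently.
    for (int t = 0; t < writers; t++) {
      Thread writer = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
          streams.get((long) (Math.random() * numStreams)).incrementAndGet();
        }
      });
      writer.setDaemon(true);
      writer.start();
    }
    // Time repeated calls to the summation, as the RPC handler would do per request.
    int iterations = 100;
    long start = System.nanoTime();
    long blackhole = 0;
    for (int i = 0; i < iterations; i++) {
      blackhole += chunksBeingTransferred();
    }
    double avgMs = (System.nanoTime() - start) / 1_000_000.0 / iterations;
    System.out.println("streams=" + numStreams + " writers=" + writers
        + " avg per call = " + avgMs + " ms (sum=" + blackhole + ")");
  }
}
```

Running it with, say, 1,000 vs. 100,000 streams and a few different writer counts would give the kind of numbers being asked for here.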
Looks like the method is not called only once, so the optimization seems to make sense. Still, it would be better to have some numbers to back up the rationale for the patch.
StreamState state = entry.getValue();
if (state.associatedChannel == channel) {
  streams.remove(entry.getKey());
  totalChunksBeingTransferred.addAndGet((-state.chunksBeingTransferred.get()));
Is it possible to get rid of state.chunksBeingTransferred entirely?
No, we can't: without it, we would not know how many to subtract from the total when a channel is terminated.
Maybe we can track transferring chunks by channel, e.g. Map[Channel, Count]? The number of channels should be smaller than the number of streams.
And if we cannot get rid of state.chunksBeingTransferred, connectionTerminated could also be somewhat time-consuming, as it also traverses all streams.
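If the per-channel idea were pursued, a minimal sketch might look like the one below. The class and method names are hypothetical, the key is assumed to be Netty's io.netty.channel.Channel (already on the classpath in this module), and this only illustrates the reviewer's suggestion, not what the patch actually does:

```java
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical alternative: track in-flight chunks per channel rather than per stream.
class PerChannelChunkTracker {
  private final ConcurrentHashMap<Channel, AtomicLong> chunksByChannel = new ConcurrentHashMap<>();

  // Called when a chunk starts being sent over a channel.
  void chunkBeingSent(Channel channel) {
    chunksByChannel.computeIfAbsent(channel, c -> new AtomicLong()).incrementAndGet();
  }

  // Called when a chunk has been fully sent over a channel.
  void chunkSent(Channel channel) {
    AtomicLong count = chunksByChannel.get(channel);
    if (count != null) {
      count.decrementAndGet();
    }
  }

  // Cleanup is a single map removal instead of a scan over all streams.
  void connectionTerminated(Channel channel) {
    chunksByChannel.remove(channel);
  }

  // Still a scan, but over channels, which should be far fewer than streams.
  long chunksBeingTransferred() {
    long sum = 0L;
    for (AtomicLong count : chunksByChannel.values()) {
      sum += count.get();
    }
    return sum;
  }
}
```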
> And if we cannot get rid of state.chunksBeingTransferred, connectionTerminated could also be somewhat time-consuming, as it also traverses all streams.

IIUC, connectionTerminated won't be called frequently compared to chunksBeingTransferred, so it matters less.
Retest this please.
Test build #119581 has finished for PR 27831 at commit
retest this, please
Test build #119744 has finished for PR 27831 at commit
retest this, please
Test build #119748 has finished for PR 27831 at commit
@chrysan Could you please fill out the PR template? Providing numbers would be nice, as I commented, but yes, it seems obvious that your patch helps in the case you've mentioned.
Please provide a benchmark result of this change, thanks! Because this change introduces another stateful variable we need to maintain, it could potentially lead to logic conflicts. We should only accept this PR if it makes a significant performance difference.
@chrysan Any update on this?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
…se high cpu cost in external shuffle service when `maxChunksBeingTransferred` uses default value

### What changes were proposed in this pull request?
Follow-up from #27831, original author chrysan.

Each request checks `chunksBeingTransferred`:
```
public long chunksBeingTransferred() {
  long sum = 0L;
  for (StreamState streamState: streams.values()) {
    sum += streamState.chunksBeingTransferred.get();
  }
  return sum;
}
```
for example here:
```
long chunksBeingTransferred = streamManager.chunksBeingTransferred();
if (chunksBeingTransferred >= maxChunksBeingTransferred) {
  logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
    chunksBeingTransferred, maxChunksBeingTransferred);
  channel.close();
  return;
}
```
This traverses `streams` repeatedly, and fetching a data chunk accesses `streams` too, which causes two problems:
1. repeated traversal of `streams`: the longer it is, the longer each call takes
2. lock contention on the ConcurrentHashMap `streams`

In this PR, when `maxChunksBeingTransferred` uses its default value, we avoid computing `chunksBeingTransferred` since we don't care about it. If users want to set this configuration and hit this performance problem, they can also backport PR #27831.

### Why are the changes needed?
Speed up getting `chunksBeingTransferred` and avoid lock contention on the object `streams`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UT

Closes #30139 from AngersZhuuuu/SPARK-31069.

Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: chrysan <chrysanxia@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
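A minimal sketch of the guard described in that follow-up, written against the handler snippet quoted above rather than the actual merged code. Here `conf`, `streamManager`, `logger`, and `channel` come from the surrounding handler, and the default of `maxChunksBeingTransferred` is assumed to be `Long.MAX_VALUE`:

```java
// Sketch of the follow-up's idea (not the exact merged code): only compute the
// expensive sum when the limit has actually been configured below its default.
long maxChunksBeingTransferred = conf.maxChunksBeingTransferred(); // assumed default: Long.MAX_VALUE
if (maxChunksBeingTransferred < Long.MAX_VALUE) {
  long chunksBeingTransferred = streamManager.chunksBeingTransferred();
  if (chunksBeingTransferred >= maxChunksBeingTransferred) {
    logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
      chunksBeingTransferred, maxChunksBeingTransferred);
    channel.close();
    return;
  }
}
```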
What changes were proposed in this pull request?
This change is targeted at speeding up the calculation of chunksBeingTransferred to avoid high CPU usage when there are many stream requests.
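As an illustration of that idea, here is a minimal sketch (hypothetical class and method names, not the full OneForOneStreamManager) that keeps a running total next to the per-stream counters so the check becomes O(1):

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch: keep a global running total so chunksBeingTransferred()
// no longer has to iterate every stream on each request.
class RunningTotalSketch {
  private final AtomicLong totalChunksBeingTransferred = new AtomicLong(0L);

  // Called when a chunk starts being sent for some stream.
  void chunkBeingSent(AtomicLong streamCounter) {
    streamCounter.incrementAndGet();
    totalChunksBeingTransferred.incrementAndGet();
  }

  // Called when a chunk has been fully sent for some stream.
  void chunkSent(AtomicLong streamCounter) {
    streamCounter.decrementAndGet();
    totalChunksBeingTransferred.decrementAndGet();
  }

  // Constant time instead of summing over all streams.
  long chunksBeingTransferred() {
    return totalChunksBeingTransferred.get();
  }

  // When a stream's channel terminates, subtract its in-flight count in one step,
  // which is what the diff hunk in the conversation above does with addAndGet.
  void streamRemoved(AtomicLong streamCounter) {
    totalChunksBeingTransferred.addAndGet(-streamCounter.get());
  }
}
```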
Why are the changes needed?
Does this PR introduce any user-facing change?
How was this patch tested?