[CELEBORN-1410] Combine multiple ShuffleBlockInfo into a single ShuffleBlockInfo#2524
s0nskar wants to merge 10 commits into apache:main from s0nskar/CELEBORN-1410
Conversation
@waitinfuture Do these changes look ok? Also, in the ticket you mentioned:
> Why do we want to limit the size to half of shuffleChunkSize and not use the full shuffleChunkSize?
Codecov Report: All modified and coverable lines are covered by tests ✅

@@            Coverage Diff             @@
##             main    #2524      +/-   ##
==========================================
+ Coverage   40.17%   40.81%   +0.64%
==========================================
  Files         218      220       +2
  Lines       13742    14104     +362
  Branches     1214     1258      +44
==========================================
+ Hits         5520     5755     +235
- Misses       7905     8020     +115
- Partials      317      329      +12
waitinfuture left a comment:
Hi @s0nskar , thanks for the PR! I left a comment, PTAL :)
sortedBlock.offset = fileIndex;
sortedBlock.length = length;
sortedShuffleBlocks.add(sortedBlock);
fileIndex += transferBlock(offset, length);
I think we should put `fileIndex += transferBlock(offset, length);` below the if-else block, because we need to make sure the offset of the first sortedBlock equals 0.
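The ordering being suggested can be sketched as follows. This is a minimal, self-contained illustration with hypothetical stand-ins (`ShuffleBlock`, `transferBlock`), not the actual Celeborn code: assigning the running `fileIndex` to a block before advancing it guarantees the first sorted block starts at offset 0.

```java
import java.util.ArrayList;
import java.util.List;

public class SortedBlockOffsets {
    static class ShuffleBlock {
        long offset;
        long length;
    }

    // hypothetical stand-in for the real transferBlock(): returns bytes written
    static long transferBlock(long offset, long length) {
        return length;
    }

    static List<ShuffleBlock> sortBlocks(long[][] blocks) {
        List<ShuffleBlock> sorted = new ArrayList<>();
        long fileIndex = 0;
        for (long[] b : blocks) {                   // b = {offset, length}
            ShuffleBlock sb = new ShuffleBlock();
            sb.offset = fileIndex;                  // first iteration: 0
            sb.length = b[1];
            sorted.add(sb);
            fileIndex += transferBlock(b[0], b[1]); // advance only afterwards
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<ShuffleBlock> out =
            sortBlocks(new long[][] {{100, 10}, {200, 20}, {300, 30}});
        System.out.println("first offset = " + out.get(0).offset); // 0
    }
}
```

If `fileIndex` were advanced inside the if-else branch before the block is recorded, the first sorted block's offset would already be non-zero.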
Nice catch, Fixed it!
I'm trying to set up the UTs locally; this should've been caught by them. I'm facing the below error while running the tests, have you seen this error before?

scala: bad option: -P:silencer:globalFilters=.*deprecated.*
I don't see such an error. I usually run local UTs through IntelliJ or using sbt (https://celeborn.apache.org/docs/latest/developers/sbt/#testing-with-sbt); you can try out this document.
Thanks! This is working for me ✌️
The reason is to avoid the case where the next batch exceeds half of shuffleChunkSize, so the whole length exceeds shuffleChunkSize. But I think this is trivial; it's ok to use the full shuffleChunkSize.
// combine multiple `ShuffleBlockInfo` into a single `ShuffleBlockInfo` of size
// less than `shuffleChunkSize`
if (!sortedShuffleBlocks.isEmpty() &&
    sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1).length + length <= shuffleChunkSize) {
@waitinfuture Just wanted to double-check whether the `<=` condition would be fine here, or whether shuffleChunkSize is an exclusive upper bound.
`<=` is fine here; shuffleChunkSize is not an exclusive upper bound. In fact, it's not a strict bound at all.
Thanks for the review @waitinfuture
// size of compacted `ShuffleBlockInfo` does not exceed `shuffleChunkSize`
if (!sortedShuffleBlocks.isEmpty()
    && sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1).length + length
        <= shuffleChunkSize) {
The `<= shuffleChunkSize` condition may cause the returned chunk's size to approach 2 * chunkSize. This won't cause trouble, but it changes the default behavior.
Yes. Say shuffleChunkSize is 8m: the generated blocks can be two 7.9m blocks, and when fetching data the two can be returned together as one chunk, which is 15.8m. That's the reason I mentioned in the Jira to use half of shuffleChunkSize, in which case the worst case is 3.9m * 3 = 11.7m.
Maybe narrowing the threshold to some fraction * shuffleChunkSize is safer, say 0.25. WDYT?

sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1).length + length
    <= 0.25 * shuffleChunkSize
Yeah, that sounds good. I can make this threshold configurable with a default value of 0.25 to start with.
Although I'm not super clear about the issue in the discussion: why would fetching data return two or three chunks together, making it 7.9 * 2 = 15.8m or 3.9 * 3 = 11.7m as mentioned above? Do clients cache the initial read chunk while reading the next one? If that is the case, can someone point me to this piece of code?
Hi @s0nskar , when creating ReduceFileMeta it compacts contiguous ShuffleBlockInfos into a chunk: it first adds the current ShuffleBlockInfo, then checks whether the size exceeds fetchChunkSize.
When the Worker handles a fetch, it sends one chunk at a time, and chunk boundaries are stored in ReduceFileMeta#chunkOffsets.
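The add-then-check behavior described above can be simulated with a short sketch (hypothetical names, not the actual ReduceFileMeta code). Because each block is appended to the current chunk before the boundary check, a chunk can grow to roughly fetchChunkSize plus the largest block size, which is where the 3.9m * 3 = 11.7m worst case comes from.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkCompaction {
    // Simulated compaction: add the block first, then close the chunk if it
    // reached fetchChunkSize. Not the real implementation, just the idea.
    static List<Long> chunkSizes(long[] blockLengths, long fetchChunkSize) {
        List<Long> chunks = new ArrayList<>();
        long current = 0;
        for (long len : blockLengths) {
            current += len;                  // add the current block first...
            if (current >= fetchChunkSize) { // ...then check the boundary
                chunks.add(current);
                current = 0;
            }
        }
        if (current > 0) chunks.add(current);
        return chunks;
    }

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        // Three ~3.9 MB blocks against an 8 MB chunk size:
        // 3.9 + 3.9 = 7.8 < 8, so the third block pushes the chunk to ~11.7 MB.
        List<Long> chunks = chunkSizes(
            new long[] {(long) (3.9 * mb), (long) (3.9 * mb), (long) (3.9 * mb)},
            8 * mb);
        System.out.printf("chunk size = %.1f MB%n", chunks.get(0) / (double) mb);
    }
}
```

The same simulation with block sizes near the full 8 MB shows chunks approaching 16 MB, matching the 7.9m * 2 = 15.8m example above.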
Got it, thanks for the context.
As per the documentation, fetchChunkSize is the "max chunk size", so shouldn't we change the above condition to respect it? As you mentioned, if the ShuffleBlocks are generated close to 8mb, then a chunk can be close to 16mb.
We can change this condition to generate offsets closer to fetchChunkSize with some error factor of 10 or 20%. WDYT?
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala (review comment resolved)
// combine multiple small `ShuffleBlockInfo`s for the same mapId such that
// size of compacted `ShuffleBlockInfo` does not exceed `shuffleChunkSize`
if (!sortedShuffleBlocks.isEmpty()
    && sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1).length + length
        <= shuffleChunkSize) {
Can we introduce a variable for `sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1)`?
Refactored the code a little to simplify it.
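One way that refactor could look is the following minimal sketch (hypothetical names, not the exact Celeborn code): the repeated last-element lookup is hoisted into a local variable before testing the compaction condition.

```java
import java.util.ArrayList;
import java.util.List;

public class CompactCondition {
    static class ShuffleBlockInfo {
        long offset;
        long length;
        ShuffleBlockInfo(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Combine the incoming block into the last one when the merged size stays
    // within shuffleChunkSize; otherwise start a new block. (The real code also
    // only merges blocks belonging to the same mapId; that check is omitted here.)
    static void addBlock(List<ShuffleBlockInfo> sortedShuffleBlocks,
                         long offset, long length, long shuffleChunkSize) {
        if (!sortedShuffleBlocks.isEmpty()) {
            ShuffleBlockInfo lastBlock =
                sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1);
            if (lastBlock.length + length <= shuffleChunkSize) {
                lastBlock.length += length;  // compact into the last block
                return;
            }
        }
        sortedShuffleBlocks.add(new ShuffleBlockInfo(offset, length));
    }

    public static void main(String[] args) {
        List<ShuffleBlockInfo> blocks = new ArrayList<>();
        addBlock(blocks, 0, 3, 8);
        addBlock(blocks, 3, 4, 8);  // 3 + 4 <= 8, merged into the first block
        addBlock(blocks, 7, 6, 8);  // 7 + 6 > 8, starts a new block
        System.out.println("blocks = " + blocks.size()); // 2
    }
}
```

Pulling the lookup into `lastBlock` also makes the merge itself read naturally, since the same variable is mutated in place.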
Hi @s0nskar , please run
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala (review comment resolved)
waitinfuture left a comment:
LGTM, thanks! Merging to main (v0.5.0) / branch-0.4 (v0.4.2)
[CELEBORN-1410] Combine multiple ShuffleBlockInfo into a single ShuffleBlockInfo

Merging smaller `ShuffleBlockInfo`s belonging to the same mapID, such that the size of each block does not exceed `celeborn.shuffle.chunk.size`. As sorted ShuffleBlocks are contiguous, we can compact multiple `ShuffleBlockInfo`s into one as long as the size of the compacted one does not exceed half of `celeborn.shuffle.chunk.size`. This way we can decrease the number of ShuffleBlock objects.

No user-facing change. Tested with existing UTs.

Closes #2524 from s0nskar/CELEBORN-1410.

Lead-authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
What changes were proposed in this pull request?
Merging smaller `ShuffleBlockInfo`s belonging to the same mapID, such that the size of each block does not exceed `celeborn.shuffle.chunk.size`.

Why are the changes needed?
As sorted ShuffleBlocks are contiguous, we can compact multiple `ShuffleBlockInfo`s into one as long as the size of the compacted one does not exceed half of `celeborn.shuffle.chunk.size`. This way we can decrease the number of ShuffleBlock objects.

Does this PR introduce any user-facing change?
No

How was this patch tested?
Existing UTs