[SPARK-3613] Record only average block size in MapStatus for large stages #2470
Conversation
@rxin my understanding is that MapStatus is used to check whether a map output file contains data for a certain reducer. Why do we use the actual sizes instead of a boolean flag? Thanks!
It's more than that. We use the estimated sizes to track the total size of outstanding fetches, and try to bound that to a certain size so that an executor doesn't send too many requests at once and run out of memory.
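To make that concrete, here is a minimal sketch of how per-block size estimates let a reducer cap the bytes of fetch requests it has in flight. This is not the actual Spark fetcher code; the names `BlockRef`, `nextBatch`, and `maxBytesInFlight` are illustrative only.

```scala
// Illustrative only: per-block size estimates from MapStatus let a reducer
// stop issuing fetch requests once the estimated bytes in flight reach a
// limit, so it does not run out of memory fetching everything at once.
object FetchThrottleSketch {
  final case class BlockRef(blockId: String, estimatedSize: Long)

  def nextBatch(pending: Iterator[BlockRef], maxBytesInFlight: Long): Seq[BlockRef] = {
    val batch = scala.collection.mutable.ArrayBuffer.empty[BlockRef]
    var bytesInFlight = 0L
    // Keep adding blocks until the estimated in-flight bytes reach the limit.
    while (pending.hasNext && bytesInFlight < maxBytesInFlight) {
      val block = pending.next()
      batch += block
      bytesInFlight += block.estimatedSize
    }
    batch.toSeq
  }
}
```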
Thanks for the reply. Another question: in hash shuffle write, the data may be skewed across the different map output files. In some cases, the reducer may try to fetch many files that do not contain any of its data. What overhead does this introduce?
It really depends on the number of zero-sized blocks. One thing we can possibly do is to create a compressed bitmap to track zero-sized blocks, as discussed here: http://apache-spark-developers-list.1001551.n3.nabble.com/Eliminate-copy-while-sending-data-any-Akka-experts-here-td7127.html#a7146 Maybe we can use the EWAH implementation by @lemire
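As a rough illustration of that idea (assuming the `org.roaringbitmap` library is on the classpath; the class and method names below are hypothetical, not the eventual Spark implementation), a compressed bitmap could record which blocks are empty so a reducer skips fetches that would return nothing:

```scala
import org.roaringbitmap.RoaringBitmap

// Hypothetical sketch: mark the indices of zero-sized blocks in a compressed
// bitmap so a reducer can skip fetching map outputs that hold no data for it.
class EmptyBlockTracker(blockSizes: Array[Long]) {
  private val emptyBlocks = new RoaringBitmap()
  blockSizes.zipWithIndex.foreach { case (size, i) =>
    if (size == 0L) emptyBlocks.add(i)
  }

  /** True if the map output for this reducer id contains no data. */
  def isEmpty(reduceId: Int): Boolean = emptyBlocks.contains(reduceId)
}
```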
@Ishiihara let me know if you are interested in working on adding a compressed bitmap to this.
@lemire our requirements here are very simple. We just need a bitmap to track the positions of zero-sized blocks in Spark shuffle. Things we need from the bitmap implementation are:
So unlike databases, we don't need updates or intersections. I saw that you have published a new arXiv paper on Roaring bitmaps too. Which one would you recommend for this workload?
@rxin I am definitely interested in working on adding a compressed bitmap. What is the first step? Thanks.
@rxin We are currently working with the Druid.io guys to integrate Roaring (http://roaringbitmap.org). We get good results and even support memory-mapped bitmaps (with ByteBuffer). At this point, I would recommend you try out Roaring. I am available to help.
@Ishiihara Get in touch if you have questions.
test this please
QA tests have started for PR 2470 at commit
QA tests have finished for PR 2470 at commit
Test FAILed.
Jenkins, test this please.
QA tests have started for PR 2470 at commit
QA tests have started for PR 2470 at commit
QA tests have finished for PR 2470 at commit
QA tests have finished for PR 2470 at commit
Test FAILed.
Hm, MiMa failed even though MapStatus is private[spark]. I will add an exclude.
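For context, this is roughly what such an exclude looks like in Spark's project/MimaExcludes.scala. The problem type and qualified name below are assumptions for illustration, not copied from the actual patch.

```scala
import com.typesafe.tools.mima.core._

object AssumedMimaExcludes {
  // Sketch of a MiMa exclude entry (assumed names; the real entry for this PR
  // may differ). Excludes tell the binary-compatibility checker to ignore
  // changes to classes that are not part of the public API.
  val excludes = Seq(
    ProblemFilters.exclude[IncompatibleMethTypeProblem](
      "org.apache.spark.scheduler.MapStatus.this")
  )
}
```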
QA tests have started for PR 2470 at commit
QA tests have finished for PR 2470 at commit
Test PASSed.
 */
private[spark] class DetailedMapStatus(
    private[this] var loc: BlockManagerId,
    private[this] var compressedSizes: Array[Byte])
Why private[this]? Is this for performance reasons?
Yes. I really have no need for an accessor here.
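For readers unfamiliar with the distinction, here is a small example of my own (not from this PR) showing why `private[this]` avoids the accessor methods that a plain `private var` still gets:

```scala
// Illustration (not from the PR): `private var` still generates accessor
// methods and is visible from other instances of the same class, while
// `private[this]` is object-private and compiles down to a plain field,
// which is why it is sometimes chosen for performance-sensitive fields.
class Counts {
  private var shared = 0        // other Counts instances can read this
  private[this] var local = 0   // only accessible through `this`; no accessors

  def bump(): Unit = { shared += 1; local += 1 }

  def readOther(other: Counts): Int = other.shared
  // def readOtherLocal(other: Counts): Int = other.local  // does not compile
}
```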
QA tests have started for PR 2470 at commit
Great, LPGTM
I'm glad it is P!
QA tests have finished for PR 2470 at commit
Test FAILed.
QA tests have started for PR 2470 at commit
QA tests have finished for PR 2470 at commit
Merging in master.
I also filed a new JIRA for the compressed bitmap idea: https://issues.apache.org/jira/browse/SPARK-3740
@rxin I looked through Roaring bitmap; it is highly compressed compared with other bitmap implementations. I will start working on this and keep you updated with progress and any issues that come up during implementation. Thanks!
This changes the way we send MapStatus from executors back to the driver for large stages (>2000 tasks). For large stages, we no longer send one byte per block; instead, we just send the average block size.
This makes large jobs (tens of thousands of tasks) much more reliable, since the driver no longer has to send a huge amount of data.
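A minimal sketch of the idea described above (the class and method names are illustrative, not necessarily the ones in the merged patch): for stages above the threshold, keep only one average size rather than one compressed size per block.

```scala
// Illustrative sketch, not the actual patch: instead of one compressed size
// per block, a map status for a large stage stores a single average size and
// reports that same estimate for every block.
class AveragedMapStatus(
    val location: String,     // stand-in for BlockManagerId in this sketch
    private val avgSize: Long) {

  /** Every block is estimated at the same average size. */
  def getSizeForBlock(reduceId: Int): Long = avgSize
}

object AveragedMapStatus {
  def apply(location: String, uncompressedSizes: Array[Long]): AveragedMapStatus = {
    val avg =
      if (uncompressedSizes.nonEmpty) uncompressedSizes.sum / uncompressedSizes.length
      else 0L
    new AveragedMapStatus(location, avg)
  }
}
```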