New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-29939][CORE] Add spark.shuffle.mapStatus.compression.codec conf #26611
Conversation
cc @dbtsai , @tgravescs , @gatorsmile |
@@ -1016,6 +1016,13 @@ package object config { | |||
.booleanConf | |||
.createWithDefault(true) | |||
|
|||
private[spark] val MAP_STATUS_COMPRESS = | |||
ConfigBuilder("spark.shuffle.mapStatus.compress") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ur, @gatorsmile , is this what you asked?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Ngone51 .
We compress this always. Previously, it was Gzip. Now, master
branch uses ZStd
.
In this PR, it seems that you are switching to lz4
which is the worst compression codec in this case.
IIRC, @gatorsmile ask to a configuration to choose the codec. Not to choose the shouldCompress or Not
. I'm a little confused.
val objOut = new ObjectOutputStream(codec.compressedOutputStream(out)) | ||
val shouldCompress = conf.get(MAP_STATUS_COMPRESS) | ||
val codec = if (shouldCompress) { | ||
Some(CompressionCodec.createCodec(conf, conf.get(IO_COMPRESSION_CODEC.key, "zstd"))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a regression, @Ngone51 . IO_COMPRESSION_CODEC is lz4
by default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm... actually, I'm indicated to use zstd
by default in this way. There's difference between conf.get(IO_COMPRESSION_CODEC.key, "zstd")
and conf.get(IO_COMPRESSION_CODEC)
, right ? Or something wrong with the former one ?
Test build #114172 has finished for PR 26611 at commit
|
Yeah. Actually, I have the same mind. Let me revert this. |
can we follow
to create a new config for map status? then we can set the default value correctly. |
Test build #114186 has finished for PR 26611 at commit
|
Yes. @cloud-fan 's example is what we need, @Ngone51 ~ |
.doc("The codec used to compress MapStatus, which is generated by ShuffleMapTask. " + | ||
"By default, Spark provides four codecs: lz4, lzf, snappy, and zstd. You can also " + | ||
"use fully qualified class names to specify the codec. If this is not given, " + | ||
"spark.io.compression.codec will be used.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can remove this sentence if we don't fallback to spark.io.compression.codec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops
Test build #114209 has finished for PR 26611 at commit
|
Test build #114212 has finished for PR 26611 at commit
|
retest this please |
Test build #114222 has finished for PR 26611 at commit
|
When would you want to set this separately? i'm just curious how often varying the codec for all these cases helps |
Personally I like fewer configs as well. Shall we use a fixed codec for event log too (added in #24921)? We should be consistent internally. |
@cloud-fan and @srowen . IIUC, @gatorsmile asked this as a way to fallback to I also prefer a single best default implementation without configuration, but there is a reason we used |
Please confirm the reason of your request, @gatorsmile . |
If it's justified for event log, then it's justified for map status as well. I think we should also give a different value for the event log codec. A possible use case: people develop a custom codec for RDD/shuffle and set it with |
We can also mark these configs as interval if we don't expect users to change it. |
Yes. The previous my comment is about
Also, yes. #24921 allows it for
Thanks. |
LGTM |
Test build #114370 has finished for PR 26611 at commit
|
All test passed. The failure is
cc @viirya , @HyukjinKwon . |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Merged to master.
Thank you all.
Test FAILed. |
Thanks all! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM too
I was out of the office last week, it seems odd to me that we added this config but then we left it internal, what was the reasoning? If we think it's something worth changing, why not expose it. if a user has a problem with Zstd they have no way of knowing about this config unless they go through code or some other spark expert tells them about it. |
It's an interesting philosophical question. If it's internal, why can users set it at all? I think it means something more like "developer API": use it if you know what you're doing, and it's subject to change without notice. It's not documented because, well, the audience is often reading the source code or consulting contributors directly for advanced, niche needs. These also often exist as safety-valve flags that are really not intended for use ever, unless something unforeseen goes wrong and we have to advertise a quick fix. So yeah, the semantics are a little fuzzy. Given that I generally prefer fewer configs anyway, anything we can do to keep this limited to the niche use cases that might need it probably helps avoid the info overload that is the Spark config page. |
Yeah I definitely get that and we have left configs internal in the past, it just seemed here the only reason I saw was in case there is issue with Zstd and if we really think that could happen or has happened then maybe it shouldn't be internal. if there hasn't been then did we really need this config at all like was mentioned in original pr discussion. Anyway, I'm fine with it just wondering if more reasoning behind it. We can always open it up later. |
Using difficult compression codec usually has its general purpose I think such as the efficiency given the pattern of the data. I dont believe zstd is always better in all cases. However, yeah I agree that it is important to describe the reason in general, and it better had to be clarified. |
What changes were proposed in this pull request?
Add a new conf named
spark.shuffle.mapStatus.compression.codec
for user to decide which codec should be used(default byzstd
) forMapStatus
compression.Why are the changes needed?
We already have this functionality for
broadcast
/rdd
/shuffle
/shuflleSpill
,so it might be better to have the same functionality for
MapStatus
as well.Does this PR introduce any user-facing change?
Yes, user now could use
spark.shuffle.mapStatus.compression.codec
to decide which codec should be used duringMapStatus
compression.How was this patch tested?
N/A