[SPARK-35181][CORE] Use zstd for spark.io.compression.codec by default #32286
Conversation
Kubernetes integration test starting
Kubernetes integration test status failure
to reduce the disk IOs and traffic during shuffle

@dongjoon-hyun What about CPU load? If a user's job is CPU-bound, then zstd can introduce a performance regression if it consumes more CPU.
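For jobs where zstd's extra CPU cost matters more than I/O, a user can pin the previous codec explicitly so the new default does not apply. A minimal sketch, assuming a standalone `SparkConf`; the app name is hypothetical:

```scala
import org.apache.spark.SparkConf

// Hypothetical CPU-bound job: keep the previous default codec explicitly,
// overriding the new zstd default proposed in this PR.
val conf = new SparkConf()
  .setAppName("cpu-bound-job") // illustrative name
  .set("spark.io.compression.codec", "lz4")
```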
Test build #137777 has finished for PR 32286 at commit
Seems OK for consistency and maybe better speed; we should mention it in a migration guide.
@@ -52,6 +52,8 @@ class AdaptiveQueryExecSuite
   import testImplicits._

+  override protected def sparkConf = super.sparkConf.set("spark.io.compression.codec", "lz4")
Should this be defined more like a method with braces, etc., or is that how similar code does it?
Sorry for the late reply.
For this one, we can use the simple form like the other places.
$ git grep 'override protected def sparkConf' | grep super
sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala: override protected def sparkConf: SparkConf = super.sparkConf
sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala: override protected def sparkConf: SparkConf = super.sparkConf
sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala: override protected def sparkConf: SparkConf = super.sparkConf
sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala: override protected def sparkConf: SparkConf = super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "")
sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala: override protected def sparkConf: SparkConf = super.sparkConf
sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala: override protected def sparkConf: SparkConf = super.sparkConf
sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala: override protected def sparkConf: SparkConf = super.sparkConf
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala: override protected def sparkConf = super.sparkConf.set("spark.io.compression.codec", "lz4")
sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableRecoverPartitionsSuite.scala: override protected def sparkConf = super.sparkConf
sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableRecoverPartitionsSuite.scala: override protected def sparkConf = super.sparkConf
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala: override protected def sparkConf: SparkConf = super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "")
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala: override protected def sparkConf: SparkConf = super.sparkConf
Or, like the following~
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "parquet")
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #138306 has finished for PR 32286 at commit

retest this please

Kubernetes integration test starting
Kubernetes integration test status failure

Test build #138308 has finished for PR 32286 at commit
I have tested this with a few runs of TPCDS query q64, which is shuffle-intensive. I do indeed see a very important reduction in shuffle write and read bytes (about -40%, as reported by @dongjoon-hyun). shuffleWriteTime and shuffleFetchWaitTime are also improved (about -25% and -75%) in my test.
I think this is probably a good idea, to trade a little more CPU for less I/O and storage. @dongjoon-hyun I'm a little concerned about changing a fairly fundamental default. Would you put this in 3.2.0? Not out of the question IMHO; it just needs to be in a migration guide, I think, and is maybe worth one more ping to dev@ to solicit input.
Thank you for your feedback, @LucaCanali and @srowen.
Kubernetes integration test starting
Kubernetes integration test status success

Test build #139946 has finished for PR 32286 at commit

Kubernetes integration test starting
Kubernetes integration test status success

Test build #140053 has finished for PR 32286 at commit

Kubernetes integration test starting
Kubernetes integration test status failure

Test build #140081 has finished for PR 32286 at commit
Just some minor queries; I like the change.
@@ -66,6 +66,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
   .setAppName("test")
   .set(UI_ENABLED, false)
   .set(IO_ENCRYPTION_ENABLED, enableIOEncryption)
+  .set(IO_COMPRESSION_CODEC.key, "lz4")
Curious, does this test require it to be lz4?
Same for AdaptiveQueryExecSuite below: why not rely on the new default value of zstd?
Thank you for the review, @mridulm. The change is required because the UT depends on results based on the intermediate statistics of the query.
Thanks for clarifying, @dongjoon-hyun!
Thank you. Sure, @mridulm.
This PR aims to use `zstd` as `spark.io.compression.codec` instead of `lz4` in order to reduce the disk IOs and traffic during shuffle processing and worker decommission storage migration (between executors and to external storage).
- Since SPARK-29434 and SPARK-29576, Apache Spark 3.0+ uses ZSTD for MapOutputStatus ser/deser instead of `GZIP`.
- Since SPARK-34503, Apache Spark 3.2 uses ZSTD for `spark.eventLog.compression.codec` by default.

**BEFORE** / **AFTER** (screenshots)
Looks good to me (pending other review comments, of course).
Thank you for your comments, @mridulm. I'm looking at the stability of GitHub Actions.
Test build #140387 has started for PR 32286 at commit

Kubernetes integration test unable to build dist. exiting with code: 1

Test build #140554 has finished for PR 32286 at commit

Kubernetes integration test starting
Kubernetes integration test status success
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
What changes were proposed in this pull request?

This PR aims to use `zstd` as `spark.io.compression.codec` instead of `lz4` in order to reduce the disk IOs and traffic during shuffle processing and worker decommission storage migration (between executors and to external storage).

- Since SPARK-29434 and SPARK-29576, Apache Spark 3.0+ uses `zstd` for `spark.shuffle.mapStatus.compression.codec` by default instead of `GZIP`.
- Since SPARK-34503, Apache Spark 3.2 uses `zstd` for `spark.eventLog.compression.codec` by default instead of `LZ4`.

Why are the changes needed?

To reduce the disk footprint. For the TPCDS 3TB case, `zstd` has 44% less shuffle write size and 43% less shuffle read size. For some cases, query execution with `zstd` io is 20% faster than `lz4` io.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Pass the CIs.
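Taken together with the earlier JIRAs the description references, the zstd defaults amount to the following settings. A hedged Scala sketch spelling them out explicitly (redundant once the defaults land, but useful for pinning behavior across versions):

```scala
import org.apache.spark.SparkConf

// Spelling out what become the defaults after this PR plus
// SPARK-29434/SPARK-29576 (map status) and SPARK-34503 (event log):
val conf = new SparkConf()
  .set("spark.io.compression.codec", "zstd")
  .set("spark.shuffle.mapStatus.compression.codec", "zstd")
  .set("spark.eventLog.compression.codec", "zstd")
```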