-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-38273][SQL] decodeUnsafeRows
's iterators should close underlying input streams
#35613
Conversation
867467a
to
ef600eb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your contribution, @kevins-29 .
cc @viirya , @sunchao , @HyukjinKwon |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I ask where did you get the result and on which Java? When I tried on master
branch, the number range is a little different. If you don't mind, could you update the PR description according to the master branch result?
$ while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x 1354164 | grep "total kB" | awk '{print $4}'); sleep 10; done;
"2022-02-22 09:24:16",466124
"2022-02-22 09:24:26",466188 <= empty spark shell
"2022-02-22 09:24:36",918172
"2022-02-22 09:24:46",1193860
"2022-02-22 09:24:56",1229620
"2022-02-22 09:25:06",1243004
"2022-02-22 09:25:16",1252664 <= write finish
"2022-02-22 09:25:26",1702372
"2022-02-22 09:25:36",1774608
"2022-02-22 09:25:46",1824896
"2022-02-22 09:25:56",1879808
"2022-02-22 09:26:06",1931040
"2022-02-22 09:26:16",1977812
"2022-02-22 09:26:26",1993824
"2022-02-22 09:26:36",1993824 <= read finish
Hi @dongjoon-hyun, I ran the above tests on a build from master. The test was run using OpenJDK 11. Did you export Also the assumption was the default |
Yes, I followed the instruction on Ubuntu/Java17 without any other options. Let me try it again with Java 11. |
With Java 11, the scale is similar.
To be clear, this patch shows the improvement (2037392 -> 1739112), of course.
|
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
Outdated
Show resolved
Hide resolved
With regards to the discrepancy when re-testing. I made a mistake when documenting the script, I originally used 1,000,000 records per a batch, not 100,000. I have updated the script accordingly. |
Thank you for the investigation and update! |
decodeUnsafeRows
's iterators should close underlying input streams
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks okay.
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
Outdated
Show resolved
Hide resolved
Can one of the admins verify this patch? |
Looks fine to me 2 |
+CC @zhouyejoe |
I did a final run with all of the above changes: Results
|
Thank you for the final update. Please put it into the PR description, @kevins-29 . |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, All. It seems that most comments are addressed.
I'm going to merge this tomorrow morning.
Please let me know if you are still reviewing.
I am done with reviewing, looks pretty good |
…an.scala Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
…ying input streams ### What changes were proposed in this pull request? Wrapping the DataInputStream in the SparkPlan.decodeUnsafeRows method with a NextIterator as opposed to a plain Iterator, this will allow us to close the DataInputStream properly. This happens in Spark driver only. ### Why are the changes needed? SPARK-34647 replaced the ZstdInputStream with ZstdInputStreamNoFinalizer. This meant that all usages of `CompressionCodec.compressedInputStream` would need to manually close the stream as this would no longer be handled by the finaliser mechanism. In SparkPlan, the result of `CompressionCodec.compressedInputStream` is wrapped in an Iterator which never calls close. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? #### Spark Shell Configuration ```bash $> export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g" $> $SPARK_HOME/bin/spark-shell --conf spark.io.compression.codec=zstd ``` #### Test Script ```scala import java.sql.Timestamp import java.time.Instant import spark.implicits._ case class Record(timestamp: Timestamp, batch: Long, value: Long) (1 to 300).foreach { batch => sc.parallelize(1 to 1000000).map(Record(Timestamp.from(Instant.now()), batch, _)).toDS.write.parquet(s"test_data/batch_$batch") } (1 to 300).foreach(batch => spark.read.parquet(s"test_data/batch_$batch").as[Record].repartition().collect()) ``` #### Memory Monitor ```shell $> while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x <PID> | grep "total kB" | awk '{print $4}'); sleep 10; done; ``` #### Results ##### Before ``` "2022-02-22 11:55:23",1400016 "2022-02-22 11:55:33",1522024 "2022-02-22 11:55:43",1587812 "2022-02-22 11:55:53",1631868 "2022-02-22 11:56:03",1657252 "2022-02-22 11:56:13",1659728 "2022-02-22 11:56:23",1664640 "2022-02-22 11:56:33",1674152 "2022-02-22 11:56:43",1697320 "2022-02-22 11:56:53",1689636 "2022-02-22 11:57:03",1783888 "2022-02-22 11:57:13",1896920 "2022-02-22 11:57:23",1950492 "2022-02-22 11:57:33",2010968 "2022-02-22 11:57:44",2066560 "2022-02-22 11:57:54",2108232 "2022-02-22 11:58:04",2158188 "2022-02-22 11:58:14",2211344 "2022-02-22 11:58:24",2260180 "2022-02-22 11:58:34",2316352 "2022-02-22 11:58:44",2367412 "2022-02-22 11:58:54",2420916 "2022-02-22 11:59:04",2472132 "2022-02-22 11:59:14",2519888 "2022-02-22 11:59:24",2571372 "2022-02-22 11:59:34",2621992 "2022-02-22 11:59:44",2672400 "2022-02-22 11:59:54",2728924 "2022-02-22 12:00:04",2777712 "2022-02-22 12:00:14",2834272 "2022-02-22 12:00:24",2881344 "2022-02-22 12:00:34",2935552 "2022-02-22 12:00:44",2984896 "2022-02-22 12:00:54",3034116 "2022-02-22 12:01:04",3087092 "2022-02-22 12:01:14",3134432 "2022-02-22 12:01:25",3198316 "2022-02-22 12:01:35",3193484 "2022-02-22 12:01:45",3193212 "2022-02-22 12:01:55",3192872 "2022-02-22 12:02:05",3191772 "2022-02-22 12:02:15",3187780 "2022-02-22 12:02:25",3177084 "2022-02-22 12:02:35",3173292 "2022-02-22 12:02:45",3173292 "2022-02-22 12:02:55",3173292 ``` ##### After ``` "2022-02-22 12:05:03",1377124 "2022-02-22 12:05:13",1425132 "2022-02-22 12:05:23",1564060 "2022-02-22 12:05:33",1616116 "2022-02-22 12:05:43",1637448 "2022-02-22 12:05:53",1637700 "2022-02-22 12:06:03",1653912 "2022-02-22 12:06:13",1659532 "2022-02-22 12:06:23",1673368 "2022-02-22 12:06:33",1687580 "2022-02-22 12:06:43",1711076 "2022-02-22 12:06:53",1849752 "2022-02-22 12:07:03",1861528 "2022-02-22 12:07:13",1871200 "2022-02-22 12:07:24",1878860 "2022-02-22 12:07:34",1879332 "2022-02-22 12:07:44",1886552 "2022-02-22 12:07:54",1884160 "2022-02-22 12:08:04",1880924 "2022-02-22 12:08:14",1876084 "2022-02-22 12:08:24",1878800 "2022-02-22 12:08:34",1879068 "2022-02-22 12:08:44",1880088 "2022-02-22 12:08:54",1880160 "2022-02-22 12:09:04",1880496 "2022-02-22 12:09:14",1891672 "2022-02-22 12:09:24",1878552 "2022-02-22 12:09:34",1876136 "2022-02-22 12:09:44",1890056 "2022-02-22 12:09:54",1878076 "2022-02-22 12:10:04",1882440 "2022-02-22 12:10:14",1893172 "2022-02-22 12:10:24",1894216 "2022-02-22 12:10:34",1894204 "2022-02-22 12:10:44",1894716 "2022-02-22 12:10:54",1894720 "2022-02-22 12:11:04",1894720 "2022-02-22 12:11:15",1895232 "2022-02-22 12:11:25",1895496 "2022-02-22 12:11:35",1895496 ``` Closes #35613 from kevins-29/spark-38273. Lead-authored-by: Kevin Sewell <kevins_25@apple.com> Co-authored-by: kevins-29 <100220899+kevins-29@users.noreply.github.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 43c89dc) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Thank you, @kevins-29 , @viirya , @HyukjinKwon , @sunchao . @kevins-29 . I added you to the Apache Spark contributor group and assigned SPARK-38273 to you. |
…ying input streams ### What changes were proposed in this pull request? Wrapping the DataInputStream in the SparkPlan.decodeUnsafeRows method with a NextIterator as opposed to a plain Iterator, this will allow us to close the DataInputStream properly. This happens in Spark driver only. ### Why are the changes needed? SPARK-34647 replaced the ZstdInputStream with ZstdInputStreamNoFinalizer. This meant that all usages of `CompressionCodec.compressedInputStream` would need to manually close the stream as this would no longer be handled by the finaliser mechanism. In SparkPlan, the result of `CompressionCodec.compressedInputStream` is wrapped in an Iterator which never calls close. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? #### Spark Shell Configuration ```bash $> export SPARK_SUBMIT_OPTS="-XX:+AlwaysPreTouch -Xms1g" $> $SPARK_HOME/bin/spark-shell --conf spark.io.compression.codec=zstd ``` #### Test Script ```scala import java.sql.Timestamp import java.time.Instant import spark.implicits._ case class Record(timestamp: Timestamp, batch: Long, value: Long) (1 to 300).foreach { batch => sc.parallelize(1 to 1000000).map(Record(Timestamp.from(Instant.now()), batch, _)).toDS.write.parquet(s"test_data/batch_$batch") } (1 to 300).foreach(batch => spark.read.parquet(s"test_data/batch_$batch").as[Record].repartition().collect()) ``` #### Memory Monitor ```shell $> while true; do echo \"$(date +%Y-%m-%d' '%H:%M:%S)\",$(pmap -x <PID> | grep "total kB" | awk '{print $4}'); sleep 10; done; ``` #### Results ##### Before ``` "2022-02-22 11:55:23",1400016 "2022-02-22 11:55:33",1522024 "2022-02-22 11:55:43",1587812 "2022-02-22 11:55:53",1631868 "2022-02-22 11:56:03",1657252 "2022-02-22 11:56:13",1659728 "2022-02-22 11:56:23",1664640 "2022-02-22 11:56:33",1674152 "2022-02-22 11:56:43",1697320 "2022-02-22 11:56:53",1689636 "2022-02-22 11:57:03",1783888 "2022-02-22 11:57:13",1896920 "2022-02-22 11:57:23",1950492 "2022-02-22 11:57:33",2010968 "2022-02-22 11:57:44",2066560 "2022-02-22 11:57:54",2108232 "2022-02-22 11:58:04",2158188 "2022-02-22 11:58:14",2211344 "2022-02-22 11:58:24",2260180 "2022-02-22 11:58:34",2316352 "2022-02-22 11:58:44",2367412 "2022-02-22 11:58:54",2420916 "2022-02-22 11:59:04",2472132 "2022-02-22 11:59:14",2519888 "2022-02-22 11:59:24",2571372 "2022-02-22 11:59:34",2621992 "2022-02-22 11:59:44",2672400 "2022-02-22 11:59:54",2728924 "2022-02-22 12:00:04",2777712 "2022-02-22 12:00:14",2834272 "2022-02-22 12:00:24",2881344 "2022-02-22 12:00:34",2935552 "2022-02-22 12:00:44",2984896 "2022-02-22 12:00:54",3034116 "2022-02-22 12:01:04",3087092 "2022-02-22 12:01:14",3134432 "2022-02-22 12:01:25",3198316 "2022-02-22 12:01:35",3193484 "2022-02-22 12:01:45",3193212 "2022-02-22 12:01:55",3192872 "2022-02-22 12:02:05",3191772 "2022-02-22 12:02:15",3187780 "2022-02-22 12:02:25",3177084 "2022-02-22 12:02:35",3173292 "2022-02-22 12:02:45",3173292 "2022-02-22 12:02:55",3173292 ``` ##### After ``` "2022-02-22 12:05:03",1377124 "2022-02-22 12:05:13",1425132 "2022-02-22 12:05:23",1564060 "2022-02-22 12:05:33",1616116 "2022-02-22 12:05:43",1637448 "2022-02-22 12:05:53",1637700 "2022-02-22 12:06:03",1653912 "2022-02-22 12:06:13",1659532 "2022-02-22 12:06:23",1673368 "2022-02-22 12:06:33",1687580 "2022-02-22 12:06:43",1711076 "2022-02-22 12:06:53",1849752 "2022-02-22 12:07:03",1861528 "2022-02-22 12:07:13",1871200 "2022-02-22 12:07:24",1878860 "2022-02-22 12:07:34",1879332 "2022-02-22 12:07:44",1886552 "2022-02-22 12:07:54",1884160 "2022-02-22 12:08:04",1880924 "2022-02-22 12:08:14",1876084 "2022-02-22 12:08:24",1878800 "2022-02-22 12:08:34",1879068 "2022-02-22 12:08:44",1880088 "2022-02-22 12:08:54",1880160 "2022-02-22 12:09:04",1880496 "2022-02-22 12:09:14",1891672 "2022-02-22 12:09:24",1878552 "2022-02-22 12:09:34",1876136 "2022-02-22 12:09:44",1890056 "2022-02-22 12:09:54",1878076 "2022-02-22 12:10:04",1882440 "2022-02-22 12:10:14",1893172 "2022-02-22 12:10:24",1894216 "2022-02-22 12:10:34",1894204 "2022-02-22 12:10:44",1894716 "2022-02-22 12:10:54",1894720 "2022-02-22 12:11:04",1894720 "2022-02-22 12:11:15",1895232 "2022-02-22 12:11:25",1895496 "2022-02-22 12:11:35",1895496 ``` Closes apache#35613 from kevins-29/spark-38273. Lead-authored-by: Kevin Sewell <kevins_25@apple.com> Co-authored-by: kevins-29 <100220899+kevins-29@users.noreply.github.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 43c89dc) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
Wrapping the DataInputStream in the SparkPlan.decodeUnsafeRows method with a NextIterator as opposed to a plain Iterator, this will allow us to close the DataInputStream properly. This happens in Spark driver only.
Why are the changes needed?
SPARK-34647 replaced the ZstdInputStream with ZstdInputStreamNoFinalizer. This meant that all usages of
CompressionCodec.compressedInputStream
would need to manually close the stream as this would no longer be handled by the finaliser mechanism.In SparkPlan, the result of
CompressionCodec.compressedInputStream
is wrapped in an Iterator which never calls close.Does this PR introduce any user-facing change?
No
How was this patch tested?
Spark Shell Configuration
Test Script
Memory Monitor
Results
Before
After