[SPARK-25224][SQL] Improvement of Spark SQL ThriftServer memory management #22219
Conversation
Did you verify this feature manually?
nit: two more spaces for indentation?
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
nit: Is it better to put them into one line?
Yes, I verified the results of a variety of queries, as well as memory usage and performance. This patch passed all of our query tests, and there was no performance degradation in our test cases. Below is the result of the memory test. After patch: 283910.0KB -> 316108.3KB => a 31.44MB increase. The memory improvement is very large, because the compressed result buffer is surprisingly smaller than I expected. Decompressed InternalRows are garbage-collected right after they are sent, during Young GC, so Old Gen heap usage is much smaller than before.
Would it be good to make it public?
Yeah, why is it public?
And add a description for it.
I think it would be good to make it public. There is no "action" that decodes rows incrementally except toLocalIterator, and toLocalIterator has poor performance for data sources with many partitions or when selecting a limited number of rows. So it would be a good choice to provide another option for reducing the memory pressure of decompressing & deserializing result rows.
We can always make it public later if there is such a requirement. We should be careful when adding public APIs.
Yeah, we need to consider the decision carefully. At least, if we decide to make it public, would it be better to add @experimental?
Add a description to collectCountAndIterator, but don't make it public or experimental yet.
Would it be possible to prepare test cases? IIUC, this feature can be exercised without the Thrift server by writing some test code.
Please add some description for this method.
And add a description for it.
Most of the above change looks like just refactoring. It looks fine, but it could be avoided to reduce the diff.
@kiszk
Yeah, I'll prepare test cases.
@viirya
The changes above make decodeUnsafeRows execute lazily to reduce peak memory. Changing numPartsToTry to a val may be a refactoring that can be separated from this patch. If reviewers want to revert this refactoring, I can split it out into another trivial pull request.
@kiszk
Do we want to revert this commit to reduce the diff?
It is also fine to revert this.
When incremental collect is disabled and users want to use FETCH_FIRST, we expect the returned rows to be cached so that we can get iterators from them again. Now FETCH_FIRST triggers re-execution whether incremental collect is enabled or not. I think this may be a performance regression in some cases.
Yes, the case you mentioned (incremental collect disabled & FETCH_FIRST) has a performance degradation. If the total row count is bigger than batchCollectLimit, I thought it was not a suitable case for caching decompressed rows because of memory pressure. If FETCH_FIRST cached compressed rows (not decompressed rows) regardless of row count, results that exceed the batch limit could be cached too. But an "Iterator" return type may not be a good choice for that. Instead, Scala's "View" is the proper choice, because an "Iterator" can be created again from a "View" of compressed rows; however, that causes much more source change, so I didn't do it.
@viirya
I'd like to share my idea for solving the problem you mentioned.
- Change the return type of collectCountAndIterator to a tuple of (Long, SeqView).
- The SeqView is created from the encoded result array (the result of getByteArrayRdd().collect() in SparkPlan) and holds the deserializing operations defined in Dataset.
- Change the type of resultList in SparkExecuteStatementOperation to Option[Iterable[SparkRow]], because both Array and SeqView are Iterable.
- The Thrift server checks whether the row count exceeds THRIFTSERVER_BATCH_COLLECTION_LIMIT and decides:
-> if row count > THRIFTSERVER_BATCH_COLLECTION_LIMIT => resultList caches the SeqView.
-> else => resultList caches the Array collected from the SeqView.
What do you think about this idea? (A rough sketch of the decision is below.)
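A minimal Scala sketch of that caching decision, assuming a (rowCount, rowView) pair like the one the proposed collectCountAndSeqView would return; batchCollectionLimit stands in for THRIFTSERVER_BATCH_COLLECTION_LIMIT, and none of these names are guaranteed to match the final patch:

```scala
import scala.collection.SeqView
import org.apache.spark.sql.Row

object ResultCaching {
  // Decide what the Thrift server caches in resultList: the lazy view itself,
  // or an Array materialized from it.
  def cacheResult(
      rowCount: Long,
      rowView: SeqView[Row, Seq[Row]],
      batchCollectionLimit: Long): Iterable[Row] = {
    if (rowCount > batchCollectionLimit) {
      // Large result: keep only the lazy view (assumed to be backed by the
      // compressed, collected partitions) and decode rows each time the client
      // iterates over them.
      rowView
    } else {
      // Small result: materialize the decoded rows once, so FETCH_FIRST can
      // re-iterate the cached collection without re-execution or re-decoding.
      rowView.force
    }
  }
}
```

Since both branches yield an Iterable[Row], the fetch path in SparkExecuteStatementOperation could stay the same regardless of which form ends up cached.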
I think we should try to cache the encoded result if the row count > THRIFTSERVER_BATCH_COLLECTION_LIMIT when incremental collect is disabled. That sounds closer to what the mode is meant to do. Otherwise its behavior looks close to incremental collect mode, since it does re-execution. Besides, it collects all the data back to the driver in encoded format anyway.
I will try to cache it. Thank you for the reply.
I don't think we should expose this as an API. This JIRA/PR doesn't target this API anyway, right? Shall we just leave it as private?
Ok. I agree with you.
Added test cases.
Changed the accessor of collectCountAndIterator to private[sql], and updated the doc of the feature that I defined in the Thrift server.
@kiszk @viirya @HyukjinKwon @cloud-fan
@Dooyoung-Hwang Would it be possible to add a test case to verify the result with and without incremental collects by changing the value of
nit: s"client'. Only valid if ${THRIFTSERVER_INCREMENTAL_COLLECT.key} is false. " +
If this is a thriftserver-specific issue, can we do the same thing by fixing code only in the thriftserver package?
IMHO we should avoid modifying code in the sql package as much as possible.
Currently, there is no API in Dataset to deserialize results iteratively.
You mean there is no way to implement that functionality outside Dataset?
Yes, that's what I mean. 'deserializer' is declared private, so there is no way to get 'deserializer' from outside Dataset.
How about changing private to private[sql], then implementing this based on the deserializer?
OK, so you'd prefer not to add a function to Dataset. If withAction & deserializer are changed to private[sql], this implementation can be moved out. But couldn't this function be useful for other SQL servers to reduce the memory usage of query execution? I don't think moving it out looks good, because the Projection would be created outside of Dataset.
It would be better to separate this PR into the two parts you proposed in the PR description.
Do you mean splitting this PR into the sql part and the thriftserver part?
For example, do collectCountAndSeqView and executeTakeSeqView depend on each other? If not, please split them into separate PRs.
Can you keep the current behavior? Then, please implement the SeqView iteration model so it can be turned on/off by a new option.
Yeah, I'll make this feature a boolean option. Thank you for the review.
Done
@HyukjinKwon Can this be merged?
ok to test
Nope, not yet. It needs some more review iterations.
Test build #97363 has finished for PR 22219 at commit
Force-pushed from ffafd62 to f05570c.
Test build #97376 has finished for PR 22219 at commit
Test build #97388 has finished for PR 22219 at commit
Dear reviewers (cc: @dongjoon-hyun), I updated these.
Test build #97452 has finished for PR 22219 at commit
Test build #97530 has finished for PR 22219 at commit
ok to test
Test build #97893 has finished for PR 22219 at commit
please merge
I cannot make further progress, because there are no more review comments.
I have been using this patch for some time too, and I can confirm that it helped a lot with OutOfMemory and GC issues on the Thrift server. I believe it could benefit other users if merged.
@Dooyoung-Hwang sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala has a conflict?
When collecting results with this API, only compressed rows reside in heap memory, so the heap usage of the collection can be controlled adaptively on the caller side.
1. executeTakeIterator returns an iterator of decoded rows, but only encoded rows reside in heap space. 2. Now executeTake uses executeTakeIterator, so it decodes rows exactly n times.
- The Thrift server can decide whether to decompress & deserialize the rows all together or incrementally, considering the total row count. If the total row count is bigger than the configured threshold (spark.sql.thriftServer.batchCollectionLimit), rows are decoded incrementally before they are sent to the client; otherwise they are collected all together for performance.
- Add a feature to SQLConf for configuring output result rows. spark.sql.thriftServer.batchCollectionLimit: when the result row count exceeds this, result rows are collected incrementally. Only valid for non-incremental collection. The default is no limit.
- Change the return type to SeqView so the cached result can be reused for FETCH_FIRST.
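As an illustration of the commit messages above, here is a simplified, hypothetical sketch of the lazy-decoding idea; decodePartition stands in for Spark's internal per-partition row decoding (e.g. decodeUnsafeRows) and is not a real public API:

```scala
import scala.reflect.ClassTag

object LazyDecode {
  // The driver holds only the collected, encoded partition blobs; rows are decoded
  // one partition at a time, and only while the returned iterator is consumed.
  def executeTakeIterator[T](
      encodedPartitions: Array[Array[Byte]],
      decodePartition: Array[Byte] => Iterator[T]): Iterator[T] = {
    encodedPartitions.iterator.flatMap(decodePartition)
  }

  // executeTake built on top of the iterator decodes at most n rows and then stops,
  // instead of decoding every collected row up front.
  def executeTake[T: ClassTag](
      encodedPartitions: Array[Array[Byte]],
      decodePartition: Array[Byte] => Iterator[T],
      n: Int): Array[T] = {
    executeTakeIterator(encodedPartitions, decodePartition).take(n).toArray
  }
}
```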
Force-pushed from 136a4f9 to aa0cb41.
Test build #102744 has finished for PR 22219 at commit
@HyukjinKwon Please merge.
Can we implement something like JDBC's ResultSet?
Maybe FETCH_REVERSE or previous() would be difficult, because this feature is based on Scala's Iterator.
Test build #102801 has finished for PR 22219 at commit
Has this patch been introduced into Spark? Can I use it on the new Spark 2.4?
Not merged yet.
Can one of the admins verify this patch?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
@Dooyoung-Hwang @maropu, is this merged? We are facing these issues in production. If it's merged, can we use it in Spark 3.2.0?
This is not merged, @RamakrishnaChilaka.
@dongjoon-hyun, will this be merged? If yes, can we please update this patch so that it works for Spark 3.2.0? Please confirm, thanks.
What changes were proposed in this pull request?
Spark SQL has only two options for managing Thrift server memory: enabling spark.sql.thriftServer.incrementalCollect or not.
The case of enabling spark.sql.thriftServer.incrementalCollect
The case of disabling spark.sql.thriftServer.incrementalCollect
The idea for improving on these problems is below.
Dataset does not decompress & deserialize the result; it just returns the total row count & an iterator to the SQL executor. By doing that, only compressed data resides in memory, so memory usage is not only much lower than before but can also be controlled with the spark.driver.maxResultSize config.
After the SQL executor gets the total row count & iterator from Dataset, it can decide whether to deserialize the rows collectively or iteratively, taking the returned row count into account.
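As a rough, hypothetical sketch of this caller-side decision (the (rowCount, rowIterator) pair models what the proposed collectCountAndIterator would return, and batchCollectionLimit is an assumed threshold, not an existing Spark config name in this form):

```scala
import org.apache.spark.sql.Row

object SqlExecutorSide {
  // Decide, from the row count alone, whether to materialize the whole result
  // up front or to hand the lazy iterator through to the client fetch loop.
  def materializeOrStream(
      rowCount: Long,
      rowIterator: Iterator[Row],
      batchCollectionLimit: Long): Either[Iterator[Row], Array[Row]] = {
    if (rowCount > batchCollectionLimit) {
      // Deserialize iteratively: rows are decoded only as the client fetches them.
      Left(rowIterator)
    } else {
      // Deserialize collectively: small results are decoded once, up front.
      Right(rowIterator.toArray)
    }
  }
}
```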
How was this patch tested?
Add test cases.