[SPARK-21595] Separate thresholds for buffering and spilling in ExternalAppendOnlyUnsafeRowArray #18843
Conversation
@hvanhovell: let me know what you think about this.
Test build #80236 has finished for PR 18843 at commit
retest this please
Test build #80247 has finished for PR 18843 at commit
Force-pushed from 9f66038 to 398ccaf.
Test build #80255 has finished for PR 18843 at commit
Test build #80256 has finished for PR 18843 at commit
retest this please
LGTM - pending jenkins
* - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
*   This may lead to a performance regression compared to the normal case of using an
*   [[ArrayBuffer]] or [[Array]].
* - If [[numRowsSpillThreshold]] is too high, the in-memory array may occupy more memory than
typo? this should be numRowsInMemoryBufferThreshold. We may spill before reaching numRowsSpillThreshold if memory is not enough.
Yes, it was a typo. Corrected it.
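For readers following the thread, here is a minimal, self-contained sketch of how the two knobs interact. This is not the actual Spark class; `TwoThresholdBuffer` and its `println` spill stub are illustrative stand-ins for `ExternalAppendOnlyUnsafeRowArray` and `UnsafeExternalSorter`:

```scala
import scala.collection.mutable.ArrayBuffer

class TwoThresholdBuffer[T](
    numRowsInMemoryBufferThreshold: Int,
    numRowsSpillThreshold: Int) {

  private val inMemoryBuffer = new ArrayBuffer[T]
  private var switchedToSorter = false
  private var rowsSinceLastSpill = 0

  def add(row: T): Unit = {
    if (!switchedToSorter && inMemoryBuffer.length < numRowsInMemoryBufferThreshold) {
      // Cheap path: a plain in-memory array, like the old ArrayBuffer behaviour.
      inMemoryBuffer += row
    } else {
      if (!switchedToSorter) {
        // First threshold crossed: hand the buffered rows over to the sorter.
        switchedToSorter = true
        rowsSinceLastSpill = inMemoryBuffer.length
        inMemoryBuffer.clear()
      }
      rowsSinceLastSpill += 1
      // Second threshold: force a spill to disk after numRowsSpillThreshold
      // rows (the real sorter can also spill earlier under memory pressure).
      if (rowsSinceLastSpill >= numRowsSpillThreshold) {
        println(s"spilling $rowsSinceLastSpill buffered rows to disk")
        rowsSinceLastSpill = 0
      }
    }
  }
}
```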
.createWithDefault(4096)
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)

val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
can we just have one config for both window and SMJ? ideally we can say this config is for ExternalAppendOnlyUnsafeRowArray
I am fine with that. We can even go a step further and just have two configs: an in-memory threshold and a spill threshold at the ExternalAppendOnlyUnsafeRowArray level, for all its clients (currently SMJ, cartesian product, Window). That way we have consistency across all clients and both knobs. One downside is backward compatibility: the spill threshold was already defined per operator and people might be using it in prod.
Let me know what you think about that.
ok let's keep them separated for each operator.
.doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " + | ||
"join operator") | ||
.intConf | ||
.createWithDefault(Int.MaxValue) |
is this a reasonable default value? won't it lead to OOM according to the document?
It is the current value. I suppose you want to be able to tune it if you have to. Not all of us are running Spark at FB scale :)...
Before introducing ExternalAppendOnlyUnsafeRowArray, SMJ used to hold in-memory data in Scala's ArrayBuffer. It's backed by an array which would at max be Int.MaxValue in size... so this default is keeping things as they were before.
got it
Test build #80451 has finished for PR 18843 at commit
Force-pushed from 398ccaf to a69969c.
Updated the PR as per review comments by @cloud-fan. I haven't made changes for all of his comments, and have replied in those places to continue the discussion.
Test build #80504 has finished for PR 18843 at commit
def sortMergeJoinExecBufferSpillThreshold: Int =
  getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)

def sortMergeJoinExecBufferInMemoryThreshold: Int =
shall we introduce a similar config for cartesian product?
Sure. Since there was no in-memory buffer for cartesian product before, I am using a conservative value of 4096 for the in-memory buffer threshold. However, the spill threshold is set to UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD like it was before.
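For reference, the resulting per-operator knobs can be set like any other SQL conf. A hedged example; the key strings below follow the option names visible in this diff, but treat the exact spellings as assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("buffer-tuning").getOrCreate()

// Keep up to 10k rows per SMJ buffer in a plain array before switching to the
// spillable sorter; once in the sorter, force a disk spill after 1M rows.
spark.conf.set("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", 10000)
spark.conf.set("spark.sql.sortMergeJoinExec.buffer.spill.threshold", 1000000)

// The same pair of knobs exists for the other ExternalAppendOnlyUnsafeRowArray
// clients (window, cartesian product), each with its own defaults.
spark.conf.set("spark.sql.cartesianProductExec.buffer.in.memory.threshold", 4096)
spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", 1000000)
```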
LGTM except one question, thanks for the fix!
Force-pushed from a69969c to ab5cd2e.
LGTM, pending jenkins
retest this please
jenkins test this please
Test build #80536 has finished for PR 18843 at commit
Merging to master/2.2. Thanks!
What changes were proposed in this pull request?
SPARK-21595 reported that there is excessive spilling to disk because the default spill threshold for `ExternalAppendOnlyUnsafeRowArray` is quite small for the WINDOW operator. The old behaviour of the WINDOW operator (pre #16909) was to hold data in an array for the first 4096 records, after which it would switch to `UnsafeExternalSorter` and start spilling to disk after reaching `spark.shuffle.spill.numElementsForceSpillThreshold` (or earlier if memory was scarce due to excessive consumers).
Currently, both the switch from in-memory storage to `UnsafeExternalSorter` and the `UnsafeExternalSorter` spilling to disk are controlled by a single threshold for `ExternalAppendOnlyUnsafeRowArray`. This PR separates the two to allow more granular control.
How was this patch tested?
Added unit tests
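As a closing illustration, a sketch of the kind of query whose buffering these thresholds tune: a window function over large partitions, where each partition's rows pass through an `ExternalAppendOnlyUnsafeRowArray`. Assumes a `SparkSession` named `spark`; the sizes are illustrative:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// 10M rows spread over 10 large window partitions.
val df = spark.range(0, 10000000L).withColumn("grp", col("id") % 10)

// Each of the 10 partitions buffers ~1M rows while row_number() is evaluated,
// so the in-memory and spill thresholds directly govern its memory/disk usage.
val w = Window.partitionBy("grp").orderBy("id")
df.withColumn("rn", row_number().over(w)).count()
```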