
[SPARK-43911] [SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array #41419

Closed
wants to merge 5 commits

Conversation

mcdull-zhang
Contributor

@mcdull-zhang mcdull-zhang commented Jun 1, 2023

What changes were proposed in this pull request?

When SubqueryBroadcastExec reuses the keys of a broadcast HashedRelation for dynamic partition pruning, it collects all the keys into an Array and then calls distinct on that Array to remove duplicates.

In general, a broadcast HashedRelation may contain many rows, and the pruning keys are often highly repetitive. The intermediate Array can therefore occupy a large amount of memory (memory that is not managed by the MemoryManager), which may trigger an OOM.

The approach here is to call toSet on the iterator directly, which deduplicates the keys without materializing a large Array.
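A minimal sketch of the difference, in Python rather than Spark's actual Scala code (the function names here are hypothetical, chosen only to mirror `iterator.toArray.distinct` versus `iterator.toSet`): deduplicating while consuming the iterator keeps peak memory proportional to the number of *distinct* keys, not the total row count.

```python
from typing import Iterable, List, Set

def dedup_via_array(rows: Iterable[int]) -> List[int]:
    # Old approach: materialize every key first, then deduplicate.
    # Peak memory is proportional to the total row count.
    all_keys = list(rows)                  # analogous to iterator.toArray
    return list(dict.fromkeys(all_keys))   # analogous to .distinct

def dedup_via_set(rows: Iterable[int]) -> Set[int]:
    # New approach: stream the iterator straight into a set, so the
    # full key sequence is never held at once.
    return set(rows)                       # analogous to iterator.toSet

# Highly repetitive keys, as described for DPP broadcast relations:
rows = (i % 10 for i in range(1_000_000))
assert dedup_via_set(rows) == set(range(10))
```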

Why are the changes needed?

Avoids OOM exceptions such as the following:

```text
Exception in thread "dynamicpruning-0" java.lang.OutOfMemoryError: Java heap space
	at scala.collection.mutable.ResizableArray.ensureSize(ResizableArray.scala:106)
	at scala.collection.mutable.ResizableArray.ensureSize$(ResizableArray.scala:96)
	at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:49)
	at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:85)
	at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
	at scala.collection.generic.Growable$$Lambda$7/1514840818.apply(Unknown Source)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$2(SubqueryBroadcastExec.scala:92)
	at org.apache.spark.sql.execution.SubqueryBroadcastExec$$Lambda$4212/5099232.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:140)
```

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manually verified in a production environment; existing unit tests pass.

@github-actions github-actions bot added the SQL label Jun 1, 2023
@wangyum
Member

wangyum commented Jun 2, 2023

@mcdull-zhang Please fix the PR title and PR description according to the new changes.

@mcdull-zhang mcdull-zhang changed the title [SPARK-43911] [SQL] Directly use Set to consume iterator data to deduplicate, thereby reducing memory usage [SPARK-43911] [SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array Jun 2, 2023
@mcdull-zhang
Contributor Author

@mcdull-zhang Please fix the PR title and PR description according to the new changes.

Modified.

@wangyum wangyum closed this in 595ad30 Jun 4, 2023
wangyum pushed a commit that referenced this pull request Jun 4, 2023
Closes #41419 from mcdull-zhang/reduce_memory_usage.

Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 595ad30)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
@wangyum
Member

wangyum commented Jun 4, 2023

Merged to master and branch-3.4.

@HyukjinKwon
Member

I don't know why, but it seems this PR breaks the build with an OOM. Let me revert it and see how it goes, if you don't mind.

@HyukjinKwon
Member

#41452

@wangyum
Member

wangyum commented Jun 6, 2023

+1 to revert it. This is because a Set can take up much more memory than an Array:

```scala
val plan = sql("select id from range(100000)").queryExecution.executedPlan
println(com.carrotsearch.sizeof.RamUsageEstimator.sizeOf(plan.executeToIterator().toSet))
println(com.carrotsearch.sizeof.RamUsageEstimator.sizeOf(plan.executeToIterator().toArray))
```

Output:

```text
11460896
7600016
```
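The same effect can be shown with plain Python containers (an illustrative analogue, not Spark's data structures, and `sys.getsizeof` only measures the container itself): a hash set keeps a sparse bucket table, so when the keys are mostly unique it ends up larger than a flat array of the same elements.

```python
import sys

n = 100_000
unique_keys = list(range(n))  # every key distinct, as in range(100000)

# Container overhead only (sys.getsizeof does not follow references).
# A list stores a dense pointer array; a set must keep its hash table
# sparse to bound collisions, so it allocates far more slots than items.
array_size = sys.getsizeof(unique_keys)
set_size = sys.getsizeof(set(unique_keys))

print(array_size, set_size)
assert set_size > array_size
```

This is why the `toSet` change only helps when the keys are highly repetitive; with mostly-unique keys it can make memory usage worse, which matches the revert decision.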

czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
GladwinLee pushed a commit to lyft/spark that referenced this pull request Oct 10, 2023
catalinii pushed a commit to lyft/spark that referenced this pull request Oct 10, 2023