Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-29657][CORE] Iterator spill supporting radix sort with null prefix #26323

Closed
wants to merge 2 commits into from

Conversation

cxzl25
Copy link
Contributor

@cxzl25 cxzl25 commented Oct 30, 2019

What changes were proposed in this pull request?

Support ChainedIterator with SortedIterator spill

Why are the changes needed?

In the case of radix sort, when the insertRecord part of the keyPrefix is null, the iterator type returned by getSortedIterator is ChainedIterator.
Currently ChainedIterator does not support spill, causing UnsafeExternalSorter to take up a lot of execution memory, allocatePage fails, throw SparkOutOfMemoryError Unable to acquire xxx bytes of memory, got 0

Does this PR introduce any user-facing change?

No

How was this patch tested?

add ut

@cxzl25
Copy link
Contributor Author

cxzl25 commented Oct 30, 2019

UnsafeInMemorySorter#getSortedIterator
Use radix sort, some keyPrefix has null, return ChainedIterator

if (nullBoundaryPos > 0) {
assert radixSortSupport != null : "Nulls are only stored separately with radix sort";
LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
// The null order is either LAST or FIRST, regardless of sorting direction (ASC|DESC)
if (radixSortSupport.nullsFirst()) {
queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
} else {
queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
}
return new UnsafeExternalSorter.ChainedIterator(queue);

readingIterator.spill

public long spill() throws IOException {
synchronized (this) {
if (!(upstream instanceof UnsafeInMemorySorter.SortedIterator && nextUpstream == null
&& numRecords > 0)) {
return 0L;
}

The following is a log of an error we encountered in the production environment.

[Executor task launch worker for task 66055] INFO TaskMemoryManager: Memory used in task 66055
[Executor task launch worker for task 66055] INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@39dd866e: 64.0 KB
[Executor task launch worker for task 66055] INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@74d17927: 4.6 GB
[Executor task launch worker for task 66055] INFO TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@31478f9c: 61.0 MB
[Executor task launch worker for task 66055] INFO TaskMemoryManager: 0 bytes of memory were used by task 66055 but are not associated with specific consumers
[Executor task launch worker for task 66055] INFO TaskMemoryManager: 4962998749 bytes of memory are used for execution and 2218326 bytes of memory are used for storage
[Executor task launch worker for task 66055] ERROR Executor: Exception in task 42.3 in stage 29.0 (TID 66055)
SparkOutOfMemoryError: Unable to acquire 3436 bytes of memory, got 0

@cxzl25
Copy link
Contributor Author

cxzl25 commented Nov 2, 2019

@JoshRosen @davies
Can you look at this pr when you are free? Thank you.

@cxzl25
Copy link
Contributor Author

cxzl25 commented Nov 7, 2019

@cloud-fan Can you look at this question? Thank you.
Can really improve the success rate in some extreme scenarios.

@cxzl25
Copy link
Contributor Author

cxzl25 commented Nov 18, 2019

@wangyum @HyukjinKwon @srowen
Can you look at this pr? Thank you.
This pr was raised for a while but was not confirmed.

@srowen
Copy link
Member

srowen commented Nov 18, 2019

I don't know enough to evaluate the change. It looks like it touches some important code.

@cloud-fan
Copy link
Contributor

OK to test

@@ -518,19 +520,52 @@ public int getNumRecords() {

public long spill() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low-level code is hard to review as nobody remembers all the details all the time. Can you briefly explain the spill process to help people review?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before SPARK-14851, UnsafeInMemorySorter.getSortedIterator(), the returned type is UnsafeInMemorySorter.SortedIterator.

Now, when the radio sort is turned on and there are some empty values, the return type is UnsafeExternalSorter.ChainedIterator(UnsafeInMemorySorter.SortedIterator).
In UnsafeExternalSorter.SpillableIterator#spill, this situation cannot be spilled.

When UnsafeExternalSorter#getSortedIterator is called, part of the last written data is saved in memory.

When the task needs more execution memory, the spill fails.

TaskMemoryManager#acquireExecutionMemory->UnsafeExternalSorter#spill->UnsafeExternalSorter.SpillableIterator#spill

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants