[SPARK-31952][SQL]Fix incorrect memory spill metric when doing Aggregate #28780

Closed
@@ -104,11 +104,13 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
       int initialSize,
       long pageSizeBytes,
       int numElementsForSpillThreshold,
-      UnsafeInMemorySorter inMemorySorter) throws IOException {
+      UnsafeInMemorySorter inMemorySorter,
+      long existingMemoryConsumption) throws IOException {
     UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
       serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
       pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */);
     sorter.spill(Long.MAX_VALUE, sorter);
+    taskContext.taskMetrics().incMemoryBytesSpilled(existingMemoryConsumption);
Member

Why didn't you set this value on the caller side (UnsafeKVExternalSorter)?

Contributor Author

I have no strong preference on this. One benefit is that anyone else who calls createWithExistingInMemorySorter cannot forget to update the memory bytes spilled (though currently only UnsafeKVExternalSorter uses this function).

Member

Is it the same to use inMemorySorter.getMemoryUsage?

Also, shall we update sorter.totalSpillBytes?

Member

@Ngone51 Ngone51 Jun 22, 2020

Or could we do final long spillSize = freeMemory() + inMemorySorter.getMemoryUsage() in sorter.spill()?

It seems that the size of inMemorySorter is included in the log info via getMemoryUsage(), while spillSize does not include it.

Contributor Author

> Is it the same to use inMemorySorter.getMemoryUsage?
>
> Also, shall we update sorter.totalSpillBytes?

  1. It's not the same as inMemorySorter.getMemoryUsage.

When we insert a record into UnsafeExternalSorter, the record itself is copied into the sorter's allocatedPages, and its memory address is stored in the in-memory sorter. So when we spill, the spilled memory size is the sum of inMemorySorter.getMemoryUsage and the allocatedPages; that is what UnsafeExternalSorter.spill does.

But when we call sorter.spill in UnsafeExternalSorter.createWithExistingInMemorySorter, the sorter's allocatedPages is empty because we didn't copy any records into it. The memory addresses in inMemorySorter point to the memory pages in BytesToBytesMap.

The passed parameter denotes the size of the memory pages in BytesToBytesMap.

  2. Yes, it seems we also need to update sorter.totalSpillBytes.
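
The accounting described in this reply can be sketched with a toy model. This is only an illustration under stated assumptions: `ToySorter`, `spillAfterInsert`, and `spillWithExistingSorter` are hypothetical names, not Spark classes or methods, and the pointer-array memory reported by inMemorySorter.getMemoryUsage is left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the spill accounting discussed above; not Spark's real classes. */
public class SpillMetricSketch {

    /** Hypothetical stand-in for a sorter that may own data pages. */
    static final class ToySorter {
        final List<Long> allocatedPages = new ArrayList<>(); // page sizes in bytes
        long memoryBytesSpilled = 0L; // stands in for the task-level metric

        /** Frees the sorter's own pages and counts only those as spilled. */
        long spill() {
            long freed = 0L;
            for (long pageSize : allocatedPages) {
                freed += pageSize;
            }
            allocatedPages.clear();
            memoryBytesSpilled += freed;
            return freed;
        }
    }

    /** Records were copied into the sorter, so the sorter owns the pages. */
    public static long spillAfterInsert(long... pageSizes) {
        ToySorter sorter = new ToySorter();
        for (long pageSize : pageSizes) {
            sorter.allocatedPages.add(pageSize);
        }
        sorter.spill();
        return sorter.memoryBytesSpilled;
    }

    /**
     * The sorter wraps an existing in-memory sorter whose pointers refer to
     * pages owned by a map, so the sorter's own page list is empty.
     */
    public static long spillWithExistingSorter(long existingMemoryConsumption,
                                               boolean addExistingConsumption) {
        ToySorter sorter = new ToySorter();
        sorter.spill(); // frees nothing: the data pages belong to the map
        if (addExistingConsumption) {
            // Assumed fix: add the map's page size to the metric explicitly.
            sorter.memoryBytesSpilled += existingMemoryConsumption;
        }
        return sorter.memoryBytesSpilled;
    }

    public static void main(String[] args) {
        System.out.println(spillAfterInsert(4096, 4096));
        System.out.println(spillWithExistingSorter(8192, false));
        System.out.println(spillWithExistingSorter(8192, true));
    }
}
```

In this model the metric stays at zero for the wrapped case unless the map's memory consumption is passed in, which mirrors the reason for the extra parameter.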

Member

I see, thanks for your explanation.

Contributor

Just for reference, where do we update the disk spill metrics?

Member

You can see it at the end of sorter.spill().

Contributor Author

At the end of sorter.spill, it increases both memory bytes spilled and disk bytes spilled.
The memory bytes spilled value is not correct, because the real data pages are not UnsafeExternalSorter.allocatedPages but BytesToBytesMap.dataPages, as explained here #28780 (comment). The disk bytes spilled value is correct, because it is the actual size written to disk.

     // The external sorter will be used to insert records, in-memory sorter is not needed.
     sorter.inMemSorter = null;
     return sorter;

@@ -165,7 +165,8 @@ public UnsafeKVExternalSorter(
         (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
         pageSizeBytes,
         numElementsForSpillThreshold,
-        inMemSorter);
+        inMemSorter,
+        map.getTotalMemoryConsumption());
 
       // reset the map, so we can re-use it to insert new records. the inMemSorter will not used
       // anymore, so the underline array could be used by map again.