
[SPARK-31952][SQL] Fix incorrect memory spill metric when doing Aggregate #28780

Closed
wants to merge 1 commit

Conversation

WangGuangxin (Contributor)

What changes were proposed in this pull request?

This happens when a hash aggregate downgrades to a sort-based aggregate.
`UnsafeExternalSorter.createWithExistingInMemorySorter` calls spill on an `InMemorySorter` immediately, but the memory pointed to by the `InMemorySorter` was acquired by the outside `BytesToBytesMap`, not by the `allocatedPages` in the `UnsafeExternalSorter`. So the memory spill bytes metric is always 0, while the disk spill bytes metric is correct.

Related code is at https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L232.
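For illustration, here is a minimal, self-contained toy model of the accounting problem (plain Java, not Spark code; every class and field name here is invented): the sorter computes its spill metric from its own page list, so when the spilled pages actually belong to the map, the reported size is 0.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Spark code) of the accounting problem described above:
// the sorter reports spilled memory based on its own page list, so when
// the spilled pages actually belong to the map, the reported size is 0.
public class SpillMetricDemo {
  static class ToySorter {
    final List<long[]> allocatedPages = new ArrayList<>();
    long memoryBytesSpilled = 0;

    void spill() {
      long freed = 0;
      for (long[] page : allocatedPages) {
        freed += page.length * 8L;  // 8 bytes per long slot
      }
      allocatedPages.clear();
      memoryBytesSpilled += freed;  // stays 0 when the pages live elsewhere
    }
  }

  public static void main(String[] args) {
    // Pages acquired by the "map" (standing in for BytesToBytesMap); they
    // are never copied into the sorter's own allocatedPages.
    List<long[]> mapPages = new ArrayList<>();
    mapPages.add(new long[1024]);  // 8 KB owned by the map

    ToySorter sorter = new ToySorter();
    sorter.spill();  // models spilling right after createWithExistingInMemorySorter

    long mapBytes = 0;
    for (long[] page : mapPages) mapBytes += page.length * 8L;
    System.out.println("actually spilled from map:  " + mapBytes);                 // 8192
    System.out.println("sorter.memoryBytesSpilled: " + sorter.memoryBytesSpilled); // 0
  }
}
```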

It can be reproduced with the following steps:

```
bin/spark-shell --driver-memory 512m --executor-memory 512m --executor-cores 1 --conf "spark.default.parallelism=1"
scala> sql("select id, count(1) from range(10000000) group by id").write.csv("/tmp/result.json")
```

Before this patch, the metric is:

(screenshot: the spill (memory) metric shows 0)

After this patch, the metric is:

(screenshot: the spill (memory) metric shows the correct size)

Does this PR introduce any user-facing change?

No

How was this patch tested?

Tested manually.

@AmplabJenkins

Can one of the admins verify this patch?

```java
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
  serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
  pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
taskContext.taskMetrics().incMemoryBytesSpilled(existingMemoryConsumption);
```
Member

Why didn't you set this value on the caller side (`UnsafeKVExternalSorter`)?

Contributor Author

I have no strong preference on this. One benefit is that other callers of `createWithExistingInMemorySorter` won't forget to update the memory spilled metric any more (though only `UnsafeKVExternalSorter` uses this function currently).

Member

Is it the same as using `inMemorySorter.getMemoryUsage`?

Also, shall we update `sorter.totalSpillBytes`?

@Ngone51 (Member) commented Jun 22, 2020

Or, could we do `final long spillSize = freeMemory() + inMemorySorter.getMemoryUsage()` in `sorter.spill()`?

It seems the size of the `inMemorySorter` is included in the log info via `getMemoryUsage()`, while `spillSize` doesn't include it.

Contributor Author

> Is it the same as using `inMemorySorter.getMemoryUsage`?
>
> Also, shall we update `sorter.totalSpillBytes`?

1. It's not the same as `inMemorySorter.getMemoryUsage`.

   When we insert a record into `UnsafeExternalSorter`, the record itself is copied into the sorter's `allocatedPages`, and its memory address is stored in the `InMemorySorter`. So when we spill, the spilled memory size is the sum of `inMemorySorter.getMemoryUsage` and the size of `allocatedPages`; that's what `UnsafeExternalSorter.spill` does.

   But when we call `sorter.spill` in `UnsafeExternalSorter.createWithExistingInMemorySorter`, the sorter's `allocatedPages` is empty, since we never copied any records into it. The memory addresses in the `inMemorySorter` point to the memory pages in the `BytesToBytesMap`. The passed parameter (`existingMemoryConsumption`) denotes the size of those memory pages, as restated in the sketch after this list.

2. Yes, it seems we also need to update `sorter.totalSpillBytes`.
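To restate the two accountings as code, here is a hedged sketch; the method names and parameters are illustrative stand-ins inspired by this discussion, not Spark's actual signatures.

```java
// Hedged sketch of the two spill-size accountings discussed above; the
// method bodies are illustrative stand-ins, not Spark's actual code.
public class SpillSizeSketch {

  // Normal path: records were copied into the sorter's own allocatedPages,
  // so the spilled size is the freed data pages plus the pointer array
  // (freeMemory() + inMemorySorter.getMemoryUsage() in the real sorter).
  static long normalSpillSize(long freedPageBytes, long pointerArrayBytes) {
    return freedPageBytes + pointerArrayBytes;
  }

  // Downgrade path (createWithExistingInMemorySorter): allocatedPages is
  // empty, so the freed page bytes are 0; the caller must instead account
  // for the pages still owned by the BytesToBytesMap.
  static long downgradeSpillSize(long existingMemoryConsumption) {
    return existingMemoryConsumption;
  }

  public static void main(String[] args) {
    System.out.println(normalSpillSize(64L << 20, 1L << 20)); // pages + pointer array
    System.out.println(downgradeSpillSize(64L << 20));        // map-owned pages
  }
}
```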

Member

I see, thanks for your explanation.

@cloud-fan (Contributor)

I'm not familiar with this part. What does "spill (memory)" mean?

@WangGuangxin (Contributor Author)

> I'm not familiar with this part. What does "spill (memory)" mean?

It's `memoryBytesSpilled` in `TaskMetrics`; it denotes the number of in-memory bytes spilled.

@WangGuangxin (Contributor Author)

Also cc @Ngone51 @JoshRosen, since you made some related changes.

@cloud-fan (Contributor)

> number of in-memory bytes spilled

What's the difference between it and spill (disk)? IIUC, spill means we dump data from memory to disk.

@WangGuangxin (Contributor Author)

> number of in-memory bytes spilled
>
> What's the difference between it and spill (disk)? IIUC, spill means we dump data from memory to disk.

They describe the same piece of data being dumped from memory to disk: spill (memory) is the size the data occupied in memory, while spill (disk) is the size the data occupies on disk, after serialization and compression, so it is usually smaller.

```java
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
  serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
  pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
taskContext.taskMetrics().incMemoryBytesSpilled(existingMemoryConsumption);
```
Contributor

Just for reference, where do we update the disk spill metrics?

Member

You can see it at the end of sorter.spill().

Contributor Author

At the end of `sorter.spill`, it increases both memory bytes spilled and disk bytes spilled.
But the memory bytes spilled value is not correct, because the real data pages are `BytesToBytesMap.dataPages`, not `UnsafeExternalSorter.allocatedPages`, as explained in #28780 (comment). The disk bytes spilled value is correct because it's the real size written to disk.

@cloud-fan (Contributor) commented Jun 23, 2020

Shall we set `sorter.totalSpillBytes`? Then we can update the metrics correctly in `sorter.spill`.

@github-actions bot commented Oct 2, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Oct 2, 2020
github-actions bot closed this Oct 3, 2020
cloud-fan pushed a commit that referenced this pull request Jan 11, 2021
[SPARK-31952][SQL] Fix incorrect memory spill metric when doing Aggregate

### What changes were proposed in this pull request?

This PR takes over #28780.

1. Count the spilled memory size when creating the `UnsafeExternalSorter` with the existing `InMemorySorter`.

2. Accumulate the `totalSpillBytes` when merging two `UnsafeExternalSorter`s.

### Why are the changes needed?

As mentioned in #28780:

> This happens when a hash aggregate downgrades to a sort-based aggregate. `UnsafeExternalSorter.createWithExistingInMemorySorter` calls spill on an `InMemorySorter` immediately, but the memory pointed to by the `InMemorySorter` was acquired by the outside `BytesToBytesMap`, not by the `allocatedPages` in the `UnsafeExternalSorter`. So the memory spill bytes metric is always 0, while the disk spill bytes metric is correct.

Besides, this PR also fixes the `UnsafeExternalSorter.merge` by accumulating the `totalSpillBytes` of two sorters. Thus, we can report the correct spilled size in `HashAggregateExec.finishAggregate`.
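As a rough illustration of the merge half of the fix (a toy sketch with invented names, not the actual `UnsafeExternalSorter.merge`), the point is that the absorbing sorter must take over the other sorter's spill accounting along with its spill files:

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the merge fix: when one sorter absorbs another's
// spill files, it must also absorb that sorter's spill accounting. All
// names are illustrative; see UnsafeExternalSorter.merge for the real code.
public class MergeSpillSketch {
  static class ToySorter {
    final List<String> spillFiles = new ArrayList<>();
    long totalSpillBytes = 0;

    void merge(ToySorter other) {
      spillFiles.addAll(other.spillFiles);
      other.spillFiles.clear();
      totalSpillBytes += other.totalSpillBytes; // the fix: carry the metric over
      other.totalSpillBytes = 0;
    }
  }

  public static void main(String[] args) {
    ToySorter a = new ToySorter();
    ToySorter b = new ToySorter();
    b.spillFiles.add("/tmp/spill-0");
    b.totalSpillBytes = 8 << 20; // pretend b already spilled 8 MB

    a.merge(b);
    System.out.println("totalSpillBytes = " + a.totalSpillBytes); // 8388608
  }
}
```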

The issue can be reproduced by the following steps, checking the SQL metrics in the UI:

```
bin/spark-shell --driver-memory 512m --executor-memory 512m --executor-cores 1 --conf "spark.default.parallelism=1"
scala> sql("select id, count(1) from range(10000000) group by id").write.csv("/tmp/result.json")
```

Before:

<img width="200" alt="WeChatfe5146180d91015e03b9a27852e9a443" src="https://user-images.githubusercontent.com/16397174/103625414-e6fc6280-4f75-11eb-8b93-c55095bdb5b8.png">

After:

<img width="200" alt="WeChat42ab0e73c5fbc3b14c12ab85d232071d" src="https://user-images.githubusercontent.com/16397174/103625420-e8c62600-4f75-11eb-8e1f-6f5e8ab561b9.png">

### Does this PR introduce _any_ user-facing change?

Yes, users can see the correct spill metrics after this PR.

### How was this patch tested?

Tested manually and added UTs.

Closes #31035 from Ngone51/SPARK-31952.

Lead-authored-by: yi.wu <yi.wu@databricks.com>
Co-authored-by: wangguangxin.cn <wangguangxin.cn@bytedance.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4afca0f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Jan 11, 2021
[SPARK-31952][SQL] Fix incorrect memory spill metric when doing Aggregate

Closes #31035 from Ngone51/SPARK-31952.

Lead-authored-by: yi.wu <yi.wu@databricks.com>
Co-authored-by: wangguangxin.cn <wangguangxin.cn@bytedance.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Ngone51 added a commit to Ngone51/spark that referenced this pull request Jan 12, 2021
[SPARK-31952][SQL] Fix incorrect memory spill metric when doing Aggregate

Closes apache#31035 from Ngone51/SPARK-31952.

Lead-authored-by: yi.wu <yi.wu@databricks.com>
Co-authored-by: wangguangxin.cn <wangguangxin.cn@bytedance.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dongjoon-hyun pushed a commit that referenced this pull request Jan 12, 2021
[SPARK-31952][SQL] Fix incorrect memory spill metric when doing Aggregate

Closes #31140 from Ngone51/cp-spark-31952.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>