[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold #29744
Conversation
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
```java
} else if (numKeys >= growthThreshold && longArray.size() / 2 >= MAX_CAPACITY) {
  // The map cannot grow because doing so would cause it to exceed MAX_CAPACITY. Instead, we
  // prevent the map from accepting any more new elements.
  canGrowArray = false;
```
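For context on the arithmetic behind this guard, here is a small standalone sketch. The constants mirror my reading of BytesToBytesMap (`MAX_CAPACITY` is 2^29, and `longArray` holds two longs per slot, so `longArray.size()` is twice the capacity); treat them as assumptions of this sketch rather than part of the diff:

```java
// Standalone sketch: why the guard fires once the map is at MAX_CAPACITY.
public class GrowthGuardSketch {
    // Assumed to mirror BytesToBytesMap.MAX_CAPACITY = 1 << 29.
    static final long MAX_CAPACITY = 1L << 29;

    public static void main(String[] args) {
        long capacity = MAX_CAPACITY;        // the map has grown to its maximum
        long longArraySize = capacity * 2;   // two longs per slot (assumption)
        long growthThreshold = capacity / 2; // load factor 0.5
        long numKeys = growthThreshold;      // insertions just hit the threshold

        boolean wantsToGrow = numKeys >= growthThreshold;       // true
        boolean cannotGrow = longArraySize / 2 >= MAX_CAPACITY; // true: doubling would exceed the max
        // Before the fix, this state left canGrowArray == true, so inserts
        // continued past the threshold; the fix sets canGrowArray = false here.
        System.out.println(wantsToGrow && cannotGrow); // prints "true"
    }
}
```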
I think we won't exceed `MAX_CAPACITY` at all; we have some restrictions to prevent it. The root cause, I think, is that once the map reaches `MAX_CAPACITY`, we won't grow it even when `numKeys` exceeds `growthThreshold`. So we can keep adding elements to the map beyond its load factor (0.5). The map then cannot be reused for sorting later in UnsafeKVExternalSorter. Although UnsafeKVExternalSorter will allocate a new array in that case, the allocation could hit OOM and fail the query.

This change favors reusing the array for sorting, but it makes the map stop accepting new elements earlier: once the map capacity reaches `MAX_CAPACITY`, half of the array goes unused, so we will also spill earlier.

When I was fixing the customer application before, we didn't see the OOM even when the payload reached `MAX_CAPACITY`. Could that be due to the memory settings on your payload/cluster?
I agree. Once we reach `MAX_CAPACITY`, we have a choice: we can either (a) exceed the load factor, or (b) spill.

The code currently does (a). If the number of groups ultimately turns out to be less than `MAX_CAPACITY`, this may avoid spilling. (Performance will degrade as the load factor approaches 1.0, but it will still be better than spilling.) However, if the number of groups is larger than `MAX_CAPACITY`, or if we need more memory to store the records, then we will be unable to spill and the query will fail.

It seems to me that it would be better to prevent queries from failing, as (b) does. This does mean we would spill earlier for a narrow range of queries.
Regarding the memory setting: I agree that whether or not an OOM occurs depends on the memory settings, but I would argue that (1) the user should not be required to tune memory settings to avoid failing queries, and (2) many reasonable settings will cause an OOM.

The problem arises when there are between 2^28+1 and 2^29 keys. If we need to spill, UnsafeKVExternalSorter will attempt to allocate between 8 and 16 GiB. Therefore the user must either increase the memory per task by approximately this amount so that the allocation succeeds, or decrease the memory per task enough that the map never reaches `MAX_CAPACITY` in the first place. That leaves quite a wide range of problematic settings.
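The 8 to 16 GiB figure follows from back-of-the-envelope arithmetic. Assuming the replacement pointer array costs on the order of 32 bytes per record (four 8-byte longs; this per-record constant is an assumption consistent with the numbers above, not something stated elsewhere in the thread):

```java
// Sketch: size of the pointer-array allocation UnsafeKVExternalSorter would
// need at spill time, under the assumed cost of 32 bytes per record.
public class SpillAllocSketch {
    public static void main(String[] args) {
        final long bytesPerRecord = 32;                 // assumption: 4 longs per record
        long low  = (1L << 28) * bytesPerRecord;        // 2^28 keys -> 2^33 bytes
        long high = (1L << 29) * bytesPerRecord;        // 2^29 keys -> 2^34 bytes
        System.out.println(low  / (1L << 30) + " GiB"); // prints "8 GiB"
        System.out.println(high / (1L << 30) + " GiB"); // prints "16 GiB"
    }
}
```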
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Force-pushed from 00e4431 to ec51e65.
Test build #128606 has finished for PR 29744 at commit
Test build #128605 has finished for PR 29744 at commit
Test build #128607 has finished for PR 29744 at commit
Test build #128608 has finished for PR 29744 at commit
Test build #128609 has finished for PR 29744 at commit
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Thank you for pinging me, @ankurdave. And thank you for providing the test example.
+1, LGTM (except @cloud-fan's comment).
Test build #128626 has finished for PR 29744 at commit
Scratch that.
Test build #128661 has finished for PR 29744 at commit
Thanks all for the quick reviews! A couple of questions for maintainers of core:
@ankurdave Great fix! I am curious whether the test you mentioned in the PR description should actually be checked in? The only downside seems to be that it would take 1.5 minutes to pass. Perhaps there is a mode for "slow tests". As for rerunning the GH tests: unfortunately the only way currently is to reopen the PR. Close it in GitHub and reopen it; that will force all the GH tests to be rerun.
I think we can rerun GitHub Actions without closing the PR. Just triggered the rerun.
@agrawaldevesh Not every CI has enough memory; it could lead to more flaky situations. You can add the test in your downstream CI environment.
Merged to master/3.0/2.4. Thank you, @ankurdave and all.
[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

### What changes were proposed in this pull request?

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has typically consumed all available memory, so the allocation of the new pointer array is likely to fail.

This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

### Why are the changes needed?

Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...

  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 = new TestMemoryManager(
      new SparkConf()
        .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
        .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
        .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
        .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map =
      new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
          loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes #29744 from ankurdave/SPARK-32872.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 72550c3)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Oops. @ankurdave, sorry, but could you make a backporting PR to branch-2.4 once more?
Opened #29753 to backport to branch-2.4.