Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32872][CORE][2.4] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold #29753

Closed
wants to merge 2 commits into from

Conversation

ankurdave
Copy link
Contributor

@ankurdave ankurdave commented Sep 14, 2020

What changes were proposed in this pull request?

When BytesToBytesMap is at MAX_CAPACITY and reaches its growth threshold, numKeys >= growthThreshold is true but longArray.size() / 2 < MAX_CAPACITY is false. This correctly prevents the map from growing, but canGrowArray incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has typically consumed all available memory, so the allocation of the new pointer array is likely to fail.

This PR fixes the issue by setting canGrowArray to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

Why are the changes needed?

Without this change, hash aggregations will fail when the number of groups per task is greater than MAX_CAPACITY / 2 = 2^28 (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage.

For example, the final aggregation in SELECT COUNT(DISTINCT id) FROM tbl fails when tbl contains 1 billion distinct values and when spark.sql.shuffle.partitions=1.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

public abstract class AbstractBytesToBytesMapSuite {
  // ...
  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
              .set("spark.memory.offHeap.enabled", "true")
              .set("spark.memory.offHeap.size", Long.toString(25600 * 1024 * 1024L)));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}

… exceeding growth threshold

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has typically consumed all available memory, so the allocation of the new pointer array is likely to fail.

This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.

No.

Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
            .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
            .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
            .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
            .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes apache#29744 from ankurdave/SPARK-32872.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@ankurdave
Copy link
Contributor Author

This is a backport of #29744 to branch-2.4.

cc @dongjoon-hyun

@SparkQA
Copy link

SparkQA commented Sep 14, 2020

Test build #128670 has finished for PR 29753 at commit bf0d5c2.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Thank you so much!

@@ -763,7 +763,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
if (longArray.size() / 2 < MAX_CAPACITY) {
try {
growAndRehash();
} catch (SparkOutOfMemoryError oom) {
} catch (OutOfMemoryError oom) {
Copy link
Member

@dongjoon-hyun dongjoon-hyun Sep 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya. This was the root cause of reverting. I wanted to pass the UT once more due to this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. In addition to the CI tests, I just ran the manual test in the PR description before and after this fix and confirmed that the bug was present in 2.4 and that this PR fixes it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much for confirming, @ankurdave !

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM (Pending Jenkins)

@SparkQA
Copy link

SparkQA commented Sep 15, 2020

Test build #128672 has finished for PR 29753 at commit 56b7bca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

dongjoon-hyun pushed a commit that referenced this pull request Sep 15, 2020
… exceeding growth threshold

### What changes were proposed in this pull request?

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has typically consumed all available memory, so the allocation of the new pointer array is likely to fail.

This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

### Why are the changes needed?

Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
              .set("spark.memory.offHeap.enabled", "true")
              .set("spark.memory.offHeap.size", Long.toString(25600 * 1024 * 1024L)));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes #29753 from ankurdave/SPARK-32872-2.4.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@dongjoon-hyun
Copy link
Member

Merged to branch-2.4. Thanks, @ankurdave !

otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
… exceeding growth threshold

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has typically consumed all available memory, so the allocation of the new pointer array is likely to fail.

This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.

No.

Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible to do in a unit test, this PR was tested manually by adding the following test to AbstractBytesToBytesMapSuite. Before this PR, the test fails in 8.5 minutes. With this PR, the test passes in 1.5 minutes.

```java
public abstract class AbstractBytesToBytesMapSuite {
  // ...
  Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
              .set("spark.memory.offHeap.enabled", "true")
              .set("spark.memory.offHeap.size", Long.toString(25600 * 1024 * 1024L)));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```

Closes apache#29753 from ankurdave/SPARK-32872-2.4.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

RB=3011537
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants