[SPARK-21907][CORE][BACKPORT 2.2] oom during spill
Back-port of #19181 to branch-2.2.

## What changes were proposed in this pull request?
1. A test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).
2. A fix for the root cause of the issue.

`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset`, which may itself trigger another spill. When this happens, the `array` member has already been de-allocated but is still referenced, so the nested spill fails with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`.
This patch introduces a reproduction in a test case and a fix. The fix simply resets the in-memory sorter's `array` member (nulling it and zeroing its usable capacity) before actually performing the new allocation, which prevents the spill code path from touching the de-allocated array.
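To illustrate the pattern, here is a minimal, self-contained sketch of the null-out-before-allocate guard; the names (`InMemBuffer`, `allocate`, `used`) are simplified stand-ins, not the actual Spark classes:

```java
import java.util.function.IntFunction;

// Minimal sketch of the guard this patch adds (hypothetical names, not the
// Spark API): null out `array` before re-allocating, so a spill triggered
// from inside the allocation cannot touch the already-freed buffer.
public class ResetReentrancySketch {

  static class InMemBuffer {
    long[] array = new long[4];

    void reset(IntFunction<long[]> allocate) {
      // conceptually, consumer.freeArray(array) happens here
      array = null;              // mark as de-allocated *before* allocating
      array = allocate.apply(4); // may re-enter the "spill" path below
    }

    // called from the (simulated) spill path
    long used() {
      // with the guard, a nested spill sees null and does nothing; without
      // it, the spill would dereference a de-allocated array (SPARK-21907)
      return array == null ? 0 : array.length;
    }
  }

  public static void main(String[] args) {
    InMemBuffer buf = new InMemBuffer();
    // an allocator that simulates memory pressure by "spilling" first,
    // i.e. re-entering the buffer while the allocation is in flight
    buf.reset(size -> {
      System.out.println("nested spill sees " + buf.used() + " used slots"); // prints 0
      return new long[size];
    });
    System.out.println("after reset: " + buf.array.length + " slots");
  }
}
```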

## How was this patch tested?
Introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`.

Author: Eyal Farago <eyal@nrgene.com>

Closes #19481 from eyalfa/SPARK-21907__oom_during_spill__BACKPORT-2.2.
Eyal Farago authored and hvanhovell committed Oct 12, 2017
1 parent c5889b5 commit cd51e2c
Showing 5 changed files with 101 additions and 5 deletions.
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -473,6 +473,10 @@ public UnsafeSorterIterator getSortedIterator() throws IOException {
     }
   }

+  @VisibleForTesting boolean hasSpaceForAnotherRecord() {
+    return inMemSorter.hasSpaceForAnotherRecord();
+  }
+
   /**
    * An UnsafeSorterIterator that support spilling.
    */
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -162,14 +162,24 @@ private int getUsableCapacity() {
    */
   public void free() {
     if (consumer != null) {
-      consumer.freeArray(array);
+      if (array != null) {
+        consumer.freeArray(array);
+      }
       array = null;
     }
   }

   public void reset() {
     if (consumer != null) {
       consumer.freeArray(array);
+      // the call to consumer.allocateArray may trigger a spill, which in turn
+      // accesses this instance and eventually re-enters this method, trying to free the array again.
+      // By setting the array to null and its length to 0 we effectively make the spill code path a no-op.
+      // Setting the array to null also marks it as already de-allocated, which prevents a double de-allocation in free().
+      array = null;
+      usableCapacity = 0;
+      pos = 0;
+      nullBoundaryPos = 0;
       array = consumer.allocateArray(initialSize);
       usableCapacity = getUsableCapacity();
     }
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -23,6 +23,7 @@
 import java.util.LinkedList;
 import java.util.UUID;

+import org.hamcrest.Matchers;
 import scala.Tuple2$;

 import org.junit.After;
@@ -454,5 +455,36 @@ public void testPeakMemoryUsed() throws Exception {
     }
   }

+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    // Given the default configuration,
+    // the size of the records we insert into the sorter (ints),
+    // and assuming we shouldn't spill before the pointers array is exhausted
+    // (the memory manager is not configured to throw at this point),
+    // this loop runs a reasonable number of iterations (< 2000).
+    // The test indeed completed within < 30ms (on a quad-core i7 laptop).
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // We expect the next insert to attempt growing the pointers array.
+    // The first allocation is expected to fail, then a spill is triggered, which
+    // attempts another allocation that also fails, and we expect to see that OOM here.
+    // The original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // We also expect the OOM to originate from
+    // org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset.
+    memoryManager.markconsequentOOM(2);
+    try {
+      insertNumber(sorter, 1024);
+      fail("expected OutOfMemoryError but the operation surprisingly succeeded");
+    }
+    // we expect an OutOfMemoryError here; anything else (e.g. the original NPE) is a failure
+    catch (OutOfMemoryError oom) {
+      String oomStackTrace = Utils.exceptionString(oom);
+      assertThat("expected OutOfMemoryError in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset",
+        oomStackTrace,
+        Matchers.containsString("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
+    }
+  }
 }

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -35,6 +35,7 @@
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isIn;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;

 public class UnsafeInMemorySorterSuite {
@@ -139,4 +140,49 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final SparkConf sparkConf = new SparkConf();
+    sparkConf.set("spark.memory.offHeap.enabled", "false");
+
+    final TestMemoryManager testMemoryManager =
+        new TestMemoryManager(sparkConf);
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+        testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+          Object leftBaseObject,
+          long leftBaseOffset,
+          Object rightBaseObject,
+          long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+        recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+      fail("expected OutOfMemoryError but the operation surprisingly succeeded");
+    } catch (OutOfMemoryError oom) {
+      // as expected
+    }
+    // [SPARK-21907] this failed with an NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
+    sorter.free();
+    // simulate a 'back-to-back' free.
+    sorter.free();
+  }

 }
core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -27,8 +27,8 @@ class TestMemoryManager(conf: SparkConf)
       numBytes: Long,
       taskAttemptId: Long,
       memoryMode: MemoryMode): Long = {
-    if (oomOnce) {
-      oomOnce = false
+    if (consequentOOM > 0) {
+      consequentOOM -= 1
       0
     } else if (available >= numBytes) {
       available -= numBytes
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)

   override def maxOffHeapStorageMemory: Long = 0L

-  private var oomOnce = false
+  private var consequentOOM = 0
   private var available = Long.MaxValue

   def markExecutionAsOutOfMemoryOnce(): Unit = {
-    oomOnce = true
+    markconsequentOOM(1)
   }

+  def markconsequentOOM(n: Int): Unit = {
+    consequentOOM += n
+  }
+
   def limit(avail: Long): Unit = {
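A note on the `TestMemoryManager` change above: the one-shot `oomOnce` flag becomes a `consequentOOM` counter because the reproduction needs two consecutive allocations to fail, the attempt to grow the pointers array and the retry issued from the spill it triggers. A hedged usage fragment (mirroring `testOOMDuringSpill` above; the surrounding test harness, `sorter`, and `insertNumber` are assumed from that suite):

```java
// Drive two consecutive allocation failures through the new counter
// (names taken from the diffs above; the test harness is assumed).
memoryManager.markconsequentOOM(2); // fail the grow *and* the post-spill retry
try {
  insertNumber(sorter, 1024);       // forces the pointers array to grow
  fail("expected OutOfMemoryError");
} catch (OutOfMemoryError oom) {
  // expected: with the fix, the OOM surfaces from UnsafeInMemorySorter.reset
  // instead of the old NPE in TaskMemoryManager.getPage
}
```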
