Skip to content

Commit

Permalink
[SPARK-13210][SQL] catch OOM when allocate memory and expand array
Browse files Browse the repository at this point in the history
There is a bug when we try to grow the buffer: the OOM is wrongly ignored (the assert is also skipped by the JVM), and then we try to grow the array again. This second attempt triggers spilling, which frees the current page, so the record we just inserted becomes invalid.

The root cause is that the JVM has less free memory than the MemoryManager thinks, so allocating a page throws an OOM without triggering spilling. We should catch the OOM and acquire memory again to trigger spilling.

Also, we should not grow the array in `insertRecord` of `InMemorySorter` (that code was there only to make testing easier).

Author: Davies Liu <davies@databricks.com>

Closes #11095 from davies/fix_expand.
  • Loading branch information
Davies Liu authored and JoshRosen committed Feb 8, 2016
1 parent 8e4d15f commit 37bc203
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public class TaskMemoryManager {
@GuardedBy("this")
private final HashSet<MemoryConsumer> consumers;

/**
* The amount of memory that is acquired but not used.
*/
private long acquiredButNotUsed = 0L;

/**
* Construct a new TaskMemoryManager.
*/
Expand Down Expand Up @@ -256,7 +261,20 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
}
allocatedPages.set(pageNumber);
}
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
MemoryBlock page = null;
try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
// there is no enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
acquiredButNotUsed += acquired;
synchronized (this) {
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -378,6 +396,9 @@ public long cleanUpAllAllocatedMemory() {
}
Arrays.fill(pageTable, null);

// release the memory that is not used by any consumer.
memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);

return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,7 @@ private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
LongArray array;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (OutOfMemoryError e) {
// should have trigger spilling
assert(inMemSorter.hasSpaceForAnotherRecord());
return;
}
LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public long getMemoryUsage() {
*/
public void insertRecord(long recordPointer, int partitionId) {
if (!hasSpaceForAnotherRecord()) {
expandPointerArray(consumer.allocateArray(array.size() * 2));
throw new IllegalStateException("There is no space for new record");
}
array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
pos++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,7 @@ private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
LongArray array;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (OutOfMemoryError e) {
// should have trigger spilling
assert(inMemSorter.hasSpaceForAnotherRecord());
return;
}
LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void expandPointerArray(LongArray newArray) {
*/
public void insertRecord(long recordPointer, long keyPrefix) {
if (!hasSpaceForAnotherRecord()) {
expandPointerArray(consumer.allocateArray(array.size() * 2));
throw new IllegalStateException("There is no space for new record");
}
array.set(pos, recordPointer);
pos++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public void testBasicSorting() throws Exception {
// Write the records into the data page and store pointers into the sorter
long position = dataPage.getBaseOffset();
for (String str : dataToSort) {
if (!sorter.hasSpaceForAnotherRecord()) {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
}
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
final byte[] strBytes = str.getBytes("utf-8");
Platform.putInt(baseObject, position, strBytes.length);
Expand Down Expand Up @@ -114,6 +117,9 @@ public void testSortingManyNumbers() throws Exception {
int[] numbersToSort = new int[128000];
Random random = new Random(16);
for (int i = 0; i < numbersToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
}
numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1);
sorter.insertRecord(0, numbersToSort[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public int compare(long prefix1, long prefix2) {
// Given a page of records, insert those records into the sorter one-by-one:
position = dataPage.getBaseOffset();
for (int i = 0; i < dataToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
}
// position now points to the start of a record (which holds its length).
final int recordLength = Platform.getInt(baseObject, position);
final long address = memoryManager.encodePageNumberAndOffset(dataPage, position);
Expand Down

0 comments on commit 37bc203

Please sign in to comment.