Skip to content

Commit

Permalink
catch OOM when allocate memory
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Feb 5, 2016
1 parent 138c300 commit ff77170
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public class TaskMemoryManager {
@GuardedBy("this")
private final HashSet<MemoryConsumer> consumers;

/**
* The amount of memory that is acquired but not used.
*/
private long acquiredButNotUsed = 0L;

/**
* Construct a new TaskMemoryManager.
*/
Expand Down Expand Up @@ -256,7 +261,19 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
}
allocatedPages.set(pageNumber);
}
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
MemoryBlock page = null;
try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
// there is no enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
acquiredButNotUsed += acquired;
synchronized (this) {
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -378,6 +395,9 @@ public long cleanUpAllAllocatedMemory() {
}
Arrays.fill(pageTable, null);

// release the memory that is not used by any consumer.
memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);

return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,7 @@ private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
LongArray array;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (OutOfMemoryError e) {
// should have trigger spilling
assert(inMemSorter.hasSpaceForAnotherRecord());
return;
}
LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public long getMemoryUsage() {
*/
public void insertRecord(long recordPointer, int partitionId) {
if (!hasSpaceForAnotherRecord()) {
expandPointerArray(consumer.allocateArray(array.size() * 2));
throw new IllegalStateException("There is no space for new record");
}
array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
pos++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,7 @@ private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
LongArray array;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (OutOfMemoryError e) {
// should have trigger spilling
assert(inMemSorter.hasSpaceForAnotherRecord());
return;
}
LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void expandPointerArray(LongArray newArray) {
*/
public void insertRecord(long recordPointer, long keyPrefix) {
if (!hasSpaceForAnotherRecord()) {
expandPointerArray(consumer.allocateArray(array.size() * 2));
throw new IllegalStateException("There is no space for new record");
}
array.set(pos, recordPointer);
pos++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public void testBasicSorting() throws Exception {
// Write the records into the data page and store pointers into the sorter
long position = dataPage.getBaseOffset();
for (String str : dataToSort) {
if (!sorter.hasSpaceForAnotherRecord()) {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
}
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
final byte[] strBytes = str.getBytes("utf-8");
Platform.putInt(baseObject, position, strBytes.length);
Expand Down Expand Up @@ -114,6 +117,9 @@ public void testSortingManyNumbers() throws Exception {
int[] numbersToSort = new int[128000];
Random random = new Random(16);
for (int i = 0; i < numbersToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
}
numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1);
sorter.insertRecord(0, numbersToSort[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public int compare(long prefix1, long prefix2) {
// Given a page of records, insert those records into the sorter one-by-one:
position = dataPage.getBaseOffset();
for (int i = 0; i < dataToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
}
// position now points to the start of a record (which holds its length).
final int recordLength = Platform.getInt(baseObject, position);
final long address = memoryManager.encodePageNumberAndOffset(dataPage, position);
Expand Down

0 comments on commit ff77170

Please sign in to comment.