[MINOR][CORE] Cleanup dead code and duplication in Mem. Management
## What changes were proposed in this pull request?

* Removed the method `org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`.
  It became unused after 85b0a15 (SPARK-15962) introduced word alignment for unsafe arrays.
* Cleaned up duplicated code in memory management and the unsafe sorters
  * Extracting the exception paths is more than cosmetic: it reduces the bytecode size the affected hot methods compile to, which makes them easier for the JIT to inline (a minimal sketch follows this list)
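
A minimal, self-contained sketch of the pattern, with illustrative names rather than Spark's actual classes: the hot allocation method delegates the rare failure branch to a small private helper, so the hot method's own bytecode stays small and inline-friendly.

```java
public class AllocationSketch {

  // Hot path: the uncommon failure branch collapses to a single call,
  // keeping this method's bytecode small.
  static long[] allocate(int size) {
    long[] page = tryAllocate(size);
    if (page == null || page.length < size) {
      throw oom(page, size); // cold path lives in its own method
    }
    return page;
  }

  // Cold path: bookkeeping and message construction compile separately
  // and never inflate the hot method.
  private static OutOfMemoryError oom(long[] page, int required) {
    long got = (page == null) ? 0 : page.length;
    return new OutOfMemoryError(
        "Unable to acquire " + required + " longs, got " + got);
  }

  // Stand-in for a real allocator that may return nothing for huge requests.
  private static long[] tryAllocate(int size) {
    return size <= (1 << 20) ? new long[size] : null;
  }

  public static void main(String[] args) {
    System.out.println(allocate(8).length); // prints 8
  }
}
```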

## How was this patch tested?

* The build still passes after removing the method, and grepping the codebase for `alignToWords` shows no remaining references to it.
* The deduplicated code paths are covered by existing tests.

Author: Armin <me@obrown.io>

Closes apache#19254 from original-brownbear/cleanup-mem-consumer.
original-brownbear authored and srowen committed Sep 19, 2017
1 parent a11db94 commit 7c92351
Showing 4 changed files with 37 additions and 61 deletions.
26 changes: 12 additions & 14 deletions core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -89,13 +89,7 @@ public LongArray allocateArray(long size) {
     long required = size * 8L;
     MemoryBlock page = taskMemoryManager.allocatePage(required, this);
     if (page == null || page.size() < required) {
-      long got = 0;
-      if (page != null) {
-        got = page.size();
-        taskMemoryManager.freePage(page, this);
-      }
-      taskMemoryManager.showMemoryUsage();
-      throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+      throwOom(page, required);
     }
     used += required;
     return new LongArray(page);
@@ -116,13 +110,7 @@ public void freeArray(LongArray array) {
   protected MemoryBlock allocatePage(long required) {
     MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
     if (page == null || page.size() < required) {
-      long got = 0;
-      if (page != null) {
-        got = page.size();
-        taskMemoryManager.freePage(page, this);
-      }
-      taskMemoryManager.showMemoryUsage();
-      throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+      throwOom(page, required);
     }
     used += page.size();
     return page;
@@ -152,4 +140,14 @@ public void freeMemory(long size) {
     taskMemoryManager.releaseExecutionMemory(size, this);
     used -= size;
   }
+
+  private void throwOom(final MemoryBlock page, final long required) {
+    long got = 0;
+    if (page != null) {
+      got = page.size();
+      taskMemoryManager.freePage(page, this);
+    }
+    taskMemoryManager.showMemoryUsage();
+    throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+  }
 }
24 changes: 10 additions & 14 deletions core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -283,13 +283,7 @@ private void advanceToNextPage() {
       } else {
         currentPage = null;
         if (reader != null) {
-          // remove the spill file from disk
-          File file = spillWriters.removeFirst().getFile();
-          if (file != null && file.exists()) {
-            if (!file.delete()) {
-              logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
-            }
-          }
+          handleFailedDelete();
         }
         try {
           Closeables.close(reader, /* swallowIOException = */ false);
@@ -307,13 +301,7 @@ public boolean hasNext() {
     public boolean hasNext() {
       if (numRecords == 0) {
         if (reader != null) {
-          // remove the spill file from disk
-          File file = spillWriters.removeFirst().getFile();
-          if (file != null && file.exists()) {
-            if (!file.delete()) {
-              logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
-            }
-          }
+          handleFailedDelete();
         }
       }
       return numRecords > 0;
@@ -403,6 +391,14 @@ public long spill(long numBytes) throws IOException {
     public void remove() {
       throw new UnsupportedOperationException();
     }
+
+    private void handleFailedDelete() {
+      // remove the spill file from disk
+      File file = spillWriters.removeFirst().getFile();
+      if (file != null && file.exists() && !file.delete()) {
+        logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
+      }
+    }
   }
 
   /**
32 changes: 15 additions & 17 deletions core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -219,15 +219,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
         new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
           inMemSorter.numRecords());
       spillWriters.add(spillWriter);
-      final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator();
-      while (sortedRecords.hasNext()) {
-        sortedRecords.loadNext();
-        final Object baseObject = sortedRecords.getBaseObject();
-        final long baseOffset = sortedRecords.getBaseOffset();
-        final int recordLength = sortedRecords.getRecordLength();
-        spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
-      }
-      spillWriter.close();
+      spillIterator(inMemSorter.getSortedIterator(), spillWriter);
     }

     final long spillSize = freeMemory();
@@ -488,6 +480,18 @@ public UnsafeSorterIterator getSortedIterator() throws IOException {
     }
   }

+  private static void spillIterator(UnsafeSorterIterator inMemIterator,
+      UnsafeSorterSpillWriter spillWriter) throws IOException {
+    while (inMemIterator.hasNext()) {
+      inMemIterator.loadNext();
+      final Object baseObject = inMemIterator.getBaseObject();
+      final long baseOffset = inMemIterator.getBaseOffset();
+      final int recordLength = inMemIterator.getRecordLength();
+      spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
+    }
+    spillWriter.close();
+  }
+
   /**
    * An UnsafeSorterIterator that support spilling.
    */
@@ -503,6 +507,7 @@ class SpillableIterator extends UnsafeSorterIterator {
       this.numRecords = inMemIterator.getNumRecords();
     }

+    @Override
     public int getNumRecords() {
       return numRecords;
     }
@@ -521,14 +526,7 @@ public long spill() throws IOException {
       // Iterate over the records that have not been returned and spill them.
       final UnsafeSorterSpillWriter spillWriter =
         new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-      while (inMemIterator.hasNext()) {
-        inMemIterator.loadNext();
-        final Object baseObject = inMemIterator.getBaseObject();
-        final long baseOffset = inMemIterator.getBaseOffset();
-        final int recordLength = inMemIterator.getRecordLength();
-        spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
-      }
-      spillWriter.close();
+      spillIterator(inMemIterator, spillWriter);
       spillWriters.add(spillWriter);
       nextUpstream = spillWriter.getReader(serializerManager);

16 changes: 0 additions & 16 deletions sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
@@ -109,22 +109,6 @@ public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
     Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
   }

-  // Do word alignment for this row and grow the row buffer if needed.
-  // todo: remove this after we make unsafe array data word align.
-  public void alignToWords(int numBytes) {
-    final int remainder = numBytes & 0x07;
-
-    if (remainder > 0) {
-      final int paddingBytes = 8 - remainder;
-      holder.grow(paddingBytes);
-
-      for (int i = 0; i < paddingBytes; i++) {
-        Platform.putByte(holder.buffer, holder.cursor, (byte) 0);
-        holder.cursor++;
-      }
-    }
-  }
-
   public void write(int ordinal, boolean value) {
     final long offset = getFieldOffset(ordinal);
     Platform.putLong(holder.buffer, offset, 0L);
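The hunk above deletes `alignToWords`. For context, a minimal standalone sketch (hypothetical class, not Spark code) of the 8-byte padding arithmetic the removed method performed:

```java
// Illustration of the word-alignment padding computed by the removed
// alignToWords: round a byte count up to the next multiple of 8 by
// appending zero bytes.
public class WordAlignSketch {
  static int paddingFor(int numBytes) {
    final int remainder = numBytes & 0x07;     // numBytes % 8
    return remainder > 0 ? 8 - remainder : 0;  // zero bytes needed to align
  }

  public static void main(String[] args) {
    System.out.println(paddingFor(13)); // 3 -> 13 + 3 = 16, word aligned
    System.out.println(paddingFor(16)); // 0 -> already aligned
  }
}
```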
