diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 4099fb01f2f95..0efae16e9838c 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -89,13 +89,7 @@ public LongArray allocateArray(long size) {
     long required = size * 8L;
     MemoryBlock page = taskMemoryManager.allocatePage(required, this);
     if (page == null || page.size() < required) {
-      long got = 0;
-      if (page != null) {
-        got = page.size();
-        taskMemoryManager.freePage(page, this);
-      }
-      taskMemoryManager.showMemoryUsage();
-      throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+      throwOom(page, required);
     }
     used += required;
     return new LongArray(page);
@@ -116,13 +110,7 @@ public void freeArray(LongArray array) {
   protected MemoryBlock allocatePage(long required) {
     MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
     if (page == null || page.size() < required) {
-      long got = 0;
-      if (page != null) {
-        got = page.size();
-        taskMemoryManager.freePage(page, this);
-      }
-      taskMemoryManager.showMemoryUsage();
-      throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+      throwOom(page, required);
     }
     used += page.size();
     return page;
@@ -152,4 +140,14 @@ public void freeMemory(long size) {
     taskMemoryManager.releaseExecutionMemory(size, this);
     used -= size;
   }
+
+  private void throwOom(final MemoryBlock page, final long required) {
+    long got = 0;
+    if (page != null) {
+      got = page.size();
+      taskMemoryManager.freePage(page, this);
+    }
+    taskMemoryManager.showMemoryUsage();
+    throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+  }
 }
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 610ace30f8a62..4fadfe36cd716 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -283,13 +283,7 @@ private void advanceToNextPage() {
       } else {
         currentPage = null;
         if (reader != null) {
-          // remove the spill file from disk
-          File file = spillWriters.removeFirst().getFile();
-          if (file != null && file.exists()) {
-            if (!file.delete()) {
-              logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
-            }
-          }
+          handleFailedDelete();
         }
         try {
           Closeables.close(reader, /* swallowIOException = */ false);
@@ -307,13 +301,7 @@ public boolean hasNext() {
       if (numRecords == 0) {
         if (reader != null) {
-          // remove the spill file from disk
-          File file = spillWriters.removeFirst().getFile();
-          if (file != null && file.exists()) {
-            if (!file.delete()) {
-              logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
-            }
-          }
+          handleFailedDelete();
         }
       }
       return numRecords > 0;
     }
@@ -403,6 +391,14 @@ public long spill(long numBytes) throws IOException {
     public void remove() {
       throw new UnsupportedOperationException();
     }
+
+    private void handleFailedDelete() {
+      // remove the spill file from disk
+      File file = spillWriters.removeFirst().getFile();
+      if (file != null && file.exists() && !file.delete()) {
+        logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
+      }
+    }
   }
 
   /**
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index de4464080ef55..39eda00dd7efb 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -219,15 +219,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
         new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
           inMemSorter.numRecords());
       spillWriters.add(spillWriter);
-      final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator();
-      while (sortedRecords.hasNext()) {
-        sortedRecords.loadNext();
-        final Object baseObject = sortedRecords.getBaseObject();
-        final long baseOffset = sortedRecords.getBaseOffset();
-        final int recordLength = sortedRecords.getRecordLength();
-        spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
-      }
-      spillWriter.close();
+      spillIterator(inMemSorter.getSortedIterator(), spillWriter);
     }
 
     final long spillSize = freeMemory();
@@ -488,6 +480,18 @@ public UnsafeSorterIterator getSortedIterator() throws IOException {
     }
   }
+
+  private static void spillIterator(UnsafeSorterIterator inMemIterator,
+      UnsafeSorterSpillWriter spillWriter) throws IOException {
+    while (inMemIterator.hasNext()) {
+      inMemIterator.loadNext();
+      final Object baseObject = inMemIterator.getBaseObject();
+      final long baseOffset = inMemIterator.getBaseOffset();
+      final int recordLength = inMemIterator.getRecordLength();
+      spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
+    }
+    spillWriter.close();
+  }
 
   /**
    * An UnsafeSorterIterator that support spilling.
    */
@@ -503,6 +507,7 @@ class SpillableIterator extends UnsafeSorterIterator {
       this.numRecords = inMemIterator.getNumRecords();
     }
 
+    @Override
     public int getNumRecords() {
       return numRecords;
     }
@@ -521,14 +526,7 @@ public long spill() throws IOException {
         // Iterate over the records that have not been returned and spill them.
        final UnsafeSorterSpillWriter spillWriter =
          new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-        while (inMemIterator.hasNext()) {
-          inMemIterator.loadNext();
-          final Object baseObject = inMemIterator.getBaseObject();
-          final long baseOffset = inMemIterator.getBaseOffset();
-          final int recordLength = inMemIterator.getRecordLength();
-          spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
-        }
-        spillWriter.close();
+        spillIterator(inMemIterator, spillWriter);
         spillWriters.add(spillWriter);
 
         nextUpstream = spillWriter.getReader(serializerManager);
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
index 4776617043878..5d9515c0725da 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
@@ -109,22 +109,6 @@ public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
     Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
   }
 
-  // Do word alignment for this row and grow the row buffer if needed.
-  // todo: remove this after we make unsafe array data word align.
-  public void alignToWords(int numBytes) {
-    final int remainder = numBytes & 0x07;
-
-    if (remainder > 0) {
-      final int paddingBytes = 8 - remainder;
-      holder.grow(paddingBytes);
-
-      for (int i = 0; i < paddingBytes; i++) {
-        Platform.putByte(holder.buffer, holder.cursor, (byte) 0);
-        holder.cursor++;
-      }
-    }
-  }
-
   public void write(int ordinal, boolean value) {
     final long offset = getFieldOffset(ordinal);
     Platform.putLong(holder.buffer, offset, 0L);
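
The hunks above all apply the same extract-method refactoring: a block of duplicated failure-handling (or spill-loop) code is pulled into a single private helper that every call site reuses. The following is a minimal, self-contained Java sketch of that pattern, not Spark code: the ThrowOomExample class, the Block type, and tryAllocate() are hypothetical stand-ins, and only the shape of throwOom mirrors the helper introduced in MemoryConsumer.

// Sketch of the extract-method pattern: the repeated
// "allocation failed -> report, throw" block becomes one private helper.
public class ThrowOomExample {

  /** Hypothetical memory block; only its size matters for the example. */
  static final class Block {
    final long size;
    Block(long size) { this.size = size; }
  }

  private long used = 0;

  /** Pretend allocator that fails (returns null) for anything over 1 KiB. */
  private Block tryAllocate(long bytes) {
    return bytes <= 1024 ? new Block(bytes) : null;
  }

  // Before the refactoring, both methods repeated the failure handling inline.
  public Block allocateArray(long numElements) {
    long required = numElements * 8L;
    Block block = tryAllocate(required);
    if (block == null || block.size < required) {
      throwOom(block, required);   // single shared failure path
    }
    used += required;
    return block;
  }

  public Block allocatePage(long required) {
    Block block = tryAllocate(required);
    if (block == null || block.size < required) {
      throwOom(block, required);   // same helper, no duplicated block
    }
    used += block.size;
    return block;
  }

  // Mirrors the shape of MemoryConsumer.throwOom in the diff: account for any
  // partially acquired memory, report usage, then fail.
  private void throwOom(Block block, long required) {
    long got = (block != null) ? block.size : 0;
    System.err.println("memory used so far: " + used + " bytes");
    throw new OutOfMemoryError(
      "Unable to acquire " + required + " bytes of memory, got " + got);
  }

  public static void main(String[] args) {
    ThrowOomExample consumer = new ThrowOomExample();
    System.out.println(consumer.allocateArray(16).size);  // 128 bytes: succeeds
    consumer.allocatePage(4096);                          // throws OutOfMemoryError
  }
}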