Skip to content

Commit

Permalink
Clean up final row copying code.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 8, 2015
1 parent d31f180 commit c56ec18
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.expressions;

import javax.annotation.Nullable;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ObjectPool;
import org.apache.spark.unsafe.PlatformDependent;
Expand Down Expand Up @@ -57,17 +55,15 @@
*/
public final class UnsafeRow extends MutableRow {

/** Hack for if we want to pass around an UnsafeRow which also carries around its backing data */
@Nullable public byte[] backingArray;
private Object baseObject;
private long baseOffset;

/** A pool to hold non-primitive objects */
private ObjectPool pool;

Object getBaseObject() { return baseObject; }
long getBaseOffset() { return baseOffset; }
ObjectPool getPool() { return pool; }
public Object getBaseObject() { return baseObject; }
public long getBaseOffset() { return baseOffset; }
public ObjectPool getPool() { return pool; }

/** The number of fields in this row, used for calculating the bitset width (and in assertions) */
private int numFields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Arrays;

import org.apache.spark.sql.Row;
import scala.collection.Iterator;
import scala.math.Ordering;

Expand Down Expand Up @@ -150,28 +151,34 @@ public boolean hasNext() {
return sortedIterator.hasNext();
}

/**
* Called prior to returning this iterator's last row. This copies the row's data into an
* on-heap byte array so that the pointer to the row data will not be dangling after the
* sorter's memory pages are freed.
*/
private void detachRowFromPage(UnsafeRow row, int rowLength) {
final byte[] rowDataCopy = new byte[rowLength];
PlatformDependent.copyMemory(
row.getBaseObject(),
row.getBaseOffset(),
rowDataCopy,
PlatformDependent.BYTE_ARRAY_OFFSET,
rowLength
);
row.pointTo(rowDataCopy, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, row.getPool());
}

@Override
public InternalRow next() {
try {
sortedIterator.loadNext();
if (hasNext()) {
row.pointTo(
sortedIterator.getBaseObject(), sortedIterator.getBaseOffset(), numFields, objPool);
return row;
} else {
final byte[] rowDataCopy = new byte[sortedIterator.getRecordLength()];
PlatformDependent.copyMemory(
sortedIterator.getBaseObject(),
sortedIterator.getBaseOffset(),
rowDataCopy,
PlatformDependent.BYTE_ARRAY_OFFSET,
sortedIterator.getRecordLength()
);
row.backingArray = rowDataCopy;
row.pointTo(rowDataCopy, PlatformDependent.BYTE_ARRAY_OFFSET, numFields, objPool);
row.pointTo(
sortedIterator.getBaseObject(), sortedIterator.getBaseOffset(), numFields, objPool);
if (!hasNext()) {
detachRowFromPage(row, sortedIterator.getRecordLength());
cleanupResources();
return row;
}
return row;
} catch (IOException e) {
cleanupResources();
// Scala iterators don't declare any checked exceptions, so we need to use this hack
Expand Down

0 comments on commit c56ec18

Please sign in to comment.