Skip to content

Commit

Permalink
Update sortedIterator after insertion
Browse files Browse the repository at this point in the history
  • Loading branch information
GuoChenzhao committed Dec 13, 2017
1 parent 57550fb commit e40c2f1
Showing 1 changed file with 52 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,65 +142,64 @@ public Iterator<UnsafeRow> sort() throws IOException {
return sort(null);
}

public Iterator<UnsafeRow> sort(Iterator<UnsafeRow> inputIterator)
throws IOException {
try {
final UnsafeSorterIterator sortedIterator = sorter.getSortedIterator();
return new AbstractIterator<UnsafeRow>() {
// inputIterator == null means the rows has been inserted
boolean inserted = inputIterator == null;
private final int numFields = schema.length();
private UnsafeRow row = new UnsafeRow(numFields);

@Override
public boolean hasNext() {
try {
if (!inserted) {
while (inputIterator.hasNext()) {
insertRow(inputIterator.next());
}
inserted = true;
if (!sortedIterator.hasNext()) {
// Since we won't ever call next() on an empty iterator, we need to clean up resources
// here in order to prevent memory leaks.
cleanupResources();
}
public Iterator<UnsafeRow> sort(Iterator<UnsafeRow> inputIterator) throws IOException {
final UnsafeSorterIterator[] sortedIterator = new UnsafeSorterIterator[1];
return new AbstractIterator<UnsafeRow>() {
// inputIterator == null means the rows has been inserted
boolean inserted = inputIterator == null;
private final int numFields = schema.length();
private UnsafeRow row = new UnsafeRow(numFields);

@Override
public boolean hasNext() {
try {
if (!inserted) {
while (inputIterator.hasNext()) {
insertRow(inputIterator.next());
}
} catch (IOException e) {
return false;
}
return sortedIterator.hasNext();
}

@Override
public UnsafeRow next() {
try {
sortedIterator.loadNext();
row.pointTo(
sortedIterator.getBaseObject(),
sortedIterator.getBaseOffset(),
sortedIterator.getRecordLength());
if (!hasNext()) {
UnsafeRow copy = row.copy(); // so that we don't have dangling pointers to freed page
row = null; // so that we don't keep references to the base object
sortedIterator[0] = sorter.getSortedIterator();
inserted = true;
if (!sortedIterator[0].hasNext()) {
// Since we won't ever call next() on an empty iterator, we need to clean up resources
// here in order to prevent memory leaks.
cleanupResources();
return copy;
} else {
return row;
}
} catch (IOException e) {
}
} catch (IOException e) {
cleanupResources();
// Scala iterators don't declare any checked exceptions, so we need to use this hack
// to re-throw the exception:
Platform.throwException(e);
}
return sortedIterator[0].hasNext();
}

@Override
public UnsafeRow next() {
UnsafeSorterIterator sortedIter = sortedIterator[0];
try {
sortedIter.loadNext();
row.pointTo(
sortedIter.getBaseObject(),
sortedIter.getBaseOffset(),
sortedIter.getRecordLength());
if (!hasNext()) {
UnsafeRow copy = row.copy(); // so that we don't have dangling pointers to freed page
row = null; // so that we don't keep references to the base object
cleanupResources();
// Scala iterators don't declare any checked exceptions, so we need to use this hack
// to re-throw the exception:
Platform.throwException(e);
return copy;
} else {
return row;
}
throw new RuntimeException("Exception should have been re-thrown in next()");
} catch (IOException e) {
cleanupResources();
// Scala iterators don't declare any checked exceptions, so we need to use this hack
// to re-throw the exception:
Platform.throwException(e);
}
};
} catch (IOException e) {
cleanupResources();
throw e;
}
throw new RuntimeException("Exception should have been re-thrown in next()");
}
};
}

private static final class RowComparator extends RecordComparator {
Expand Down

0 comments on commit e40c2f1

Please sign in to comment.