Skip to content

Commit

Permalink
Add test and fix bug for sorting empty arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 10, 2015
1 parent d1e28bc commit 2f48777
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,12 @@ public void insertRecord(

public UnsafeSorterIterator getSortedIterator() throws IOException {
final UnsafeSorterIterator inMemoryIterator = sorter.getSortedIterator();
int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
if (spillWriters.isEmpty()) {
return inMemoryIterator;
} else {
final UnsafeSorterSpillMerger spillMerger =
new UnsafeSorterSpillMerger(recordComparator, prefixComparator);
new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge);
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
spillMerger.addSpill(spillWriter.getReader(blockManager));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ final class UnsafeSorterSpillMerger {

public UnsafeSorterSpillMerger(
final RecordComparator recordComparator,
final PrefixComparator prefixComparator) {
final PrefixComparator prefixComparator,
final int numSpills) {
final Comparator<UnsafeSorterIterator> comparator = new Comparator<UnsafeSorterIterator>() {

@Override
Expand All @@ -43,8 +44,7 @@ public int compare(UnsafeSorterIterator left, UnsafeSorterIterator right) {
}
}
};
// TODO: the size is often known; incorporate size hints here.
priorityQueue = new PriorityQueue<UnsafeSorterIterator>(10, comparator);
priorityQueue = new PriorityQueue<UnsafeSorterIterator>(numSpills, comparator);
}

public void addSpill(UnsafeSorterIterator spillReader) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ public void write(
dataRemaining -= toTransfer;
freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE;
}
if (freeSpaceInWriteBuffer < DISK_WRITE_BUFFER_SIZE) {
writer.write(writeBuffer, 0, (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer));
}
writer.recordWritten();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,34 @@ public void testSortingOnlyByPrefix() throws Exception {
// assert(tempDir.isEmpty)
}

@Test
public void testSortingEmptyArrays() throws Exception {

final UnsafeExternalSorter sorter = new UnsafeExternalSorter(
memoryManager,
shuffleMemoryManager,
blockManager,
taskContext,
recordComparator,
prefixComparator,
1024,
new SparkConf());

sorter.insertRecord(null, 0, 0, 0);
sorter.insertRecord(null, 0, 0, 0);
sorter.spill();
sorter.insertRecord(null, 0, 0, 0);
sorter.spill();
sorter.insertRecord(null, 0, 0, 0);
sorter.insertRecord(null, 0, 0, 0);

UnsafeSorterIterator iter = sorter.getSortedIterator();

for (int i = 1; i <= 5; i++) {
iter.loadNext();
assertEquals(0, iter.getKeyPrefix());
assertEquals(0, iter.getRecordLength());
}
}

}

0 comments on commit 2f48777

Please sign in to comment.