Skip to content

Commit

Permalink
DRILL-2728: Merge spill files when number gets too large
Browse files Browse the repository at this point in the history
  • Loading branch information
StevenMPhillips committed Apr 15, 2015
1 parent 959419d commit 859e6a8
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
Expand Up @@ -143,6 +143,7 @@ public VectorContainer getContainer() {
} }


public void cleanup() throws IOException { public void cleanup() throws IOException {
currentContainer.zeroVectors();
if (sv2 != null) { if (sv2 != null) {
sv2.clear(); sv2.clear();
} }
Expand Down
Expand Up @@ -107,6 +107,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private long highWaterMark = Long.MAX_VALUE; private long highWaterMark = Long.MAX_VALUE;
private int targetRecordCount; private int targetRecordCount;
private final String fileName; private final String fileName;
private int firstSpillBatchCount = 0;


public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, true); super(popConfig, context, true);
Expand Down Expand Up @@ -276,9 +277,6 @@ public IterOutcome innerNext() {
} }
int count = sv2.getCount(); int count = sv2.getCount();
totalCount += count; totalCount += count;
// if (count == 0) {
// break outer;
// }
sorter.setup(context, sv2, incoming); sorter.setup(context, sv2, incoming);
Stopwatch w = new Stopwatch(); Stopwatch w = new Stopwatch();
w.start(); w.start();
Expand All @@ -302,7 +300,15 @@ public IterOutcome innerNext() {
// since the last spill exceed the defined limit // since the last spill exceed the defined limit
(batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) { (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {


mergeAndSpill(); if (firstSpillBatchCount == 0) {
firstSpillBatchCount = batchGroups.size();
}

if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
logger.info("Merging spills");
spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
}
spilledBatchGroups.add(mergeAndSpill(batchGroups));
batchesSinceLastSpill = 0; batchesSinceLastSpill = 0;
} }
long t = w.elapsed(TimeUnit.MICROSECONDS); long t = w.elapsed(TimeUnit.MICROSECONDS);
Expand All @@ -311,7 +317,7 @@ public IterOutcome innerNext() {
case OUT_OF_MEMORY: case OUT_OF_MEMORY:
highWaterMark = totalSizeInMemory; highWaterMark = totalSizeInMemory;
if (batchesSinceLastSpill > 2) { if (batchesSinceLastSpill > 2) {
mergeAndSpill(); spilledBatchGroups.add(mergeAndSpill(batchGroups));
} }
batchesSinceLastSpill = 0; batchesSinceLastSpill = 0;
break; break;
Expand Down Expand Up @@ -347,7 +353,7 @@ public IterOutcome innerNext() {
// logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount()); // logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
container.buildSchema(SelectionVectorMode.FOUR_BYTE); container.buildSchema(SelectionVectorMode.FOUR_BYTE);
} else { } else {
mergeAndSpill(); spilledBatchGroups.add(mergeAndSpill(batchGroups));
batchGroups.addAll(spilledBatchGroups); batchGroups.addAll(spilledBatchGroups);
logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory()); logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
VectorContainer hyperBatch = constructHyperBatch(batchGroups); VectorContainer hyperBatch = constructHyperBatch(batchGroups);
Expand Down Expand Up @@ -388,7 +394,7 @@ private boolean hasMemoryForInMemorySort(int currentRecordCount) {
return currentlyAvailable > neededForInMemorySort; return currentlyAvailable > neededForInMemorySort;
} }


public void mergeAndSpill() throws SchemaChangeException { public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory()); logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
VectorContainer outputContainer = new VectorContainer(); VectorContainer outputContainer = new VectorContainer();
List<BatchGroup> batchGroupList = Lists.newArrayList(); List<BatchGroup> batchGroupList = Lists.newArrayList();
Expand All @@ -397,19 +403,16 @@ public void mergeAndSpill() throws SchemaChangeException {
if (batchGroups.size() == 0) { if (batchGroups.size() == 0) {
break; break;
} }
if (batchGroups.peekLast().getSv2() == null) {
break;
}
BatchGroup batch = batchGroups.pollLast(); BatchGroup batch = batchGroups.pollLast();
batchGroupList.add(batch); batchGroupList.add(batch);
long bufferSize = getBufferSize(batch); long bufferSize = getBufferSize(batch);
totalSizeInMemory -= bufferSize; totalSizeInMemory -= bufferSize;
} }
if (batchGroupList.size() == 0) { if (batchGroupList.size() == 0) {
return; return null;
} }
int estimatedRecordSize = 0; int estimatedRecordSize = 0;
for (VectorWrapper w : batchGroups.get(0)) { for (VectorWrapper w : batchGroupList.get(0)) {
try { try {
estimatedRecordSize += TypeHelper.getSize(w.getField().getType()); estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
} catch (UnsupportedOperationException e) { } catch (UnsupportedOperationException e) {
Expand All @@ -430,14 +433,14 @@ public void mergeAndSpill() throws SchemaChangeException {
String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++); String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator()); BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());


logger.info("Merging and spilling to {}", outputFile);
try { try {
while ((count = copier.next(targetRecordCount)) > 0) { while ((count = copier.next(targetRecordCount)) > 0) {
outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE); outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
outputContainer.setRecordCount(count); outputContainer.setRecordCount(count);
newGroup.addBatch(outputContainer); newGroup.addBatch(outputContainer);
} }
newGroup.closeOutputStream(); newGroup.closeOutputStream();
spilledBatchGroups.add(newGroup);
for (BatchGroup group : batchGroupList) { for (BatchGroup group : batchGroupList) {
group.cleanup(); group.cleanup();
} }
Expand All @@ -447,6 +450,8 @@ public void mergeAndSpill() throws SchemaChangeException {
} }
takeOwnership(c1); takeOwnership(c1);
totalSizeInMemory += getBufferSize(c1); totalSizeInMemory += getBufferSize(c1);
logger.info("Completed spilling to {}", outputFile);
return newGroup;
} }


private void takeOwnership(VectorAccessible batch) { private void takeOwnership(VectorAccessible batch) {
Expand Down Expand Up @@ -477,7 +482,7 @@ private SelectionVector2 newSV2() throws OutOfMemoryException {
SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator()); SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
if (!sv2.allocateNew(incoming.getRecordCount())) { if (!sv2.allocateNew(incoming.getRecordCount())) {
try { try {
mergeAndSpill(); spilledBatchGroups.addFirst(mergeAndSpill(batchGroups));
} catch (SchemaChangeException e) { } catch (SchemaChangeException e) {
throw new RuntimeException(); throw new RuntimeException();
} }
Expand Down

0 comments on commit 859e6a8

Please sign in to comment.