diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 9359ea188d8..6896faa329d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -143,6 +143,7 @@ public VectorContainer getContainer() {
   }
 
   public void cleanup() throws IOException {
+    currentContainer.zeroVectors();
     if (sv2 != null) {
       sv2.clear();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index dbfd1a511f9..bd3c4e76900 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -107,6 +107,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private long highWaterMark = Long.MAX_VALUE;
   private int targetRecordCount;
   private final String fileName;
+  private int firstSpillBatchCount = 0;
 
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, true);
@@ -276,9 +277,6 @@ public IterOutcome innerNext() {
           }
           int count = sv2.getCount();
           totalCount += count;
-//          if (count == 0) {
-//            break outer;
-//          }
           sorter.setup(context, sv2, incoming);
           Stopwatch w = new Stopwatch();
           w.start();
@@ -302,7 +300,15 @@ public IterOutcome innerNext() {
               // since the last spill exceed the defined limit
               (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
 
-            mergeAndSpill();
+            if (firstSpillBatchCount == 0) {
+              firstSpillBatchCount = batchGroups.size();
+            }
+
+            if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
+              logger.info("Merging spills");
+              spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+            }
+            spilledBatchGroups.add(mergeAndSpill(batchGroups));
             batchesSinceLastSpill = 0;
           }
           long t = w.elapsed(TimeUnit.MICROSECONDS);
@@ -311,7 +317,7 @@ public IterOutcome innerNext() {
         case OUT_OF_MEMORY:
           highWaterMark = totalSizeInMemory;
           if (batchesSinceLastSpill > 2) {
-            mergeAndSpill();
+            spilledBatchGroups.add(mergeAndSpill(batchGroups));
           }
           batchesSinceLastSpill = 0;
           break;
@@ -347,7 +353,7 @@ public IterOutcome innerNext() {
 //        logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
         container.buildSchema(SelectionVectorMode.FOUR_BYTE);
       } else {
-        mergeAndSpill();
+        spilledBatchGroups.add(mergeAndSpill(batchGroups));
         batchGroups.addAll(spilledBatchGroups);
         logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
         VectorContainer hyperBatch = constructHyperBatch(batchGroups);
@@ -388,7 +394,7 @@ private boolean hasMemoryForInMemorySort(int currentRecordCount) {
     return currentlyAvailable > neededForInMemorySort;
   }
 
-  public void mergeAndSpill() throws SchemaChangeException {
+  public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
     logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
     VectorContainer outputContainer = new VectorContainer();
     List<BatchGroup> batchGroupList = Lists.newArrayList();
@@ -397,19 +403,16 @@ public void mergeAndSpill() throws SchemaChangeException {
       if (batchGroups.size() == 0) {
         break;
       }
-      if (batchGroups.peekLast().getSv2() == null) {
-        break;
-      }
       BatchGroup batch = batchGroups.pollLast();
       batchGroupList.add(batch);
       long bufferSize = getBufferSize(batch);
       totalSizeInMemory -= bufferSize;
     }
     if (batchGroupList.size() == 0) {
-      return;
+      return null;
     }
     int estimatedRecordSize = 0;
-    for (VectorWrapper<?> w : batchGroups.get(0)) {
+    for (VectorWrapper<?> w : batchGroupList.get(0)) {
       try {
         estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
       } catch (UnsupportedOperationException e) {
@@ -430,6 +433,7 @@ public void mergeAndSpill() throws SchemaChangeException {
 
     String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
     BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
+    logger.info("Merging and spilling to {}", outputFile);
     try {
       while ((count = copier.next(targetRecordCount)) > 0) {
         outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
@@ -437,7 +441,6 @@ public void mergeAndSpill() throws SchemaChangeException {
         newGroup.addBatch(outputContainer);
       }
       newGroup.closeOutputStream();
-      spilledBatchGroups.add(newGroup);
       for (BatchGroup group : batchGroupList) {
         group.cleanup();
       }
@@ -447,6 +450,8 @@ public void mergeAndSpill() throws SchemaChangeException {
     }
     takeOwnership(c1);
     totalSizeInMemory += getBufferSize(c1);
+    logger.info("Completed spilling to {}", outputFile);
+    return newGroup;
   }
 
   private void takeOwnership(VectorAccessible batch) {
@@ -477,7 +482,7 @@ private SelectionVector2 newSV2() throws OutOfMemoryException {
     SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
     if (!sv2.allocateNew(incoming.getRecordCount())) {
       try {
-        mergeAndSpill();
+        spilledBatchGroups.addFirst(mergeAndSpill(batchGroups));
      } catch (SchemaChangeException e) {
        throw new RuntimeException();
      }