From 57d99bdfdc5c7150d64d107b549dfb808f1c92a4 Mon Sep 17 00:00:00 2001
From: Paul Rogers
Date: Thu, 2 Mar 2017 16:09:01 -0800
Subject: [PATCH 1/2] DRILL-5226: Managed external sort fixes

* Memory leak in managed sort if OOM during sv2 allocation
* "Record batch sizer" does not include overhead for variable-sized vectors
* Paranoid double-checking of merge batch sizes to prevent OOM when the sizes differ from expectations
* Revised logging
---
 .../physical/impl/spill/RecordBatchSizer.java  | 32 +++++----
 .../impl/xsort/managed/BatchGroup.java         |  9 ++-
 .../impl/xsort/managed/CopierHolder.java       | 10 ++-
 .../impl/xsort/managed/ExternalSortBatch.java  | 71 +++++++++++++++----
 .../templates/VariableLengthVectors.java       |  2 +-
 5 files changed, 92 insertions(+), 32 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index 22b1b0eed03..b384e0acdbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -107,21 +107,23 @@ public ColumnSize(VectorWrapper vw) {
 
     @Override
     public String toString() {
-      StringBuilder buf = new StringBuilder();
-      buf.append(metadata.getName());
-      buf.append("(std col. size: ");
-      buf.append(stdSize);
-      buf.append(", actual col. size: ");
-      buf.append(estSize);
-      buf.append(", total size: ");
-      buf.append(totalSize);
-      buf.append(", data size: ");
-      buf.append(dataSize);
-      buf.append(", row capacity: ");
-      buf.append(capacity);
-      buf.append(", density: ");
-      buf.append(density);
-      buf.append(")");
+      StringBuilder buf = new StringBuilder()
+          .append(metadata.getName())
+          .append("(type: ")
+          .append(metadata.getType().getMinorType().name())
+          .append(", std col. size: ")
+          .append(stdSize)
+          .append(", actual col. size: ")
+          .append(estSize)
+          .append(", total size: ")
+          .append(totalSize)
+          .append(", data size: ")
+          .append(dataSize)
+          .append(", row capacity: ")
+          .append(capacity)
+          .append(", density: ")
+          .append(density)
+          .append(")");
       return buf.toString();
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
index 7ea599c39c0..2e5d5b2c75d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
@@ -150,7 +150,8 @@ public static class SpilledRun extends BatchGroup {
     private String path;
     private SpillSet spillSet;
     private BufferAllocator allocator;
-    private int spilledBatches = 0;
+    private int spilledBatches;
+    private long batchSize;
 
     public SpilledRun(SpillSet spillSet, String path, OperatorContext context) throws IOException {
       super(null, context);
@@ -178,6 +179,12 @@ public void addBatch(VectorContainer newContainer) throws IOException {
       currentContainer.setRecordCount(0);
     }
 
+    public void setBatchSize(long batchSize) {
+      this.batchSize = batchSize;
+    }
+
+    public long getBatchSize() { return batchSize; }
+
     @Override
     public int getNextIndex() {
       if (pointer == getRecordCount()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
index 4fa520d9076..8071759622c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
@@ -176,6 +176,7 @@ public static class BatchMerger implements SortResults, AutoCloseable {
     private int targetRecordCount;
     private int copyCount;
     private int batchCount;
+    private long batchSize;
 
     /**
      * Creates a merger with an temporary output container.
@@ -229,15 +230,18 @@ public VectorContainer getOutput() {
     @Override
     public boolean next() {
       Stopwatch w = Stopwatch.createStarted();
+      long start = holder.allocator.getAllocatedMemory();
       int count = holder.copier.next(targetRecordCount);
       copyCount += count;
       if (count > 0) {
         long t = w.elapsed(TimeUnit.MICROSECONDS);
+        batchCount++;
         logger.trace("Took {} us to merge {} records", t, count);
+        long size = holder.allocator.getAllocatedMemory() - start;
+        batchSize = Math.max(batchSize, size);
       } else {
         logger.trace("copier returned 0 records");
       }
-      batchCount++;
 
       // Identify the schema to be used in the output container. (Since
      // all merged batches have the same schema, the schema we identify
@@ -303,5 +307,9 @@ public int getRecordCount() {
     public int getBatchCount() {
       return batchCount;
     }
+
+    public long getBatchSize() {
+      return batchSize;
+    }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index a1162a02eca..39d7e60ff98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
 import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -181,13 +182,6 @@ public class ExternalSortBatch extends AbstractRecordBatch {
    */
   private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
 
-  /**
-   * The preferred amount of memory to set aside to output batches
-   * expressed as a ratio of available memory.
-   */
-
-  private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
-
   /**
    * In the bizarre case where the user gave us an unrealistically low
    * spill file size, set a floor at some bare minimum size. (Note that,
@@ -777,6 +771,7 @@ private void setupSchema(IterOutcome upstream) {
    * @return the converted batch, or null if the incoming batch is empty
    */
+  @SuppressWarnings("resource")
   private VectorContainer convertBatch() {
 
     // Must accept the batch even if no records. Then clear
@@ -788,6 +783,10 @@ private VectorContainer convertBatch() {
       for (VectorWrapper w : convertedBatch) {
         w.clear();
       }
+      SelectionVector2 sv2 = incoming.getSelectionVector2();
+      if (sv2 != null) {
+        sv2.clear();
+      }
       return null;
     }
     return convertedBatch;
@@ -852,7 +851,13 @@ private void processBatch() {
       return;
     }
 
-    SelectionVector2 sv2 = makeSelectionVector();
+    SelectionVector2 sv2;
+    try {
+      sv2 = makeSelectionVector();
+    } catch (Exception e) {
+      convertedBatch.clear();
+      throw e;
+    }
 
     // Compute batch size, including allocation of an sv2.
 
@@ -994,7 +999,8 @@ private void updateMemoryEstimates(long memoryDelta, RecordBatchSizer sizer) {
 
     // Maintain an estimate of the incoming batch size: the largest
     // batch yet seen. Used to reserve memory for the next incoming
-    // batch.
+    // batch. Because we are using the actual observed batch size,
+    // the size already includes overhead due to power-of-two rounding.
 
     long origInputBatchSize = estimatedInputBatchSize;
     estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);
@@ -1013,7 +1019,7 @@ private void updateMemoryEstimates(long memoryDelta, RecordBatchSizer sizer) {
       return;
     }
     // Estimate the total size of each incoming batch plus sv2. Note that, due
-    // to power-of-two rounding, the allocated size might be twice the data size.
+    // to power-of-two rounding, the allocated sv2 size might be twice the data size.
 
     long estimatedInputSize = estimatedInputBatchSize + 4 * actualRecordCount;
@@ -1290,7 +1296,7 @@ private boolean consolidateBatches() {
     logger.trace("Merging {} on-disk runs, Alloc. memory = {}", mergeCount, allocator.getAllocatedMemory());
-    mergeAndSpill(spilledRuns, mergeCount);
+    mergeRuns(mergeCount);
     return true;
   }
@@ -1333,8 +1339,43 @@ private void spillFromMemory() {
     mergeAndSpill(bufferedBatches, spillCount);
   }
 
+  private void mergeRuns(int targetCount) {
+
+    // Determine the number of runs to merge. The count should be the
+    // target count. However, to prevent possible memory overrun, we
+    // double-check with actual spill batch size and only spill as much
+    // as fits in the merge memory pool.
+
+    int mergeCount = 0;
+    long mergeSize = 0;
+    for (SpilledRun batch : spilledRuns) {
+      long batchSize = batch.getBatchSize();
+      if (mergeSize + batchSize > mergeMemoryPool) {
+        break;
+      }
+      mergeSize += batchSize;
+      mergeCount++;
+      if (mergeCount == targetCount) {
+        break;
+      }
+    }
+
+    // Must always spill at least 2, even if this creates an over-size
+    // spill file. But, if this is a final consolidation, we may have only
+    // a single batch.
+
+    mergeCount = Math.max(mergeCount, 2);
+    mergeCount = Math.min(mergeCount, spilledRuns.size());
+
+    // Do the actual spill.
+
+    mergeAndSpill(spilledRuns, mergeCount);
+  }
+
   private void mergeAndSpill(LinkedList source, int count) {
     spilledRuns.add(doMergeAndSpill(source, count));
+    logger.trace("Completed spill: memory = {}",
+      allocator.getAllocatedMemory());
   }
 
   private BatchGroup.SpilledRun doMergeAndSpill(LinkedList batchGroups, int spillCount) {
@@ -1354,7 +1395,7 @@ private BatchGroup.SpilledRun doMergeAndSpill(LinkedList batchGroups, int spillCount) {
     BatchGroup.SpilledRun newGroup = null;
     try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
         CopierHolder.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, spillBatchRowCount)) {
-      logger.trace("Spilling {} of {} batches, {} rows, memory = {}, write to {}",
+      logger.trace("Spilling {} of {} batches, spill batch size = {} rows, memory = {}, write to {}",
          batchesToSpill.size(), bufferedBatches.size() + batchesToSpill.size(),
          spillBatchRowCount, allocator.getAllocatedMemory(), outputFile);
@@ -1375,8 +1416,10 @@ private BatchGroup.SpilledRun doMergeAndSpill(LinkedList batchGroups, int spillCount) {
     }
     injector.injectChecked(context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
     newGroup.closeOutputStream();
-    logger.trace("mergeAndSpill: completed, memory = {}, spilled {} records to {}",
-        allocator.getAllocatedMemory(), merger.getRecordCount(), outputFile);
+    logger.trace("Spilled {} batches, {} records; memory = {} to {}",
+        merger.getBatchCount(), merger.getRecordCount(),
+        allocator.getAllocatedMemory(), outputFile);
+    newGroup.setBatchSize(merger.getBatchSize());
     return newGroup;
   } catch (Throwable e) {
     // we only need to clean up newGroup if spill failed
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index ea3c9de7c90..cc299d706fa 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -253,7 +253,7 @@ public int getPayloadByteCount() {
       // If 1 or more values, then the last value is set to
       // the offset of the next value, which is the same as
       // the length of existing values.
-      return a.get(count-1);
+      return a.get(count-1) + offsetVector.getPayloadByteCount();
     }
   }

From 8749eb4d2829911925219c71ae673a9a594780d7 Mon Sep 17 00:00:00 2001
From: Paul Rogers
Date: Fri, 3 Mar 2017 18:21:04 -0800
Subject: [PATCH 2/2] Addresses review comments

---
 .../physical/impl/xsort/managed/CopierHolder.java     | 15 +++++++++++----
 .../impl/xsort/managed/ExternalSortBatch.java         |  6 +++---
 .../codegen/templates/VariableLengthVectors.java      |  5 +++++
 3 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
index 8071759622c..c6b2dd9ffbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
@@ -176,7 +176,7 @@ public static class BatchMerger implements SortResults, AutoCloseable {
     private int targetRecordCount;
     private int copyCount;
     private int batchCount;
-    private long batchSize;
+    private long estBatchSize;
 
     /**
      * Creates a merger with an temporary output container.
@@ -238,7 +238,7 @@ public boolean next() {
         batchCount++;
         logger.trace("Took {} us to merge {} records", t, count);
         long size = holder.allocator.getAllocatedMemory() - start;
-        batchSize = Math.max(batchSize, size);
+        estBatchSize = Math.max(estBatchSize, size);
       } else {
         logger.trace("copier returned 0 records");
       }
@@ -308,8 +308,15 @@ public int getBatchCount() {
       return batchCount;
     }
 
-    public long getBatchSize() {
-      return batchSize;
+    /**
+     * Gets the estimated batch size, in bytes. Use for estimating the memory
+     * needed to process the batches that this operator created.
+     * @return the size of the largest batch created by this operation,
+     * in bytes
+     */
+
+    public long getEstBatchSize() {
+      return estBatchSize;
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 39d7e60ff98..69e9b4ce9a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -1348,8 +1348,8 @@ private void mergeRuns(int targetCount) {
     int mergeCount = 0;
     long mergeSize = 0;
-    for (SpilledRun batch : spilledRuns) {
-      long batchSize = batch.getBatchSize();
+    for (SpilledRun run : spilledRuns) {
+      long batchSize = run.getBatchSize();
       if (mergeSize + batchSize > mergeMemoryPool) {
         break;
       }
       mergeSize += batchSize;
       mergeCount++;
@@ -1419,7 +1419,7 @@ private BatchGroup.SpilledRun doMergeAndSpill(LinkedList batchGroups, int spillCount) {
     logger.trace("Spilled {} batches, {} records; memory = {} to {}",
         merger.getBatchCount(), merger.getRecordCount(),
         allocator.getAllocatedMemory(), outputFile);
-    newGroup.setBatchSize(merger.getBatchSize());
+    newGroup.setBatchSize(merger.getEstBatchSize());
     return newGroup;
   } catch (Throwable e) {
     // we only need to clean up newGroup if spill failed
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index cc299d706fa..bb1d4fbf917 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -253,6 +253,11 @@ public int getPayloadByteCount() {
       // If 1 or more values, then the last value is set to
       // the offset of the next value, which is the same as
       // the length of existing values.
+      // In addition to the actual data bytes, we must also
+      // include the "overhead" bytes: the offset vector entries
+      // that accompany each column value. Thus, total payload
+      // size is consumed text bytes + consumed offset vector
+      // bytes.
       return a.get(count-1) + offsetVector.getPayloadByteCount();
     }
   }
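
Note on the run-selection logic introduced by mergeRuns() in the patch above: the rule can be read in isolation as the minimal sketch below. This is not part of the patch; the class MergePlan, the method planMergeCount(), and the plain List of per-run batch sizes are illustrative stand-ins for ExternalSortBatch's spilledRuns list and mergeMemoryPool field, but the selection rule itself mirrors what the patch does.

import java.util.List;

public class MergePlan {

  /**
   * Decide how many spilled runs to merge in one pass. Accumulate the
   * observed batch size of each run until the merge memory pool would be
   * exceeded or the requested target count is reached, then clamp to at
   * least two runs (merging a single run accomplishes nothing unless it
   * is the only run left) and to no more runs than actually exist.
   */
  public static int planMergeCount(List<Long> runBatchSizes, int targetCount, long mergeMemoryPool) {
    int mergeCount = 0;
    long mergeSize = 0;
    for (long batchSize : runBatchSizes) {
      if (mergeSize + batchSize > mergeMemoryPool) {
        break;
      }
      mergeSize += batchSize;
      mergeCount++;
      if (mergeCount == targetCount) {
        break;
      }
    }
    mergeCount = Math.max(mergeCount, 2);
    return Math.min(mergeCount, runBatchSizes.size());
  }

  public static void main(String[] args) {
    long mb = 1024 * 1024;
    // Four 40 MB runs, a target of 4, and a 100 MB merge pool:
    // only two runs fit, so the planner prints 2.
    List<Long> runs = List.of(40 * mb, 40 * mb, 40 * mb, 40 * mb);
    System.out.println(planMergeCount(runs, 4, 100 * mb));
  }
}

The clamp to a minimum of two runs corresponds to the patch's "must always spill at least 2" comment: merging fewer than two runs would make no progress, even when the resulting spill file exceeds the memory-based estimate.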