From 730067a1862b649c68870f02ec4f7421ccd60b6a Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Sun, 17 Dec 2017 09:25:55 -0800 Subject: [PATCH] DRILL-6030: Managed sort should minimize number of batches in a k-way merge --- .../exec/physical/impl/xsort/managed/SortConfig.java | 1 + .../impl/xsort/managed/SortMemoryManager.java | 8 ++++---- exec/java-exec/src/main/resources/drill-module.conf | 4 ++-- .../xsort/managed/TestExternalSortInternals.java | 12 +++++------- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java index 8ae39982986..236c2f37f14 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java @@ -42,6 +42,7 @@ public class SortConfig { public static final int MIN_SPILL_BATCH_SIZE = 256 * 1024; public static final int MIN_MERGE_BATCH_SIZE = 256 * 1024; + public static final int DEFAULT_MERGE_LIMIT = 128; public static final int MIN_MERGE_LIMIT = 2; private final long maxMemory; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java index 6c7ce20f9f6..68b546b95f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java @@ -687,11 +687,11 @@ public MergeTask consolidateBatches(long allocMemory, int inMemCount, int spille spillBatchSize.maxBufferSize); memMergeLimit = Math.max(0, memMergeLimit); - // If batches are in memory, and we need more memory to merge - // them all than is 
actually available, then spill some in-memory - // batches. + // If batches are in memory, and final merge count will exceed + // merge limit or we need more memory to merge them all than is + // actually available, then spill some in-memory batches. - if (inMemCount > 0 && memMergeLimit < spilledRunsCount) { + if (inMemCount > 0 && ((inMemCount + spilledRunsCount) > config.mergeLimit() || memMergeLimit < spilledRunsCount)) { return new MergeTask(MergeAction.SPILL, 0); } diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index c923e4f860d..29163ad2b7f 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -258,10 +258,10 @@ drill.exec: { // value provided by Foreman. Primarily for testing. // 0 = unlimited, Supports HOCON memory suffixes. mem_limit: 0, - // Limit on the number of spilled batches that can be merged in + // Limit on the number of batches that can be merged in // a single pass. Limits the number of open file handles. 
// 0 = unlimited - merge_limit: 0, + merge_limit: 128, spill: { // Deprecated for managed xsort; used only by legacy xsort group.size: 40000, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java index 9c31cde51df..e913c392734 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java @@ -47,7 +47,7 @@ public void testConfigDefaults() { // Zero means no artificial limit assertEquals(0, sortConfig.maxMemory()); // Zero mapped to large number - assertEquals(Integer.MAX_VALUE, sortConfig.mergeLimit()); + assertEquals(SortConfig.DEFAULT_MERGE_LIMIT, sortConfig.mergeLimit()); // Default size: 256 MiB assertEquals(256 * ONE_MEG, sortConfig.spillFileSize()); // Default size: 1 MiB @@ -622,14 +622,12 @@ public void testMergeLimit() { int spillRunCount = mergeLimitConstraint; long allocMemory = batchSize * memBatchCount; MergeTask task = memManager.consolidateBatches(allocMemory, memBatchCount, spillRunCount); - assertEquals(MergeAction.NONE, task.action); + assertEquals(MergeAction.SPILL, task.action); - // One more run than can merge in one go. But, we have plenty of - // memory to merge and hold the in-memory batches. So, just merge. + // One in-memory batch plus the spilled runs exceeds the merge limit, so spill - task = memManager.consolidateBatches(allocMemory, memBatchCount, spillRunCount + 1); - assertEquals(MergeAction.MERGE, task.action); - assertEquals(2, task.count); + task = memManager.consolidateBatches(allocMemory, 1, spillRunCount); + assertEquals(MergeAction.SPILL, task.action); // One more runs than can merge in one go, intermediate merge