From fd2e55d4e6c4926f042f5c8f7e04b4ceb443553a Mon Sep 17 00:00:00 2001 From: Padma Penumarthy Date: Thu, 22 Feb 2018 16:41:47 -0800 Subject: [PATCH] DRILL-6180: Use System Option "output_batch_size" for External Sort --- .../impl/xsort/managed/ExternalSortBatch.java | 2 +- .../impl/xsort/managed/SortConfig.java | 12 ++- .../src/main/resources/drill-module.conf | 9 +- .../managed/TestExternalSortInternals.java | 88 ++++++++++--------- .../impl/xsort/managed/TestSortImpl.java | 3 +- 5 files changed, 59 insertions(+), 55 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java index 23e66a03072..ea53a80a1dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java @@ -213,7 +213,7 @@ public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, Record super(popConfig, context, true); this.incoming = incoming; - SortConfig sortConfig = new SortConfig(context.getConfig()); + SortConfig sortConfig = new SortConfig(context.getConfig(), context.getOptions()); SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig); oContext.setInjector(injector); PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java index 236c2f37f14..e592ccb6ab6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java @@ -19,6 +19,7 @@ import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.server.options.OptionManager; public class SortConfig { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class); @@ -71,8 +72,7 @@ public class SortConfig { private final int mSortBatchSize; - public SortConfig(DrillConfig config) { - + public SortConfig(DrillConfig config, OptionManager options) { // Optional configured memory limit, typically used only for testing. maxMemory = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY); @@ -99,7 +99,13 @@ public SortConfig(DrillConfig config) { // of memory, but no smaller than the minimum size. In any event, an // output batch can contain no fewer than a single record. - mergeBatchSize = (int) Math.max(config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE), MIN_MERGE_BATCH_SIZE); + // get the output batch size from context. + // Size of the batch sent downstream from the sort operator during + // the merge phase. Default value is 16M. + // Don't change defaults unless you know what you are doing, + // larger sizes can result in memory fragmentation, smaller sizes + // in excessive operator iterator overhead. + mergeBatchSize = (int) Math.max(options.getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR), MIN_MERGE_BATCH_SIZE); // Limit on in-memory batches, primarily for testing. diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 2305a30e4bd..a79cd1bbb45 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -287,12 +287,7 @@ drill.exec: { // Set large enough to get long, continuous writes, but not so // large as to overwhelm a temp directory. // Supports HOCON memory suffixes. - file_size: 256M, - // Size of the batch sent downstream from the sort operator during - // the merge phase. Don't change this unless you know what you are doing, - // larger sizes can result in memory fragmentation, smaller sizes - // in excessive operator iterator overhead. - merge_batch_size = 16M + file_size: 256M } } }, @@ -421,7 +416,7 @@ drill.exec.options: { drill.exec.storage.implicit.fqn.column.label: "fqn", drill.exec.storage.implicit.suffix.column.label: "suffix", drill.exec.testing.controls: "{}", - drill.exec.memory.operator.output_batch_size : 33554432, # 32 MB + drill.exec.memory.operator.output_batch_size : 16777216, # 16 MB exec.bulk_load_table_list.bulk_size: 1000, exec.compile.scalar_replacement: false, exec.enable_bulk_load_table_list: false, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java index 1315a8675cf..b0afbb267e8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java @@ -22,12 +22,13 @@ import static org.junit.Assert.assertTrue; import org.apache.drill.categories.OperatorTest; -import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeAction; import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask; -import org.apache.drill.test.ConfigBuilder; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.test.OperatorFixture; import org.apache.drill.test.SubOperatorTest; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -42,8 +43,7 @@ public class TestExternalSortInternals extends SubOperatorTest { */ @Test public void testConfigDefaults() { - DrillConfig drillConfig = DrillConfig.create(); - SortConfig sortConfig = new SortConfig(drillConfig); + SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getOptionManager()); // Zero means no artificial limit assertEquals(0, sortConfig.maxMemory()); // Zero mapped to large number @@ -67,16 +67,18 @@ public void testConfigDefaults() { @Test public void testConfigOverride() { // Verify the various HOCON ways of setting memory - DrillConfig drillConfig = new ConfigBuilder() + OperatorFixture.Builder builder = new OperatorFixture.Builder(); + builder.configBuilder() .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, "2000K") .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 10) .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, "10M") .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 500_000) - .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 600_000) .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 50) .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 10) .build(); - SortConfig sortConfig = new SortConfig(drillConfig); + FragmentContext fragmentContext = builder.build().getFragmentContext(); + fragmentContext.getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, 600_000); + SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions()); assertEquals(2000 * 1024, sortConfig.maxMemory()); assertEquals(10, sortConfig.mergeLimit()); assertEquals(10 * ONE_MEG, sortConfig.spillFileSize()); @@ -91,15 +93,17 @@ public void testConfigOverride() { */ @Test public void testConfigLimits() { - DrillConfig drillConfig = new ConfigBuilder() + OperatorFixture.Builder builder = new OperatorFixture.Builder(); + builder.configBuilder() .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, SortConfig.MIN_MERGE_LIMIT - 1) .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, SortConfig.MIN_SPILL_FILE_SIZE - 1) .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, SortConfig.MIN_SPILL_BATCH_SIZE - 1) - .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, SortConfig.MIN_MERGE_BATCH_SIZE - 1) .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 1) .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 0) .build(); - SortConfig sortConfig = new SortConfig(drillConfig); + FragmentContext fragmentContext = builder.build().getFragmentContext(); + fragmentContext.getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, SortConfig.MIN_MERGE_BATCH_SIZE - 1); + SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions()); assertEquals(SortConfig.MIN_MERGE_LIMIT, sortConfig.mergeLimit()); assertEquals(SortConfig.MIN_SPILL_FILE_SIZE, sortConfig.spillFileSize()); assertEquals(SortConfig.MIN_SPILL_BATCH_SIZE, sortConfig.spillBatchSize()); @@ -110,8 +114,7 @@ public void testConfigLimits() { @Test public void testMemoryManagerBasics() { - DrillConfig drillConfig = DrillConfig.create(); - SortConfig sortConfig = new SortConfig(drillConfig); + SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions()); long memoryLimit = 70 * ONE_MEG; SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit); @@ -217,8 +220,7 @@ private void verifyCalcs(SortConfig sortConfig, long memoryLimit, SortMemoryMana @Test public void testSmallRows() { - DrillConfig drillConfig = DrillConfig.create(); - SortConfig sortConfig = new SortConfig(drillConfig); + SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions()); long memoryLimit = 100 * ONE_MEG; SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit); @@ -263,8 +265,7 @@ public void testSmallRows() { @Test public void testLowMemory() { - DrillConfig drillConfig = DrillConfig.create(); - SortConfig sortConfig = new SortConfig(drillConfig); + SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions()); int memoryLimit = 10 * ONE_MEG; SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit); @@ -306,8 +307,7 @@ public void testLowMemory() { @Test public void testLowerMemory() { - DrillConfig drillConfig = DrillConfig.create(); - SortConfig sortConfig = new SortConfig(drillConfig); + SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions()); int memoryLimit = 10 * ONE_MEG; SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit); @@ -352,8 +352,7 @@ public void testLowerMemory() { @Test public void testExtremeLowMemory() { - DrillConfig drillConfig = DrillConfig.create(); - SortConfig sortConfig = new SortConfig(drillConfig); + SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions()); long memoryLimit = 10 * ONE_MEG; SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit); @@ -391,8 +390,7 @@ public void testExtremeLowMemory() { @Test public void testMemoryOverflow() { - DrillConfig drillConfig = DrillConfig.create(); - SortConfig sortConfig = new SortConfig(drillConfig); + SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions()); long memoryLimit = 10 * ONE_MEG; SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit); @@ -413,34 +411,36 @@ public void testMemoryOverflow() { @Test public void testConfigConstraints() { - int memConstaint = 40 * ONE_MEG; - int batchSizeConstaint = ONE_MEG / 2; - int mergeSizeConstaint = ONE_MEG; - DrillConfig drillConfig = new ConfigBuilder() - .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstaint) - .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, batchSizeConstaint) - .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, mergeSizeConstaint) + int memConstraint = 40 * ONE_MEG; + int batchSizeConstraint = ONE_MEG / 2; + int mergeSizeConstraint = ONE_MEG; + + OperatorFixture.Builder builder = new OperatorFixture.Builder(); + builder.configBuilder() + .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstraint) + .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, batchSizeConstraint) .build(); - SortConfig sortConfig = new SortConfig(drillConfig); + FragmentContext fragmentContext = builder.build().getFragmentContext(); + fragmentContext.getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, mergeSizeConstraint); + SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions()); long memoryLimit = 50 * ONE_MEG; SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit); - assertEquals(batchSizeConstaint, memManager.getPreferredSpillBatchSize()); - assertEquals(mergeSizeConstaint, memManager.getPreferredMergeBatchSize()); - assertEquals(memConstaint, memManager.getMemoryLimit()); + assertEquals(batchSizeConstraint, memManager.getPreferredSpillBatchSize()); + assertEquals(mergeSizeConstraint, memManager.getPreferredMergeBatchSize()); + assertEquals(memConstraint, memManager.getMemoryLimit()); int rowWidth = 300; int rowCount = 10000; int batchSize = rowWidth * rowCount * 2; memManager.updateEstimates(batchSize, rowWidth, rowCount); - verifyCalcs(sortConfig, memConstaint, memManager, batchSize, rowWidth, rowCount); + verifyCalcs(sortConfig, memConstraint, memManager, batchSize, rowWidth, rowCount); } @Test public void testMemoryDynamics() { - DrillConfig drillConfig = DrillConfig.create(); - SortConfig sortConfig = new SortConfig(drillConfig); + SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions()); long memoryLimit = 50 * ONE_MEG; SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit); @@ -471,10 +471,12 @@ public void testMergeCalcs() { // No artificial merge limit int mergeLimitConstraint = 100; - DrillConfig drillConfig = new ConfigBuilder() + OperatorFixture.Builder builder = new OperatorFixture.Builder(); + builder.configBuilder() .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint) .build(); - SortConfig sortConfig = new SortConfig(drillConfig); + FragmentContext fragmentContext = builder.build().getFragmentContext(); + SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions()); // Allow four spill batches, 8 MB each, plus one output of 16 // Allow for internal fragmentation @@ -569,9 +571,7 @@ public void testMergeCalcs() { @Test public void testMergeCalcsExtreme() { - - DrillConfig drillConfig = DrillConfig.create(); - SortConfig sortConfig = new SortConfig(drillConfig); + SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions()); // Force odd situation in which the spill batch is larger // than memory. Won't actually run, but needed to test @@ -600,10 +600,12 @@ public void testMergeCalcsExtreme() { public void testMergeLimit() { // Constrain merge width int mergeLimitConstraint = 5; - DrillConfig drillConfig = new ConfigBuilder() + OperatorFixture.Builder builder = new OperatorFixture.Builder(); + builder.configBuilder() .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint) .build(); - SortConfig sortConfig = new SortConfig(drillConfig); + FragmentContext fragmentContext = builder.build().getFragmentContext(); + SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions()); // Plenty of memory, memory will not be a limit long memoryLimit = 400 * ONE_MEG; SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java index a98547885f7..bcc53fa5e4c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java @@ -30,6 +30,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.impl.spill.SpillSet; @@ -94,7 +95,7 @@ public static SortImpl makeSortImpl(OperatorFixture fixture, .setMinorFragmentId(3) .setQueryId(queryId) .build(); - SortConfig sortConfig = new SortConfig(opContext.getFragmentContext().getConfig()); + SortConfig sortConfig = new SortConfig(opContext.getFragmentContext().getConfig(), opContext.getFragmentContext().getOptions()); SpillSet spillSet = new SpillSet(opContext.getFragmentContext().getConfig(), handle, popConfig); PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);