Skip to content
Permalink
Browse files
DRILL-6537: Limit the batch size for buffering operators based on how…
… much memory they get

closes #1342
  • Loading branch information
Padma Penumarthy authored and vvysotskyi committed Jul 1, 2018
1 parent dcc2580 commit 482a63549e1bfe2b238ea9bdaf7d42312e1f51f6
Showing 4 changed files with 14 additions and 4 deletions.
@@ -85,6 +85,10 @@ private ExecConstants() {
// need to produce very large batches that take up lot of memory.
public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 128, 512 * 1024 * 1024);

// Based on available memory, adjust output batch size for buffered operators by this factor.
public static final String OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR = "drill.exec.memory.operator.output_batch_size_avail_mem_factor";
public static final DoubleValidator OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR = new RangeDoubleValidator(OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR, 0.01, 1.0);

// External Sort Boot configuration

public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
@@ -886,9 +886,13 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
partitions = new HashPartition[0];

// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right);
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);

batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right);
}

/**
@@ -233,6 +233,7 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
};

@@ -294,7 +295,7 @@ public SystemOptionManager(final DrillConfig bootConfig) {
* Initializes this option manager.
*
* @return this option manager
* @throws IOException
* @throws Exception
*/
public SystemOptionManager init() throws Exception {
options = provider.getOrCreateStore(config);
@@ -438,6 +438,7 @@ drill.exec.options: {
drill.exec.storage.implicit.suffix.column.label: "suffix",
drill.exec.testing.controls: "{}",
drill.exec.memory.operator.output_batch_size : 16777216, # 16 MB
drill.exec.memory.operator.output_batch_size_avail_mem_factor : 0.1,
exec.bulk_load_table_list.bulk_size: 1000,
exec.compile.scalar_replacement: false,
exec.enable_bulk_load_table_list: false,

0 comments on commit 482a635

Please sign in to comment.