DRILL-5815: Option to set query memory as percent of total
closes #960
Paul Rogers committed Oct 16, 2017
1 parent 2444271 commit f781ce1
Showing 6 changed files with 315 additions and 82 deletions.
@@ -19,12 +19,12 @@

import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.rpc.user.InboundImpersonationManager;
import org.apache.drill.exec.server.options.OptionMetaData;
import org.apache.drill.exec.server.options.OptionValidator;
import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator;
import org.apache.drill.exec.server.options.TypeValidators.EnumeratedStringValidator;
import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
import org.apache.drill.exec.server.options.TypeValidators.MaxWidthValidator;
import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator;
@@ -358,13 +358,42 @@ private ExecConstants() {
public static final OptionValidator ENABLE_MEMORY_ESTIMATION = new BooleanValidator(ENABLE_MEMORY_ESTIMATION_KEY);

/**
* Maximum query memory per node (in bytes). Re-plan with cheaper operators if
* memory estimation exceeds this limit.
* <p/>
* DEFAULT: 2 GB (2147483648 bytes)
*/
public static final String MAX_QUERY_MEMORY_PER_NODE_KEY = "planner.memory.max_query_memory_per_node";
public static final LongValidator MAX_QUERY_MEMORY_PER_NODE = new RangeLongValidator(MAX_QUERY_MEMORY_PER_NODE_KEY, 1024 * 1024, Long.MAX_VALUE);

/**
* Alternative way to compute per-query, per-node memory: as a percent
* of the total available system memory.
* <p>
* Suggested computation (see the worked example below):
* <ul>
* <li>Assume an allowance for non-managed operators. Default assumption:
* 50%.</li>
* <li>Assume a desired number of concurrent queries. Default assumption:
* 10.</li>
* <li>The value of this parameter is<br>
* (1 - non-managed allowance) / concurrency</li>
* </ul>
* Working through the math produces the default of 5%. The value actually
* used is never less than the <tt>max_query_memory_per_node</tt> amount.
* <p>
* This number is used only when throttling is disabled. Setting it to 0
* effectively disables the percent-based technique, since the result will
* then always be lower than <tt>max_query_memory_per_node</tt>.
* <p>
* DEFAULT: 5%
*/
public static final String PERCENT_MEMORY_PER_QUERY_KEY = "planner.memory.percent_per_query";
public static final DoubleValidator PERCENT_MEMORY_PER_QUERY = new RangeDoubleValidator(
PERCENT_MEMORY_PER_QUERY_KEY, 0, 1.0);
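
// Worked example for the suggested computation above, using the default
// assumptions (50% non-managed allowance, 10 concurrent queries):
//
//   percent_per_query = (1 - 0.50) / 10 = 0.05, the 5% default.
//
// On a node with, say, 32 GB of direct memory (an illustrative figure, not
// from this commit), 5% yields about 1.6 GB per query, so the larger 2 GB
// max_query_memory_per_node default would win.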

/**
* Minimum memory allocated to each buffered operator instance.
* <p/>
@@ -167,6 +167,7 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.EARLY_LIMIT0_OPT),
new OptionDefinition(ExecConstants.ENABLE_MEMORY_ESTIMATION),
new OptionDefinition(ExecConstants.MAX_QUERY_MEMORY_PER_NODE),
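// Register the new percent-based option so the option system
// (ALTER SYSTEM / ALTER SESSION) recognizes it.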
new OptionDefinition(ExecConstants.PERCENT_MEMORY_PER_QUERY),
new OptionDefinition(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP),
new OptionDefinition(ExecConstants.NON_BLOCKING_OPERATORS_MEMORY),
new OptionDefinition(ExecConstants.HASH_JOIN_TABLE_FACTOR),
@@ -27,6 +27,9 @@
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionSet;

import com.google.common.annotations.VisibleForTesting;

public class MemoryAllocationUtilities {

@@ -71,30 +74,90 @@ public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
}

// if there are any sorts, compute the maximum allocation, and set it on them
if (bufferedOpList.size() > 0) {
final OptionManager optionManager = queryContext.getOptions();
double cpu_load_average = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
final long maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpu_load_average,maxWidth);
long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
maxAllocPerNode = Math.min(maxAllocPerNode,
optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
final long maxOperatorAlloc = maxAllocPerNode / (bufferedOpList.size() * maxWidthPerNode);
logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);

// User configurable option to allow forcing minimum memory.
// Ensure that the buffered ops receive the minimum memory needed to make progress.
// Without this, the math might work out to allocate too little memory.
final long opMinMem = queryContext.getOptions().getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP_KEY).num_val;

for(final PhysicalOperator op : bufferedOpList) {

long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation());
alloc = Math.max(alloc, opMinMem);
op.setMaxAllocation(alloc);
}
}
plan.getProperties().hasResourcePlan = true;
if (bufferedOpList.isEmpty()) {
return;
}

// Set up options, etc.

final OptionManager optionManager = queryContext.getOptions();
final long directMemory = DrillConfig.getMaxDirectMemory();

// Compute per-node, per-query memory.

final long maxAllocPerNode = computeQueryMemory(queryContext.getConfig(), optionManager, directMemory);
logger.debug("Memory per query per node: {}", maxAllocPerNode);

// Now divide up the memory by slices and operators.

final long opMinMem = computeOperatorMemory(optionManager, maxAllocPerNode, bufferedOpList.size());

for (final PhysicalOperator op : bufferedOpList) {
final long alloc = Math.max(opMinMem, op.getInitialAllocation());
op.setMaxAllocation(alloc);
}
}

/**
* Compute per-operator memory based on the computed per-node memory, the
* number of operators, and the computed number of fragments (which house
* the operators). Enforces a floor on the amount of memory per operator.
*
* @param optionManager system option manager
* @param maxAllocPerNode computed query memory per node
* @param opCount number of buffering operators in this query
* @return the per-operator memory
*/
public static long computeOperatorMemory(OptionSet optionManager, long maxAllocPerNode, int opCount) {
final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
final double cpuLoadAverage = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
final long maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpuLoadAverage, maxWidth);
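// maxWidthPerNode is planner.width.max_per_node when that option is non-zero;
// otherwise (the recommended setting, per the comment in drill-module.conf
// below) it is computed dynamically from cpu_load_average.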
final long maxOperatorAlloc = maxAllocPerNode / (opCount * maxWidthPerNode);
logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);

// User configurable option to allow forcing minimum memory.
// Ensure that the buffered ops receive the minimum memory needed to make progress.
// Without this, the math might work out to allocate too little memory.

return Math.max(maxOperatorAlloc,
optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP));
}
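
// Illustrative example (assumed values, not from this commit): with
// maxAllocPerNode = 2 GB, two buffered operators, and maxWidthPerNode = 4,
// each operator instance gets 2 GB / (2 * 4) = 256 MB, raised to the
// planner.memory.min_memory_per_buffered_op floor (40 MB by default)
// whenever the quotient falls below it.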

/**
* Per-node memory calculations based on a number of constraints.
* <p>
* Factored out into a separate method to allow unit testing.
* @param config Drill config
* @param optionManager system options
* @param directMemory amount of direct memory
* @return memory per query per node
*/
@VisibleForTesting
public static long computeQueryMemory(DrillConfig config, OptionSet optionManager, long directMemory) {

// Memory computed as a percent of total memory.

long perQueryMemory = Math.round(directMemory *
optionManager.getOption(ExecConstants.PERCENT_MEMORY_PER_QUERY));

// But, must allow at least the amount given explicitly for
// backward compatibility.

perQueryMemory = Math.max(perQueryMemory,
optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE));

// Compute the node-level cap: the lesser of the total direct memory
// and the configured maximum top-level allocation (10 GB by default).

long maxAllocPerNode = Math.min(directMemory,
config.getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));

// Final amount per node per query is the minimum of these two.

maxAllocPerNode = Math.min(maxAllocPerNode, perQueryMemory);
return maxAllocPerNode;
}
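
// Illustrative walk-through (assumed values, not from this commit):
// directMemory = 32 GB, percent_per_query = 0.05, with the defaults
// max_query_memory_per_node = 2 GB and a 10 GB top-level cap:
//   perQueryMemory = 32 GB * 0.05 = 1.6 GB, raised to 2 GB by the floor;
//   node cap       = min(32 GB, 10 GB) = 10 GB;
//   result         = min(10 GB, 2 GB)  = 2 GB.
// With 64 GB of direct memory, the percent-based value (3.2 GB) would win.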
}
113 changes: 57 additions & 56 deletions exec/java-exec/src/main/resources/drill-module.conf
@@ -19,7 +19,7 @@

drill {
classpath.scanning {
base.classes: ${?drill.classpath.scanning.base.classes} [
org.apache.drill.exec.expr.DrillFunc,
org.apache.drill.exec.expr.fn.PluggableFunctionRegistry,
org.apache.drill.exec.physical.base.PhysicalOperator,
@@ -33,7 +33,7 @@ drill {

annotations += org.apache.drill.exec.expr.annotations.FunctionTemplate

packages: ${?drill.classpath.scanning.packages} [
org.apache.drill.exec.expr,
org.apache.drill.exec.physical,
org.apache.drill.exec.store,
@@ -71,7 +71,7 @@ drill.exec: {
bit: {
timeout: 300,
server: {
port: 31011,
retry:{
count: 7200,
delay: 500
@@ -89,7 +89,7 @@ drill.exec: {
}
}
},
use.ip: false
},
optimizer: {
implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
@@ -180,15 +180,15 @@ drill.exec: {
use_login_principal: false
}
security.user.encryption.sasl: {
enabled: false,
max_wrapped_size: 65536
}
security.bit.encryption.sasl: {
enabled: false,
max_wrapped_size: 65536
}
security.user.encryption.ssl: {
enabled: false
}
trace: {
directory: "/tmp/drill-trace",
@@ -388,10 +388,10 @@ drill.jdbc: {
# Users are not supposed to set these options in the drill-override.conf file.
# Users should use ALTER SYSTEM and ALTER SESSION to set the options.

drill.exec.options: {
bootstrap-storage-plugins.json: .sys.drill,
debug.validate_iterators: false,
debug.validate_vectors: false,
drill.exec.functions.cast_empty_string_to_null: false,
# Setting to control if HashAgg should fallback to older behavior of consuming
# unbounded memory. In case of 2 phase Agg when available memory is not enough
@@ -416,9 +416,9 @@ drill.exec.options: {
exec.hashagg.use_memory_prediction: true,
exec.impersonation.inbound_policies: "[]",
exec.java.compiler.exp_in_method_size: 50,
exec.java_compiler: "DEFAULT",
exec.java_compiler_debug: true,
exec.java_compiler_janino_maxsize: 262144,
exec.max_hash_table_size: 1073741824,
exec.min_hash_table_size: 65536,
exec.persistent_table.umask: "002",
@@ -440,57 +440,58 @@ drill.exec.options: {
exec.udf.enable_dynamic_support: true,
exec.udf.use_dynamic: true,
new_view_default_permissions: 700,
org.apache.drill.exec.compile.ClassTransformer.scalar_replacement: "try",
planner.add_producer_consumer: false,
planner.affinity_factor: 1.2,
planner.broadcast_factor: 1.0,
planner.broadcast_threshold: 10000000,
planner.cpu_load_average: 0.70,
planner.disable_exchanges: false,
planner.enable_broadcast_join: true,
planner.enable_constant_folding: true,
planner.enable_decimal_data_type: false,
planner.enable_demux_exchange: false,
planner.enable_hash_single_key: true,
planner.enable_hashagg: true,
planner.enable_hashjoin: true,
planner.enable_hashjoin_swap: true,
planner.enable_hep_opt: true,
planner.enable_hep_partition_pruning: true,
planner.enable_join_optimization: true,
planner.enable_limit0_optimization: false,
planner.enable_mergejoin: true,
planner.enable_multiphase_agg: true,
planner.enable_mux_exchange: true,
planner.enable_nestedloopjoin: true,
planner.enable_nljoin_for_scalar_only: true,
planner.enable_streamagg: true,
planner.enable_type_inference: true,
planner.enable_unionall_distribute: false,
planner.filter.max_selectivity_estimate_factor: 1.0,
planner.filter.min_selectivity_estimate_factor: 0.0,
planner.force_2phase_aggr: false,
planner.identifier_max_length: 1024,
planner.in_subquery_threshold: 20,
planner.join.hash_join_swap_margin_factor: 10,
planner.join.row_count_estimate_factor: 1.0,
planner.memory.average_field_width: 8,
planner.memory.enable_memory_estimation: false,
planner.memory.hash_agg_table_factor: 1.1d,
planner.memory.hash_join_table_factor: 1.1d,
planner.memory.max_query_memory_per_node: 2147483648, # 2 GB
planner.memory.percent_per_query: 0.05, # 5%
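# The larger of (percent_per_query * direct memory) and
# max_query_memory_per_node is used, capped by the node-level maximum
# (see MemoryAllocationUtilities.computeQueryMemory).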
planner.memory.min_memory_per_buffered_op: 41943040, # 40 MB
planner.memory.non_blocking_operators_memory: 64,
planner.memory_limit: 268435456,
planner.nestedloopjoin_factor: 100.0,
planner.parser.quoting_identifiers: "`",
planner.partitioner_sender_max_threads: 8,
planner.partitioner_sender_set_threads: -1,
planner.partitioner_sender_threads_factor: 2,
planner.producer_consumer_queue_size: 10,
planner.slice_target: 100000,
planner.store.parquet.rowgroup.filter.pushdown.enabled: true,
planner.store.parquet.rowgroup.filter.pushdown.threshold: 10000,
# Max per node should always be configured as zero and
# it is dynamically computed based on cpu_load_average
planner.width.max_per_node: 0,
@@ -526,7 +527,7 @@ drill.exec.options: {
store.parquet.use_new_reader: false,
store.parquet.vector_fill_check_threshold: 10,
store.parquet.vector_fill_threshold: 85,
store.parquet.writer.use_single_fs_block: false,
store.partition.hash_distribute: false,
store.text.estimated_row_size_bytes: 100.0,
web.logs.max_lines: 10000,
@@ -55,6 +55,8 @@ public static void setup() throws Exception {
.configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 1) // Unmanaged
.configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 1) // Unmanaged
.sessionOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 60 * 1024 * 1024) // Spill early
// Prevent the percent-based memory rule from second-guessing the above.
.sessionOption(ExecConstants.PERCENT_MEMORY_PER_QUERY_KEY, 0.0)
.configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false)
.maxParallelization(1)
;