From 32be4e3674b3fdd4db427a53f24d9cac3b98412e Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Mon, 11 Sep 2017 09:18:04 -0500 Subject: [PATCH] [STORM-2730] Add in config options for acker cpu and memory --- .../src/jvm/org/apache/storm/Config.java | 22 +++++++++++ .../storm/scheduler/TopologyDetails.java | 37 ++++++++++++++----- 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 13b8a222dba..61c75e626d8 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -226,6 +226,28 @@ public class Config extends HashMap { @isPositiveNumber(includeZero = true) public static final String TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT = "topology.component.cpu.pcore.percent"; + /** + * The maximum amount of memory an instance of an acker will take on heap. This enables the scheduler + * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override + */ + @isPositiveNumber(includeZero = true) + public static final String TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB = "topology.acker.resources.onheap.memory.mb"; + + /** + * The maximum amount of memory an instance of an acker will take off heap. This enables the scheduler + * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override + */ + @isPositiveNumber(includeZero = true) + public static final String TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB = "topology.acker.resources.offheap.memory.mb"; + + /** + * The config indicates the percentage of cpu for a core an instance(executor) of an acker will use. + * Assuming the a core value to be 100, a value of 10 indicates 10% of the core. + * The P in PCORE represents the term "physical". A default value will be set for this config if user does not override + */ + @isPositiveNumber(includeZero = true) + public static final String TOPOLOGY_ACKER_CPU_PCORE_PERCENT = "topology.acker.cpu.pcore.percent"; + /** * The class name of the {@link org.apache.storm.state.StateProvider} implementation. If not specified * defaults to {@link org.apache.storm.state.InMemoryKeyValueStateProvider}. This can be overridden diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java index 575463b04f8..fb1b0f76435 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.storm.Config; +import org.apache.storm.daemon.Acker; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.ComponentCommon; import org.apache.storm.generated.ComponentType; @@ -166,14 +167,6 @@ private void initResourceList() { // topology.getbolt (AKA sys tasks most specifically __acker tasks) for (ExecutorDetails exec : getExecutors()) { if (!resourceList.containsKey(exec)) { - LOG.debug( - "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and " - + "CPU requirement as {}", - getExecutorToComponent().get(exec), - exec, - topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), - topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), - topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); addDefaultResforExec(exec); } } @@ -482,16 +475,42 @@ private void addDefaultResforExec(ExecutorDetails exec) { defaultResourceList.put( Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topologyComponentResourcesOnheapMemoryMb); + + adjustResourcesForExec(exec, defaultResourceList); + LOG.debug( - "Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {} " + "Scheduling Executor: {} {} with memory requirement as onHeap: {} - offHeap: {} " + "and CPU requirement: {}", + getExecutorToComponent().get(exec), exec, topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)); + addResourcesForExec(exec, defaultResourceList); } + /** + * Some components might have different resource configs. + */ + private void adjustResourcesForExec(ExecutorDetails exec, Map resourceListForExec) { + String component = getExecutorToComponent().get(exec); + if (component.equals(Acker.ACKER_COMPONENT_ID)) { + if (topologyConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB)) { + resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, + ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB))); + } + if (topologyConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB)) { + resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, + ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB))); + } + if (topologyConf.containsKey(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT)) { + resourceListForExec.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, + ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT))); + } + } + } + /** * initializes member variables */