diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 6097b359ec7..22c8bcc048d 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -261,7 +261,7 @@
**/generated/**
- 10298
+ 10263
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index c8d3b09668a..a8e2c012769 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -64,17 +64,5 @@ public class Constants {
public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
-
- public static final Map resourceNameMapping;
-
- static {
- Map tmp = new HashMap<>();
- tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_CPU_CAPACITY, COMMON_CPU_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
- tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, COMMON_TOTAL_MEMORY_RESOURCE_NAME);
- resourceNameMapping = Collections.unmodifiableMap(tmp);
- }
}
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 18fec7f6b7c..d6554de4508 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -130,7 +130,7 @@
maven-checkstyle-plugin
- 2655
+ 2630
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index fc427a8e8a0..ea3c9c4a3df 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -52,6 +52,7 @@
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.storm.Config;
import org.apache.storm.Constants;
@@ -148,6 +149,7 @@
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.security.INimbusCredentialPlugin;
@@ -967,13 +969,12 @@ private static Map> extr
return ret;
}
- private static Map setResourcesDefaultIfNotSet(Map> compResourcesMap, String compId, Map topoConf) {
- Map resourcesMap = compResourcesMap.get(compId);
- if (resourcesMap == null) {
- resourcesMap = new HashMap<>();
+ private static void setResourcesDefaultIfNotSet(Map compResourcesMap, String compId,
+ Map topoConf) {
+ NormalizedResourceRequest resources = compResourcesMap.get(compId);
+ if (resources == null) {
+ compResourcesMap.put(compId, new NormalizedResourceRequest(topoConf));
}
- ResourceUtils.checkInitialization(resourcesMap, compId, topoConf);
- return resourcesMap;
}
private static void validatePortAvailable(Map conf) throws IOException {
@@ -2563,20 +2564,16 @@ static void validateTopologyWorkerMaxHeapSizeConfigs(
private static double getMaxExecutorMemoryUsageForTopo(
StormTopology topology, Map topologyConf) {
double largestMemoryOperator = 0.0;
- for (Map entry :
+ for (NormalizedResourceRequest entry :
ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
- double memoryRequirement =
- entry.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0)
- + entry.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ double memoryRequirement = entry.getTotalMemoryMb();
if (memoryRequirement > largestMemoryOperator) {
largestMemoryOperator = memoryRequirement;
}
}
- for (Map entry :
+ for (NormalizedResourceRequest entry :
ResourceUtils.getSpoutsResources(topology, topologyConf).values()) {
- double memoryRequirement =
- entry.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0)
- + entry.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ double memoryRequirement = entry.getTotalMemoryMb();
if (memoryRequirement > largestMemoryOperator) {
largestMemoryOperator = memoryRequirement;
}
@@ -3649,16 +3646,18 @@ public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolea
topoPageInfo.set_storm_version(topology.get_storm_version());
}
- Map> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
+ Map spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
for (Entry entry: topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
CommonAggregateStats commonStats = entry.getValue().get_common_stats();
- commonStats.set_resources_map(setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf));
+ setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
+ commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
}
- Map> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
+ Map boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
for (Entry entry: topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
CommonAggregateStats commonStats = entry.getValue().get_common_stats();
- commonStats.set_resources_map(setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf));
+ setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
+ commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
}
if (workerSummaries != null) {
@@ -3816,11 +3815,17 @@ public ComponentPageInfo getComponentPageInfo(String topoId, String componentId,
ComponentPageInfo compPageInfo = StatsUtil.aggCompExecsStats(exec2HostPort, info.taskToComponent, info.beats, window,
includeSys, topoId, topology, componentId);
if (compPageInfo.get_component_type() == ComponentType.SPOUT) {
- compPageInfo.set_resources_map(setResourcesDefaultIfNotSet(
- ResourceUtils.getSpoutsResources(topology, topoConf), componentId, topoConf));
+ NormalizedResourceRequest spoutResources = ResourceUtils.getSpoutResources(topology, topoConf, componentId);
+ if (spoutResources == null) {
+ spoutResources = new NormalizedResourceRequest(topoConf);
+ }
+ compPageInfo.set_resources_map(spoutResources.toNormalizedMap());
} else { //bolt
- compPageInfo.set_resources_map(setResourcesDefaultIfNotSet(
- ResourceUtils.getBoltsResources(topology, topoConf), componentId, topoConf));
+ NormalizedResourceRequest boltResources = ResourceUtils.getBoltResources(topology, topoConf, componentId);
+ if (boltResources == null) {
+ boltResources = new NormalizedResourceRequest(topoConf);
+ }
+ compPageInfo.set_resources_map(boltResources.toNormalizedMap());
}
compPageInfo.set_topology_name(info.topoName);
compPageInfo.set_errors(stormClusterState.errors(topoId, componentId));
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 26fb72923a0..87e310ea4bb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -26,13 +26,12 @@
import org.apache.storm.utils.Time;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.storm.scheduler.resource.ResourceUtils.normalizedResourceMap;
+import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
public class SupervisorHeartbeat implements Runnable {
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 93e4dfaba23..c311897b5f0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -19,7 +19,6 @@
package org.apache.storm.scheduler;
import com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -29,13 +28,14 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
@@ -43,7 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.scheduler.resource.ResourceUtils.normalizedResourceMap;
+import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
public class Cluster implements ISchedulingState {
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
@@ -79,7 +79,7 @@ public class Cluster implements ISchedulingState {
private Set blackListedHosts = new HashSet<>();
private INimbus inimbus;
private final Topologies topologies;
- private final Map scheduledCPUCache = new HashMap<>();
+ private final Map scheduledCpuCache = new HashMap<>();
private final Map scheduledMemoryCache = new HashMap<>();
public Cluster(
@@ -327,10 +327,10 @@ public Set getAvailablePorts(SupervisorDetails supervisor) {
@Override
public Set getAssignablePorts(SupervisorDetails supervisor) {
- if (isBlackListed(supervisor.id)) {
+ if (isBlackListed(supervisor.getId())) {
return Collections.emptySet();
}
- return supervisor.allPorts;
+ return supervisor.getAllPorts();
}
@Override
@@ -417,33 +417,18 @@ private void addResource(Map resourceMap, String resourceName, D
private WorkerResources calculateWorkerResources(
TopologyDetails td, Collection executors) {
- Map totalResources = new HashMap<>();
+ NormalizedResourceRequest totalResources = new NormalizedResourceRequest();
Map sharedTotalResources = new HashMap<>();
for (ExecutorDetails exec : executors) {
- Map allResources = td.getTotalResources(exec);
+ NormalizedResourceRequest allResources = td.getTotalResources(exec);
if (allResources == null) {
continue;
}
- for (Entry resource : allResources.entrySet()) {
-
- if (!totalResources.containsKey(resource.getKey())) {
- totalResources.put(resource.getKey(), 0.0);
- }
- totalResources.put(
- resource.getKey(),
- totalResources.get(resource.getKey()) + resource.getValue());
- }
+ totalResources.add(allResources);
}
- totalResources = normalizedResourceMap(totalResources);
for (SharedMemory shared : td.getSharedMemoryRequests(executors)) {
- addResource(
- totalResources,
- Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, shared.get_off_heap_worker()
- );
- addResource(
- totalResources,
- Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, shared.get_on_heap()
- );
+ totalResources.addOffHeap(shared.get_off_heap_worker());
+ totalResources.addOnHeap(shared.get_off_heap_worker());
addResource(
sharedTotalResources,
@@ -456,21 +441,12 @@ private WorkerResources calculateWorkerResources(
}
sharedTotalResources = normalizedResourceMap(sharedTotalResources);
WorkerResources ret = new WorkerResources();
- ret.set_resources(totalResources);
+ ret.set_resources(totalResources.toNormalizedMap());
ret.set_shared_resources(sharedTotalResources);
- ret.set_cpu(
- totalResources.getOrDefault(
- Constants.COMMON_CPU_RESOURCE_NAME, 0.0)
- );
- ret.set_mem_off_heap(
- totalResources.getOrDefault(
- Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0)
- );
- ret.set_mem_on_heap(
- totalResources.getOrDefault(
- Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0)
- );
+ ret.set_cpu(totalResources.getTotalCpu());
+ ret.set_mem_off_heap(totalResources.getOffHeapMemoryMb());
+ ret.set_mem_on_heap(totalResources.getOnHeapMemoryMb());
ret.set_shared_mem_off_heap(
sharedTotalResources.getOrDefault(
Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0)
@@ -487,32 +463,12 @@ public boolean wouldFit(
WorkerSlot ws,
ExecutorDetails exec,
TopologyDetails td,
- Map resourcesAvailable,
+ NormalizedResourceOffer resourcesAvailable,
double maxHeap) {
- Map requestedResources = td.getTotalResources(exec);
-
- for (Entry resourceNeededEntry : requestedResources.entrySet()) {
- String resourceName = resourceNeededEntry.getKey().toString();
- if (resourceName.equals(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME) ||
- resourceName.equals(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME)) {
- continue;
- }
- Double resourceNeeded = ObjectReader.getDouble(resourceNeededEntry.getValue());
- Double resourceAvailable = ObjectReader.getDouble(resourcesAvailable.get(resourceName), 0.0);
- if (resourceNeeded > resourceAvailable) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Could not schedule {}:{} on {} not enough {} {} > {}",
- td.getName(),
- exec,
- ws,
- resourceName,
- resourceNeeded,
- resourceAvailable);
- }
- //Not enough resources - stop trying
- return false;
- }
+ NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
+ if (!resourcesAvailable.couldHoldIgnoringMemory(requestedResources)) {
+ return false;
}
double currentTotal = 0.0;
@@ -539,8 +495,7 @@ public boolean wouldFit(
}
double memoryAdded = afterTotal - currentTotal;
- double memoryAvailable = ObjectReader.getDouble(resourcesAvailable.get(
- Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME), 0.0);
+ double memoryAvailable = resourcesAvailable.getTotalMemoryMb();
if (memoryAdded > memoryAvailable) {
if (LOG.isTraceEnabled()) {
@@ -613,7 +568,7 @@ public void assign(WorkerSlot slot, String topologyId, Collection getSupervisors() {
public double getClusterTotalCpuResource() {
double sum = 0.0;
for (SupervisorDetails sup : supervisors.values()) {
- sum += sup.getTotalCPU();
+ sum += sup.getTotalCpu();
}
return sum;
}
@@ -935,7 +890,7 @@ public Map getTopologyResourcesMap() {
public Map getSupervisorsResourcesMap() {
Map ret = new HashMap<>();
for (SupervisorDetails sd : supervisors.values()) {
- ret.put(sd.getId(), new SupervisorResources(sd.getTotalMemory(), sd.getTotalCPU(), 0, 0));
+ ret.put(sd.getId(), new SupervisorResources(sd.getTotalMemory(), sd.getTotalCpu(), 0, 0));
}
for (SchedulerAssignmentImpl assignment : assignments.values()) {
for (Entry entry :
@@ -986,27 +941,18 @@ public WorkerResources getWorkerResources(WorkerSlot ws) {
}
@Override
- public Map getAllScheduledResourcesForNode(String nodeId) {
- Map totalScheduledResources = new HashMap<>();
+ public NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId) {
+ NormalizedResourceRequest totalScheduledResources = new NormalizedResourceRequest();
for (SchedulerAssignmentImpl assignment : assignments.values()) {
for (Entry entry :
assignment.getScheduledResources().entrySet()) {
if (nodeId.equals(entry.getKey().getNodeId())) {
- WorkerResources resources = entry.getValue();
- for (Map.Entry resourceEntry : resources.get_resources().entrySet()) {
- Double currentResourceValue = totalScheduledResources.getOrDefault(resourceEntry.getKey(), 0.0);
- totalScheduledResources.put(
- resourceEntry.getKey(),
- currentResourceValue + ObjectReader.getDouble(resourceEntry.getValue()));
- }
-
+ totalScheduledResources.add(entry.getValue());
}
}
Double sharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId);
if (sharedOffHeap != null) {
- String resourceName = Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME;
- Double currentResourceValue = totalScheduledResources.getOrDefault(resourceName, 0.0);
- totalScheduledResources.put(resourceName, currentResourceValue + sharedOffHeap);
+ totalScheduledResources.addOffHeap(sharedOffHeap);
}
}
return totalScheduledResources;
@@ -1038,7 +984,7 @@ public double getScheduledMemoryForNode(String nodeId) {
@Override
public double getScheduledCpuForNode(String nodeId) {
- Double ret = scheduledCPUCache.get(nodeId);
+ Double ret = scheduledCpuCache.get(nodeId);
if (ret != null) {
return ret;
}
@@ -1052,7 +998,7 @@ public double getScheduledCpuForNode(String nodeId) {
}
}
}
- scheduledCPUCache.put(nodeId, totalCpu);
+ scheduledCpuCache.put(nodeId, totalCpu);
return totalCpu;
}
diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/ExecutorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
similarity index 87%
rename from storm-client/src/jvm/org/apache/storm/scheduler/ExecutorDetails.java
rename to storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
index d1e776d17c2..bbbbf3f2eca 100644
--- a/storm-client/src/jvm/org/apache/storm/scheduler/ExecutorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.scheduler;
public class ExecutorDetails {
- int startTask;
- int endTask;
+ public final int startTask;
+ public final int endTask;
- public ExecutorDetails(int startTask, int endTask){
+ public ExecutorDetails(int startTask, int endTask) {
this.startTask = startTask;
this.endTask = endTask;
}
@@ -34,6 +35,7 @@ public int getEndTask() {
return endTask;
}
+ @Override
public boolean equals(Object other) {
if (other == null || !(other instanceof ExecutorDetails)) {
return false;
@@ -42,13 +44,14 @@ public boolean equals(Object other) {
ExecutorDetails executor = (ExecutorDetails)other;
return (this.startTask == executor.startTask) && (this.endTask == executor.endTask);
}
-
+
+ @Override
public int hashCode() {
return this.startTask + 13 * this.endTask;
}
@Override
public String toString() {
- return "[" + this.startTask + ", " + this.endTask + "]";
+ return "[" + this.startTask + ", " + this.endTask + "]";
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index b539b1f935a..7175b7e72c3 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -25,6 +25,8 @@
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
/** An interface that provides access to the current scheduling state. */
public interface ISchedulingState {
@@ -179,7 +181,7 @@ boolean wouldFit(
WorkerSlot ws,
ExecutorDetails exec,
TopologyDetails td,
- Map resourcesAvailable,
+ NormalizedResourceOffer resourcesAvailable,
double maxHeap);
/** get the current assignment for the topology. */
@@ -205,8 +207,8 @@ boolean wouldFit(
/** Get all the supervisors. */
Map getSupervisors();
- /** Get all scheduled resources for node **/
- Map getAllScheduledResourcesForNode(String nodeId);
+ /** Get all scheduled resources for node. **/
+ NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId);
/** Get the total amount of CPU resources in cluster. */
double getClusterTotalCpuResource();
diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
similarity index 74%
rename from storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java
rename to storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
index 58ee98a2791..c16a67f4241 100644
--- a/storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.scheduler;
import java.util.Collection;
@@ -26,16 +27,16 @@
public interface SchedulerAssignment {
/**
- * Does this slot occupied by this assignment?
- * @param slot
- * @return true if the slot is occupied else false
+ * Is this slot part of this assignment or not.
+ * @param slot the slot to check.
+ * @return true if the slot is occupied by this assignment else false.
*/
public boolean isSlotOccupied(WorkerSlot slot);
/**
- * Is the executor assigned?
+ * Is the executor assigned or not.
*
- * @param executor
+ * @param executor the executor to check it if is assigned.
* @return true if it is assigned else false
*/
public boolean isExecutorAssigned(ExecutorDetails executor);
@@ -47,25 +48,37 @@ public interface SchedulerAssignment {
public String getTopologyId();
/**
+ * Get the map of executor to WorkerSlot.
* @return the executor -> slot map.
*/
public Map getExecutorToSlot();
/**
+ * Get the set of all executors.
* @return the executors covered by this assignments
*/
public Set getExecutors();
-
+
+ /**
+ * Get the set of all slots that are a part of this.
+ * @return the set of all slots.
+ */
public Set getSlots();
+ /**
+ * Get the mapping of slot to executors on that slot.
+ * @return the slot to the executors assigned to that slot.
+ */
public Map> getSlotToExecutors();
/**
+ * Get the slot to resource mapping.
* @return The slot to resource mapping
*/
public Map getScheduledResources();
/**
+ * Get the total shared off heap memory mapping.
* @return host to total shared off heap memory mapping.
*/
public Map getNodeIdToTotalSharedOffHeapMemory();
diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
similarity index 88%
rename from storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
rename to storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index bff68fed670..f92c3d1732b 100644
--- a/storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.scheduler;
import java.util.ArrayList;
@@ -39,7 +40,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
private final String topologyId;
/**
- * assignment detail, a mapping from executor to WorkerSlot
+ * assignment detail, a mapping from executor to WorkerSlot
.
*/
private final Map executorToSlot = new HashMap<>();
private final Map resources = new HashMap<>();
@@ -47,6 +48,13 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
//Used to cache the slotToExecutors mapping.
private Map> slotToExecutorsCache = null;
+ /**
+ * Create a new assignment.
+ * @param topologyId the id of the topology the assignment is for.
+ * @param executorToSlot the executor to slot mapping for the assignment. Can be null and set through other methods later.
+ * @param resources the resources for the current assignments. Can be null and set through other methods later.
+ * @param nodeIdToTotalSharedOffHeap the shared memory for this assignment can be null and set through other methods later.
+ */
public SchedulerAssignmentImpl(String topologyId, Map executorToSlot,
Map resources, Map nodeIdToTotalSharedOffHeap) {
this.topologyId = topologyId;
@@ -84,6 +92,11 @@ public String toString() {
return this.getClass().getSimpleName() + " topo: " + topologyId + " execToSlots: " + executorToSlot;
}
+ /**
+ * Like the equals command, but ignores the resources.
+ * @param other the object to check for equality against.
+ * @return true if they are equal, ignoring resources, else false.
+ */
public boolean equalsIgnoreResources(Object other) {
if (other == this) {
return true;
@@ -93,8 +106,8 @@ public boolean equalsIgnoreResources(Object other) {
}
SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
- return this.topologyId.equals(o.topologyId) &&
- this.executorToSlot.equals(o.executorToSlot);
+ return topologyId.equals(o.topologyId)
+ && executorToSlot.equals(o.executorToSlot);
}
@Override
@@ -113,8 +126,8 @@ public boolean equals(Object other) {
}
SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
- return this.resources.equals(o.resources) &&
- this.nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap);
+ return resources.equals(o.resources)
+ && nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap);
}
@Override
@@ -131,7 +144,7 @@ public void assign(WorkerSlot slot, Collection executors) {
* Assign the slot to executors.
*/
public void assign(WorkerSlot slot, Collection executors, WorkerResources slotResources) {
- assert(slot != null);
+ assert slot != null;
for (ExecutorDetails executor : executors) {
this.executorToSlot.put(executor, slot);
}
@@ -178,33 +191,32 @@ public void unassignBySlot(WorkerSlot slot) {
}
}
- /**
- * @param slot
- * @return true if slot is occupied by this assignment
- */
+ @Override
public boolean isSlotOccupied(WorkerSlot slot) {
return this.executorToSlot.containsValue(slot);
}
+ @Override
public boolean isExecutorAssigned(ExecutorDetails executor) {
return this.executorToSlot.containsKey(executor);
}
-
+
+ @Override
public String getTopologyId() {
return this.topologyId;
}
+ @Override
public Map getExecutorToSlot() {
return this.executorToSlot;
}
- /**
- * @return the executors covered by this assignments
- */
+ @Override
public Set getExecutors() {
return this.executorToSlot.keySet();
}
+ @Override
public Map> getSlotToExecutors() {
Map> ret = slotToExecutorsCache;
if (ret != null) {
diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
similarity index 67%
rename from storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java
rename to storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 0df8bdd369d..3d087155ae5 100644
--- a/storm-client/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -22,36 +22,44 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-
-
import org.apache.storm.Constants;
+import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SupervisorDetails {
private static final Logger LOG = LoggerFactory.getLogger(SupervisorDetails.class);
- String id;
+ private final String id;
/**
* hostname of this supervisor.
*/
- String host;
- Object meta;
+ private final String host;
+ private final Object meta;
/**
* meta data configured for this supervisor.
*/
- Object schedulerMeta;
+ private final Object schedulerMeta;
/**
* all the ports of the supervisor.
*/
- Set allPorts;
+ private Set allPorts;
/**
* Map containing a manifest of resources for the node the supervisor resides.
*/
- private Map _total_resources;
+ private final NormalizedResourceOffer totalResources;
+ /**
+ * Create the details of a new supervisor.
+ * @param id the ID as reported by the supervisor.
+ * @param host the host the supervisor is on.
+ * @param meta meta data reported by the supervisor (should be a collection of the ports on the supervisor).
+ * @param schedulerMeta Not used and can probably be removed.
+ * @param allPorts all of the ports for the supervisor (a better version of meta)
+ * @param totalResources all of the resources for this supervisor.
+ */
public SupervisorDetails(String id, String host, Object meta, Object schedulerMeta,
- Collection extends Number> allPorts, Map total_resources) {
+ Collection extends Number> allPorts, Map totalResources) {
this.id = id;
this.host = host;
@@ -62,16 +70,16 @@ public SupervisorDetails(String id, String host, Object meta, Object schedulerMe
} else {
this.allPorts = new HashSet<>();
}
- this._total_resources = total_resources;
- LOG.debug("Creating a new supervisor ({}-{}) with resources: {}", this.host, this.id, total_resources);
+ this.totalResources = new NormalizedResourceOffer(totalResources);
+ LOG.debug("Creating a new supervisor ({}-{}) with resources: {}", this.host, this.id, totalResources);
}
- public SupervisorDetails(String id, Object meta){
+ public SupervisorDetails(String id, Object meta) {
this(id, null, meta, null, null, null);
}
- public SupervisorDetails(String id, Object meta, Map total_resources) {
- this(id, null, meta, null, null, total_resources);
+ public SupervisorDetails(String id, Object meta, Map totalResources) {
+ this(id, null, meta, null, null, totalResources);
}
public SupervisorDetails(String id, Object meta, Collection extends Number> allPorts) {
@@ -83,8 +91,8 @@ public SupervisorDetails(String id, String host, Object schedulerMeta, Collectio
}
public SupervisorDetails(String id, String host, Object schedulerMeta,
- Collection extends Number> allPorts, Map total_resources) {
- this(id, host, null, schedulerMeta, allPorts, total_resources);
+ Collection extends Number> allPorts, Map totalResources) {
+ this(id, host, null, schedulerMeta, allPorts, totalResources);
}
@Override
@@ -122,26 +130,24 @@ public Object getSchedulerMeta() {
return this.schedulerMeta;
}
- private Double getTotalResource(String type) {
- return this._total_resources.get(type);
- }
-
+ /**
+ * Get the total Memory on this supervisor in MB.
+ */
public double getTotalMemory() {
- Double totalMemory = getTotalResource(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
- assert totalMemory != null;
- return totalMemory;
+ return totalResources.getTotalMemoryMb();
}
- public double getTotalCPU() {
- Double totalCPU = getTotalResource(Constants.COMMON_CPU_RESOURCE_NAME);
- assert totalCPU != null;
- return totalCPU;
+ /**
+ * Get the total CPU on this supervisor in % CPU.
+ */
+ public double getTotalCpu() {
+ return totalResources.getTotalCpu();
}
/**
- * get all resources for this Supervisor.
+ * Get all resources for this Supervisor.
*/
- public Map getTotalResources() {
- return _total_resources;
+ public NormalizedResourceOffer getTotalResources() {
+ return totalResources;
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index d92c80af581..8e4c1d532c6 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -35,15 +35,13 @@
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.scheduler.resource.ResourceUtils.normalizedResourceMap;
-
public class TopologyDetails {
private final String topologyId;
private final Map topologyConf;
@@ -51,7 +49,7 @@ public class TopologyDetails {
private final Map executorToComponent;
private final int numWorkers;
//>>
- private Map> resourceList;
+ private Map resourceList;
//Max heap size for a worker used by topology
private Double topologyWorkerMaxHeapSize;
//topology priority
@@ -138,12 +136,10 @@ private void initResourceList() {
if (topology.get_bolts() != null) {
for (Map.Entry bolt : topology.get_bolts().entrySet()) {
//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
- Map topologyResources =
- ResourceUtils.parseResources(bolt.getValue().get_common().get_json_conf());
- ResourceUtils.checkInitialization(topologyResources, bolt.getKey(), this.topologyConf);
+ NormalizedResourceRequest topologyResources = new NormalizedResourceRequest(bolt.getValue().get_common(), topologyConf);
for (Map.Entry anExecutorToComponent :
executorToComponent.entrySet()) {
- if (bolt.getKey().equals(anExecutorToComponent.getValue()) && !topologyResources.isEmpty()) {
+ if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
resourceList.put(anExecutorToComponent.getKey(), topologyResources);
}
}
@@ -152,12 +148,10 @@ private void initResourceList() {
// Extract spout resource info
if (topology.get_spouts() != null) {
for (Map.Entry spout : topology.get_spouts().entrySet()) {
- Map topologyResources =
- ResourceUtils.parseResources(spout.getValue().get_common().get_json_conf());
- ResourceUtils.checkInitialization(topologyResources, spout.getKey(), this.topologyConf);
+ NormalizedResourceRequest topologyResources = new NormalizedResourceRequest(spout.getValue().get_common(), topologyConf);
for (Map.Entry anExecutorToComponent :
executorToComponent.entrySet()) {
- if (spout.getKey().equals(anExecutorToComponent.getValue()) && !topologyResources.isEmpty()) {
+ if (spout.getKey().equals(anExecutorToComponent.getValue())) {
resourceList.put(anExecutorToComponent.getKey(), topologyResources);
}
}
@@ -261,8 +255,7 @@ public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
Double ret = null;
if (hasExecInTopo(exec)) {
ret = resourceList
- .get(exec)
- .get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+ .get(exec).getOnHeapMemoryMb();;
}
return ret;
}
@@ -276,8 +269,7 @@ public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
Double ret = null;
if (hasExecInTopo(exec)) {
ret = resourceList
- .get(exec)
- .get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+ .get(exec).getOffHeapMemoryMb();
}
return ret;
}
@@ -335,7 +327,7 @@ public Set getSharedMemoryRequests(
* @param exec the executor to get the resources for.
* @return Double the total about of cpu requirement for executor
*/
- public Map getTotalResources(ExecutorDetails exec) {
+ public NormalizedResourceRequest getTotalResources(ExecutorDetails exec) {
if (hasExecInTopo(exec)) {
return this.resourceList.get(exec);
}
@@ -350,8 +342,7 @@ public Map getTotalResources(ExecutorDetails exec) {
public Double getTotalCpuReqTask(ExecutorDetails exec) {
if (hasExecInTopo(exec)) {
return resourceList
- .get(exec)
- .get(Constants.COMMON_CPU_RESOURCE_NAME);
+ .get(exec).getTotalCpu();
}
return null;
}
@@ -440,7 +431,7 @@ public double getTotalRequestedCpu() {
* @param exec
* @return a map containing the resource requirements for this exec
*/
- public Map getTaskResourceReqList(ExecutorDetails exec) {
+ public NormalizedResourceRequest getTaskResourceReqList(ExecutorDetails exec) {
if (hasExecInTopo(exec)) {
return resourceList.get(exec);
}
@@ -458,7 +449,7 @@ public boolean hasExecInTopo(ExecutorDetails exec) {
/**
* add resource requirements for a executor.
*/
- public void addResourcesForExec(ExecutorDetails exec, Map resourceList) {
+ public void addResourcesForExec(ExecutorDetails exec, NormalizedResourceRequest resourceList) {
if (hasExecInTopo(exec)) {
LOG.warn("Executor {} already exists...ResourceList: {}", exec, getTaskResourceReqList(exec));
return;
@@ -470,49 +461,7 @@ public void addResourcesForExec(ExecutorDetails exec, Map resour
* Add default resource requirements for a executor.
*/
private void addDefaultResforExec(ExecutorDetails exec) {
- Double topologyComponentCpuPcorePercent =
- ObjectReader.getDouble(
- topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
- Double topologyComponentResourcesOffheapMemoryMb =
- ObjectReader.getDouble(
- topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
- Double topologyComponentResourcesOnheapMemoryMb =
- ObjectReader.getDouble(
- topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
-
- assert topologyComponentCpuPcorePercent != null;
- assert topologyComponentResourcesOffheapMemoryMb != null;
- assert topologyComponentResourcesOnheapMemoryMb != null;
-
- Map defaultResourceList = new HashMap<>();
- defaultResourceList.put(
- Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topologyComponentCpuPcorePercent);
- defaultResourceList.put(
- Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
- topologyComponentResourcesOffheapMemoryMb);
- defaultResourceList.put(
- Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
- topologyComponentResourcesOnheapMemoryMb);
-
- adjustResourcesForExec(exec, defaultResourceList);
-
- Map topologyComponentResourcesMap = (
- Map) this.topologyConf.getOrDefault(
- Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>());
-
- topologyComponentResourcesMap = normalizedResourceMap(topologyComponentResourcesMap);
-
- LOG.info("Scheduling Executor: {} with resource requirement as {}",
- exec, topologyComponentResourcesMap);
- LOG.debug(
- "Scheduling Executor: {} {} with memory requirement as onHeap: {} - offHeap: {} "
- + "and CPU requirement: {}",
- getExecutorToComponent().get(exec),
- exec,
- topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
- topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
- topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
- addResourcesForExec(exec, normalizedResourceMap(defaultResourceList));
+ addResourcesForExec(exec, new NormalizedResourceRequest(topologyConf));
}
/**
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
new file mode 100644
index 00000000000..262365552b7
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An offer of resources that has been normalized.
+ */
+public class NormalizedResourceOffer extends NormalizedResources {
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
+ private double totalMemory;
+
+ /**
+ * Create a new normalized set of resources. Note that memory is not covered here becasue it is not consistent in requests vs offers
+ * because of how on heap vs off heap is used.
+ *
+ * @param resources the resources to be normalized.
+ */
+ public NormalizedResourceOffer(Map resources) {
+ super(resources, null);
+ }
+
+ public NormalizedResourceOffer() {
+ super(null, null);
+ }
+
+ public NormalizedResourceOffer(NormalizedResourceOffer other) {
+ super(other);
+ this.totalMemory = other.totalMemory;
+ }
+
+ @Override
+ protected void initializeMemory(Map normalizedResources) {
+ totalMemory = normalizedResources.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+ }
+
+ @Override
+ public double getTotalMemoryMb() {
+ return totalMemory;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " MEM: " + totalMemory;
+ }
+
+ @Override
+ public Map toNormalizedMap() {
+ Map ret = super.toNormalizedMap();
+ ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemory);
+ return ret;
+ }
+
+ @Override
+ public void add(NormalizedResources other) {
+ super.add(other);
+ totalMemory += other.getTotalMemoryMb();
+ }
+
+ @Override
+ public void remove(NormalizedResources other) {
+ super.remove(other);
+ totalMemory -= other.getTotalMemoryMb();
+ assert totalMemory >= 0.0;
+ }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
new file mode 100644
index 00000000000..926184c35fc
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A request that has been normalized.
+ */
+public class NormalizedResourceRequest extends NormalizedResources {
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
+
+ private static void putIfMissing(Map dest, String destKey, Map src, String srcKey) {
+ if (!dest.containsKey(destKey)) {
+ Number value = (Number)src.get(srcKey);
+ if (value != null) {
+ dest.put(destKey, value.doubleValue());
+ }
+ }
+ }
+
+ private static Map getDefaultResources(Map topoConf) {
+ Map ret = normalizedResourceMap((Map) topoConf.getOrDefault(
+ Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
+ putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+ putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ return ret;
+ }
+
+ private static Map parseResources(String input) {
+ Map topologyResources = new HashMap<>();
+ JSONParser parser = new JSONParser();
+ LOG.debug("Input to parseResources {}", input);
+ try {
+ if (input != null) {
+ Object obj = parser.parse(input);
+ JSONObject jsonObject = (JSONObject) obj;
+
+ // Legacy resource parsing
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+ Double topoMemOnHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+ Double topoMemOffHeap = ObjectReader
+ .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
+ }
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+ Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+ null);
+ topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+ }
+
+ // If resource is also present in resources map will overwrite the above
+ if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+ Map rawResourcesMap =
+ (Map) jsonObject.computeIfAbsent(
+ Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
+
+ for (Map.Entry stringNumberEntry : rawResourcesMap.entrySet()) {
+ topologyResources.put(
+ stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
+ }
+
+
+ }
+ }
+ } catch (ParseException e) {
+ LOG.error("Failed to parse component resources is:" + e.toString(), e);
+ return null;
+ }
+ return topologyResources;
+ }
+
+ private double onHeap;
+ private double offHeap;
+
+ /**
+ * Create a new normalized set of resources. Note that memory is not covered here becasue it is not consistent in requests vs offers
+ * because of how on heap vs off heap is used.
+ *
+ * @param resources the resources to be normalized.
+ * @param topologyConf the config for the topology
+ */
+ private NormalizedResourceRequest(Map resources,
+ Map topologyConf) {
+ super(resources, getDefaultResources(topologyConf));
+ }
+
+ public NormalizedResourceRequest(ComponentCommon component, Map topoConf) {
+ this(parseResources(component.get_json_conf()), topoConf);
+ }
+
+ public NormalizedResourceRequest(Map topoConf) {
+ this((Map) null, topoConf);
+ }
+
+ public NormalizedResourceRequest() {
+ super(null, null);
+ }
+
+ @Override
+ public Map toNormalizedMap() {
+ Map ret = super.toNormalizedMap();
+ ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
+ ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
+ return ret;
+ }
+
+ @Override
+ protected void initializeMemory(Map normalizedResources) {
+ onHeap = normalizedResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ offHeap = normalizedResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ }
+
+ public double getOnHeapMemoryMb() {
+ return onHeap;
+ }
+
+ public void addOnHeap(final double onHeap) {
+ this.onHeap += onHeap;
+ }
+
+ public double getOffHeapMemoryMb() {
+ return offHeap;
+ }
+
+ public void addOffHeap(final double offHeap) {
+ this.offHeap += offHeap;
+ }
+
+ /**
+ * Add the resources in other to this.
+ * @param other the other Request to add to this.
+ */
+ public void add(NormalizedResourceRequest other) {
+ super.add(other);
+ onHeap += other.onHeap;
+ offHeap += other.offHeap;
+ }
+
+ @Override
+ public void add(WorkerResources value) {
+ super.add(value);
+ //The resources are already normalized
+ Map resources = value.get_resources();
+ onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+ }
+
+ @Override
+ public double getTotalMemoryMb() {
+ return getOnHeapMemoryMb() + getOffHeapMemoryMb();
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
+ }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
new file mode 100644
index 00000000000..8ed1a5728b6
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import static org.apache.storm.Constants.*;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resources that have been normalized.
+ */
+public abstract class NormalizedResources {
+ private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
+ public static final Map RESOURCE_NAME_MAPPING;
+
+ static {
+ Map tmp = new HashMap<>();
+ tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, COMMON_CPU_RESOURCE_NAME);
+ tmp.put(Config.SUPERVISOR_CPU_CAPACITY, COMMON_CPU_RESOURCE_NAME);
+ tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+ tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+ tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, COMMON_TOTAL_MEMORY_RESOURCE_NAME);
+ RESOURCE_NAME_MAPPING = Collections.unmodifiableMap(tmp);
+ }
+
+ private static double[] makeArray(Map normalizedResources) {
+ //To avoid locking we will go through the map twice. It should be small so it is probably not a big deal
+ for (String key : normalizedResources.keySet()) {
+ //We are going to skip over CPU and Memory, because they are captured elsewhere
+ if (!COMMON_CPU_RESOURCE_NAME.equals(key)
+ && !COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+ && !COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+ && !COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+ resourceNames.computeIfAbsent(key, (k) -> counter.getAndIncrement());
+ }
+ }
+ //By default all of the values are 0
+ double [] ret = new double[counter.get()];
+ for (Map.Entry entry : normalizedResources.entrySet()) {
+ Integer index = resourceNames.get(entry.getKey());
+ if (index != null) {
+ //index == null if it is memory or CPU
+ ret[index] = entry.getValue();
+ }
+ }
+ return ret;
+ }
+
+ private static final ConcurrentMap resourceNames = new ConcurrentHashMap<>();
+ private static final AtomicInteger counter = new AtomicInteger(0);
+ private double cpu;
+ private double[] otherResources;
+
+ public NormalizedResources(NormalizedResources other) {
+ cpu = other.cpu;
+ otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
+ }
+
+ /**
+ * Create a new normalized set of resources. Note that memory is not
+ * covered here because it is not consistent in requests vs offers because
+ * of how on heap vs off heap is used.
+ * @param resources the resources to be normalized.
+ * @param defaults the default resources that will also be normalized and combined with the real resources.
+ */
+ public NormalizedResources(Map resources, Map defaults) {
+ Map normalizedResources = normalizedResourceMap(defaults);
+ normalizedResources.putAll(normalizedResourceMap(resources));
+ cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+ otherResources = makeArray(normalizedResources);
+ initializeMemory(normalizedResources);
+ }
+
+ /**
+ * Initialize any memory usage from the normalized map.
+ * @param normalizedResources the normalized resource map.
+ */
+ protected abstract void initializeMemory(Map normalizedResources);
+
+ /**
+ * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
+ * @param resourceMap resource map of either Supervisor or Topology
+ * @return the resource map with common resource names
+ */
+ public static Map normalizedResourceMap(Map resourceMap) {
+ if (resourceMap == null) {
+ return new HashMap<>();
+ }
+ return new HashMap<>(resourceMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ //Map the key if needed
+ (e) -> RESOURCE_NAME_MAPPING.getOrDefault(e.getKey(), e.getKey()),
+ //Map the value
+ (e) -> e.getValue().doubleValue())));
+ }
+
+ /**
+ * Get the total amount of memory.
+ * @return the total amount of memory requested or provided.
+ */
+ public abstract double getTotalMemoryMb();
+
+ /**
+ * Get the total amount of cpu.
+ * @return the amount of cpu.
+ */
+ public double getTotalCpu() {
+ return cpu;
+ }
+
+ private void add(double[] resourceArray) {
+ int otherLength = resourceArray.length;
+ int length = otherResources.length;
+ if (otherLength > length) {
+ double [] newResources = new double[otherLength];
+ System.arraycopy(newResources, 0, otherResources, 0, length);
+ otherResources = newResources;
+ }
+ for (int i = 0; i < otherLength; i++) {
+ otherResources[i] += resourceArray[i];
+ }
+ }
+
+ public void add(NormalizedResources other) {
+ this.cpu += other.cpu;
+ add(other.otherResources);
+ }
+
+ /**
+ * Add the resources from a worker to this.
+ * @param value the worker resources that should be added to this.
+ */
+ public void add(WorkerResources value) {
+ Map normalizedResources = value.get_resources();
+ cpu += normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+ add(makeArray(normalizedResources));
+ }
+
+ /**
+ * Remove the resources from other. This is the same as subtracting the resources in other from this.
+ * @param other the resources we want removed.
+ */
+ public void remove(NormalizedResources other) {
+ this.cpu -= other.cpu;
+ assert cpu >= 0.0;
+ int otherLength = other.otherResources.length;
+ int length = otherResources.length;
+ if (otherLength > length) {
+ double [] newResources = new double[otherLength];
+ System.arraycopy(newResources, 0, otherResources, 0, length);
+ otherResources = newResources;
+ }
+ for (int i = 0; i < Math.min(length, otherLength); i++) {
+ otherResources[i] -= other.otherResources[i];
+ assert otherResources[i] >= 0.0;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "CPU: " + cpu;
+ }
+
+ /**
+ * Return a Map of the normalized resource name to a double. This should only
+ * be used when returning thrift resource requests to the end user.
+ */
+ public Map toNormalizedMap() {
+ HashMap ret = new HashMap<>();
+ ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+ int length = otherResources.length;
+ for (Map.Entry entry: resourceNames.entrySet()) {
+ int index = entry.getValue();
+ if (index < length) {
+ ret.put(entry.getKey(), otherResources[index]);
+ }
+ }
+ return ret;
+ }
+
+ private double getResourceAt(int index) {
+ if (index >= otherResources.length) {
+ return 0.0;
+ }
+ return otherResources[index];
+ }
+
+ /**
+ * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory.
+ * It does not check memory because with shared memory it is beyond the scope of this.
+ * @param other the resources that we want to check if they would fit in this.
+ * @return true if it might fit, else false if it could not possibly fit.
+ */
+ public boolean couldHoldIgnoringMemory(NormalizedResources other) {
+ if (this.cpu < other.getTotalCpu()) {
+ return false;
+ }
+ int length = Math.max(this.otherResources.length, other.otherResources.length);
+ for (int i = 0; i < length; i++) {
+ if (getResourceAt(i) < other.getResourceAt(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Calculate the average resource usage percentage with this being the total resources and
+ * used being the amounts used.
+ * @param used the amount of resources used.
+ * @return the average percentage used 0.0 to 100.0.
+ */
+ public double calculateAveragePercentageUsedBy(NormalizedResources used) {
+ double total = 0.0;
+ double totalMemory = getTotalMemoryMb();
+ if (totalMemory != 0.0) {
+ total += used.getTotalMemoryMb() / totalMemory;
+ }
+ double totalCpu = getTotalCpu();
+ if (totalCpu != 0.0) {
+ total += used.getTotalCpu() / getTotalCpu();
+ }
+ //If total is 0 we add in a 0% used, so we can just skip over anything that is not in both.
+ int length = Math.min(used.otherResources.length, otherResources.length);
+ for (int i = 0; i < length; i++) {
+ if (otherResources[i] != 0.0) {
+ total += used.otherResources[i] / otherResources[i];
+ }
+ }
+ //To get the count we divide by we need to take the maximum length because we are doing an average.
+ return (total * 100.0) / (2 + Math.max(otherResources.length, used.otherResources.length));
+ }
+
+ /**
+ * Calculate the minimum resource usage percentage with this being the total resources and
+ * used being the amounts used.
+ * @param used the amount of resources used.
+ * @return the minimum percentage used 0.0 to 100.0.
+ */
+ public double calculateMinPercentageUsedBy(NormalizedResources used) {
+ double totalMemory = getTotalMemoryMb();
+ double totalCpu = getTotalCpu();
+ if (used.otherResources.length != otherResources.length
+ || totalMemory == 0.0
+ || totalCpu == 0.0) {
+ //If the lengths don't match one of the resources will be 0, which means we would calculate the percentage to be 0.0
+ // and so the min would be 0.0 (assuming that we can never go negative on a resource being used.
+ return 0.0;
+ }
+ double min = used.getTotalMemoryMb() / totalMemory;
+ min = Math.min(min, used.getTotalCpu() / getTotalCpu());
+
+ for (int i = 0; i < otherResources.length; i++) {
+ if (otherResources[i] != 0.0) {
+ min = Math.min(min, used.otherResources[i] / otherResources[i]);
+ } else {
+ return 0.0; //0 will be the minimum, because we count values not in here as 0
+ }
+ }
+ return min * 100.0;
+ }
+}
\ No newline at end of file
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 633fb5c1645..53500057681 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -26,7 +26,6 @@
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
@@ -55,7 +54,6 @@ public class RAS_Node {
private SupervisorDetails sup;
private final Cluster cluster;
private final Set originallyFreeSlots;
- private final Map totalResources;
public RAS_Node(
String nodeId,
@@ -89,12 +87,6 @@ public RAS_Node(
this.sup = sup;
}
- if (isAlive) {
- totalResources = getTotalResources();
- } else {
- totalResources = new HashMap<>();
- }
-
HashSet freeById = new HashSet<>(slots.keySet());
if (assignmentMap != null) {
for (Map> assignment : assignmentMap.values()) {
@@ -367,7 +359,7 @@ public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td)
ws,
exec,
td,
- this.getTotalAvailableResources(),
+ getTotalAvailableResources(),
td.getTopologyWorkerMaxHeapSize()
);
}
@@ -429,52 +421,33 @@ public static int countTotalSlotsAlive(Collection nodes) {
*
* @return the available memory for this node
*/
- public Double getAvailableMemoryResources() {
- Map allAvailableResources = getTotalAvailableResources();
- return allAvailableResources.getOrDefault(
- Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+ public double getAvailableMemoryResources() {
+ return getTotalAvailableResources().getTotalMemoryMb();
}
/**
* Gets total resources for this node.
- *
- * @return Map of all resources
*/
- public Map getTotalResources() {
+ public NormalizedResourceOffer getTotalResources() {
if (sup != null) {
return sup.getTotalResources();
} else {
- return new HashMap<>();
+ return new NormalizedResourceOffer();
}
}
/**
* Gets all available resources for this node.
*
- * @return Map of all resources
+ * @return All of the available resources.
*/
- public Map getTotalAvailableResources() {
+ public NormalizedResourceOffer getTotalAvailableResources() {
if (sup != null) {
- Map totalResources = sup.getTotalResources();
- Map scheduledResources = cluster.getAllScheduledResourcesForNode(sup.getId());
- Map availableResources = new HashMap<>();
- for (Entry resource : totalResources.entrySet()) {
- if(resource.getKey() == Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME) {
- availableResources.put(resource.getKey().toString(),
- ObjectReader.getDouble(resource.getValue())
- - (scheduledResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0)
- + scheduledResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0))
- );
- continue;
- }
- availableResources.put(resource.getKey().toString(),
- ObjectReader.getDouble(resource.getValue())
- - scheduledResources.getOrDefault(resource.getKey(), 0.0));
-
- }
+ NormalizedResourceOffer availableResources = new NormalizedResourceOffer(sup.getTotalResources());
+ availableResources.remove(cluster.getAllScheduledResourcesForNode(sup.getId()));
return availableResources;
} else {
- return new HashMap<>();
+ return new NormalizedResourceOffer();
}
}
@@ -483,7 +456,7 @@ public Map getTotalAvailableResources() {
*
* @return the total memory for this node
*/
- public Double getTotalMemoryResources() {
+ public double getTotalMemoryResources() {
if (sup != null) {
return sup.getTotalMemory();
} else {
@@ -497,7 +470,7 @@ public Double getTotalMemoryResources() {
* @return the available cpu for this node
*/
public double getAvailableCpuResources() {
- return getTotalAvailableResources().getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+ return getTotalAvailableResources().getTotalCpu();
}
/**
@@ -505,9 +478,9 @@ public double getAvailableCpuResources() {
*
* @return the total cpu for this node
*/
- public Double getTotalCpuResources() {
+ public double getTotalCpuResources() {
if (sup != null) {
- return sup.getTotalCPU();
+ return sup.getTotalCpu();
} else {
return 0.0;
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index 9b0ee1532f7..83ee4cb6d6c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -18,37 +18,38 @@
package org.apache.storm.scheduler.resource;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.utils.ObjectReader;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.Constants.resourceNameMapping;
-
public class ResourceUtils {
private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
- public static Map> getBoltsResources(StormTopology topology,
+ public static NormalizedResourceRequest getBoltResources(StormTopology topology, Map topologyConf,
+ String componentId) {
+ if (topology.get_bolts() != null) {
+ Bolt bolt = topology.get_bolts().get(componentId);
+ return new NormalizedResourceRequest(bolt.get_common(), topologyConf);
+ }
+ return null;
+ }
+
+ public static Map getBoltsResources(StormTopology topology,
Map topologyConf) {
- Map> boltResources = new HashMap<>();
+ Map boltResources = new HashMap<>();
if (topology.get_bolts() != null) {
for (Map.Entry bolt : topology.get_bolts().entrySet()) {
- Map topologyResources = parseResources(bolt.getValue().get_common().get_json_conf());
- checkInitialization(topologyResources, bolt.getValue().toString(), topologyConf);
+ NormalizedResourceRequest topologyResources = new NormalizedResourceRequest(bolt.getValue().get_common(), topologyConf);
if (LOG.isTraceEnabled()) {
LOG.trace("Turned {} into {}", bolt.getValue().get_common().get_json_conf(), topologyResources);
}
@@ -58,13 +59,21 @@ public static Map> getBoltsResources(StormTopology t
return boltResources;
}
- public static Map> getSpoutsResources(StormTopology topology,
+ public static NormalizedResourceRequest getSpoutResources(StormTopology topology, Map topologyConf,
+ String componentId) {
+ if (topology.get_spouts() != null) {
+ SpoutSpec spout = topology.get_spouts().get(componentId);
+ return new NormalizedResourceRequest(spout.get_common(), topologyConf);
+ }
+ return null;
+ }
+
+ public static Map getSpoutsResources(StormTopology topology,
Map topologyConf) {
- Map> spoutResources = new HashMap<>();
+ Map spoutResources = new HashMap<>();
if (topology.get_spouts() != null) {
for (Map.Entry spout : topology.get_spouts().entrySet()) {
- Map topologyResources = parseResources(spout.getValue().get_common().get_json_conf());
- checkInitialization(topologyResources, spout.getValue().toString(), topologyConf);
+ NormalizedResourceRequest topologyResources = new NormalizedResourceRequest(spout.getValue().get_common(), topologyConf);
if (LOG.isTraceEnabled()) {
LOG.trace("Turned {} into {}", spout.getValue().get_common().get_json_conf(), topologyResources);
}
@@ -139,187 +148,4 @@ public static String getJsonWithUpdatedResources(String jsonConf, Map topologyResources,
- String componentId, Map topologyConf) {
- StringBuilder msgBuilder = new StringBuilder();
-
- Set resourceNameSet = new HashSet<>();
-
- resourceNameSet.add(
- Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT
- );
- resourceNameSet.add(
- Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB
- );
- resourceNameSet.add(
- Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB
- );
-
- Map topologyComponentResourcesMap =
- (Map) topologyConf.getOrDefault(
- Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>());
-
- resourceNameSet.addAll(topologyResources.keySet());
- resourceNameSet.addAll(topologyComponentResourcesMap.keySet());
-
- for (String resourceName : resourceNameSet) {
- msgBuilder.append(checkInitResource(topologyResources, topologyConf, topologyComponentResourcesMap, resourceName));
- }
-
- Map normalizedTopologyResources = normalizedResourceMap(topologyResources);
- topologyResources.clear();
- topologyResources.putAll(normalizedTopologyResources);
-
- if (msgBuilder.length() > 0) {
- String resourceDefaults = msgBuilder.toString();
- LOG.debug(
- "Unable to extract resource requirement for Component {} \n Resources : {}",
- componentId, resourceDefaults);
- }
- }
-
- private static String checkInitResource(Map topologyResources, Map topologyConf,
- Map topologyComponentResourcesMap, String resourceName) {
- StringBuilder msgBuilder = new StringBuilder();
- String normalizedResourceName = resourceNameMapping.getOrDefault(resourceName, resourceName);
- if (!topologyResources.containsKey(normalizedResourceName)) {
- if (topologyConf.containsKey(resourceName)) {
- Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName));
- if (resourceValue != null) {
- topologyResources.put(normalizedResourceName, resourceValue);
- }
- }
-
- if (topologyComponentResourcesMap.containsKey(normalizedResourceName)) {
- Double resourceValue = ObjectReader.getDouble(topologyComponentResourcesMap.get(resourceName));
- if (resourceValue != null) {
- topologyResources.put(normalizedResourceName, resourceValue);
- }
- }
- }
-
- return msgBuilder.toString();
- }
-
- public static Map parseResources(String input) {
- Map topologyResources = new HashMap<>();
- JSONParser parser = new JSONParser();
- LOG.debug("Input to parseResources {}", input);
- try {
- if (input != null) {
- Object obj = parser.parse(input);
- JSONObject jsonObject = (JSONObject) obj;
-
- // Legacy resource parsing
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
- Double topoMemOnHeap = ObjectReader
- .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
- topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
- }
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
- Double topoMemOffHeap = ObjectReader
- .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
- topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
- }
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
- Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
- null);
- topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
- }
-
- // If resource is also present in resources map will overwrite the above
- if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
- Map rawResourcesMap =
- (Map) jsonObject.computeIfAbsent(
- Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
-
- for (Map.Entry stringNumberEntry : rawResourcesMap.entrySet()) {
- topologyResources.put(
- stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
- }
-
-
- }
- }
- } catch (ParseException e) {
- LOG.error("Failed to parse component resources is:" + e.toString(), e);
- return null;
- }
- return normalizedResourceMap(topologyResources);
- }
-
- /**
- * Calculate the sum of a collection of doubles.
- * @param list collection of doubles
- * @return the sum of of collection of doubles
- */
- public static double sum(Collection list) {
- double sum = 0.0;
- for (Double elem : list) {
- sum += elem;
- }
- return sum;
- }
-
- /**
- * Calculate the average of a collection of doubles.
- * @param list a collection of doubles
- * @return the average of collection of doubles
- */
- public static double avg(Collection list) {
- return sum(list) / list.size();
- }
-
- /**
- * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
- * @param resourceMap resource map of either Supervisor or Topology
- * @return the resource map with common resource names
- */
- public static Map normalizedResourceMap(Map resourceMap) {
- Map result = new HashMap();
-
- result.putAll(resourceMap);
- for (Map.Entry entry: resourceMap.entrySet()) {
- if (resourceNameMapping.containsKey(entry.getKey())) {
- result.put(resourceNameMapping.get(entry.getKey()), ObjectReader.getDouble(entry.getValue(), 0.0));
- result.remove(entry.getKey());
- }
- }
- return result;
- }
-
- public static Map addResources(Map resourceMap1, Map resourceMap2) {
- Map result = new HashMap();
-
- result.putAll(resourceMap1);
-
- for (Map.Entry entry: resourceMap2.entrySet()) {
- if (result.containsKey(entry.getKey())) {
- result.put(entry.getKey(), ObjectReader.getDouble(entry.getValue(),
- 0.0) + ObjectReader.getDouble(resourceMap1.get(entry.getKey()), 0.0));
- } else {
- result.put(entry.getKey(), entry.getValue());
- }
- }
- return result;
-
- }
-
- public static Double getMinValuePresentInResourceMap(Map resourceMap) {
- return Collections.min(resourceMap.values());
- }
-
- public static Map getPercentageOfTotalResourceMap(Map resourceMap, Map totalResourceMap) {
- Map result = new HashMap();
-
- for(Map.Entry entry: totalResourceMap.entrySet()) {
- if (resourceMap.containsKey(entry.getKey())) {
- result.put(entry.getKey(), (ObjectReader.getDouble(resourceMap.get(entry.getKey()))/ entry.getValue()) * 100.0) ;
- } else {
- result.put(entry.getKey(), 0.0);
- }
- }
- return result;
-
- }
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 07c908e9585..67bc867438d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -39,6 +39,7 @@
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.RAS_Node;
import org.apache.storm.scheduler.resource.RAS_Nodes;
import org.apache.storm.scheduler.resource.ResourceUtils;
@@ -129,9 +130,9 @@ protected interface ExistingScheduleFunc {
* a class to contain individual object resources as well as cumulative stats.
*/
static class AllResources {
- List objectResources = new LinkedList();
- Map availableResourcesOverall = new HashMap<>();
- Map totalResourcesOverall = new HashMap<>();
+ List objectResources = new LinkedList<>();
+ NormalizedResourceOffer availableResourcesOverall = new NormalizedResourceOffer();
+ NormalizedResourceOffer totalResourcesOverall = new NormalizedResourceOffer();
String identifier;
public AllResources(String identifier) {
@@ -139,19 +140,19 @@ public AllResources(String identifier) {
}
public AllResources(AllResources other) {
- this( null,
- new HashMap<>(other.availableResourcesOverall),
- new HashMap(other.totalResourcesOverall),
- other.identifier);
+ this (null,
+ new NormalizedResourceOffer(other.availableResourcesOverall),
+ new NormalizedResourceOffer(other.totalResourcesOverall),
+ other.identifier);
List objectResourcesList = new ArrayList<>();
for (ObjectResources objectResource : other.objectResources) {
objectResourcesList.add(new ObjectResources(objectResource));
}
this.objectResources = objectResourcesList;
-
}
- public AllResources(List objectResources, Map availableResourcesOverall, Map totalResourcesOverall, String identifier) {
+ public AllResources(List objectResources, NormalizedResourceOffer availableResourcesOverall,
+ NormalizedResourceOffer totalResourcesOverall, String identifier) {
this.objectResources = objectResources;
this.availableResourcesOverall = availableResourcesOverall;
this.totalResourcesOverall = totalResourcesOverall;
@@ -163,10 +164,10 @@ public AllResources(List objectResources, Map a
* class to keep track of resources on a rack or node.
*/
static class ObjectResources {
- String id;
- Map availableResources = new HashMap<>();
- Map totalResources = new HashMap<>();
- double effectiveResources = 0.0;
+ public final String id;
+ public NormalizedResourceOffer availableResources = new NormalizedResourceOffer();
+ public NormalizedResourceOffer totalResources = new NormalizedResourceOffer();
+ public double effectiveResources = 0.0;
public ObjectResources(String id) {
this.id = id;
@@ -176,7 +177,8 @@ public ObjectResources(ObjectResources other) {
this(other.id, other.availableResources, other.totalResources, other.effectiveResources);
}
- public ObjectResources(String id, Map availableResources, Map totalResources, double effectiveResources) {
+ public ObjectResources(String id, NormalizedResourceOffer availableResources, NormalizedResourceOffer totalResources,
+ double effectiveResources) {
this.id = id;
this.availableResources = availableResources;
this.totalResources = totalResources;
@@ -220,10 +222,8 @@ protected TreeSet sortNodes(
node.totalResources = rasNode.getTotalResources();
nodes.add(node);
- allResources.availableResourcesOverall = ResourceUtils.addResources(
- allResources.availableResourcesOverall, node.availableResources);
- allResources.totalResourcesOverall = ResourceUtils.addResources(
- allResources.totalResourcesOverall, node.totalResources);
+ allResources.availableResourcesOverall.add(node.availableResources);
+ allResources.totalResourcesOverall.add(node.totalResources);
}
@@ -234,7 +234,7 @@ protected TreeSet sortNodes(
allResources.totalResourcesOverall);
String topoId = topologyDetails.getId();
- return this.sortObjectResources(
+ return sortObjectResources(
allResources,
exec,
topologyDetails,
@@ -243,7 +243,7 @@ protected TreeSet sortNodes(
public int getNumExistingSchedule(String objectId) {
//Get execs already assigned in rack
- Collection execs = new LinkedList();
+ Collection execs = new LinkedList<>();
if (cluster.getAssignmentById(topoId) != null) {
for (Map.Entry entry :
cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
@@ -331,13 +331,13 @@ TreeSet sortRacks(ExecutorDetails exec, TopologyDetails topolog
racks.add(rack);
for (String nodeId : nodeIds) {
RAS_Node node = nodes.getNodeById(nodeHostnameToId(nodeId));
- rack.availableResources = ResourceUtils.addResources(rack.availableResources, node.getTotalAvailableResources());
- rack.totalResources = ResourceUtils.addResources(rack.totalResources, node.getTotalResources());
+ rack.availableResources.add(node.getTotalAvailableResources());
+ rack.totalResources.add(node.getTotalAvailableResources());
nodeIdToRackId.put(nodeId, rack.id);
- allResources.totalResourcesOverall = ResourceUtils.addResources(allResources.totalResourcesOverall, rack.totalResources);
- allResources.availableResourcesOverall = ResourceUtils.addResources(allResources.availableResourcesOverall, rack.availableResources);
+ allResources.totalResourcesOverall.add(rack.totalResources);
+ allResources.availableResourcesOverall.add(rack.availableResources);
}
}
@@ -347,30 +347,27 @@ TreeSet sortRacks(ExecutorDetails exec, TopologyDetails topolog
allResources.totalResourcesOverall);
String topoId = topologyDetails.getId();
- return this.sortObjectResources(
+ return sortObjectResources(
allResources,
exec,
topologyDetails,
- new ExistingScheduleFunc() {
- @Override
- public int getNumExistingSchedule(String objectId) {
- String rackId = objectId;
- //Get execs already assigned in rack
- Collection execs = new LinkedList();
- if (cluster.getAssignmentById(topoId) != null) {
- for (Map.Entry entry :
- cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
- String nodeId = entry.getValue().getNodeId();
- String hostname = idToNode(nodeId).getHostname();
- ExecutorDetails exec = entry.getKey();
- if (nodeIdToRackId.get(hostname) != null
- && nodeIdToRackId.get(hostname).equals(rackId)) {
- execs.add(exec);
- }
+ (objectId) -> {
+ String rackId = objectId;
+ //Get execs already assigned in rack
+ Collection execs = new LinkedList<>();
+ if (cluster.getAssignmentById(topoId) != null) {
+ for (Map.Entry entry :
+ cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
+ String nodeId = entry.getValue().getNodeId();
+ String hostname = idToNode(nodeId).getHostname();
+ ExecutorDetails exec1 = entry.getKey();
+ if (nodeIdToRackId.get(hostname) != null
+ && nodeIdToRackId.get(hostname).equals(rackId)) {
+ execs.add(exec1);
}
}
- return execs.size();
}
+ return execs.size();
});
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index adf05f54287..fc746b0847f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -68,7 +68,7 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
Collection executorsNotScheduled = new HashSet<>(unassignedExecutors);
List favoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
List unFavoredNodes = (List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
- final List sortedNodes = this.sortAllNodes(td, null, favoredNodes, unFavoredNodes);
+ final List sortedNodes = sortAllNodes(td, null, favoredNodes, unFavoredNodes);
for (ExecutorDetails exec : orderedExecutors) {
LOG.debug(
@@ -127,32 +127,8 @@ protected TreeSet sortObjectResources(
final ExistingScheduleFunc existingScheduleFunc) {
for (ObjectResources objectResources : allResources.objectResources) {
- StringBuilder sb = new StringBuilder();
- if (ResourceUtils.getMinValuePresentInResourceMap(objectResources.availableResources) <= 0) {
- objectResources.effectiveResources = 0.0;
- } else {
- List values = new LinkedList<>();
-
- Map percentageTotal = ResourceUtils.getPercentageOfTotalResourceMap(
- objectResources.availableResources, allResources.availableResourcesOverall
- );
- for(Map.Entry percentageEntry : percentageTotal.entrySet()) {
- values.add(percentageEntry.getValue());
- sb.append(String.format("%s %f(%f%%) ", percentageEntry.getKey(),
- objectResources.availableResources.get(percentageEntry.getKey()),
- percentageEntry.getValue())
- );
-
- }
-
- objectResources.effectiveResources = Collections.min(values);
- }
- LOG.debug(
- "{}: Avail [ {} ] Total [ {} ] effective resources: {}",
- objectResources.id,
- sb.toString(),
- objectResources.totalResources,
- objectResources.effectiveResources);
+ objectResources.effectiveResources =
+ allResources.availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
}
TreeSet sortedObjectResources =
@@ -169,13 +145,8 @@ protected TreeSet sortObjectResources(
} else if (o1.effectiveResources < o2.effectiveResources) {
return 1;
} else {
- Collection o1Values = ResourceUtils.getPercentageOfTotalResourceMap(
- o1.availableResources, allResources.availableResourcesOverall).values();
- Collection o2Values = ResourceUtils.getPercentageOfTotalResourceMap(
- o2.availableResources, allResources.availableResourcesOverall).values();
-
- double o1Avg = ResourceUtils.avg(o1Values);
- double o2Avg = ResourceUtils.avg(o2Values);
+ double o1Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+ double o2Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
if (o1Avg > o2Avg) {
return -1;
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index a4de7c29d53..893e2898e81 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -18,8 +18,6 @@
package org.apache.storm.scheduler.resource.strategies.scheduling;
-import com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -27,7 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
-
import org.apache.storm.Config;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Component;
@@ -133,31 +130,7 @@ protected TreeSet sortObjectResources(
final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
final ExistingScheduleFunc existingScheduleFunc) {
- Map requestedResources = topologyDetails.getTotalResources(exec);
AllResources affinityBasedAllResources = new AllResources(allResources);
- for (ObjectResources objectResources : affinityBasedAllResources.objectResources) {
- StringBuilder sb = new StringBuilder();
- List values = new LinkedList<>();
-
- for (Map.Entry availableResourcesEntry : objectResources.availableResources.entrySet()) {
- if (!requestedResources.containsKey(availableResourcesEntry.getKey())) {
- objectResources.availableResources.put(availableResourcesEntry.getKey(), -1.0 * availableResourcesEntry.getValue());
- }
- }
-
- Map percentageTotal = ResourceUtils.getPercentageOfTotalResourceMap(
- objectResources.availableResources, allResources.availableResourcesOverall
- );
- for(Map.Entry percentageEntry : percentageTotal.entrySet()) {
- values.add(percentageEntry.getValue());
- sb.append(String.format("%s %f(%f%%) ", percentageEntry.getKey(),
- objectResources.availableResources.get(percentageEntry.getKey()),
- percentageEntry.getValue())
- );
-
- }
-
- }
TreeSet sortedObjectResources =
new TreeSet<>((o1, o2) -> {
@@ -168,13 +141,8 @@ protected TreeSet sortObjectResources(
} else if (execsScheduled1 < execsScheduled2) {
return 1;
} else {
- Collection o1Values = ResourceUtils.getPercentageOfTotalResourceMap(
- o1.availableResources, allResources.availableResourcesOverall).values();
- Collection o2Values = ResourceUtils.getPercentageOfTotalResourceMap(
- o2.availableResources, allResources.availableResourcesOverall).values();
-
- double o1Avg = ResourceUtils.avg(o1Values);
- double o2Avg = ResourceUtils.avg(o2Values);
+ double o1Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+ double o2Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
if (o1Avg > o2Avg) {
return -1;
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index bd4c4febae1..a64c9b4ed3f 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -73,6 +73,7 @@
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.security.auth.SingleUserPrincipal;
import org.apache.thrift.TException;
@@ -715,30 +716,33 @@ public static boolean isRAS(Map conf) {
return false;
}
- public static int getEstimatedWorkerCountForRASTopo(Map topoConf, StormTopology topology) throws InvalidTopologyException {
+ public static int getEstimatedWorkerCountForRASTopo(Map topoConf, StormTopology topology)
+ throws InvalidTopologyException {
return (int) Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) /
ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB)));
}
- public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map topoConf, StormTopology topology) throws InvalidTopologyException {
+ public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map topoConf, StormTopology topology)
+ throws InvalidTopologyException {
Map componentParallelism = getComponentParallelism(topoConf, topology);
double totalMemoryRequired = 0.0;
- for(Map.Entry> entry: ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) {
+ for (Map.Entry entry: ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) {
int parallelism = componentParallelism.getOrDefault(entry.getKey(), 1);
- double memoryRequirement = entry.getValue().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+ double memoryRequirement = entry.getValue().getOnHeapMemoryMb();
totalMemoryRequired += memoryRequirement * parallelism;
}
- for(Map.Entry