Skip to content

Commit

Permalink
STORM-2813: Use a class for normalized resources not a map.
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert (Bobby) Evans committed Nov 20, 2017
1 parent 9d52435 commit 6a4aeb9
Show file tree
Hide file tree
Showing 24 changed files with 864 additions and 615 deletions.
2 changes: 1 addition & 1 deletion storm-client/pom.xml
Expand Up @@ -261,7 +261,7 @@
<!--Note - the version would be inherited-->
<configuration>
<excludes>**/generated/**</excludes>
<maxAllowedViolations>10298</maxAllowedViolations>
<maxAllowedViolations>10263</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
Expand Down
12 changes: 0 additions & 12 deletions storm-client/src/jvm/org/apache/storm/Constants.java
Expand Up @@ -64,17 +64,5 @@ public class Constants {
public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";

public static final Map<String, String> resourceNameMapping;

static {
Map<String, String> tmp = new HashMap<>();
tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, COMMON_CPU_RESOURCE_NAME);
tmp.put(Config.SUPERVISOR_CPU_CAPACITY, COMMON_CPU_RESOURCE_NAME);
tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, COMMON_TOTAL_MEMORY_RESOURCE_NAME);
resourceNameMapping = Collections.unmodifiableMap(tmp);
}
}

2 changes: 1 addition & 1 deletion storm-server/pom.xml
Expand Up @@ -130,7 +130,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>2655</maxAllowedViolations>
<maxAllowedViolations>2630</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
Expand Down
Expand Up @@ -52,6 +52,7 @@
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.storm.Config;
import org.apache.storm.Constants;
Expand Down Expand Up @@ -148,6 +149,7 @@
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.security.INimbusCredentialPlugin;
Expand Down Expand Up @@ -967,13 +969,12 @@ private static Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> extr
return ret;
}

private static Map<String, Double> setResourcesDefaultIfNotSet(Map<String, Map<String, Double>> compResourcesMap, String compId, Map<String, Object> topoConf) {
Map<String, Double> resourcesMap = compResourcesMap.get(compId);
if (resourcesMap == null) {
resourcesMap = new HashMap<>();
private static void setResourcesDefaultIfNotSet(Map<String, NormalizedResourceRequest> compResourcesMap, String compId,
Map<String, Object> topoConf) {
NormalizedResourceRequest resources = compResourcesMap.get(compId);
if (resources == null) {
compResourcesMap.put(compId, new NormalizedResourceRequest(topoConf));
}
ResourceUtils.checkInitialization(resourcesMap, compId, topoConf);
return resourcesMap;
}

private static void validatePortAvailable(Map<String, Object> conf) throws IOException {
Expand Down Expand Up @@ -2563,20 +2564,16 @@ static void validateTopologyWorkerMaxHeapSizeConfigs(
private static double getMaxExecutorMemoryUsageForTopo(
StormTopology topology, Map<String, Object> topologyConf) {
double largestMemoryOperator = 0.0;
for (Map<String, Double> entry :
for (NormalizedResourceRequest entry :
ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
double memoryRequirement =
entry.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0)
+ entry.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
double memoryRequirement = entry.getTotalMemoryMb();
if (memoryRequirement > largestMemoryOperator) {
largestMemoryOperator = memoryRequirement;
}
}
for (Map<String, Double> entry :
for (NormalizedResourceRequest entry :
ResourceUtils.getSpoutsResources(topology, topologyConf).values()) {
double memoryRequirement =
entry.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0)
+ entry.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
double memoryRequirement = entry.getTotalMemoryMb();
if (memoryRequirement > largestMemoryOperator) {
largestMemoryOperator = memoryRequirement;
}
Expand Down Expand Up @@ -3649,16 +3646,18 @@ public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolea
topoPageInfo.set_storm_version(topology.get_storm_version());
}

Map<String, Map<String, Double>> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
for (Entry<String, ComponentAggregateStats> entry: topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
CommonAggregateStats commonStats = entry.getValue().get_common_stats();
commonStats.set_resources_map(setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf));
setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
}

Map<String, Map<String, Double>> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
for (Entry<String, ComponentAggregateStats> entry: topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
CommonAggregateStats commonStats = entry.getValue().get_common_stats();
commonStats.set_resources_map(setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf));
setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
}

if (workerSummaries != null) {
Expand Down Expand Up @@ -3816,11 +3815,17 @@ public ComponentPageInfo getComponentPageInfo(String topoId, String componentId,
ComponentPageInfo compPageInfo = StatsUtil.aggCompExecsStats(exec2HostPort, info.taskToComponent, info.beats, window,
includeSys, topoId, topology, componentId);
if (compPageInfo.get_component_type() == ComponentType.SPOUT) {
compPageInfo.set_resources_map(setResourcesDefaultIfNotSet(
ResourceUtils.getSpoutsResources(topology, topoConf), componentId, topoConf));
NormalizedResourceRequest spoutResources = ResourceUtils.getSpoutResources(topology, topoConf, componentId);
if (spoutResources == null) {
spoutResources = new NormalizedResourceRequest(topoConf);
}
compPageInfo.set_resources_map(spoutResources.toNormalizedMap());
} else { //bolt
compPageInfo.set_resources_map(setResourcesDefaultIfNotSet(
ResourceUtils.getBoltsResources(topology, topoConf), componentId, topoConf));
NormalizedResourceRequest boltResources = ResourceUtils.getBoltResources(topology, topoConf, componentId);
if (boltResources == null) {
boltResources = new NormalizedResourceRequest(topoConf);
}
compPageInfo.set_resources_map(boltResources.toNormalizedMap());
}
compPageInfo.set_topology_name(info.topoName);
compPageInfo.set_errors(stormClusterState.errors(topoId, componentId));
Expand Down
Expand Up @@ -26,13 +26,12 @@
import org.apache.storm.utils.Time;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.storm.scheduler.resource.ResourceUtils.normalizedResourceMap;
import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;

public class SupervisorHeartbeat implements Runnable {

Expand Down

0 comments on commit 6a4aeb9

Please sign in to comment.