Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STORM-3850] Remove PMD exceptions #3467

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
Expand All @@ -35,7 +36,6 @@
import org.apache.storm.Config;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.BoltStats;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.GlobalStreamId;
Expand Down Expand Up @@ -75,7 +75,7 @@ private static List<Double> extractBoltValues(List<ExecutorSummary> summaries,
if (data != null) {
List<Double> subvalues = data.values().stream()
.map((subMap) -> subMap.get(id))
.filter((value) -> value != null)
.filter(Objects::nonNull)
.collect(Collectors.toList());
ret.addAll(subvalues);
}
Expand Down Expand Up @@ -104,7 +104,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
LOG.info("with config {}: {}", key, o);
}
}
//Lets use the number of actually scheduled workers as a way to bridge RAS and non-RAS
//Let's use the number of actually scheduled workers as a way to bridge RAS and non-RAS
int numWorkers = tpinfo.get_num_workers();
if (savedTopoConf.containsKey(Config.TOPOLOGY_WORKERS)) {
numWorkers = Math.max(numWorkers, ((Number) savedTopoConf.get(Config.TOPOLOGY_WORKERS)).intValue());
Expand Down Expand Up @@ -154,20 +154,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
}

Map<String, Map<String, Double>> boltResources = getBoltsResources(topo, topoConf);
for (Map.Entry<String, Map<String, Double>> entry: boltResources.entrySet()) {
LoadCompConf.Builder bd = boltBuilders.get(entry.getKey());
if (bd != null) {
Map<String, Double> resources = entry.getValue();
Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
if (cpu != null) {
bd.withCpuLoad(cpu);
}
Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
if (mem != null) {
bd.withMemoryLoad(mem);
}
}
}
addCpuMemToBuilders(boltBuilders, boltResources);
}

//Spouts
Expand Down Expand Up @@ -195,31 +182,14 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
}

Map<String, Map<String, Double>> spoutResources = getSpoutsResources(topo, topoConf);
for (Map.Entry<String, Map<String, Double>> entry: spoutResources.entrySet()) {
LoadCompConf.Builder sd = spoutBuilders.get(entry.getKey());
if (sd != null) {
Map<String, Double> resources = entry.getValue();
Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
if (cpu != null) {
sd.withCpuLoad(cpu);
}
Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
if (mem != null) {
sd.withMemoryLoad(mem);
}
}
}
addCpuMemToBuilders(spoutBuilders, spoutResources);
}

//Stats...
Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
for (ExecutorSummary executor: info.get_executors()) {
String component = executor.get_component_id();
List<ExecutorSummary> list = byComponent.get(component);
if (list == null) {
list = new ArrayList<>();
byComponent.put(component, list);
}
List<ExecutorSummary> list = byComponent.computeIfAbsent(component, k -> new ArrayList<>());
list.add(executor);
}

Expand All @@ -238,7 +208,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
}

//There is a bug in some versions that returns 0 for the uptime.
// To work around it we should get it an alternative (working) way.
// To work around it, we should get it an alternative (working) way.
Map<String, Integer> workerToUptime = new HashMap<>();
for (WorkerSummary ws : tpinfo.get_workers()) {
workerToUptime.put(ws.get_supervisor_id() + ":" + ws.get_port(), ws.get_uptime_secs());
Expand All @@ -265,7 +235,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
String timeWindow = statEntry.getKey();
long timeSecs = uptime;
try {
timeSecs = Long.valueOf(timeWindow);
timeSecs = Long.parseLong(timeWindow);
} catch (NumberFormatException e) {
//Ignored...
}
Expand All @@ -291,22 +261,39 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
}

List<LoadCompConf> spouts = spoutBuilders.values().stream()
.map((b) -> b.build())
.map(LoadCompConf.Builder::build)
.collect(Collectors.toList());

List<LoadCompConf> bolts = boltBuilders.values().stream()
.map((b) -> b.build())
.map(LoadCompConf.Builder::build)
.collect(Collectors.toList());

return new TopologyLoadConf(topologyName, savedTopoConf, spouts, bolts, streams);
}

private static void addCpuMemToBuilders(Map<String, LoadCompConf.Builder> boltBuilders,
Map<String, Map<String, Double>> boltResources) {
for (Map.Entry<String, Map<String, Double>> entry: boltResources.entrySet()) {
LoadCompConf.Builder bd = boltBuilders.get(entry.getKey());
if (bd != null) {
Map<String, Double> resources = entry.getValue();
Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
if (cpu != null) {
bd.withCpuLoad(cpu);
}
Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
if (mem != null) {
bd.withMemoryLoad(mem);
}
}
}
}

/**
* Main entry point for CaptureLoad command.
* @param args the arguments to the command
* @throws Exception on any error
*/
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
Options options = new Options();
options.addOption(Option.builder("a")
.longOpt("anonymize")
Expand Down Expand Up @@ -368,8 +355,8 @@ public static void main(String[] args) throws Exception {
}
}

//ResourceUtils.java is not a available on the classpath to let us parse out the resources we want.
// So we have copied and pasted some of the needed methods here. (with a few changes to logging)
//ResourceUtils.java is not an available on the classpath to let us parse out the resources we want.
// So we have copied and pasted some needed methods here. (with a few changes to logging)
static Map<String, Map<String, Double>> getBoltsResources(StormTopology topology, Map<String, Object> topologyConf) {
Map<String, Map<String, Double>> boltResources = new HashMap<>();
if (topology.get_bolts() != null) {
Expand Down Expand Up @@ -420,7 +407,7 @@ static Map<String, Double> parseResources(String input) {
LOG.debug("Topology Resources {}", topologyResources);
}
} catch (org.json.simple.parser.ParseException e) {
LOG.error("Failed to parse component resources is:" + e.toString(), e);
LOG.error("Failed to parse component resources is:" + e, e);
return null;
}
return topologyResources;
Expand All @@ -429,10 +416,9 @@ static Map<String, Double> parseResources(String input) {
/**
* Checks if the topology's resource requirements are initialized.
* Will modify topologyResources by adding the appropriate defaults
* @param topologyResources map of resouces requirements
* @param topologyResources map of resources requirements
* @param componentId component for which initialization is being conducted
* @param topologyConf topology configuration
* @throws Exception on any error
*/
public static void checkInitialization(Map<String, Double> topologyResources, String componentId, Map<String, Object> topologyConf) {
StringBuilder msgBuilder = new StringBuilder();
Expand Down