Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-3448: core: fix all checkstyle warnings #3063

Merged
merged 1 commit into from Jul 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion storm-core/pom.xml
Expand Up @@ -303,7 +303,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>73</maxAllowedViolations>
<maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
Expand Down
19 changes: 10 additions & 9 deletions storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
Expand Up @@ -72,7 +72,8 @@ private static class RemoveCorruptTopologies implements AdminCommand {
@Override
public void run(String[] args, Map<String, Object> conf, String command) throws Exception {
try (BlobStore nimbusBlobStore = ServerUtils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf), null)) {
IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf));
IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf,
new ClusterStateContext(DaemonType.NIMBUS, conf));

Set<String> blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(key -> ConfigUtils.getIdFromBlobKey(key));
Set<String> activeTopologyIds = new HashSet<>(stormClusterState.activeStorms());
Expand Down Expand Up @@ -126,14 +127,6 @@ public static String prettyPrint(TBase value) {
return builder.toString();
}

private static void println(StringBuilder out, int depth, Object value) {
for (int i = 0; i < depth; i++) {
out.append("\t");
}
out.append(value);
out.append("\n");
}

private static void prettyPrint(TBase value, int depth, StringBuilder out) {
if (value == null) {
println(out, depth,"null");
Expand All @@ -144,6 +137,14 @@ private static void prettyPrint(TBase value, int depth, StringBuilder out) {
println(out, depth, "}");
}

private static void println(StringBuilder out, int depth, Object value) {
for (int i = 0; i < depth; i++) {
out.append("\t");
}
out.append(value);
out.append("\n");
}

private static void prettyPrintFields(TBase value, int depth, StringBuilder out) {
for (Map.Entry<? extends TFieldIdEnum, FieldMetaData> entry : FieldMetaData.getStructMetaDataMap(value.getClass()).entrySet()) {
TFieldIdEnum key = entry.getKey();
Expand Down
2 changes: 2 additions & 0 deletions storm-core/src/jvm/org/apache/storm/command/CLI.java
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class CLI {
/**
* Parse function to return an Integer.
Expand Down Expand Up @@ -250,6 +251,7 @@ public Object process(Object current, String value) {
}
}

@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public static class CLIBuilder {
private final ArrayList<Opt> opts = new ArrayList<>();
private final ArrayList<Arg> args = new ArrayList<>();
Expand Down
Expand Up @@ -31,7 +31,14 @@ public void run(Nimbus.Iface nimbus) throws Exception {
if (topologies == null || topologies.isEmpty()) {
System.out.println("No topologies running.");
} else {
System.out.printf(MSG_FORMAT, "Topology_name", "Status", "Num_tasks", "Num_workers", "Uptime_secs", "Topology_Id", "Owner");
System.out.printf(MSG_FORMAT,
"Topology_name",
"Status",
"Num_tasks",
"Num_workers",
"Uptime_secs",
"Topology_Id",
"Owner");
System.out.println("----------------------------------------------------------------------------------------");
for (TopologySummary topology : topologies) {
System.out.printf(MSG_FORMAT, topology.get_name(), topology.get_status(),
Expand Down
19 changes: 9 additions & 10 deletions storm-core/src/jvm/org/apache/storm/command/Monitor.java
Expand Up @@ -27,25 +27,24 @@ public static void main(String[] args) throws Exception {
.parse(args);
final org.apache.storm.utils.Monitor monitor = new org.apache.storm.utils.Monitor();
Integer interval = (Integer) cl.get("i");
String component = (String) cl.get("m");
String stream = (String) cl.get("s");
String watch = (String) cl.get("w");
String topologyName = (String) cl.get("topologyName");

if (null != interval) {
monitor.set_interval(interval);
monitor.setInterval(interval);
}
String component = (String) cl.get("m");
if (null != component) {
monitor.set_component(component);
monitor.setComponent(component);
}
String stream = (String) cl.get("s");
if (null != stream) {
monitor.set_stream(stream);
monitor.setStream(stream);
}
String watch = (String) cl.get("w");
if (null != watch) {
monitor.set_watch(watch);
monitor.setWatch(watch);
}
String topologyName = (String) cl.get("topologyName");
if (null != topologyName) {
monitor.set_topology(topologyName);
monitor.setTopology(topologyName);
}

NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
Expand Down
9 changes: 4 additions & 5 deletions storm-core/src/jvm/org/apache/storm/command/Rebalance.java
Expand Up @@ -13,6 +13,7 @@
package org.apache.storm.command;

import static java.lang.String.format;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.generated.Nimbus;
Expand All @@ -39,20 +40,18 @@ public static void main(String[] args) throws Exception {
final String name = (String) cl.get("topologyName");
final RebalanceOptions rebalanceOptions = new RebalanceOptions();
Integer wait = (Integer) cl.get("w");
Integer numWorkers = (Integer) cl.get("n");
Map<String, Integer> numExecutors = (Map<String, Integer>) cl.get("e");
Map<String, Map<String, Double>> resourceOverrides = (Map<String, Map<String, Double>>) cl.get("r");

if (null != wait) {
rebalanceOptions.set_wait_secs(wait);
}
Integer numWorkers = (Integer) cl.get("n");
if (null != numWorkers) {
rebalanceOptions.set_num_workers(numWorkers);
}
Map<String, Integer> numExecutors = (Map<String, Integer>) cl.get("e");
if (null != numExecutors) {
rebalanceOptions.set_num_executors(numExecutors);
}

Map<String, Map<String, Double>> resourceOverrides = (Map<String, Map<String, Double>>) cl.get("r");
if (null != resourceOverrides) {
rebalanceOptions.set_topology_resources_overrides(resourceOverrides);
}
Expand Down
Expand Up @@ -12,6 +12,9 @@

package org.apache.storm.command;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.lang.ArrayUtils;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.NimbusSummary;
Expand All @@ -21,9 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Map;

public class ShellSubmission {
private static final Logger LOG = LoggerFactory.getLogger(ShellSubmission.class);

Expand Down
Expand Up @@ -12,9 +12,5 @@

package org.apache.storm.planner;


public class CompoundSpout
//implements ISpout
{

public class CompoundSpout {
}
Expand Up @@ -12,9 +12,5 @@

package org.apache.storm.planner;


public class CompoundTask
// implements IBolt
{

public class CompoundTask {
}
76 changes: 40 additions & 36 deletions storm-core/src/jvm/org/apache/storm/utils/Monitor.java
Expand Up @@ -27,11 +27,11 @@ public class Monitor {
private static final String WATCH_TRANSFERRED = "transferred";
private static final String WATCH_EMITTED = "emitted";

private int _interval = 4;
private String _topology;
private String _component;
private String _stream;
private String _watch;
private int interval = 4;
private String topology;
private String component;
private String stream;
private String watch;

private HashSet<String> getComponents(Nimbus.Iface client, String topology) throws Exception {
HashSet<String> components = new HashSet<>();
Expand All @@ -58,17 +58,17 @@ private HashSet<String> getComponents(Nimbus.Iface client, String topology) thro
}

public void metrics(Nimbus.Iface client) throws Exception {
if (_interval <= 0) {
if (interval <= 0) {
throw new IllegalArgumentException("poll interval must be positive");
}

if (_topology == null || _topology.isEmpty()) {
if (topology == null || topology.isEmpty()) {
throw new IllegalArgumentException("topology name must be something");
}

if (_component == null || _component.isEmpty()) {
HashSet<String> components = getComponents(client, _topology);
System.out.println("Available components for " + _topology + " :");
if (component == null || component.isEmpty()) {
HashSet<String> components = getComponents(client, topology);
System.out.println("Available components for " + topology + " :");
System.out.println("------------------");
for (String comp : components) {
System.out.println(comp);
Expand All @@ -78,16 +78,16 @@ public void metrics(Nimbus.Iface client) throws Exception {
return;
}

if (_stream == null || _stream.isEmpty()) {
if (stream == null || stream.isEmpty()) {
throw new IllegalArgumentException("stream name must be something");
}

if (!WATCH_TRANSFERRED.equals(_watch) && !WATCH_EMITTED.equals(_watch)) {
if (!WATCH_TRANSFERRED.equals(watch) && !WATCH_EMITTED.equals(watch)) {
throw new IllegalArgumentException("watch item must either be transferred or emitted");
}
System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)");
System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + watch + "\tthroughput (Kt/s)");

long pollMs = _interval * 1000;
long pollMs = interval * 1000;
long now = System.currentTimeMillis();
MetricsState state = new MetricsState(now, 0);
Poller poller = new Poller(now, pollMs);
Expand All @@ -111,29 +111,29 @@ public void metrics(Nimbus.Iface client, long now, MetricsState state) throws Ex
ClusterSummary clusterSummary = client.getClusterInfo();
TopologySummary topologySummary = null;
for (TopologySummary ts : clusterSummary.get_topologies()) {
if (_topology.equals(ts.get_name())) {
if (topology.equals(ts.get_name())) {
topologySummary = ts;
break;
}
}
if (topologySummary == null) {
throw new IllegalArgumentException("topology: " + _topology + " not found");
throw new IllegalArgumentException("topology: " + topology + " not found");
} else {
String id = topologySummary.get_id();
GetInfoOptions getInfoOpts = new GetInfoOptions();
getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
for (ExecutorSummary es : info.get_executors()) {
if (_component.equals(es.get_component_id())) {
if (component.equals(es.get_component_id())) {
componentParallelism++;
ExecutorStats stats = es.get_stats();
if (stats != null) {
Map<String, Map<String, Long>> statted =
WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
WATCH_EMITTED.equals(watch) ? stats.get_emitted() : stats.get_transferred();
if (statted != null) {
Map<String, Long> e2 = statted.get(":all-time");
if (e2 != null) {
Long stream = e2.get(_stream);
Long stream = e2.get(this.stream);
if (stream != null) {
streamFound = true;
totalStatted += stream;
Expand All @@ -146,47 +146,51 @@ public void metrics(Nimbus.Iface client, long now, MetricsState state) throws Ex
}

if (componentParallelism <= 0) {
HashSet<String> components = getComponents(client, _topology);
System.out.println("Available components for " + _topology + " :");
HashSet<String> components = getComponents(client, topology);
System.out.println("Available components for " + topology + " :");
System.out.println("------------------");
for (String comp : components) {
System.out.println(comp);
}
System.out.println("------------------");
throw new IllegalArgumentException("component: " + _component + " not found");
throw new IllegalArgumentException("component: " + component + " not found");
}

if (!streamFound) {
throw new IllegalArgumentException("stream: " + _stream + " not found");
throw new IllegalArgumentException("stream: " + stream + " not found");
}
long timeDelta = now - state.getLastTime();
long stattedDelta = totalStatted - state.getLastStatted();
state.setLastTime(now);
state.setLastStatted(totalStatted);
double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double) stattedDelta / (double) timeDelta);
System.out.println(
_topology + "\t" + _component + "\t" + componentParallelism + "\t" + _stream + "\t" + timeDelta + "\t" + stattedDelta + "\t" +
throughput);
System.out.println(topology + "\t"
+ component + "\t"
+ componentParallelism + "\t"
+ stream + "\t"
+ timeDelta + "\t"
+ stattedDelta + "\t"
+ throughput);
}

public void set_interval(int _interval) {
this._interval = _interval;
public void setInterval(int interval) {
this.interval = interval;
}

public void set_topology(String _topology) {
this._topology = _topology;
public void setTopology(String topology) {
this.topology = topology;
}

public void set_component(String _component) {
this._component = _component;
public void setComponent(String component) {
this.component = component;
}

public void set_stream(String _stream) {
this._stream = _stream;
public void setStream(String stream) {
this.stream = stream;
}

public void set_watch(String _watch) {
this._watch = _watch;
public void setWatch(String watch) {
this.watch = watch;
}

private static class MetricsState {
Expand Down