From b05aeb0eaadde8c919428bb2dbbffaa414b8470d Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 12 Feb 2016 12:38:48 -0600 Subject: [PATCH 1/7] STROM-1263: port backtype.storm.command.kill-topology to java (And add in better java CLI) --- bin/storm.cmd | 14 +- bin/storm.py | 2 +- pom.xml | 6 + storm-core/pom.xml | 9 + .../apache/storm/command/kill_topology.clj | 29 --- .../src/jvm/org/apache/storm/command/CLI.java | 229 ++++++++++++++++++ .../apache/storm/command/KillTopology.java | 51 ++++ .../org/apache/storm/utils/NimbusClient.java | 19 +- 8 files changed, 321 insertions(+), 38 deletions(-) delete mode 100644 storm-core/src/clj/org/apache/storm/command/kill_topology.clj create mode 100644 storm-core/src/jvm/org/apache/storm/command/CLI.java create mode 100644 storm-core/src/jvm/org/apache/storm/command/KillTopology.java diff --git a/bin/storm.cmd b/bin/storm.cmd index 6f4e934425c..8b3fa920a91 100644 --- a/bin/storm.cmd +++ b/bin/storm.cmd @@ -145,7 +145,7 @@ :drpc set CLASS=org.apache.storm.daemon.drpc - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value drpc.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue drpc.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( @@ -160,7 +160,7 @@ goto :eof :kill - set CLASS=org.apache.storm.command.kill_topology + set CLASS=org.apache.storm.command.KillTopology set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS% goto :eof @@ -171,7 +171,7 @@ :logviewer set CLASS=org.apache.storm.daemon.logviewer - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value logviewer.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue logviewer.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( @@ -183,7 +183,7 @@ :nimbus set CLASS=org.apache.storm.daemon.nimbus - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value nimbus.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue nimbus.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( @@ -199,7 +199,7 @@ goto :eof :remoteconfvalue - set CLASS=org.apache.storm.command.config_value + set CLASS=org.apache.storm.command.ConfigValue set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS% goto :eof @@ -215,7 +215,7 @@ :supervisor set CLASS=org.apache.storm.daemon.supervisor - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value supervisor.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue supervisor.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( @@ -228,7 +228,7 @@ :ui set CLASS=org.apache.storm.ui.core set CLASSPATH=%CLASSPATH%;%STORM_HOME% - "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.config_value ui.childopts > %CMD_TEMP_FILE% + "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" org.apache.storm.command.ConfigValue ui.childopts > %CMD_TEMP_FILE% FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do ( FOR /F "tokens=1,* delims= " %%a in ("%%i") do ( if %%a == VALUE: ( diff --git a/bin/storm.py b/bin/storm.py index f2aca955678..48160cce15d 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -278,7 +278,7 @@ def kill(*args): print_usage(command="kill") sys.exit(2) exec_storm_class( - "org.apache.storm.command.kill_topology", + "org.apache.storm.command.KillTopology", args=args, jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) diff --git a/pom.xml b/pom.xml index 783018f867d..61a1ed9b515 100644 --- a/pom.xml +++ b/pom.xml @@ -199,6 +199,7 @@ 1.1 1.2.1 1.6 + 1.3.1 0.8.0 2.9.0 1.1 @@ -491,6 +492,11 @@ kryo ${kryo.version} + + commons-cli + commons-cli + ${commons-cli.version} + commons-io commons-io diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 247d097f350..624e3408b7b 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -148,6 +148,10 @@ + + commons-cli + commons-cli + commons-io commons-io @@ -505,6 +509,7 @@ org.apache.commons:commons-exec org.apache.commons:commons-compress org.apache.hadoop:hadoop-auth + commons-cli:commons-cli commons-io:commons-io commons-codec:commons-codec commons-fileupload:commons-fileupload @@ -642,6 +647,10 @@ com.metamx.http.client org.apache.storm.shade.com.metamx.http.client + + org.apache.commons.cli + org.apache.storm.shade.org.apache.commons.cli + org.apache.commons.io org.apache.storm.shade.org.apache.commons.io diff --git a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj b/storm-core/src/clj/org/apache/storm/command/kill_topology.clj deleted file mode 100644 index 84e0a64f9ec..00000000000 --- a/storm-core/src/clj/org/apache/storm/command/kill_topology.clj +++ /dev/null @@ -1,29 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.command.kill-topology - (:use [clojure.tools.cli :only [cli]]) - (:use [org.apache.storm thrift config log]) - (:import [org.apache.storm.generated KillOptions]) - (:gen-class)) - -(defn -main [& args] - (let [[{wait :wait} [name] _] (cli args ["-w" "--wait" :default nil :parse-fn #(Integer/parseInt %)]) - opts (KillOptions.)] - (if wait (.set_wait_secs opts wait)) - (with-configured-nimbus-connection nimbus - (.killTopologyWithOpts nimbus name opts) - (log-message "Killed topology: " name) - ))) diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java new file mode 100644 index 00000000000..9813a3e4ec9 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.command; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.List; + +import org.apache.commons.cli.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CLI { + private static final Logger LOG = LoggerFactory.getLogger(CLI.class); + private static class Opt { + final String s; + final String l; + final Object defaultValue; + final Parse parse; + final Assoc assoc; + public Opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) { + this.s = s; + this.l = l; + this.defaultValue = defaultValue; + this.parse = parse == null ? AS_STRING : parse; + this.assoc = assoc == null ? LAST_WINS : assoc; + } + + public Object process(Object current, String value) { + return assoc.assoc(current, parse.parse(value)); + } + } + + private static class Arg { + final String name; + final Parse parse; + final Assoc assoc; + public Arg(String name, Parse parse, Assoc assoc) { + this.name = name; + this.parse = parse == null ? AS_STRING : parse; + this.assoc = assoc == null ? INTO_LIST : assoc; + } + + public Object process(Object current, String value) { + return assoc.assoc(current, parse.parse(value)); + } + } + + public interface Parse { + /** + * Parse a String to the type you want it to be. + * @param value the String to parse + * @return the parsed value + */ + public Object parse(String value); + } + + public static final Parse AS_INT = new Parse() { + @Override + public Object parse(String value) { + return Integer.valueOf(value); + } + }; + + public static final Parse AS_STRING = new Parse() { + @Override + public Object parse(String value) { + return value; + } + }; + + public interface Assoc { + /** + * Associate a value into somthing else + * @param current what to put value into, will be null if no values have been added yet. + * @param value what to add + * @return the result of combining the two + */ + public Object assoc(Object current, Object value); + } + + public static final Assoc LAST_WINS = new Assoc() { + @Override + public Object assoc(Object current, Object value) { + return value; + } + }; + + public static final Assoc FIRST_WINS = new Assoc() { + @Override + public Object assoc(Object current, Object value) { + return current == null ? value : current; + } + }; + + public static final Assoc INTO_LIST = new Assoc() { + @Override + public Object assoc(Object current, Object value) { + if (current == null) { + current = new ArrayList(); + } + ((List)current).add(value); + return current; + } + }; + + public static class CLIBuilder { + private final ArrayList opts = new ArrayList<>(); + private final ArrayList args = new ArrayList<>(); + + public CLIBuilder opt(String s, String l, Object defaultValue) { + return opt(s, l, defaultValue, null, null); + } + + public CLIBuilder opt(String s, String l, Object defaultValue, Parse parse) { + return opt(s, l, defaultValue, parse, null); + } + + public CLIBuilder opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) { + opts.add(new Opt(s, l, defaultValue, parse, assoc)); + return this; + } + + public CLIBuilder arg(String name) { + return arg(name, null, null); + } + + public CLIBuilder arg(String name, Assoc assoc) { + return arg(name, null, assoc); + } + + public CLIBuilder arg(String name, Parse parse) { + return arg(name, parse, null); + } + + public CLIBuilder arg(String name, Parse parse, Assoc assoc) { + args.add(new Arg(name, parse, assoc)); + return this; + } + + public Map parse(String[] rawArgs) throws Exception { + Options options = new Options(); + for (Opt opt: opts) { + options.addOption(Option.builder(opt.s).longOpt(opt.l).hasArg().build()); + } + DefaultParser parser = new DefaultParser(); + CommandLine cl = parser.parse(options, rawArgs); + HashMap ret = new HashMap<>(); + for (Opt opt: opts) { + Object current = null; + for (String val: cl.getOptionValues(opt.s)) { + current = opt.process(current, val); + } + if (current == null) { + current = opt.defaultValue; + } + ret.put(opt.s, current); + } + List stringArgs = cl.getArgList(); + if (args.size() > stringArgs.size()) { + throw new RuntimeException("Wrong number of arguments at least "+args.size()+" expected, but only " + stringArgs.size() + " found"); + } + + int argIndex = 0; + int stringArgIndex = 0; + if (args.size() > 0) { + while (argIndex < args.size()) { + Arg arg = args.get(argIndex); + boolean isLastArg = (argIndex == (args.size() - 1)); + Object current = null; + int maxStringIndex = isLastArg ? stringArgs.size() : (stringArgIndex + 1); + for (;stringArgIndex < maxStringIndex; stringArgIndex++) { + current = arg.process(current, stringArgs.get(stringArgIndex)); + } + ret.put(arg.name, current); + argIndex++; + } + } else { + ret.put("ARGS", stringArgs); + } + return ret; + } + } + + public static CLIBuilder opt(String s, String l, Object defaultValue) { + return new CLIBuilder().opt(s, l, defaultValue); + } + + public static CLIBuilder opt(String s, String l, Object defaultValue, Parse parse) { + return new CLIBuilder().opt(s, l, defaultValue, parse); + } + + public static CLIBuilder opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) { + return new CLIBuilder().opt(s, l, defaultValue, parse, assoc); + } + + public CLIBuilder arg(String name) { + return new CLIBuilder().arg(name); + } + + public CLIBuilder arg(String name, Assoc assoc) { + return new CLIBuilder().arg(name, assoc); + } + + public CLIBuilder arg(String name, Parse parse) { + return new CLIBuilder().arg(name, parse); + } + + public CLIBuilder arg(String name, Parse parse, Assoc assoc) { + return new CLIBuilder().arg(name, parse, assoc); + } +} diff --git a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java new file mode 100644 index 00000000000..8f4d3230423 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.command; + +import java.util.Map; + +import org.apache.storm.generated.KillOptions; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.utils.NimbusClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KillTopology { + private static final Logger LOG = LoggerFactory.getLogger(KillTopology.class); + + public static void main(String [] args) throws Exception { + Map cl = CLI.opt("w", "wait", null, CLI.AS_INT) + .arg("TOPO", CLI.FIRST_WINS) + .parse(args); + final String name = (String)cl.get("TOPO"); + Integer wait = (Integer)cl.get("w"); + + final KillOptions opts = new KillOptions(); + if (wait != null) { + opts.set_wait_secs(wait); + } + NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() { + @Override + public void run(Nimbus.Client nimbus) throws Exception { + nimbus.killTopologyWithOpts(name, opts); + LOG.info("Killed topology: {}", name); + } + }); + } +} diff --git a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java index f5bad6e202e..4c76b291483 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java +++ b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java @@ -17,11 +17,11 @@ */ package org.apache.storm.utils; - import org.apache.storm.Config; import org.apache.storm.generated.ClusterSummary; import org.apache.storm.generated.Nimbus; import org.apache.storm.generated.NimbusSummary; +import org.apache.storm.security.auth.ReqContext; import org.apache.storm.security.auth.ThriftClient; import org.apache.storm.security.auth.ThriftConnectionType; import com.google.common.collect.Lists; @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.security.Principal; import java.util.List; import java.util.Map; @@ -36,6 +37,22 @@ public class NimbusClient extends ThriftClient implements AutoCloseable { private Nimbus.Client _client; private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class); + public interface WithNimbus { + public void run(Nimbus.Client client) throws Exception; + } + + public static void withConfiguredClient(WithNimbus cb) throws Exception { + withConfiguredClient(cb, ConfigUtils.readStormConfig()); + } + + public static void withConfiguredClient(WithNimbus cb, Map conf) throws Exception { + ReqContext context = ReqContext.context(); + Principal principal = context.principal(); + String user = principal == null ? null : principal.getName(); + try (NimbusClient client = getConfiguredClientAs(conf, user);) { + cb.run(client.getClient()); + } + } public static NimbusClient getConfiguredClient(Map conf) { return getConfiguredClientAs(conf, null); From 54f17d81368bc3c58c2a886a63f96c5061c49f13 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Sat, 13 Feb 2016 13:34:33 -0600 Subject: [PATCH 2/7] STORM-1260: port backtype.storm.command.activate to java --- bin/storm.cmd | 2 +- bin/storm.py | 2 +- .../clj/org/apache/storm/command/activate.clj | 24 ----------- .../org/apache/storm/command/Activate.java | 40 +++++++++++++++++++ 4 files changed, 42 insertions(+), 26 deletions(-) delete mode 100644 storm-core/src/clj/org/apache/storm/command/activate.clj create mode 100644 storm-core/src/jvm/org/apache/storm/command/Activate.java diff --git a/bin/storm.cmd b/bin/storm.cmd index 8b3fa920a91..b29a6487608 100644 --- a/bin/storm.cmd +++ b/bin/storm.cmd @@ -125,7 +125,7 @@ :activate - set CLASS=org.apache.storm.command.activate + set CLASS=org.apache.storm.command.Activate set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS% goto :eof diff --git a/bin/storm.py b/bin/storm.py index 48160cce15d..e14990e5e23 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -345,7 +345,7 @@ def activate(*args): print_usage(command="activate") sys.exit(2) exec_storm_class( - "org.apache.storm.command.activate", + "org.apache.storm.command.Activate", args=args, jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) diff --git a/storm-core/src/clj/org/apache/storm/command/activate.clj b/storm-core/src/clj/org/apache/storm/command/activate.clj deleted file mode 100644 index dc452e8bb6a..00000000000 --- a/storm-core/src/clj/org/apache/storm/command/activate.clj +++ /dev/null @@ -1,24 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.command.activate - (:use [org.apache.storm thrift log]) - (:gen-class)) - -(defn -main [name] - (with-configured-nimbus-connection nimbus - (.activate nimbus name) - (log-message "Activated topology: " name) - )) diff --git a/storm-core/src/jvm/org/apache/storm/command/Activate.java b/storm-core/src/jvm/org/apache/storm/command/Activate.java new file mode 100644 index 00000000000..6a64bf68808 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/command/Activate.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.command; + +import org.apache.storm.generated.Nimbus; +import org.apache.storm.utils.NimbusClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Activate { + private static final Logger LOG = LoggerFactory.getLogger(Activate.class); + + public static void main(String [] args) throws Exception { + final String name = args[0]; + + NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() { + @Override + public void run(Nimbus.Client nimbus) throws Exception { + nimbus.activate(name); + LOG.info("Activated topology: {}", name); + } + }); + } +} From a64daee0e2d77c4553d2a53a027e9f40194f7370 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Sat, 13 Feb 2016 13:46:08 -0600 Subject: [PATCH 3/7] STORM-1261: port backtype.storm.command.deactivate to java --- bin/storm.cmd | 2 +- bin/storm.py | 2 +- .../org/apache/storm/command/deactivate.clj | 24 ----------- .../org/apache/storm/command/Deactivate.java | 40 +++++++++++++++++++ 4 files changed, 42 insertions(+), 26 deletions(-) delete mode 100644 storm-core/src/clj/org/apache/storm/command/deactivate.clj create mode 100644 storm-core/src/jvm/org/apache/storm/command/Deactivate.java diff --git a/bin/storm.cmd b/bin/storm.cmd index b29a6487608..367574caefc 100644 --- a/bin/storm.cmd +++ b/bin/storm.cmd @@ -134,7 +134,7 @@ goto :eof :deactivate - set CLASS=org.apache.storm.command.deactivate + set CLASS=org.apache.storm.command.Deactivate set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS% goto :eof diff --git a/bin/storm.py b/bin/storm.py index e14990e5e23..cc8fe8f70e0 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -403,7 +403,7 @@ def deactivate(*args): print_usage(command="deactivate") sys.exit(2) exec_storm_class( - "org.apache.storm.command.deactivate", + "org.apache.storm.command.Deactivate", args=args, jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) diff --git a/storm-core/src/clj/org/apache/storm/command/deactivate.clj b/storm-core/src/clj/org/apache/storm/command/deactivate.clj deleted file mode 100644 index 4fd2c8581f4..00000000000 --- a/storm-core/src/clj/org/apache/storm/command/deactivate.clj +++ /dev/null @@ -1,24 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.command.deactivate - (:use [org.apache.storm thrift log]) - (:gen-class)) - -(defn -main [name] - (with-configured-nimbus-connection nimbus - (.deactivate nimbus name) - (log-message "Deactivated topology: " name) - )) diff --git a/storm-core/src/jvm/org/apache/storm/command/Deactivate.java b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java new file mode 100644 index 00000000000..6b9dd118e19 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.command; + +import org.apache.storm.generated.Nimbus; +import org.apache.storm.utils.NimbusClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Deactivate { + private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class); + + public static void main(String [] args) throws Exception { + final String name = args[0]; + + NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() { + @Override + public void run(Nimbus.Client nimbus) throws Exception { + nimbus.deactivate(name); + LOG.info("Deactivated topology: {}", name); + } + }); + } +} From 7d7f5b6e3519ed66a796087b3cd879261d63880c Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Sat, 13 Feb 2016 13:56:09 -0600 Subject: [PATCH 4/7] Rework --- storm-core/src/jvm/org/apache/storm/command/CLI.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java index 9813a3e4ec9..f360d2f747c 100644 --- a/storm-core/src/jvm/org/apache/storm/command/CLI.java +++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java @@ -22,7 +22,10 @@ import java.util.Map; import java.util.List; -import org.apache.commons.cli.*; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 415310a9a13306cd7f110712cee5f19d840d8f6a Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Sat, 13 Feb 2016 14:33:03 -0600 Subject: [PATCH 5/7] STORM-1264: port backtype.storm.command.list to java --- bin/storm.cmd | 2 +- bin/storm.py | 2 +- .../src/clj/org/apache/storm/command/list.clj | 38 -------------- .../jvm/org/apache/storm/command/List.java | 50 +++++++++++++++++++ 4 files changed, 52 insertions(+), 40 deletions(-) delete mode 100644 storm-core/src/clj/org/apache/storm/command/list.clj create mode 100644 storm-core/src/jvm/org/apache/storm/command/List.java diff --git a/bin/storm.cmd b/bin/storm.cmd index 367574caefc..c8953bf82d3 100644 --- a/bin/storm.cmd +++ b/bin/storm.cmd @@ -165,7 +165,7 @@ goto :eof :list - set CLASS=org.apache.storm.command.list + set CLASS=org.apache.storm.command.List set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS% goto :eof diff --git a/bin/storm.py b/bin/storm.py index cc8fe8f70e0..a491b63aaaa 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -389,7 +389,7 @@ def listtopos(*args): List the running topologies and their statuses. """ exec_storm_class( - "org.apache.storm.command.list", + "org.apache.storm.command.List", args=args, jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) diff --git a/storm-core/src/clj/org/apache/storm/command/list.clj b/storm-core/src/clj/org/apache/storm/command/list.clj deleted file mode 100644 index 87975cd4cd6..00000000000 --- a/storm-core/src/clj/org/apache/storm/command/list.clj +++ /dev/null @@ -1,38 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.command.list - (:use [org.apache.storm thrift log]) - (:import [org.apache.storm.generated TopologySummary]) - (:gen-class)) - -(defn -main [] - (with-configured-nimbus-connection nimbus - (let [cluster-info (.getClusterInfo nimbus) - topologies (.get_topologies cluster-info) - msg-format "%-20s %-10s %-10s %-12s %-10s"] - (if (or (nil? topologies) (empty? topologies)) - (println "No topologies running.") - (do - (println (format msg-format "Topology_name" "Status" "Num_tasks" "Num_workers" "Uptime_secs")) - (println "-------------------------------------------------------------------") - (doseq [^TopologySummary topology topologies] - (let [topology-name (.get_name topology) - topology-status (.get_status topology) - topology-num-tasks (.get_num_tasks topology) - topology-num-workers (.get_num_workers topology) - topology-uptime-secs (.get_uptime_secs topology)] - (println (format msg-format topology-name topology-status topology-num-tasks - topology-num-workers topology-uptime-secs))))))))) diff --git a/storm-core/src/jvm/org/apache/storm/command/List.java b/storm-core/src/jvm/org/apache/storm/command/List.java new file mode 100644 index 00000000000..7df07117ec6 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/command/List.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.command; + +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.utils.NimbusClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class List { + private static final Logger LOG = LoggerFactory.getLogger(List.class); + private static final String MSG_FORMAT = "%-20s %-10s %-10s %-12s %-10s\n"; + + public static void main(String [] args) throws Exception { + NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() { + @Override + public void run(Nimbus.Client nimbus) throws Exception { + java.util.List topologies = nimbus.getClusterInfo().get_topologies(); + if (topologies == null || topologies.isEmpty()) { + System.out.println("No topologies running."); + } else { + System.out.printf(MSG_FORMAT, "Topology_name", "Status", "Num_tasks", "Num_workers", "Uptime_secs"); + System.out.println("-------------------------------------------------------------------"); + for (TopologySummary topology: topologies) { + System.out.printf(MSG_FORMAT, topology.get_name(), topology.get_status(), + topology.get_num_tasks(), topology.get_num_workers(), + topology.get_uptime_secs()); + } + } + } + }); + } +} From 0867b8017678f55fd24fee80408d7c7041e953e8 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Sat, 13 Feb 2016 15:11:48 -0600 Subject: [PATCH 6/7] Added in unit test for CLI --- .../src/jvm/org/apache/storm/command/CLI.java | 2 +- .../jvm/org/apache/storm/command/TestCLI.java | 59 +++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 storm-core/test/jvm/org/apache/storm/command/TestCLI.java diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java index f360d2f747c..e7d0ecea12b 100644 --- a/storm-core/src/jvm/org/apache/storm/command/CLI.java +++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java @@ -158,7 +158,7 @@ public CLIBuilder arg(String name, Parse parse, Assoc assoc) { return this; } - public Map parse(String[] rawArgs) throws Exception { + public Map parse(String ... rawArgs) throws Exception { Options options = new Options(); for (Opt opt: opts) { options.addOption(Option.builder(opt.s).longOpt(opt.l).hasArg().build()); diff --git a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java new file mode 100644 index 00000000000..b64745845a0 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.command; + +import java.util.Map; +import java.util.List; +import java.util.Arrays; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestCLI { + @Test + public void testSimple() throws Exception { + Map values = CLI.opt("a", "aa", null) + .opt("b", "bb", 1, CLI.AS_INT) + .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS) + .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST) + .arg("A") + .arg("B", CLI.AS_INT) + .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3"); + assertEquals(6, values.size()); + assertEquals("200", (String)values.get("a")); + assertEquals((Integer)40, (Integer)values.get("b")); + assertEquals((Integer)2, (Integer)values.get("c")); + + List d = (List)values.get("d"); + assertEquals(3, d.size()); + assertEquals("1", d.get(0)); + assertEquals("2", d.get(1)); + assertEquals("3", d.get(2)); + + List A = (List)values.get("A"); + assertEquals(1, A.size()); + assertEquals("A-VALUE", A.get(0)); + + List B = (List)values.get("B"); + assertEquals(3, B.size()); + assertEquals((Integer)1, B.get(0)); + assertEquals((Integer)2, B.get(1)); + assertEquals((Integer)3, B.get(2)); + } +} From aca76e0db96bd34b9f9d52ccb71fd89a27e97b70 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 15 Feb 2016 14:01:25 -0600 Subject: [PATCH 7/7] Added javadocs and made option names more readable. --- .../src/jvm/org/apache/storm/command/CLI.java | 171 +++++++++++++++--- 1 file changed, 146 insertions(+), 25 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java index e7d0ecea12b..d4eaa5d4f17 100644 --- a/storm-core/src/jvm/org/apache/storm/command/CLI.java +++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java @@ -33,14 +33,14 @@ public class CLI { private static final Logger LOG = LoggerFactory.getLogger(CLI.class); private static class Opt { - final String s; - final String l; + final String shortName; + final String longName; final Object defaultValue; final Parse parse; final Assoc assoc; - public Opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) { - this.s = s; - this.l = l; + public Opt(String shortName, String longName, Object defaultValue, Parse parse, Assoc assoc) { + this.shortName = shortName; + this.longName = longName; this.defaultValue = defaultValue; this.parse = parse == null ? AS_STRING : parse; this.assoc = assoc == null ? LAST_WINS : assoc; @@ -75,6 +75,9 @@ public interface Parse { public Object parse(String value); } + /** + * Parse function to return an Integer + */ public static final Parse AS_INT = new Parse() { @Override public Object parse(String value) { @@ -82,6 +85,9 @@ public Object parse(String value) { } }; + /** + * Noop parse function, returns the String. + */ public static final Parse AS_STRING = new Parse() { @Override public Object parse(String value) { @@ -99,6 +105,9 @@ public interface Assoc { public Object assoc(Object current, Object value); } + /** + * Last occurance on the command line is the resulting value. + */ public static final Assoc LAST_WINS = new Assoc() { @Override public Object assoc(Object current, Object value) { @@ -106,6 +115,9 @@ public Object assoc(Object current, Object value) { } }; + /** + * First occurance on the command line is the resulting value. + */ public static final Assoc FIRST_WINS = new Assoc() { @Override public Object assoc(Object current, Object value) { @@ -113,6 +125,9 @@ public Object assoc(Object current, Object value) { } }; + /** + * All values are returned as a List. + */ public static final Assoc INTO_LIST = new Assoc() { @Override public Object assoc(Object current, Object value) { @@ -128,57 +143,115 @@ public static class CLIBuilder { private final ArrayList opts = new ArrayList<>(); private final ArrayList args = new ArrayList<>(); - public CLIBuilder opt(String s, String l, Object defaultValue) { - return opt(s, l, defaultValue, null, null); + /** + * Add an option to be parsed + * @param shortName the short single character name of the option (no `-` character proceeds it). + * @param longName the multi character name of the option (no `--` characters proceed it). + * @param defaultValue the value that will be returned of the command if none is given. null if none is given. + * @return a builder to be used to continue creating the command line. + */ + public CLIBuilder opt(String shortName, String longName, Object defaultValue) { + return opt(shortName, longName, defaultValue, null, null); } - - public CLIBuilder opt(String s, String l, Object defaultValue, Parse parse) { - return opt(s, l, defaultValue, parse, null); + + /** + * Add an option to be parsed + * @param shortName the short single character name of the option (no `-` character proceeds it). + * @param longName the multi character name of the option (no `--` characters proceed it). + * @param defaultValue the value that will be returned of the command if none is given. null if none is given. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @return a builder to be used to continue creating the command line. + */ + public CLIBuilder opt(String shortName, String longName, Object defaultValue, Parse parse) { + return opt(shortName, longName, defaultValue, parse, null); } - public CLIBuilder opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) { - opts.add(new Opt(s, l, defaultValue, parse, assoc)); + /** + * Add an option to be parsed + * @param shortName the short single character name of the option (no `-` character proceeds it). + * @param longName the multi character name of the option (no `--` characters proceed it). + * @param defaultValue the value that will be returned of the command if none is given. null if none is given. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @param assoc an association command to decide what to do if the option appears multiple times. If null LAST_WINS is used. + * @return a builder to be used to continue creating the command line. + */ + public CLIBuilder opt(String shortName, String longName, Object defaultValue, Parse parse, Assoc assoc) { + opts.add(new Opt(shortName, longName, defaultValue, parse, assoc)); return this; } + /** + * Add a named argument. + * @param name the name of the argument. + * @return a builder to be used to continue creating the command line. + */ public CLIBuilder arg(String name) { return arg(name, null, null); } + /** + * Add a named argument. + * @param name the name of the argument. + * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. + * @return a builder to be used to continue creating the command line. + */ public CLIBuilder arg(String name, Assoc assoc) { return arg(name, null, assoc); } - + + /** + * Add a named argument. + * @param name the name of the argument. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @return a builder to be used to continue creating the command line. + */ public CLIBuilder arg(String name, Parse parse) { return arg(name, parse, null); } + /** + * Add a named argument. + * @param name the name of the argument. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. + * @return a builder to be used to continue creating the command line. + */ public CLIBuilder arg(String name, Parse parse, Assoc assoc) { args.add(new Arg(name, parse, assoc)); return this; } + /** + * Parse the command line arguments. + * @param rawArgs the string arguments to be parsed. + * @throws Exception on any error. + * @return The parsed command line. + * opts will be stored under the short argument name. + * args will be stored under the argument name, unless no arguments are configured, and then they will be stored under "ARGS". + * The last argument comnfigured is greedy and is used to process all remaining command line arguments. + */ public Map parse(String ... rawArgs) throws Exception { Options options = new Options(); for (Opt opt: opts) { - options.addOption(Option.builder(opt.s).longOpt(opt.l).hasArg().build()); + options.addOption(Option.builder(opt.shortName).longOpt(opt.longName).hasArg().build()); } DefaultParser parser = new DefaultParser(); CommandLine cl = parser.parse(options, rawArgs); HashMap ret = new HashMap<>(); for (Opt opt: opts) { Object current = null; - for (String val: cl.getOptionValues(opt.s)) { + for (String val: cl.getOptionValues(opt.shortName)) { current = opt.process(current, val); } if (current == null) { current = opt.defaultValue; } - ret.put(opt.s, current); + ret.put(opt.shortName, current); } List stringArgs = cl.getArgList(); if (args.size() > stringArgs.size()) { - throw new RuntimeException("Wrong number of arguments at least "+args.size()+" expected, but only " + stringArgs.size() + " found"); + throw new RuntimeException("Wrong number of arguments at least " + args.size() + + " expected, but only " + stringArgs.size() + " found"); } int argIndex = 0; @@ -202,30 +275,78 @@ public Map parse(String ... rawArgs) throws Exception { } } - public static CLIBuilder opt(String s, String l, Object defaultValue) { - return new CLIBuilder().opt(s, l, defaultValue); + /** + * Add an option to be parsed + * @param shortName the short single character name of the option (no `-` character proceeds it). + * @param longName the multi character name of the option (no `--` characters proceed it). + * @param defaultValue the value that will be returned of the command if none is given. null if none is given. + * @return a builder to be used to continue creating the command line. + */ + public static CLIBuilder opt(String shortName, String longName, Object defaultValue) { + return new CLIBuilder().opt(shortName, longName, defaultValue); } - - public static CLIBuilder opt(String s, String l, Object defaultValue, Parse parse) { - return new CLIBuilder().opt(s, l, defaultValue, parse); + + /** + * Add an option to be parsed + * @param shortName the short single character name of the option (no `-` character proceeds it). + * @param longName the multi character name of the option (no `--` characters proceed it). + * @param defaultValue the value that will be returned of the command if none is given. null if none is given. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @return a builder to be used to continue creating the command line. + */ + public static CLIBuilder opt(String shortName, String longName, Object defaultValue, Parse parse) { + return new CLIBuilder().opt(shortName, longName, defaultValue, parse); } - public static CLIBuilder opt(String s, String l, Object defaultValue, Parse parse, Assoc assoc) { - return new CLIBuilder().opt(s, l, defaultValue, parse, assoc); + /** + * Add an option to be parsed + * @param shortName the short single character name of the option (no `-` character proceeds it). + * @param longName the multi character name of the option (no `--` characters proceed it). + * @param defaultValue the value that will be returned of the command if none is given. null if none is given. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @param assoc an association command to decide what to do if the option appears multiple times. If null LAST_WINS is used. + * @return a builder to be used to continue creating the command line. + */ + public static CLIBuilder opt(String shortName, String longName, Object defaultValue, Parse parse, Assoc assoc) { + return new CLIBuilder().opt(shortName, longName, defaultValue, parse, assoc); } + /** + * Add a named argument. + * @param name the name of the argument. + * @return a builder to be used to continue creating the command line. + */ public CLIBuilder arg(String name) { return new CLIBuilder().arg(name); } - + + /** + * Add a named argument. + * @param name the name of the argument. + * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. + * @return a builder to be used to continue creating the command line. + */ public CLIBuilder arg(String name, Assoc assoc) { return new CLIBuilder().arg(name, assoc); } + /** + * Add a named argument. + * @param name the name of the argument. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @return a builder to be used to continue creating the command line. + */ public CLIBuilder arg(String name, Parse parse) { return new CLIBuilder().arg(name, parse); } + /** + * Add a named argument. + * @param name the name of the argument. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. + * @return a builder to be used to continue creating the command line. + */ public CLIBuilder arg(String name, Parse parse, Assoc assoc) { return new CLIBuilder().arg(name, parse, assoc); }