From aa4d93c9c3d75b12c00cbd8966f976e5ea107fbe Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 29 Jun 2018 18:06:34 -0500 Subject: [PATCH 1/3] STORM-3134: Improve upload-creds user experience --- .../apache/storm/command/AdminCommands.java | 25 +++++ .../src/jvm/org/apache/storm/command/CLI.java | 105 ++++++++++++++++-- .../storm/command/UploadCredentials.java | 43 ++++++- .../jvm/org/apache/storm/command/TestCLI.java | 44 ++++++++ 4 files changed, 206 insertions(+), 11 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java index 10eaedc4b96..8fabe6c1e6a 100644 --- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java +++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java @@ -30,6 +30,7 @@ import org.apache.storm.cluster.ClusterUtils; import org.apache.storm.cluster.DaemonType; import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.generated.Credentials; import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.shade.org.apache.zookeeper.ZkCli; import org.apache.storm.utils.ConfigUtils; @@ -80,6 +81,29 @@ public void printCliHelp(String command, PrintStream out) { } } + private static class CredentialsDebug implements AdminCommand { + @Override + public void run(String[] args, Map conf, String command) throws Exception { + // We are pretending to be nimbus here. + IStormClusterState state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); + for (String topologyId: args) { + System.out.println(topologyId + ":"); + Credentials creds = state.credentials(topologyId, null); + if (creds != null) { + for (String key : creds.get_creds().keySet()) { + System.out.println("\t" + key); + } + } + } + } + + @Override + public void printCliHelp(String command, PrintStream out) { + out.println(command + " topology_id:"); + out.println("\tPrint the credential keys for a topology."); + } + } + private static class Help implements AdminCommand { @Override @@ -109,6 +133,7 @@ public void printCliHelp(String command, PrintStream out) { static { COMMANDS.put("remove_corrupt_topologies", new RemoveCorruptTopologies()); COMMANDS.put("zk_cli", new ZkCli()); + COMMANDS.put("creds", new CredentialsDebug()); COMMANDS.put("help", new Help()); } diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java index 2c3311ed1f8..4510d29e7d5 100644 --- a/storm-core/src/jvm/org/apache/storm/command/CLI.java +++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java @@ -109,7 +109,7 @@ public static CLIBuilder opt(String shortName, String longName, Object defaultVa * @param longName the multi character name of the option (no `--` characters proceed it). * @return a builder to be used to continue creating the command line. */ - public CLIBuilder boolOpt(String shortName, String longName) { + public static CLIBuilder boolOpt(String shortName, String longName) { return new CLIBuilder().boolOpt(shortName, longName); } @@ -118,7 +118,7 @@ public CLIBuilder boolOpt(String shortName, String longName) { * @param name the name of the argument. * @return a builder to be used to continue creating the command line. */ - public CLIBuilder arg(String name) { + public static CLIBuilder arg(String name) { return new CLIBuilder().arg(name); } @@ -128,7 +128,7 @@ public CLIBuilder arg(String name) { * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. * @return a builder to be used to continue creating the command line. */ - public CLIBuilder arg(String name, Assoc assoc) { + public static CLIBuilder arg(String name, Assoc assoc) { return new CLIBuilder().arg(name, assoc); } @@ -138,7 +138,7 @@ public CLIBuilder arg(String name, Assoc assoc) { * @param parse an optional function to transform the string to something else. If null a NOOP is used. * @return a builder to be used to continue creating the command line. */ - public CLIBuilder arg(String name, Parse parse) { + public static CLIBuilder arg(String name, Parse parse) { return new CLIBuilder().arg(name, parse); } @@ -149,10 +149,50 @@ public CLIBuilder arg(String name, Parse parse) { * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. * @return a builder to be used to continue creating the command line. */ - public CLIBuilder arg(String name, Parse parse, Assoc assoc) { + public static CLIBuilder arg(String name, Parse parse, Assoc assoc) { return new CLIBuilder().arg(name, parse, assoc); } + /** + * Add a named argument that is optional. + * @param name the name of the argument. + * @return a builder to be used to continue creating the command line. + */ + public static CLIBuilder optionalArg(String name) { + return new CLIBuilder().optionalArg(name); + } + + /** + * Add a named argument that is optional. + * @param name the name of the argument. + * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. + * @return a builder to be used to continue creating the command line. + */ + public static CLIBuilder optionalArg(String name, Assoc assoc) { + return new CLIBuilder().optionalArg(name, assoc); + } + + /** + * Add a named argument that is optional. + * @param name the name of the argument. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @return a builder to be used to continue creating the command line. + */ + public static CLIBuilder optionalArg(String name, Parse parse) { + return new CLIBuilder().optionalArg(name, parse); + } + + /** + * Add a named argument that is optional. + * @param name the name of the argument. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. + * @return a builder to be used to continue creating the command line. + */ + public static CLIBuilder optionalArg(String name, Parse parse, Assoc assoc) { + return new CLIBuilder().optionalArg(name, parse, assoc); + } + public interface Parse { /** * Parse a String to the type you want it to be. @@ -213,6 +253,7 @@ public Object process(Object current, String value) { public static class CLIBuilder { private final ArrayList opts = new ArrayList<>(); private final ArrayList args = new ArrayList<>(); + private final ArrayList optionalArgs = new ArrayList<>(); /** * Add an option to be parsed. @@ -299,10 +340,54 @@ public CLIBuilder arg(String name, Parse parse) { * @return a builder to be used to continue creating the command line. */ public CLIBuilder arg(String name, Parse parse, Assoc assoc) { + if (!optionalArgs.isEmpty()) { + throw new IllegalStateException("Cannot have a required argument after adding in an optional argument"); + } args.add(new Arg(name, parse, assoc)); return this; } + /** + * Add a named argument that is optional. + * @param name the name of the argument. + * @return a builder to be used to continue creating the command line. + */ + public CLIBuilder optionalArg(String name) { + return optionalArg(name, null, null); + } + + /** + * Add a named argument that is optional. + * @param name the name of the argument. + * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. + * @return a builder to be used to continue creating the command line. + */ + public CLIBuilder optionalArg(String name, Assoc assoc) { + return optionalArg(name, null, assoc); + } + + /** + * Add a named argument that is optional. + * @param name the name of the argument. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @return a builder to be used to continue creating the command line. + */ + public CLIBuilder optionalArg(String name, Parse parse) { + return optionalArg(name, parse, null); + } + + /** + * Add a named argument that is optional. + * @param name the name of the argument. + * @param parse an optional function to transform the string to something else. If null a NOOP is used. + * @param assoc an association command to decide what to do if the argument appears multiple times. If null INTO_LIST is used. + * @return a builder to be used to continue creating the command line. + */ + public CLIBuilder optionalArg(String name, Parse parse, Assoc assoc) { + optionalArgs.add(new Arg(name, parse, assoc)); + return this; + } + /** * Parse the command line arguments. * @param rawArgs the string arguments to be parsed. @@ -341,6 +426,8 @@ public Map parse(String... rawArgs) throws Exception { ret.put(opt.shortName, current); } } + List fullArgs = new ArrayList<>(args); + fullArgs.addAll(optionalArgs); List stringArgs = cl.getArgList(); if (args.size() > stringArgs.size()) { throw new RuntimeException("Wrong number of arguments at least " + args.size() @@ -349,10 +436,10 @@ public Map parse(String... rawArgs) throws Exception { int argIndex = 0; int stringArgIndex = 0; - if (args.size() > 0) { - while (argIndex < args.size()) { - Arg arg = args.get(argIndex); - boolean isLastArg = (argIndex == (args.size() - 1)); + if (fullArgs.size() > 0) { + while (argIndex < fullArgs.size()) { + Arg arg = fullArgs.get(argIndex); + boolean isLastArg = (argIndex == (fullArgs.size() - 1)); Object current = null; int maxStringIndex = isLastArg ? stringArgs.size() : (stringArgIndex + 1); for (; stringArgIndex < maxStringIndex; stringArgIndex++) { diff --git a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java index 9a71facda86..7eb89f286e7 100644 --- a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java +++ b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java @@ -14,10 +14,19 @@ import java.io.FileReader; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; +import org.apache.storm.Config; import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; +import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +37,7 @@ public class UploadCredentials { public static void main(String[] args) throws Exception { Map cl = CLI.opt("f", "file", null) .arg("topologyName", CLI.FIRST_WINS) - .arg("rawCredentials", CLI.INTO_LIST) + .optionalArg("rawCredentials", CLI.INTO_LIST) .parse(args); String credentialFile = (String) cl.get("f"); @@ -52,7 +61,37 @@ public static void main(String[] args) throws Exception { credentialsMap.put(rawCredentials.get(i), rawCredentials.get(i + 1)); } } - StormSubmitter.pushCredentials(topologyName, new HashMap<>(), credentialsMap); + + Map topologyConf = new HashMap<>(); + //Try to get the topology conf from nimbus, so we can reuse it. + try (NimbusClient nc = NimbusClient.getConfiguredClient(new HashMap<>())) { + Nimbus.Iface client = nc.getClient(); + ClusterSummary summary = client.getClusterInfo(); + for (TopologySummary topo : summary.get_topologies()) { + if (topologyName.equals(topo.get_name())) { + //We found the topology, lets get the conf + String topologyId = topo.get_id(); + topologyConf = (Map) JSONValue.parse(client.getTopologyConf(topologyId)); + LOG.info("Using topology conf from {} as basis for getting new creds", topologyId); + + Map commandLine = Utils.readCommandLineOpts(); + List clCreds = (List)commandLine.get(Config.TOPOLOGY_AUTO_CREDENTIALS); + List topoCreds = (List)topologyConf.get(Config.TOPOLOGY_AUTO_CREDENTIALS); + + if (clCreds != null) { + Set extra = new HashSet<>(clCreds); + if (topoCreds != null) { + extra.removeAll(topoCreds); + } + if (!extra.isEmpty()) { + LOG.warn("The topology {} is not using {} but they were included here.", topologyId, extra); + } + } + break; + } + } + } + StormSubmitter.pushCredentials(topologyName, topologyConf, credentialsMap); LOG.info("Uploaded new creds to topology: {}", topologyName); } } diff --git a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java index fd06cbe5684..09e2db8f8eb 100644 --- a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java +++ b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java @@ -18,6 +18,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class TestCLI { @@ -62,6 +63,49 @@ public void testSimple() throws Exception { assertEquals("value2", f.get("key2")); } + + @Test + public void testOptional() throws Exception { + Map values = CLI.optionalArg("A", CLI.LAST_WINS) + .parse("TEST"); + + assertEquals(1, values.size()); + assertEquals("TEST", values.get("A")); + + values = CLI.optionalArg("A", CLI.LAST_WINS) + .parse(); + + assertEquals(1, values.size()); + assertEquals(null, values.get("A")); + + + values = CLI.optionalArg("A", CLI.LAST_WINS) + .parse("THIS", "IS", "A", "TEST"); + + assertEquals(1, values.size()); + assertEquals("TEST", values.get("A")); + + values = CLI.arg("A", CLI.LAST_WINS) + .optionalArg("B", CLI.LAST_WINS) + .parse("THIS", "IS", "A", "TEST"); + + assertEquals(2, values.size()); + assertEquals("THIS", values.get("A")); + assertEquals("TEST", values.get("B")); + } + + @Test + public void argAfterOptional() throws Exception { + try { + CLI.optionalArg("A", CLI.LAST_WINS) + .arg("B"); + + fail("Expected an exception to be thrown by now"); + } catch (IllegalStateException is) { + //Expected + } + } + private static final class PairParse implements CLI.Parse { @Override From f237f3207afa6761c1948e0b0af33175337a3924 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 5 Jul 2018 11:30:12 -0500 Subject: [PATCH 2/3] STORM-3134: addressed review comments --- .../org/apache/storm/command/UploadCredentials.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java index 7eb89f286e7..101f8f2ca98 100644 --- a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java +++ b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java @@ -86,6 +86,16 @@ public static void main(String[] args) throws Exception { if (!extra.isEmpty()) { LOG.warn("The topology {} is not using {} but they were included here.", topologyId, extra); } + + //Now check for autoCreds that are missing from the command line, but only if the + // command line is used. + if (topoCreds != null) { + Set missing = new HashSet<>(topoCreds); + missing.removeAll(clCreds); + if (!missing.isEmpty()) { + LOG.warn("The topology {} is using {} but they were not included here.", topologyId, missing); + } + } } break; } From c705eea6628504ee6c74cecf0f0e82c3356ef247 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Tue, 10 Jul 2018 11:27:53 -0500 Subject: [PATCH 3/3] STORM-3134: Improved Documentation --- bin/storm.py | 34 ++++++++++++++++++++-------- docs/Command-line-client.md | 44 ++++++++++++++++++++++++++++++++++--- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/bin/storm.py b/bin/storm.py index e11a8c06040..93eee5bca59 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -640,15 +640,31 @@ def kill_workers(*args): extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")]) def admin(*args): - """Syntax: [storm admin cmd] - - This is a proxy of nimbus and allow to execute admin commands. As of now it supports - command to remove corrupt topologies. - Nimbus doesn't clean up corrupted topologies automatically. This command should clean - up corrupt topologies i.e.topologies whose codes are not available on blobstore. - In future this command would support more admin commands. - Supported command - storm admin remove_corrupt_topologies + """Syntax: [storm admin cmd [options]] + + The storm admin command provides access to several operations that can help + an administrator debug or fix a cluster. + + remove_corrupt_topologies - This command should be run on a nimbus node as + the same user nimbus runs as. It will go directly to zookeeper + blobstore + and find topologies that appear to be corrupted because of missing blobs. + It will kill those topologies. + + zk_cli [options] - This command will launch a zookeeper cli pointing to the + storm zookeeper instance logged in as the nimbus user. It should be run on + a nimbus server as the user nimbus runs as. + -s --server : Set the connection string to use, + defaults to storm connection string. + -t --time-out : Set the timeout to use, defaults to storm + zookeeper timeout. + -w --write: Allow for writes, defaults to read only, we don't want to + cause problems. + -n --no-root: Don't include the storm root on the default connection string. + -j --jaas : Include a jaas file that should be used when + authenticating with ZK defaults to the + java.security.auth.login.config conf. + + creds topology_id - Print the credential keys for a topology. """ exec_storm_class( "org.apache.storm.command.AdminCommands", diff --git a/docs/Command-line-client.md b/docs/Command-line-client.md index 7bc678ddf8a..ce889e76a41 100644 --- a/docs/Command-line-client.md +++ b/docs/Command-line-client.md @@ -8,6 +8,7 @@ This page describes all the commands that are possible with the "storm" command These commands are: 1. jar +1. local 1. sql 1. kill 1. activate @@ -15,6 +16,7 @@ These commands are: 1. rebalance 1. repl 1. classpath +1. server_classpath 1. localconfvalue 1. remoteconfvalue 1. nimbus @@ -36,6 +38,7 @@ These commands are: 1. shell 1. upload-credentials 1. version +1. admin 1. help ### jar @@ -54,6 +57,12 @@ Complete example of both options is here: `./bin/storm jar example/storm-starter When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology. +### local + +Syntax: `storm jar topology-jar-path class ...` + +The local command acts just like `storm jar` except instead of submitting a topology to a cluster it will run the cluster in local mode. This means an embedded version of the storm daemons will be run within the same process as your topology for 30 seconds before it shuts down automatically. As such the classpath of your topology will be extended to include everything needed to run those daemons. + ### sql Syntax: `storm sql sql-file topology-name` @@ -102,6 +111,12 @@ Syntax: `storm classpath` Prints the classpath used by the storm client when running commands. +### server_classpath + +Syntax: `storm server_classpath` + +Prints the classpath used by the storm daemons. + ### localconfvalue Syntax: `storm localconfvalue conf-name` @@ -147,7 +162,7 @@ as arguments to the function. If no function is given the arguments must be pai *NOTE:* This is not really intended for production use. This is mostly because parsing out the results can be a pain. -Creating an actuall DRPC client only takes a few lines, so for production please go with that. +Creating an actual DRPC client only takes a few lines, so for production please go with that. ```java Config conf = new Config(); @@ -162,14 +177,14 @@ try (DRPCClient drpc = DRPCClient.getConfiguredClient(conf)) { `storm drpc-client exclaim a exclaim b test bar` This will submit 3 separate DRPC request. -1. funciton = "exclaim" args = "a" +1. function = "exclaim" args = "a" 2. function = "exclaim" args = "b" 3. function = "test" args = "bar" `storm drpc-client -f exclaim a b` This will submit 2 separate DRPC request. -1. funciton = "exclaim" args = "a" +1. function = "exclaim" args = "a" 2. function = "exclaim" args = "b" ### blobstore @@ -310,6 +325,29 @@ Syntax: `storm version` Prints the version number of this Storm release. +### admin + +Syntax: `storm admin [options]` + +The storm admin command provides access to several operations that can help an administrator debug or fix a cluster. + +`remove_corrupt_topologies` - This command should be run on a nimbus node as the same user nimbus runs as. It will go directly to zookeeper + blobstore and find topologies that appear to be corrupted because of missing blobs. It will kill those topologies. + + `zk_cli [options]` - This command will launch a zookeeper cli pointing to the storm zookeeper instance logged in as the nimbus user. It should be run on a nimbus server as the user nimbus runs as. + + * `-s --server `: Set the connection string to use, + defaults to storm connection string. + * `-t --time-out `: Set the timeout to use, defaults to storm + zookeeper timeout. + * `-w --write`: Allow for writes, defaults to read only, we don't want to + cause problems. + * `-n --no-root`: Don't include the storm root on the default connection string. + * `-j --jaas `: Include a jaas file that should be used when + authenticating with ZK defaults to the + java.security.auth.login.config conf. + +`creds ` - Print the credential keys for a topology. + ### help Syntax: `storm help [command]`