From 7e81e54e16feba742ec8501d7b9497ce2c2bf68f Mon Sep 17 00:00:00 2001 From: Shyam Rajendran Date: Wed, 10 Jun 2015 09:53:48 -0500 Subject: [PATCH 1/3] Allow ShellBolt,ShellSpout to set env vars (particularly PATH) --- .../jvm/backtype/storm/spout/ShellSpout.java | 26 ++++++++++++------ .../jvm/backtype/storm/task/ShellBolt.java | 10 +++++++ .../backtype/storm/utils/ShellProcess.java | 27 +++++++++++++++---- 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index ece11eeca45..1abee523e8b 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -25,29 +25,31 @@ import backtype.storm.multilang.SpoutMsg; import backtype.storm.task.TopologyContext; import backtype.storm.utils.ShellProcess; -import java.util.Map; +import clojure.lang.RT; +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.TimerTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import clojure.lang.RT; -import com.google.common.util.concurrent.MoreExecutors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class ShellSpout implements ISpout { public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class); private SpoutOutputCollector _collector; private String[] _command; + private Map env = new HashMap(); private ShellProcess _process; - + private TopologyContext _context; - + private SpoutMsg _spoutMsg; private int workerTimeoutMills; @@ -62,6 +64,11 @@ public ShellSpout(String... command) { _command = command; } + public ShellSpout setEnv(Map env) { + this.env = env; + return this; + } + public void open(Map stormConf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; @@ -70,6 +77,9 @@ public void open(Map stormConf, TopologyContext context, workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); _process = new ShellProcess(_command); + if (!env.isEmpty()) { + _process.setEnv(env); + } Number subpid = _process.launch(stormConf, context); LOG.info("Launched subprocess with pid " + subpid); diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index eac8a90e3fa..b246784d416 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -74,6 +74,7 @@ public class ShellBolt implements IBolt { Map _inputs = new ConcurrentHashMap(); private String[] _command; + private Map env = new HashMap(); private ShellProcess _process; private volatile boolean _running = true; private volatile Throwable _exception; @@ -98,6 +99,12 @@ public ShellBolt(String... command) { _command = command; } + + public ShellBolt setEnv(Map env) { + this.env = env; + return this; + } + public void prepare(Map stormConf, TopologyContext context, final OutputCollector collector) { Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); @@ -112,6 +119,9 @@ public void prepare(Map stormConf, TopologyContext context, workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); _process = new ShellProcess(_command); + if (!env.isEmpty()) { + _process.setEnv(env); + } //subprocesses must send their pid first thing Number subpid = _process.launch(stormConf, context); diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java index 82eabf18225..8134be73202 100644 --- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java +++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java @@ -18,16 +18,14 @@ package backtype.storm.utils; import backtype.storm.Config; -import backtype.storm.multilang.ISerializer; -import backtype.storm.multilang.BoltMsg; -import backtype.storm.multilang.NoOutputException; -import backtype.storm.multilang.ShellMsg; -import backtype.storm.multilang.SpoutMsg; +import backtype.storm.multilang.*; import backtype.storm.task.TopologyContext; + import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,6 +38,7 @@ public class ShellProcess implements Serializable { private Process _subprocess; private InputStream processErrorStream; private String[] command; + private Map env = new HashMap(); public ISerializer serializer; public Number pid; public String componentName; @@ -48,8 +47,26 @@ public ShellProcess(String[] command) { this.command = command; } + public void setEnv(Map env) { + this.env = env; + } + + private void modifyEnvironment(Map buildEnv) { + for (Map.Entry entry : env.entrySet()) { + if (entry.getKey().equals("PATH")) { + buildEnv.put("PATH", buildEnv.get("PATH") + ":" + env.get("PATH")); + } else { + buildEnv.put(entry.getKey(), entry.getValue()); + } + } + } + public Number launch(Map conf, TopologyContext context) { ProcessBuilder builder = new ProcessBuilder(command); + if (!env.isEmpty()) { + Map buildEnv = builder.environment(); + modifyEnvironment(buildEnv); + } builder.directory(new File(context.getCodeDir())); ShellLogger = LoggerFactory.getLogger(context.getThisComponentId()); From 87772f2a1fd84a3a561543659544ac6cdb7c4537 Mon Sep 17 00:00:00 2001 From: Shyam Rajendran Date: Tue, 30 Jun 2015 15:44:02 -0500 Subject: [PATCH 2/3] STORM-160 Incorporating the review comments. --- .../jvm/backtype/storm/spout/ShellSpout.java | 17 ++++++++--------- .../src/jvm/backtype/storm/task/ShellBolt.java | 1 - .../jvm/backtype/storm/utils/ShellProcess.java | 12 +++++++----- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index 1abee523e8b..91e2629b0d5 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -25,20 +25,19 @@ import backtype.storm.multilang.SpoutMsg; import backtype.storm.task.TopologyContext; import backtype.storm.utils.ShellProcess; -import clojure.lang.RT; -import com.google.common.util.concurrent.MoreExecutors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.List; import java.util.TimerTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import clojure.lang.RT; +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ShellSpout implements ISpout { public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class); @@ -47,9 +46,9 @@ public class ShellSpout implements ISpout { private String[] _command; private Map env = new HashMap(); private ShellProcess _process; - + private TopologyContext _context; - + private SpoutMsg _spoutMsg; private int workerTimeoutMills; diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index b246784d416..814b3fadfe1 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -99,7 +99,6 @@ public ShellBolt(String... command) { _command = command; } - public ShellBolt setEnv(Map env) { this.env = env; return this; diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java index 8134be73202..300bf575c13 100644 --- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java +++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java @@ -18,14 +18,16 @@ package backtype.storm.utils; import backtype.storm.Config; -import backtype.storm.multilang.*; +import backtype.storm.multilang.ISerializer; +import backtype.storm.multilang.BoltMsg; +import backtype.storm.multilang.NoOutputException; +import backtype.storm.multilang.ShellMsg; +import backtype.storm.multilang.SpoutMsg; import backtype.storm.task.TopologyContext; - import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -53,8 +55,8 @@ public void setEnv(Map env) { private void modifyEnvironment(Map buildEnv) { for (Map.Entry entry : env.entrySet()) { - if (entry.getKey().equals("PATH")) { - buildEnv.put("PATH", buildEnv.get("PATH") + ":" + env.get("PATH")); + if ("PATH".equals(entry.getKey())) { + buildEnv.put("PATH", buildEnv.get("PATH") + File.pathSeparatorChar + env.get("PATH")); } else { buildEnv.put(entry.getKey(), entry.getValue()); } From 2d27c64c433fc3178651d318dbc409c086f9fd6d Mon Sep 17 00:00:00 2001 From: Shyam Rajendran Date: Fri, 3 Jul 2015 10:44:07 -0500 Subject: [PATCH 3/3] - Fix for imports missing - Tested on local machine ( mac ) with changes to WordCountTopologyNode.java and WordCountTopology.java by setting the environment variable for Spout and Bolt. - Unwanted debug removed --- storm-core/src/jvm/backtype/storm/spout/ShellSpout.java | 1 + storm-core/src/jvm/backtype/storm/utils/ShellProcess.java | 2 ++ 2 files changed, 3 insertions(+) diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index 91e2629b0d5..a69b9b2b4da 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -26,6 +26,7 @@ import backtype.storm.task.TopologyContext; import backtype.storm.utils.ShellProcess; import java.util.Map; +import java.util.HashMap; import java.util.List; import java.util.TimerTask; import java.util.concurrent.ScheduledExecutorService; diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java index 300bf575c13..d1b280023a8 100644 --- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java +++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,6 +64,7 @@ private void modifyEnvironment(Map buildEnv) { } } + public Number launch(Map conf, TopologyContext context) { ProcessBuilder builder = new ProcessBuilder(command); if (!env.isEmpty()) {