From 2931487f0cbcb67992e67deba40bb5fe68261fd6 Mon Sep 17 00:00:00 2001 From: Joshi Date: Sun, 18 Jan 2015 21:07:07 -0800 Subject: [PATCH 1/2] STORM-469 --- .../src/ui/public/templates/component-page-template.html | 2 +- .../src/ui/public/templates/topology-page-template.html | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/storm-core/src/ui/public/templates/component-page-template.html b/storm-core/src/ui/public/templates/component-page-template.html index 1e916e37458..5e6936850e8 100644 --- a/storm-core/src/ui/public/templates/component-page-template.html +++ b/storm-core/src/ui/public/templates/component-page-template.html @@ -479,7 +479,7 @@

Errors

{{errorHost}} {{errorPort}} - {{error}} + {{error}} {{/componentErrors}} diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html index 0a104467275..4b67a2f5012 100644 --- a/storm-core/src/ui/public/templates/topology-page-template.html +++ b/storm-core/src/ui/public/templates/topology-page-template.html @@ -242,7 +242,7 @@

Spouts ({{windowHint}})

{{errorHost}} {{errorPort}} - {{lastError}} + {{lastError}} {{/spouts}} @@ -331,7 +331,7 @@

Bolts ({{windowHint}})

{{errorHost}} {{errorPort}} - {{lastError}} + {{lastError}} {{/bolts}} From ef19e71ab55e8de03ddc776c93256ffdaff949d6 Mon Sep 17 00:00:00 2001 From: Joshi Date: Sun, 18 Jan 2015 22:30:53 -0800 Subject: [PATCH 2/2] STORM-528 --- .../starter/bolt/RollingCountBoltTest.java | 4 +- .../test/storm/kafka/bolt/KafkaBoltTest.java | 6 +- pom.xml | 6 - storm-core/dependency-reduced-pom.xml | 720 ++++++++++++++++++ storm-core/maven-eclipse.xml | 8 + .../backtype/storm/clojure/ClojureBolt.java | 4 +- .../storm/coordination/BatchBoltExecutor.java | 6 +- .../storm/coordination/CoordinatedBolt.java | 4 +- .../jvm/backtype/storm/drpc/JoinResult.java | 2 +- .../backtype/storm/drpc/KeyedFairBolt.java | 4 +- .../backtype/storm/drpc/ReturnResults.java | 5 +- .../storm/metric/MetricsConsumerBolt.java | 5 +- .../jvm/backtype/storm/metric/SystemBolt.java | 5 +- .../jvm/backtype/storm/spout/ShellSpout.java | 1 + .../src/jvm/backtype/storm/task/IBolt.java | 3 +- .../jvm/backtype/storm/task/ShellBolt.java | 9 +- .../storm/task/WorkerTopologyContext.java | 7 + .../storm/testing/NonRichBoltTracker.java | 4 +- .../storm/testing/PythonShellMetricsBolt.java | 4 +- .../storm/testing/TestAggregatesCounter.java | 2 +- .../testing/TestEventOrderCheckBolt.java | 4 +- .../storm/testing/TestGlobalCount.java | 4 +- .../storm/testing/TestPlannerBolt.java | 2 +- .../storm/testing/TupleCaptureBolt.java | 2 +- .../storm/topology/BasicBoltExecutor.java | 2 +- .../TransactionalSpoutBatchExecutor.java | 5 +- .../trident/topology/TridentBoltExecutor.java | 6 +- 27 files changed, 797 insertions(+), 37 deletions(-) create mode 100644 storm-core/dependency-reduced-pom.xml create mode 100644 storm-core/maven-eclipse.xml diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java index bc31ba06d4d..19100e2e730 100644 --- a/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java +++ b/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java @@ -53,7 +53,7 @@ public void shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() Map conf = mock(Map.class); TopologyContext context = mock(TopologyContext.class); OutputCollector collector = mock(OutputCollector.class); - bolt.prepare(conf, context, collector); + bolt.prepare(conf, context, collector,""); // when bolt.execute(tickTuple); @@ -73,7 +73,7 @@ public void shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceive Map conf = mock(Map.class); TopologyContext context = mock(TopologyContext.class); OutputCollector collector = mock(OutputCollector.class); - bolt.prepare(conf, context, collector); + bolt.prepare(conf, context, collector, ""); // when bolt.execute(normalTuple); diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java index 2a56f84ecef..efb13a45684 100644 --- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java +++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java @@ -112,8 +112,9 @@ private KafkaBolt generateStringSerializerBolt() { props.put("metadata.broker.list", broker.getBrokerConnectionString()); props.put("request.required.acks", "1"); props.put("serializer.class", "kafka.serializer.StringEncoder"); + String codeDir = ""; config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props); - bolt.prepare(config, null, new OutputCollector(collector)); + bolt.prepare(config, null, new OutputCollector(collector), codeDir); return bolt; } @@ -122,8 +123,9 @@ private KafkaBolt generateDefaultSerializerBolt() { Properties props = new Properties(); props.put("metadata.broker.list", broker.getBrokerConnectionString()); props.put("request.required.acks", "1"); + String codeDir = ""; config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props); - bolt.prepare(config, null, new OutputCollector(collector)); + bolt.prepare(config, null, new OutputCollector(collector), codeDir); return bolt; } diff --git a/pom.xml b/pom.xml index 05e2be33706..4b45d180299 100644 --- a/pom.xml +++ b/pom.xml @@ -462,12 +462,6 @@ netty ${netty.version} - - org.clojars.runa - conjure - ${conjure.version} - test - org.clojure clojure-contrib diff --git a/storm-core/dependency-reduced-pom.xml b/storm-core/dependency-reduced-pom.xml new file mode 100644 index 00000000000..e9389a67379 --- /dev/null +++ b/storm-core/dependency-reduced-pom.xml @@ -0,0 +1,720 @@ + + + + storm + org.apache.storm + 0.10.0-SNAPSHOT + + 4.0.0 + org.apache.storm + storm-core + Storm Core + Storm Core Java API and Clojure implementation. + + src/jvm + test/jvm + + + ../conf + + + META-INF + ../ + + NOTICE + + + + + + src/dev + + + test/resources + + + + + com.theoryinpractise + clojure-maven-plugin + true + + + compile-clojure + compile + + compile + + + + test-clojure + test + + test-with-junit + + + ${argLine} ${test.extra.args} + + + + + + src/clj + + + test/clj + + false + true + + none + + + + + maven-surefire-report-plugin + + + ${project.build.directory}/test-reports + + + + + maven-shade-plugin + 2.2 + + + package + + shade + + + + + + org.apache.storm + maven-shade-clojure-transformer + ${project.version} + + + + false + true + true + false + + + org.apache.thrift:* + io.netty:netty + com.google.guava:guava + org.apache.httpcomponents:http* + org.apache.zookeeper:zookeeper + org.apache.curator:* + + + + + org.apache.thrift + org.apache.thrift7 + + + org.jboss.netty + org.apache.storm.netty + + + com.google.common + org.apache.storm.guava + + + com.google.thirdparty + org.apache.storm.guava.thirdparty + + + org.apache.http + org.apache.storm.http + + + org.apache.zookeeper + org.apache.storm.zookeeper + + + org.apache.curator + org.apache.storm.curator + + + + + + + + org.apache.thrift:* + + META-INF/LICENSE.txt + META-INF/NOTICE.txt + + + + io.netty:netty + + META-INF/LICENSE.txt + META-INF/NOTICE.txt + + + + commons-httpclient:commons-httpclient + + META-INF/LICENSE.txt + META-INF/NOTICE.txt + META-INF/README.txt + + + + org.apache.zookeeper:zookeeper + + LICENSE.txt + + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + false + false + true + runtime + + + + + + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + + + + coverage + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + add-source + generate-sources + + add-source + + + + src/clj + + + + + + + maven-antrun-plugin + 1.7 + + + pre-test-jacoco-clean + process-test-classes + + run + + + + + + + + + + + org.jacoco + jacoco-maven-plugin + 0.7.2.201409121644 + + + prepare-agent + + prepare-agent + + + true + + backtype/storm/metric/api/IMetricsConsumer$DataPointFieldAccess + backtype/storm/metric/api/IMetricsConsumer$TaskInfoFieldAccess + backtype/storm/testing/TestSerObjectFieldAccess + + + + + report + prepare-package + + report + + + + backtype/storm/generated/* + + + backtype/*/*/*/* + backtype/*/*/* + backtype/*/* + backtype/* + zilch/* + storm/*/*/*/* + storm/*/*/* + storm/*/* + storm/* + + + + + + + + + + native + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + generate-sources + + exec + + + + + sh + + -c + mkdir -p ${project.build.directory}/; cp -rufv ${basedir}/src/native/ ${project.build.directory}/ + + + + + org.codehaus.mojo + make-maven-plugin + + + compile + compile + + autoreconf + configure + make-install + + + + test + test + + test + + + + + ${project.build.directory}/native/worker-launcher + + -i + + + + CFLAGS + -DEXEC_CONF_DIR=${worker-launcher.conf.dir} ${worker-launcher.additional_cflags} + + + ${project.build.directory}/native/worker-launcher + /usr/local + ${project.build.directory}/native/target + + + + + + + + + org.clojure + clojure + 1.5.1 + compile + + + clj-time + clj-time + 0.8.0 + compile + + + joda-time + joda-time + 2.3 + compile + + + compojure + compojure + 1.1.3 + compile + + + org.clojure + core.incubator + 0.1.0 + compile + + + org.clojure + tools.macro + 0.1.0 + compile + + + clout + clout + 1.0.1 + compile + + + ring + ring-core + 1.1.5 + compile + + + commons-fileupload + commons-fileupload + 1.2.1 + compile + + + javax.servlet + servlet-api + 2.5 + compile + + + hiccup + hiccup + 0.3.6 + compile + + + ring + ring-devel + 1.3.0 + compile + + + clj-stacktrace + clj-stacktrace + 0.2.7 + compile + + + ns-tracker + ns-tracker + 0.2.2 + compile + + + org.clojure + tools.namespace + 0.2.4 + compile + + + org.clojure + java.classpath + 0.2.2 + compile + + + ring + ring-jetty-adapter + 1.3.0 + compile + + + ring + ring-servlet + 1.3.0 + compile + + + org.eclipse.jetty + jetty-server + 7.6.13.v20130916 + compile + + + org.eclipse.jetty.orbit + javax.servlet + 2.5.0.v201103041518 + compile + + + org.eclipse.jetty + jetty-http + 7.6.13.v20130916 + compile + + + org.eclipse.jetty + jetty-io + 7.6.13.v20130916 + compile + + + ring + ring-anti-forgery + 1.0.0 + compile + + + crypto-random + crypto-random + 1.2.0 + compile + + + crypto-equality + crypto-equality + 1.0.0 + compile + + + org.eclipse.jetty + jetty-servlet + 7.6.13.v20130916 + compile + + + org.eclipse.jetty + jetty-security + 7.6.13.v20130916 + compile + + + org.eclipse.jetty + jetty-servlets + 7.6.13.v20130916 + compile + + + org.eclipse.jetty + jetty-continuation + 7.6.13.v20130916 + compile + + + org.eclipse.jetty + jetty-client + 7.6.13.v20130916 + compile + + + org.eclipse.jetty + jetty-util + 7.6.13.v20130916 + compile + + + org.clojure + tools.logging + 0.2.3 + compile + + + org.clojure + math.numeric-tower + 0.0.1 + compile + + + org.clojure + tools.cli + 0.2.4 + compile + + + commons-io + commons-io + 2.4 + compile + + + org.apache.commons + commons-exec + 1.1 + compile + + + commons-lang + commons-lang + 2.5 + compile + + + com.googlecode.json-simple + json-simple + 1.1 + compile + + + com.twitter + carbonite + 1.4.0 + compile + + + com.esotericsoftware.kryo + kryo + 2.21 + compile + + + com.esotericsoftware.reflectasm + reflectasm + 1.07 + shaded + compile + + + org.ow2.asm + asm + 4.0 + compile + + + com.esotericsoftware.minlog + minlog + 1.2 + compile + + + org.objenesis + objenesis + 1.2 + compile + + + com.twitter + chill-java + 0.3.5 + compile + + + org.yaml + snakeyaml + 1.11 + compile + + + commons-logging + commons-logging + 1.1.3 + compile + + + commons-codec + commons-codec + 1.6 + compile + + + com.googlecode.disruptor + disruptor + 2.10.1 + compile + + + org.jgrapht + jgrapht-core + 0.9.0 + compile + + + ch.qos.logback + logback-classic + 1.0.13 + compile + + + ch.qos.logback + logback-core + 1.0.13 + compile + + + org.slf4j + slf4j-api + 1.7.5 + compile + + + org.slf4j + log4j-over-slf4j + 1.6.6 + compile + + + org.apache.hadoop + hadoop-auth + 2.4.0 + compile + + + log4j + log4j + + + slf4j-log4j12 + org.slf4j + + + + + jline + jline + 2.11 + compile + + + + /etc/storm + + + diff --git a/storm-core/maven-eclipse.xml b/storm-core/maven-eclipse.xml new file mode 100644 index 00000000000..ee65ed2b245 --- /dev/null +++ b/storm-core/maven-eclipse.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java b/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java index 5de9bde98e6..4ad8f30fb48 100644 --- a/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java +++ b/storm-core/src/jvm/backtype/storm/clojure/ClojureBolt.java @@ -53,7 +53,7 @@ public ClojureBolt(List fnSpec, List confSpec, List params, Map(); } diff --git a/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java b/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java index 6f337a63497..f64380a6ffd 100644 --- a/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java +++ b/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java @@ -204,14 +204,14 @@ public CoordinatedBolt(IRichBolt delegate, Map sourceArgs, I _idStreamSpec = idStreamSpec; } - public void prepare(Map config, TopologyContext context, OutputCollector collector) { + public void prepare(Map config, TopologyContext context, OutputCollector collector, String codeDir) { TimeCacheMap.ExpiredCallback callback = null; if(_delegate instanceof TimeoutCallback) { callback = new TimeoutItems(); } _tracked = new TimeCacheMap(context.maxTopologyMessageTimeout(), callback); _collector = collector; - _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector))); + _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)), codeDir); for(String component: Utils.get(context.getThisTargets(), Constants.COORDINATED_STREAM_ID, new HashMap()) diff --git a/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java b/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java index b74b97ed629..2c39c09b384 100644 --- a/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java +++ b/storm-core/src/jvm/backtype/storm/drpc/JoinResult.java @@ -44,7 +44,7 @@ public JoinResult(String returnComponent) { this.returnComponent = returnComponent; } - public void prepare(Map map, TopologyContext context, OutputCollector collector) { + public void prepare(Map map, TopologyContext context, OutputCollector collector, String codeDir) { _collector = collector; } diff --git a/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java b/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java index 113163dd360..809092357f7 100644 --- a/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java +++ b/storm-core/src/jvm/backtype/storm/drpc/KeyedFairBolt.java @@ -45,11 +45,11 @@ public KeyedFairBolt(IBasicBolt delegate) { } - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { if(_delegate instanceof FinishedCallback) { _callback = (FinishedCallback) _delegate; } - _delegate.prepare(stormConf, context, collector); + _delegate.prepare(stormConf, context, collector, codeDir); _rrQueue = new KeyedRoundRobinQueue(); _executor = new Thread(new Runnable() { public void run() { diff --git a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java b/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java index 3d50679ca7a..4a2cdcbf58c 100644 --- a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java +++ b/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java @@ -31,6 +31,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.thrift.TException; @@ -49,9 +51,10 @@ public class ReturnResults extends BaseRichBolt { Map _clients = new HashMap(); @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { _conf = stormConf; _collector = collector; + if(!StringUtils.isEmpty(codeDir)) { context.setCodeDir(codeDir); } local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local"); } diff --git a/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java index d8eb3bf5ef8..1c21c539d07 100644 --- a/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java +++ b/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java @@ -24,6 +24,8 @@ import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Tuple; +import org.apache.commons.lang.StringUtils; + import java.util.Collection; import java.util.Map; @@ -39,13 +41,14 @@ public MetricsConsumerBolt(String consumerClassName, Object registrationArgument } @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { try { _metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance(); } catch (Exception e) { throw new RuntimeException("Could not instantiate a class listed in config under section " + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); } + if(!StringUtils.isEmpty(codeDir)) { context.setCodeDir(codeDir); } _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector); _collector = collector; } diff --git a/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java b/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java index 492bc2da757..acb025fb2ab 100644 --- a/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java +++ b/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java @@ -27,6 +27,7 @@ import clojure.lang.AFn; import clojure.lang.IFn; import clojure.lang.RT; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +91,7 @@ public Object getValueAndReset() { } @Override - public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { if(_prepareWasCalled && !"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) { throw new RuntimeException("A single worker should have 1 SystemBolt instance."); } @@ -100,6 +101,8 @@ public void prepare(final Map stormConf, TopologyContext context, OutputCollecto final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean(); + if(!StringUtils.isEmpty(codeDir)) { context.setCodeDir(codeDir); } + context.registerMetric("uptimeSecs", new IMetric() { @Override public Object getValueAndReset() { diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index c79d175e6e5..2779f8dbaf6 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.StringUtils; import clojure.lang.RT; import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; diff --git a/storm-core/src/jvm/backtype/storm/task/IBolt.java b/storm-core/src/jvm/backtype/storm/task/IBolt.java index 58362c81794..a4e9a0282fc 100644 --- a/storm-core/src/jvm/backtype/storm/task/IBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/IBolt.java @@ -51,8 +51,9 @@ public interface IBolt extends Serializable { * @param stormConf The Storm configuration for this bolt. This is the configuration provided to the topology merged in with cluster configuration on this machine. * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc. * @param collector The collector is used to emit tuples from this bolt. Tuples can be emitted at any time, including the prepare and cleanup methods. The collector is thread-safe and should be saved as an instance variable of this bolt object. + * @param codeDir Override codeDir path on context to use ruby, python or js path.Empty if need to use same as context codeDir path. */ - void prepare(Map stormConf, TopologyContext context, OutputCollector collector); + void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir); /** * Process a single tuple of input. The Tuple object contains metadata on it diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index 308ec6703e5..709778b6cff 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -31,6 +31,7 @@ import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.StringUtils; import java.util.*; import java.util.concurrent.*; @@ -82,7 +83,7 @@ public class ShellBolt implements IBolt { private Thread _readerThread; private Thread _writerThread; - + private TopologyContext _context; private int workerTimeoutMills; @@ -98,16 +99,18 @@ public ShellBolt(String... command) { _command = command; } + @Override public void prepare(Map stormConf, TopologyContext context, - final OutputCollector collector) { + final OutputCollector collector, String codeDir) { Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); if (maxPending != null) { - this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue()); + this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue()); } _rand = new Random(); _collector = collector; _context = context; + if(!StringUtils.isEmpty(codeDir)) { _context.setCodeDir(codeDir); } workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); diff --git a/storm-core/src/jvm/backtype/storm/task/WorkerTopologyContext.java b/storm-core/src/jvm/backtype/storm/task/WorkerTopologyContext.java index 9c2f49b4eee..fa25cce4d6a 100644 --- a/storm-core/src/jvm/backtype/storm/task/WorkerTopologyContext.java +++ b/storm-core/src/jvm/backtype/storm/task/WorkerTopologyContext.java @@ -87,6 +87,13 @@ public String getCodeDir() { return _codeDir; } + /** + * Sets the location of the external resources for this worker on the + * local filesystem. These external resources typically include bolts implemented + * in other languages, such as Ruby or Python. + */ + public void setCodeDir(String codeDir) { _codeDir = codeDir; } + /** * If this task spawns any subprocesses, those subprocesses must immediately * write their PID to this directory on the local filesystem to ensure that diff --git a/storm-core/src/jvm/backtype/storm/testing/NonRichBoltTracker.java b/storm-core/src/jvm/backtype/storm/testing/NonRichBoltTracker.java index ccbb67f3e81..79d4a2e8bc4 100644 --- a/storm-core/src/jvm/backtype/storm/testing/NonRichBoltTracker.java +++ b/storm-core/src/jvm/backtype/storm/testing/NonRichBoltTracker.java @@ -35,8 +35,8 @@ public NonRichBoltTracker(IBolt delegate, String id) { _trackId = id; } - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - _delegate.prepare(stormConf, context, collector); + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { + _delegate.prepare(stormConf, context, collector, codeDir); } public void execute(Tuple input) { diff --git a/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java index 4b85ce8c5bf..4844eba90eb 100644 --- a/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java +++ b/storm-core/src/jvm/backtype/storm/testing/PythonShellMetricsBolt.java @@ -33,8 +33,8 @@ public PythonShellMetricsBolt(String[] command) { super(command); } - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - super.prepare(stormConf, context, collector); + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { + super.prepare(stormConf, context, collector, codeDir); CountShellMetric cMetric = new CountShellMetric(); context.registerMetric("my-custom-shell-metric", cMetric, 5); diff --git a/storm-core/src/jvm/backtype/storm/testing/TestAggregatesCounter.java b/storm-core/src/jvm/backtype/storm/testing/TestAggregatesCounter.java index e8c0a61eb93..75757b4517b 100644 --- a/storm-core/src/jvm/backtype/storm/testing/TestAggregatesCounter.java +++ b/storm-core/src/jvm/backtype/storm/testing/TestAggregatesCounter.java @@ -36,7 +36,7 @@ public class TestAggregatesCounter extends BaseRichBolt { Map _counts; OutputCollector _collector; - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { _collector = collector; _counts = new HashMap(); } diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java index 1f8036217a4..7d54c11625f 100644 --- a/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java +++ b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,9 +42,10 @@ public class TestEventOrderCheckBolt extends BaseRichBolt { OutputCollector _collector; Map recentEventId = new HashMap(); - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { _collector = collector; _count = 0; + if(!StringUtils.isEmpty(codeDir)) { context.setCodeDir(codeDir); } } public void execute(Tuple input) { diff --git a/storm-core/src/jvm/backtype/storm/testing/TestGlobalCount.java b/storm-core/src/jvm/backtype/storm/testing/TestGlobalCount.java index 5ef464a5efe..26d91913d77 100644 --- a/storm-core/src/jvm/backtype/storm/testing/TestGlobalCount.java +++ b/storm-core/src/jvm/backtype/storm/testing/TestGlobalCount.java @@ -25,6 +25,7 @@ import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Values; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,9 +36,10 @@ public class TestGlobalCount extends BaseRichBolt { private int _count; OutputCollector _collector; - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { _collector = collector; _count = 0; + if(!StringUtils.isEmpty(codeDir)) { context.setCodeDir(codeDir); } } public void execute(Tuple input) { diff --git a/storm-core/src/jvm/backtype/storm/testing/TestPlannerBolt.java b/storm-core/src/jvm/backtype/storm/testing/TestPlannerBolt.java index 0d30b265013..74ba6ac3e1c 100644 --- a/storm-core/src/jvm/backtype/storm/testing/TestPlannerBolt.java +++ b/storm-core/src/jvm/backtype/storm/testing/TestPlannerBolt.java @@ -27,7 +27,7 @@ public class TestPlannerBolt extends BaseRichBolt { - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { } diff --git a/storm-core/src/jvm/backtype/storm/testing/TupleCaptureBolt.java b/storm-core/src/jvm/backtype/storm/testing/TupleCaptureBolt.java index e16357654f6..fe2e68fd6dc 100644 --- a/storm-core/src/jvm/backtype/storm/testing/TupleCaptureBolt.java +++ b/storm-core/src/jvm/backtype/storm/testing/TupleCaptureBolt.java @@ -40,7 +40,7 @@ public TupleCaptureBolt() { emitted_tuples.put(_name, new HashMap>()); } - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { _collector = collector; } diff --git a/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java index 6c9cdc18ce0..11e194873e1 100644 --- a/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java +++ b/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java @@ -39,7 +39,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { } - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, String codeDir) { _bolt.prepare(stormConf, context); _collector = new BasicOutputCollector(collector); } diff --git a/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java b/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java index 53aacae8836..b2695a4b474 100644 --- a/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java +++ b/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java @@ -27,6 +27,8 @@ import java.math.BigInteger; import java.util.Map; import java.util.TreeMap; + +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,8 +46,9 @@ public TransactionalSpoutBatchExecutor(ITransactionalSpout spout) { } @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { + public void prepare(Map conf, TopologyContext context, OutputCollector collector, String codeDir) { _collector = new BatchOutputCollectorImpl(collector); + if(!StringUtils.isEmpty(codeDir)) { context.setCodeDir(codeDir); } _emitter = _spout.getEmitter(conf, context); } diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java index 4dfccc65a1f..1326f795eac 100644 --- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java +++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java @@ -43,6 +43,8 @@ import java.util.List; import java.util.Map; import java.util.Set; + +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.builder.ToStringBuilder; import storm.trident.spout.IBatchID; @@ -193,11 +195,13 @@ private void updateTaskCounts(List tasks) { TopologyContext _context; @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { + public void prepare(Map conf, TopologyContext context, OutputCollector collector, String codeDir) { _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L; _lastRotate = System.currentTimeMillis(); _batches = new RotatingMap(2); _context = context; + if(!StringUtils.isEmpty(codeDir)) { context.setCodeDir(codeDir); } + _collector = collector; _coordCollector = new CoordinatedOutputCollector(collector); _coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector));