Skip to content
Browse files

backported batchsubtopologybuilder

  • Loading branch information...
1 parent b69c964 commit e50dbc4ad4628821482f804c680f0feda5ca2008 @nathanmarz nathanmarz committed Jun 19, 2012
Showing with 431 additions and 0 deletions.
  1. +11 −0 storm-backport/project.clj
  2. +420 −0 storm-backport/src/jvm/storm/backport/BatchSubtopologyBuilder.java
View
11 storm-backport/project.clj
@@ -0,0 +1,11 @@
+(defproject storm/storm-backport "0.7.3"
+ :source-path "src/clj"
+ :java-source-path "src/jvm"
+ :javac-options {:debug "true" :fork "true"}
+ :repositories {"releases" "http://artifactory.local.twitter.com/libs-releases-local"
+ "snapshots" "http://artifactory.local.twitter.com/libs-snapshots-local"
+ "artifactory" "http://artifactory.local.twitter.com/repo"}
+ :dependencies []
+ :dev-dependencies [[storm "0.7.3"]
+ [org.clojure/clojure "1.4.0"]]
+)
View
420 storm-backport/src/jvm/storm/backport/BatchSubtopologyBuilder.java
@@ -0,0 +1,420 @@
+package storm.backport;
+
+import backtype.storm.Constants;
+import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.topology.BaseConfigurationDeclarer;
+import backtype.storm.topology.BasicBoltExecutor;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.InputDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.coordination.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class BatchSubtopologyBuilder {
+ Map<String, Component> _bolts = new HashMap<String, Component>();
+ Component _masterBolt;
+ String _masterId;
+
+ public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
+ Integer p = boltParallelism == null ? null : boltParallelism.intValue();
+ _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
+ _masterId = masterBoltId;
+ }
+
+ public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
+ this(masterBoltId, masterBolt, null);
+ }
+
+ public BoltDeclarer getMasterDeclarer() {
+ return new BoltDeclarerImpl(_masterBolt);
+ }
+
+ public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
+ return setBolt(id, bolt, null);
+ }
+
+ public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
+ return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
+ }
+
+ public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
+ return setBolt(id, bolt, null);
+ }
+
+ public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
+ return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
+ }
+
+ private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
+ Integer p = null;
+ if(parallelism!=null) p = parallelism.intValue();
+ Component component = new Component(bolt, p);
+ _bolts.put(id, component);
+ return new BoltDeclarerImpl(component);
+ }
+
+ public void extendTopology(TopologyBuilder builder) {
+ BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
+ for(InputDeclaration decl: _masterBolt.declarations) {
+ decl.declare(declarer);
+ }
+ for(Map conf: _masterBolt.componentConfs) {
+ declarer.addConfigurations(conf);
+ }
+ for(String id: _bolts.keySet()) {
+ Component component = _bolts.get(id);
+ Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
+ for(String c: componentBoltSubscriptions(component)) {
+ SourceArgs source;
+ if(c.equals(_masterId)) {
+ source = SourceArgs.single();
+ } else {
+ source = SourceArgs.all();
+ }
+ coordinatedArgs.put(c, source);
+ }
+
+
+ BoltDeclarer input = builder.setBolt(id,
+ new CoordinatedBolt(component.bolt,
+ coordinatedArgs,
+ null),
+ component.parallelism);
+ for(Map conf: component.componentConfs) {
+ input.addConfigurations(conf);
+ }
+ for(String c: componentBoltSubscriptions(component)) {
+ input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
+ }
+ for(InputDeclaration d: component.declarations) {
+ d.declare(input);
+ }
+ }
+ }
+
+ private Set<String> componentBoltSubscriptions(Component component) {
+ Set<String> ret = new HashSet<String>();
+ for(InputDeclaration d: component.declarations) {
+ ret.add(d.getComponent());
+ }
+ return ret;
+ }
+
+ private static class Component {
+ public IRichBolt bolt;
+ public Integer parallelism;
+ public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
+ public List<Map> componentConfs = new ArrayList<Map>();
+
+ public Component(IRichBolt bolt, Integer parallelism) {
+ this.bolt = bolt;
+ this.parallelism = parallelism;
+ }
+ }
+
+ private static interface InputDeclaration {
+ void declare(InputDeclarer declarer);
+ String getComponent();
+ }
+
+ private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
+ Component _component;
+
+ public BoltDeclarerImpl(Component component) {
+ _component = component;
+ }
+
+ @Override
+ public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.fieldsGrouping(component, fields);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.fieldsGrouping(component, streamId, fields);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer globalGrouping(final String component) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.globalGrouping(component);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer globalGrouping(final String component, final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.globalGrouping(component, streamId);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer shuffleGrouping(final String component) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.shuffleGrouping(component);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.shuffleGrouping(component, streamId);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer localOrShuffleGrouping(final String component) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.localOrShuffleGrouping(component);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.localOrShuffleGrouping(component, streamId);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer noneGrouping(final String component) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.noneGrouping(component);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer noneGrouping(final String component, final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.noneGrouping(component, streamId);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer allGrouping(final String component) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.allGrouping(component);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer allGrouping(final String component, final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.allGrouping(component, streamId);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer directGrouping(final String component) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.directGrouping(component);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer directGrouping(final String component, final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.directGrouping(component, streamId);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.customGrouping(component, grouping);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.customGrouping(component, streamId, grouping);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.grouping(stream, grouping);
+ }
+
+ @Override
+ public String getComponent() {
+ return stream.get_componentId();
+ }
+ });
+ return this;
+ }
+
+ private void addDeclaration(InputDeclaration declaration) {
+ _component.declarations.add(declaration);
+ }
+
+ @Override
+ public BoltDeclarer addConfigurations(Map conf) {
+ _component.componentConfs.add(conf);
+ return this;
+ }
+ }
+}

0 comments on commit e50dbc4

Please sign in to comment.
Something went wrong with that request. Please try again.