Skip to content

Commit

Permalink
Update topologyBuilder to the latest version
Browse files Browse the repository at this point in the history
  • Loading branch information
fbrubacher committed Aug 4, 2012
1 parent d79f349 commit adb01bf
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 47 deletions.
39 changes: 21 additions & 18 deletions storm-ml/src/main/java/com/twitter/MainOnlineTopology.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
import backtype.storm.Config; import backtype.storm.Config;
import backtype.storm.LocalCluster; import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC; import backtype.storm.LocalDRPC;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils; import backtype.storm.utils.Utils;


import com.twitter.algorithms.Aggregator;
import com.twitter.storm.primitives.EvaluationBolt; import com.twitter.storm.primitives.EvaluationBolt;
import com.twitter.storm.primitives.LocalLearner; import com.twitter.storm.primitives.LocalLearner;
import com.twitter.storm.primitives.MLTopologyBuilder;
import com.twitter.storm.primitives.TrainingSpout; import com.twitter.storm.primitives.TrainingSpout;


public class MainOnlineTopology { public class MainOnlineTopology {
Expand All @@ -18,25 +16,30 @@ public class MainOnlineTopology {
static Double bias = 1.0; static Double bias = 1.0;


public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
LocalDRPC drpc = new LocalDRPC();


builder.setSpout("example_spitter", new TrainingSpout()); Config topology_conf = new Config();
builder.setBolt("local_learner", new LocalLearner(2, MEMCACHED_SERVERS), 1).shuffleGrouping("example_spitter"); String topology_name;
builder.setBolt("aggregator", new Aggregator(MEMCACHED_SERVERS)).globalGrouping("local_learner"); if (args == null || args.length == 0)
topology_name = "perceptron";
else
topology_name = args[0];


LinearDRPCTopologyBuilder drpc_builder = new LinearDRPCTopologyBuilder("evaluate"); MLTopologyBuilder ml_topology_builder = new MLTopologyBuilder(topology_name);
drpc_builder.addBolt(new EvaluationBolt(bias, threshold, MEMCACHED_SERVERS), 3);


Config conf = new Config(); ml_topology_builder.setTrainingSpout(new TrainingSpout());
conf.setDebug(true); ml_topology_builder.setTrainingBolt(new LocalLearner(2, MEMCACHED_SERVERS));
LocalCluster cluster = new LocalCluster(); ml_topology_builder.setEvaluationBolt(new EvaluationBolt(1.0, 2.0, MEMCACHED_SERVERS));
cluster.submitTopology("learning", conf, builder.createTopology());
// cluster.submitTopology("evaluation", conf, drpc_builder.createLocalTopology(drpc));


Utils.sleep(10000); if (args == null || args.length == 0) {
cluster.killTopology("learning"); LocalDRPC drpc = new LocalDRPC();
cluster.shutdown(); LocalCluster cluster = new LocalCluster();


cluster.submitTopology(topology_name, topology_conf,
ml_topology_builder.createLocalTopology("evaluate", drpc));

Utils.sleep(10000);
cluster.killTopology("perceptron");
cluster.shutdown();
}
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public void execute(Tuple tuple) {
MathUtil.plus(aggregateWeights, weight); MathUtil.plus(aggregateWeights, weight);
} }
totalUpdateWeight += parallelUpdateWeight; totalUpdateWeight += parallelUpdateWeight;
LOG.info("aggregate weights" + aggregateWeights);
MathUtil.times(aggregateWeights, 1.0 / totalUpdateWeight); MathUtil.times(aggregateWeights, 1.0 / totalUpdateWeight);
LOG.info("aggregate weights" + aggregateWeights);
if (aggregateWeights != null) { if (aggregateWeights != null) {
memcache.set("model", 3600 * 24, aggregateWeights); memcache.set("model", 3600 * 24, aggregateWeights);
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4,64 +4,130 @@
import backtype.storm.drpc.DRPCSpout; import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults; import backtype.storm.drpc.ReturnResults;
import backtype.storm.generated.StormTopology; import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;


public class MLTopologyBuilder { public class MLTopologyBuilder {


public static final String MEMCACHED_SERVERS = "127.0.0.1:11211"; public static final String MEMCACHED_SERVERS = "127.0.0.1:11211";
private BaseRichBolt trainingBolt;
private BaseRichSpout trainingSpout;


public TopologyBuilder prepareTopology(ILocalDRPC drpc) { String topology_prefix;
return prepareTopology(drpc, 3.0, 0.0, 3.0, MEMCACHED_SERVERS);
TrainingSpout training_spout;
Number training_spout_parallelism;

IBasicBolt basic_training_bolt;
IRichBolt rich_training_bolt;
Number training_bolt_parallelism;

IBasicBolt basic_evaluation_bolt;
IRichBolt rich_evaluation_bolt;
Number evaluation_bolt_parallelism;

public MLTopologyBuilder(String topologyPrefix) {
this.topology_prefix = topologyPrefix;
}

public TopologyBuilder prepareTopology(String drpcFunctionName, ILocalDRPC drpc) {
return prepareTopology(drpcFunctionName, drpc, 1.0, 0.0, 0.5, MEMCACHED_SERVERS);
}

public void setTrainingSpout(TrainingSpout trainingSpout, Number parallelism) {
this.training_spout = trainingSpout;
this.training_spout_parallelism = training_spout_parallelism;
}

public void setTrainingSpout(TrainingSpout trainingSpout) {
setTrainingSpout(trainingSpout, 1);
}

public void setTrainingBolt(IBasicBolt training_bolt, Number parallelism) {
this.basic_training_bolt = training_bolt;
this.rich_training_bolt = null;
this.training_bolt_parallelism = training_bolt_parallelism;
} }


public void setTrainingBolt(BaseRichBolt trainingBolt) { public void setTrainingBolt(IBasicBolt training_bolt) {
this.trainingBolt = trainingBolt; setTrainingBolt(training_bolt, 1);
} }


public void setTrainingSpout(BaseRichSpout trainingSpout) { public void setTrainingBolt(IRichBolt training_bolt, Number parallelism) {
this.trainingSpout = trainingSpout; this.rich_training_bolt = training_bolt;
this.basic_training_bolt = null;
this.training_bolt_parallelism = training_bolt_parallelism;
} }


public TopologyBuilder prepareTopology(ILocalDRPC drpc, double bias, double threshold, double learning_rate, public void setTrainingBolt(IRichBolt training_bolt) {
String memcached_servers) { setTrainingBolt(training_bolt, 1);
}

public void setEvaluationBolt(IBasicBolt evaluation_bolt, Number parallelism) {
this.basic_evaluation_bolt = evaluation_bolt;
this.rich_evaluation_bolt = null;
this.evaluation_bolt_parallelism = evaluation_bolt_parallelism;
}

public void setEvaluationBolt(IBasicBolt evaluation_bolt) {
setEvaluationBolt(evaluation_bolt, 1);
}

public void setEvaluationBolt(IRichBolt evaluation_bolt, Number parallelism) {
this.rich_evaluation_bolt = evaluation_bolt;
this.basic_evaluation_bolt = null;
this.evaluation_bolt_parallelism = evaluation_bolt_parallelism;
}

public void setEvaluationBolt(IRichBolt evaluation_bolt) {
setEvaluationBolt(evaluation_bolt, 1);
}

public TopologyBuilder prepareTopology(String drpcFunctionName, ILocalDRPC drpc, double bias, double threshold,
double learning_rate, String memcached_servers) {
TopologyBuilder topology_builder = new TopologyBuilder(); TopologyBuilder topology_builder = new TopologyBuilder();


// training // training
topology_builder.setSpout("training-spout", new ExampleTrainingSpout()); topology_builder.setSpout(this.topology_prefix + "-training-spout", this.training_spout,
this.training_spout_parallelism);


topology_builder.setBolt("training-bolt", new LocalLearner(bias, threshold, learning_rate, MEMCACHED_SERVERS)) if (this.rich_training_bolt == null) {
.shuffleGrouping("training-spout"); topology_builder.setBolt(this.topology_prefix + "-training-bolt", this.basic_training_bolt,
this.training_bolt_parallelism).shuffleGrouping(this.topology_prefix + "-training-spout");
} else {
topology_builder.setBolt(this.topology_prefix + "-training-bolt", this.rich_training_bolt,
this.training_bolt_parallelism).shuffleGrouping(this.topology_prefix + "-training-spout");
}


// evaluation // evaluation
DRPCSpout drpc_spout; DRPCSpout drpc_spout;

if (drpc != null) if (drpc != null)
drpc_spout = new DRPCSpout("evaluate", drpc); drpc_spout = new DRPCSpout(drpcFunctionName, drpc);
else else
drpc_spout = new DRPCSpout("evaluate"); drpc_spout = new DRPCSpout(drpcFunctionName);

topology_builder.setSpout("drpc-spout", drpc_spout);


topology_builder.setBolt( topology_builder.setSpout(this.topology_prefix + "-drpc-spout", drpc_spout);
"drpc-evaluation",
new EvaluationBolt(PerceptronDRPCTopology.bias, PerceptronDRPCTopology.threshold,
PerceptronDRPCTopology.MEMCACHED_SERVERS)).shuffleGrouping("drpc-spout");


topology_builder.setBolt("drpc-return", new ReturnResults()).shuffleGrouping("drpc-evaluation"); if (this.rich_evaluation_bolt == null) {
topology_builder.setBolt(this.topology_prefix + "-drpc-evaluation", this.basic_evaluation_bolt,
this.evaluation_bolt_parallelism).shuffleGrouping(this.topology_prefix + "-drpc-spout");
} else {
topology_builder.setBolt(this.topology_prefix + "-drpc-evaluation", this.rich_evaluation_bolt,
this.evaluation_bolt_parallelism).shuffleGrouping(this.topology_prefix + "-drpc-spout");
}


topology_builder.setBolt(this.topology_prefix + "-drpc-return", new ReturnResults()).shuffleGrouping(
this.topology_prefix + "-drpc-evaluation");
// return // return
return topology_builder; return topology_builder;


} }


public StormTopology createLocalTopology(ILocalDRPC drpc) { public StormTopology createLocalTopology(String drpcFunctionName, ILocalDRPC drpc) {
return prepareTopology(drpc).createTopology(); return prepareTopology(drpcFunctionName, drpc).createTopology();
} }


public StormTopology createRemoteTopology() { public StormTopology createRemoteTopology(String drpcFunctionName) {
return prepareTopology(null).createTopology(); return prepareTopology(drpcFunctionName, null).createTopology();
} }
} }

0 comments on commit adb01bf

Please sign in to comment.