Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: fbrubacher/storm-contrib
base: 5fb325c805
...
head fork: fbrubacher/storm-contrib
compare: d96dedef22
  • 2 commits
  • 3 files changed
  • 0 commit comments
  • 1 contributor
View
4 storm-ml/src/main/java/com/twitter/storm/example/MainOnlineTopology.java
@@ -20,7 +20,7 @@
public static void main(String[] args) throws Exception {
MemcachedClient memcache = new MemcachedClient(AddrUtil.getAddresses(MEMCACHED_SERVERS));
- OperationFuture promise = memcache.set("model", 0, "[0.1, 0.1]");
+ OperationFuture promise = memcache.set("model", 0, "[0.1, -0.1]");
promise.get();
Config topology_conf = new Config();
@@ -30,7 +30,7 @@ public static void main(String[] args) throws Exception {
else
topology_name = args[0];
- MLTopologyBuilder ml_topology_builder = new MLTopologyBuilder(topology_name);
+ MLTopologyBuilder ml_topology_builder = new MLTopologyBuilder(topology_name, MEMCACHED_SERVERS);
ml_topology_builder.setTrainingSpout(new ExampleTrainingSpout());
ml_topology_builder.setTrainingBolt(new LocalLearner(2, MEMCACHED_SERVERS));
View
12 storm-ml/src/main/java/com/twitter/storm/primitives/MLTopologyBuilder.java
@@ -10,9 +10,8 @@
public class MLTopologyBuilder {
- public static final String MEMCACHED_SERVERS = "127.0.0.1:11211";
-
String topology_prefix;
+ String memcached_servers;
BaseTrainingSpout training_spout;
Number training_spout_parallelism;
@@ -25,12 +24,13 @@
IRichBolt rich_evaluation_bolt;
Number evaluation_bolt_parallelism;
- public MLTopologyBuilder(String topologyPrefix) {
+ public MLTopologyBuilder(String topologyPrefix, String memcached_servers) {
+ this.memcached_servers = memcached_servers;
this.topology_prefix = topologyPrefix;
}
public TopologyBuilder prepareTopology(String drpcFunctionName, ILocalDRPC drpc) {
- return prepareTopology(drpcFunctionName, drpc, 1.0, 0.0, 0.5, MEMCACHED_SERVERS);
+ return prepareTopology(drpcFunctionName, drpc, 1.0, 0.0, 0.5);
}
public void setTrainingSpout(BaseTrainingSpout exampleTrainingSpout, Number parallelism) {
@@ -83,7 +83,7 @@ public void setEvaluationBolt(IRichBolt evaluation_bolt) {
}
public TopologyBuilder prepareTopology(String drpcFunctionName, ILocalDRPC drpc, double bias, double threshold,
- double learning_rate, String memcached_servers) {
+ double learning_rate) {
TopologyBuilder topology_builder = new TopologyBuilder();
// training
@@ -97,7 +97,7 @@ public TopologyBuilder prepareTopology(String drpcFunctionName, ILocalDRPC drpc,
topology_builder.setBolt(this.topology_prefix + "-training-bolt", this.rich_training_bolt,
this.training_bolt_parallelism).shuffleGrouping(this.topology_prefix + "-training-spout");
}
- topology_builder.setBolt("aggregator", new Aggregator(MEMCACHED_SERVERS)).globalGrouping(
+ topology_builder.setBolt("aggregator", new Aggregator(this.memcached_servers)).globalGrouping(
this.topology_prefix + "-training-bolt");
// evaluation
View
4 storm-ml/src/main/java/com/twitter/storm/primitives/example/ExampleTrainingSpout.java
@@ -10,12 +10,12 @@
public static double get_label(Double x, Double y) {
// arbitrary expected output (for testing purposes)
- return (2 * x + -1 < y) ? 1.0 : -1.0;
+ return (2 * x + -1 > y) ? 1.0 : -1.0;
}
public void nextTuple() {
if (this.samples_count < this.max_samples) {
- Double x = 100 * Math.random();
+ Double x = 10 * Math.random();
Double y = 5.0;
double label = ExampleTrainingSpout.get_label(x, y);

No commit comments for this range

Something went wrong with that request. Please try again.