/
DectationTopology.java
63 lines (49 loc) · 2.52 KB
/
DectationTopology.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package topology;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import resa.topology.ResaTopologyBuilder;
import resa.util.ConfigUtil;
import resa.util.ResaConfig;
import static resa.util.ConfigUtil.getInt;
/**
* Created by ding on 14-7-3.
*/
public class DectationTopology implements Constant {
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Enter path to config file!");
System.exit(0);
}
Config conf = ConfigUtil.readConfig(args[0]);
// TopologyBuilder builder = new WritableTopologyBuilder();
TopologyBuilder builder = new ResaTopologyBuilder();
String host = (String) conf.get("redis.host");
int port = ConfigUtil.getInt(conf, "redis.port", 6379);
String queue = (String) conf.get("redis.queue");
builder.setSpout("image-input", new ImageSource(host, port, queue), getInt(conf, "vd.spout.parallelism", 1));
builder.setBolt("feat-ext", new FeatureExtracter(), getInt(conf, "vd.feat-ext.parallelism", 1))
.shuffleGrouping("image-input", STREAM_IMG_OUTPUT)
.setNumTasks(getInt(conf, "vd.feat-ext.tasks", 1));
builder.setBolt("matcher", new Matcher(), getInt(conf, "vd.matcher.parallelism", 1))
.allGrouping("feat-ext", STREAM_FEATURE_DESC)
.setNumTasks(getInt(conf, "vd.matcher.tasks", 1));
builder.setBolt("aggregator", new Aggregater(), getInt(conf, "vd.aggregator.parallelism", 1))
.fieldsGrouping("feat-ext", STREAM_FEATURE_COUNT, new Fields(FIELD_FRAME_ID))
.fieldsGrouping("matcher", STREAM_MATCH_IMAGES, new Fields(FIELD_FRAME_ID))
.setNumTasks(getInt(conf, "vd.aggregator.tasks", 1));
int numWorkers = ConfigUtil.getInt(conf, "vd-worker.count", 1);
conf.setNumWorkers(numWorkers);
conf.setMaxSpoutPending(ConfigUtil.getInt(conf, "vd-MaxSpoutPending", 0));
conf.setStatsSampleRate(1.0);
ResaConfig resaConfig = ResaConfig.create();
resaConfig.putAll(conf);
if (ConfigUtil.getBoolean(conf, "vd.metric.resa", false)) {
resaConfig.addDrsSupport();
resaConfig.put(ResaConfig.REBALANCE_WAITING_SECS, 0);
System.out.println("ResaMetricsCollector is registered");
}
StormSubmitter.submitTopology("resa-vd-JB", resaConfig, builder.createTopology());
}
}